发布消息
using NewLife.Log; using NewLife.Messaging; using NewLife.RocketMQ; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1_RocketMq { class Program { //押金订单 static string json2 = " { " + " OrderNo:\"DFGDFGIAL456827689\"," + " OrderTime:\"2020-08-31 02:55:35\"," + " ShopNo:\"DSLFJHLJFIEINCXLKFG\", " + " ShopName:\"五大花园店泡脚牛肉米线\", " + " ReceivingArea:\"北京(BJ)北京(010)\"," + " Address:\"北京市海淀区知春路65号中国卫星通信大厦B座23层\"," + " Revicer:\"李世民\"," + " Phone:\"18756542695\"," + " Total:\"\"," + " ShipFee:\"3.50\", " + " TotalDeposit:\"200.00\", " + " DailyRent:\"50\", " + " PointsDiscount:\"\"," + " RealPay:\"\"," + " Remark:\"9-1-01测试订单,不需要加肉\", " + " DataItems:[" + " \"肥牛 x2¥59.00\"," + " \"大白菜 x1¥59.00\", " + " \"白萝卜 x3¥59.00\", " + " \"千层肚 x5¥59.00\", " + " \"黄喉 x6¥59.00\", " + " \"菌花 x6¥59.00\", " + " \"腰片 x6¥59.00\", " + " \"虾滑 x6¥59.00\", " + " \"肥牛 x2¥59.00\", " + " \"大白菜 x1¥59.00\", " + " \"白萝卜 x3¥59.00\", " + " \"千层肚 x5¥59.00\", " + " \"黄喉 x6¥59.00\", " + " \"菌花 x6¥59.00\", " + " \"腰片 x6¥59.00\", " + " \"虾滑 x6¥59.00\", " + " \"肥牛 x2¥59.00\", " + " \"大白菜 x1¥59.00\", " + " \"白萝卜 x3¥59.00\", " + " \"千层肚 x5¥59.00\", " + " \"黄喉 x6¥59.00\", " + " \"菌花 x6¥59.00\", " + " \"腰片 x6¥59.00\", " + " \"虾滑 x6¥59.00\", " + " \"肥牛 x2¥59.00\", " + " \"大白菜 x1¥59.00\", " + " \"白萝卜 x3¥59.00\", " + " \"千层肚 x5¥59.00\", " + " \"黄喉 x6¥59.00\", " + " \"菌花 x6¥59.00\", " + " \"腰片 x6¥59.00\", " + " \"虾滑 x6¥59.00\", " + " ] }"; static void Main(string[] args) { var mq = new Producer { Topic = "DESKTOP-ACRFQSI", NameServerAddress = "192.168.1.194:9876", //Log = XTrace.Log, }; mq.Start(); //发送消息方式一,可以设置key NewLife.RocketMQ.Protocol.Message message = new NewLife.RocketMQ.Protocol.Message() { BodyString = json2, Keys ="key002", Tags = "TagC", Flag = 0, WaitStoreMsgOK = true }; var sr = mq.Publish(message); //发送消息方式二 //var sr = mq.Publish(json2, "TagA"); string log = $"发送成功的消息,内容>{json2},MsgId={sr.MsgId},BrokerName= {sr.Queue.BrokerName} ,QueueId={sr.Queue.QueueId},QueueOffset= {sr.QueueOffset}"; Console.WriteLine(log); // 阿里云发送消息不能过快,否则报错“服务不可用” LogHelpter.AddLog(log); Console.WriteLine("完成"); mq.Dispose(); Console.ReadLine(); } } }消费消息
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace ConsoleApp_Consumer { class Program { static void Main(string[] args) { Console.WriteLine("消息接收测试"); //测试消费消息 var consumer = new NewLife.RocketMQ.Consumer { Topic = "DESKTOP-ACRFQSI", Group = "CID_ONSAPI_OWNER", NameServerAddress = "192.168.1.194:9876", //设置每次接收消息只拉取一条信息 BatchSize = 1, //FromLastOffset = true, //SkipOverStoredMsgCount = 0, //BatchSize = 20, //Log = NewLife.Log.XTrace.Log, }; consumer.OnConsume = (q, ms) => { string mInfo= $"BrokerName={q.BrokerName},QueueId={q.QueueId},Length={ms.Length}"; Console.WriteLine(mInfo); foreach (var item in ms.ToList()) { string msg = $"消息:msgId={item.MsgId},key={item.Keys},产生时间【{item.BornTimestamp.ToDateTime()}】,内容>{item.Body.ToStr()}"; Console.WriteLine(msg); } // return false;//通知消息队:不消费消息 return true; //通知消息队:消费了消息 }; consumer.Start(); Console.ReadLine(); } } }停止接收RocketMq消息,
consumer.OnConsume = null; consumer.Dispose();重新启动接收参考:
static Func<MessageQueue, MessageExt[], bool> reviceMsgFunc= (q, ms) => { // string mInfo = $"BrokerName={q.BrokerName},QueueId={q.QueueId},Length={ms.Length}"; // Console.WriteLine(mInfo); // LogHelpter.AddLog("********收到消息结构>" + mInfo); foreach (var item in ms.ToList()) { string msg = $"消息msgId={item.MsgId},key={item.Keys},QueueOffset={item.QueueOffset},产生时间【{item.BornTimestamp.ToDateTime()}】,内容>{item.Body.ToStr()}"; Console.WriteLine(msg); LogHelpter.AddLog("收到消息>" + msg); } // return false;//不消费 return true;//消费了消息 }; //启动 Task.Run(() => { System.Threading.Thread.Sleep(50 * 1000); try { consumer = new NewLife.RocketMQ.Consumer { Topic = "topic_order_list_print", Group = "group_order_print", NameServerAddress = "127.0.0.1:9876", //设置每次接收消息只拉取一条信息 BatchSize = 1, //FromLastOffset = true, //SkipOverStoredMsgCount = 0, //BatchSize = 20, //Log = NewLife.Log.XTrace.Log, }; consumer.OnConsume = reviceMsgFunc; consumer.Start(); //consumer.StartSchedule(); string log = "再次启动接收消息,成功" + DateTime.Now.ToFullString(); Console.WriteLine(log); LogHelpter.AddLog(log); Console.ReadLine(); } catch (Exception ex) { Console.WriteLine("再次启动接收消息出错" + ex.Message); LogHelpter.AddLog("再次启动接收消息出错," + ex.Message); } });