cba賽程 cba



文章插圖
cba賽程 cba

文章插圖
在消息隊列模型中 , 如何將消息廣播到所有的消費者 , 這種模式稱為“發布/訂閱” 。本文主要以一個簡單的小例子 , 簡述通過fanout交換機 , 實現消息的發布與訂閱 , 僅供學習分享使用 , 如有不足之處 , 還請指正 。
Fanout交換機模型
扇形交換機 , 采用廣播模式 , 根據綁定的交換機 , 路由到與之對應的所有隊列 。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上 。很像子網廣播 , 每臺子網內的主機都獲得了一份復制的消息 。Fanout交換機轉發消息是最快的 。
RabbitMQ控制臺操作新增兩個隊列
在同一個Virtual host下新增兩個隊列Q1,Q2 , 如下圖所示:
綁定fanout交換機
將兩個隊列綁定到系統默認的fanout交換機 , 如下所示:
示例效果圖
生產者 , 采用Fanout類型交換機發布消息 , 如下圖所示:
當生產者發布 一條消息時 , Q1,Q2兩個隊列均會收到 , 如下圖所示:
當啟動消費者后 , 兩個消費者 , 均會訂閱到相關消息 , 如下圖所示:
【cba賽程 cba】核心代碼消息發布
建立連接后 , 將通道聲明類型為Fanout的交換機 , 如下所示:
1/// <summary> 2/// fanout類型交換機 , 發送消息 3/// </summary> 4public class RabbitMqFanoutSendHelper : RabbitMqHelper { 5/// <summary> 6/// 發送消息 7/// </summary> 8/// <param name="msg"></param> 9/// <returns></returns>10public bool SendMsg(string msg)11{12try13{14using (var conn = GetConnection("/Alan.hsiang"))15{16using (var channel = conn.CreateModel())17{18channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);1920var body = Encoding.UTF8.GetBytes(msg);21 22channel.BasicPublish(exchange: "amq.fanout",23routingKey: "",24basicProperties: null,25body: body);26 27//Console.WriteLine(" [x] Sent {0}", message);28};29};30return true;31}32catch (Exception ex)33{34throw ex;35}36}37}
消息訂閱
建立連接后 , 通道聲明類型為Fanout的交換機 , 并綁定隊列進行訂閱 , 如下所示:
1/// <summary> 2/// 扇形交換機接收消息 3/// </summary> 4public class RabbitMqFanoutReceiveHelper : RabbitMqHelper 5{ 6public RabbitMqReceiveEventHandler OnReceiveEvent; 78private IConnection conn; 9 10private IModel channel;11 12private EventingBasicConsumer consumer;13 14public bool StartReceiveMsg(string queueName)15{16try17{18conn = GetConnection("/Alan.hsiang");19 20channel = conn.CreateModel();21channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);22//此處隨機取出交換機下的隊列23//var queueName = channel.QueueDeclare().QueueName;24channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");25consumer = new EventingBasicConsumer(channel);26consumer.Received += (model, ea) =>27{28var body = ea.Body.ToArray();29var message = Encoding.UTF8.GetString(body);30//Console.WriteLine(" [x] Received {0}", message);31if (OnReceiveEvent != null)32{33OnReceiveEvent(queueName+"::"+message);34}35};36channel.BasicConsume(queue: queueName,37autoAck: true,38consumer: consumer);39return true;40}41catch (Exception ex)42{43throw ex;44}45}46}