文章插圖

文章插圖
在消息隊列模型中 , 如何將消息廣播到所有的消費者 , 這種模式稱為“發布/訂閱” 。本文主要以一個簡單的小例子 , 簡述通過fanout交換機 , 實現消息的發布與訂閱 , 僅供學習分享使用 , 如有不足之處 , 還請指正 。
Fanout交換機模型
扇形交換機 , 采用廣播模式 , 根據綁定的交換機 , 路由到與之對應的所有隊列 。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上 。很像子網廣播 , 每臺子網內的主機都獲得了一份復制的消息 。Fanout交換機轉發消息是最快的 。
在同一個Virtual host下新增兩個隊列Q1,Q2 , 如下圖所示:
將兩個隊列綁定到系統默認的fanout交換機 , 如下所示:
生產者 , 采用Fanout類型交換機發布消息 , 如下圖所示:
建立連接后 , 將通道聲明類型為Fanout的交換機 , 如下所示:
1/// <summary> 2/// fanout類型交換機 , 發送消息 3/// </summary> 4public class RabbitMqFanoutSendHelper : RabbitMqHelper { 5/// <summary> 6/// 發送消息 7/// </summary> 8/// <param name="msg"></param> 9/// <returns></returns>10public bool SendMsg(string msg)11{12try13{14using (var conn = GetConnection("/Alan.hsiang"))15{16using (var channel = conn.CreateModel())17{18channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);1920var body = Encoding.UTF8.GetBytes(msg);21 22channel.BasicPublish(exchange: "amq.fanout",23routingKey: "",24basicProperties: null,25body: body);26 27//Console.WriteLine(" [x] Sent {0}", message);28};29};30return true;31}32catch (Exception ex)33{34throw ex;35}36}37}建立連接后 , 通道聲明類型為Fanout的交換機 , 并綁定隊列進行訂閱 , 如下所示:
1/// <summary> 2/// 扇形交換機接收消息 3/// </summary> 4public class RabbitMqFanoutReceiveHelper : RabbitMqHelper 5{ 6public RabbitMqReceiveEventHandler OnReceiveEvent; 78private IConnection conn; 9 10private IModel channel;11 12private EventingBasicConsumer consumer;13 14public bool StartReceiveMsg(string queueName)15{16try17{18conn = GetConnection("/Alan.hsiang");19 20channel = conn.CreateModel();21channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);22//此處隨機取出交換機下的隊列23//var queueName = channel.QueueDeclare().QueueName;24channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");25consumer = new EventingBasicConsumer(channel);26consumer.Received += (model, ea) =>27{28var body = ea.Body.ToArray();29var message = Encoding.UTF8.GetString(body);30//Console.WriteLine(" [x] Received {0}", message);31if (OnReceiveEvent != null)32{33OnReceiveEvent(queueName+"::"+message);34}35};36channel.BasicConsume(queue: queueName,37autoAck: true,38consumer: consumer);39return true;40}41catch (Exception ex)42{43throw ex;44}45}46}
- 華為手機怎么刷機教程圖解 華為手機怎么刷機教程 很難
- 免費腦圖軟件推薦 免費腦圖軟件下載
- 可以讓微信語音變聲的軟件 微信發變聲語音軟件
- 工作手機是什么軟件 用手機工作的軟件
- 電腦充電保護在哪里設置 華為手機充電保護在哪里設置
- 惠普服務器擴容 惠普服務器磁盤陣列配置
- 硬編碼和軟編碼的優劣 什么是硬編碼什么是軟編碼
- 查看pdf需要什么軟件text 查看pdf用什么軟件
- js圖片上傳方法 JS上傳圖片
- 華為nova8參數配置詳情 華為nova7參數配置詳情
