MQ消息队列(5)C#利用RabbitMQ实现消息订阅与发布-C/S开发框架
作者:csframework|C/S框架网  发布日期:2021/12/30 18:45:56

MQ消息队列(5)C#利用RabbitMQ实现消息订阅与发布-C/S开发框架

在消息队列模型中,如何将消息广播到所有的消费者,这种模式成为“发布/订阅”。本文主要以一个简单的小例子,简述通过fanout交换机,实现消息的发布与订阅,仅供学习分享使用,如有不足之处,还请指正。

Fanout交换机模型

扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

MQ消息队列(5)C#利用RabbitMQ实现消息订阅与发布-C/S开发框架

RabbitMQ控制台操作

新增两个队列

在同一个Virtual host下新增两个队列Q1,Q2,如下图所示:

MQ消息队列(5)C#利用RabbitMQ实现消息订阅与发布-C/S开发框架

绑定fanout交换机

将两个队列绑定到系统默认的fanout交换机,如下所示:

MQ消息队列(5)C#利用RabbitMQ实现消息订阅与发布-C/S开发框架

示例效果图

生产者,采用Fanout类型交换机发布消息,如下图所示:

MQ消息队列(5)C#利用RabbitMQ实现消息订阅与发布-C/S开发框架

当生产者发布 一条消息时,Q1,Q2两个队列均会收到,如下图所示:

MQ消息队列(5)C#利用RabbitMQ实现消息订阅与发布-C/S开发框架

当启动消费者后,两个消费者,均会订阅到相关消息,如下图所示:

MQ消息队列(5)C#利用RabbitMQ实现消息订阅与发布-C/S开发框架

核心代码

消息发布

建立连接后,将通道声明类型为Fanout的交换机,如下所示:

     /// <summary>
     /// fanout类型交换机,发送消息
     /// </summary>
     public class RabbitMqFanoutSendHelper : RabbitMqHelper {
         /// <summary>
         /// 发送消息
         /// </summary>
         /// <param name="msg"></param>
         /// <returns></returns>
         public bool SendMsg(string msg)
         {
             try
             {
                 using (var conn = GetConnection("/Alan.hsiang"))
                 {
                     using (var channel = conn.CreateModel())
                     {
                         channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
                         
                         var body = Encoding.UTF8.GetBytes(msg);
 
                         channel.BasicPublish(exchange: "amq.fanout",
                                              routingKey: "",
                                              basicProperties: null,
                                              body: body);
 
                         //Console.WriteLine(" [x] Sent {0}", message);
                     };
                 };
                 return true;
             }
             catch (Exception ex)
             {
                 throw ex;
             }
         }
     }

消息订阅

建立连接后,通道声明类型为Fanout的交换机,并绑定队列进行订阅,如下所示:

    /// <summary>
     /// 扇形交换机接收消息
     /// </summary>
     public class RabbitMqFanoutReceiveHelper : RabbitMqHelper
     {
         public RabbitMqReceiveEventHandler OnReceiveEvent;
 
         private IConnection conn;
 
         private IModel channel;
 
         private EventingBasicConsumer consumer;
 
         public bool StartReceiveMsg(string queueName)
         {
             try
             {
                 conn = GetConnection("/Alan.hsiang");
 
                 channel = conn.CreateModel();
                 channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
                 //此处随机取出交换机下的队列
                 //var queueName = channel.QueueDeclare().QueueName;
                 channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");
                 consumer = new EventingBasicConsumer(channel);
                 consumer.Received += (model, ea) =>
                 {
                     var body = ea.Body.ToArray();
                     var message = Encoding.UTF8.GetString(body);
                     //Console.WriteLine(" [x] Received {0}", message);
                     if (OnReceiveEvent != null)
                     {
                         OnReceiveEvent(queueName+"::"+message);
                     }
                 };
                 channel.BasicConsume(queue: queueName,
                                         autoAck: true,
                                         consumer: consumer);
                 return true;
             }
             catch (Exception ex)
             {
                 throw ex;
             }
         }
     }

关于RabbitMQ的基础知识介绍,可参考前几篇博文。

上一篇 下一篇