当前位置 博文首页 > Stacking:.Net RabbitMQ实战指南——客户端开发

    Stacking:.Net RabbitMQ实战指南——客户端开发

    作者:Stacking 时间:2021-06-06 18:29

    开发中关键的Class和Interface有Channel、Connection、ConnectionFactory、Consumer等,与RabbitMQ相关的开发工作,基本上是围绕Connection和Channel这两个类展开的。

    连接RabbitMQ

    一个Connection可以创建多个Channel实例,但Channel实例不能在线程间共享,应用程序应该为每一个线程开辟一个Channel。

     

    Channel或者Connection中有个isOpen方法可以用来检测其是否已处于开启状态。但并不推荐使用,这个方法的返回值依赖于shutdownCause的存在,有可能会产生竞争。更多的是捕获ShutdownSignalException,IOException或SocketException等异常判断RabbitMq的连接状态。

    实际操作过程中遇到BrokerUnreachableException异常

     

     因为我使用的账号是guest,guest账号默认是不支持远程连接,需要在http://localhost:15672(前提是安装了web插件)的Admin选项卡中添加一个新用户(或者使用命令行添加)。

    安装web插件

    rabbitmq-plugins enable rabbitmq_management

    添加新用户:

    sudo rabbitmqctl add_user  user_admin  passwd_admin

     

    如上图所示,新添加的用户没有任何权限,需要点击用户名设置权限。

    示例代码:

    var factory = new ConnectionFactory {
        HostName = "localhost",   //主机名
        UserName = "mymq",        //默认用户名
        Password = "123456",      //默认密码
        RequestedHeartbeat = TimeSpan.FromSeconds(30)
    };
    
    using (var connection = factory.CreateConnection())//连接服务器
    {
        //创建一个通道
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("stacking", false, false, false, null);//创建消息队列
            var properties = channel.CreateBasicProperties();
            properties.DeliveryMode = 1;
            string message = "RabbitMQ Test"; //传递的消息内容
            channel.BasicPublish("", "stacking", properties, Encoding.UTF8.GetBytes(message)); //生产消息
            Console.WriteLine($"Send:{message}");
        }
    }

    在管理界面处看到消息插入成功

     

     使用新加的账号链接MQ还会提示BrokerUnreachableException异常,很纳闷。折腾了半天把WSL升级到WSL2就链接成功。

    交换器和队列

    交换器和队列是应用层面的构建模块,使用前应对其进行声明确保其存在。

     var exchangeName = "exchange_name";
     channel.ExchangeDeclare(exchangeName, "direct", true);//创建一个持久化的、非自动删除的、绑定类型为direct的交换器
     var queueName = channel.QueueDeclare().QueueName;   //创建一个非持久化的、排他的、自动删除的队列(队列名由RabbitMQ自动生成)
     channel.QueueBind(queueName, exchangeName, "routing_key");  //使用路由键(routing_key)将队列和交换器绑定
    
     channel.QueueDeclare("queue_name", true);   // QueueDeclare拥有多个重载

    ExchangeDeclare方法详解

    各个参数详细说明如下:

    exchange:交换器的名称。

    type:交换器的类型,常见的如fanout、direct、topic。

    durable:设置是否持久化。durable设置为true表示持久化,反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。

    autoDelete:设置是否自动删除。autoDelete设置为true则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑才会删除。

    internal:设置是否是内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。

    argument:其他一些结构化参数

    QueueDeclareNoWait方法实现设置了一个nowait参数(AMQP中Exchange.Declare命令的参数),意思是不需要等待服务区返回结果。

    ExchangeDeclarePassive方法用来检测相应的交换器是否存在。如果存在则正常返回;如果不存在则抛出异常。

    QueueDeclare方法详解

    方法的参数详细说明如下:

    queue:队列的名称。

    durable:设置是否持久化。为true则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。

    exclusive:设置是否排他。为true则设置队列为排他的。

    autoDelete:设置是否自动删除。为true则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。

    arguments:设置队列的其他一些参数,如x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority等。

    如果一个队列被声明为排他队列,则该队列仅对首次声明它的连接可见,并在连接断开时自动删除。需要注意:排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)可以访问同一连接创建的排他队列;“首次”是指如果一个连接已经声明了一个排他队列,其他连接不允许再建立同名的排他队列;即使该队列是持久化的,一旦连接关闭或客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。

    QueueDeclareNoWait方法实现设置了一个nowait参数,意思是不需要等待服务区返回结果。

    QueueDeclarePassive方法用来检测相应的队列是否存在。如果存在则正常返回;如果不存在则抛出异常。

     QueueBind方法详解

    方法中涉及的参数:

    queue:队列名称;

    exchange:交换器的名称;

    routingKey:用来绑定队列和交换器的路由键;

    argument:定义绑定的一些参数。

    ExchangeBind方法详解

    不仅可以将交换器与队列绑定,也可以将交换器与交换器绑定。绑定之后,消息从source交换器转发到destination交换器

    方法中涉及的参数:

    destination:目的交换器名;

    source:源交换器的名称;

    routingKey:用来绑定队列和交换器的路由键;

    argument:定义绑定的一些参数。

    交换器的使用并不会真正耗费服务器的性能,而队列会。要衡量RabbitMQ当前的QPS只需看队列的即可。

    发送消息

    BasicPublish方法用来发送一条消息到。为了更好地控制发送,可以使用mandatory这个参数

    对应的具体参数解释如下:

    exchange:交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到RabbitMQ默认的交换器中。

    routingKey:路由键,交换器根据路由键将消息存储到相应的队列之中。

    basicProperties:消息的基本属性集,其包含14个属性成员,分别有contentType、contentEncoding、headers(Map<String,Object>)、deliveryMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId。

    byte[] body:消息体(payload),真正需要发送的消息。

    mandatory:设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,会调用basic.return方法将消息返还给生产者;设为false时,出现上述情形broker会直接将消息扔掉。

    丰富了一下第一部分的代码:

    var properties = channel.CreateBasicProperties();
    properties.DeliveryMode = 1;
    properties.Priority = 2;
    properties.ContentType = "text/plain";
    properties.Expiration = "60000";
    string message = "RabbitMQ Test"; //传递的消息内容
    channel.BasicPublish("", "stacking", properties, Encoding.UTF8.GetBytes(message)); //生产消息

    消费消息

    RabbitMQ的消费模式分两种:推(Push)模式和拉(Pull)模式。推模式采用Basic.Consume进行消费,而拉模式则是调用Basic.Get进行消费。

    推模式

    推模式接收消息需要实例化一个EventingBasicConsumer类,订阅Received事件来接收消息。EventingBasicConsumer实现了DefaultBasicConsumer类,实际使用中如果不满足需求可以继承该类。

    示例代码:

    var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (ch, ea) =>
                        {
                            var body = ea.Body.ToArray();
                            Console.WriteLine($"Received:{Encoding.UTF8.GetString(body)}");
                            channel.BasicAck(ea.DeliveryTag, false);
                        };
                        var consumerTag = channel.BasicConsume("stacking", false, consumer);

    BasicConsume方法对应的参数说明如下:

    queue:队列的名称;

    autoAck:设置是否自动确认。建议设成false,即不自动确认;

    consumerTag:消费者标签,用来区分多个消费者;

    arguments:设置消费者的其他参数;

    callback:设置消费者的回调函数。

    BasicConsume返回字符串类型consumerTag,可以通过调用channel.BasicCancel(consumerTag)显式地取消一个消费者的订阅。BasicCancel方法会首先触发HandleConsumerOk方法,之后触发HandleDelivery方法,最后才触发HandleCancelOk方法.

    拉模式

    拉模式通过channel.BasicGet方法可以单条地获取消息。

    示例代码:

    var result = channel.BasicGet("stacking",false);
    Console.WriteLine($"Received:{Encoding.UTF8.GetString(result.Body.ToArray())}");
    channel.BasicAck(result.DeliveryTag, false);

    Basic.Consume将信道(Channel)置为接收模式,直到取消队列的订阅,RabbitMQ会不断地推送消息给消费者,当然推送消息的个数还是会受到Basic.Qos的限制。如果只想从队列获得单条消息而不是持续订阅,建议使用Basic.Get进行消费。但是不能将Basic.Get放在一个循环里来代替Basic.Consume,这样做会严重影响RabbitMQ的性能。

    消费端的确认与拒绝

    为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement)。消费者在订阅队列时指定autoAck参数,当autoAck为false时,RabbitMQ会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当autoAck为true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

    当autoAck参数设置为false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题。

    当autoAck参数设置为false,对于RabbitMQ服务端而言,队列中的消息分为两部分:一部分是等待投递给消费者的消息;一部分是已投递给消费者,但是还没有收到消费者确认信号的消息。如果RabbitMQ一直没有收到消费者的确认信号,并且此消息的消费者断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者(可能还是原来的那个消费者 ),并且RabbitMQ不会为未确认的消息设置过期时间。

    消费消息时autoAck参数设置为false需要主动调用channel.BasicAck对消息进行确认,以便RabbitMQ删除消息,对应的也可以调用channel.BasicReject方法拒绝消息,由其他消费端处理或者丢弃。

    deliveryTag可以看作消息的编号。如果requeue参数设置为true,则RabbitMQ会重新将这条消息存入队列;如果requeue参数设置为false,则RabbitMQ立即会把消息从队列中移除,不会把它发送给新的消费者。

    Basic.Reject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令,对应的实现方法为channel.BasicNack.

    其中deliveryTag和requeue的含义可以参考BasicReject方法。multiple参数设置为false则表示仅拒绝编号为deliveryTag的单条消息;multiple参数设置为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息。

    channel.BasicReject或者channel.BasicNack中的requeue设置为false,可以启用“死信队列”的功能。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。

    关闭连接

    可以显示的调用Connection和Channel的Close方法来关闭连接,也可以借助using来管理连接。

    Connection和Channel所具备的生命周期如下:

    Open:开启状态,代表当前对象可以使用。

    Closing:正在关闭状态。当前对象被显式地通知调用关闭方法(shutdown),这样就产生了一个关闭请求让其内部对象进行相应的操作,并等待这些关闭操作的完成。

    Closed:已经关闭状态。当前对象已经接收到所有的内部对象已完成关闭动作的通知,并且其也关闭了自身。

    在Connection和Channel中都定义了对应实现监听状态的改变。

    Connection

     

     Channel

    Github

    示例代码地址:https://github.com/MayueCif/RabbitMQ

    bk