当前位置 博文首页 > weixin_41997073的博客:【星海出品】RabbitMQ快速入门

    weixin_41997073的博客:【星海出品】RabbitMQ快速入门

    作者:[db:作者] 时间:2021-07-15 15:38

    本文会从基础概念开始,扩展到实战再扩展回概念,由浅入深去了解rabbitMQ

    rabbitMQ入门
    RabbitMQ 是部署最广泛的开源消息代理。
    是一个由erlang开发的基于AMQP(Advanced Message Queue )协议的开源实现。
    用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面都非常的优秀。是当前最主流的消息中间件之一。

    AMQP
    AMQP,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,同样,消息使用者也不用知道发送者的存在。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

    RabbitMQ常用的场景
    异步:批量数据异步处理 例:批量上传文件,日志埋点,订单全流程状态监控
    削峰:高负载任务负载均衡 例:电商秒杀 统一系统中 自己发送给自己
    解耦:串行任务并行化 例 抓单系统–》订单系统
    广播:基于发布/订阅模式实现一对多通信。 例: 多用户发送邮件

    rabbitMQ的安装
    在linux系统中rabbitMQ官网给出了各版本的安装方式。
    http://www.rabbitmq.com/install-rpm.html

    由于使用了erlang语言开发,所以需要erlang的包。
    erlang和RabbitMQ的兼容性,
    参考: https://www.rabbitmq.com/which-erlang.html#compatibility-matrix

    下载 rabbitmq-server 、erlang.rpm。

    下载后可以通过rpm -ql rabbitmq-server 查看rpm的文件。

    配置文件路径默认为/etc/rabbitmq/rabbitmq
    注:环境变量文件,可以不配置

    rabbitmq.con?g配置文件,3.7支持新旧两种配置文件格式

    rabbitmq-env.conf配置
    常用的参数

    RABBITMQ_NODE_IP_ADDRESS= //IP地址,空串bind所有地址,指定地址bind指定网络接口
    RABBITMQ_NODE_PORT= //TCP端口号,默认是5672 RABBITMQ_NODENAME=
    //节点名称。默认是rabbit RABBITMQ_CONFIG_FILE= //配置文件路径 ,即rabbitmq.config文件路径
    RABBITMQ_MNESIA_BASE= //mnesia所在路径 RABBITMQ_LOG_BASE=
    //日志所在路径 RABBITMQ_PLUGINS_DIR= //插件所在路径

    rabbitmq.config配置
    如果是用rpm包安装,可从默认docs目录复制配置文件样例:
    cp /usr/share/doc/rabbitmq-server-3.6.5/rabbitmq.config.example

    /etc/rabbitmq.config tcp_listerners
    #设置rabbimq的监听端口,默认为[5672]。
    disk_free_limit
    #磁盘低水位线,若磁盘容量低于指定值则停止接收数据,默认值为{mem_relative,
    1.0},即与内存相关联1:1,也可定制为多少byte.
    vm_memory_high_watermark
    #设置内存低水位线,若低于该水位线,则开启流控机制,默认值是0.4,即内存总量的40%。
    hipe_compile
    #将部分rabbimq代码用High Performance Erlang compiler编译,可提升性能,该参数是实验性,若出现erlang vm segfaults,应关掉。
    force_fine_statistics
    #该参数属于rabbimq_management,若为true则进行精细化的统计,但会影响性能。 frame_max #包大小,若包小则低延迟,若包则高吞吐,默认是131072=128K。 heartbeat #客户端与服务端心跳间隔,设置为0则关闭心跳,默认是600秒。

    rabbitMQ插件

    启用 rabbitmq-server -detached 关闭 rabbitmqctl stop

    -v 显示所有插件的详情(详细)
    -m 仅仅只显示插件的名称 (简约)
    -E 仅仅只显示显式启用的插件
    -e 仅仅只显示显式、隐式启用的插件 表示用于过滤插件名称表达式

    # 显示所有的插件,每一行一个 
    rabbitmq-plugins list
    
    # 显示所有的插件,并且显示插件的版本号和描述信息 
    rabbitmq-plugins list -v
    
    # 显示所有名称含有 "management" 的插件 
    rabbitmq-plugins list -v management
    
    # 显示所有显示或者隐式启动的插件 
    rabbitmq-plugins list -e rabbit
    
    # 启用指定插件
    rabbitmq-plugins enable
    
    # 禁用指定插件
    rabbitmq-plugins disable
    
    # 禁用所有插件
    rabbitmq-plugins set
    
    # 启用management插件和它所依赖的插件,禁用其他所有插件
    rabbitmq-plugins set rabbitmq_management
    

    rabbitMQ ctl
    用户管理:

    添加用户:
    rabbitmqctl add_user username password
    修改密码:
    rabbitmqctl change_password username newpass>
    删除用户:
    rabbitmqctl delete_user username
    用户列表:
    rabbitmqctl list_users
    设置用户角色:
    rabbitmqctl set_user_tags username tag1,tag2
    删除用户所有角色:
    rabbitmqctl set_user_tags username

    访问控制

    删除用户访问虚拟主机权限:
    rabbitmqctl clear_permissions -p vhost username
    查看虚拟主机对应用户的权限:
    rabbitmqctl list_permissions -p vhost
    查看用户拥有哪些虚拟主机的权限:
    rabbitmqctl list_user_permissions username
    查看所有虚拟主机
    rabbitmqctl list_vhosts name tracing
    给用户设置虚拟主机的权限:
    rabbitmqctl
    set_permissions -p vhost username “." ".” “.*”

    rabbitMQ 的实战

    ss -tanl | grep 5672
    LISTEN0128          *:25672                    *:*
    LISTEN0128          *:15672                    *:*
    LISTEN0128         :::5672                    :::*
    

    开始登录WEB界面,http://RabbitMQ服务器IP:15672 登录后在Admin中Virtual Hosts中建立一个虚拟主机
    缺省虚拟主机,默认只能是guest用户在本机连接。新建的用户 自己的名称默认无法访问任何虚拟主机。

    python对接rabbitMQ,推荐使用Pika

    Pika是纯Python实现的支持AMQP协议的库

    #send.py生产者代码
    import pika
    import time
    #配置连接参数
    params = pika.ConnectionParameters('192.168.142.135')
    
    #配置连接参数
    #params = pika.ConnectionParameters('192.168.0.1')
    credential = pika.PlainCredentials('zhanghao','password')
    
    params =pika.ConnectionParameters(
    	'192.168.0.1',5672,# broker, port
    	'test',# virtual host
    	credential# user password
    	)
    
    
    #建立连接
    connection = pika.BlockingConnection(params)
    with connection:
    	#建立通道
    	channel = connection.channel()
    	
    	#创建一个队列,queue命名为hello。如果queue不存在,消息将被dropped
    	channel.queue_declare(queue='hello')
    	
    	#也可以连接发送message,只需要加个for循环for i in range(2),body中增加{}.format(i), 循环最好再增加一个time.sleep(0.5)
    	channel.basic_publish(
    		exchange='',#使用缺省exchange
    		routing_key='hello',# routing_key必须指定,这里要求和目标queue一致
    		body='Hello World'#消息
    	    )
    	print("Sent Message OK")
    
    # recieve.py消费者代码
    import pika
    #获取对端连接的信息
    params = pika.URLParameters('amqp://server_zhanghao:server_password@server_ip:5672/server_rabbitmq_vhost')
    #建立连接
    connection = pika.BlockingConnection(params)
    
    def callback(channel,method,properties,body):
    	print('Get a message = {}'.format(body))
    
    #使用 with 可以自动关闭channel,防止遗漏导致的资源浪费
    with connection:
    	channel = connection.channel()
    	channel.basic_consume(
    		'hello',     #队列名
    		callback,    #消费回调函数
    		True         #不回应
        ) 
    	print('Waiting for messages. To exit press CTRL+C')
    	channel.start_consuming()
    

    如果启用两个消费者
    观察结果,可以看到,2个消费者是交替拿到不同的消息。

    这种工作模式是一种竞争工作方式,对某一个消息来说,只能有一个消费者拿走它。 从结果知道,使用的是轮询方式拿走数据的。
    注意:上面server用到缺省exchange。 使用缺省exchange,就必须指定routing_key,使用它找到queue

    消息队列的使用过程大概如下:

    (1)客户端连接到消息队列服务器,打开一个channel。

    (2)客户端声明一个exchange,并设置相关属性。

    (3)客户端声明一个queue,并设置相关属性。

    (4)客户端使用routing key,在exchange和queue之间建立好绑定关系。

    (5) 客户端投递消息到exchange。exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

    AMQP 里主要要说两个组件:Exchange 和 Queue

    这两者都在 Server 端,又称作 Broker ,

    这部分是 RabbitMQ 实现的,客户端 通常有 Producer 和 Consumer 两种类型。

    几个概念

    P: 为Producer,数据的发送方。

    C:为Consumer,数据的接收方。

    Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

    Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

    Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。

    Routing Key:路由关键字,

    exchange根据这个关键字进行消息投递。

    vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

    channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

    实现广播模式

    生产者使用广播模式。在test虚拟主机主机下构建了一个logs交换机 至于queue,可以由生产者创建,也可以由消费者创建。
    使用消费者端创建,生产者把数据发往交换机logs,采用了fanout,然后将数据通过交换机发往已经绑定 到此交换机的所有queue。

    绑定Bingding,建立exchange和queue之间的联系

    #生成一个交换机
    channel.exchange_declare(
    exchange='logs',#新交换机
    exchange_type='fanout'#广播
    )
    
    #消费者端
    result = channel.queue_declare(queue='')#生成一个随机名称的queue
    result = channel.queue_declare(queue='',exclusive=True)#生成一个随机名称的queue,并在断开连接时删除queue
    
    #生成queue
    q1 = channel.queue_declare(queue='',exclusive=True)
    q2 = channel.queue_declare(queue='',exclusive=True)
    q1name = q1.method.queue  #可以通过result.method.queue查看随机名称
    q2name = q2.method.queue
    print(q1name,q2name)
    
    #绑定
    channel.queue_bind(exchange='logs',queue=q1name)
    channel.queue_bind(exchange='logs',queue=q2name)
    

    实例:

    # send.py生产者代码
    import time
    import pika
    
    parameters = pika.URLParameters('amqp://wayne:wayne@192.168.142.135:5672/test')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    with connection:
    	channel.exchange_declare(exchange='logs',#新交换机
    			exchange_type='fanout')#扇出,广播
    	
    	for i inrange(40):
    		channel.basic_publish(exchange='logs',	#使用指定的exchange
    				routing_key='',					#广播模式,不用指定routing_key
    				body='data-{:02}'.format(i))	#消息
    		time.sleep(0.01)
    	print("send OK")
    

    消费者代码
    构建queue并绑定到test虚拟主机的logs交换机上

    # recieve.py消费者代码
    import pika
    
    params = pika.URLParameters('amqp://zhanghao:password@server_rabbitmq_ip:5672/server_rabbitmq_vhost')
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.exchange_declare(exchange='logs',exchange_type='fanout')
    
    #生成队列,名称随机,exclusive=True断开删除该队列
    q1 = channel.queue_declare(queue='',exclusive=True)
    q2 = channel.queue_declare(queue='',exclusive=True)
    name1 = q1.method.queue#查看队列名
    name2 = q2.method.queue
    print(name1,name2)
    
    #绑定到交换机
    channel.queue_bind(exchange='logs',queue=name1)
    channel.queue_bind(exchange='logs',queue=name2)
    
    def callback(channel,method,properties,body):
    	print('{}\n{}'.format(channel,method))	
    	print('Get a message = {}'.format(body))
    	
    with connection:
    	channel.basic_consume(
    	name1,		#队列名
    	callback,	#消费回调函数
    	True		#不回应
        )
    	channel.basic_consume(name2,callback,True)
    	
    	print('Waiting for messages. To exit press CTRL+C')
    	channel.start_consuming()
    #先启动消费者可以看到已经创建了exchange
    
    

    路由配置模式

    绑定的时候指定的routing_key=‘black’,和fanout类似了,都是1对多,但是不同。
    因为fanout时,exchange不做数据过滤的,1个消息,所有绑定的queue都会拿到一个副本。
    direct时候,要按照routing_key分配数据,black有2个queue设置了,就会把1个消息分发给这2个queue。

    Topic模式

    Topic就是更加高级的路由,支持模式匹配而已。 Topic的routing_key必须使用 .点号分割的单词组成。最多255个字节。
    支持使用通配符: *表示严格的一个单词 #表示0个或者多个单词
    如果queue绑定的routing_key只是一个#,这个queue其实可以接收所有的消息。
    如果没有使用任何通配符,效果类似于direct,因为只能和字符匹配了。
    交换机在路由消息的时候,只要和queue的routing_key匹配,就把消息发给该queue

    RPC远程过程调用

    RabbitMQ的RPC的应用场景较少,因为有更好的RPC通信框架。

    消息队列的作用

    1、系统间解耦 2、解决生产者、消费者速度匹配
    由于稍微上规模的项目都会分层、分模块开发,模块间或系统间尽量不要直接耦合,需要开放公共接口提供给别的
    模块或系统调用,而调用可能触发并发问题,为了缓冲和解耦,往往采用中间件技术。
    RabbitMQ只是消息中间件中的一种应用程序,也是较常用的消息中间件服务。

    cs