当前位置 博文首页 > eclipse编程:昨晚12点,女朋友突然问我:你会RabbitMQ吗?我竟
在说起消息队列之前,必须要先说一下同步调用和异步调用。
同步调用:A服务去调用B服务,需要一直等着B服务,直到B服务执行完毕并把执行结果返回给A之后,A才能继续往下执行。
举个例子:过年回到家,老妈对你说:“你也不小了,该谈女朋友了,隔壁王阿姨给你......。”“妈!我谈的有!"
老妈嘴角微微上扬:“那她现在有空吗?让妈给你把把关。”
你被逼之下跟女朋友开视频说:“那个我妈在我旁边,她想跟你说说话。”
你女朋友一下子慌了,立马拿起眉笔、口红、遮瑕对你说:“你先别挂,等我2分钟,我稍微化一下妆。”
你就一直等着她,等她化好妆之后你把手机给了你老妈。所以同步调用的核心就是:等待。
异步调用:A服务去调用B服务,不用一直等待B服务的执行结果,也就是说在B服务执行的同时A服务可以接着执行下面的程序。
举个例子:上午10点钟,办公室里,正在上班的你给你女朋友发微信说:“亲爱的,等你不忙了给我发一张你的照片吧,我想你了。”然后你接着工作了。
等到下午2点你女朋友给你发了一张她的美颜照,你点开看了看,迷的颠三倒四。所以异步调用的核心就是:只用通知对方一下,不用等待,通知完我这边该干嘛干嘛!
上面所说的异步调用就是用消息队列去实现。
场景一:用户注册
现在很多网站都需要给注册的用户发送注册短信或者激活邮箱,如果使用同步调用的话用户只有注册成功后才能给用户发送短信和邮箱链接,这样花费的时间就会很长。
有了消息队列之后我们只需要将用户注册的信息写入到消息队列里面,接来下该干嘛干嘛。
发送邮箱和发送短信的服务随时从消息队列里面取出该用户的信息,然后再去发送短信和邮箱链接。这样花费的时间就会大大减少。
场景二:修改商品
在微服务项目中,有时候数据量太多的话就需要分库分表,例如下图中商品表分别存储在A数据库和B数据库中。
有一天我们去调用修改商品的服务去修改A数据库中的商品信息,由于我们还需要调用搜索商品的服务查询商品信息,所以修改完A库中的商品信息后必须保证B库中的商品信息和A库一样。
如果采用同步调用的方式,在修改完A库的商品信息之后需要等待B库的商品信息修改完,这样耗时过长。
有了消息队列之后我们修改完A库的商品信息之后只需要将要修改的商品信息写入消息队列中,接下来该干什么干什么。
搜索商品的服务从消息队列中读取要修改的商品信息,然后同步B库中的商品信息,这样就大大地缩短响应时间。
MQ(Message Quene) : 江湖人称消息队列,小名又叫消息中间件。消息队列基于生产者和消费者模型,生产者不断向消息队列中发送消息,消费者不断从队列中获取消息。
因为消息的生产和消费都是异步的,而且没有业务逻辑的侵入,所以可以轻松的实现系统间解耦。
当今市面上有很多消息中间件,ActiveMQ、RabbitMQ、Kafka以及阿里巴巴自研的消息中间件RocketMQ等。
RabbitMQ 稳定可靠,支持多协议,有消息确认,基于erlang语言。
Kafka高吞吐,高性能,快速持久化,无消息确认,无消息遗漏,可能会有有重复消息,依赖于zookeeper,成本高。
ActiveMQ不够灵活轻巧,对队列较多情况支持不好。
RocketMQ性能好,高吞吐,高可用性,支持大规模分布式,协议支持单一。
基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。
AMQP:即Advanced Message Queuing Protocol, 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
RabbitMQ主要特性:
保证可靠性:使用一些机制来保证可靠性,如持久化、传输确认、发布确认
可伸缩性:支持消息集群,多台RabbitMQ服务器可以组成一个集群
高可用性:RabbitMQ集群中的某个节点出现问题时队列任然可用
支持多种协议
支持多语言客户端
提供良好的管理界面
提供跟踪机制:如果消息出现异常,可以通过跟踪机制分析异常原因
提供插件机制:可通过插件进行多方面扩展
指定版本,该版本包含了RabbitMQ的后台图形化页面
docker pull rabbitmq:management
方式一:默认guest 用户,密码也是 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
方式二:设置用户名和密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
yum remove erlang*
erlang-23.2.1-1.el7.x86_64.rpm
rabbitmq-server-3.8.9-1.el7.noarch.rpm
rpm -ivh erlang-23.2.1-1.el7.x86_64.rpm
yum install -y rabbitmq-server-3.8.9-1.el7.noarch.rpm
注意:安装完成后配置文件在:/usr/share/doc/rabbitmq-server-3.8.9/rabbitmq.config.example目录中,需要 将配置文件复制到/etc/rabbitmq/目录中,并修改名称为rabbitmq.config
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
ls /etc/rabbitmq/rabbitmq.config
vim /etc/rabbitmq/rabbitmq.config
将上图中框着的部分修改为下图:
rabbitmq-plugins enable rabbitmq_management
systemctl status rabbitmq-server
rabbitmq常用命令
systemctl start rabbitmq-server
systemctl restart rabbitmq-server
systemctl stop rabbitmq-server
username:guest
password:guest
上面的Tags选项,其实是指定用户的角色。超级管理员(administrator):可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
虚拟主机:为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。
其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。
创建好虚拟主机,我们还要给用户添加访问权限。点击添加好的虚拟主机,进入虚拟机设置界面。
说白了就是一个生产者发送消息,一个消费者接受消息,一对一的关系。
在上图的模型中,有以下概念:
producer:生产者,消息发送者
consumer:消费者:消息的接受者
queue:消息队列,图中红色部分。类似一个仓库,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
说白了就是一个生产者发送消息,多个消费者接受消息。只要其中的一个消费者抢先接收到了消息,其他的就接收不到了。一对多的关系。
这里引入了交换机(Exchange)的概念,交换机绑定所有的队列。也就是说消息生产者会先把消息发送给交换机,然后交换机把消息发送到与它绑定的所有队列里面,消费者从它所绑定的队列里面获取消息。
在广播模式下,消息发送流程是这样的:
可以有多个消费者
每个消费者有自己的queue(队列)
每个队列都要绑定到Exchange(交换机)
生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
交换机把消息发送给绑定过的所有队列
队列的消费者都能拿到消息。实现一条消息被多个消费者消费
举个例子:消息生产者发送消息时给了交换机一个红桃A,消息生产者对交换机说:”这条消息只能给有红桃A的队列“。交换机发现队列一手里是黑桃K,队列二手里是红桃A,所以它将这条消息给了队列二。
在路由-直连模式中,一条消息,会被所有订阅的队列都消费。但是在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKey。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
举个例子:消息生产者发送消息时给了交换机一个暗号:hello.mq,消息生产者对交换机说:”这条消息只能给暗号以hello开头的队列“。交换机发现它与队列一的暗号是hello.java,与队列二的暗号是news.today,所以它将这条消息给了队列一。
Topic类型的交换机与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如:b.hello
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>
</dependencies>
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//ip地址
factory.setHost("##.##.##.##");
//端口
factory.setPort(5672);
//虚拟主机
factory.setVirtualHost("myhost");
//账户
factory.setUsername("root");
//密码
factory.setPassword("########");
Connection connection = factory.newConnection();
return connection;
}
}
消息生产者
public class Producer {
public static void main(String[] args) throws Exception {
// 获取RabbitMQ的连接
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 创建队列,如果存在就不创建,不存在就创建
// 参数1 队列名, 参数2 durable:数据是否持久化 ,参数3 exclusive:是否排外的,记住false就行
// 参数4 autoDelete:是否自动删除,消费者消费完消息之后是否删除这个队列
// 参数5 arguments: 其他参数
channel.queueDeclare("queue", false, false, false, null);
// 写到队列中的消息内容
String message = "你好啊,mq!";
// 参数1 交换机,此处没有
// 参数2 发送到哪个队列
// 参数3 属性
// 参数4 内容
channel.basicPublish("", "queue", null, message.getBytes());
//关闭通道和连接
channel.close();
connection.close();
}
}
消息消费者
public class Consumer {
public static void main(String[] args) throws Exception {
//获取RabbitMq的连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//第一个参数:要从哪个队列获取消息
channel.basicConsume("queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("简单模式获取消息:"+new String(body));
}
});
}
}
测试结果:
消息生产者
public class Producer {
public static void main(String[] args) throws Exception {
// 获取RabbitMQ的连接
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 创建队列,如果存在就不创建,不存在就创建
// 参数1 队列名, 参数2 durable:数据是否持久化 ,参数3 exclusive:是否排外的,记住false就行
// 参数4 autoDelete:是否自动删除,消费者消费完消息之后是否删除这个队列
// 参数5 arguments: 其他参数
channel.queueDeclare("queue", false, false, false, null);
// 写到队列中的消息内容
String message = "你好啊,mq";
// 参数1 交换机,此处无
// 参数2 发送到哪个队列
// 参数3 属性
// 参数4 内容
for (int i = 0; i < 10; i++) {
channel.basicPublish("", "queue", null, (message+i).getBytes());
}
//关闭通道和连接
channel.close();
connection.close();
}
}
消费者01
public class ConsumerOne {
public static void main(String[] args) throws Exception {
//创建一个RabbitMq的连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
channel.basicConsume("queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者01:"+new String(body));
}
});
}
}
消费者02
public class ConsumerTwo {
public static void main(String[] args) throws Exception {
//创建一个RabbitMq的连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
channel.basicConsume("queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者02:"+new String(body));
}
});
}
}
测试结果:
消费者01
消费者02
消息生产者
public class Producer {
public static void main(String[] args) throws Exception {
// 获取RabbitMQ的连接
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 创建队列,如果存在就不创建,不存在就创建
// 参数1 队列名, 参数2 durable:数据是否持久化 ,参数3 exclusive:是否排外的,记住false就行
// 参数4 autoDelete:是否自动删除,消费者消费完消息之后是否删除这个队列
// 参数5 arguments: 其他参数
channel.queueDeclare("queue01", false, false, false, null);
channel.queueDeclare("queue02", false, false, false, null);
//创建交换机,如果存在就不创建。并指定交换机的类型是FANOUT即广播模式
channel.exchangeDeclare("fanout-exchange", BuiltinExchangeType.FANOUT);
//绑定交换机与队列,第一个参数是队列,第二个参数是交换机,第三个参数是路由key,这里不指定key
channel.queueBind("queue01", "fanout-exchange", "");
channel.queueBind("queue02", "fanout-exchange", "");
// 消息内容
String message = "这是一条广播消息";
// 参数1 交换机
// 参数2 发送到哪个队列,因为指定了交换机,所以这里队列名为空
// 参数3 属性
// 参数4 内容
channel.basicPublish("fanout-exchange", "", null, message.getBytes());
//关闭通道和连接
channel.close();
connection.close();
}
}
消费者01
public class ConsumerOne {
public static void main(String[] args) throws Exception {
//创建一个新的RabbitMq连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//第一个参数:要从哪个队列获取消息
channel.basicConsume("queue01",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者01:"+new String(body));
}
});
}
}
消费者02
public class ConsumerTwo {
public static void main(String[] args) throws Exception {
//创建一个新的RabbitMq连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//第一个参数:要从哪个队列获取消息
channel.basicConsume("queue02",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者02:"+new String(body));
}
});
}
}
测试结果
1)路由模式之Direct(直连)
消息生产者
public class Producer {
public static void main(String[] args) throws Exception {
// 获取RabbitMQ的连接
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 创建队列,如果存在就不创建,不存在就创建
// 参数1 队列名, 参数2 durable:数据是否持久化 ,参数3 exclusive:是否排外的,记住false就行
// 参数4 autoDelete:是否自动删除,消费者消费完消息之后是否删除这个队列
// 参数5 arguments: 其他参数
channel.queueDeclare("queue03", false, false, false, null);
channel.queueDeclare("queue04", false, false, false, null);
//创建交换机,如果存在就不创建。并指定交换机的类型是DIRECT模式
channel.exchangeDeclare("direct-exchange", BuiltinExchangeType.DIRECT);
//绑定交换机与队列,第一个参数是队列,第二个参数是交换机,第三个参数是路由key,这里指定路由key是a
channel.queueBind("queue03", "direct-exchange", "a");
//绑定交换机与队列,第一个参数是队列,第二个参数是交换机,第三个参数是路由key,这里指定路由key是b
channel.queueBind("queue04", "direct-exchange", "b");
//消息
String message = "这是一条key为a的消息";
// 参数1 交换机
// 参数2 路由key
// 参数3 属性
// 参数4 内容
channel.basicPublish("direct-exchange", "a", null, message.getBytes());
//关闭通道和连接
channel.close();
connection.close();
}
}
消费者03
public class ConsumerThree {
public static void main(String[] args) throws Exception {
//创建一个新的RabbitMQ连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//第一个参数:要从哪个队列获取消息
channel.basicConsume("queue03",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者03:"+new String(body));
}
});
}
}
消费者04
public class ConsumerFour {
public static void main(String[] args) throws Exception {
//创建一个新的RabbitMQ连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//第一个参数:要从哪个队列获取消息
channel.basicConsume("queue04",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者04:"+new String(body));
}
});
}
}
测试结果
只有消费者03收到了消息
2)路由模式之-Topic
消息生产者
public class Producer {
public static void main(String[] args) throws Exception {
// 获取RabbitMQ的连接
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 创建队列,如果存在就不创建,不存在就创建
// 参数1 队列名, 参数2 durable:数据是否持久化 ,参数3 exclusive:是否排外的,记住false就行
// 参数4 autoDelete:是否自动删除,消费者消费完消息之后是否删除这个队列
// 参数5 arguments: 其他参数
channel.queueDeclare("queue05", false, false, false, null);
channel.queueDeclare("queue06", false, false, false, null);
//创建交换机,如果存在就不创建。并指定交换机的类型是TOPIC模式
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
//绑定交换机与队列,第一个参数是队列,第二个参数是交换机,第三个参数是路由key,这里指定路由key是a.*
//*是通配符,意思只要key满足a开头,.后面是什么都可以
channel.queueBind("queue05", "topic-exchange", "a.*");
//绑定交换机与队列,第一个参数是队列,第二个参数是交换机,第三个参数是路由key,这里指定路由key是b.*
//*是通配符,意思只要key满足b开头,.后面是什么都可以
channel.queueBind("queue06", "topic-exchange", "b.*");
// channel.queueDeclare("queue", false, false, false, null);
// 消息内容
String message = "这是一条key为a.hello的消息";
// 参数1 交换机,此处无
// 参数2 路由key
// 参数3 属性
// 参数4 内容
channel.basicPublish("topic-exchange", "a.hello", null, message.getBytes());
//关闭通道和连接
channel.close();
connection.close();
}
}
消息消费者05
public class ConsumerFive {
public static void main(String[] args) throws Exception {
//创建一个新的RabbitMQ连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//第一个参数:要从哪个队列获取消息
channel.basicConsume("queue05",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者05:"+new String(body));
}
});
}
}
消息消费者06
public class ConsumerSix {
public static void main(String[] args) throws Exception {
//创建一个新的RabbitMQ连接
Connection connection = ConnectionUtil.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//第一个参数:要从哪个队列获取消息
channel.basicConsume("queue06",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者06:"+new String(body));
}
});
}
}
测试结果
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
spring:
application:
name: mq-springboot
rabbitmq:
host: ##.##.##.##
port: 5672
username: root
password: #####
virtual-host: myhost
消息生产者:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg(){
rabbitTemplate.convertAndSend("quenue","你好mq");
}
消息消费者
@Component
public class SingleCunstomer {
//监听的队列
@RabbitListener(queues = "queue")
public void receive(String message){
System.out.println("消息:" + message);
}
}
消息生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("quenue","你好mq!");
}
}
消息消费者
@Component
public class WorkCunstomer {
@RabbitListener(queues = "queue")
public void customerOne(String message){
System.out.println("消费者一:" + message);
}
@RabbitListener(queues = "queue")
public void customerTwo(String message){
System.out.println("消费者二:" + message);
}
}
消息生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg() {
//参数1 交换机 参数2 路由key 参数三 消息
rabbitTemplate.convertAndSend("fanout-exchange","","这是一条广播消息");
}
消息消费者
@Component
public class FanoutCunstomer {
@RabbitListener(queues = "queue01")
public void customerOne(String message){
System.out.println("消费者一:" + message);
}
@RabbitListener(queues = "queue02")
public void customerTwo(String message){
System.out.println("消费者二:" + message);
}
}
1)Direct(直连)模式
消息生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg() {
//参数1 交换机 参数2 路由key 参数三 消息
rabbitTemplate.convertAndSend("direct-exchange","a","这是一条广播消息");
}
消息消费者
@Component
public class DirectCunstomer {
//监听的队列 queue03
@RabbitListener(queues = "queue03")
//监听的队列 queue04
public void customerOne(String message){
System.out.println("消费者一:" + message);
}
@RabbitListener(queues = "queue04")
public void customerTwo(String message){
System.out.println("消费者二:" + message);
}
}