当前位置 博文首页 > MPolaris:ActiceMQ详解

    MPolaris:ActiceMQ详解

    作者:MPolaris 时间:2021-01-20 00:12

    1. MQ理解

    1.1 MQ的产品种类和对比

    MQ即消息中间件。MQ是一种理念,ActiveMQ是MQ的落地产品。

    消息中间件产品

    image-20210114005458326

    各类MQ对比

    image-20210114005649665
    • Kafka
      • 编程语言:Scala
      • 大数据领域的主流MQ
    • RabbitMQ
      • 编程语言:Erlang
      • 基于erlang语言,不好修改底层,不要查找问题的原因,不建议选用。
    • RocketMQ
      • 编程语言:Java
      • 适用于大型项目,适用于集群
    • ActiveMQ
      • 编程语言:Java
      • 适用于中小型项目
    1.2 MQ产生背景

    系统之间直接调用存在的问题?

    微服务架构后,链式调用是我们在写程序时候的一般流程,为了完成一个整体功能会将其拆分成多个函数(或子模块),比如模块A调用模块B,模块B调用模块C,模块C调用模块D。但在大型分布式应用中,系统间的RPC交互繁杂,一个功能背后要调用上百个接口并非不可能,从单机架构过渡到分布式微服务架构的通例。这些架构会有哪些问题?

    • 系统之间接口耦合比较严重

      每新增一个下游功能,都要对上游的相关接口进行改造。举个例子:如果系统A要发送数据给系统B和系统C,发送给每个系统的数据可能有差异,因此系统A对要发送给每个系统的数据进行了组装,然后逐一发送。当代码上线后又新增了一个需求:把数据也发送给D,新上了一个D系统也要接受A系统的数据,此时就需要修改A系统,让他感知到D系统的存在,同时把数据处理好再给D。在这个过程你会看到每接入一个下游系统都要对系统A进行代码改造,开发联调的效率很低。其整体架构如下图:

      image-20210114010536965
    • 面对大流量并发时容易被冲垮

      每个接口模块的吞吐能力是有限的,这个上限能力如果是堤坝,当大流量(洪水)来临时容易被冲垮。举例秒杀业务:上游系统发起下单购买操作就是下单一个操作很快就完成。然而下游系统要完成秒杀业务后面的所有逻辑(读取订单,库存检查,库存冻结,余额检查,余额冻结,订单生产,余额扣减,库存减少,生成流水,余额解冻,库存解冻)。

    • 等待同步存在性能问题

      RPC接口上基本都是同步调用,整体的服务性能遵循“木桶理论”,即整体系统的耗时取决于链路中最慢的那个接口。比如A调用B/C/D都是50ms,但此时B又调用了B1,花费2000ms,那么直接就拖累了整个服务性能。

      image-20210114010955756

    根据上述的几个问题,在设计系统时可以明确要达到的目标:

    1. 要做到系统解耦,当新的模块接进来时可以做到代码改动最小,能够解耦

    2. 设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费不被冲垮,能削峰

    3. 强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力,能够异步

    1.3 MQ主要作用
    • 异步 调用者无需等待
    • 解耦 解决了系统之间耦合调用的问题
    • 消峰 抵御洪峰流量,保护了主业务
    1.4 MQ的定义

    面向消息的中间件(message-oriented middleware)MOM能够很好的解决以上问题。是指利用高效可靠的消息传递机制与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型在分布式环境下提供应用解耦,弹性伸缩,冗余存储、流量削峰,异步通信,数据同步等功能。

    大致的过程是这样的:发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题topic中,在合适的时候消息服务器会将消息转发给接受者。在这个过程中发送和接收是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然的关系。尤其在发布pub/订阅sub模式下,也可以完成一对多的通信即让一个消息有多个接受者。

    image-20210114011510051
    1.5 MQ特点

    采用异步处理模式

    消息发送者可以发送一个消息而无须等待响应。消息发送者将消息发送到一条虚拟的通道(主题或者队列)上。消息接收者则订阅或者监听该通道。一条消息可能最终转发给一个或者多个消息接收者,这些消息接收者都无需对消息发送者做出同步回应。整个过程都是异步的。

    案例:也就是说一个系统跟另一个系统之间进行通信的时候,假如系统A希望发送一个消息给系统B让他去处理。但是系统A不关注系统B到底怎么处理或者有没有处理好,所以系统A把消息发送给MQ然后就不管这条消息的“死活了”,接着系统B从MQ里面消费出来处理即可。至于怎么处理,是否处理完毕,什么时候处理都是系统B的事儿与系统A无关。

    应用系统之间解耦合

    发送者和接受者不必了解对方,只需要确认消息。发送者和接受者不必同时在线。

    整体架构

    image-20210114012651889

    MQ缺点

    两个系统之间不能同步调用,不能实时回复,不能响应某个调用的回复。

    1.6 CentOS7安装ActiveMQ
    cd /root
    mkdir active_mq
    tar -xzvf apache-activemq-5.14.0-bin.tar.gz
    
    # /etc/init.d/目录增加增加activemq文件
    cd /etc/init.d/
    vim activemq
    
    #!/bin/sh
    #
    # /etc/init.d/activemq
    # chkconfig: 345 63 37
    # description: activemq servlet container.
    # processname: activemq 5.14.0
    
    # Source function library.
    #. /etc/init.d/functions
    # source networking configuration.
    #. /etc/sysconfig/network
    
    export JAVA_HOME=/root/java/jdk1.8.0_221
    export CATALINA_HOME=/root/active_mq/apache-activemq-5.14.0
    
    case  $1 in
         start)
             sh $CATALINA_HOME/bin/activemq start
         ;;
         stop)
             sh $CATALINA_HOME/bin/activemq stop
         ;;
         restart)
             sh $CATALINA_HOME/bin/activemq stop
             sleep 1
             sh $CATALINA_HOME/bin/activemq start
         ;;
    
    esac
    exit 0
    
    # 对activemq文件授予权限
    chmod 777 activemq
    
    # 设置开机启动并启动activemq
    chkconfig activemq on
    service activemq start
    
    # 启动时指定日志输出文件,activemq日志默认的位置是在:%activemq安装目录%/data/activemq.log
    service activemq start  >  /root/active_mq/activemq.log
    
    # 访问地址:http://IP地址:8161/
    # 默认账户:admin/admin
    # 61616 端口提供JMS服务
    # 8161 端口提供管理控制台服务 
    
    # 查看activemq状态
    service activemq status
    
    # 关闭activemq服务
    service activemq stop
    

    2. Java程序生成消息基本案例

    2.1 JMS简介

    JMS 总体编码规范

    image-20210114235714356

    JMS开发基本步骤

    image-20210114235837770

    Destination

    Destination 即目的地。下面拿 jvm 和 mq 做个对比,目的地可以理解为是数据存储的地方。

    image-20210115001050565

    两种Destination

    image-20210115001820225
    2.2 Idea新建Maven工程
    <!--  activemq 所需要的jar包-->
    <dependency>
    	<groupId>org.apache.activemq</groupId>
    	<artifactId>activemq-all</artifactId>
    	<version>5.15.9</version>
    </dependency>
    
    <!--  activemq 和 spring 整合的基础包 -->
    <dependency>
    	<groupId>org.apache.xbean</groupId>
    	<artifactId>xbean-spring</artifactId>
    	<version>3.16</version>
    </dependency>
    
    <!-- junit/log4j等基础配置   -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    </dependency>
    
    2.3 队列消息(Queue)

    队列消息特点

    点对点消息传递域的特点如下

    • 每个消息只能有一个消费者,类似 1对1 的关系。
    • 消息的生产者和消费者之间 没有时间上的相关性。无论消费者在生产者发送消息时是否处于运行状态,消费者都可以提取消息。如我们发送短信,发送者发送后接受者不一定会及收及看。
    • 消息被消费后队列 不会再存储,所以消费者 不会消费到已经被消费过的消息。
    image-20210115210703169

    队列消息生产者

    public class JmsProduce {
        public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
        public static final String QUEUE_NAME = "queue_01";
    
        public static void main(String[] args) throws JMSException {
            //1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //2.通过连接工厂获得连接connection并启动访问
            Connection conn = factory.createConnection();
            conn.start();
            //3.创建会话session
            //  两个参数:事务,签收
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4.创建目的地(具体是队列queue还是主题topic)
            //  Destination -> Queue/Topic
            Queue queue = session.createQueue(QUEUE_NAME);
            //5.创建消息的生产者
            MessageProducer producer = session.createProducer(queue);
            //6.通过使用消息生产者发送三条消息到MQ队列中
            for (int i = 0; i < 3; i++) {
                //创建消息
                TextMessage textMessage = session.createTextMessage("msg -> " + i);
                //通过消息生产者发送给MQ
                producer.send(textMessage);
            }
            //7.关闭资源
            producer.close();
            session.close();
            conn.close();
    
            System.out.println("====> 消息发布到MQ完成");
        }
    }
    
    image-20210115005955450

    队列消息消费者 - 同步阻塞式 receive

    public class JmsConsumer {
        public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
        public static final String QUEUE_NAME = "queue_01";
    
        public static void main(String[] args) throws JMSException {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection conn = factory.createConnection();
            conn.start();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
    
            // 创建消息的消费者
            MessageConsumer consumer = session.createConsumer(queue);
            while (true) {
                // reveive():一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式,和socket的accept方法类似的。
                // reveive(Long time):等待n毫秒之后还没有收到消息就结束阻塞。
                // 因为消息发送者是 TextMessage,所以消息接受者也要是TextMessage
                TextMessage message = (TextMessage) consumer.receive(4000L);
                if (null != message) {
                    System.out.println("====> 消费者的消息:" + message.getText());
                } else {
                    break;
                }
            }
    
            consumer.close();
            session.close();
            conn.close();
        }
    }
    
    image-20210115011322834

    队列消息消费者 - 异步非阻塞监听式 MessageListener

    public class JmsConsumer {
        public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
        public static final String QUEUE_NAME = "queue_01";
    
        public static void main(String[] args) throws Exception {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection conn = factory.createConnection();
            conn.start();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            MessageConsumer consumer = session.createConsumer(queue);
    
            // 监听器
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if(null != message && message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("====> 消费者接受到消息:" + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            System.in.read(); //保证控制台不停
    
            consumer.close();
            session.close();
            conn.close();
        }
    }
    
    

    消费者三种情况

    • 先生产,只启动一个消费者 ① => ①消费者会消费掉全部消息
    • 先生产,然后先启动消费者①,再启动消费者② => ①消费者会消费掉全部消息,②消费者不能消费消息
    • 先启动消费者①和②,再生产 => ①和②轮询消费,各自消费一半消息
    2.4 主题消息(Topic)

    主题消息特点

    在发布订阅消息传递域中,目的地被称为主题(topic)。

    发布/订阅消息传递域的特点如下:

    • 生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系。

    • 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费 自它订阅之后发布的消息。

    • 生产者生产时,topic 不保存消息,它是 无状态的 不落地的,假如无人订阅就去生产那就是一条废消息,所以一般先启动消费者再启动生产者。

    默认情况下如上所述,但是JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅。

    image-20210115212649958

    主题消息生产者

    public class JmsProduce {
        public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
        public static final String TOPIC_NAME = "topic_01";
    
        public static void main(String[] args) throws JMSException {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection conn = factory.createConnection();
            conn.start();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //只有这一步和Queue有区别
            Topic topic = session.createTopic(TOPIC_NAME);
    
            MessageProducer producer = session.createProducer(topic);
            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("msg -> " + i);
                producer.send(textMessage);
            }
            producer.close();
            session.close();
            conn.close();
    
            System.out.println("====> 消息发布到MQ完成");
        }
    }
    

    主题消息消费者

    存在多个消费者,每个消费者都能收到自从自己启动后所有生产的消息。

    public class JmsConsumer {
        public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
        public static final String TOPIC_NAME = "topic_01";
    
        public static void main(String[] args) throws Exception {
            System.out.println("=====> 1号消费者");//多加几个消费者做实验
    
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection conn = factory.createConnection();
            conn.start();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //只有这一步和Queue有区别
            Topic topic = session.createTopic(TOPIC_NAME);
    
            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if(null != message && message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("====> 消费者接受到消息:" + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            System.in.read();
    
            consumer.close();
            session.close();
            conn.close();
        }
    }
    
    image-20210115214856154
    2.5 Topic和Queue对比
    image-20210115215225937

    3. JMS (Java消息服务) 详解

    3.1 Java消息服务是什么

    Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。

    image-20210115221749323
    3.2 JMS四大组成元素
    image-20210115221836939

    Message - 消息头

    JMS的消息头有哪些属性:

    • JMSDestination:消息目的地。主要是指Queue和Topic。

    • JMSDeliveryMode:消息持久化模式。分为持久模式和非持久模式,一条持久性的消息应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。一条非持久的消息最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失。

    • JMSExpiration:消息过期时间。可以设置消息在一定时间后过期,默认是永不过期消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期。如果发送后在消息过期时间之后还没有被发送到目的地,则该消息被清除。

    • JMSPriority:消息的优先级。消息优先级从0-9十个级别,0-4是普通消息,5-9是加急消息。 JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级。

    • JMSMessageID:消息的唯一标识符。唯一标识每个消息的标识由MQ产生,也可以自己指定但是每个消息的标识要求唯一。

    说明:消息的生产者可以set这些属性,消息的消费者可以get这些属性。这些属性在send方法里面也可以设置。

    public class JmsProduce {
        public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
        public static final String TOPIC_NAME = "topic_01";
    
        public static void main(String[] args) throws JMSException {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection conn = factory.createConnection();
            conn.start();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(TOPIC_NAME);
            MessageProducer producer = session.createProducer(topic);
            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("msg -> " + i);
    
                //这里可以指定每个消息的目的地
                textMessage.setJMSDestination(topic);
                //消息的模式,持久模式/非持久模式
                textMessage.setJMSDeliveryMode(0);
                //消息的过期时间
                textMessage.setJMSExpiration(1000);
                //消息的优先级
                textMessage.setJMSPriority(10);
                //指定每个消息的标识。MQ会给我们默认生成一个,我们也可以自己指定。
                textMessage.setJMSMessageID("ABCD");
                
                //上面的属性也可以通过send重载方法进行设置
                producer.send(textMessage);
            }
            producer.close();
            session.close();
            conn.close();
    
            System.out.println("====> 消息发布到MQ完成");
        }
    }
    

    Message - 消息体

    理解:封装具体的消息数据

    五种消息格式

    image-20210115232313263

    注意:发送和接收的消息体类型必须一致对应

    消息生产者

    for (int i = 0; i < 3; i++) {
    	// 发送TextMessage消息体
    	TextMessage textMessage = session.createTextMessage("topic " + i);
    	producer.send(textMessage);
    
    	// 发送MapMessage 消息体。set方法添加,get方式获取
    	MapMessage mapMessage = session.createMapMessage();
    	mapMessage.setString("name","rose" + i);
    	mapMessage.setInt("age", 18 + i);
    	producer.send(mapMessage);
    }
    

    消息消费者

    public void onMessage(Message message) {
        if(null != message && message instanceof TextMessage) {
             TextMessage textMessage = (TextMessage) message;
             try {
                 System.out.println("====> 消费者接受到text消息:" + textMessage.getText());
             } catch (JMSException e) {
                  e.printStackTrace();
             }
         }
    
         if(null != message && message instanceof MapMessage) {
              MapMessage mapMessage = (MapMessage) message;
              try {
                   System.out.println("====> 消费者接受到map消息:" + mapMessage.getString("name"));
                   System.out.println("====> 消费者接受到map消息:" + mapMessage.getString("age"));
               } catch (JMSException e) {
                   e.printStackTrace();
               }
          }
    }
    

    Message - 消息属性

    如果需要除消息头字段之外的值那么可以使用消息属性。它是 识别 / 去重 / 重点标注 等操作非常有用的方法。

    它们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。

    下图是设置消息属性的API:

    image-20210115233623890

    生产者

    for (int i = 0; i < 3; i++) {
    	TextMessage textMessage = session.createTextMessage("topic " + i);
    
    	// 调用Message的set*Property()方法就能设置消息属性
    	// 根据value的数据类型的不同,有相应的API
    	textMessage.setStringProperty("From","rose@qq.com");
    	textMessage.setByteProperty("Spec", (byte) 1);
    	textMessage.setBooleanProperty("Invalide",true);
    
    	producer.send(textMessage);
    }
    

    消费者

    public void onMessage(Message message) {
    	if(null != message && message instanceof TextMessage) {
    		TextMessage textMessage = (TextMessage) message;
    		try {
    			System.out.println("消息体:" + textMessage.getText());
    			System.out.println("消息属性:" + textMessage.getStringProperty("From"));
    			System.out.println("消息属性:" + textMessage.getByteProperty("Spec"));
    			System.out.println("消息属性:" + textMessage.getBooleanProperty("Invalide"));
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    3.5 JMS的可靠性

    RERSISTENT - 持久性

    什么是持久化消息 => 保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者,虽然这样增加了消息传送的开销但却增加了可靠性。

    我的理解:在消息生产者将消息成功发送给MQ消息中间件之后。无论是出现任何问题如:MQ服务器宕机、消费者掉线等。都保证(topic要之前注册过,queue不用)消息消费者能够成功消费消息。如果消息生产者发送消息就失败了,那么消费者也不会消费到该消息。

    image-20210115235300769
    1. Queue消息非持久和持久
    • Queue非持久,当服务器宕机消息不存在(消息丢失了)。

    注意:只要服务器没有宕机,即便是非持久,消费者不在线的话消息也不会丢失,等待消费者在线还是能够收到消息的。

    //非持久化的消费者和之前的代码一样。下面演示非持久化的生产者。
    
    // 非持久化
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    
    • Queue持久化,当服务器宕机消息依然存在。Queue消息默认是持久化的。

    持久化消息,保证这些消息只被传送一次和成功使用一次。对于这些消息可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。

    //持久化的消费者和之前的代码一样。下面演示持久化的生产者。
    
    //持久化
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    
    1. Topic消息非持久和持久
    • Topic非持久,Topic默认就是非持久化的,因为生产者生产消息时消费者也要在线,这样消费者才能消费到消息。

    • Topic消息持久化,只要消费者向MQ服务器注册过,所有生产者发布成功的消息该消费者都能收到,不管是MQ服务器宕机还是消费者不在线。

    //持久化topic生产者代码
    
    // 设置持久化topic 
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    // 设置持久化topic之后再启动连接
    conn.start();
    
    //持久化topic消费者代码
    
    public static void main(String[] args) throws Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection conn = activeMQConnectionFactory.createConnection();
        
    	// 设置客户端ID,向MQ服务器注册自己的名称
        conn.setClientID("marrry");
        
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        
    	// 创建一个topic订阅者对象。一参是topic,二参是订阅者名称
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
        // 之后再开启连接
        connection.start();
        
        //之前是消息的消费者,这里就改为主题的订阅者
        Message message = topicSubscriber.receive();
        while (null != message){
            TextMessage textMessage = (TextMessage)message;
            System.out.println(" 收到的持久化 topic:" + textMessage.getText());
            message = topicSubscriber.receive(2000L);//继续监听2s,从激活到离线
            //经测试:离线再激活后仍然能收到之前的消息
        }
        
        session.close();
        conn.close();
    }
    
    image-20210116000703132

    注意:

    1. 一定要先运行一次消费者,等于向MQ注册,类似我订阅了这个主题。

    2. 然后再运行生产者发送消息。

    3. 之后无论消费者是否在线都会收到消息。如果不在线的话,下次连接的时候会把没有收过的消息都接收过来。

    Transaction - 事务

    生产者开启事务后,执行commit方法这批消息才真正的被提交。不执行commit方法这批消息不会提交。执行rollback方法之前的消息会回滚掉。生产者的事务机制要高于签收机制,当生产者开启事务后签收机制不再重要。

    消费者开启事务后,执行commit方法这批消息才算真正的被消费。不执行commit方法这些消息不会标记已消费,下次还会被消费。执行rollback方法不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息下次还会被消费。消费者利用commit和rollback方法,甚至能够违反一个消费者只能消费一次消息的原理。

    image-20210116004820189

    注意:消费者和生产者需要同时操作事务才行吗? => 消费者和生产者的事务完全没有关联,各自是各自的事务。

    • 生产者
    public class JmsProduce {
        public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
        public static final String TOPIC_NAME = "topic_01";
    
        public static void main(String[] args) throws JMSException {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection conn = factory.createConnection();
            conn.start();
    
            //1.创建会话session,两个参数transacted=事务,acknowledgeMode=确认模式(签收)
            //设置为开启事务
            Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
    
            Topic topic = session.createTopic(TOPIC_NAME);
            MessageProducer producer = session.createProducer(topic);
    
            try {
                for (int i = 0; i < 3; i++) {
                    TextMessage textMessage = session.createTextMessage("topic " + i);
                    producer.send(textMessage);
    //                if(i == 2) {
    //                    throw new RuntimeException("=====> GG");
    //                }
                }
    
                // 2. 开启事务后,使用commit提交事务,这样这批消息才能真正的被提交。
                session.commit();
    
                System.out.println("====> 消息发布到MQ完成");
            } catch (JMSException e) {
                System.out.println("出现异常,消息回滚");
    
                // 3. 工作中一般当代码出错我们在catch代码块中回滚。这样这批发送的消息就能回滚。
                session.rollback();
    
            } finally {
                producer.close();
                session.close();
                conn.close();
            }
        }
    }
    
    //如果有一条抛出异常,则回滚
    //Exception in thread "main" java.lang.RuntimeException: =====> GG
    
    • 消费者
    public class JmsConsumer {
        public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616";
        public static final String TOPIC_NAME = "topic_01";
    
        public static void main(String[] args) throws Exception {
            System.out.println("=====> 1号消费者");
    
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection conn = factory.createConnection();
            conn.start();
    
            // 创建会话session,两个参数transacted=事务,acknowledgeMode=确认模式(签收)
            // 消费者开启了事务就必须手动提交,不然会重复消费消息
            final Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
    
            Topic topic = session.createTopic(TOPIC_NAME);
            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(new MessageListener() {
                int a = 0;
                @Override
                public void onMessage(Message message) {
                    if(null != message && message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("消息体:" + textMessage.getText());
                            if(a == 0){
                                System.out.println("commit");
                                session.commit();
                            }
                            if (a == 2) {
                                System.out.println("rollback");
                                session.rollback();
                            }
                            a++;
                        } catch (JMSException e) {
                            System.out.println("出现异常,消费失败,放弃消费");
                            try {
                                session.rollback();
                            } catch (JMSException ex) {
                                ex.printStackTrace();
                            }
                        }
                    }
                }
            });
            System.in.read();
            consumer.close();
            session.close();
            conn.close();
        }
    }
    
    // 不执行commit方法的1和2消息不会标记已消费,下次还会被消费
    // 执行rollback方法不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息下次还会被消费
    
    // =====> 1号消费者
    // 消息体:topic 0
    // commit
    // 消息体:topic 1
    // 消息体:topic 2
    // rollback
    // 消息体:topic 1
    // 消息体:topic 2