当前位置 博文首页 > 好好学习天天向上:消息队列RocketMQ-02 |消息发送

    好好学习天天向上:消息队列RocketMQ-02 |消息发送

    作者:[db:作者] 时间:2021-07-15 21:57

    三.消息发送

    1、ONE TO ONE(单生产者,单消费者)

    1.Provider

    public class Producer {
        public static void main(String[] args) throws Exception {
    /**
     1. 谁来发?
     2. 发给谁?
     3. 怎么发?
     4. 发什么?
     5. 发的结果是什么?4消费者
     6. 打扫战场
     **/
    //1.创建一个发送消息的对象Producer
            DefaultMQProducer producer = new DefaultMQProducer("group1");
    //2.设定发送的命名服务器地址
            producer.setNamesrvAddr("139.xxx.49.xxx:9876");
    //3.1启动发送的服务
            producer.start();
    //4.创建要发送的消息对象,指定topic,指定内容body
            Message msg = new Message("topic1", "hello rocketmq".getBytes("UTF-8"));
    //3.2发送消息
            SendResult result = producer.send(msg);
            System.out.println("返回结果:" + result);
    //5.关闭连接
            producer.shutdown();
        }
    }
    

    2.Provider执行遇到问题

    (1).sendDefaultImpl call timeout(这个报错有很多需要配置的)

    启动server

    ./mqnamesrv  -n "139.xxx.49.xxx:9876" &
    

    配置brokerconfig文件

    cd /usr/local/rocketmq-4.9.0/conf
    vim broker.conf
    namesrvAddr=139.xxx.49.xxx:9876
    brokerIP1 = 139.xxx.49.xxx
    autoCreateTopicEnable=true
    

    启动broker

    ./mqbroker -n 139.xxx.49.xxx:9876  autoCreateTopicEnable=true -c /usr/local/rocketmq-4.9.0/conf/broker.conf &
    
    (2).No route info of this topic:

    启动broker时加上autoCreateTopicEnable=true

    3.Consumer

    public class Consumer {
        public static void main(String[] args) throws Exception {
    /**
     1. 谁来发?
     2. 发给谁?
     3. 怎么发?
     4. 发什么?
     5. 发的结果是什么?
     6. 打扫战场
     **/
    //1.创建一个接收消息的对象Consumer
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    //2.设定接收的命名服务器地址
            consumer.setNamesrvAddr("139.xxx.49.xxx:9876");
    //3.设置接收消息对应的topic,对应的sub标签为任意
            consumer.subscribe("topic1", "*");
    //3.开启监听,用于接收消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    //遍历消息
                    for (MessageExt msg : list) {
    
                        System.out.println("收到消息:" + msg);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    //4.启动接收消息的服务
            consumer.start();
            System.out.println("接受消息服务已经开启!");
    //5 不要关闭消费者!
        }
    }
    

    2、ONE TO MANY(单生产者,多消费者,负载均衡模式)

    1.Provider

    public class Producer2 {
        public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
            //1.创建一个发送消息的对象Producer
            DefaultMQProducer producer = new DefaultMQProducer("group2");
    //2.设定发送的命名服务器地址
            producer.setNamesrvAddr("139.xxx.49.xxx:9876");
    //3.1启动发送的服务
            producer.start();
            //循环发送10次消息。
            for (int i = 0; i < 10; i++) {
    //4.创建要发送的消息对象,指定topic,指定内容body
                Message msg = new Message("topic2", "nihao rocketmq".getBytes("UTF-8"));
    //3.2发送消息
                SendResult result = producer.send(msg);
                System.out.println("返回结果:" + result);
            }
            //5.关闭连接
            producer.shutdown();
        }
    
    }
    

    2.Consumer

    public class Consumer2 {
        public static void main(String[] args) throws MQClientException {
            //1.创建一个接收消息的对象Consumer
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
    //2.设定接收的命名服务器地址
            consumer.setNamesrvAddr("139.155.49.250:9876");
    //3.设置订阅接收消息对应的topic,对应的sub标签为任意
            consumer.subscribe("topic2", "*");
    //设置当前消费者的消费模式(默认模式:负载均衡)
            consumer.setMessageModel(MessageModel.CLUSTERING);
    //3.开启监听,用于接收消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    //遍历消息
                    for (MessageExt msg : msgs) {
                        System.out.println("收到消息:" + msg);
                        System.out.println("消息是:" + new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
        }
    }
    

    3、MANYTO MANY(单生产者,多消费者,广播模式)

    1.Provider

    public class Producer2 {
        public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
            //1.创建一个发送消息的对象Producer
            DefaultMQProducer producer = new DefaultMQProducer("group2");
    //2.设定发送的命名服务器地址
            producer.setNamesrvAddr("139.xxx.49.xxx:9876");
    //3.1启动发送的服务
            producer.start();
            //循环发送10次消息。
            for (int i = 0; i < 10; i++) {
    //4.创建要发送的消息对象,指定topic,指定内容body
                Message msg = new Message("topic2", "nihao rocketmq".getBytes("UTF-8"));
    //3.2发送消息
                SendResult result = producer.send(msg);
                System.out.println("返回结果:" + result);
            }
            //5.关闭连接
            producer.shutdown();
        }
    }
    

    2.Consumer

    public class Consumer3 {
        public static void main(String[] args) throws MQClientException {
    
            //1.创建一个接收消息的对象Consumer
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
    //2.设定接收的命名服务器地址
            consumer.setNamesrvAddr("139.155.49.250:9876");
    //3.设置订阅接收消息对应的topic,对应的sub标签为任意
            consumer.subscribe("topic2", "*");
    //设置当前消费者的消费模式(默认模式:负载均衡)
            consumer.setMessageModel(MessageModel.BROADCASTING);
    //3.开启监听,用于接收消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    //遍历消息
                    for (MessageExt msg : msgs) {
                        System.out.println("收到消息:" + msg);
                        System.out.println("消息是:" + new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
        }
    }
    
    cs