当前位置 博文首页 > 好好学习天天向上:消息队列RocketMQ-03 |消息类别

    好好学习天天向上:消息队列RocketMQ-03 |消息类别

    作者:[db:作者] 时间:2021-07-15 21:56

    四、消息类别

    一.同步消息(即时性,且必须有回执)

    SendResult result = producer.send(msg);
    

    二.异步消息(需要返回,但是不是立刻马上)

    //1.创建一个发送消息的对象Producer
    DefaultMQProducer producer = new DefaultMQProducer("group1");
    //2.设定发送的命名服务器地址
    producer.setNamesrvAddr("localhost:9876");
    //3.1启动发送的服务
    producer.start();
    for (int i = 0; i < 10; i++) {
    //4.创建要发送的消息对象,指定topic,指定内容body
    Message msg = new Message("topic1", ("hello
    rocketmq"+i).getBytes("UTF-8"));
    //3.2 同步消息
    //SendResult result = producer.send(msg);
    //System.out.println("返回结果:" + result);
    //异步消息
    producer.send(msg, new SendCallback() {
    //表示成功返回结果
    @Override
    public void onSuccess(SendResult sendResult) {
    System.out.println(sendResult);
    } /
    /表示发送消息失败
    @Override
    public void onException(Throwable throwable) {
    System.out.println(throwable);
    }
    });
    System.out.println("消息"+i+"发完了,做业务逻辑去了!");
    } /
    /休眠10秒
    TimeUnit.SECONDS.sleep(10);
    //5.关闭连接
    producer.shutdown();
    

    三.单向消息(发送过去就行,不需要回执)

    producer.sendOneway(msg);
    

    四.延时消息(设定一定延时,等时间到了就会自动发送)

    Message msg = new Message("topic3",("延时消息:hello rocketmq "+i).getBytes("UTF-
    8"));
    //设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
    msg.setDelayTimeLevel(3);
    SendResult result = producer.send(msg);
    System.out.println("返回结果:"+result);
    

    五.批量消息(一次发送多个消息)

    1、Provider

    List<Message> msgList = new ArrayList<Message>();
    Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF-
    8"));
    Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF-
    8"));
    Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF-
    8"));
    msgList.add(msg1);
    msgList.add(msg2);
    msgList.add(msg3);
    SendResult result = producer.send(msgList);
    

    2、注意事项

    1. 有相同的topic
    2. 相同的waitStoreMsgOK
    3. 不能是延时消息
    4. 消息内容总长度不超过4M

    六.消息过滤

    1、分类过滤

    1.Provider

    Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq
    2").getBytes("UTF-8"));
    

    2.Consumer

    //接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
    consumer.subscribe("topic6","tag1 || tag2");
    

    2、语法过滤

    添加SQL语法过滤,需要在broker.conf配置文件添加对应配置文件才能开启。

    enablePropertyFilter=true
    

    1.基本语法

    • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
    • 字符比较,比如:=,<>,IN;
    • IS NULL 或者 IS NOT NULL;
    • 逻辑符号 AND,OR,NOT;

    2.支持类型

    • 数值,比如:123,3.1415;
    • 字符,比如:‘abc’,必须用单引号包裹起来;
    • NULL,特殊的常量
    • 布尔值,TRUE 或 FALSE

    3.Provider

    //为消息添加属性
    msg.putUserProperty("vip","1");
    msg.putUserProperty("age","20");
    

    4.Consumer

    //使用消息选择器来过滤对应的属性,语法格式为类SQL语法
    consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
    consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));
    
    cs