当前位置 博文首页 > 好好学习天天向上:消息队列RocketMQ-03 |消息类别
SendResult result = producer.send(msg);
//1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("localhost:9876");
//3.1启动发送的服务
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建要发送的消息对象,指定topic,指定内容body
Message msg = new Message("topic1", ("hello
rocketmq"+i).getBytes("UTF-8"));
//3.2 同步消息
//SendResult result = producer.send(msg);
//System.out.println("返回结果:" + result);
//异步消息
producer.send(msg, new SendCallback() {
//表示成功返回结果
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
} /
/表示发送消息失败
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
System.out.println("消息"+i+"发完了,做业务逻辑去了!");
} /
/休眠10秒
TimeUnit.SECONDS.sleep(10);
//5.关闭连接
producer.shutdown();
producer.sendOneway(msg);
Message msg = new Message("topic3",("延时消息:hello rocketmq "+i).getBytes("UTF-
8"));
//设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
msg.setDelayTimeLevel(3);
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
List<Message> msgList = new ArrayList<Message>();
Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF-
8"));
Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF-
8"));
Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF-
8"));
msgList.add(msg1);
msgList.add(msg2);
msgList.add(msg3);
SendResult result = producer.send(msgList);
Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq
2").getBytes("UTF-8"));
//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");
添加SQL语法过滤,需要在broker.conf配置文件添加对应配置文件才能开启。
enablePropertyFilter=true
//为消息添加属性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");
//使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));
cs