当前位置 博文首页 > 好好学习天天向上:消息队列RocketMQ-02 |消息发送
public class Producer {
public static void main(String[] args) throws Exception {
/**
1. 谁来发?
2. 发给谁?
3. 怎么发?
4. 发什么?
5. 发的结果是什么?4消费者
6. 打扫战场
**/
//1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("139.xxx.49.xxx:9876");
//3.1启动发送的服务
producer.start();
//4.创建要发送的消息对象,指定topic,指定内容body
Message msg = new Message("topic1", "hello rocketmq".getBytes("UTF-8"));
//3.2发送消息
SendResult result = producer.send(msg);
System.out.println("返回结果:" + result);
//5.关闭连接
producer.shutdown();
}
}
启动server
./mqnamesrv -n "139.xxx.49.xxx:9876" &
配置brokerconfig文件
cd /usr/local/rocketmq-4.9.0/conf
vim broker.conf
namesrvAddr=139.xxx.49.xxx:9876
brokerIP1 = 139.xxx.49.xxx
autoCreateTopicEnable=true
启动broker
./mqbroker -n 139.xxx.49.xxx:9876 autoCreateTopicEnable=true -c /usr/local/rocketmq-4.9.0/conf/broker.conf &
启动broker时加上autoCreateTopicEnable=true
public class Consumer {
public static void main(String[] args) throws Exception {
/**
1. 谁来发?
2. 发给谁?
3. 怎么发?
4. 发什么?
5. 发的结果是什么?
6. 打扫战场
**/
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("139.xxx.49.xxx:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意
consumer.subscribe("topic1", "*");
//3.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//遍历消息
for (MessageExt msg : list) {
System.out.println("收到消息:" + msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//4.启动接收消息的服务
consumer.start();
System.out.println("接受消息服务已经开启!");
//5 不要关闭消费者!
}
}
public class Producer2 {
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
//1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group2");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("139.xxx.49.xxx:9876");
//3.1启动发送的服务
producer.start();
//循环发送10次消息。
for (int i = 0; i < 10; i++) {
//4.创建要发送的消息对象,指定topic,指定内容body
Message msg = new Message("topic2", "nihao rocketmq".getBytes("UTF-8"));
//3.2发送消息
SendResult result = producer.send(msg);
System.out.println("返回结果:" + result);
}
//5.关闭连接
producer.shutdown();
}
}
public class Consumer2 {
public static void main(String[] args) throws MQClientException {
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("139.155.49.250:9876");
//3.设置订阅接收消息对应的topic,对应的sub标签为任意
consumer.subscribe("topic2", "*");
//设置当前消费者的消费模式(默认模式:负载均衡)
consumer.setMessageModel(MessageModel.CLUSTERING);
//3.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//遍历消息
for (MessageExt msg : msgs) {
System.out.println("收到消息:" + msg);
System.out.println("消息是:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
}
}
public class Producer2 {
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
//1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group2");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("139.xxx.49.xxx:9876");
//3.1启动发送的服务
producer.start();
//循环发送10次消息。
for (int i = 0; i < 10; i++) {
//4.创建要发送的消息对象,指定topic,指定内容body
Message msg = new Message("topic2", "nihao rocketmq".getBytes("UTF-8"));
//3.2发送消息
SendResult result = producer.send(msg);
System.out.println("返回结果:" + result);
}
//5.关闭连接
producer.shutdown();
}
}
public class Consumer3 {
public static void main(String[] args) throws MQClientException {
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("139.155.49.250:9876");
//3.设置订阅接收消息对应的topic,对应的sub标签为任意
consumer.subscribe("topic2", "*");
//设置当前消费者的消费模式(默认模式:负载均衡)
consumer.setMessageModel(MessageModel.BROADCASTING);
//3.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//遍历消息
for (MessageExt msg : msgs) {
System.out.println("收到消息:" + msg);
System.out.println("消息是:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
}
}
cs