当前位置 博文首页 > 韩超的博客 (hanchao5272):MQ: kafka的Java接入与入门示例(topi
本文主要通过实际编码来对《MQ: 一张图读懂kafka工作原理》提到的部分原理进行验证与实现。
相关文章参考:
后续代码依赖于以下版本,其他版本不保证代码可用:
kafka
服务版本:2.11-1.0.1kafka-clients.jar
版本:2.2.0spring-kafka.jar
版本:1.3.5.RELEASEspring-boot
版本:1.5.10.RELEASEpom.xml
先引入kafka的spring依赖包,这个包提供Producer和Consumer相关的操作。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
如果想进行Topic、Partition相关的操作,则引入下面的包:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
application.properties
只给出最基本配置,如需调优,请自行增加其他配置。
spring.kafka.bootstrap-servers=127.0.0.1:9092
Toplic的增删改查需要通过AdminClient
操作,主要依赖kafka-clients.jar
。
/**
* 获取kafka管理客户端
*/
private static AdminClient getKafkaAdminClient() {
Map<String, Object> props = new HashMap<>(1);
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.126.153.155:9092");
return KafkaAdminClient.create(props);
}
/**
* 获取全部topic名称
*/
private static Collection<String> getAllTopic(AdminClient client) throws InterruptedException, ExecutionException {
return client.listTopics().listings().get().stream().map(TopicListing::name).collect(Collectors.toList());
}
/**
* 显示指定topic的信息
*/
private static void showTopicInfo(AdminClient client, Collection<String> topics, List<String> wantedTopicList) throws InterruptedException, ExecutionException {
client.describeTopics(topics).all().get().forEach((topic, description) -> {
if (wantedTopicList.contains(topic)) {
log.info("==== Topic {} Begin ====", topic);
for (TopicPartitionInfo partition : description.partitions()) {
log.info(partition.toString());
}
log.info("==== Topic {} End ====", topic);
}
});
}
可以一次性创建多个Topic,每个topic需要指定名称、Partition数量和Replicas数量。
Replicas数量不能超过broker数量。本文使用的kafka只有一个broker。
//创建topic:副本数不能超过broker数量
client.createTopics(Lists.newArrayList(
//聊天室 3分区
new NewTopic(TOPIC_CHAT_ROOM, 3, (short) 1),
//邮件 3分区
new NewTopic(TOPIC_MAIL, 3, (short) 1),
//短信 1分区
new NewTopic(TOPIC_SMS, 1, (short) 1)
));
可以一次性删除多个Topic。
client.deleteTopics(Lists.newArrayList("topic-send-sms","topic-send-mail"));
无法已经存在的Topic的分区数量等配置,只能删掉之后重建。
/**
* 聊天室 3分区
*/
public static final String TOPIC_CHAT_ROOM = "topic-hc-chat-room";
public static final String PERSON_LORA = "Lora";
public static final String PERSON_JACK = "Jack";
public static final String PERSON_PAUL = "Paul";
/**
* 邮件 3分区
*/
public static final String TOPIC_MAIL = "topic-hc-mail";
public static final String CONSUMER_GROUP_MAIL_1 = "MailConsumer-Group-1";
public static final String CONSUMER_MAIL = "MailConsumer-ALL";
public static final String CONSUMER_GROUP_MAIL_2 = "MailConsumer-Group-2";
public static final String CONSUMER_MAIL_PARTITION_0 = "MailConsumer-P0";
public static final String CONSUMER_MAIL_PARTITION_12 = "MailConsumer-P1,P2";
public static final String CONSUMER_GROUP_MAIL_3 = "MailConsumer-Group-3";
public static final String CONSUMER_MAIL_MULTI_0 = "MailConsumer-M0";
public static final String CONSUMER_MAIL_MULTI_1 = "MailConsumer-M1";
public static final String CONSUMER_MAIL_MULTI_2 = "MailConsumer-M2";
public static final String CONSUMER_MAIL_MULTI_3 = "MailConsumer-M3";
/**
* 短信 1分区
*/
public static final String TOPIC_SMS = "topic-hc-sms";
public static final String CONSUMER_SMS = "SmsConsumer";
/**
* 显示、创建、删除topic
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
//获取kafka管理客户端
AdminClient client = getKafkaAdminClient();
//查询全部topic
Collection<String> topics = getAllTopic(client);
//创建topic:副本数不能超过broker数量
client.createTopics(Lists.newArrayList(
//聊天室 3分区
new NewTopic(TOPIC_CHAT_ROOM, 3, (short) 1),
//邮件 3分区
new NewTopic(TOPIC_MAIL, 3, (short) 1),
//短信 1分区
new NewTopic(TOPIC_SMS, 1, (short) 1)
));
//查询topic详情
List<String> wantedTopicList = Lists.newArrayList(TOPIC_CHAT_ROOM, TOPIC_MAIL, TOPIC_SMS);
showTopicInfo(client, topics, wantedTopicList);
//删除topic:想要修改topic的配置如分区等需要删掉重建
client.deleteTopics(Lists.newArrayList("topic-send-sms","topic-send-mail"));
}
运行结果
运行结果显示了Topic的Partition、Leader、Replicas和isr配置。
INFO - Kafka version: 2.2.0
INFO - Kafka commitId: 05fcfde8f69b0349
INFO - ==== Topic topic-hc-chat-room Begin ====
INFO - (partition=0, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null))
INFO - (partition=1, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null))
INFO - (partition=2, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null))
INFO - ==== Topic topic-hc-chat-room End ====
INFO - ==== Topic topic-hc-mail Begin ====
INFO - (partition=0, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null))
INFO - (partition=1, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null))
INFO - (partition=2, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null))
INFO - ==== Topic topic-hc-mail End ====
INFO - ==== Topic topic-hc-sms Begin ====
INFO - (partition=0, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null))
INFO - ==== Topic topic-hc-sms End ====
发送端的逻辑比较清晰,只需要主要发送时传递的必填与选填参数即可。
参考上一篇文章,消息发送参数:topic必填、partition选填、key选填、message必填。
根据这些参数,将消息发送至哪个Topic-Partition的路由规则,还是去参考上一篇文章。
下面通过一个API来展示:
/**
* <p>生产者</P>
*
* @author hanchao
*/
@Slf4j
@RestController
public class ProducerController {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 发送消息
*/
@GetMapping("/kafka/batch-send")
public boolean batchSendMessage(@RequestParam(required = false) String producer,
@RequestParam String topic,
@RequestParam(required = false) Integer partition,
@RequestParam(required = false) String key,
@RequestParam String value,
@RequestParam(required = false)