当前位置 博文首页 > 韩超的博客 (hanchao5272):MQ: kafka的Java接入与入门示例(topi

    韩超的博客 (hanchao5272):MQ: kafka的Java接入与入门示例(topi

    作者:[db:作者] 时间:2021-09-05 16:08

    本文主要通过实际编码来对《MQ: 一张图读懂kafka工作原理》提到的部分原理进行验证与实现。

    相关文章参考:

    • MQ: 消息队列常见应用场景及主流消息队列ActiveMQ、RabbitMQ、RocketMQ和Kafka的简单对比
    • MQ: 一张图读懂kafka工作原理

    1.版本说明

    后续代码依赖于以下版本,其他版本不保证代码可用:

    • kafka 服务版本:2.11-1.0.1
    • kafka-clients.jar 版本:2.2.0
    • spring-kafka.jar 版本:1.3.5.RELEASE
    • spring-boot版本:1.5.10.RELEASE

    2.kafka接入

    pom.xml

    先引入kafka的spring依赖包,这个包提供Producer和Consumer相关的操作。

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>1.3.5.RELEASE</version>
    </dependency>
    

    如果想进行Topic、Partition相关的操作,则引入下面的包:

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.2.0</version>
    </dependency>
    

    application.properties

    只给出最基本配置,如需调优,请自行增加其他配置。

    spring.kafka.bootstrap-servers=127.0.0.1:9092
    

    3.示例代码:Topic的增删改查

    Toplic的增删改查需要通过AdminClient操作,主要依赖kafka-clients.jar

    3.1.获取kafka管理客户端

        /**
         * 获取kafka管理客户端
         */
        private static AdminClient getKafkaAdminClient() {
            Map<String, Object> props = new HashMap<>(1);
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.126.153.155:9092");
            return KafkaAdminClient.create(props);
        }
    

    3.2.获取全部topic名称

        /**
         * 获取全部topic名称
         */
        private static Collection<String> getAllTopic(AdminClient client) throws InterruptedException, ExecutionException {
            return client.listTopics().listings().get().stream().map(TopicListing::name).collect(Collectors.toList());
        }
    

    3.3.显示指定Topic的详细配置

        /**
         * 显示指定topic的信息
         */
        private static void showTopicInfo(AdminClient client, Collection<String> topics, List<String> wantedTopicList) throws InterruptedException, ExecutionException {
            client.describeTopics(topics).all().get().forEach((topic, description) -> {
                if (wantedTopicList.contains(topic)) {
                    log.info("==== Topic {} Begin ====", topic);
                    for (TopicPartitionInfo partition : description.partitions()) {
                        log.info(partition.toString());
                    }
                    log.info("==== Topic {} End ====", topic);
                }
            });
        }
    

    3.4.新建Topic

    可以一次性创建多个Topic,每个topic需要指定名称、Partition数量和Replicas数量。

    Replicas数量不能超过broker数量。本文使用的kafka只有一个broker。

            //创建topic:副本数不能超过broker数量
            client.createTopics(Lists.newArrayList(
                    //聊天室   3分区
                    new NewTopic(TOPIC_CHAT_ROOM, 3, (short) 1),
                    //邮件     3分区
                    new NewTopic(TOPIC_MAIL, 3, (short) 1),
                    //短信     1分区
                    new NewTopic(TOPIC_SMS, 1, (short) 1)
            ));
    

    3.5.删除Topic

    可以一次性删除多个Topic。

     client.deleteTopics(Lists.newArrayList("topic-send-sms","topic-send-mail"));
    

    3.6.修改Topic

    无法已经存在的Topic的分区数量等配置,只能删掉之后重建。

    3.7.测试代码与运行结果

        /**
         * 聊天室   3分区
         */
        public static final String TOPIC_CHAT_ROOM = "topic-hc-chat-room";
        public static final String PERSON_LORA = "Lora";
        public static final String PERSON_JACK = "Jack";
        public static final String PERSON_PAUL = "Paul";
    
        /**
         * 邮件     3分区
         */
        public static final String TOPIC_MAIL = "topic-hc-mail";
    
        public static final String CONSUMER_GROUP_MAIL_1 = "MailConsumer-Group-1";
        public static final String CONSUMER_MAIL = "MailConsumer-ALL";
    
        public static final String CONSUMER_GROUP_MAIL_2 = "MailConsumer-Group-2";
        public static final String CONSUMER_MAIL_PARTITION_0 = "MailConsumer-P0";
        public static final String CONSUMER_MAIL_PARTITION_12 = "MailConsumer-P1,P2";
    
        public static final String CONSUMER_GROUP_MAIL_3 = "MailConsumer-Group-3";
        public static final String CONSUMER_MAIL_MULTI_0 = "MailConsumer-M0";
        public static final String CONSUMER_MAIL_MULTI_1 = "MailConsumer-M1";
        public static final String CONSUMER_MAIL_MULTI_2 = "MailConsumer-M2";
        public static final String CONSUMER_MAIL_MULTI_3 = "MailConsumer-M3";
    
        /**
         * 短信     1分区
         */
        public static final String TOPIC_SMS = "topic-hc-sms";
        public static final String CONSUMER_SMS = "SmsConsumer";
    		/**
         * 显示、创建、删除topic
         */
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //获取kafka管理客户端
            AdminClient client = getKafkaAdminClient();
    
            //查询全部topic
            Collection<String> topics = getAllTopic(client);
    
            //创建topic:副本数不能超过broker数量
            client.createTopics(Lists.newArrayList(
                    //聊天室   3分区
                    new NewTopic(TOPIC_CHAT_ROOM, 3, (short) 1),
                    //邮件     3分区
                    new NewTopic(TOPIC_MAIL, 3, (short) 1),
                    //短信     1分区
                    new NewTopic(TOPIC_SMS, 1, (short) 1)
            ));
    
            //查询topic详情
            List<String> wantedTopicList = Lists.newArrayList(TOPIC_CHAT_ROOM, TOPIC_MAIL, TOPIC_SMS);
            showTopicInfo(client, topics, wantedTopicList);
    
            //删除topic:想要修改topic的配置如分区等需要删掉重建
            client.deleteTopics(Lists.newArrayList("topic-send-sms","topic-send-mail"));
        }
    

    运行结果

    运行结果显示了Topic的Partition、Leader、Replicas和isr配置。

     INFO - Kafka version: 2.2.0 
     INFO - Kafka commitId: 05fcfde8f69b0349 
     INFO - ==== Topic topic-hc-chat-room Begin ==== 
     INFO - (partition=0, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null)) 
     INFO - (partition=1, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null)) 
     INFO - (partition=2, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null)) 
     INFO - ==== Topic topic-hc-chat-room End ==== 
     INFO - ==== Topic topic-hc-mail Begin ==== 
     INFO - (partition=0, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null)) 
     INFO - (partition=1, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null)) 
     INFO - (partition=2, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null)) 
     INFO - ==== Topic topic-hc-mail End ==== 
     INFO - ==== Topic topic-hc-sms Begin ==== 
     INFO - (partition=0, leader=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), replicas=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null), isr=c2-v03-0-0-1.yidian.com:9092 (id: 0 rack: null)) 
     INFO - ==== Topic topic-hc-sms End ==== 
    

    4.示例代码:Producer发送消息

    发送端的逻辑比较清晰,只需要主要发送时传递的必填与选填参数即可。

    参考上一篇文章,消息发送参数:topic必填、partition选填、key选填、message必填。

    根据这些参数,将消息发送至哪个Topic-Partition的路由规则,还是去参考上一篇文章。

    下面通过一个API来展示:

    /**
     * <p>生产者</P>
     *
     * @author hanchao
     */
    @Slf4j
    @RestController
    public class ProducerController {
    
        @Resource
        private KafkaTemplate<String, String> kafkaTemplate;
    
        /**
         * 发送消息
         */
        @GetMapping("/kafka/batch-send")
        public boolean batchSendMessage(@RequestParam(required = false) String producer,
                                        @RequestParam String topic,
                                        @RequestParam(required = false) Integer partition,
                                        @RequestParam(required = false) String key,
                                        @RequestParam String value,
                                        @RequestParam(required = false)