当前位置 博文首页 > Shockang的博客:Kafka 的拦截器是什么?怎么用?

    Shockang的博客:Kafka 的拦截器是什么?怎么用?

    作者:[db:作者] 时间:2021-08-13 09:53

    前言

    本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

    本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系

    正文

    Kafka 拦截器

    Kafka 拦截器分为生产者拦截器和消费者拦截器。

    生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;

    而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。

    链式拦截器

    值得一提的是,这两种拦截器都支持的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑。

    举个例子,假设你想在生产消息前执行两个“前置动作”:

    第一个是为消息增加一个头信息,封装发送该消息的时间,第二个是更新发送消息数字段,那么当你将这两个拦截器串联在一起统一指定给 Producer 后,Producer 会按顺序执行上面的动作,然后再发送消息。

    当前 Kafka 拦截器的设置方法是通过参数配置完成的。

    生产者和消费者两端有一个相同的参数,名字叫 interceptor.classes,它指定的是一组类的列表,每个类就是特定逻辑的拦截器实现类。

    拿上面的例子来说,假设第一个拦截器的完整类路径是 com.shockang.study.bigdata.kafka.interceptors.AddTimeStampInterceptor,第二个类是 com.shockang.study.bigdata.kafka.interceptors.UpdateCounterInterceptor,那么你需要按照以下方法在 Producer 端指定拦截器:

    Properties props = new Properties();
    List<String> interceptors = new ArrayList<>();
    interceptors.add("com.shockang.study.bigdata.kafka.interceptors.AddTimeStampInterceptor"); // 拦截器1
    interceptors.add("com.shockang.study.bigdata.kafka.interceptors.UpdateCounterInterceptor"); // 拦截器2
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
    

    如何编写拦截器类?

    现在问题来了,我们应该怎么编写 AddTimeStampInterceptor 和 UpdateCounterInterceptor 类呢?

    Producer 拦截器

    其实很简单,这两个类以及你自己编写的所有 Producer 端拦截器实现类都要继承 org.apache.kafka.clients.producer.ProducerInterceptor 接口。

    该接口是 Kafka 提供的,里面有两个核心的方法。

    onSend

    该方法会在消息发送之前被调用。如果你想在发送之前对消息“美美容”,这个方法是你唯一的机会。

    onAcknowledgement

    该方法会在消息成功提交或发送失败之后被调用。

    onAcknowledgement 的调用要早于 send(callback) 中 callback 的调用。

    值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全哦。

    还有一点很重要,这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降。

    Consumer 拦截器

    同理,指定消费者拦截器也是同样的方法,只是具体的实现类要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,这里面也有两个核心方法。

    onConsume

    该方法在消息返回给 Consumer 程序之前调用。

    也就是说在开始正式处理消息之前,拦截器会先拦一道,搞一些事情,之后再返回给你。

    onCommit

    Consumer 在提交位移之后调用该方法。

    通常你可以在该方法中做一些记账类的动作,比如打日志等。

    一定要注意的是,指定拦截器类时要指定它们的全限定名,即 full qualified name。

    通俗点说就是要把完整包名也加上,不要只有一个类名在那里,并且还要保证你的 Producer 程序能够正确加载你的拦截器类。

    典型使用场景

    1. 客户端监控
    2. 端到端系统性能检测
    3. 消息审计

    实践

    需求

    某个业务只有一个 Producer 和一个 Consumer,想知道该业务消息从被生产出来到最后被消费的平均总时长是多少

    Producer 拦截器

    package com.shockang.study.bigdata.kafka.interceptors;
    
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import redis.clients.jedis.Jedis;
    
    import java.util.Map;
    
    public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
    
    
        private Jedis jedis; // 省略Jedis初始化
    
    
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            //在发送消息前更新总的已发送消息数
            jedis.incr("totalSentMessage");
            return record;
        }
    
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        }
    
    
        @Override
        public void close() {
        }
    
    
        @Override
        public void configure(Map<String, ?> configs) {
        }
    }
    

    Consumer 拦截器

    package com.shockang.study.bigdata.kafka.interceptors;
    
    
    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import redis.clients.jedis.Jedis;
    
    import java.util.Map;
    
    public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
    
    
        private Jedis jedis; //省略Jedis初始化
    
    
        @Override
        public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
            long lantency = 0L;
            for (ConsumerRecord<String, String> record : records) {
                lantency += (System.currentTimeMillis() - record.timestamp());
            }
            jedis.incrBy("totalLatency", lantency);
            long totalLatency = Long.parseLong(jedis.get("totalLatency"));
            long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
            jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
            return records;
        }
    
    
        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        }
    
    
        @Override
        public void close() {
        }
    
    
        @Override
        public void configure(Map<String, ?> configs) {
        }
    }
    

    在上面的消费者拦截器中,我们在真正消费一批消息前首先更新了它们的总延时,方法就是用当前的时钟时间减去封装在消息中的创建时间,然后累计得到这批消息总的端到端处理延时并更新到 Redis 中。

    之后的逻辑就很简单了,我们分别从 Redis 中读取更新过的总延时和总消息数,两者相除即得到端到端消息的平均处理延时。

    创建好生产者和消费者拦截器后,我们按照上面指定的方法分别将它们配置到各自的 Producer 和 Consumer 程序中,这样就能计算消息从 Producer 端到 Consumer 端平均的处理延时了。

    这种端到端的指标监控能够从全局角度俯察和审视业务运行情况,及时查看业务是否满足端到端的 SLA 目标。

    cs