
    Garbage Code Producer: What a Producer-Consumer Bug Taught Me About How a Thread Pool Adds Threads

    Author: Garbage Code Producer    Date: 2021-01-17 20:02

    0 Background

    One idle afternoon, I noticed that task scheduling in an old project was done entirely with synchronous operations: the completion of process A depends on process B, the completion of process B depends on process C, and so on down the chain.

    As a certified producer of garbage code, QA's nightmare, and the designated author of incident reports, digging out room for "optimization" is simply part of my job.

    Since this all lives inside one project, and the project doesn't use any message-queue middleware (Kafka, RocketMQ, and the like), rashly introducing one just for this feature would carry a high maintenance cost, and my tech lead would never agree to it. Fine, I'll do it myself. I quickly wrote the classes below.

    The overall design is simple: the classic producer-consumer pattern, with blocking queues used as buffers.

    • Inside the producer, a dedicated, always-running thread forwards messages from the queue to the consumers; the producer permanently occupies one thread of its own.
    • Each consumer also has its own blocking queue to receive the messages the producer hands over. Since not every topic has messages arriving at every moment, the consumers simply share a thread pool.

    1 Code Implementation

    
    public interface IEvent {
    
        String getTopic();
    
    }
    
    // Message entity
    public class Event<T> implements IEvent {
    
        /**
         * Creation timestamp
         */
        private long ts = System.currentTimeMillis();
    
        /**
         * Payload carried by the event
         */
        private T entity;
    
        /**
         * topic
         */
        private String topic;
    
        public Event(T entity, String topic) {
            this.entity = entity;
            this.topic = topic;
        }
    
        @Override
        public String getTopic() {
            return topic;
        }
    
        // remaining setters/getters omitted
    }
    
    // How a message gets handled
    public interface ConfigListener {
    
        String ALL = "all";
    
        /**
         * Invoked for this listener to handle an event
         *
         * @param event the event to process
         */
        void handler(IEvent event);


        /**
         * Priority order; higher values run first
         * @return
         */
        int getOrder();

        /**
         * The topic this listener subscribes to
         * @return
         */
        String getTopic();
    
    }
    
    // I created four message-handling classes; three are omitted here and only one is shown
    public class RandomSleepConfigListener implements ConfigListener {
    
        private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    
        @Override
        public void handler(IEvent event) {
            logger.info("execute " + this.getClass().getSimpleName());
            // random sleep of 5ms - 9ms to simulate work
            long t = (long) (Math.random() * 5) + 5L;
            try {
                TimeUnit.MILLISECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public int getOrder() {
            return 0;
        }
    
        @Override
        public String getTopic() {
            // one of the four topics used in the test below
            return "random1";
        }
    }
    
    
    
    
    // Thread pool definitions
    public class ScheduleThreadPool {
    
        private static final AtomicInteger atomic = new AtomicInteger();
    
        // The single thread dedicated to the producer
        public static final ExecutorService EVENT_POOL = Executors.newFixedThreadPool(1, r -> new Thread(r, "EVENT-PRODUCER-" + atomic.incrementAndGet()));
    
        /**
         * 2 resident threads, 8 at most; the queue accepts up to 128 tasks,
         * beyond which the submitting thread runs the task itself (CallerRunsPolicy)
         */
        public static final ExecutorService EVENT_CONSUMER_POOL =
                new ThreadPoolExecutor(2, 8, 50L,
                        TimeUnit.MILLISECONDS,
                        new ArrayBlockingQueue<>(128),
                        r -> new Thread(r, "EVENT-CONSUMER-" + atomic.incrementAndGet()),
                        new ThreadPoolExecutor.CallerRunsPolicy());
    }
    
    
    
    // ############################### Preparation done; now for the producer and the consumer ###########################################
    
    
    public class Producer {
    
        private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    
        /**
         * Events submitted from outside are put into this queue
         */
        private static final ArrayBlockingQueue<IEvent> blockingQueue = new ArrayBlockingQueue<>(128);
    
        /**
         * topic -> consumer
         */
        private static Map<String, Consumer> topic2ConsumerMap = Maps.newHashMap();
    
    
    
        // Initialization work
        static {
            logger.info("Producer init start...");
            // Plugin-style loading via SPI; swap in whatever class-loading approach you prefer
            Iterator<ConfigListener> configListenerIterator = ServiceBootstrap.loadAll(ConfigListener.class);
    
            // Walk through all the listeners once, routing each to its consumer
            while (configListenerIterator.hasNext()) {
                ConfigListener configListener = configListenerIterator.next();
                String topic = configListener.getTopic();
                // Skip listeners without an explicit topic
                if (null == topic) {
                    continue;
                }
    
                logger.info("we init {} topic", topic);
    
                if (topic2ConsumerMap.containsKey(topic)) {
                    topic2ConsumerMap.get(topic).addListener(configListener);
                } else {
                    topic2ConsumerMap.put(topic, new Consumer(topic).addListener(configListener));
                }
            }
    
            // If a listener is registered for all topics, add it to every topic's listener list
            if (topic2ConsumerMap.containsKey(ConfigListener.ALL)) {
                Consumer consumer = topic2ConsumerMap.get(ConfigListener.ALL);
                topic2ConsumerMap.remove(ConfigListener.ALL);
    
                for (Map.Entry<String, Consumer> entry : topic2ConsumerMap.entrySet()) {
                    entry.getValue().addAllListener(consumer.getPriorityList());
                }
            }
    
        // Start the dispatch thread
        ScheduleThreadPool.EVENT_POOL.execute(() -> {
            int i = 0;
            //noinspection InfiniteLoopStatement
            while (true) {
                    try {
                        // Take the next event from the queue, blocking while it is empty
                        IEvent iEvent = blockingQueue.take();
                        logger.info("from producer queue take a message {} {}", iEvent.getTopic(), (i++));
                        topic2ConsumerMap.get(iEvent.getTopic()).addEvent(iEvent);
                    } catch (InterruptedException e) {
                        //
                    }
                }
            });
    
            logger.info("Producer init end...");
        }
    
    
        /**
         * Put an event to be processed into the blocking queue;
         * blocks when the queue is full
         * @param iEvent the event to publish
         */
        public static void publish(IEvent iEvent) throws InterruptedException {
            logger.info("publish start...");
            // This call blocks while the queue is full
            blockingQueue.put(iEvent);
            logger.info("publish over...");
        }
    
    }
    
    
    
    public class Consumer {
    
        private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    
        /**
         * Listener list, kept sorted by priority
         */
        private List<ConfigListener> priorityList = Lists.newArrayListWithCapacity(16);
    
        /**
         * Sorts in descending order of priority
         */
        private Comparator<ConfigListener> comparator = (o1, o2) -> o2.getOrder() - o1.getOrder();
    
    
        /**
         * Events waiting to be processed
         */
        private LinkedBlockingQueue<IEvent> waitEvent = new LinkedBlockingQueue<>(32);
    
        /**
         * Counts the tasks already completed
         */
        private AtomicInteger count = new AtomicInteger();
    
        /**
         * The topic this consumer handles
         */
        private String topic;
    
    //    // CODE-B: this block is what later caused the problem, and also what sparked my curiosity about how the thread pool creates threads
    //    {
    //        logger.info("non-static invoke--------");
    //        // Submit the consuming task
    //        ScheduleThreadPool.EVENT_CONSUMER_POOL.execute(() -> {
    //            // Note the infinite loop here
    //            for (;;) {
    //                try {
    //                    logger.info("take event");
    //                    IEvent take = waitEvent.take();
    //                    priorityList.forEach(c -> c.handler(take));
    //                    int t = count.incrementAndGet();
    //                    logger.info("TOPIC[{}] size {}, remainingCapacity {} finish {} ",
    //                            topic, waitEvent.size(), waitEvent.remainingCapacity(), t);
    //                } catch (InterruptedException e) {
    //                    // log the failure
    //                }
    //            }
    //        });
    //    }
    
        public Consumer(String topic) {
            this.topic = topic;
        }
    
    
        public List<ConfigListener> getPriorityList() {
            return priorityList;
        }
    
        public Consumer addListener(ConfigListener listener) {
            priorityList.add(listener);
            priorityList.sort(comparator);
            return this;
        }
    
        public void addAllListener(Collection<? extends ConfigListener> c) {
            priorityList.addAll(c);
            priorityList.sort(comparator);
        }
    
        public void addEvent(IEvent iEvent) {
            try {
                logger.info(" topic {} queueSize {} finish {}", this.topic, waitEvent.size(), count.get());
                waitEvent.put(iEvent);
            } catch (InterruptedException e) {
                //
            }
    
    
            // CODE-A
            ScheduleThreadPool.EVENT_CONSUMER_POOL.execute(() -> {
                // Note: unlike the producer's dispatch loop, no loop here
                try {
                    logger.info("take event");
                    IEvent take = waitEvent.take();
                    priorityList.forEach(c -> c.handler(take));
                    int t = count.incrementAndGet();
                    logger.info("TOPIC[{}] size {}, remainingCapacity {} finish {} ",
                            topic, waitEvent.size(), waitEvent.remainingCapacity(), t);
                } catch (InterruptedException e) {
                    // log the failure
                }
            });
    
        }
    
    }
    
    
    // Test class
    public class ProductTest{
        // I created four message-handling classes myself, for the following topics
        String[] topics = {"random1","random2","random3","random4"};
    
        @Test(timeout = 30000L)
        public void publish() throws InterruptedException {
            
            for (int i = 0; i < 720; i++) {
                int j = i & 0x3;
                System.out.println(i);
                Producer.publish(new Event<String>("hello", topics[j]));
            }
    
            // give the consumers time to drain; must stay within the 30s test timeout
            TimeUnit.SECONDS.sleep(20L);
        }
    
    }
    
    

    2 Running It

    With all the code in place, off we go. The debug output matched what I had envisioned:

    4 topics, 720 tasks; each topic processes 180 of them.

    2021-01-17 16:27:56.210 [EVENT-CONSUMER-3] INFO  - TOPIC[random1] size 0, remainingCapacity 32 finish 180 
    2021-01-17 16:27:56.210 [EVENT-CONSUMER-2] INFO  - TOPIC[random4] size 1, remainingCapacity 31 finish 179 
    2021-01-17 16:27:56.210 [EVENT-CONSUMER-3] INFO  - take event
    2021-01-17 16:27:56.210 [EVENT-CONSUMER-2] INFO  - take event
    2021-01-17 16:27:56.210 [EVENT-CONSUMER-3] INFO  - execute RandomSleepConfigListener2
    2021-01-17 16:27:56.210 [EVENT-CONSUMER-2] INFO  - execute RandomSleepConfigListener3
    2021-01-17 16:27:56.215 [EVENT-CONSUMER-3] INFO  - TOPIC[random2] size 0, remainingCapacity 32 finish 180 
    2021-01-17 16:27:56.215 [EVENT-CONSUMER-3] INFO  - take event
    2021-01-17 16:27:56.215 [EVENT-CONSUMER-3] INFO  - execute RandomSleepConfigListener4
    2021-01-17 16:27:56.217 [EVENT-CONSUMER-2] INFO  - TOPIC[random3] size 0, remainingCapacity 32 finish 180 
    2021-01-17 16:27:56.221 [EVENT-CONSUMER-3] INFO  - TOPIC[random4] size 0, remainingCapacity 32 finish 180
    

    Hmm, perfect so far. Then I looked at the Consumer class again: every time a task is pushed into the blocking queue, a worker task is submitted to pull the message back out. That didn't suit my self-destructive style, so I changed it.
    It became CODE-B: once the task is running on the pool, it just keeps looping to pull messages:

        {
            logger.info("non-static invoke--------");
            // Submit the consuming task
            ScheduleThreadPool.EVENT_CONSUMER_POOL.execute(() -> {
                // Note the infinite loop here
                for (;;) {
                    try {
                        logger.info("take event");
                        IEvent take = waitEvent.take();
                        priorityList.forEach(c -> c.handler(take));
                        int t = count.incrementAndGet();
                        logger.info("TOPIC[{}] size {}, remainingCapacity {} finish {} ",
                                topic, waitEvent.size(), waitEvent.remainingCapacity(), t);
                    } catch (InterruptedException e) {
                        // log the failure
                    }
                }
            });
        }
    

    Then I commented out CODE-A, and something magical happened. One shot, straight to the heart:

    2021-01-17 16:32:49.539 [Time-limited test] INFO  - Producer init start...
    2021-01-17 16:32:49.562 [Time-limited test] INFO  - we init all topic
    2021-01-17 16:32:49.806 [Time-limited test] INFO  - non-static invoke--------   ##########
    2021-01-17 16:32:49.819 [Time-limited test] INFO  - we init random1 topic
    2021-01-17 16:32:49.819 [Time-limited test] INFO  - non-static invoke--------   ##########
    2021-01-17 16:32:49.819 [EVENT-CONSUMER-1] INFO  - take event                   ##########
    2021-01-17 16:32:49.820 [EVENT-CONSUMER-2] INFO  - take event                   ##########
    2021-01-17 16:32:49.821 [Time-limited test] INFO  - we init random2 topic
    2021-01-17 16:32:49.821 [Time-limited test] INFO  - non-static invoke--------
    2021-01-17 16:32:49.824 [Time-limited test] INFO  - we init random3 topic
    2021-01-17 16:32:49.824 [Time-limited test] INFO  - non-static invoke--------
    2021-01-17 16:32:49.826 [Time-limited test] INFO  - we init random4 topic
    2021-01-17 16:32:49.880 [Time-limited test] INFO  - non-static invoke--------
    2021-01-17 16:32:49.884 [Time-limited test] INFO  - Producer init end...
    2021-01-17 16:32:49.884 [Time-limited test] INFO  - publish start...
    2021-01-17 16:32:49.884 [Time-limited test] INFO  - publish over...
    
    
    
    2021-01-17 16:32:49.885 [EVENT-PRODUCER-3] INFO  -  topic random1 queueSize 0 finish 0     ##########
    2021-01-17 16:32:49.885 [Time-limited test] INFO  - publish over...
    
    2021-01-17 16:32:49.886 [EVENT-PRODUCER-3] INFO  - from producer queue take a message random2 1
    2021-01-17 16:32:49.886 [Time-limited test] INFO  - publish start...
    2021-01-17 16:32:49.886 [EVENT-PRODUCER-3] INFO  -  topic random2 queueSize 0 finish 0      ##########
    2021-01-17 16:32:49.886 [Time-limited test] INFO  - publish over...
    
    2021-01-17 16:32:49.886 [EVENT-PRODUCER-3] INFO  - from producer queue take a message random3 2
    2021-01-17 16:32:49.886 [Time-limited test] INFO  - publish start...
    2021-01-17 16:32:49.886 [EVENT-PRODUCER-3] INFO  -  topic random3 queueSize 0 finish 0        ##########
    2021-01-17 16:32:49.886 [Time-limited test] INFO  - publish over...
    
    2021-01-17 16:32:49.886 [EVENT-PRODUCER-3] INFO  - from producer queue take a message random4 3
    2021-01-17 16:32:49.886 [EVENT-PRODUCER-3] INFO  -  topic random4 queueSize 0 finish 0        ##########
    
    ....
    
    2021-01-17 16:32:50.031 [EVENT-PRODUCER-3] INFO  -  topic random1 queueSize 27 finish 5
    2021-01-17 16:32:50.031 [EVENT-PRODUCER-3] INFO  - from producer queue take a message random2 129
    2021-01-17 16:32:50.031 [EVENT-PRODUCER-3] INFO  -  topic random2 queueSize 32 finish 0
    .
    .
    .
    
    2021-01-17 16:32:50.275 [EVENT-CONSUMER-2] INFO  - execute RandomSleepConfigListener
    2021-01-17 16:32:50.283 [EVENT-CONSUMER-2] INFO  - TOPIC[random1] size 4, remainingCapacity 28 finish 29 
    2021-01-17 16:32:50.283 [EVENT-CONSUMER-2] INFO  - take event
    2021-01-17 16:32:50.283 [EVENT-CONSUMER-2] INFO  - execute RandomSleepConfigListener
    2021-01-17 16:32:50.289 [EVENT-CONSUMER-2] INFO  - TOPIC[random1] size 3, remainingCapacity 29 finish 30 
    2021-01-17 16:32:50.290 [EVENT-CONSUMER-2] INFO  - take event
    2021-01-17 16:32:50.290 [EVENT-CONSUMER-2] INFO  - execute RandomSleepConfigListener
    2021-01-17 16:32:50.299 [EVENT-CONSUMER-2] INFO  - TOPIC[random1] size 2, remainingCapacity 30 finish 31 
    2021-01-17 16:32:50.299 [EVENT-CONSUMER-2] INFO  - take event
    2021-01-17 16:32:50.299 [EVENT-CONSUMER-2] INFO  - execute RandomSleepConfigListener
    2021-01-17 16:32:50.305 [EVENT-CONSUMER-2] INFO  - TOPIC[random1] size 1, remainingCapacity 31 finish 32 
    2021-01-17 16:32:50.305 [EVENT-CONSUMER-2] INFO  - take event
    2021-01-17 16:32:50.306 [EVENT-CONSUMER-2] INFO  - execute RandomSleepConfigListener
    2021-01-17 16:32:50.315 [EVENT-CONSUMER-2] INFO  - TOPIC[random1] size 0, remainingCapacity 32 finish 33 
    2021-01-17 16:32:50.316 [EVENT-CONSUMER-2] INFO  - take event
    
    

    The logs show that only random1 was being consumed; the other topics weren't consumed at all.

    The first and second log segments show the producer dispatching the messages one by one, just as designed; my test program publishes to topics 1 through 4 in round-robin order.

    But the EVENT-CONSUMER thread numbers only reach 2; number 3 belongs to the producer thread (both thread factories share the same AtomicInteger counter, which is why the producer came out as EVENT-PRODUCER-3). That struck me as odd: why didn't the thread pool keep creating threads?

    3 Analyzing the Cause

    I started by reading the thread pool's execute() method:

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
            else if (workerCountOf(recheck) == 0)    // ------------  debugging showed this condition is never met
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    

    The English comments explain it well: when it comes to creating threads, execute() considers three cases:

    1 If the worker count is below corePoolSize, decisively create a new thread.
    
    2 If the pool is in the running state, offer the command to the work queue, i.e. the queue supplied when the pool was constructed; mine holds 128 tasks.
    
    3 Reaching step 3 means either the pool has been shut down or the queue is full. If it has been shut down, this step is bound to fail; likewise if the queue is full. The reason the pool still tries to add a worker directly is that at this very instant there may just happen to be room to create one, so that possibility is not given up.
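
    To watch these three branches fire in order, here is a minimal standalone sketch (the pool sizes, sleep time, and class name are mine for illustration, not from the project): one core thread, a one-slot queue, and a maximum of two threads, so four submissions hit branch 1, branch 2, branch 3, and finally rejection.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class ExecuteBranchDemo {
        public static void main(String[] args) {
            // core 1, max 2, queue capacity 1 -- tiny on purpose so every branch fires
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1, 2, 60L, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1),
                    new ThreadPoolExecutor.AbortPolicy());
    
            Runnable slowTask = () -> {
                try {
                    TimeUnit.SECONDS.sleep(5L);
                } catch (InterruptedException ignored) {
                }
            };
    
            pool.execute(slowTask); // branch 1: workerCount < corePoolSize, core thread created
            pool.execute(slowTask); // branch 2: core thread busy, task goes into the queue
            pool.execute(slowTask); // branch 3: queue full, a non-core thread is created
            System.out.println("poolSize = " + pool.getPoolSize()); // 2
    
            try {
                pool.execute(slowTask); // queue full AND pool at maximum -> rejected
            } catch (RejectedExecutionException e) {
                System.out.println("rejected: pool is saturated");
            }
            pool.shutdownNow();
        }
    }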
    

    4 Aha, So That's Why

    So I reached for the debugger. The first two consumer threads were created without a hitch, but the tasks submitted afterwards could no longer get any new thread created.

    Looking closely, the check if (workerCountOf(recheck) == 0) is never satisfied, so execution stops there and no further thread is created.

    Why? Because of the infinite loop: the queue may be a blocking queue, but each looping task pins its worker thread forever. The worker count therefore never drops to 0, the remaining loop tasks sit untouched in the work queue, and only one topic ever consumes messages.
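
    The pinning effect is easy to reproduce outside the project. A small sketch (the pool parameters mirror EVENT_CONSUMER_POOL; the spin loop is just a stand-in for CODE-B's take()-and-handle loop): five never-ending tasks go to a pool with 2 core threads and a 128-slot queue; only two of them ever start, and the queue never gets full enough to trigger extra threads.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class PinnedCoreThreadDemo {
        public static void main(String[] args) throws InterruptedException {
            // same shape as EVENT_CONSUMER_POOL: core 2, max 8, queue 128
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    2, 8, 50L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(128),
                    new ThreadPoolExecutor.CallerRunsPolicy());
    
            // five "consumers" that never return, like CODE-B's for(;;) loop
            for (int i = 0; i < 5; i++) {
                final int id = i;
                pool.execute(() -> {
                    System.out.println("loop task " + id + " started");
                    for (;;) {
                        // stands in for waitEvent.take() + handling; never exits
                    }
                });
            }
    
            TimeUnit.SECONDS.sleep(1L);
            // only "loop task 0/1 started" is printed; tasks 2-4 wait in the queue forever
            System.out.println("poolSize = " + pool.getPoolSize());     // 2
            System.out.println("queued   = " + pool.getQueue().size()); // 3
            System.exit(0); // the looping tasks would otherwise keep the JVM alive
        }
    }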

    As for why everything then stalls, that's also simple: with blocking queues, something somewhere must be blocking. The later logs show it is the producer's buffer queue that filled up. The counts stop at exactly 32 because each consumer's buffer queue happens to hold 32 entries, and four of them add up to the producer's queue length of 128. Once the first batch of 128 messages had been dispatched, the topic queues were already full from message 129 onward, put() blocked, and producer and consumers all froze, staring blankly at one another.

    The original code without the infinite loop is just like the multithreaded code we normally write: all the tasks compete on equal footing, so every consumer gets a chance to run.

    That leaves one last question: what does it take to make the pool create threads beyond coreSize? You can guess from the comment lengths which conditions are easiest to hit: to land in case 3 we only need to generate more tasks, or make the pool's work queue smaller. (I chose to shrink the queue to 16, and new threads appeared almost immediately.)
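
    Here is a sketch of the queue-shrinking fix (16 is the value I used; the burst of short tasks is made up for illustration): with a 16-slot queue, the 19th pending task already finds the queue full, falls into case 3, and addWorker() starts non-core threads.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class GrowBeyondCoreDemo {
        public static void main(String[] args) throws InterruptedException {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    2, 8, 50L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(16), // shrunk from 128 to 16
                    new ThreadPoolExecutor.CallerRunsPolicy());
    
            // a burst of short tasks: 2 go to core threads, 16 fill the queue,
            // and from the 19th on, case 3 creates threads up to the maximum of 8
            for (int i = 0; i < 64; i++) {
                pool.execute(() -> {
                    try {
                        TimeUnit.MILLISECONDS.sleep(20L);
                    } catch (InterruptedException ignored) {
                    }
                });
            }
            System.out.println("poolSize = " + pool.getPoolSize()); // grows past 2, up to 8
    
            pool.shutdown();
            pool.awaitTermination(5L, TimeUnit.SECONDS);
        }
    }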

    5 Conclusion

    So what is the thread pool's philosophy of thread creation?

    First, per coreSize, hire model employees who work around the clock without a break.
    Next, pile the work they can't keep up with somewhere, where it waits to be handled.
    Finally, when the core staff can't cope, quickly recruit temporary workers to join the fight.
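
    Mapped back onto the constructor of EVENT_CONSUMER_POOL, the analogy reads like this (the comments are mine):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class HiringPolicy {
        static final ExecutorService POOL = new ThreadPoolExecutor(
                2,                             // core: the 24/7 model employees
                8,                             // max: core staff plus temporary workers
                50L, TimeUnit.MILLISECONDS,    // how long an idle temp worker is kept on
                new ArrayBlockingQueue<>(128), // the pile where pending work waits
                new ThreadPoolExecutor.CallerRunsPolicy()); // everyone busy: the submitter pitches in
    }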

    I hope this article helps you understand thread pools a little better.
