当前位置 博文首页 > 南橘ryc:【进阶之路】多线程条件下分段处理List集合的几种方法

    南橘ryc:【进阶之路】多线程条件下分段处理List集合的几种方法

    作者:南橘ryc 时间:2021-06-04 18:25

    这两个月来因为工作和家庭的事情,导致一直都很忙,没有多少时间去汲取养分,也就没有什么产出,最近稍微轻松了一点,后续的【进阶之路】会慢慢回到正轨。

    开门见山的说,第一次接触到多线程处理同一个任务,是使用IO多线程下载文件,之后也一直没有再处理这一块的任务,直到前几天有同事问我,为什么多线程处理一个list集合会出现各种bug,以及如何使用多线程的方式处理同一个list集合。

    第一、为什么会出现类似于重复处理某一个模块的问题?

    我们都知道,在Java中,每个线程都有自己独立的工作内存,线程对共享变量的所有操作都必须在自己的工作内存中进行,不能直接从主内存中读写。

    如果线程1的修改内容想被线程2得到,那么线程1工作内存中修改后的共享变量需要先刷新到主内存中,再把主内存中更新过的共享变量更新到工作内存2中。

    image.png

    这个时候一般我们是考虑使用java中各种同步化的方法,首先,因为是需要高效处理list集合,所以可以排除synchronized方法,于是我想到了使用CompletionService操作异步任务。

    大家可以在这篇文章看到具体的详解:
    【进阶之路】线程池拓展与CompletionService操作异步任务

    一、CompletionService

    首先,按照之前文章的方法自定义一个WeedThreadPool

    public class WeedThreadPool extends ThreadPoolExecutor {
        private final ThreadLocal<Long> startTime =new ThreadLocal<>();
        private final Logger log =Logger.getLogger("WeedThreadPool");
        //统计执行次数
        private final AtomicLong numTasks =new AtomicLong();
        //统计总执行时间
        private final AtomicLong totalTime =new AtomicLong();
        /**
         * 这里是实现线程池的构造方法,我随便选了一个,大家可以根据自己的需求找到合适的构造方法
         */
        public WeedThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    }
    
    

    然后就是实现线程池处理list集合的方法

    public class WeedExecutorServiceDemo {
        BlockingQueue<Runnable> taskQueue;
        final static WeedThreadPool weedThreadPool = new WeedThreadPool(3, 10, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
        // 开始时间
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            //记录任务开始时间
            long start = System.currentTimeMillis();
            CompletionService<List<Integer>> cs = new ExecutorCompletionService<>(weedThreadPool);
            int tb=1;
            //生成集合
            List<List<Integer>> list1 =new ArrayList();
            for (int i = 0; i < 10; i++) {
                List<Integer> list =new ArrayList();
                //随机生成任务处理
                int hb=tb;
                tb =tb*2;
                int finalTb = tb;
                cs.submit(new Callable<List<Integer>>(){
    
                    @Override
                    public List<Integer> call() throws Exception {
                        for (int j = hb; j< finalTb; j++){
                            list.add(j);
                        }
                        System.out.println(Thread.currentThread().getName()+"["+list+"]");
    
                        return list;
                    }
                });
            }
            //注意在处理完毕后结束任务
            weedThreadPool.shutdown();
            for (int i = 0; i < 10; i++) {
                Future<List<Integer>> future = cs.take();
                if (future != null) {
                    list1.add(future.get());
                    System.out.println(future.get());
                }
            }
            System.err.println("执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");
            System.out.println("結果["+list1.size()+"]==="+list1);
        }
    }
    
    

    处理结果:

    image.png

    从结果上来看,还是比较美好的,通过CompletionService能够比较快速地分段处理任务,我之前也有提过,合理的线程池大小设计有助于提高任务的处理效率,网上通用的设置方法一般是这样的:

    最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目

    进而得出

    最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目

    二、ForkJoinPool

    当然,除了使用CompletionService之外,也可以使用ForkJoinPool来设计一个处理方法。

    image.png

    ForkJoinPool和ThreadPoolExecutor都是继承自AbstractExecutorService抽象类,所以它和ThreadPoolExecutor的使用几乎没有多少区别。其核心思想是将大的任务拆分成多个小任务,然后在将多个小任务处理汇总到一个结果上。

    ForkJoinPool框架通过初始化ForkJoinTask来执行任务,并提供了以下两个子类:

    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask :用于有返回结果的任务。

    我们实现的过程中可以使用RecursiveTask方法来分段处理list集合。

    public class RecursiveTaskDemo {
    
        private static final ExecutorService executor = new ThreadPoolExecutor(2, 3, 10, TimeUnit.SECONDS, new LinkedBlockingQueue(10));
        private static final int totalRow = 53000;
        private static final int splitRow = 10000;
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            long start = System.currentTimeMillis();
            //先循环生成待待处理集合
            List<Integer> list = new ArrayList<>(totalRow);
            for (int i = 0; i < totalRow; i++) {
                list.add(i);
            }
            //计算出需要创建的任务数
            int loopNum = (int)Math.ceil((double)totalRow/splitRow);
            ForkJoinPool pool = new ForkJoinPool(loopNum);
            ForkJoinTask<List> submit = pool.submit(new MyTask(list, 0, list.size()));
    
            List<List<Integer>>list1=new ArrayList<>();
            list1.add(submit.get());
            System.err.println("执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");
            System.out.println("結果["+list1.size()+"]==="+list1);
        }
        //继承RecursiveTask
        static class MyTask extends RecursiveTask<List> {
            private List<Integer> list;
            private int startRow;
            private int endRow;
    
            public MyTask(List<Integer> list, int startRow, int endRow) {
                this.list = list;
                this.startRow = startRow;
                this.endRow = endRow;
            }
    
            /**
             * 递归处理数据,计算
             * @return
             */
            @Override
            protected List compute() {
                if (endRow - startRow <= splitRow) {
                    List<Integer> ret = new ArrayList<>();
                    for (int i = startRow; i < endRow; i++) {
                        //递归处理数据
                        ret.add(list.get(i));
                    }
                    System.out.println(Thread.currentThread().getName()+"["+ret+"]");
                    return ret;
                }
                int loopNum = (int)Math.ceil((double)totalRow/splitRow);
                int startRow = 0;
                List<MyTask> myTaskList = new ArrayList<>();
                for (int i = 0; i < loopNum; i++) {
                    if (startRow > totalRow) {
                        break;
                    }
                    int endRow = Math.min(startRow + splitRow, totalRow);
                    System.out.println(String.format("startRow:%s, endRow:%s", startRow, endRow));
                    myTaskList.add(new MyTask(list, startRow, endRow));
                    startRow += splitRow;
                }
                //调用不同线程上独立执行的任务
                invokeAll(myTaskList);
                List<Integer> ret = new ArrayList<>();
                //归并
                for (MyTask myTask : myTaskList) {
                    ret.addAll(myTask.join());
                }
                return ret;
            }
        }
    }
    

    处理结果:

    image.png

    通过上文展示的方法,大家可以在不加锁的方式来增加任务处理的效率,遇到类似于爬虫数据处理、数据迁移等场景都可以采用,实测效果还不错。当然,根据处理结果来分析,CompletionService的效率大概更高一些~。

    大家好,我是练习java两年半时间的南橘,下面是我的微信,需要之前的导图或者想互相交流经验的小伙伴可以一起互相交流哦。

    bk