当前位置 博文首页 > 可飞:源码简析XXL-JOB的注册和执行过程

    可飞:源码简析XXL-JOB的注册和执行过程

    作者:可飞 时间:2021-05-10 18:19

    一,前言

    XXL-JOB是一个优秀的国产开源分布式任务调度平台,他有着自己的一套调度注册中心,提供了丰富的调度和阻塞策略等,这些都是可视化的操作,使用起来十分方便。

    由于是国产的,所以上手还是比较快的,而且他的源码也十分优秀,因为是调试平台所以线程这一块的使用是很频繁的,特别值得学习研究。

    XXL-JOB一同分为两个模块,调度中心模块和执行模块。具体解释,我们copy下官网的介绍:

    • 调度模块(调度中心):
      负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
      支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。

    • 执行模块(执行器):
      负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;
      接收“调度中心”的执行请求、终止请求和日志请求等。

    XXL-JOB中“调度模块”和“任务模块”完全解耦,调度模块进行任务调度时,将会解析不同的任务参数发起远程调用,调用各自的远程执行器服务。这种调用模型类似RPC调用,调度中心提供调用代理的功能,而执行器提供远程服务的功能。

    下面看下springboot环境下的使用方式,首先看下执行器的配置:

        @Bean
        public XxlJobSpringExecutor xxlJobExecutor() {
            logger.info(">>>>>>>>>>> xxl-job config init.");
            XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
            //调度中心地址
            xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
            //执行器AppName
            xxlJobSpringExecutor.setAppname(appname);
            //执行器注册地址,默认为空即可
            xxlJobSpringExecutor.setAddress(address);
            //执行器IP [选填]:默认为空表示自动获取IP
            xxlJobSpringExecutor.setIp(ip);
            //执行器端口
            xxlJobSpringExecutor.setPort(port);
            //执行器通讯TOKEN
            xxlJobSpringExecutor.setAccessToken(accessToken);
            //执行器运行日志文件存储磁盘路径
            xxlJobSpringExecutor.setLogPath(logPath);
            //执行器日志文件保存天数
            xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    
            return xxlJobSpringExecutor;
        }
    

    XXL-JOB提供了多种任务执行方式,我们今天看下最简单的bean执行模式。如下:

        /**
         * 1、简单任务示例(Bean模式)
         */
        @XxlJob("demoJobHandler")
        public void demoJobHandler() throws Exception {
            XxlJobHelper.log("XXL-JOB, Hello World.");
    
            for (int i = 0; i < 5; i++) {
                XxlJobHelper.log("beat at:" + i);
                TimeUnit.SECONDS.sleep(2);
            }
            // default success
        }
    

    现在在调度中心稍做配置,我们这段代码就可以按照一定的策略进行调度执行,是不是很神奇?我们先看下官网上的解释:

    原理:每个Bean模式任务都是一个Spring的Bean类实例,它被维护在“执行器”项目的Spring容器中。任务类需要加“@JobHandler(value=”名称”)”注解,因为“执行器”会根据该注解识别Spring容器中的任务。任务类需要继承统一接口“IJobHandler”,任务逻辑在execute方法中开发,因为“执行器”在接收到调度中心的调度请求时,将会调用“IJobHandler”的execute方法,执行任务逻辑。

    纸上得来终觉浅,绝知此事要躬行,今天的任务就是跟着这段话,我们大体看一波源码的实现方式。

    二,XxlJobSpringExecutor

    XxlJobSpringExecutor其实看名字,我们都能想到,这是XXL-JOB为了适应spring模式的应用而开发的模板类,先看下他的实现结构。

    XxlJobSpringExecutor继承自XxlJobExecutor,同时由于是用在spring环境,所以实现了多个spring内置的接口来配合实现整个执行器模块功能,每个接口的功能就不细说了,相信大家都可以百度查到。

    我们看下初始化方法afterSingletonsInstantiated

        // start
        @Override
        public void afterSingletonsInstantiated() {
    
            //注册每个任务
            initJobHandlerMethodRepository(applicationContext);
    
            // refresh GlueFactory
            GlueFactory.refreshInstance(1);
    
            // super start
            try {
                super.start();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    

    主流程看上去是比较简单的,首先是注册每一个JobHandler,然后进行初始化操作, GlueFactory.refreshInstance(1)是为了另一种调用模式时用到的,主要是用到了groovy,不在这次的分析中,我们就不看了。我们继续看下如何注册JobHandler的。

     private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
            if (applicationContext == null) {
                return;
            }
            // 遍历所有beans,取出所有包含有@XxlJob的方法
            String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
            for (String beanDefinitionName : beanDefinitionNames) {
                Object bean = applicationContext.getBean(beanDefinitionName);
    
                Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
                try {
                    annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                            new MethodIntrospector.MetadataLookup<XxlJob>() {
                                @Override
                                public XxlJob inspect(Method method) {
                                    return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                                }
                            });
                } catch (Throwable ex) {
                    logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
                }
                if (annotatedMethods==null || annotatedMethods.isEmpty()) {
                    continue;
                }
                //遍历@XxlJob方法,取出executeMethod以及注解中对应的initMethod, destroyMethod进行注册
                for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
                    Method executeMethod = methodXxlJobEntry.getKey();
                    XxlJob xxlJob = methodXxlJobEntry.getValue();
                    if (xxlJob == null) {
                        continue;
                    }
    
                    String name = xxlJob.value();
                    if (name.trim().length() == 0) {
                        throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                    }
                    if (loadJobHandler(name) != null) {
                        throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                    }
    
                    executeMethod.setAccessible(true);
    
                    // init and destory
                    Method initMethod = null;
                    Method destroyMethod = null;
    
                    if (xxlJob.init().trim().length() > 0) {
                        try {
                            initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                            initMethod.setAccessible(true);
                        } catch (NoSuchMethodException e) {
                            throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                        }
                    }
                    if (xxlJob.destroy().trim().length() > 0) {
                        try {
                            destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                            destroyMethod.setAccessible(true);
                        } catch (NoSuchMethodException e) {
                            throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                        }
                    }
    
                    // 注册 jobhandler
                    registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
                }
            }
    
        }
    

    XxlJobSpringExecutor由于实现了ApplicationContextAware,所以通过applicationContext可以获得所有容器中的bean实例,再通过MethodIntrospector来过滤出所有包含@XxlJob注解的方法,最后把对应的executeMethod以及注解中对应的initMethod, destroyMethod进行注册到jobHandlerRepository中,jobHandlerRepository是一个线程安全ConcurrentMap,MethodJobHandler实现自IJobHandler接口的一个模板类,主要作用就是通过反射去执行对应的方法。看到这,之前那句话任务类需要加“@JobHandler(value=”名称”)”注解,因为“执行器”会根据该注解识别Spring容器中的任务。我们就明白了。

    public class MethodJobHandler extends IJobHandler {
        ....
        public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {
            this.target = target;
            this.method = method;
    
            this.initMethod = initMethod;
            this.destroyMethod = destroyMethod;
        }
    
        @Override
        public void execute() throws Exception {
            Class<?>[] paramTypes = method.getParameterTypes();
            if (paramTypes.length > 0) {
                method.invoke(target, new Object[paramTypes.length]);       // method-param can not be primitive-types
            } else {
                method.invoke(target);
            }
        }
    

    三,执行服务器initEmbedServer

    看完上面的JobHandler注册,后面紧着就是执行器模块的启动操作了,下面看下start方法:

        public void start() throws Exception {
    
            // 初始化日志path
            XxlJobFileAppender.initLogPath(logPath);
    
            // 注册adminBizList
            initAdminBizList(adminAddresses, accessToken);
    
            // 初始化日志清除线程
            JobLogFileCleanThread.getInstance().start(logRetentionDays);
    
            // 初始化回调线程,用来把执行结果回调给调度中心
            TriggerCallbackThread.getInstance().start();
    
            // 执行服务器启动
            initEmbedServer(address, ip, port, appname, accessToken);
        }
    

    前几个操作,我们就不细看了,大家有兴趣的可以自行查看,我们直接进入initEmbedServer方法查看内部服务器如何启动,以及向调试中心注册的。

        private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
            ...
            // start
            embedServer = new EmbedServer();
            embedServer.start(address, port, appname, accessToken);
        }
    
        public void start(final String address, final int port, final String appname, final String accessToken) {
            ```
            // 启动netty服务器
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline()
                                    .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                    .addLast(new HttpServerCodec())
                                    .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                    .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                        }
                    })
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
    
            // bind
            ChannelFuture future = bootstrap.bind(port).sync();
    
            logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
    
            // 执行向调度中心注册
            startRegistry(appname, address);
            ```
        }
    

    因为执行器模块本身需要有通讯交互的需求,不然调度中心是无法调用他的,所以内嵌了一个netty服务器进行通信。启动成功后,正式向调试中心执行注册请求。我们直接看注册的代码:

        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            try {
                //执行注册请求
                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                    registryResult = ReturnT.SUCCESS;
                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                    break;
                } else {
                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                }
            } catch (Exception e) {
                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
            }
        }
    
        @Override
        public ReturnT<String> registry(RegistryParam registryParam) {
            return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
        }
    

    XxlJobRemotingUtil.postBody就是个符合XXL-JOB规范的restful的http请求处理,里面不止有注册请求,还有下线请求,回调请求等,碍于篇幅,就不一一展示了,调度中心接到对应的请求,会有对应的DB处理:

            // services mapping
            if ("callback".equals(uri)) {
                List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
                return adminBiz.callback(callbackParamList);
            } else if ("registry".equals(uri)) {
                RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
                return adminBiz.registry(registryParam);
            } else if ("registryRemove".equals(uri)) {
                RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
                return adminBiz.registryRemove(registryParam);
            } else {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
            }
    

    跟到这里,我们就已经大概了解了整个注册的流程。同样当调度中心向我们执行器发送请求,譬如说执行任务调度的请求时,也是同样的http请求发送我们上面分析的执行器中内嵌netty服务进行操作,这边只展示调用方法:

        @Override
        public ReturnT<String> run(TriggerParam triggerParam) {
            return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
        }
    

    这样,我们执行器模块收到请求后会执行我们上面注册中的jobHandle进行对应的方法执行,执行器会将请求存入“异步执行队列”并且立即响应调度中心,异步运行对应方法。这样一套注册和执行的流程就大致走下来了。

    四,结尾

    当然事实上XXL-JOB的代码还有许多丰富的特性,碍于本人实力不能一一道明,我这也是抛转引玉,只是把最基础的一些地方介绍给大家,有兴趣的话,大家可以自行查阅相关代码,总的来说,毕竟是国产开源的优秀项目,还是值得赞赏的,也希望国内以后有越来越多优秀开源框架。

    bk