当前位置 博文首页 > 废物大师兄:ElasticJob 快速上手

    废物大师兄:ElasticJob 快速上手

    作者:废物大师兄 时间:2021-01-19 12:05

    1.  ElasticJob 是什么

    ElasticJob 是一个分布式调度解决方案,由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成。 

    ElasticJob-Lite 定位为轻量级无中心化解决方案,使用jar的形式提供分布式任务的协调服务。

    ElasticJob 已于2020年5月28日成为 Apache ShardingSphere 的子项目。 

    ElasticJob特性:

    • 弹性调度
      • 支持任务在分布式场景下的分片和高可用
      • 能够水平扩展任务的吞吐量和执行效率
      • 任务处理能力随资源配备弹性伸缩 
    • 资源分配
      • 在适合的时间将适合的资源分配给任务并使其生效
      • 相同任务聚合至相同的执行器统一处理
      • 动态调配追加资源至新分配的任务  
    • 作业治理
      • 失效转移
      • 错过作业重新执行
      • 自诊断修复
    • 作业开放生态
      • 可扩展的作业类型统一接口
      • 丰富的作业类型库,如数据流、脚本、HTTP、文件、大数据等
      • 易于对接业务作业,能够与 Spring 依赖注入无缝整合  
    • 可视化管控端
      • 作业管控端
      • 作业执行历史数据追踪
      • 注册中心管理 

    2.  实例演示

    这里采用最新版本 3.0.0-RC1 

    1、启动zookeeper服务

    首先,下载zookeeper-3.6.0版本,解压后复制一份zoo_sample.cfg,重命名未zoo.cfg,保持默认配置即可

    注意,zookeeper-3.6.0启动以后会占用三个端口,其中包括8080哦

    2、编写定时任务业务逻辑

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.4.1</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>elasticjob-demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    
        <properties>
            <java.version>1.8</java.version>
            <elasticjob-lite.version>3.0.0-RC1</elasticjob-lite.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.apache.shardingsphere.elasticjob</groupId>
                <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
                <version>${elasticjob-lite.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-jdbc</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.shardingsphere.elasticjob</groupId>
                <artifactId>elasticjob-error-handler-dingtalk</artifactId>
                <version>${elasticjob-lite.version}</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    application.yml

    elasticjob:
      regCenter:
        serverLists: 192.168.100.15:2181
        namespace: elasticjob-demo
        baseSleepTimeMilliseconds: 2000
        maxSleepTimeMilliseconds: 4000
        maxRetries: 3
      jobs:
        firstJob:
          elasticJobClass: com.example.job.FirstJob
          cron: 0/6 * * * * ?
          shardingTotalCount: 3
          jobErrorHandlerType: DINGTALK
          props:
            dingtalk:
              webhook: https://oapi.dingtalk.com/robot/send?access_token=xxx
              secret: ASDF
              connectTimeout: 3000
              readTimeout: 5000
        secondJob:
          elasticJobClass: com.example.job.SecondJob
          cron: 0/10 * * * * ?
          shardingTotalCount: 1
          jobErrorHandlerType: DINGTALK
          props:
            dingtalk:
              webhook: https://oapi.dingtalk.com/robot/send?access_token=xxx
              secret: ASDF
              connectTimeout: 3000
              readTimeout: 5000 

    两个定时任务

    FirstJob.java

    package com.example.job;
    
    import org.apache.shardingsphere.elasticjob.api.ShardingContext;
    import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
    import org.springframework.stereotype.Component;
    
    /**
     * @author ChengJianSheng
     * @date 2021/1/13
     */
    @Component
    public class FirstJob implements SimpleJob {
        @Override
        public void execute(ShardingContext shardingContext) {
            switch (shardingContext.getShardingItem()) {
                case 0:
                    // do something by sharding item 0
                    System.out.println(0);
                    // int a = 1 / 0;
                    break;
                case 1:
                    // do something by sharding item 1
                    System.out.println(1);
                    break;
                case 2:
                    // do something by sharding item 2
                    System.out.println(2);
                    break;
                // case n: ...
            }
        }
    }
    

    SecondJob.java

    package com.example.job;
    
    import org.apache.shardingsphere.elasticjob.api.ShardingContext;
    import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
    import org.springframework.stereotype.Component;
    
    /**
     * @author ChengJianSheng
     * @date 2021/1/18
     */
    @Component
    public class SecondJob implements SimpleJob {
        @Override
        public void execute(ShardingContext shardingContext) {
            System.out.println("hello");
        }
    } 

    项目结构

    运行项目即可

    通过 ElasticJob-UI 查看任务

    https://shardingsphere.apache.org/elasticjob/current/cn/downloads/

    3.  启动报错排查

    项目启动过程中,可能会报如下错误

    org.apache.zookeeper.ClientCnxn$EndOfStreamException: Unable to read additional data from server sessionid 0x1000bdf48160002, likely server has closed socket

    org.apache.shardingsphere.elasticjob.reg.exception.RegException: org.apache.zookeeper.KeeperException$OperationTimeoutException: KeeperErrorCode = OperationTimeout

    Caused by: org.apache.zookeeper.KeeperException$OperationTimeoutException: KeeperErrorCode = OperationTimeout

    最开始,我以为是zookeeper版本的问题,后来换了版本也不行,防火墙关了也不行

    然后,我怀疑是开发环境问题,于是在本地运行zookeeper,程序连127.0.0.1:2181,居然可以了

    于是我陷入了沉思,为今之计,只剩下一个办法了,打断点调试

    找到了异常抛出的位置,如下图

    baseSleepTimeMilliseconds 表示 等待重试的间隔时间的初始值

    maxSleepTimeMilliseconds  表示 等待重试的间隔时间的最大值

    maxRetries 表示 最大重试次数

    根据代码中意思,如果在 maxSleepTimeMilliseconds * maxRetries 毫秒内还没有连接成功,则连接关闭,并抛出操作超时异常

    联想到,连接本地zookeeper可以,连开发环境zk就不行,再加上观察日志从连接开始到抛异常的时间间隔,我猜到应该是maxSleepTimeMilliseconds设置太短了

    于是,application.yml配置文件中将maxSleepTimeMilliseconds设置为4000,baseSleepTimeMilliseconds设置为2000

    然后好使

    回想刚开始报的那些错,其实根本就还没有连上zookeeper

    下一篇:没有了