当前位置 主页 > 服务器问题 > Linux/apache问题 >

    Spring Batch批处理框架使用解析

    栏目:Linux/apache问题 时间:2019-12-12 10:37

    这篇文章主要介绍了Spring Batch批处理框架使用解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

    使用Spring Batch做为批处理框架,可以完成常规的数据量不是特别大的离线计算。

    现在写一个简单的入门版示例。

    这里默认大家已经掌握了Spring Batch的基本知识,示例只是为了快速上手实践

    目标1:程序随机生成字符串,经过Spring Batch后,统一在字符串后加入“----PROCESSED”,并输出

    目标2:程序读取txt文件,经过Spring Batch后,统一加入如上字段,并输出

    Spring Batch的流程

    读取数据----itemReader 处理数据----itemProcess 数据写入----itemWrite

    分析目标可知,两个目标的输入数据源不同,处理方式基本一致,数据完成后的写入规则一致

    由此可以分段完成代码

    itemReader

    目标一

    这里没有使用Spring Batch自带的集中reader,所以自定义了随机生成字符串的reader

    这里代码并不完善,reader会无线循环生成随机字符串,但不影响本次学习的目的

    public class MyItemReader implements ItemReader<String> {
      @Override
      public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return RandomStringUtils.randomAlphabetic(10);
      }
    }

    目标二

    由于是读取文件中的内容,所以不用自定义reader实现,可直接使用FlatFileItemReader,在Batch的config中配置即可

      @Bean
      public ItemReader<String> textReader(){
     
        FlatFileItemReader<String> reader=new FlatFileItemReader<>();
        File file = new File("D:\\FTP\\ttest.txt");
        reader.setResource(new FileSystemResource(file));
        reader.setLineMapper(new LineMapper<String>() {
          @Override
          public String mapLine(String line, int lineNumber) throws Exception {
            return line;
          }
        });
        return reader;
     
      }

    itemProcess

    这里采用同一种处理方式即可

    public class MyItemProcessor implements ItemProcessor<String,String> {
     
      @Override
      public String process(String s) throws Exception {
        return s+"---------PROCESSED";
      }
    }

    itemWriter

    也采用同一种即可

    public class MyItemWriter implements ItemWriter<String> {
      @Override
      public void write(List<? extends String> items) throws Exception {
        for (String item : items) {
          System.out.println(item);
        }
      }
    }

    配置完成Batch Config

    @Configuration
    @EnableBatchProcessing
    public class BatchConfiguration extends DefaultBatchConfigurer {
     
      @Autowired
      public StepBuilderFactory stepBuilderFactory;
      @Autowired
      public JobBuilderFactory jobBuilderFactory;
     
      @Bean
      public MyItemProcessor processor(){
        return new MyItemProcessor();
      }
     
      @Bean
      public ItemWriter<String> writer(){
        return new MyItemWriter();
      }
     
      @Bean
      public ItemReader<String> textReader(){
        FlatFileItemReader<String> reader=new FlatFileItemReader<>();
        File file = new File("D:\\FTP\\ttest.txt");
        reader.setResource(new FileSystemResource(file));
        reader.setLineMapper(new LineMapper<String>() {
          @Override
          public String mapLine(String line, int lineNumber) throws Exception {
            return line;
          }
        });
        return reader;
      }
     
      @Bean
      public ItemReader<String> stringReader(){
        return new MyItemReader();
      }
     
      @Override
      public void setDataSource(DataSource dataSource) {
        super.setDataSource(dataSource);
      }
     
      @Bean
      public Step myStep(){
        return stepBuilderFactory
            .get("step1")
            //这个chunk size是最后调用写入的时候,一次性写入多少条已处理的数据
            .<String,String>chunk(10)
    //        .reader(textReader())
            .reader(stringReader())
            .processor(processor())
            .writer(writer())
            .build();
     
      }
     
      @Bean
      public Job MyJob(){
        return jobBuilderFactory
            .get("MyJOB")
            .listener(new JobExecutionListenerSupport(){
              //所有处理结束后调用
              @Override
              public void afterJob(JobExecution jobExecution) {
                if(jobExecution.getStatus() == BatchStatus.COMPLETED){
                  System.out.println("OK");
                }
              }
            })
            .flow(myStep())
            .end()
            .build();
      }
    }