当前位置 主页 > 服务器问题 > Linux/apache问题 > 最大化 缩小

    Java lambda表达式实现Flink WordCount过程解析

    栏目:Linux/apache问题 时间:2020-02-05 11:23

    这篇文章主要介绍了Java lambda表达式实现Flink WordCount过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

    本篇我们将使用Java语言来实现Flink的单词统计。

    代码开发

    环境准备

    导入Flink 1.9 pom依赖

    <dependencies>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>1.9.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.11</artifactId>
          <version>1.9.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
          <version>3.7</version>
        </dependency>
      </dependencies>

    构建Flink流处理环境

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    自定义source

    每秒生成一行文本

    DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
          private boolean isCanal = false;
          private String[] words = {
              "important oracle jdk license update",
              "the oracle jdk license has changed for releases starting april 16 2019",
              "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
              "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
              "downloading and using this product an faq is available here ",
              "commercial license and support is available with a low cost java se subscription",
              "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
          };
    
          @Override
          public void run(SourceContext<String> ctx) throws Exception {
            // 每秒发送一行文本
            while (!isCanal) {
              int randomIndex = RandomUtils.nextInt(0, words.length);
              ctx.collect(words[randomIndex]);
              Thread.sleep(1000);
            }
          }
    
          @Override
          public void cancel() {
            isCanal = true;
          }
        });

    单词计算

    // 3. 单词统计
        // 3.1 将文本行切分成一个个的单词
        SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
          // 切分单词
          Arrays.stream(line.split(" ")).forEach(word -> {
            ctx.collect(word);
          });
        }).returns(Types.STRING);
    
        //3.2 将单词转换为一个个的元组
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
            .map(word -> Tuple2.of(word, 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT));
    
        // 3.3 按照单词进行分组
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);
    
        // 3.4 对每组单词数量进行累加
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
            .timeWindow(Time.seconds(3))
            .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));
    
        resultDS.print();
    
    下一篇:没有了