当前位置 主页 > 服务器问题 > Linux/apache问题 >

    Java lambda表达式实现Flink WordCount过程解析(3)

    栏目:Linux/apache问题 时间:2020-02-05 11:23

    之所以这里将所有的类型信息,因为Flink无法正确自动推断出来Collector中带的泛型。我们来看一下FlatMapFuntion的源代码

    @Public
    @FunctionalInterface
    public interface FlatMapFunction<T, O> extends Function, Serializable {
    
      /**
      * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
      * it into zero, one, or more elements.
      *
      * @param value The input value.
      * @param out The collector for returning result values.
      *
      * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
      *          to fail and may trigger recovery.
      */
      void flatMap(T value, Collector<O> out) throws Exception;
    }

    我们发现 flatMap的第二个参数是Collector<O>,是一个带参数的泛型。Java编译器编译该代码时会进行参数类型擦除,所以Java编译器会变成成:

    void flatMap(T value, Collector out)

    这种情况,Flink将无法自动推断类型信息。如果我们没有显示地提供类型信息,将会出现以下错误:

    org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
      In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
      An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
      Otherwise the type has to be specified explicitly using type information.

    这种情况下,必须要显示指定类型信息,否则输出将返回值视为Object类型,这将导致Flink无法正确序列化。

    所以,我们需要显示地指定Lambda表达式的参数类型信息,并通过returns方法显示指定输出的类型信息

    我们再看一段代码:

    SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
            .map(word -> Tuple2.of(word, 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT));

    为什么map后面也需要指定类型呢?

    因为此处map返回的是Tuple2类型,Tuple2是带有泛型参数,在编译的时候同样会被查出泛型参数信息,导致Flink无法正确推断。

    更多关于对Java Lambda表达式的支持请参考官网:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持IIS7站长之家。