
    Rango_lhl: FlinkSQL Writing to Kafka/ES/MySQL, Examples in Java

    Author: Rango_lhl  Date: 2021-06-18 18:24

    I. Background

    Flink's API is organized into four layers of abstraction. The top two, the Table API and SQL, have relatively simple syntax and are easy to write, so small requirements can be handled quickly with them. Based on the official documentation and some online tutorials, this article puts together source-side and sink-side code that reads from a socket, Kafka, and a text file as sources, and writes the resulting streams to Kafka, Elasticsearch, and MySQL, for easy reference later.

    II. Code

    Note: Both the connect and DDL styles are shown here. The connect style applies to Flink 1.10 and earlier, while the official documentation now describes everything in the DDL style. For versions after 1.10 the DDL style is recommended, as it is more broadly applicable.

    1. Source-side code

    1.1 Set up the basic environment; for ease of demonstration, parallelism is 1 and checkpointing (CK) is not configured

    //Create the stream execution environment and set parallelism to 1
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
    //Create the table environment
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
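    The examples below map input records onto a WaterSensor JavaBean, which the original snippets do not show. A minimal sketch consistent with the fields used in the schemas (id, ts, vc), assuming Lombok (declared in the pom below), could look like this:

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    //Simple POJO assumed by the examples: sensor id, timestamp, and water level value
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class WaterSensor {
        private String id;
        private Long ts;
        private Integer vc;
    }
    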

    1.2 Read data from a socket port and query it with both the Table API and SQL

    //Read data from port 9999 on the server and convert it into the corresponding JavaBean
    SingleOutputStreamOperator<WaterSensor> mapDS = env.socketTextStream("hadoop102", 9999)
            .map(value -> {
                String[] split = value.split(",");
                return new WaterSensor(split[0]
                        , Long.parseLong(split[1])
                        , Integer.parseInt(split[2]));
            });
    //Create a table: convert the stream into a dynamic table
    Table table = tableEnv.fromDataStream(mapDS);
    //Query the dynamic table, Table API style
    Table selectResult = table.where($("id").isEqual("ws_001")).select($("id"), $("ts"), $("vc"));
    //Query the dynamic table, SQL style (unregistered table); a second variable is used so both queries compile together
    Table sqlResult = tableEnv.sqlQuery("select * from " + table);
    
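    To inspect these query results during local testing, the Table can be converted back into a DataStream and printed. A minimal sketch, assuming the WaterSensor POJO above:

    //Both results are append-only, so toAppendStream is sufficient here
    tableEnv.toAppendStream(selectResult, WaterSensor.class).print("tableAPI");
    tableEnv.toAppendStream(sqlResult, WaterSensor.class).print("sql");
    //Trigger execution of the streaming job
    env.execute();
    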

    1.3 Read text (FileSystem) data and query it with the Table API

    //Flink 1.10 style using connect: read the txt file and create a temporary table
    tableEnv.connect(new FileSystem().path("input/sensor.txt"))
            .withFormat(new Csv().fieldDelimiter(',').lineDelimiter("\n"))
            .withSchema(new Schema().field("id", DataTypes.STRING())
                    .field("ts", DataTypes.BIGINT())
                    .field("vc", DataTypes.INT()))
            .createTemporaryTable("sensor");
    
    //Convert to a Table object and query it. For the SQL style, see the socket section above
    Table table = tableEnv.from("sensor");
    Table selectResult = table.groupBy($("id")).aggregate($("id").count().as("id_count"))
            .select($("id"), $("id_count"));
    
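    Because the count per id is an updating result, it cannot be printed as an append stream; a retract stream is needed instead. A minimal sketch for checking the output:

    //Aggregated results change over time, so use a retract stream (the Boolean flag marks insert vs. retract); Row is org.apache.flink.types.Row
    tableEnv.toRetractStream(selectResult, Row.class).print();
    env.execute();
    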

    1.4 Consume Kafka data and query it with the Table API, using both the connect and DDL styles

    //Flink 1.10 style using connect: consume the corresponding Kafka topic and create a temporary table
    tableEnv.connect(new Kafka().version("universal")
                    .topic("sensor")
                    .startFromLatest()
                    .property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")
                    .property(ConsumerConfig.GROUP_ID_CONFIG, "BD")) //consumer group
            .withSchema(new Schema().field("id", DataTypes.STRING())
                    .field("ts", DataTypes.BIGINT())
                    .field("vc", DataTypes.INT()))
            .withFormat(new Csv())
            .createTemporaryTable("sensor");
    
    //DDL style, for Flink versions after 1.10
    tableEnv.executeSql("CREATE TABLE sensor (" +
    		"  `id` STRING," +
    		"  `ts` BIGINT," +
    		"  `vc` INT" +
    		") WITH (" +
    		"  'connector' = 'kafka'," +
    		"  'topic' = 'sensor'," +
    		"  'properties.bootstrap.servers' = 'hadoop102:9092'," +
    		"  'properties.group.id' = 'BD'," +
    		"  'scan.startup.mode' = 'latest-offset'," +
    		"  'format' = 'csv'" +
    		")");
    
    //Convert to a Table object and query it. For the SQL style, see the socket section above (a SQL equivalent of this aggregation is sketched after this block)
    Table table = tableEnv.from("sensor");
    Table selectResult = table.groupBy($("id")).aggregate($("id").count().as("id_count"))
    		          .select($("id"), $("id_count"));
    
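    As mentioned in the comment above, the same aggregation can also be written in SQL against the registered sensor table; a sketch of the equivalent query:

    //SQL equivalent of the Table API aggregation above
    Table sqlAgg = tableEnv.sqlQuery("SELECT id, COUNT(id) AS id_count FROM sensor GROUP BY id");
    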

    2. Sink-side code

    2.1 Write to a text file

    //Create the output table, connect style
    tableEnv.connect(new FileSystem().path("out/sensor.txt"))
    		.withFormat(new Csv())
    		.withSchema(new Schema().field("id", DataTypes.STRING())
    				.field("ts", DataTypes.BIGINT())
    				.field("vc",DataTypes.INT()))
    		.createTemporaryTable("sensor");
    
    //Writing into the output table performs the sink write; selectResult is the result table from the source-side query above
    selectResult.executeInsert("sensor");
    
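    In the DDL style recommended earlier, the same file sink can be declared with the filesystem connector (Flink 1.11+). A sketch; the table name sensor_file and the path are illustrative:

    //DDL style filesystem sink
    tableEnv.executeSql("CREATE TABLE sensor_file (" +
            "  `id` STRING," +
            "  `ts` BIGINT," +
            "  `vc` INT" +
            ") WITH (" +
            "  'connector' = 'filesystem'," +
            "  'path' = 'out/sensor_csv'," +
            "  'format' = 'csv'" +
            ")");
    
    //Write the source-side result into the file table
    selectResult.executeInsert("sensor_file");
    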

    2.2 Write to Kafka

    //connect style
    tableEnv.connect(new Kafka().version("universal")
    		.topic("sensor")
    		.sinkPartitionerRoundRobin() //round-robin writes across partitions
    		.property(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"))
    		.withSchema(new Schema().field("id", DataTypes.STRING())
    				        .field("ts", DataTypes.BIGINT())
    				        .field("vc",DataTypes.INT()))
    		.withFormat(new Json())
    		.createTemporaryTable("sensor");
    
    //DDL style
    tableEnv.executeSql("CREATE TABLE sensor (" +
    		"  `id` STRING," +
    		"  `ts` BIGINT," +
    		"  `vc` INT" +
    		") WITH (" +
    		"  'connector' = 'kafka'," +
    		"  'topic' = 'sensor'," +
    		"  'properties.bootstrap.servers' = 'hadoop102:9092'," +
    		"  'format' = 'json'" +
    		")");
    
    //Writing into the output table performs the sink write; selectResult is the result table from the source-side query above
    selectResult.executeInsert("sensor");
    
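    Instead of calling executeInsert on a Table object, the write can also be expressed as a single SQL statement. A sketch, assuming the Kafka source table from section 1.4 was registered under a different name (here the hypothetical source_sensor) so that it does not collide with the sink table named sensor:

    //Pure-SQL alternative: read from the (hypothetical) source table and write into the Kafka sink table
    tableEnv.executeSql("INSERT INTO sensor SELECT id, ts, vc FROM source_sensor");
    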

    2.3 Write to MySQL (via JDBC; mysql-connector-java-5.1.9.jar was imported manually here)

    //DDL
    tableEnv.executeSql("CREATE TABLE sink_sensor (" +
    		"  id STRING," +
    		"  ts BIGINT," +
    		"  vc INT," +
    		"  PRIMARY KEY (id) NOT ENFORCED" +
    		") WITH (" +
    		"  'connector' = 'jdbc'," +
    		"  'url' = 'jdbc:mysql://hadoop102:3306/test?useSSL=false'," +
    		"  'table-name' = 'sink_test'," +
    		"  'username' = 'root'," +
    		"  'password' = '123456'" +
    		")");
    
    //Writing into the output table performs the sink write; selectResult is the result table from the source-side query above
    selectResult.executeInsert("sink_sensor");
    
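    Because the DDL declares PRIMARY KEY (id) NOT ENFORCED, the JDBC connector writes in upsert mode: rows with the same id are updated rather than appended. Without a primary key it works in append mode. Note that the target table sink_test must already exist in MySQL with a matching primary key, since the connector does not create it.
    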

    2.4 Write to Elasticsearch

    //connect style
    tableEnv.connect(new Elasticsearch()
    		.index("sensor")
    		.documentType("_doc")
    		.version("7")
    		.host("localhost",9200,"http")
                //Set to 1 so every row is flushed immediately, which makes the output easy to see on the client; do not use in production
    		.bulkFlushMaxActions(1)) 
    		.withSchema(new Schema()
    				.field("id", DataTypes.STRING())
    				.field("ts", DataTypes.BIGINT())
    				.field("vc",DataTypes.INT()))
    		.withFormat(new Json())
    		.inAppendMode()
    		.createTemporaryTable("sensor");
    //DDL style
    tableEnv.executeSql("CREATE TABLE sensor (" +
            "  id STRING," +
            "  ts BIGINT," +
            "  vc INT," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'elasticsearch-7'," +
            "  'hosts' = 'http://localhost:9200'," +
            "  'index' = 'sensor'," +
            "  'sink.bulk-flush.max-actions' = '1'" +
            ")");
    
    //Writing into the output table performs the sink write; selectResult is the result table from the source-side query above
    selectResult.executeInsert("sensor");
    

    III. Additional notes

    Dependencies (pom.xml excerpt)

     <properties>
            <java.version>1.8</java.version>
            <maven.compiler.source>${java.version}</maven.compiler.source>
            <maven.compiler.target>${java.version}</maven.compiler.target>
            <flink.version>1.12.0</flink.version>
            <scala.version>2.12</scala.version>
            <hadoop.version>3.1.3</hadoop.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>7.8.0</version>
            </dependency>
            <!-- elasticsearch 的客户端 -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>7.8.0</version>
            </dependency>
            <!-- elasticsearch 依赖 2.x 的 log4j -->
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>2.8.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>2.8.2</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.9.9</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.16</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-csv</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-json</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>
    
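    The pom above does not include the JDBC pieces used in section 2.3. Assuming the usual coordinates for Flink 1.12 with Scala 2.12 and the MySQL 5.1 driver, something like the following would be needed in addition (or keep adding the driver jar manually as described above):

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.9</version>
            </dependency>
    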

    This is shared for learning and discussion; if you spot any problem, feel free to point it out in the comments.
