当前位置 博文首页 > Rango_lhl:Flink使用二次聚合实现TopN计算-乱序数据

    Rango_lhl:Flink使用二次聚合实现TopN计算-乱序数据

    作者:Rango_lhl 时间:2021-05-26 18:21

    一、背景说明:

    在上篇文章实现了TopN计算,但是碰到迟到数据则会无法在当前窗口计算,需要对其中的键控状态优化

    Flink使用二次聚合实现TopN计算

    本次需求是对数据进行统计,要求每隔5秒,输出最近10分钟内访问量最多的前N个URL,数据流预览如下(每次一条从端口传入):

    208.115.111.72 - - 17/05/2015:10:25:49 +0000 GET /?N=A&page=21   //15:50-25:50窗口数据
    208.115.111.72 - - 17/05/2015:10:25:50 +0000 GET /?N=A&page=21
    208.115.111.72 - - 17/05/2015:10:25:51 +0000 GET /?N=A&page=21
    208.115.111.72 - - 17/05/2015:10:25:52 +0000 GET /?N=A&page=21   //第一次触发计算,15:50-25:50窗口
    208.115.111.72 - - 17/05/2015:10:25:47 +0000 GET /?N=A&          //迟到数据,不同url
    208.115.111.72 - - 17/05/2015:10:25:53 +0000 GET /?N=A&page=21   //第二次触发计算,15:50-25:50窗口
    208.115.111.72 - - 17/05/2015:10:25:46 +0000 GET /?N=A&page=21   //迟到数据
    208.115.111.72 - - 17/05/2015:10:25:54 +0000 GET /?N=A&page=21   //第三次触发计算
    

    最后统计输出结果如下(迟到数据均在25:50窗口):

    ==============2015-05-17 10:25:50.0==============               //第一次触发计算结果
    Top1 Url:/?N=A&page=21 Counts:1
    ==============2015-05-17 10:25:50.0==============
    
    ==============2015-05-17 10:25:50.0==============               //第二次触发计算结果
    Top1 Url:/?N=A&page=21 Counts:1
    Top2 Url:/?N=A& Counts:1
    ==============2015-05-17 10:25:50.0==============
    
    ==============2015-05-17 10:25:50.0==============               //第三次触发计算结果
    Top1 Url:/?N=A&page=21 Counts:2
    Top2 Url:/?N=A& Counts:1
    ==============2015-05-17 10:25:50.0==============
    

    二、实现过程

    1. 实现思路:
      ①建立环境,设置并行度及CK。
      ②定义watermark策略及事件时间,获取数据并对应到JavaBean。
      ③第一次聚合,按url分组开窗聚合,使用aggregate算子进行增量计算。
      ④第二次聚合,按窗口聚合,使用MapState存放数据,定义第一个定时器,在watermark达到后1秒触发,对窗口数据排序输出,定义第二个定时器,窗口关闭后才清楚状态。
      ⑤打印结果及执行。

    ps:乱序数据不能使用读取本地文本文件的方式测试,文件读取加载比较快,无法观察到迟到数据处理效果,乱序数据的开发测试这里从服务器端口获取数据的方式测试

    1. 代码细节说明:

    只针对优化部分代码说明,其他代码可以在顺序数据篇文章查看,这里提取重写KeyedProcessFunction里面方法的部分代码

    @Override
    public void processElement(UrlCount value, Context ctx, Collector<String> out) throws Exception {
    	//状态装入数据
    	mapState.put(value.getUrl(), value);
    	//定时器,窗口一秒后触发
    	ctx.timerService().registerEventTimeTimer(value.getWindowEnd()+1L);
    	//再加一个定时器来清除状态用,在窗口关闭后再清除状态,这样延迟数据到达后窗口还能做排序
    	ctx.timerService().registerEventTimeTimer(value.getWindowEnd()+61001L);
    }
    //定时器内容
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
    	if (timestamp == ctx.getCurrentKey()+61001L){
    		mapState.clear();
    		return;}
    ...
    
    • 这里改用MapState,如若使用ListState,进来迟到数据后,则会出现同个url在同个窗口的统计出现多个计数的情况,列表状态不具备去重功能,故在这里使用map状态来实现去重。
    • 这里使用定时器来清除状态,原写法是在onTimer最后排序完直接清除状态,则会导致迟到数据到达后,原窗口其他数据被清除掉无法实现排名的输出,这里定时器的时间是在61001毫秒后清除状态数据。
    • 定时器61001毫秒 = 允许迟到数据1秒(forBoundedOutOfOrderness)+窗口迟到数据1分钟(allowedLateness)+第一个定时器1毫秒。

    三、完整代码

    package com.test.topN;
    
    import bean.ApacheLog;
    import bean.UrlCount;
    import org.apache.commons.compress.utils.Lists;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import java.sql.Timestamp;
    import java.text.SimpleDateFormat;
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.Map;
    /**
     * @author: Rango
     * @create: 2021-05-26 10:16
     * @description: 每隔5秒,输出最近10分钟内访问量最多的前N个URL
     **/
    public class URLTopN3 {
        public static void main(String[] args) throws Exception {
    
            //1.建立环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
    
            //2.读取端口数据并映射到JavaBean,并定义watermark时间语义
            WatermarkStrategy<ApacheLog> wms = WatermarkStrategy
                    .<ApacheLog>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                    .withTimestampAssigner(new SerializableTimestampAssigner<ApacheLog>() {
                        @Override
                        public long extractTimestamp(ApacheLog element, long recordTimestamp) {
                            return element.getTs();
                        }});
    
            SingleOutputStreamOperator<ApacheLog> apacheLogDS = env.socketTextStream("hadoop102", 9999)
                    .map(new MapFunction<String, ApacheLog>() {
                        @Override
                        public ApacheLog map(String value) throws Exception {
                            SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yy:HH:mm:ss");
                            String[] split = value.split(" ");
                            return new ApacheLog(split[0],
                                    split[2],
                                    sdf.parse(split[3]).getTime(),
                                    split[5],
                                    split[6]);
                        }})
                    .assignTimestampsAndWatermarks(wms);
    
            //3.第一次聚合,按url转为tuple2分组,开窗,增量聚合
            SingleOutputStreamOperator<UrlCount> aggregateDS = apacheLogDS
                    .map(new MapFunction<ApacheLog, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(ApacheLog value) throws Exception {
                    return new Tuple2<>(value.getUrl(), 1);
                }}).keyBy(data -> data.f0)
                    .window(SlidingEventTimeWindows.of(Time.minutes(10),Time.seconds(5)))
                    .allowedLateness(Time.minutes(1))
                    .aggregate(new HotUrlAggFunc(), new HotUrlWindowFunc());
    
            //4.第二次聚合,对第一次聚合输出按窗口分组,再全窗口聚合,建立定时器你,每5秒钟触发一次
            SingleOutputStreamOperator<String> processDS = aggregateDS
                    .keyBy(data -> data.getWindowEnd())
                    .process(new HotUrlProcessFunc(5));
    
            processDS.print();
            env.execute();
        }
        //实现AggregateFunction类中的方法
        public static class HotUrlAggFunc implements AggregateFunction<Tuple2<String, Integer>,Integer,Integer>{
            @Override
            public Integer createAccumulator() {return 0;}
            @Override
            public Integer add(Tuple2<String, Integer> value, Integer accumulator) { return accumulator+1;}
            @Override
            public Integer getResult(Integer accumulator) {return accumulator;}
            @Override
            public Integer merge(Integer a, Integer b) {return a+b; }
        }
        //实现窗口函数的apply方法,把累加函数输出的整数结果,转换为javabean类urlcount来做输出,方便后续按窗口聚合
        public static class HotUrlWindowFunc implements WindowFunction<Integer, UrlCount,String, TimeWindow> {
            @Override
            public void apply(String urls, TimeWindow window, Iterable<Integer> input, Collector<UrlCount> out) throws Exception {
                //获取按key相加后的次数并新建javabean(urlcount)作为返回
                Integer count = input.iterator().next();
                out.collect(new UrlCount(urls,window.getEnd(),count));
            }
        }
        //继承KeyedProcessFunction方法,重写processElemnt方法
        public static class HotUrlProcessFunc extends KeyedProcessFunction<Long,UrlCount,String>{
            //定义TopN为入参
            private Integer TopN;
            public HotUrlProcessFunc(Integer topN) {
                TopN = topN;
            }
            //定义状态
            private MapState <String,UrlCount>mapState;
            //open方法中初始化状态
            @Override
            public void open(Configuration parameters) throws Exception {
                mapState = getRuntimeContext()
                        .getMapState(new MapStateDescriptor<String, UrlCount>("map-state",String.class,UrlCount.class));
            }
            @Override
            public void processElement(UrlCount value, Context ctx, Collector<String> out) throws Exception {
                //状态装入数据
                mapState.put(value.getUrl(), value);
                //定时器,窗口一秒后触发
                ctx.timerService().registerEventTimeTimer(value.getWindowEnd()+1L);
                //再加一个定时器来清除状态用,在窗口关闭后再清除状态,这样延迟数据到达后窗口还能做排序
                ctx.timerService().registerEventTimeTimer(value.getWindowEnd()+61001L);
            }
            //定时器内容
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                if (timestamp == ctx.getCurrentKey()+61001L){
                    mapState.clear();
                    return;}
    
                //取出状态数据
                Iterator<Map.Entry<String, UrlCount>> iterator = mapState.iterator();
                ArrayList<Map.Entry<String, UrlCount>> entries = Lists.newArrayList(iterator);
    
                //排序
                entries.sort(((o1, o2) -> o2.getValue().getCount()-o1.getValue().getCount()));
    
                //排序后装入StringBulider作为输出TopN
                StringBuilder sb = new StringBuilder();
                sb.append("==============")
                        .append(new Timestamp(timestamp - 1L))
                        .append("==============")
                        .append("\n");
                for (int i = 0; i < Math.min(TopN,entries.size()); i++) {
                    UrlCount urlCount = entries.get(i).getValue();
                    sb.append("Top").append(i+1);
                    sb.append(" Url:").append(urlCount.getUrl());
                    sb.append(" Counts:").append(urlCount.getCount());
                    sb.append("\n");
                }
                sb.append("==============")
                        .append(new Timestamp(timestamp - 1L))
                        .append("==============")
                        .append("\n")
                        .append("\n");
    
                out.collect(sb.toString());
                Thread.sleep(200);
                }}}
               
    

    映射数据源的JavaBean

    package bean;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class ApacheLog {
        private String ip;
        private String userId;
        private Long ts;
        private String method;
        private String url;
    }
    

    第一次聚合输出的JavaBean

    package bean;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class UrlCount {
        private String url;
        private Long windowEnd;
        private Integer count;
    }
    

    学习交流,有任何问题还请随时评论指出交流。

    bk