当前位置 : 主页 > 编程语言 > java >

Flink高级特性(2)

来源:互联网 收集:自由互联 发布时间:2023-09-06
watermark 水位线 处理乱序 数据流从数据产生到DataSource,再到具体的算子,中间是有一个过程和时间,有可能会导致数据乱序问题,通过watermark + EventTime来处理。 作用 : 由于网络延迟等

watermark 水位线 处理乱序

数据流从数据产生到DataSource,再到具体的算子,中间是有一个过程和时间,有可能会导致数据乱序问题,通过watermark + EventTime来处理。

作用由于网络延迟等原因,一条数据会迟到计算,比如使用event time来划分窗口,我们知道窗口中的数据是计算一段时间的数据,如果一个数据来晚了,它的时间范围已经不属于这个窗口了,则会被丢弃,但他的event time实际上是属于这个窗口的。引入watermark机制则会等待晚到的数据一段时间,等待时间到则触发计算,如果数据延迟很大,通常也会被丢弃或者另外处理。

生成方式

watermark生成方式有两种:

  • 周期性生成watermark【常用】:每隔N秒自动向流中注入一个watermark,由executionConfig.setAutoWatermarkInterval()决定,默认是200毫秒。注意:假如设置3秒延时,使用事件的时间戳,如果有窗口的停止时间等于maxEventTime – 3,那么这个窗口被触发执行,否则一直等待
  • 基于事件生成:基于某些事件触发watermark的生成,每个元素都有机会判断是否生成一个watermark


public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置时间语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 设置并行度为1 用来演示单并行度下的watermark
        env.setParallelism(1);
        // 设置周期性生成watermark 默认200ms
        env.getConfig().setAutoWatermarkInterval(200);
        // 传输过来的数据格式:10001,19980009908(id,时间戳)
        DataStreamSource<String> streamSource = env.socketTextStream("10.50.8.125", 9001);
        // 将数据转成Tuple2(id,时间戳)
        SingleOutputStreamOperator<Tuple2<Long, Long>> streamOperator = streamSource.map(new MapFunction<String, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(String values) throws Exception {
                String[] split = values.split(",");
                return new Tuple2<>(Long.valueOf(split[0]), Long.valueOf(split[1]));
            }
        });
        // 分配时间戳和 watermark 设置允许的最大乱序时间范围10秒
        SingleOutputStreamOperator<Tuple2<Long, Long>> watermarkStream = streamOperator.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, Long>>(Time.seconds(10)) {
            // 从数据流中抽取时间戳作为EventTime
            @Override
            public long extractTimestamp(Tuple2<Long, Long> longLongTuple2) {
                System.out.println("key:" + longLongTuple2.f0 + ",eventTime:" + longLongTuple2.f1);
                return longLongTuple2.f1;
            }
        });
        watermarkStream.keyBy(0)
                // 按照消息的EventTime分配窗口 和调用TimeWindow效果一样
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                // 全量聚合 参数1:接收的数据 参数2:输出的数据类型
                .apply(new WindowFunction<Tuple2<Long, Long>, String, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<Long, Long>> input, Collector<String> out) throws Exception {
                        ArrayList<Long> result = new ArrayList<>();
                        // 将时间戳放入队列 便于排序
                        input.forEach(in->{
                            result.add(in.f1);
                        });
                        // 对时间戳排序  小的在前
                        Collections.sort(result);
                        // 将目前window内排序后的数据以及window的开始和结束时间打印出来
                        System.out.println("result:"+ JSON.toJSONString(result)+",windowStart:"+timeWindow.getStart()+",windowEnd:"+timeWindow.getEnd());
                        out.collect(JSON.toJSONString(result));
                    }
                }).print();

        env.execute();
    }


延迟数据的处理方式

三种方式:

  • 直接丢弃【默认】:Flink默认的策略就是对迟到的数据直接丢弃
  • 重新激活已经关闭的窗口并重新计算以修正结果。
  • 将迟到数据收集起来另外处理。


并行度

TaskManager和Slot,TaskManager是从节点,Slot个数一般对应的就是TaskManager的cpu数量,Slot用来执行具体的算子。

Flink高级特性(2)_并行度

Flink可以通过四种方式设置并行度,优先级从高到低排序:算子层面>执行环境层面>客户端层面>系统层面

// 算子层面
streamSource.map(new MapFunction<String, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(String values) throws Exception {
                String[] split = values.split(",");
                return new Tuple2<>(Long.valueOf(split[0]), Long.valueOf(split[1]));
            }
        }).setParallelism(2)
// 执行环境层面
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

客户端层面:提交任务时通过-p 指定并行度

系统层面:通过设置flink-conf.yaml文件中的parallelism.default属性来设置默认并行度

网友评论