当前位置 : 主页 > 编程语言 > java >

Flink高级特性(1)

来源:互联网 收集:自由互联 发布时间:2023-09-03
window window是针对DataStream,一种可以把无界的数据切割为有界数据块的手段,可以是时间驱动的【time window】或者数据驱动的【count window】,元素个数。 类型:分为 tumbling window:滚动窗口

window

window是针对DataStream,一种可以把无界的数据切割为有界数据块的手段,可以是时间驱动的【time window】或者数据驱动的【count window】,元素个数。

类型:分为 tumbling window:滚动窗口【没有重叠】、sliding window:滑动窗口【有重叠】

time window

通过socket接收数据,统计窗口内的单词数量。不使用keyBy时,使用timeWindowAll来代替timeWindow

public class WindowStudent {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置时间语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        DataStreamSource<String> streamSource = env.socketTextStream("10.50.8.125", 9001);
        // 使用滚动窗口,每10秒计算一次前10秒的窗口数据
        // 对每个时间窗口内的数据进行单词统计
        streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            // value 接收到的每行数据
            @Override
            public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
                String[] splitValue = value.split(" ");
                for (String str : splitValue) {
                    out.collect(new Tuple2<String,Integer>(str,1));
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(10)).sum(1).print();


        // 使用滑动窗口 每隔5秒统计前10秒窗口内的数据
        streamSource.flatMap(new FlatMapFunction<String, Tuple2>() {
            // value 接收到的每行数据
            @Override
            public void flatMap(String value, Collector<Tuple2> out) throws Exception {
                String[] splitValue = value.split(" ");
                for (String str : splitValue) {
                    out.collect(new Tuple2(str,1));
                }
            }
        })
        .keyBy(0)
        // 第一个参数:窗口大小 第二个参数:滑动间隔
        .timeWindow(Time.seconds(10),Time.seconds(5))
        .sum(1).print();
        env.execute();
    }
}


CountWindow

CountWindow 是针对元素个数来进行分隔。

注意:当前面有keyBy对元素进行分组时,当分组中的元素达到窗口大小才会进行计算,而不是总元素大小

不使用keyBy时,使用CountWindowAll来代替CountWindow

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置时间语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        DataStreamSource<String> streamSource = env.socketTextStream("10.50.8.125", 9001);
        // 使用滚动窗口,每隔6个元素计算一次
        // 对每个窗口内的数据进行单词统计
        streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            // value 接收到的每行数据
            @Override
            public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
                String[] splitValue = value.split(" ");
                for (String str : splitValue) {
                    out.collect(new Tuple2<String,Integer>(str,1));
                }
            }
        }).keyBy(0)
                // 表示每隔6个元素计算一次 前面有keyBy,会对元素进行分组,当分组中的元素有6个才会进行计算
                .countWindow(6)
                .sum(1).print();


        // 使用滑动窗口 每隔2个元素计算一次前5个元素
        streamSource.flatMap(new FlatMapFunction<String, Tuple2>() {
            // value 接收到的每行数据
            @Override
            public void flatMap(String value, Collector<Tuple2> out) throws Exception {
                String[] splitValue = value.split(" ");
                for (String str : splitValue) {
                    out.collect(new Tuple2(str,1));
                }
            }
        })
                .keyBy(0)
                // 第一个参数:窗口大小 第二个参数:滑动间隔
                .countWindow(5,2)
                .sum(1).print();
        env.execute();
    }

自定义window

timeWindow和countWindow都是基于window()实现的,timeWindowAll则是windowAll。

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置时间语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        DataStreamSource<String> streamSource = env.socketTextStream("10.50.8.125", 9001);
        // 使用滚动窗口,每隔10个元素计算一次
        // 对每个窗口内的数据进行单词统计
        streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            // value 接收到的每行数据
            @Override
            public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
                String[] splitValue = value.split(" ");
                for (String str : splitValue) {
                    out.collect(new Tuple2<String,Integer>(str,1));
                }
            }
        }).keyBy(0)
                // 表示每隔10个元素计算一次 前面有keyBy,会对元素进行分组,当分组中的元素有6个才会进行计算
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .sum(1).print();

		// 使用滑动窗口 每隔2秒统计前5秒的数据
        streamSource.flatMap(new FlatMapFunction<String, Tuple2>() {
            // value 接收到的每行数据
            @Override
            public void flatMap(String value, Collector<Tuple2> out) throws Exception {
                String[] splitValue = value.split(" ");
                for (String str : splitValue) {
                    out.collect(new Tuple2(str,1));
                }
            }
        })
                .keyBy(0)
                // 第一个参数:窗口大小 第二个参数:滑动间隔
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)))
                .sum(1).print();
        env.execute();
    }

增量聚合与全量聚合

增量聚合:窗口中每进入一条数据就进行一次计算,代表函数:reduce,sum,min,max等

全量聚合:等属于窗口的数据到齐,才开始进行计算,可以对窗口内的数据进行排序等需求 ,代表函数:apply,process。process比apply包含更多的上下文信息。

网友评论