window window是针对DataStream,一种可以把无界的数据切割为有界数据块的手段,可以是时间驱动的【time window】或者数据驱动的【count window】,元素个数。 类型:分为 tumbling window:滚动窗口
window
window是针对DataStream,一种可以把无界的数据切割为有界数据块的手段,可以是时间驱动的【time window】或者数据驱动的【count window】,元素个数。
类型:分为 tumbling window:滚动窗口【没有重叠】、sliding window:滑动窗口【有重叠】
time window
通过socket接收数据,统计窗口内的单词数量。不使用keyBy时,使用timeWindowAll来代替timeWindow
public class WindowStudent {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStreamSource<String> streamSource = env.socketTextStream("10.50.8.125", 9001);
// 使用滚动窗口,每10秒计算一次前10秒的窗口数据
// 对每个时间窗口内的数据进行单词统计
streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
// value 接收到的每行数据
@Override
public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
String[] splitValue = value.split(" ");
for (String str : splitValue) {
out.collect(new Tuple2<String,Integer>(str,1));
}
}
}).keyBy(0).timeWindow(Time.seconds(10)).sum(1).print();
// 使用滑动窗口 每隔5秒统计前10秒窗口内的数据
streamSource.flatMap(new FlatMapFunction<String, Tuple2>() {
// value 接收到的每行数据
@Override
public void flatMap(String value, Collector<Tuple2> out) throws Exception {
String[] splitValue = value.split(" ");
for (String str : splitValue) {
out.collect(new Tuple2(str,1));
}
}
})
.keyBy(0)
// 第一个参数:窗口大小 第二个参数:滑动间隔
.timeWindow(Time.seconds(10),Time.seconds(5))
.sum(1).print();
env.execute();
}
}
CountWindow
CountWindow 是针对元素个数来进行分隔。
注意:当前面有keyBy对元素进行分组时,当分组中的元素达到窗口大小才会进行计算,而不是总元素大小
不使用keyBy时,使用CountWindowAll来代替CountWindow
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStreamSource<String> streamSource = env.socketTextStream("10.50.8.125", 9001);
// 使用滚动窗口,每隔6个元素计算一次
// 对每个窗口内的数据进行单词统计
streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
// value 接收到的每行数据
@Override
public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
String[] splitValue = value.split(" ");
for (String str : splitValue) {
out.collect(new Tuple2<String,Integer>(str,1));
}
}
}).keyBy(0)
// 表示每隔6个元素计算一次 前面有keyBy,会对元素进行分组,当分组中的元素有6个才会进行计算
.countWindow(6)
.sum(1).print();
// 使用滑动窗口 每隔2个元素计算一次前5个元素
streamSource.flatMap(new FlatMapFunction<String, Tuple2>() {
// value 接收到的每行数据
@Override
public void flatMap(String value, Collector<Tuple2> out) throws Exception {
String[] splitValue = value.split(" ");
for (String str : splitValue) {
out.collect(new Tuple2(str,1));
}
}
})
.keyBy(0)
// 第一个参数:窗口大小 第二个参数:滑动间隔
.countWindow(5,2)
.sum(1).print();
env.execute();
}
自定义window
timeWindow和countWindow都是基于window()实现的,timeWindowAll则是windowAll。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStreamSource<String> streamSource = env.socketTextStream("10.50.8.125", 9001);
// 使用滚动窗口,每隔10个元素计算一次
// 对每个窗口内的数据进行单词统计
streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
// value 接收到的每行数据
@Override
public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
String[] splitValue = value.split(" ");
for (String str : splitValue) {
out.collect(new Tuple2<String,Integer>(str,1));
}
}
}).keyBy(0)
// 表示每隔10个元素计算一次 前面有keyBy,会对元素进行分组,当分组中的元素有6个才会进行计算
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.sum(1).print();
// 使用滑动窗口 每隔2秒统计前5秒的数据
streamSource.flatMap(new FlatMapFunction<String, Tuple2>() {
// value 接收到的每行数据
@Override
public void flatMap(String value, Collector<Tuple2> out) throws Exception {
String[] splitValue = value.split(" ");
for (String str : splitValue) {
out.collect(new Tuple2(str,1));
}
}
})
.keyBy(0)
// 第一个参数:窗口大小 第二个参数:滑动间隔
.window(TumblingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)))
.sum(1).print();
env.execute();
}
增量聚合与全量聚合
增量聚合:窗口中每进入一条数据就进行一次计算,代表函数:reduce,sum,min,max等
全量聚合:等属于窗口的数据到齐,才开始进行计算,可以对窗口内的数据进行排序等需求 ,代表函数:apply,process。process比apply包含更多的上下文信息。