参考:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/api_concepts.html DataSet and DataStream Flink具有特殊类DataSet和DataStream来表示程序中的数据。 你可以将它们视为可以包含重复项的不可变
参考:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/api_concepts.html
DataSet and DataStream
Flink具有特殊类DataSet和DataStream来表示程序中的数据。 你可以将它们视为可以包含重复项的不可变数据集合。Anatomy of a Flink Program Flink程序的剖析
Flink程序看起来像是转换数据集合的常规程序。 每个程序包含相同的基本部分:- 获得执行环境, Obtain an execution environment,
- 加载/创建初始数据, Load/create the initial data,
- 指定此数据的转换, Specify transformations on this data,
- 指定放置计算结果的位置,Specify where to put the results of your computations,
- 触发程序执行 Trigger the program execution
getExecutionEnvironment() createLocalEnvironment() createRemoteEnvironment(String host, int port, String... jarFiles)
通常,你只需要使用getExecutionEnvironment(),因为这将根据上下文执行正确的操作:如果你在IDE中执行程序或作为常规Java程序,它将创建一个本地环境,将执行你的程序 你的本地机器。 如果你从程序中创建了一个JAR文件,并通过命令行调用它,则Flink集群管理器将执行你的main方法,getExecutionEnvironment()将返回一个执行环境,用于在集群上执行你的程序。
对于指定数据源,执行环境有几种方法可以使用各种方法从文件中读取:你可以逐行读取它们,CSV文件或使用完全自定义数据输入格式。 要将文本文件作为一系列行读取,你可以使用:final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = env.readTextFile("file:///path/to/file");这将为你提供一个DataStream,然后你可以在其上应用转换来创建新的派生DataStream。 你可以通过使用转换函数调用DataStream上的方法来应用转换。 例如,map转换如下所示:
DataStream<String> input = ...; DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return Integer.parseInt(value); } });
这将通过将原始集合中的每个String转换为Integer来创建新的DataStream。
一旦有了包含最终结果的DataStream,就可以通过创建接收器(sink)将其写入外部系统。 这些只是创建接收器的一些示例方法:writeAsText(String path)
print()
一旦指定了完整的程序,就需要通过调用StreamExecutionEnvironment上的execute()来触发程序执行。 根据ExecutionEnvironment的类型,将在本地计算机上触发执行或提交程序以在群集上执行。
execute()方法返回一个JobExecutionResult,它包含执行时间和累加器结果。Lazy Evaluation 惰性求值
所有Flink程序都是惰性执行:当执行程序的main方法时,数据加载和转换不会直接发生。 而是创建每个操作并将其添加到程序的计划中。 当执行环境上的execute()调用显式触发执行时,实际执行操作。 程序是在本地执行还是在集群上执行取决于执行环境的类型。 惰性求值使你可以构建Flink作为一个整体计划单元执行的复杂程序。Specifying Keys 指定键
一些转换(join,coGroup,keyBy,groupBy)要求在元素集合上定义键。 其他转换(Reduce,GroupReduce,Aggregate,Windows)允许数据在应用之前在键上分组。 DataSet分组:DataSet<...> input = // [...] DataSet<...> reduced = input .groupBy(/*define key here*/) .reduceGroup(/*do something*/);
DataStream设置键:
DataStream<...> input = // [...] DataStream<...> windowed = input .keyBy(/*define key here*/) .window(/*window specification*/);
Flink的数据模型不基于键值对。 因此,你无需将数据集类型物理打包到键和值中。 键是“虚拟的”:它们被定义为实际数据上的函数,以指导分组运算符。
Define keys for Tuples 定义元组的键
最简单的情况是在元组的一个或多个字段上对元组进行分组:DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
元组在第一个字段(整数类型)上分组。
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
在这里,我们将元组分组在由第一个和第二个字段组成的复合键上。
注意嵌套元组:如果你有一个带有嵌套元组的DataStream,例如:DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
指定keyBy(0)将使系统使用完整的Tuple2作为键(以Integer和Float为键)。 如果要“导航”到嵌套的Tuple2中,则必须使用下面解释的字段表达式键。
Define keys using Field Expressions 使用字段表达式定义键
你可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于grouping, sorting, joining或coGrouping的键。 字段表达式可以非常轻松地选择(嵌套)复合类型中的字段,例如Tuple和POJO类型。 在下面的示例中,我们有一个WC POJO,其中包含两个字段“word”和“count”。 要按字段分组,我们只需将其名称传递给keyBy()函数。// some ordinary POJO (Plain old Java Object) public class WC { public String word; public int count; } DataStream<WC> words = // [...] DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
字段表达式语法:
- 按字段名称选择POJO字段。 例如,“user”指的是POJO类型的“user”字段。
- 按字段名称或0偏移字段索引选择元组字段。 例如,“f0”和“5”分别表示Java元组类型的第一和第六字段。
- 你可以在POJO和Tuples中选择嵌套字段。 例如,“user.zip”指的是POJO的“zip”字段,其存储在POJO类型的“user”字段中。 支持任意嵌套和混合POJO和元组,例如“f1.user.zip”或“user.f3.1.zip”。
- 你可以使用“*”通配符表达式选择完整类型。 这也适用于非Tuple或POJO类型的类型。
public static class WC { public ComplexNestedClass complex; //nested POJO private int count; // getter / setter for private field (count) public int getCount() { return count; } public void setCount(int c) { this.count = c; } } public static class ComplexNestedClass { public Integer someNumber; public float someFloat; public Tuple3<Long, Long, String> word; public IntWritable hadoopCitizen; }
这些是上面示例代码的有效字段表达式:
- “count”:WC类中的count字段。
- “complex”:递归选择POJO类型ComplexNestedClass的字段复合体的所有字段。
- “complex.word.f2”:选择嵌套Tuple3的最后一个字段。
- “complex.hadoopCitizen”:选择Hadoop IntWritable类型。
Define keys using Key Selector Functions 使用键选择器函数定义键
定义键的另一种方法是“键选择器”函数。 键选择器函数将单个元素作为输入并返回元素的键。 键可以是任何类型,并且可以从确定性计算中输出。 以下示例显示了一个键选择器函数,它只返回一个对象的字段:// some ordinary POJO public class WC {public String word; public int count;} DataStream<WC> words = // [...] KeyedStream<WC> keyed = words .keyBy(new KeySelector<WC, String>() { public String getKey(WC wc) { return wc.word; } });
Specifying Transformation Functions 指定转换函数
大多数转换都需要用户定义的函数。 本节列出了如何指定它们的不同方法。Implementing an interface 实现接口
最基本的方法是实现一个提供的接口:class MyMapFunction implements MapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } }; data.map(new MyMapFunction());
Anonymous classes 匿名类
你可以将函数作为匿名类传递:
data.map(new MapFunction<String, Integer> () { public Integer map(String value) { return Integer.parseInt(value); } });
Java 8 Lambdas
Flink还支持Java API中的Java 8 Lambdas。data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);
Rich functions 丰富的函数
所有需要用户定义函数的转换都可以将rich函数作为参数。 例如,替换class MyMapFunction implements MapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } };你可以写
class MyMapFunction extends RichMapFunction<String, Integer> { public Integer map(String value) { return Integer.parseInt(value); } };并像往常一样将函数传递给map转换:
data.map(new MyMapFunction());
丰富的函数也可以定义为匿名类:
data.map (new RichMapFunction<String, Integer>() { public Integer map(String value) { return Integer.parseInt(value); } });除了用户定义的函数(map,reduce等)之外,Rich函数还提供了四种方法:open,close,getRuntimeContext和setRuntimeContext。 这些用于参数化函数,创建和完成本地状态,访问广播变量以及访问运行时信息(如累加器和计数器)以及迭代信息。
Supported Data Types 支持的数据类型
Flink对DataSet或DataStream中可以包含的元素类型设置了一些限制。原因是系统分析类型以确定有效的执行策略。 有六种不同类别的数据类型:- 元组(Java Tuples and Scala Case Classes)
- Java普通对象(Java POJOs)
- 基本类型(Primitive Types)
- 常规类(Regular Classes)
- 值类型(Values)
- Hadoop可写接口的实现(Hadoop Writables)
- 特殊类型(Special Types)
Tuples and Case Classes 元组
元组是包含固定数量的具有各种类型的字段的复合类型。 Java API提供从Tuple1到Tuple25的类。 元组的每个字段都可以是包含更多元组的任意Flink类型,从而产生嵌套元组。 可以使用字段名称tuple.f4直接访问元组的字段,也可以使用通用getter方法tuple.getField(int position)。 字段索引从0开始。请注意,这与Scala元组形成对比,但它与Java的一般索引更为一致。DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements( new Tuple2<String, Integer>("hello", 1), new Tuple2<String, Integer>("world", 2)); wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() { @Override public Integer map(Tuple2<String, Integer> value) throws Exception { return value.f1; } }); wordCounts.keyBy(0); // also valid .keyBy("f0")
POJO Java普通对象
如果满足以下要求,则Flink将Java和Scala类视为特殊的POJO数据类型:- 类必须是公共的。
- 它必须有一个没有参数的公共构造函数(默认构造函数)。
- 所有字段都是公共的,或者必须通过getter和setter函数访问。 对于名为foo的字段,getter和setter方法必须命名为getFoo()和setFoo()。
- Flink必须支持字段的类型。 目前,Flink使用Avro序列化任意对象(例如Date)。
public class WordWithCount { public String word; public int count; public WordWithCount() {} public WordWithCount(String word, int count) { this.word = word; this.count = count; } } DataStream<WordWithCount> wordCounts = env.fromElements( new WordWithCount("hello", 1), new WordWithCount("world", 2)); wordCounts.keyBy("word"); // key by field expression "word"
Primitive Types 基本类型
Flink支持所有Java和Scala基本类型,如Integer,String和Double。General Class Types 常规类类型
Flink支持大多数Java和Scala类(API和自定义)。 限制适用于包含无法序列化的字段的类,如文件指针,I/O流或其他本机资源。 遵循Java Beans约定的类通常可以很好地工作。 所有未标识为POJO类型的类(请参阅上面的POJO要求)都由Flink作为常规类类型处理。 Flink将这些数据类型视为黑盒子,并且无法访问其内容(即,用于高效排序)。 使用序列化框架Kryo对常规类型进行序列化和反序列化。Values 值类型
值类型需手动描述其序列化和反序列化。它们不是通过通用序列化框架,而是通过使用读取和写入方法实现org.apache.flinktypes.Value接口来为这些操作提供自定义代码。当通用序列化效率非常低时,使用值类型是合理的。一个示例是将元素的稀疏向量实现为数组的数据类型。知道数组大部分为零,可以对非零元素使用特殊编码,而通用序列化只需编写所有数组元素。 org.apache.flinktypes.CopyableValue接口以类似的方式支持手动内部克隆逻辑。 Flink带有与基本数据类型对应的预定义值类型。 (ByteValue,ShortValue,IntValue,LongValue,FloatValue,DoubleValue,StringValue,CharValue,BooleanValue)。这些值类型充当基本数据类型的可变变体:它们的值可以被更改,允许程序员重用对象并降低垃圾收集器压力。Hadoop Writables Hadoop可写接口的实现
你可以使用实现org.apache.hadoop.Writable接口的类型。 write()和readFields()方法中定义的序列化逻辑将用于序列化。Special Types 特殊类型
你可以使用特殊类型,包括Scala的Either,Option和Try。 Java API有自己的自定义Either实现。 与Scala的Either类似,它代表两种可能类型的值,左或右。 两者都可用于错误处理或需要输出两种不同类型记录的运算符。Type Erasure & Type Inference 类型擦除和类型推断
注意:本节仅适用于Java。 Java编译器在编译后抛弃了大部分泛型类型信息。这在Java中称为类型擦除。这意味着在运行时,对象的实例不再知道其泛型类型。例如,DataStream <String>和DataStream <Long>的实例在JVM看来是一样的。 Flink在准备执行程序时(当调用程序的main方法时)需要类型信息。 Flink Java API尝试重建以各种方式丢弃的类型信息,并将其显式存储在数据集和运算符中。你可以通过DataStream.getType()检索类型。该方法返回TypeInformation的一个实例,这是Flink表示类型的内部方式。 类型推断有其局限性,在某些情况下需要程序员的“合作”。这方面的示例是从集合创建数据集的方法,例如ExecutionEnvironment.fromCollection(),你可以在其中传递描述类型的参数。但是像MapFunction<I,O>这样的通用函数也可能需要额外的类型信息。 ResultTypeQueryable接口可以通过输入格式和函数实现,以明确告知API其返回类型。调用函数的输入类型通常可以通过先前操作的结果类型来推断。Accumulators & Counters 累加器和计数器
累加器是具有增加操作(add operation)和最终累积结果(final accumulated result)的简单构造,可在作业结束后使用。 最直接的累加器是一个计数器(counter):你可以使用Accumulator.add(V value)方法递增它。 在工作结束时,Flink将汇总(合并)所有部分结果并将结果发送给客户。 在调试过程中,或者如果你想快速了解有关数据的更多信息,累加器非常有用。 Flink目前有以下内置累加器。 它们中的每一个都实现了Accumulator接口。- IntCounter,LongCounter和DoubleCounter:请参阅下面的使用计数器的示例。
- 直方图(Histogram):离散数量的区间的直方图实现。 在内部,它只是一个从Integer到Integer的映射。 你可以使用它来计算值的分布,例如 字数统计程序的每行字数分布。
How to use accumulators: 如何使用累加器:
首先,你必须在要使用它的用户定义转换函数中创建累加器对象(此处为计数器)。private IntCounter numLines = new IntCounter();其次,你必须注册累加器对象,通常在丰富(rich)函数的open()方法中。 在这里你还可以定义名称。
getRuntimeContext().addAccumulator("num-lines", this.numLines);
你现在可以在运算符函数中的任何位置使用累加器,包括open()和close()方法。
this.numLines.add(1);
整个结果将存储在JobExecutionResult对象中,该对象是从执行环境的execute()方法返回的(当前这仅在执行等待作业完成时才有效)。
myJobExecutionResult.getAccumulatorResult("num-lines")每个作业的所有累加器共享一个命名空间。 因此,你可以在作业的不同运算符函数中使用相同的累加器。 Flink将在内部合并所有具有相同名称的累加器。 关于累加器和迭代的说明:目前累加器的结果仅在整个作业结束后才可用。 我们还计划在下一次迭代中使前一次迭代的结果可用。 你可以使用聚合器来计算每次迭代统计信息,并根据此类统计信息确定迭代的终止。