“Apache Flink的Table API提供了对数据注册为Table的方式, 实现把数据通过SQL的方式进行计算。Table API与SQL API实现了Apache Flink的批流统一的实现方式。Table API与SQL API的核心概念就是TableEnviroment。TableEnviroment对象提供方法注册数据源与数据表信息。那么数据源与数据表的信息则存储在CataLog中。所以,CataLog是TableEnviroment的重要组成部分。”
Apache Flink在获取TableEnviroment对象后,可以通过Register实现对数据源与数据表进行注册。注册完成后数据库与数据表的原信息则存储在CataLog中。CataLog中保存了所有的表结构信息、数据目录信息等。
内部CataLog注册
...tableEnv.registerDataSet("USER",dataset,'name,'age)val result = tableEnv.sqlQuery("SELECT name,sum(age) FROM `USER` GROUP BY name")//使用Table对象注册TabletableEnv.registerTable("TABLE_RES",table)//输出注册的Table中的内容tableEnv.sqlQuery("SELECT * FROM `TABLE_RES`").toDataSet[Row].print()...
val env = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv = StreamTableEnvironment.create(env)//使用CSV的方式进行注册表结构.// 参数(path:数据的路径地址,fieldNames:字段名称,fieldTypes:字段类型,fieldDelim:csv分隔符,rowDelim:行分割方式)val csvTableSource:CsvTableSource =new CsvTableSource("../datas.csv",Array("exitcode","count"),Array(Types.STRING,Types.INT),",","\n")tableEnv.registerTableSource("csv",csvTableSource)以上使用的是StreamExecutionEnvironment进行的处理,当然也可以使用Batch的方式对数据进行注册可以自己进行尝试。
val csvPath = "D:/flink.csv"val fieldNames = Array[String]("user","age")val fieldTypes = Array[TypeInformation[_]](Types.STRING,Types.INT)val csvSink:CsvTableSink = new CsvTableSink(csvPath,",")tableEnv.registerTableSink("csv",fieldNames,fieldTypes,csvSink)tableEnv.sqlQuery("SELECT * FROM `USER` ").insertInto("csv")
外部CataLog注册
Apache Flink除了实现内部的CataLog作为所有Table的元数据存储介质之外还可以把CataLog放到其他的存储介质中。外部的CataLog可以自定义实现,然后在TableEnvironment中注册实现。Apache Flink官方提供了InMemoryCataLog的实现,开发者可以参考来实现其他的存储介质的CataLog。val memoryCataLog:ExternalCatalog = new InMemoryExternalCatalog("UserCataLog")tableEnv.registerExternalCatalog("user",memoryCataLog)以上为Apache Flink的CataLog的实现。阿里云Blink中对CataLog进行了重构,并且增加了两种cataLog。一种是基于内存的InMemoryCataLog,另外一种是能够桥接Hive metaStore的Hive CataLog。使Flink能够直接读取Hive的metaStore信息,打通Hive的数据连接。
⬇⬇⬇你好,我是CainGao。一线大数据开发者,关注我一起交流场景实现 ⬇⬇⬇