前言 受spark sql在喜马拉雅的使用之xql 这篇文章影响,我发现类似下面这种语法是极好的: //加载mysql表load jdbc.`mysql1.tb_v_user` as mysql_tb_user;//处理后映射成spark临时表select * fr
前言
受spark sql在喜马拉雅的使用之xql 这篇文章影响,我发现类似下面这种语法是极好的:
//加载mysql表load jdbc.`mysql1.tb_v_user` as mysql_tb_user;//处理后映射成spark临时表select * from mysql_tb_user limit 100 as result_csv;//保存到文件里save result_csv as csv.`/tmp/todd/csv_test`;//加载文件load csv.`/tmp/todd/csv_test` as csv_input;//处理文件select * from csv_input limit 10 as csv_input_result;//再次保存结果,并且设置分区字段save csv_input_result as json.`/tmp/todd/result_json` partitionBy uid;核心是load,select,save 三个语法动作。这个用来做ETL会更加方便。而且和Spark SQL Server 结合,会显得更有优势,不用每次启动向Yarn申请资源。所以这里也仿照了一套。
感受新语法
需要自己编译一个新版本,具体参看: https://github.com/allwefantasy/streamingpro/blob/master/README-CN.md
先启动一个StreamingPro Rest Server:
准备一个只包含
{}的query.json的文件(名字可以任意),然后按如下的方式启动即可:
SHome=/Users/allwefantasy/streamingpro./bin/spark-submit --class streaming.core.StreamingApp \--master local[2] \--name sql-interactive \$SHome/streamingpro-spark-2.0-0.4.15-SNAPSHOT.jar \-streaming.name sql-interactive \-streaming.job.file.path file://$SHome/query.json \-streaming.platform spark \-streaming.rest true \-streaming.driver.port 9003 \-streaming.spark.service true \-streaming.thrift true \-streaming.enableHiveSupport true我们先看下脚本:
//链接一个mysql 数据库,并且将该库注册为db1connect jdbc where driver="com.mysql.jdbc.Driver" and url="jdbc:mysql://127.0.0.1/alarm_test?characterEncoding=utf8" and user="root" and password="****" as db1;//加载t_report表为trload jdbc.`db1.t_report` as tr;// 把tr表处理完成后映射成new_tr表select * from tr as new_tr;//保存到/tmp/todd目录下,并且格式为jsonsave new_tr as json.`/tmp/todd`;为此,StreamingPro新添加了一个接口/run/script,专门为了执行脚本
curl --request POST \ --url http://127.0.0.1:9003/run/script \ --data 'sql=上面的脚本内容'我建议用PostMan之类的工具做测试。之后用/run/sql来查看结果
curl --request POST \ --url http://127.0.0.1:9003/run/sql \ --data 'sql=select * from json.`/tmp/todd`'