SparkConnectionEs.java import data.spark.batch.cardbin.util.CardBinFields;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.sql.SQLContext;//import ....public class SparkConnectionEs{ //spark直连es并通过CardBinFields实体
import data.spark.batch.cardbin.util.CardBinFields;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SQLContext;
//import ....
public class SparkConnectionEs{
//spark直连es并通过CardBinFields实体转为sparkRdd从而注册成table
private static String sourceIP = "192.168.23.23";
private static String esPath = "ybs_cardbin_info_bak/cardbin";//es_index/es_type
public static void main(String[] args) throws Exception {
JavaRDD
esdataRdd = JavaEsSpark.esRDD(sparkContext, esPath).map(new Function
>, CardBinFields>() { private static final long serialVersionUID = 1L; public CardBinFields call(Tuple2
> v1) throws Exception { CardBinFields cardbin = new CardBinFields(); cardbin.setId(v1._1); cardbin.setBank_no(v1._2.get("bank_no").toString()); return cardbin; } }); DataFrame tfcardnoDF = sqlContext.createDataFrame(esdataRdd, CardBinFields.class).select("id", "bank_no"); tfcardnoDF.registerTempTable("ES_FIELDS"); } } /** pom.xml
org.elasticsearch
elasticsearch-hadoop
2.2.0-m1
**/
