
Connecting Spark to Elasticsearch (ES) via the API

Source: Internet | Collected by: 自由互联 | Published: 2021-06-30
SparkConnectionEs.java
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import scala.Tuple2;

import data.spark.batch.cardbin.util.CardBinFields;

public class SparkConnectionEs {
    // Spark reads from ES directly, maps each document into a CardBinFields bean
    // to build a Spark RDD, and registers the result as a table.
    private static String sourceIP = "192.168.23.23";
    private static String esPath = "ybs_cardbin_info_bak/cardbin"; // es_index/es_type

    public static void main(String[] args) throws Exception {
        // The context setup was omitted in the original ("//import ....");
        // wiring sourceIP into es.nodes here is an assumption made to keep the example self-contained.
        SparkConf conf = new SparkConf().setAppName("SparkConnectionEs")
                .set("es.nodes", sourceIP)
                .set("es.port", "9200");
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sparkContext);

        // esRDD returns (document id, field map) pairs; map each pair to a CardBinFields bean.
        JavaRDD<CardBinFields> esdataRdd = JavaEsSpark.esRDD(sparkContext, esPath)
                .map(new Function<Tuple2<String, Map<String, Object>>, CardBinFields>() {
                    private static final long serialVersionUID = 1L;

                    public CardBinFields call(Tuple2<String, Map<String, Object>> v1) throws Exception {
                        CardBinFields cardbin = new CardBinFields();
                        cardbin.setId(v1._1);
                        cardbin.setBank_no(v1._2.get("bank_no").toString());
                        return cardbin;
                    }
                });

        DataFrame tfcardnoDF = sqlContext.createDataFrame(esdataRdd, CardBinFields.class)
                .select("id", "bank_no");
        tfcardnoDF.registerTempTable("ES_FIELDS");
    }
}
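Once ES_FIELDS is registered, it can be queried like any other Spark SQL temp table. A minimal sketch of such a query, reusing the sqlContext from the example above; the SQL itself is only an illustration and is not part of the original code:

// Query the temp table registered above and print a few rows.
DataFrame cardBins = sqlContext.sql("SELECT id, bank_no FROM ES_FIELDS WHERE bank_no IS NOT NULL");
cardBins.show();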
     
     
pom.xml (elasticsearch-hadoop dependency):

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>2.2.0-m1</version>
</dependency>
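With the same elasticsearch-hadoop artifact, the index can also be read straight into a DataFrame through the Spark SQL integration, without the manual CardBinFields mapping. This is a rough sketch, reusing the sqlContext and esPath from the example above and assuming the elasticsearch-hadoop version in use ships the Spark SQL module with JavaEsSparkSQL.esDF:

import org.apache.spark.sql.DataFrame;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

// Read the ES index/type directly as a DataFrame; column names come from the ES mapping.
DataFrame esDF = JavaEsSparkSQL.esDF(sqlContext, esPath);
esDF.registerTempTable("ES_FIELDS_DF"); // ES_FIELDS_DF is an illustrative table name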
   
  
 