
spark.groupByKey,combineByKey

Source: Internet · Collected by: 自由互联 · Published: 2021-06-28
Avoid groupByKey on a pairRdd: groupBy-style functions shuffle every value for a key across the network, which causes performance problems, so combineByKey is generally used on a pairRdd instead (it merges values map-side before the shuffle):

Example:

Format of the rdd before: JavaPairRDD<K, V> (K and V denote the key and value types)

JavaPairRDD<K, List<V>> pairRdd2 = pairRdd.combineByKey(
        // createCombiner: start a list from the first value seen for a key
        e -> {
            ArrayList<V> list = new ArrayList<>();
            list.add(e);
            return list;
        },
        // mergeValue: add further values from the same partition to the list
        (list, e) -> {
            list.add(e);
            return list;
        },
        // mergeCombiners: merge the lists built by different partitions
        (lista, listb) -> {
            lista.addAll(listb);
            return lista;
        });

Format of pairRdd2 after: JavaPairRDD<K, List<V>>
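To see why the three functions fit together, here is a minimal sketch in plain Java (no Spark) of the roles createCombiner, mergeValue, and mergeCombiners play. Partitions are modelled as lists of key/value pairs; the class and method names are illustrative, not Spark API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class CombineByKeyDemo {

    // Apply createCombiner/mergeValue within one "partition"
    // (this is the map-side combine that groupByKey lacks).
    static <K, V> Map<K, List<V>> combineLocally(
            List<Map.Entry<K, V>> partition,
            Function<V, List<V>> createCombiner,
            BiFunction<List<V>, V, List<V>> mergeValue) {
        Map<K, List<V>> out = new HashMap<>();
        for (Map.Entry<K, V> e : partition) {
            List<V> acc = out.get(e.getKey());
            out.put(e.getKey(),
                    acc == null ? createCombiner.apply(e.getValue())
                                : mergeValue.apply(acc, e.getValue()));
        }
        return out;
    }

    // Combine two partitions locally, then merge their per-key
    // accumulators with mergeCombiners (the post-shuffle step).
    public static <K, V> Map<K, List<V>> combineByKey(
            List<Map.Entry<K, V>> partA, List<Map.Entry<K, V>> partB) {
        Function<V, List<V>> createCombiner = e -> {
            List<V> list = new ArrayList<>();
            list.add(e);
            return list;
        };
        BiFunction<List<V>, V, List<V>> mergeValue = (list, e) -> {
            list.add(e);
            return list;
        };
        BinaryOperator<List<V>> mergeCombiners = (lista, listb) -> {
            lista.addAll(listb);
            return lista;
        };
        Map<K, List<V>> result = combineLocally(partA, createCombiner, mergeValue);
        for (Map.Entry<K, List<V>> e
                : combineLocally(partB, createCombiner, mergeValue).entrySet()) {
            result.merge(e.getKey(), e.getValue(), mergeCombiners);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> grouped = combineByKey(
                List.of(Map.entry("a", 1), Map.entry("b", 2)),
                List.of(Map.entry("a", 3)));
        System.out.println(grouped.get("a")); // [1, 3]
        System.out.println(grouped.get("b")); // [2]
    }
}
```

The key point the sketch shows: values are pre-aggregated per partition before the "shuffle" merge, so only the compact accumulators cross partition boundaries.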
    
A Dataset's groupByKey() + mapGroups() can be used in place of a pairRdd's combineByKey():

// original schema of df:
StructType flatSchema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("asin", StringType, false),
        DataTypes.createStructField("pathId", StringType, true),
        DataTypes.createStructField("rank", IntegerType, true),
});
// schema of df after the transformation:
StructType returnSchema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("asin", StringType, false),
        DataTypes.createStructField("bsr_ext",
                DataTypes.createMapType(StringType, IntegerType, true), true)
});
// combine the rows sharing one asin into a single map
df = df.groupByKey(row -> row.<String>getAs("asin"), Encoders.STRING())
        .mapGroups((key, values) -> {
            String asin = key;
            Iterator<Row> t = values;
            Map<String, Integer> map = Maps.newHashMap();
            while (t.hasNext()) {
                Row row = t.next();
                String pathId = row.getAs("pathId");
                Integer rank = row.getAs("rank");
                map.put(pathId, rank);
            }
            return new GenericRowWithSchema(new Object[] { asin, asScalaMap(map) },
                    returnSchema);
        }, RowEncoder.apply(returnSchema));
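The per-group work that the mapGroups lambda performs can be sketched in plain Java (no Spark): rows sharing an asin collapse into a single (asin -> {pathId: rank}) record. The sample asin/pathId values are made up for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapGroupsDemo {
    // Stand-in for a flat Dataset row with columns asin, pathId, rank.
    record Row(String asin, String pathId, int rank) {}

    // Group rows by asin and build one pathId->rank map per group,
    // mirroring the while-loop inside the mapGroups lambda above.
    public static Map<String, Map<String, Integer>> collapse(List<Row> rows) {
        Map<String, Map<String, Integer>> out = new HashMap<>();
        for (Row row : rows) {
            out.computeIfAbsent(row.asin(), k -> new HashMap<>())
               .put(row.pathId(), row.rank());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("B01", "electronics", 5),
                new Row("B01", "cables", 2),
                new Row("B02", "kitchen", 9));
        System.out.println(collapse(rows).get("B01").get("cables")); // 2
        System.out.println(collapse(rows).get("B02").get("kitchen")); // 9
    }
}
```

In the real job, Spark performs the grouping across partitions and the lambda only ever sees the iterator for one key's rows at a time.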
       
      
     
    
   
  
 