spark groupByKey, combineByKey
On a pairRdd it is best to avoid groupByKey: the groupBy family of functions shuffles every raw value across the network, which causes performance problems, so on a pairRdd use combineByKey instead, which can combine values on the map side before the shuffle. Example, assuming an input of type JavaPairRDD<String, Integer> (the key/value types here are illustrative):

```java
// before: pairRdd is a JavaPairRDD<String, Integer>
JavaPairRDD<String, List<Integer>> pairRdd2 = pairRdd.combineByKey(
        // createCombiner: the first value for a key in a partition starts a new list
        e -> { List<Integer> list = new ArrayList<>(); list.add(e); return list; },
        // mergeValue: later values in the same partition are appended
        (list, e) -> { list.add(e); return list; },
        // mergeCombiners: per-partition lists for the same key are concatenated
        (listA, listB) -> { listA.addAll(listB); return listA; });
// after: pairRdd2 is a JavaPairRDD<String, List<Integer>>
```
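A minimal local driver for the pattern above could look like the following sketch; the app name, the local[*] master, and the sample (asin, rank) pairs are assumptions made for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class CombineByKeyDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("combineByKeyDemo") // hypothetical app name
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Hypothetical input: (asin, rank) pairs with a duplicated key.
        JavaPairRDD<String, Integer> pairRdd = jsc.parallelizePairs(Arrays.asList(
                new Tuple2<>("B001", 1),
                new Tuple2<>("B001", 7),
                new Tuple2<>("B002", 3)));

        // Same three functions as the snippet above: create a combiner,
        // merge within a partition, merge across partitions.
        JavaPairRDD<String, List<Integer>> pairRdd2 = pairRdd.combineByKey(
                e -> { List<Integer> list = new ArrayList<>(); list.add(e); return list; },
                (list, e) -> { list.add(e); return list; },
                (listA, listB) -> { listA.addAll(listB); return listA; });

        // Prints e.g. "B001 -> [1, 7]" and "B002 -> [3]" (order may vary).
        pairRdd2.collect().forEach(t -> System.out.println(t._1() + " -> " + t._2()));
        jsc.close();
    }
}
```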
On a Dataset, groupByKey() + mapGroups() can replace the pairRdd combineByKey() pattern:

```java
// schema of the original df: one flat row per (asin, pathId, rank)
StructType flatSchema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("asin", StringType, false),
        DataTypes.createStructField("pathId", StringType, true),
        DataTypes.createStructField("rank", IntegerType, true),
});
// schema of df after the transformation: one row per asin, ranks collected into a map
StructType returnSchema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("asin", StringType, false),
        DataTypes.createStructField("bsr_ext",
                DataTypes.createMapType(StringType, IntegerType, true), true)
});
// combine all rows sharing one asin into a single pathId -> rank map;
// the MapFunction/MapGroupsFunction casts disambiguate the Java and Scala
// overloads of groupByKey and mapGroups
df = df.groupByKey((MapFunction<Row, String>) row -> row.getAs("asin"),
                Encoders.STRING())
        .mapGroups((MapGroupsFunction<String, Row, Row>) (key, values) -> {
            String asin = key;
            Map<String, Integer> map = Maps.newHashMap();
            while (values.hasNext()) {
                Row row = values.next();
                String pathId = row.getAs("pathId");
                Integer rank = row.getAs("rank");
                map.put(pathId, rank);
            }
            // asScalaMap: java.util.Map -> Scala Map, the external type the
            // Row encoder expects for a MapType column
            return new GenericRowWithSchema(
                    new Object[] { asin, asScalaMap(map) }, returnSchema);
        }, RowEncoder.apply(returnSchema));
```
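For context, here is a self-contained local sketch of the same transformation, assuming a small hand-built DataFrame; the sample rows and app name are invented, and it uses scala.collection.JavaConverters for the Java-to-Scala map conversion in place of asScalaMap:

```java
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.collection.JavaConverters;

public class MapGroupsDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("mapGroupsDemo") // hypothetical app name
                .master("local[*]").getOrCreate();

        StructType flatSchema = DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("asin", StringType, false),
                DataTypes.createStructField("pathId", StringType, true),
                DataTypes.createStructField("rank", IntegerType, true) });
        StructType returnSchema = DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("asin", StringType, false),
                DataTypes.createStructField("bsr_ext",
                        DataTypes.createMapType(StringType, IntegerType, true), true) });

        // Invented sample rows: two category ranks for B001, one for B002.
        List<Row> rows = Arrays.asList(
                RowFactory.create("B001", "electronics", 12),
                RowFactory.create("B001", "computers", 3),
                RowFactory.create("B002", "books", 840));
        Dataset<Row> df = spark.createDataFrame(rows, flatSchema);

        Dataset<Row> grouped = df
                .groupByKey((MapFunction<Row, String>) row -> row.getAs("asin"),
                        Encoders.STRING())
                .mapGroups((MapGroupsFunction<String, Row, Row>) (key, values) -> {
                    Map<String, Integer> map = new HashMap<>();
                    while (values.hasNext()) {
                        Row row = values.next();
                        map.put(row.getAs("pathId"), row.getAs("rank"));
                    }
                    return new GenericRowWithSchema(new Object[] { key,
                            JavaConverters.mapAsScalaMapConverter(map).asScala() },
                            returnSchema);
                }, RowEncoder.apply(returnSchema));

        // Expect one row per asin, e.g. [B001, [electronics -> 12, computers -> 3]]
        grouped.show(false);
        spark.stop();
    }
}
```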