
PySpark vs. Spark pipe: a performance comparison, with test programs



// Generate the test data: 9,999,999 lines of "aaabbbcccdddeee".
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;

public class Main {
    public static void main(String[] args) throws IOException {
        File file = new File("/home/gt/testdata.dat");
        file.delete();
        file.createNewFile();
        OutputStream out = new FileOutputStream(file);
        OutputStreamWriter osw = new OutputStreamWriter(out);
        BufferedWriter writer = new BufferedWriter(osw);
        for (int i = 0; i < 9999999; i++) {
            writer.write("aaabbbcccdddeee");
            writer.newLine();
        }
        writer.close();
        osw.close();
        out.close();
    }
}
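
For reference, this produces a plain-text file of roughly 160 MB (9,999,999 lines of 16 bytes each, counting the newline).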

Code for the pipe version. RDD.pipe() forks the external script once per partition and streams every record through the script's stdin/stdout as a line of text. With 9,999,999 lines split across 9 partitions, each invocation of the script consumes about 1,111,111 lines:

#!/usr/bin/python
# coding=utf-8

# Strip repeated characters from a line, keeping first occurrences in order.
def fff(line):
    s = set()
    l = list()
    for c in line:
        if c not in s:
            l.append(c)
            s.add(c)
    return "".join(l)

result = ""
while True:
    try:
        s = raw_input()
    except EOFError:
        break
    if s == "":
        break
    result += fff(s) + "\n"
print result

The Spark driver that invokes the script:

package test

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object PipeTest {

  def main(args: Array[String]) {
    val t0 = System.currentTimeMillis()
    val sparkConf = new SparkConf().setAppName("pipe Test")
    val sc = new SparkContext(sparkConf)

    val a = sc.textFile("/home/gt/testdata.dat", 9)
    a.pipe("/home/gt/spark/bin/pipe.py").saveAsTextFile("/home/gt/output.dat")
    sc.stop()
    // Print elapsed wall-clock time, as the PySpark version does.
    println("!!!!!!!!! " + (System.currentTimeMillis() - t0))
  }
}
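
Note that pipe.py must be executable and present at that path on every worker node, since the forked process runs on the executors, not the driver.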

Code for the PySpark version. Here PySpark ships each partition's records to a Python worker process, so every record is serialized between the JVM executor and Python:

# -*- coding: utf-8 -*-
from __future__ import print_function

import time
from pyspark import SparkContext

# Strip repeated characters from each line, keeping first occurrences in order.
if __name__ == "__main__":
    t0 = time.time()
    sc = SparkContext(appName="app2ap")
    lines = sc.textFile("/home/gt/testdata.dat", 9)

    def fff(line):
        s = set()
        l = list()
        for c in line:
            if c not in s:
                l.append(c)
                s.add(c)
        return "".join(l)

    rdd = lines.map(fff)
    rdd.saveAsTextFile("/home/gt/output.dat")
    sc.stop()
    print("!!!!!!")
    print(time.time() - t0)
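
A side note on the per-record Python cost: most of the PySpark run is spent executing fff once per line in the Python worker. The same dedup can be written more compactly; this is a sketch of an alternative (fff_fast is a hypothetical name, and this variant was not part of the benchmark):

from collections import OrderedDict

def fff_fast(line):
    # fromkeys keeps only the first occurrence of each character, in order
    return "".join(OrderedDict.fromkeys(line))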

For comparison, the native Spark (Scala) program:

package test

import java.util.ArrayList
import java.util.HashSet

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Test {

  // Strip repeated characters from a line, keeping first occurrences in order.
  def fff(line: String): String = {
    val s = new HashSet[Char]()
    val l = new ArrayList[Char]()
    val length = line.length()
    for (i <- 0 to length - 1) {
      val c = line.charAt(i)
      if (!s.contains(c)) {
        l.add(c)
        s.add(c)
      }
    }
    l.toArray().mkString
  }

  def main(args: Array[String]) {
    val t0 = System.currentTimeMillis()
    val sparkConf = new SparkConf().setAppName("pipe Test")
    val sc = new SparkContext(sparkConf)

    val a = sc.textFile("/home/gt/testdata.dat", 9)
    a.map(fff).saveAsTextFile("/home/gt/output.dat")
    sc.stop()
    // Print elapsed wall-clock time, as the PySpark version does.
    println("!!!!!!!!! " + (System.currentTimeMillis() - t0))
  }
}

The result: the native Spark Scala job finishes in 25 s, the pipe version in 50 s, and the PySpark version in 75 s.
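
The ordering matches where the work runs. The native job stays entirely inside the JVM; pipe additionally pays for forking an external process per partition and round-tripping every record as text over stdin/stdout; PySpark pays for serializing every record between the JVM executor and a Python worker, plus interpreting fff in Python. As a rough illustration of what pipe() does to each partition (a sketch only, not Spark's actual implementation, which streams records incrementally rather than buffering the whole partition):

import subprocess

def pipe_partition(lines, cmd):
    # Fork the external command once for this partition, feed the whole
    # partition to its stdin as newline-delimited text, and collect stdout.
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    out, _ = proc.communicate("\n".join(lines) + "\n")
    return out.splitlines()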

