Code to generate the test data:
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;

public class Main {
    public static void main(String[] args) throws IOException {
        File file = new File("/home/gt/testdata.dat");
        file.delete();
        file.createNewFile();
        OutputStream out = new FileOutputStream(file);
        OutputStreamWriter osw = new OutputStreamWriter(out);
        BufferedWriter writer = new BufferedWriter(osw);
        // 9,999,999 identical lines of 15 characters plus a newline each.
        for (int i = 0; i < 9999999; i++) {
            writer.write("aaabbbcccdddeee");
            writer.newLine();
        }
        // Closing the outermost writer also closes the wrapped streams.
        writer.close();
    }
}
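As a quick back-of-the-envelope check (my own estimate, not a measurement from the run): each line is "aaabbbcccdddeee" plus a newline, so the file should come out to roughly 153 MB:

# Size estimate for testdata.dat: 9,999,999 lines of 15 chars + '\n'.
lines = 9999999
bytes_per_line = len("aaabbbcccdddeee") + 1  # 16 bytes per line
print(lines * bytes_per_line / float(2 ** 20))  # ~152.6 MB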
Code for the pipe version:
#!/usr/bin/python
# coding=utf-8

# Keep only the first occurrence of each character in a line.
def fff(line):
    s = set()
    l = list()
    length = len(line)
    for i in range(0, length):  # note: range(0, length - 1) would skip the last character
        if line[i] not in s:
            l.append(line[i])
            s.add(line[i])
    return "".join(l)
result = ""
#var = 1
#while var == 1 :
for i in range(1,1111111):
s = raw_input()
if s is None or s =="" :
break
result += fff(s) + "\n"
printpackage test
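The fixed-count loop above has to guess how many lines a partition will deliver and buffers the entire result in memory. A more robust pipe script (a sketch of the same logic, not the version that was benchmarked) would simply stream over stdin:

#!/usr/bin/python
# Streaming variant: Spark's pipe() writes each partition element to stdin,
# one per line, and reads result lines back from stdout.
import sys

for line in sys.stdin:
    line = line.rstrip("\n")
    seen = set()
    out = []
    for c in line:
        if c not in seen:
            out.append(c)
            seen.add(c)
    sys.stdout.write("".join(out) + "\n")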
package test

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object PipeTest {
  def main(args: Array[String]) {
    val t0 = System.currentTimeMillis()
    val sparkConf = new SparkConf().setAppName("pipe Test")
    val sc = new SparkContext(sparkConf)
    val a = sc.textFile("/home/gt/testdata.dat", 9)
    a.pipe("/home/gt/spark/bin/pipe.py").saveAsTextFile("/home/gt/output.dat")
    sc.stop()
    println("!!!!!!!!!")
    println(System.currentTimeMillis() - t0)
  }
}
Code for the PySpark version:
# -*- coding: utf-8 -*-
from __future__ import print_function

import time

from pyspark import SparkContext

# Remove duplicate letters, keeping the first occurrence of each.
if __name__ == "__main__":
    t0 = time.time()
    sc = SparkContext(appName="app2ap")
    lines = sc.textFile("/home/gt/testdata.dat", 9)

    def fff(line):
        s = set()
        l = list()
        length = len(line)
        for i in range(0, length):  # range(0, length - 1) would skip the last character
            if line[i] not in s:
                l.append(line[i])
                s.add(line[i])
        return "".join(l)

    rdd = lines.map(fff)
    rdd.saveAsTextFile("/home/gt/output.dat")
    sc.stop()
    print("!!!!!!")
    print(time.time() - t0)
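Since every input line is "aaabbbcccdddeee", every output line of all three jobs should be "abcde". A self-contained check of the shared dedup logic:

# Standalone check: keep the first occurrence of each character.
def fff(line):
    seen, out = set(), []
    for c in line:
        if c not in seen:
            out.append(c)
            seen.add(c)
    return "".join(out)

assert fff("aaabbbcccdddeee") == "abcde"
print(fff("aaabbbcccdddeee"))  # abcde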
Finally, the native (pure Spark Scala) program:
package test

import java.util.ArrayList
import java.util.HashSet

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Test {
  // Remove duplicate characters, keeping the first occurrence of each.
  def fff(line: String): String = {
    val s = new HashSet[Char]()
    val l = new ArrayList[Char]()
    val length = line.length()
    for (i <- 0 to length - 1) {
      val c = line.charAt(i)
      if (!s.contains(c)) {
        l.add(c)
        s.add(c)
      }
    }
    return l.toArray().mkString
  }

  def main(args: Array[String]) {
    val t0 = System.currentTimeMillis()
    val sparkConf = new SparkConf().setAppName("pipe Test")
    val sc = new SparkContext(sparkConf)
    val a = sc.textFile("/home/gt/testdata.dat", 9)
    a.map(fff).saveAsTextFile("/home/gt/output.dat")
    sc.stop()
    println("!!!!!!!!!")
    println(System.currentTimeMillis() - t0)
  }
}
Conclusion: native Spark Scala finishes in about 25 s, the pipe version in about 50 s, and PySpark in about 75 s. The ordering is plausible: pipe pays for one external process per partition plus text round-trips over stdin/stdout, while PySpark additionally ships every record between the JVM and the Python worker processes.
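In ratio terms (simply restating the reported timings):

# Slowdown relative to the native Scala job:
scala_s, pipe_s, pyspark_s = 25.0, 50.0, 75.0
print(pipe_s / scala_s)     # 2.0x
print(pyspark_s / scala_s)  # 3.0x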