当前位置 : 主页 > 网络编程 > PHP >

Scala实践Spark(三) 数据读取与保存

来源:互联网 收集:自由互联 发布时间:2023-09-06
文章目录 ​​读取​​ ​​数据源​​ ​​数据格式​​ ​​保存​​ ​​JSON​​ ​​csv​​ ​​SequenceFile​​ ​​对象文件​​ ​​非文件系统数据源​​ ​​protocol buffer​


文章目录

  • ​​读取​​
  • ​​数据源​​
  • ​​数据格式​​
  • ​​保存​​
  • ​​JSON​​
  • ​​csv​​
  • ​​SequenceFile​​
  • ​​对象文件​​
  • ​​非文件系统数据源​​
  • ​​protocol buffer​​
  • ​​文件压缩​​
  • ​​文件系统​​
  • ​​Spark SQL​​
  • ​​Apache Hive​​
  • ​​数据库​​

读取

数据源

  • 本地或分布式文件系统(NFS、HDFS等)
  • Spark中的结构化数据源
  • Cassandra、HBase、Elasticsearch、JDBC源

数据格式

文本文件、JSON、CSV、SequenceFiles、Protocol Buffers、对象文件

文本文件

val input = sc.textFile("file:///home/spark/README.md")

这里"///"是指文件根目录
如果要读取一个目录有两种方式:

  • textFile,传递一个目录作为参数,它会把各部分都读取到RDD中
  • wholeTextFiles 返回一个pairRDD,键是输入文件的文件名。用于有必要知道数据的各部分来自哪个文件

通过​​wholeTextFile​​读取并求每个文件的平均值

val input = sc.wholeTextFiles("file:///home/spark/saleFiles")
val result = input.mapValues{y =>
val nums = y.split(" ").map(x=>x.toDouble)
nums.sum / nums.size.toDouble
}

此外,Spark支持通配字符如​​part-*.txt​​来用于大规模数据集

保存

result.saveAsTextFile(outputFile)

JSON

三种方式:一是文本读取,二是JSON序列化,三是使用自定义的Hadoop格式来操作JSON数据。

import org.apache.spark._
import scala.util.parsing.json.JSON
object JSONApp {
def main(args:Array[String]): Unit ={
val conf = new SparkConf().setMaster("local").setAppName("JSONApp");
val sc = new SparkContext(conf);
val inputFile = "file:///usr/local/spark/examples/src/main/resources/people.json"
val jsonStr = sc.textFile(inputFile);
val result = jsonStr.map(s => JSON.parseFull(s));
result.foreach(
{
r => r match {
case Some(map:Map[String,Any]) => println(map)
case None => println("parsing failed!")
case other => println("unknown data structure" + other)
}
}
);
}
}

使用第三方Jackson

import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
...
case class Person(name: String, lovesPandas: Boolean) //Must be a top-level class
...
// Parse it into a specific case class. We use flatMap to handle errors
// by returning an empty list (None) if we encounter an issue and a
// list with one element if everything is ok (Some(_)).
val result = input.flatMap(record => {
  try {
    Some(mapper.readValue(record, classOf[Person]))
  } catch {
    case e: Exception => None
  }
})

对于大规模数据集而言,格式错误是常见的现象,如果选择跳过格式不正确的数据,应该尝试通过累加器跟踪错误的个数。

保存JSON

result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_))
.saveAsTextFile(outputFile)

csv

如果所有数据没有包含换行符,也可以使用textFile()读取并解析数据

import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader

val input = sc.textFile(inputFile)
val result = input.map{ line=>
val reader = new CSVReader(new StringReader(line));
reader.readNext();
}

如果字段中嵌有换行符,就需要完整读取每个文件,然后解析各段。

case class Person(name:String,favoriteAnimal:String)
val input = sc.wholeTextFile(inputFile)
val result = input.flatMap{ case (_,txt) =>
val reader = new CSVReader(new StringReader(txt));
reader.readAll().map(x => Person(x(0),x(1)))
}

保存CSV,使用StringWriter或StringIO将结果放到RDD中

pandaLovers.map(person=>List(person.name,person.favoriteAnimal).toArray).mapPartitions{people =>
val stringWriter = new StringWriter();
val csvWriter = new CSVWriter(stringWriter);
csvWriter.writeAll(people.tolist)
Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)

SequenceFile

是由没有相对关系结构的键值对文件组成的常用Hadoop格式。SequenceFile是由实现Hadoop的Writable接口元素组成。标准的经验法是尝试在类名的后面加上Writable这个词,检查它是否是​​org.apache.hadoop.io.Writable​​​,如果找不到对应的writable类型,可以通过override​​org.apache.hadoop.io.Writable​​​的readfields和write类来实现自己的Writable类。
在SparkContext可以调用​​​sequenceFile(path,keyClass,valueClass,minPartitions)​​,keyClass和valueClass参数都必须使用正确的Writable类.

val data = sc.sequenceFile(inFile,classOf[Text],classOf[IntWritable]).map{case (x,y) => (x.toString,y.get())}

在Scala中保存SequenceFile

val data = sc.parallelize(List(("Panda",3),("Kay",6),("Snail",2)))
data.saveAsSequenceFile(outputFile)

对象文件

对象文件就像对SequenceFile的简单封装,它允许存储只包含值的RDD,对象文件是使用Java序列化写出来的。对象文件的输出和Hadoop的输出不一样,对象文件通常用于Spark作业间的通信。​​objeceFile()​​​读取对象文件返回RDD,​​savaAsObject​​​保存对象文件。
在Scala使用老式API读取KeyValueTextInputFormat

val input = sc.hadoopFile[Text,Text,KeyValueTextInputFormat](inputFile).map{
case (x,y) => (x.toString,y.toString)
}

我们也可以使用自定义Hadoop输入格式读取JSON数据。下面展示如何在Spark使用新式Hadoop API。

val input = sc.newAPIHadoopFile(inputFile,classOF[LzoJsonInputFormat],classOf[LongWritable],classOf[MapWritable],conf)

非文件系统数据源

hadoopFile/saveAsHadoopFile、hadoopDataset/saveAsHadoopDataset、newAPIHadoopDataset/saveAsNewAPIHadoopDataset来访问Hadoop所支持的费文件系统的存储格式,像Hbase和MongoDB这样的键值对存储都提供了用来直接读取Hadoop输入格式的接口

protocol buffer

PB是结构化数据,要求字段和类型都确定,是经过优化的编解码速度快,而且占用空间很小。
PB由可选字段、必须字段、重复字段三种字段组成。
下面是一个如何从简单的PB格式中读取许多VenueResponse对象。VenueResponse是一个只包含重复字段的简单格式,这个字段包含一条带有必须字段、可选字段以及枚举类型字段的PB消息.

PB的定义例子

message Venue{
required int32 id=1;
required string name=2;
required VenueType typr=3;
optional string address=4;
enum VenueType{
COFFEESHOP=0;
WORKPLACE=1;
CLUB=2;
OMNOMNOM=3;
OTHER=4;
}
}
message VenueResponse{
repeated Venue results=1;
}

使用Elephant Bird写出protocol buffer

val job = new Job()
val conf = job.getConfiguration
LzoProtobufBlockOutputFormat.setClassConf(classOf[Places.Venue],conf);
val dnaLounge = Places.Venue.newBuilder()
dnaLounge.setId(1);
dnaLounge.setName("DNA Lounge")
dnaLounge.setType(Places.Venue.VenueType.CLUB)
val data = sc.parallelize(List(dnaLounge.build()))
val outputData = data.map{pb=>
val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue]);
protoWritable.set(pb)
(null,protoWritable)
}
outputData.saveAsNewAPIHadoopFile(outputFile,classOf[Text],classOf[ProtobufWritable[Places.Venue]],classOf[LzoProtobufBlockOutputFormat[ProtobufWritable[Places.Venue]]],conf)

文件压缩

可用的压缩选项有:gzip、lzo、bzip2、zlib、Snappy
读取压缩的输入,推荐newAPIHadoopFile或者hadoopFile

文件系统

本地/常规文件系统
Spark支持从本地系统读取文件,不过他要求文件在集群所有节点的相同路径下都可以找到。
推荐先将文件放到hdfs、NFS、S3等共享文件系统上
以下是一个从本地读取压缩的文本文件

val rdd = sc.textFile("file:///home/spark/happypandas.gz")

在HDFS读取文件

val rdd = sc.textFile("hdfs://master:port/path")

Spark SQL

我们把一条SQL查询给Spark SQL,让它对一个数据源执行查询,然后得到由Row对象组成的RDD,每个Row对象表示一条记录。每个Row都有一个get()方法,会返回一个一般类型让我们进行类型转换,针对常见类型的专用get()方法(getFloat()、getInt()、getLong()、getString()、getShort()、getBoolean()).

Apache Hive

  • 1.提供Hive的配置文件,将hive-sit.xml文件复制到Spark的./conf/目录
  • 2.创建HiveContext对象
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val rows = hiveCtx.sql("SELECT name,age FROM users")
val firstRow = rows.first()
println(firstRow.getString(0))

如果有记录间结构一致的JSON数据,Spark SQL可以自动腿短结构信息,并将数据读取为记录。要读取JSON数据,首先创建HiveContext,HiveContext.jsonFile方法来从整个文件获取由Row对象组成的RDD。

下列是一个JSON的示例

{"users":{"name":"Holden","location":"San Francisco"},"text":"Nice Dat out today"}
{"user":{"name":"Matei","location":"Berkeley"},"text":"Even nicer here :)"}

在Scala中使用Spark SQL读取JSON数据

val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name,text FROM tweets")

数据库

Spark可以从任何支持Java数据库连接(JDBC)的关系型数据库中读取数据,包括MySQL、Postgres、Cassandra、HBase、Elasticsearch,访问这些数据,需要构建一个​​org.apache.spark.rdd.JdbcRDD​

def createConnection()={
Class.forName("com.mysql.jdbc.Driver").newInstance();
DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}
def extractValues(r:ResultSet) = {
(r.getInt(1),r.getString(2))
}
val data = new JdbcRDD(sc,createConnection,"SELECT * FROM panda WHERE ? <= id and id <= ?",lowerbound=1,upperBound=3,numPartition=2,mapRow=extractValues)
println(data.collect().toList)

JdbcRDD接受这样几个参数:

  • 提供一个用于对数据库创建连接的函数,这个函数让每个节点在连接必要的配置后创建自己读取数据的连接
  • 提供一个可以读取一定范围数据的查询,以及查询参数中lowerBound和upperBound的值,可以让Spark在不同机器上查询不同范围的数据。
  • 最后一个参数是将输出结果从java.sql.Result转为对操作数据有用的格式的函数,我们会得到(Int,String)对。

HBase:由于​​org.apache.hadoop.hbase.mapreduce.TableInputFormat​​​类的实现,Spark可以通过Hadoop输入格式访问HBase。返回键值对数据。
从HBase读取数据

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result //值的类型
import org.apache.hadoop.hbase.io.ImmutableBytesWritable //键的类型
import org.apache.hadoop.hbase.mapreduce.TableInputFormat //优化HBase的读取的设置项
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE,"tablename")
val rdd = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])


【文章出处:香港多ip站群服务器 http://www.558idc.com/hkzq.html提供,感恩】
上一篇:ThinkPhp5.1的安装与配置
下一篇:没有了
网友评论