当前位置 : 主页 > 网络安全 > 测试自动化 >

性能 – 如何在Spark Streaming应用程序中异步写入行以加快批处理执行?

来源:互联网 收集:自由互联 发布时间:2021-06-22
我有一个spark作业,我需要在每个微批处理中编写SQL查询的输出.写入是一项昂贵的操作,并且导致批处理执行时间超过批处理间隔. 我正在寻找提高写入性能的方法. 正在异步执行单独线程
我有一个spark作业,我需要在每个微批处理中编写SQL查询的输出.写入是一项昂贵的操作,并且导致批处理执行时间超过批处理间隔.

我正在寻找提高写入性能的方法.

>正在异步执行单独线程中的写操作,如下面显示的一个好选项?
>这会导致任何副作用,因为Spark本身以分布式方式执行吗?
>还有其他/更好的方法可以加快写入速度吗?

// Create a fixed thread pool to execute asynchronous tasks
val executorService = Executors.newFixedThreadPool(2)
dstream.foreachRDD { rdd =>
  import org.apache.spark.sql._
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
  import spark.implicits._
  import spark.sql

  val records = rdd.toDF("record")
  records.createOrReplaceTempView("records")
  val result = spark.sql("select * from records")

  // Submit a asynchronous task to write
  executorService.submit {
    new Runnable {
      override def run(): Unit = {
        result.write.parquet(output)
      }
    }
  }
}
1 – 在一个单独的线程中异步执行写操作,如下面显示的一个好选项?

没有.理解这个问题的关键是问’谁在写’.写入由在集群中的执行程序上为作业分配的资源完成.将write命令放在异步线程池上就像将新的办公室管理器添加到具有固定员工的办公室.鉴于他们必须共享相同的员工,两位经理是否能够完成比单独工作更多的工作?嗯,一个合理的答案是“只有第一位经理没有给他们足够的工作,所以有一些免费的能力”.

回到我们的集群,我们正在处理一个对IO很重的写操作.并行化写入作业将导致争用IO资源,从而使每个独立作业更长.最初,我们的工作可能看起来比“单一经理版本”更好,但麻烦最终会打击我们.
我制作了一个图表,试图说明它是如何工作的.请注意,并行作业将花费更长的时间与它们在时间轴中并发的时间量成比例.

一旦我们达到工作开始延迟的那一点,我们就会有一个不稳定的工作,最终会失败.

2-这会导致任何副作用,因为Spark本身是以分布式方式执行的吗?

我能想到的一些效果:

>可能更高的集群负载和IO争用.
>作业在Threadpool队列上排队,而不是在Spark Streaming Queue上排队.我们失去了通过Spark UI和监控API监控我们的工作的能力,因为延迟是“隐藏的”,从Spark Streaming的角度来看一切都很好.

3-还有其他/更好的方法可以加快写入速度吗?
(订购从便宜到昂贵)

>如果要附加到镶木地板文件,请经常创建新文件.追加随着时间的推移变得昂贵.>增加批处理间隔或使用Window操作编写更大的Parquet块. Parquet喜欢大文件>调整数据的分区和分配=>确保Spark可以并行执行写操作>增加群集资源,必要时添加更多节点>使用更快的存储空间

网友评论