使用 Spark 结构化流式处理将数据流式传输到湖屋

结构化流是建立在 Spark 上的可缩放且容错的流处理引擎。 Spark 负责在数据持续到达时逐步和连续地运行流处理操作。

结构化流式处理在 Spark 2.2 中可用。 自那时起,它一直是数据流处理的推荐做法。 结构化流背后的基本原则是将实时数据流视为始终连续追加新数据的表,就像表中的新行一样。 有一些预先定义的内置流式处理文件源(例如 CSV、JSON、ORC、Parquet)以及对 Kafka 和事件中心等消息服务的内置支持。

本文提供了有关如何在高吞吐量的生产环境中通过 Spark 结构化流式处理优化事件的处理和引入的见解。 建议的方法包括:

  • 数据流吞吐量优化
  • 优化增量表中的写入操作
  • 事件批处理

Spark 作业定义和 Spark 笔记本

Spark 笔记本是验证想法和进行试验,以从数据或代码中获取见解的绝佳工具。 笔记本还广泛用于数据准备、可视化、机器学习和其他大数据方案。 Spark 作业定义是在 Spark 群集上长时间运行的、面向代码的非交互式任务。 Spark 作业定义提供可靠性和可用性。

Spark 笔记本是测试代码逻辑并满足所有业务要求的绝佳来源。 但是,若要使其在生产方案中保持运行,已启用重试策略的 Spark 作业定义是最佳解决方案。

Spark 作业定义的重试策略

在 Microsoft Fabric 中,用户可以为 Spark 作业定义作业设置重试策略。 尽管作业中的脚本可能是无限的,但运行该脚本的基础结构可能会遇到需要停止作业的问题。 或者,由于底层基础设施的修补需求,职位可能会被取消。 重试策略允许用户设置规则,以便在作业因任何潜在问题而停止时自动重启作业。 参数指定作业应重启的频率、最多无限次重试以及设置两次重试之间的时间。 这样,用户就可以确保其 Spark 作业定义作业继续无限运行,直到用户决定停止它们。

流媒体来源

使用事件中心设置流式传输需要基本配置,其中包括事件中心命名空间名称、中心名称、共享访问密钥名称和消费者组。 使用者组是整个事件中心的视图。 它使多个消费应用程序都有各自独立的事件流视图,并按其步调和偏移量独立读取流。

分区是能够处理大量数据的重要组成部分。 单个处理器每秒处理事件的容量有限,而多个处理器在并行执行时可以做得更好。 分区允许并行处理大量事件。

如果使用过多的分区且引入率较低,则分区读取器会处理此数据的一小部分,从而导致处理不理想。 理想的分区数直接取决于所需的处理速率。 如果要扩展事件处理能力,可考虑添加更多分区。 分区没有特定的吞吐量限制。 但是命名空间中的聚合吞吐量受吞吐量单位数限制。 增加命名空间中吞吐量单位的数量时,可能需要添加额外分区来允许并发读取器实现其自身的最大吞吐量。

建议调查并测试吞吐量方案的最佳分区数。 但通常会看到使用 32 个或更多分区的高吞吐量方案。

建议使用 Azure 事件中心连接器 for Apache Spark (azure-event-hubs-spark) 将 Spark 应用程序连接到 Azure 事件中心。

Lakehouse 作为流式处理接收器

Delta Lake 是一个开源存储层,在数据湖存储解决方案的基础上提供 ACID(原子性、一致性、隔离性和持久性)事务。 Delta Lake 还支持可缩放的元数据处理、架构演变、时间旅行(数据版本控制)、开放格式和其他功能。

在 Fabric 数据工程中,Delta Lake 用于:

  • 使用 Spark SQL 轻松插入、更新和删除数据。
  • 压缩数据以最大程度地减少查询数据所花费的时间。
  • 查看执行操作之前和之后表的状态。
  • 检索对表执行的操作的历史记录。

Delta 被添加为 writeStream 中使用的可能输出接收器格式之一。 有关现有输出接收器的详细信息,请参阅 Spark 结构化流式处理编程指南

以下示例演示如何将数据流式传输到 Delta Lake。

import pyspark.sql.functions as f 
from pyspark.sql.types import * 

df = spark \ 
  .readStream \ 
  .format("eventhubs") \ 
  .options(**ehConf) \ 
  .load()  

Schema = StructType([StructField("<column_name_01>", StringType(), False), 
                     StructField("<column_name_02>", StringType(), False), 
                     StructField("<column_name_03>", DoubleType(), True), 
                     StructField("<column_name_04>", LongType(), True), 
                     StructField("<column_name_05>", LongType(), True)]) 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .toTable("deltaeventstable") 

关于在示例中剪切的代码:

  • format() 是定义数据的输出格式的指令。
  • outputMode() 定义以哪种方式写入流式处理中的新行,(即追加、覆盖)。
  • toTable() 将流式处理的数据保存到使用作为参数传递的值创建的 Delta 表中

优化 Delta 写入

数据分区是创建可靠的流式处理解决方案的关键部分:分区可改进数据组织方式,同时提高吞吐量。 进行 Delta 操作后,文件很容易被碎片化,从而导致出现过多的小文件。 由于在磁盘上写入文件的时间过长,文件过大也是一个问题。 数据分区的挑战是找到适当的平衡,以实现最佳文件大小。 Spark 支持在内存和磁盘上分区。 将数据保存到 Delta Lake 和从 Delta Lake 查询数据时,正确分区的数据可以提供最佳性能。

  • 对磁盘上的数据进行分区时,可以选择如何使用 partitionBy() 根据列对数据进行分区。 partitionBy() 是一个函数,用于根据写入磁盘时提供的一列或多列将大型语义模型分区成较小的文件。 分区是一种在使用大型语义模型时提高查询性能的方法。 避免选择生成过小或过大的分区的列。 基于一组具有良好基数的列定义分区,将数据拆分为最佳大小的文件。
  • 可以使用 repartition()coalesce() 转换对内存中的数据进行分区,在多个工作器节点上分布数据,并创建多个任务,这些任务可以使用弹性分布式数据集 (RDD) 的基础知识并行读取和处理数据。 它允许将语义模型划分为逻辑分区,可以在群集的不同节点上计算这些分区。
    • repartition() 用于增加或减少内存中的分区数。 重新分配通过网络将所有数据重新分布,并在所有分区之间实现均衡。
    • coalesce() 仅用于有效地减少分区数。 这是 repartition() 的优化版本,其中使用 coalesce() 减少了跨所有分区的数据移动。

在高吞吐量的方案中,将这两种分区方法组合在一起是一个很好的解决方案。 repartition() 在内存中创建特定数量的分区,而 partitionBy() 将每个内存分区和分区列的文件写入磁盘。 以下示例演示了在同一 Spark 作业中这两种分区策略的用法:数据首先在内存中拆分为 48 个分区(假设我们总共有 48 个 CPU 核心)然后在磁盘上基于有效负载中的两个现有列进行分区。

import pyspark.sql.functions as f 
from pyspark.sql.types import * 
import json 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

优化写入

优化对 Delta Lake 的写入的另一个选项是使用优化写入。 优化写入是一项可选功能,可改进将数据写入 Delta 表的方式。 Spark 在写入数据之前会合并或拆分分区,从而最大程度地提高写入磁盘的数据吞吐量。 但是,它会导致完全洗牌,因此对于某些工作负载,它可能会导致性能下降。 使用 coalesce() 和/或 repartition() 对磁盘上的数据进行分区的作业可以重构为开始使用优化写入

以下代码是使用优化写入的示例。 请注意,仍然使用 partitionBy()

spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", true) 
 
rawData = df \ 
 .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

批处理事件

为了最大程度地减少操作数量,以改进将数据引入 Delta Lake 所花费的时间,批处理事件是一种可行的替代方法。

触发器定义应执行(触发)流式查询并发出新数据的频率。 设置触发器则定义了微批处理的周期性处理时间间隔,将数据和批处理事件积累到少数持久化操作中,而不是一直写入磁盘。

以下示例展示了一个流式查询,其中事件每隔一分钟定期处理。

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .trigger(processingTime="1 minute") \ 
  .toTable("deltaeventstable") 

在 Delta 表写入操作中组合事件批处理的优点是,它可以创建较大的 Delta 文件,其中包含更多数据,从而避免小型文件。 应分析要引入的数据量,并找到最佳处理时间,以优化 Delta 库创建的 Parquet 文件的大小。

监视

Spark 3.1 及更高版本具有内置的结构化流式处理 UI 包含以下流式处理指标:

  • 输入速率
  • 处理速率
  • 输入行
  • 批次时长
  • 操作持续时间