Spark 基础知识

支撑大小调整、优化和故障排除的核心概念。 如果你不熟悉 Fabric 中的 Spark,请先阅读此内容。

常规注意事项

场景:你对 Spark 不熟悉。 注意事项是什么
用例 最佳做法
使用优化的序列化格式 建议:优先选择 Avro、Parquet 或者 ORC(Optimized Row Columnar)这样的格式,因为它们嵌入架构、具有更高的压缩率,并且优化存储和处理效率。 在 Fabric 平台中,使用 Delta 格式,以确保原子性、一致性、隔离性、持久性(ACID)以及性能优势。
请谨慎使用 XML/JSON 不要依赖大型 JavaScript 对象表示法(JSON)或可扩展标记语言(XML)文件的架构推理,因为 Spark 读取整个数据集来推断架构,这会减慢处理速度,消耗大量内存。

在读取 JSON/XML 时,提供静态主架构,或使用 .option("samplingRatio", 0.1) 加快读取速度,但请注意,如果示例不代表完整数据集,读取可能会失败。 一种更安全的方法是从具有代表性的样本中推断出架构,并将该架构保留用于所有读取操作。

避免分析大型 XML 文件。 由于标记处理和类型强制转换,XML 分析本身运行速度会变慢。
优化联接和筛选 请在联接之前应用列修剪和行级筛选,以减少数据混洗处理和内存使用量。

使用 DataFrame API 时,Catalyst 优化器会自动处理谓词下推。 避免弹性分布式数据集 (RDD) API,因为它们绕过催化剂优化。
首选DataFrame而不是RDD 要做到:在大多数操作中使用数据帧而不是 RDD。 数据帧使用 Catalyst 优化器和 Tungsten 执行引擎来高效执行。
启用自适应查询执行(AQE) 启用 AQE 以动态优化洗牌分区,并自动处理数据倾斜问题。

执行程序内存管理

方案:你想要了解性能优化的执行程序内存管理。

即使执行程序配置了 56 GB 内存,Spark 也不允许将其全部直接用于用户数据。 Spark Core 划分和管理执行程序内存:

  • 保留内存: 为系统和 Spark 内部开销保留的固定部分(例如 Java 虚拟机(JVM),内部部分。

  • 用户内存: 存储用户定义的函数(UDF)、局部变量、数据结构(列表、地图、字典)和计算期间创建的对象。

  • 存储内存: 用于保存缓存/持久化数据、广播变量以及可缓存的 shuffle 数据。

  • 执行内存: 用于中间计算(随机计算、联接、排序、聚合)。

  • 动态内存共享: 存储和执行内存之间的边界是可移动的。 Spark 可以将内存从一个区域借给另一个区域,从而允许灵活的内存使用。

  • 泄漏: 当存储或执行内存需求超出借款后可用的内存时发生。 这会强制数据到磁盘,这可能会影响性能。

    Spark 内存管理和溢出的关系图。

内存不足(OOM)错误

场景:Spark 作业因内存不足(OOM)错误而失败。

驱动程序 OOM:

当 Spark 驱动程序超出其分配的内存时,会发生驱动程序 OOM 错误。

常见原因:驱动程序密集型操作,例如collect()countByKey()或将大量数据拉入驱动程序内存中的大型toPandas()调用。

缓解:尽可能避免驱动程序密集型操作。 如果不可避免,请增加驱动程序大小和基准以查找最佳配置。

执行程序内存不足(OOM):

当 Spark 执行程序超出其分配的内存时,会发生执行程序 OOM 错误。

常见原因:大型数据集上的内存和计算密集型转换(例如,宽联接、聚合、随机排列)或缓存/持久化数据集,这些数据集超出了执行程序的可用内存(执行 + 存储区域)。

缓解:如有必要,增加执行程序内存,优化 Spark 内存分数(spark.memory.fraction,spark.memory.storageFraction),并选择性地保留。 确保缓存的数据适合可用内存。

数据倾斜

偏差的症状:

  • 在 Spark UI 中,某些任务的执行时间比其他任务更长(阶段任务显示长尾效应)。
  • 阶段指标中位数和最大任务时间之间的较大差距。
  • 对于几个分区,具有大型随机读取或写入大小的阶段。

常见原因:

  • 联接/组键(热键)的数据分布不均衡。
  • 数据卷的分区不正确或分区太少。
  • 生成大型记录或多个 null/空键的上游数据异常。

缓解:

  • 重新分区或合并以提高分区并行度和平衡大小。
  • 使用键加盐或自定义分区,以跨分区分散热点键。
  • 使用 AQE(自适应查询执行)合并洗牌后分区并启用偏斜连接优化。
  • 对小型查找表使用广播连接以完全避免数据混洗。
  • 在成本高昂的阶段之前保留均衡的中间数据集,然后重新运行作业。

UDF 最佳做法

场景:需要应用无法通过内置 DataFrame 函数表示的自定义逻辑。

尽可能使用 Spark 数据帧 API。 Catalyst 优化器优化内置函数,并在 JVM 上本机运行它们,从而提供最佳性能。

如果必须使用 UDF(用户定义的函数),请避免常规 PySpark Python UDF。 相反,请考虑以下替代方法:

  • Pandas UDF(也称为矢量化 UDF):使用 Apache Arrow 在 JVM 和 Python 之间高效传输数据。 Pandas UDF 允许矢量化作,与逐行 Python UDF 相比,性能显著提高。

  • Scala/Java UDF:直接在 JVM 上运行,避免 Python 序列化开销。 Scala/Java UDF 通常优于 Python UDF。

请谨慎使用 Python UDF。 每个执行程序都会启动单独的 Python 进程,需要在 JVM 和 Python 之间对数据进行序列化和反序列化。 这会产生性能瓶颈,尤其是在规模上。 

错误日志记录

方案:在 Fabric Spark 中记录错误日志的最佳实践
  1. 使用 log4j 而不是 print() 给司机带来沉重负担。 使用 log4j时,可以访问驱动程序日志中的日志并搜索它们(例如,使用记录器名称,例如:PySparkLogger)。

    Spark 日志图表。

  2. 在 try 和 except 块中包装读取、写入和转换。 使用 logger.error 处理异常,使用 logger.info 发送进度消息。

    • Python 日志记录: 非常适合记录仅在 Spark 驱动程序上执行的代码中的作、状态更新或调试信息。 Python 的日志记录模块不会传播到执行程序日志。 请参阅 开发、执行和管理笔记本文档

    • Spark log4j: 在 Spark 中,log4j 是可靠的生产级别应用程序日志记录标准,因为它与 Spark 的驱动程序/执行程序日志天然集成。

    PySpark 中的 log4j 用法示例:

    import traceback
    # Get log4j logger
    log4jLogger = spark._jvm.org.apache.log4j
    logger = log4jLogger.LogManager.getLogger("PySparkLogger")
    logger.info("Application started.")
    try:
        # Create DataFrame with 20 records
        data = [(f"Name{i}", i) for i in range(1, 21)]  # 20 records
        df = spark.createDataFrame(data, ["name", "age"])
        logger.info("DataFrame created successfully with 20 records.")
        df.show(s)  # 's' is not defined -> will throw error but the application will not fail
    except Exception as e:
        logger.error(f"Error while creating or showing DataFrame: {str(e)}\n{traceback.format_exc()}")
    
  3. 错误监控中心化:

    • 在环境中使用诊断发射器扩展(使用 Azure Log Analytics 监视 Apache Spark 应用程序),并将其附加到运行 Spark 应用程序的 Notebook。 发出器可以将事件日志、自定义日志(如 log4j)和指标发送到 Azure Log Analytics/Azure 存储/Azure 事件中心。 将 log4j 名称传递给属性: spark.synapse.diagnostic.emitter.\<destination\>.filter.loggerName.match.

    • 此外,为了调试,还可以将失败的行/记录收集到 Lakehouse (LH) 表,以捕获记录级别的错误数据。