为生产工作负载配置自动加载程序

Databricks 建议在生产环境中运行自动加载程序时遵循流式处理最佳做法

Databricks 建议在增量实时表中使用自动加载程序来引入增量数据。 增量实时表扩展了 Apache Spark 结构化流式处理中的功能,使你只需编写几行声明性 Python 或 SQL 即可部署生产质量的数据管道:

  • 自动缩放计算基础结构以节省成本
  • 使用预期进行数据质量检查
  • 自动架构演变处理
  • 通过事件日志中的指标进行监视

监视自动加载程序

查询自动加载程序发现的文件

注意

cloud_files_state 函数在 Databricks Runtime 11.3 LTS 和更高版本中可用。

自动加载程序提供用于检查流状态的 SQL API。 使用 cloud_files_state 函数,可以找到有关自动加载程序流发现的文件的元数据。 只需从 cloud_files_state 查询,提供与自动加载程序流关联的检查点位置。

SELECT * FROM cloud_files_state('path/to/checkpoint');

侦听流更新

若要进一步监视自动加载程序流,Databricks 建议使用 Apache Spark 的流式处理查询侦听器接口

自动加载程序在每批次中向流式处理查询侦听器报告指标。 你可以在流式处理查询进度仪表板的“原始数据”选项卡下的 numFilesOutstandingnumBytesOutstanding 指标中查看积压工作 (backlog) 中存在多少个文件以及积压工作 (backlog) 量有多大:

{
  "sources" : [
    {
      "description" : "CloudFilesSource[/path/to/source]",
      "metrics" : {
        "numFilesOutstanding" : "238",
        "numBytesOutstanding" : "163939124006"
      }
    }
  ]
}

在 Databricks Runtime 10.4 LTS 及更高版本中,对于 AWS 和 Azure,使用文件通知模式时,指标还将包括云队列中文件事件的近似数量(表示为 approximateQueueSize)。

成本注意事项

运行自动加载程序时,主要成本来源是计算资源和文件发现的成本。

为了降低计算成本,Databricks 建议使用 Databricks 作业将自动加载程序计划为使用 Trigger.AvailableNow 的批处理作业,而不是连续运行(前提是你不要求低延迟)。 请参阅配置结构化流式处理触发器间隔

文件发现成本的形式可以是目录列表模式下存储帐户的 LIST 操作、订阅服务上的 API 请求,以及文件通知模式下的队列服务。 为了降低文件发现成本,Databricks 的建议是:

  • 在目录列表模式下连续运行自动加载程序时提供 ProcessingTime 触发器
  • 以词汇顺序构建文件到存储帐户的上传,以尽可能利用增量列表(已弃用)
  • 在无法增量列出时利用文件通知
  • 使用资源标记来标记自动加载程序创建的资源,以跟踪你的成本

使用 Trigger.AvailableNow 和速率限制

注意

在 Databricks Runtime 10.4 LTS 及更高版本中可用。

可以使用 Trigger.AvailableNow 将自动加载程序计划为在 Databricks 作业中作为批处理作业运行。 AvailableNow 触发器将指示自动加载程序处理在查询开始时间之前到达的所有文件。 在流开始后上传的新文件将被忽略,直到下一次触发。

使用 Trigger.AvailableNow,文件发现将与数据处理异步进行,并且可以通过速率限制跨多个微批处理数据。 默认情况下,自动加载程序每个微批处理最多处理 1000 个文件。 可以通过配置 cloudFiles.maxFilesPerTriggercloudFiles.maxBytesPerTrigger 来配置应在微批处理中处理的文件数或字节数。 文件限制是硬限制,但字节限制是软限制,这意味着可以处理的字节数多于 maxBytesPerTrigger 提供的字节数。 当这两个选项同时提供时,自动加载程序将处理达到其中一个限制所需的文件数量。

事件保留

自动加载程序使用 RocksDB 跟踪检查点位置中的已发现文件,以提供恰好一次的引入保证。 Databricks 强烈建议对所有大容量或长期引入流使用 cloudFiles.maxFileAge 选项。 此选项将使检查点位置的事件过期,从而缩短自动加载程序的启动时间。 启动时间可能会增加到每次自动加载程序运行的分钟数,如果源目录中存储的文件的最大期限具有上限,这会增加不必要的成本。 可为 cloudFiles.maxFileAge 设置的最小值为 "14 days"。 RocksDB 中的删除项显示为逻辑删除条目,因此,在存储使用率稳定下来前,可以预见存储使用率会随着事件过期而临时增加。

警告

cloudFiles.maxFileAge 作为针对高容量数据集的成本控制机制提供。 过于激进地调整 cloudFiles.maxFileAge 可能会导致数据质量问题,例如重复引入或缺少文件。 因此,Databricks 建议为 cloudFiles.maxFileAge 使用保守设置,例如 90 天,这与类似数据引入解决方案建议的值相当。

尝试优化 cloudFiles.maxFileAge 选项可能会导致自动加载程序忽略未处理的文件,或导致已处理的文件过期并重新处理,从而导致出现重复数据。 下面是选择 cloudFiles.maxFileAge 时需要注意的几个事项:

  • 如果流在很长时间后重启,将忽略从队列中拉取的超过 cloudFiles.maxFileAge 的文件通知事件。 同样,如果使用目录列表,将忽略在停机期间可能出现的超过 cloudFiles.maxFileAge 的文件。
  • 如果使用目录列表模式并使用 cloudFiles.maxFileAge(例如将其设置为 "1 month"),然后停止流并在将 cloudFiles.maxFileAge 设置为 "2 months" 的情况下重启流,那么,系统将重新处理超过 1 个月但未到 2 个月的文件。

如果你在首次启动流时设置此选项,则不会引入超过 cloudFiles.maxFileAge 的数据。因此,如果你需要引入旧数据,则不应在首次启动流时设置此选项, 而是应在后续运行中设置此选项。

使用 cloudFiles.backfillInterval 触发常规回填

自动加载程序可以按给定间隔触发异步回填,例如每天回填一次,或一周回填一次。 文件事件通知系统不能保证上传的所有文件 100% 传递,并且不对文件事件的延迟提供严格的 SLA。 Databricks 建议你使用自动加载程序触发定期回填,方法是使用 cloudFiles.backfillInterval 选项来保证在给定的 SLA 中发现所有文件(如果需要满足数据完整性的要求)。 触发定期回填不会导致重复。