通过


订阅 Google Pub/Sub

Azure Databricks 在 Databricks Runtime 13.3 LTS 及以上版本中提供了一个内置连接器,可用于订阅 Google Pub/Sub。 此连接器为来自订阅者的记录提供精确一次的处理语义。

注意

Pub/Sub 可能会发布重复的记录,记录到达订阅者手中的顺序也可能会打乱。 应编写 Azure Databricks 代码来处理重复记录和无序记录。

语法示例

下面的代码示例演示了用于配置从 Pub/Sub 读取的结构化流的基本语法:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

有关更多配置选项,请参阅“配置 Pub/Sub 流式处理读取”选项。

配置对 Pub/Sub 的访问权限

下表描述了配置凭据所需的角色:

角色 必需或可选 使用方式
roles/pubsub.viewerroles/viewer 必填 检查订阅是否存在并获取订阅
roles/pubsub.subscriber 必填 从订阅中提取数据
roles/pubsub.editorroles/editor 可选 如果订阅不存在,则启用创建订阅,同时允许在流终止时使用 deleteSubscriptionOnStreamStop 删除订阅

Databricks 建议在提供授权选项时使用机密。 授权连接需要以下选项:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Pub/Sub 架构

流的架构与从 Pub/Sub 提取的记录匹配,如下表所述:

字段 类型
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

配置 Pub/Sub 流式读取选项

下表描述了 Pub/Sub 支持的其他选项。 所有选项都在使用 .option("<optionName>", "<optionValue>") 语法进行结构化流式传输读取的过程中配置。

注意

某些 Pub/Sub 配置选项使用提取概念,而不是微批处理。 这反映了内部实现的详细信息,并且选项的工作方式类似于其他结构化流连接器中的推论,只是记录是被提取然后处理的。

选项 默认值 说明
numFetchPartitions 设置为流初始化时存在的执行程序数量的一半。 从订阅中提取记录的并行 Spark 任务数。
deleteSubscriptionOnStreamStop false 如果true,流式处理作业结束时传递到流的订阅将被删除。
maxBytesPerTrigger 每个触发的微批处理期间待处理批量的软限制。
maxRecordsPerFetch 1000 在处理记录之前,每个任务要获取的记录数。
maxFetchPeriod 10 秒 每个任务在处理记录之前的获取时间长度。 Databricks 建议使用默认值。

Pub/Sub 的增量批处理语义

可以使用 Trigger.AvailableNow 以增量批处理方式消费来自 Pub/Sub 源的可用记录。

Azure Databricks 在 Trigger.AvailableNow 设置中记录你开始读取的时间戳。 批次处理的记录包括之前获取的所有数据,以及时间戳小于记录流开始时间戳的任何新发布记录。

请参阅 AvailableNow:增量批处理

监视流式处理指标

结构化流式处理进度指标报告了已提取且准备处理的记录数、记录大小,以及自流开始以来观察到的重复记录数。 以下是此类指标的示例:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

限制

Pub/Sub 不支持推测执行 (spark.speculation)。