共用方式為


訂閱Google Pub/Sub

Azure Databricks 提供內建連接器來訂閱 Databricks Runtime 13.3 LTS 和更新版本中的 Google Pub/Sub。 此連接器提供訂閱者記錄的一次性處理語意。

注意

Pub/Sub 可能會發佈重複的記錄,而記錄可能會依序抵達訂閱者。 您應該撰寫 Azure Databricks 程式代碼來處理重複和順序錯亂的記錄。

語法範例

下列程式代碼範例示範如何設定從 Pub/Sub 讀取的結構化串流的基本語法:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

如需更多組態選項,請參閱 設定發佈/子串流讀取的選項。

設定對 Pub/Sub 的存取

Databricks 建議在提供授權選項時使用秘密。 授權連線需要下列選項:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

下表描述所設定認證所需的角色:

角色 必要或選用 其使用方式
roles/pubsub.viewerroles/viewer 必要 檢查訂用帳戶是否存在並取得訂用帳戶
roles/pubsub.subscriber 必要 從訂用帳戶擷取數據
roles/pubsub.editorroles/editor 選擇性 如果訂用帳戶不存在,也允許在串流終止時使用 deleteSubscriptionOnStreamStop 來刪除訂用帳戶,則啟用建立訂閱

發行/子架構

數據流的架構符合從 Pub/Sub 擷取的記錄,如下表所述:

欄位 類型
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

設定發佈/子串流讀取的選項

下表描述 Pub/Sub 支援的選項。 所有選項都會設定為使用 .option("<optionName>", "<optionValue>") 語法進行結構化串流讀取的一部分。

注意

某些 Pub/Sub 組態選項會使用擷取的概念而不是微批次。 這會反映內部實作詳細數據,而且選項的運作方式類似於其他結構化串流連接器中的主編,不同之處在於會擷取和處理記錄。

選項 預設值 說明
numFetchPartitions 將 設定為數據流初始化時存在的執行程式數目的一半。 從訂用帳戶擷取記錄的平行 Spark 工作數目。
deleteSubscriptionOnStreamStop false 如果 true為 ,當串流作業結束時,會刪除傳遞至數據流的訂用帳戶。
maxBytesPerTrigger none 每個觸發的微批次期間,要處理的批次大小軟性限制。
maxRecordsPerFetch 1000 處理記錄之前要擷取每個工作的記錄數目。
maxFetchPeriod 10 秒 處理記錄之前,每個工作要擷取的時間持續時間。 Databricks 建議使用預設值。

Pub/Sub 的累加批處理語意

您可以使用 Trigger.AvailableNow 從 Pub/Sub 來源取用增量批次的可用記錄。

Azure Databricks 會在您使用 設定開始讀取 Trigger.AvailableNow 時記錄時間戳。 批次處理的記錄包含所有先前擷取的數據,以及時間戳小於記錄數據流開始時間戳的任何新發行記錄。

請參閱 設定累加批處理

監視串流計量

結構化串流進度計量會報告擷取和準備處理的記錄數目、擷取和準備處理的記錄大小,以及串流開始後所看到重複專案的數目。 以下是這些計量的範例:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

限制

Pub/Sub 不支持推測性執行 (spark.speculation)。