共用方式為


Azure 事件中樞

Azure 事件中樞是大規模的遙測擷取服務,能夠收集、轉換及儲存數百萬個事件。 這個分散式串流平台提供低延遲和可設定的保留期,讓您能夠將大量遙測資料輸入雲端,並使用發佈/訂閱語意從多個應用程式讀取資料。

本文說明了如何使用結構化串流搭配 Azure 事件中樞和 Azure Databricks 叢集。

注意

Azure 事件中樞提供與 Apache Kafka 相容的端點,您可搭配使用 Databricks Runtime 中提供的結構化串流 Kafka 連接器,以處理來自 Azure 事件中樞的訊息。 Databricks 建議使用結構化串流 Kafka 連接器來處理來自 Azure 事件中樞的訊息。

需求

如需目前的版本支援,請參閱 Azure 事件中樞 Spark 連接器專案讀我檔案中的「最新版本」。

  1. 使用 Maven 座標 com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17 在 Azure Databricks 工作區中建立程式庫

    注意

    此連接器會定期更新,而且可能會有較新版本可用:建議您從 Maven 存放庫提取最新的連接器

  2. 將已建立的程式庫安裝到您的叢集中。

結構描述

記錄的結構描述為:

資料行 類型
body binary
partition 字串
offset 字串
sequenceNumber long
enqueuedTime timestamp
publisher 字串
partitionKey 字串
properties map[string,json]

始終以位元組陣列的形式提供 body。 使用 cast("string") 明確還原序列化 body 資料行。

組態

本節討論了使用事件中樞所需的組態設定。

如需使用 Azure 事件中樞設定結構化串流的詳細指引,請參閱 Microsoft 所開發的結構化串流和 Azure 事件中樞整合指南

如需使用結構化串流的詳細指引,請參閱 Azure Databricks 上的串流

Connection string

需要事件中樞連接字串才能連線到事件中樞服務。 您可從 Azure 入口網站或使用程式庫中的 ConnectionStringBuilder,取得事件中樞執行個體的連接字串。

Azure 入口網站

當您從 Azure 入口網站取得連接字串時,它可能有也可能沒有 EntityPath 索引鍵。 考量:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

若要連線到您的 EventHubs,EntityPath 必須存在。 如果您的連接字串沒有,無需擔心。 透過以下方式可以解決問題:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

ConnectionStringBuilder

或者,您可使用 ConnectionStringBuilder 來建立連接字串。

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

與事件中樞相關的所有組態都會在您的 EventHubsConf 中發生。 若要建立 EventHubsConf,您必須傳遞連接字串:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

如需獲取有效連接字串的詳細資訊,請參閱連接字串

如需組態的完整清單,請參閱 EventHubsConf。 以下是可供您開始使用的組態子集

選項 預設 查詢類型 描述
consumerGroup String "$Default" 串流和批次 取用者群組可讓您檢視整個事件中樞。 取用者群組能讓多個取用應用程式擁有自己的事件串流檢視,以及按照自己的步調及運用自己的位移自行讀取串流。 如需詳細資訊,請參閱 Microsoft 文件
startingPosition EventPosition 開始串流 串流和批次 結構化串流作業的起始位置。 如需選項讀取順序的相關資訊,請參閱 startingPositions
maxEventsPerTrigger long partitionCount

1000-
串流查詢 每個觸發程序間隔所處理事件數目上限的速率限制。 指定的總計事件數目將會依比例分割到不同磁碟區的分割區。

針對每個選項,EventHubsConf 中都有對應的設定。 例如:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf 可讓使用者使用 EventPosition 類別來指定開始 (和結束) 位置。 EventPosition 定義了事件中樞分割區中事件的位置。 位置可是加入佇列的時間、位移、序號、串流開頭或串流結尾。

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

如果您要在特定位置啟動 (或結束),只要建立正確的 EventPosition,並在 EventHubsConf 中設定:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)