Azure 事件中樞

Azure 事件中樞 是超大規模遙測擷取服務,可收集、轉換及儲存數百萬個事件。 這個分散式串流平台提供低延遲和可設定的保留期,讓您能夠將大量遙測資料輸入雲端,並使用發佈/訂閱語意從多個應用程式讀取資料。

本文說明如何使用結構化串流搭配Azure 事件中樞和 Azure Databricks 叢集。

注意

Azure 事件中樞提供與 Apache Kafka 相容的端點,您可以搭配 使用Databricks Runtime 中提供的結構化串流 Kafka 連接器 ,可處理來自Azure 事件中樞的訊息。 Databricks 建議使用結構化串流 Kafka 連接器來處理來自Azure 事件中樞的訊息。

需求

如需目前的版本支援,請參閱 Azure 事件中樞 Spark 連線or 專案 讀我檔案中的「最新版本」

  1. 使用 Maven 座標 com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17 在 Azure Databricks 工作區中建立程式庫

    注意

    此連接器會定期更新,而且可能會有較新版本可用:建議您從 Maven 存放庫提取最新的連接器

  2. 將已建立的程式庫 安裝到您的叢集中。

結構描述

記錄的架構為:

資料行 類型
body binary
partition string
offset string
sequenceNumber long
enqueuedTime timestamp
publisher string
partitionKey string
properties map[string,json]

body一律以位元組陣列的形式提供 。 使用 cast("string") 明確還原序列化資料 body 行。

快速入門

讓我們從快速範例開始:WordCount。 下列筆記本是使用結構化串流搭配 Azure 事件中樞 執行 WordCount 所需的所有筆記本。

使用結構化串流筆記本Azure 事件中樞 WordCount

取得筆記本

組態

本節討論使用事件中樞所需的組態設定。

如需使用 Azure 事件中樞 設定結構化串流的詳細指引,請參閱 Microsoft 開發的結構化串流和Azure 事件中樞整合指南

如需使用結構化串流的詳細指引,請參閱 Azure Databricks 上的串流。

Connection string

需要事件中樞連接字串才能連線到事件中樞服務。 您可以從 Azure 入口網站 或使用 ConnectionStringBuilder 程式庫中的 ,取得事件中樞實例 的連接字串

Azure 入口網站

當您從Azure 入口網站取得連接字串時,它可能或可能沒有 EntityPath 金鑰。 考量:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

若要連線到您的 EventHubs, EntityPath 必須存在 。 如果您的連接字串沒有,別擔心。 這會負責:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

連線ionStringBuilder

或者,您可以使用 ConnectionStringBuilder 來建立連接字串。

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

與事件中樞相關的所有組態都會在您的 中 EventHubsConf 發生。 若要建立 EventHubsConf ,您必須傳遞連接字串:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

如需取得有效連接字串的詳細資訊,請參閱 連線ion String

如需組態的完整清單,請參閱 EventHubsConf 。 以下是 可供您開始使用的組態 子集:

選項 預設 查詢類型 描述
consumerGroup String 「$Default」 串流和批次 取用者群組是整個事件中樞的檢視。 取用者群組能讓多個取用應用程式擁有自己的事件串流檢視,以及按照自己的步調及運用自己的位移自行讀取串流。 如需詳細資訊, 請參閱 Microsoft 檔
startingPosition EventPosition 資料流程開頭 串流和批次 結構化串流作業的起始位置。 如需選項讀取順序的相關資訊,請參閱 startingPositions
maxEventsPerTrigger long partitionCount

* 1000
串流查詢 每個觸發程式間隔所處理事件數目上限的速率限制。 指定的事件總數將會依比例分割到不同磁片區的分割區。

針對每個選項,中 EventHubsConf 都有對應的設定。 例如:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf 可讓使用者使用 EventPosition 類別指定開始(和結束)位置。 EventPosition 定義事件中樞分割區中事件的位置。 位置可以是排入佇列的時間、位移、序號、資料流程開頭或資料流程結尾。

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

如果您想要在特定位置啟動 (或結束),只要建立正確的 EventPosition ,並在中 EventHubsConf 設定它:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

具有Azure 事件中樞的生產結構化串流

當您在生產環境中執行串流查詢時,您可能想要比將筆記本附加至叢集並以互動方式執行串流查詢時更強固和執行時間保證。 匯入並執行下列筆記本,以示範如何使用 Azure 事件中樞 和 Azure Databricks 在生產環境中設定和執行結構化串流。

如需詳細資訊,請參閱 結構化串流 的生產考慮。

使用 Azure 事件中樞 筆記本進行生產結構化串流

取得筆記本