共用方式為


Azure 事件中樞

Azure 事件中樞 是超大規模遙測擷取服務,可收集、轉換及儲存數百萬個事件。 這個分散式串流平台提供低延遲和可設定的保留期,讓您能夠將大量遙測資料輸入雲端,並使用發佈/訂閱語意從多個應用程式讀取資料。

本文說明如何使用結構化串流搭配 Azure 事件中樞 和 Azure Databricks 叢集。

注意

Azure 事件中樞 提供與 Apache Kafka 相容的端點,您可以搭配 使用Databricks Runtime 中提供的結構化串流 Kafka 連接器,可處理來自 Azure 事件中樞 的訊息。 Databricks 建議使用結構化串流 Kafka 連接器來處理來自 Azure 事件中樞 的訊息。

需求

如需目前的版本支援,請參閱spark連線or專案自述檔 Azure 事件中樞中的「最新版本」

  1. 使用 Maven 座標com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17在 Azure Databricks 工作區中建立連結庫

    注意

    此連接器會定期更新,而且可能會有較新版本可用:建議您從 Maven 存放庫提取最新的連接器

  2. 將已建立的連結庫 安裝到您的叢集中。

結構描述

記錄的架構為:

資料行 類型
body binary
partition 字串
offset 字串
sequenceNumber long
enqueuedTime timestamp
publisher 字串
partitionKey 字串
properties map[string,json]

body以位元組陣列的形式提供 。 使用 cast("string") 明確還原串行化數據 body 行。

組態

本節討論使用事件中樞所需的組態設定。

如需使用 Azure 事件中樞 設定結構化串流的詳細指引,請參閱 Microsoft 開發的結構化串流和 Azure 事件中樞 整合指南

如需使用結構化串流的詳細指引,請參閱 Azure Databricks 上的串流。

連接字串

需要事件中樞 連接字串 才能連線到事件中樞服務。 您可以從 Azure 入口網站 或使用 ConnectionStringBuilder 連結庫中的 ,取得事件中樞實例的 連接字串

Azure 入口網站

當您從 Azure 入口網站 取得 連接字串 時,它可能或可能沒有EntityPath密鑰。 考量:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

若要連線到您的 EventHubs, EntityPath 必須存在 。 如果您的 連接字串 沒有,別擔心。 這會負責:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

連線 ionStringBuilder

或者,您可以使用 ConnectionStringBuilder 來建立 連接字串。

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

與事件中樞相關的所有組態都會在您的 中 EventHubsConf發生。 若要建立 EventHubsConf,您必須傳遞 連接字串:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

如需取得有效 連接字串 的詳細資訊,請參閱 連線 字串

如需組態的完整清單,請參閱 EventHubsConf。 以下是 可供您開始使用的組態 子集:

選項 預設 查詢類型 描述
consumerGroup String “$Default” 串流和批次 取用者群組是整個事件中樞的檢視。 取用者群組能讓多個取用應用程式擁有自己的事件串流檢視,以及按照自己的步調及運用自己的位移自行讀取串流。 如需詳細資訊, 請參閱 Microsoft 檔
startingPosition EventPosition 數據流開頭 串流和批次 結構化串流作業的起始位置。 如需選項讀取順序的相關信息,請參閱 startingPositions
maxEventsPerTrigger long partitionCount

* 1000
串流查詢 每個觸發程式間隔所處理事件數目上限的速率限制。 指定的事件總數將會依比例分割到不同磁碟區的分割區。

針對每個選項,中 EventHubsConf都有對應的設定。 例如:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf 可讓使用者使用 EventPosition 類別指定開始(和結束)位置。 EventPosition 定義事件中樞分割區中事件的位置。 位置可以是加入佇列的時間、位移、序號、數據流開頭或數據流結尾。

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

如果您要在特定位置啟動 (或結束),只要建立正確的 EventPosition ,並在中 EventHubsConf設定它:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)