Azure 事件中樞
Azure 事件中樞是大規模的遙測擷取服務,能夠收集、轉換及儲存數百萬個事件。 這個分散式串流平台提供低延遲和可設定的保留期,讓您能夠將大量遙測資料輸入雲端,並使用發佈/訂閱語意從多個應用程式讀取資料。
本文說明了如何使用結構化串流搭配 Azure 事件中樞和 Azure Databricks 叢集。
注意
Azure 事件中樞提供與 Apache Kafka 相容的端點,您可搭配使用 Databricks Runtime 中提供的結構化串流 Kafka 連接器,以處理來自 Azure 事件中樞的訊息。 Databricks 建議使用結構化串流 Kafka 連接器來處理來自 Azure 事件中樞的訊息。
需求
如需目前的版本支援,請參閱 Azure 事件中樞 Spark 連接器專案讀我檔案中的「最新版本」。
使用 Maven 座標
com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
在 Azure Databricks 工作區中建立程式庫。注意
此連接器會定期更新,而且可能會有較新版本可用:建議您從 Maven 存放庫提取最新的連接器
將已建立的程式庫安裝到您的叢集中。
結構描述
記錄的結構描述為:
資料行 | 類型 |
---|---|
body |
binary |
partition |
字串 |
offset |
字串 |
sequenceNumber |
long |
enqueuedTime |
timestamp |
publisher |
字串 |
partitionKey |
字串 |
properties |
map[string,json] |
始終以位元組陣列的形式提供 body
。 使用 cast("string")
明確還原序列化 body
資料行。
組態
本節討論了使用事件中樞所需的組態設定。
如需使用 Azure 事件中樞設定結構化串流的詳細指引,請參閱 Microsoft 所開發的結構化串流和 Azure 事件中樞整合指南。
如需使用結構化串流的詳細指引,請參閱 Azure Databricks 上的串流。
Connection string
需要事件中樞連接字串才能連線到事件中樞服務。 您可從 Azure 入口網站或使用程式庫中的 ConnectionStringBuilder
,取得事件中樞執行個體的連接字串。
Azure 入口網站
當您從 Azure 入口網站取得連接字串時,它可能有也可能沒有 EntityPath
索引鍵。 考量:
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
若要連線到您的 EventHubs,EntityPath
必須存在。 如果您的連接字串沒有,無需擔心。
透過以下方式可以解決問題:
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
ConnectionStringBuilder
或者,您可使用 ConnectionStringBuilder
來建立連接字串。
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
與事件中樞相關的所有組態都會在您的 EventHubsConf
中發生。 若要建立 EventHubsConf
,您必須傳遞連接字串:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
如需獲取有效連接字串的詳細資訊,請參閱連接字串。
如需組態的完整清單,請參閱 EventHubsConf。 以下是可供您開始使用的組態子集:
選項 | 值 | 預設 | 查詢類型 | 描述 |
---|---|---|---|---|
consumerGroup |
String | "$Default" | 串流和批次 | 取用者群組可讓您檢視整個事件中樞。 取用者群組能讓多個取用應用程式擁有自己的事件串流檢視,以及按照自己的步調及運用自己的位移自行讀取串流。 如需詳細資訊,請參閱 Microsoft 文件。 |
startingPosition |
EventPosition | 開始串流 | 串流和批次 | 結構化串流作業的起始位置。 如需選項讀取順序的相關資訊,請參閱 startingPositions 。 |
maxEventsPerTrigger |
long | partitionCount 1000- |
串流查詢 | 每個觸發程序間隔所處理事件數目上限的速率限制。 指定的總計事件數目將會依比例分割到不同磁碟區的分割區。 |
針對每個選項,EventHubsConf
中都有對應的設定。 例如:
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
可讓使用者使用 EventPosition
類別來指定開始 (和結束) 位置。 EventPosition
定義了事件中樞分割區中事件的位置。 位置可是加入佇列的時間、位移、序號、串流開頭或串流結尾。
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
如果您要在特定位置啟動 (或結束),只要建立正確的 EventPosition
,並在 EventHubsConf
中設定:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)