Azure Event Hubs
Azure Event Hubs は、数百万のイベントを収集、変換、保存する、ハイパースケールのテレメトリ インジェスト サービスです。 分散型のストリーミング プラットフォームとして、待機時間が短く、保持時間を設定可能なため、膨大な量のテレメトリをクラウドに送信し、パブリッシュ/サブスクライブ セマンティクスを使用して、複数のアプリケーションからデータを読み込むことができます。
この記事では、Azure Event Hubs と Azure Databricks クラスターの間で構造化ストリームを使用する方法を説明します。
Note
Azure Event Hubs は、Apache Kafka と互換性のあるエンドポイントを提供します。これを Databricks Runtime で使用できる構造化ストリーミング Kafka コネクタとともに使用して、Azure Event Hubs からのメッセージを処理することができます。 Databricks では、構造化ストリーミング Kafka コネクタを使用して、Azure Event Hubs からのメッセージを処理することをお勧めします。
要件
現在のリリースのサポートについては、Azure Event Hubs Spark コネクタ プロジェクトの readme ファイルの最新リリースの項を参照してください。
Maven 座標
com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
を使用して、Azure Databricks ワークスペース内にライブラリを作成します。注意
このコネクタは定期的に更新されており、さらに新しいバージョンが利用できる可能性があります。Maven リポジトリから最新のコネクタをプルすることをお勧めします
スキーマ
レコードのスキーマは次のとおりです。
列 | Type |
---|---|
body |
binary |
partition |
string |
offset |
string |
sequenceNumber |
long |
enqueuedTime |
timestamp |
publisher |
string |
partitionKey |
string |
properties |
map[string,json] |
body
は常にバイト配列として提供されます。 body
列を明示的に逆シリアル化するには、cast("string")
を使用します。
構成
このセクションでは、Event Hubs を操作するために必要な構成設定について説明します。
Azure Event Hubs を使用した構造化ストリームの構成に関する詳細なガイダンスについては、Microsoft が開発した「構造化ストリーミングおよび Azure Event Hubs 統合ガイド」を参照してください。
構造化ストリームの使用に関する詳細なガイダンスについては、「Azure Databricks でのストリーミング」を参照してください。
接続文字列
Event Hubs サービスに接続するには、Event Hubs 接続文字列が必要です。 Event Hubs インスタンスの接続文字列は、Azure portal から、またはライブラリ内の ConnectionStringBuilder
を使用して取得できます。
Azure ポータル
Azure portal から接続文字列を取得するときに、この文字列が EntityPath
キーを持つ場合と持たない場合があります。 以下を検討してください。
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
EventHubs に接続するには、EntityPath
が存在する必要があります。 接続文字列にない場合、心配しないでください。
これにより対応されます。
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
ConnectionStringBuilder
または、ConnectionStringBuilder
を使用して、接続文字列を作成できます。
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
EventHubsConf
で、Event Hubs と関連するすべての構成が発生します。 EventHubsConf
を作成するには、次の接続文字列を渡す必要があります。
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
有効な接続文字列の取得の詳細については、「接続文字列」を参照してください。
構成の完全な一覧については、「EventHubsConf」を参照してください。 作業の開始に使用する構成のサブセットを次に示します。
オプション | 値 | Default | クエリの種類 | 説明 |
---|---|---|---|---|
consumerGroup |
String | “$Default” | ストリーミングとバッチ | コンシューマー グループは、イベント ハブ全体のビューです。 コンシューマー グループを使用することにより、複数のコンシューマー アプリケーションは、イベント ストリームの個別のビューをそれぞれ保有し、独自のペースで独自のオフセットによってストリームを別々に読み取ることができます。 詳細については、Microsoft のドキュメントに関するページを参照してください。 |
startingPosition |
EventPosition | ストリームの開始 | ストリーミングとバッチ | 構造化ストリーム ジョブの開始位置。 オプションの読み取り順序については、「startingPositions」を参照してください。 |
maxEventsPerTrigger |
long | partitionCount 1000- |
ストリーミング クエリ | トリガー間隔ごとに処理されるイベントの最大数に対するレート制限。 指定されたイベントの総数は、異なるボリュームのパーティション間で比例的に分割されます。 |
オプションごとに、EventHubsConf
に対応する設定が存在します。 次に例を示します。
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
では、ユーザーは EventPosition
クラスを使用して開始位置 (および終了位置) を指定できます。 EventPosition
は、イベント ハブ パーティション内のイベントの位置を定義します。 位置には、エンキューされた時間、オフセット、シーケンス番号、ストリームの始点、またはストリームの末尾を指定できます。
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
特定の位置から開始 (または終了) する場合は、正しい EventPosition
を作成し、EventHubsConf
に設定します。
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)