この記事では、Event Hubs にデータをストリーミングし、Azure Stream Analytics で処理する方法について説明します。 次の手順について説明します。
- Event Hubs 名前空間を作成します。
- イベント ハブにメッセージを送信する Kafka クライアントを作成します。
- イベント ハブから Azure BLOB ストレージにデータをコピーする Stream Analytics ジョブを作成します。
イベント ハブによって公開される Kafka エンドポイントを使用するときに、プロトコル クライアントを変更したり、独自のクラスターを実行したりする必要はありません。 Azure Event Hubs では、 Apache Kafka バージョン 1.0 以降がサポートされています。
[前提条件]
このクイック スタートを完了するには、次の前提条件を満たしていることを確認します。
- Azure サブスクリプション。 アカウントをお持ちでない場合は、開始する前に 無料アカウント を作成してください。
- Java Development Kit (JDK) 1.7 以降。
- Maven バイナリ アーカイブをダウンロードしてインストールします。
- Git
- Azure Storage アカウント。 お持ちでない場合は、先に進む前に 作成 してください。 このチュートリアルの Stream Analytics ジョブは、出力データを Azure BLOB ストレージに格納します。
Event Hubs 名前空間を作成する
Event Hubs 名前空間を作成すると、その名前空間の Kafka エンドポイントが自動的に有効になります。 Kafka プロトコルを使用するアプリケーションからイベント ハブにイベントをストリーミングできます。 Azure portal を使用したイベント ハブの作成に関するページの手順に従って、Event Hubs 名前空間を作成します。 専用クラスターを使用している場合は、「専用クラスター に名前空間とイベント ハブを作成する」を参照してください。
注
Kafka 用の Event Hubs は、 Basic レベルではサポートされていません。
Event Hubs で Kafka を使用してメッセージを送信する
Kafka 用 Azure Event Hubs リポジトリをマシンに複製します。
フォルダーに移動します:
azure-event-hubs-for-kafka/quickstart/java/producer。src/main/resources/producer.configでプロデューサーの構成の詳細を更新します。 イベント ハブ名前空間の名前と接続文字列を指定します。bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/に移動し、任意TestDataReporter.javaエディターでファイルを開きます。次のコード行をコメント アウトします。
//final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);コメントされたコードの代わりに、次のコード行を追加します。
final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");このコードは、 JSON 形式でイベント データを送信します。 Stream Analytics ジョブの入力を構成するときは、入力データの形式として JSON を指定します。
プロデューサーを実行 し、Event Hubs にストリーム配信します。 Windows コンピューターで、 Node.js コマンド プロンプトを使用する場合は、これらのコマンドを実行する前に
azure-event-hubs-for-kafka/quickstart/java/producerフォルダーに切り替えます。mvn clean package mvn exec:java -Dexec.mainClass="TestProducer"
イベント ハブがデータを受信することを確認する
エンティティのEvent Hubsを選択します。 test という名前のイベント ハブが表示されることを確認します。
イベント ハブに入ってくるメッセージが表示されることを確認します。
Stream Analytics ジョブを使用してイベント データを処理する
このセクションでは、Azure Stream Analytics ジョブを作成します。 Kafka クライアントは、イベント ハブにイベントを送信します。 イベント データを入力として受け取り、Azure BLOB ストレージに出力する Stream Analytics ジョブを作成します。 Azure Storage アカウントをお持ちでない場合は、作成します。
Stream Analytics ジョブのクエリは、分析を実行せずにデータを通過します。 入力データを変換して、別の形式または得られた分析情報を使用して出力データを生成するクエリを作成できます。
Stream Analytics のジョブの作成
- Azure portal で [+ リソースの作成] を選択します。
- Azure Marketplace メニューで [分析] を選択し、[Stream Analytics ジョブ] を選択します。
- [ 新しい Stream Analytics ] ページで、次の操作を行います。
ジョブの 名前 を入力します。
サブスクリプションを選択 します。
リソース グループの [新規作成] を選択し、名前を入力します。 既存のリソース グループを使用することもできます。
ジョブの 場所 を選択します。
[ 作成] を選択してジョブを作成します。
ジョブ入力の構成
通知メッセージで、[ リソースに移動 ] を選択して Stream Analytics ジョブ ページを 表示します。
左側のメニューの [ジョブ トポロジ] セクションで [入力] を選択します。
[ ストリーム入力の追加] を選択し、[ イベント ハブ] を選択します。
イベント ハブの入力構成ページで、次の操作を行います。
入力の エイリアス を指定します。
Azure サブスクリプションを選択します。
先ほど作成した イベント ハブ名前空間 を選択します。
イベント ハブのテストを選択します。
保存 を選択します。
ジョブ出力の構成
- メニュー の [ ジョブ トポロジ ] セクションで [出力] を選択します。
- ツール バーの [+ 追加] を選択し、[BLOB ストレージ] を選択します
- Blob Storage の出力設定ページで、次の操作を行います。
出力の エイリアス を指定します。
Azure サブスクリプションを選択 します。
Azure Storage アカウントを選択します。
Stream Analytics クエリからの出力データを格納する コンテナーの名前 を入力します。
保存 を選択します。
クエリを定義する
受信データ ストリームを読み取る Stream Analytics ジョブをセットアップしたら、次の手順は、リアルタイムでデータを分析する変換を作成することです。 変換クエリは 、Stream Analytics クエリ言語を使用して定義します。 このチュートリアルでは、変換を実行せずにデータを渡すクエリを定義します。
クエリ を選択します。
クエリ ウィンドウで、
[YourOutputAlias]を前に作成した出力エイリアスに置き換えます。[YourInputAlias]は、前に作成した入力エイリアスに置き換えます。ツール バーの [保存] を 選択します。
Stream Analytics ジョブを実行する
左側のメニューで [ 概要 ] を選択します。
[スタート] を選択します。
[ ジョブの開始 ] ページで、[開始] を選択 します。
ジョブの状態が 開始 から 実行に変わるまで待ちます。
シナリオをテストする
Kafka プロデューサーをもう一度実行して、イベント ハブにイベントを送信します。
mvn exec:java -Dexec.mainClass="TestProducer"Azure BLOB ストレージに出力データが生成されていることを確認します。 次のサンプル行のような 100 行の JSON ファイルがコンテナーに表示されます。
{"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}Azure Stream Analytics ジョブは、イベント ハブから入力データを受信し、このシナリオでは Azure BLOB ストレージに格納しました。
次のステップ
この記事では、プロトコル クライアントを変更したり、独自のクラスターを実行したりせずに Event Hubs にストリーム配信する方法について説明しました。 Apache Kafka 用 Event Hubs の詳細については、 Azure Event Hubs の Apache Kafka 開発者ガイドを参照してください。