チュートリアル:Stream Analytics を使用して Event Hubs イベントの Apache Kafka を処理する
この記事では、データを Event Hubs にストリーム配信し、Azure Stream Analytics で処理する方法について説明します。 次の手順について説明します。
- Event Hubs 名前空間を作成します。
- イベント ハブにメッセージを送信する Kafka クライアントを作成する。
- イベント ハブから AzureBlob ストレージにデータをコピーする Stream Analytics ジョブを作成する。
イベント ハブで公開されている Kafka エンドポイントを使用する場合は、プロトコル クライアントを変更したり、独自のクラスターを実行したりする必要はありません。 Azure Event Hubs では、Apache Kafka バージョン 1.0 以降がサポートされています。
前提条件
このクイック スタートを完了するには、次の前提条件を満たしている必要があります。
- Azure サブスクリプション。 お持ちでない場合は、開始する前に無料アカウントを作成してください。
- Java Development Kit (JDK) 1.7 以降
- Maven バイナリ アーカイブのダウンロードとインストール
- Git
- Azure ストレージ アカウント。 持っていない場合は、次に進む前に作成します。 このチュートリアルの Stream Analytics ジョブでは、出力データを Azure Blob ストレージに格納します。
Event Hubs 名前空間を作成します
Event Hubs 名前空間を作成すると、名前空間の Kafka エンドポイントが自動的に有効になります。 Kafka プロトコルが使用されているアプリケーションからイベント ハブにイベントをストリーミングできます。 Event Hubs 名前空間を作成するには、Azure portal を使用したイベント ハブの作成に関するページの手順に従います。 専用クラスターを使用している場合は、専用クラスターでの名前空間とイベント ハブの作成に関する記事を参照してください。
Note
Kafka の Event Hubs は、Basic レベルではサポートされていません。
Event Hubs で Kafka を使用してメッセージを送信する
Kafka 用の Azure Event Hubs リポジトリをお使いのマシンに複製します。
azure-event-hubs-for-kafka/quickstart/java/producer
フォルダーに移動します。src/main/resources/producer.config
でプロデューサーの構成の詳細を更新します。 イベント ハブの名前空間に名前と接続文字列を指定します。bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/
に移動し、TestDataReporter.java ファイルを任意のエディターで開きます。次のコード行をコメント アウトします。
//final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
コメント アウトしたコードの代わりに、次のコード行を追加します。
final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");
このコードはイベント データを JSON 形式で送信します。 Stream Analytics ジョブの入力を構成するときは、入力データの形式として JSON を指定します。
プロデューサーを実行し、Event Hubs にストリーム配信します。 Windows マシンでは、Node.js コマンド プロンプトを使用するときに、これらのコマンドを実行する前に
azure-event-hubs-for-kafka/quickstart/java/producer
フォルダーに切り替えます。mvn clean package mvn exec:java -Dexec.mainClass="TestProducer"
イベント ハブがデータを受信したことを確認する
[エンティティ] で [イベント ハブ] を選択します。 test というイベント ハブが表示されていることを確認します。
イベント ハブに送信されたメッセージが表示されることを確認します。
Stream Analytics ジョブを使用してイベント データを処理する
このセクションでは、Azure Stream Analytics ジョブを作成します。 Kafka クライアントはイベントをイベント ハブに送信します。 イベント データを入力として受け取り、Azure Blob ストレージに出力する Stream Analytics ジョブを作成します。 Azure ストレージ アカウントを持っていない場合は、作成します。
Stream Analytics ジョブのクエリは、分析を実行せずにデータをパス スルーします。 入力データを変換して、異なる形式で、得られた分析情報を使用して出力データを生成するクエリを作成できます。
Stream Analytics のジョブの作成
- Azure portal で、 [+ リソースの作成] を選択します。
- [Azure Marketplace] メニューで [Analytics] を選択し、 [Stream Analytics ジョブ] を選択します。
-
[新しい Stream Analytics] ページで、次の手順を実行します。
ジョブの名前を入力します。
サブスクリプションを選択します。
リソース グループの [新規作成] を選択し、名前を入力します。 既存のリソース グループを使うこともできます。
ジョブの場所を選択します。
[作成] を選択してジョブを作成します。
ジョブの入力を構成する
通知メッセージで、 [リソースに移動] を選択すると、 [Stream Analytics ジョブ] ページが表示されます。
左側のメニューの [ジョブ トポロジ] セクションで [入力] を選択します。
[ストリーム入力の追加] を選択して [イベント ハブ] を選択します。
[イベント ハブ] の入力の構成ページで、次の手順を実行します。
入力にエイリアスを指定します。
Azure サブスクリプションを選択します。
以前作成したイベント ハブの名前空間を選択します。
イベント ハブに test を選択します。
[保存] を選択します。
ジョブの出力を構成する
- メニューの [ジョブ トポロジ] セクションで [出力] を選択します。
- ツールバーの [+ 追加] を選択し、 [BLOB ストレージ] を選択します。
- BLOB のストレージ出力設定ページで、次の手順を実行します。
出力にエイリアスを指定します。
Azure サブスクリプションを選択します。
[Azure ストレージ アカウント] を選択します。
Stream Analytics クエリの出力データを格納するコンテナーの名前を入力します。
[保存] を選択します。
クエリを定義する
着信データ ストリームを読み取るように Stream Analytics ジョブを設定したら、次の手順として、データをリアルタイムで分析する変換を作成します。 変換クエリの定義には、Stream Analytics クエリ言語を使用します。 このチュートリアルでは、変換を実行せずにデータをパス スルーするクエリを定義します。
[クエリ] を選択します。
クエリ ウィンドウで、
[YourOutputAlias]
を前に作成した出力エイリアスで置き換えます。[YourInputAlias]
を前に作成した入力エイリアスに置き換えます。ツールバーの [保存] を選択します。
Stream Analytics ジョブの実行
左側のメニューで [概要] を選択します。
[スタート] を選択します。
[ジョブの開始] ウィンドウで [開始] を選択します。
ジョブの状態が [開始中] から [実行中] に変わるまで待ちます。
シナリオをテストする
Kafka プロデューサーを再度実行してイベントをイベントのハブに送信します。
mvn exec:java -Dexec.mainClass="TestProducer"
出力データ が Azure Blob ストレージに生成されていることを確認します。 次のサンプル行のような 100 行のコンテナーに JSON ファイルが表示されます。
{"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
このシナリオで、Azure Stream Analytics ジョブは、イベント ハブから入力データを受け取り、Azure Blob ストレージに格納しました。
次のステップ
この記事では、プロトコル クライアントを変更したり独自のクラスターを実行したりせずに Event Hubs にストリーム配信する方法を紹介しました。 Apache Kafka での Event Hubs の詳細については、「Azure Event Hubs のための Apache Kafka 開発者ガイド」を参照してください。