チュートリアル:Stream Analytics を使用して Event Hubs イベントの Apache Kafka を処理する

この記事では、データを Event Hubs にストリーム配信し、Azure Stream Analytics で処理する方法について説明します。 次の手順について説明します。

  1. Event Hubs 名前空間を作成します。
  2. イベント ハブにメッセージを送信する Kafka クライアントを作成する。
  3. イベント ハブから AzureBlob ストレージにデータをコピーする Stream Analytics ジョブを作成する。

イベント ハブで公開されている Kafka エンドポイントを使用する場合は、プロトコル クライアントを変更したり、独自のクラスターを実行したりする必要はありません。 Azure Event Hubs では、Apache Kafka バージョン 1.0 以降がサポートされています。

前提条件

このクイック スタートを完了するには、次の前提条件を満たしている必要があります。

Event Hubs 名前空間を作成します

Event Hubs 名前空間を作成すると、名前空間の Kafka エンドポイントが自動的に有効になります。 Kafka プロトコルが使用されているアプリケーションからイベント ハブにイベントをストリーミングできます。 Event Hubs 名前空間を作成するには、Azure portal を使用したイベント ハブの作成に関するページの手順に従います。 専用クラスターを使用している場合は、専用クラスターでの名前空間とイベント ハブの作成に関する記事を参照してください。

Note

Kafka の Event Hubs は、Basic レベルではサポートされていません。

Event Hubs で Kafka を使用してメッセージを送信する

  1. Kafka 用の Azure Event Hubs リポジトリをお使いのマシンに複製します。

  2. azure-event-hubs-for-kafka/quickstart/java/producer フォルダーに移動します。

  3. src/main/resources/producer.config でプロデューサーの構成の詳細を更新します。 イベント ハブの名前空間名前接続文字列を指定します。

    bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
    
  4. azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/ に移動し、TestDataReporter.java ファイルを任意のエディターで開きます。

  5. 次のコード行をコメント アウトします。

                //final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
    
  6. コメント アウトしたコードの代わりに、次のコード行を追加します。

                final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");            
    

    このコードはイベント データを JSON 形式で送信します。 Stream Analytics ジョブの入力を構成するときは、入力データの形式として JSON を指定します。

  7. プロデューサーを実行し、Event Hubs にストリーム配信します。 Windows マシンでは、Node.js コマンド プロンプトを使用するときに、これらのコマンドを実行する前に azure-event-hubs-for-kafka/quickstart/java/producer フォルダーに切り替えます。

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    

イベント ハブがデータを受信したことを確認する

  1. [エンティティ][イベント ハブ] を選択します。 test というイベント ハブが表示されていることを確認します。

    イベント ハブ - test

  2. イベント ハブに送信されたメッセージが表示されることを確認します。

    イベント ハブ - メッセージ

Stream Analytics ジョブを使用してイベント データを処理する

このセクションでは、Azure Stream Analytics ジョブを作成します。 Kafka クライアントはイベントをイベント ハブに送信します。 イベント データを入力として受け取り、Azure Blob ストレージに出力する Stream Analytics ジョブを作成します。 Azure ストレージ アカウントを持っていない場合は、作成します

Stream Analytics ジョブのクエリは、分析を実行せずにデータをパス スルーします。 入力データを変換して、異なる形式で、得られた分析情報を使用して出力データを生成するクエリを作成できます。

Stream Analytics のジョブの作成

  1. Azure portal で、 [+ リソースの作成] を選択します。
  2. [Azure Marketplace] メニューで [Analytics] を選択し、 [Stream Analytics ジョブ] を選択します。
  3. [新しい Stream Analytics] ページで、次の手順を実行します。
    1. ジョブの名前を入力します。

    2. サブスクリプションを選択します。

    3. リソース グループ[新規作成] を選択し、名前を入力します。 既存のリソース グループを使うこともできます。

    4. ジョブの場所を選択します。

    5. [作成] を選択してジョブを作成します。

      新しい Stream Analytics ジョブ

ジョブの入力を構成する

  1. 通知メッセージで、 [リソースに移動] を選択すると、 [Stream Analytics ジョブ] ページが表示されます。

  2. 左側のメニューの [ジョブ トポロジ] セクションで [入力] を選択します。

  3. [ストリーム入力の追加] を選択して [イベント ハブ] を選択します。

    入力としてイベント ハブを追加する

  4. [イベント ハブ] の入力の構成ページで、次の手順を実行します。

    1. 入力にエイリアスを指定します。

    2. Azure サブスクリプションを選択します。

    3. 以前作成したイベント ハブの名前空間を選択します。

    4. イベント ハブtest を選択します。

    5. [保存] を選択します。

      イベント ハブの入力の構成

ジョブの出力を構成する

  1. メニューの [ジョブ トポロジ] セクションで [出力] を選択します。
  2. ツールバーの [+ 追加] を選択し、 [BLOB ストレージ] を選択します。
  3. BLOB のストレージ出力設定ページで、次の手順を実行します。
    1. 出力にエイリアスを指定します。

    2. Azure サブスクリプションを選択します。

    3. [Azure ストレージ アカウント] を選択します。

    4. Stream Analytics クエリの出力データを格納するコンテナーの名前を入力します。

    5. [保存] を選択します。

      BLOB ストレージの出力の構成

クエリを定義する

着信データ ストリームを読み取るように Stream Analytics ジョブを設定したら、次の手順として、データをリアルタイムで分析する変換を作成します。 変換クエリの定義には、Stream Analytics クエリ言語を使用します。 このチュートリアルでは、変換を実行せずにデータをパス スルーするクエリを定義します。

  1. [クエリ] を選択します。

  2. クエリ ウィンドウで、[YourOutputAlias] を前に作成した出力エイリアスで置き換えます。

  3. [YourInputAlias] を前に作成した入力エイリアスに置き換えます。

  4. ツールバーの [保存] を選択します。

    入力変数と出力変数の値が指定されたクエリ ウィンドウのキャプチャ画面。

Stream Analytics ジョブの実行

  1. 左側のメニューで [概要] を選択します。

  2. [スタート] を選択します。

    [スタート] メニュー

  3. [ジョブの開始] ウィンドウで [開始] を選択します。

    [ジョブの開始] ページ

  4. ジョブの状態が [開始中] から [実行中] に変わるまで待ちます。

    ジョブの状態 - 実行中

シナリオをテストする

  1. Kafka プロデューサーを再度実行してイベントをイベントのハブに送信します。

    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    
  2. 出力データAzure Blob ストレージに生成されていることを確認します。 次のサンプル行のような 100 行のコンテナーに JSON ファイルが表示されます。

    {"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    

    このシナリオで、Azure Stream Analytics ジョブは、イベント ハブから入力データを受け取り、Azure Blob ストレージに格納しました。

次のステップ

この記事では、プロトコル クライアントを変更したり独自のクラスターを実行したりせずに Event Hubs にストリーム配信する方法を紹介しました。 Apache Kafka での Event Hubs の詳細については、「Azure Event Hubs のための Apache Kafka 開発者ガイド」を参照してください。