JavaScript を使用してイベント ハブとの間でイベントを送受信する

このクイックスタートでは、@azure/event-hubs npm パッケージを使用して、イベント ハブとの間でイベントを送受信する方法について説明します。

前提条件

Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前にイベント ハブの概要を参照してください。

このクイック スタートを完了するには、次の前提条件を用意しておく必要があります。

  • Microsoft Azure サブスクリプション。 Azure Event Hubs を含む Azure サービスを使用するには、サブスクリプションが必要です。 既存の Microsoft Azure アカウントをお持ちでない場合は、アカウントを作成する際に、無料試用版にサインアップするか、MSDN サブスクライバー特典を利用できます。
  • Node.js LTS。 最新の長期サポート (LTS) バージョンをダウンロードしてください。
  • Visual Studio Code (推奨) または他の任意の統合開発環境 (IDE)。
  • Event Hubs 名前空間とイベント ハブを作成する。 最初の手順では、Azure Portal を使用して Event Hubs 型の名前空間を作成し、アプリケーションがイベント ハブと通信するために必要な管理資格情報を取得します。 名前空間とイベント ハブを作成するには、こちらの記事の手順に従います。

イベントを送信するために npm パッケージをインストールする

Event Hubs 用の Node Package Manager (npm) パッケージをインストールするには、パスに npm を設定してコマンド プロンプトを開き、サンプルを格納するフォルダーにディレクトリを変更します。

これらのコマンドを実行します。

npm install @azure/event-hubs
npm install @azure/identity

Azure に対してアプリを認証する

このクイック スタートでは、Azure Event Hubs に接続する 2 つの方法である、パスワードレスと接続文字列について説明します。 1 番目のオプションとして、Azure Active Directory のセキュリティ プリンシパルとロールベースのアクセス制御 (RBAC) を使用して Event Hubs 名前空間に接続する方法について説明します。 コードや構成ファイル、または Azure Key Vault などのセキュリティで保護されたストレージに、ハードコーディングされた接続文字列を含める心配はありません。 2 番目のオプションとして、接続文字列を使用して Event Hubs 名前空間に接続する方法について説明します。 Azure を初めて使用する場合は、接続文字列オプションの方が理解しやすいかもしれません。 実際のアプリケーションと運用環境では、パスワードレス オプションを使用することをお勧めします。 詳細については、「認証と承認」を参照してください。 パスワードレス認証の詳細については、概要ページを参照してください。

Azure AD ユーザーにロールを割り当てる

ローカルでの開発時には、Azure Event Hubs に接続するユーザー アカウントに正しいアクセス許可があることを確認してください。 メッセージを送受信するには、Azure Event Hubs データ所有者ロールが必要です。 このロールを自分に割り当てるには、ユーザー アクセス管理者ロール、または Microsoft.Authorization/roleAssignments/write アクションを含む別のロールが必要です。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 ロールの割り当てに使用できるスコープの詳細は、スコープの概要ページを参照してください。

次の例では、ユーザー アカウントに Azure Event Hubs Data Owner ロールを割り当てます。これにより、Azure Event Hubs リソースにフル アクセスできます。 実際のシナリオでは、より安全な運用環境を実現するため、最小限の特権の原則に従って、必要な最小限のアクセス許可のみをユーザーに付与します。

Azure Event Hubs の Azure の組み込みロール

Azure Event Hubs の場合、Azure portal および Azure リソース管理 API による名前空間とそれに関連するすべてのリソースの管理は、Azure RBAC モデルを使用して既に保護されています。 Azure では、Event Hubs 名前空間へのアクセス権を付与するために、次の Azure 組み込みロールが提供されています。

  • Azure Event Hubs データ所有者: Event Hubs 名前空間とそのエンティティ (キュー、トピック、サブスクリプション、フィルター) へのデータ アクセスが可能です。
  • Azure Event Hubs データ送信者: このロールを使用して、Event Hubs 名前空間とそのエンティティへのアクセス権を送信者に付与します。
  • Azure Event Hubs データ受信者: このロールを使用して、Event Hubs 名前空間とそのエンティティへのアクセス権を受信者に付与します。

カスタム ロールを作成する場合は、Event Hubs 操作に必要な権限に関するページを参照してください。

重要

ほとんどの場合、ロールの割り当てが Azure に反映されるまでの時間は 1、2 分です。 まれに、最大 8 分かかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。

  1. Azure portal で、メインの検索バーまたは左側のナビゲーションを使用して Event Hubs 名前空間を見つけます。

  2. 概要ページで、左側のメニューから [アクセス制御 (IAM)] を選択します。

  3. [アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。

  4. 上部のメニューから [+ 追加] を選択し、次に結果のドロップダウン メニューから [ロールの割り当ての追加] を選択します。

    ロールを割り当てる方法を示すスクリーンショット。

  5. 検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、Azure Event Hubs Data Owner を検索して一致する結果を選択します。 [次へ] を選びます。

  6. [アクセスの割り当て先] で、[ユーザー、グループ、またはサービス プリンシパル] を選び、[+ メンバーの選択] を選びます。

  7. ダイアログで、自分の Azure AD ユーザー名 (通常は user@domain メール アドレス) を検索し、ダイアログの下部にある [選択] を選びます。

  8. [レビューと割り当て] を選んで最終ページに移動し、もう一度 [レビューと割り当て] を行ってプロセスを完了します。

送信イベント

このセクションでは、イベント ハブにイベントを送信する JavaScript アプリケーションを作成します。

  1. Visual Studio Code など、お好みのエディターを開きます。

  2. send.js というファイルを作成し、そこに次のコードを貼り付けます。

    このコード内の次のプレースホルダーは、実際の値に置き換えてください。

    • EVENT HUBS RESOURCE NAME
    • EVENT HUB NAME
    const { EventHubProducerClient } = require("@azure/event-hubs");
    const { DefaultAzureCredential } = require("@azure/identity");
    
    // Event hubs 
    const eventHubsResourceName = "EVENT HUBS RESOURCE NAME";
    const fullyQualifiedNamespace = `${eventHubsResourceName}.servicebus.windows.net`; 
    const eventHubName = "EVENT HUB NAME";
    
    // Azure Identity - passwordless authentication
    const credential = new DefaultAzureCredential();
    
    async function main() {
    
      // Create a producer client to send messages to the event hub.
      const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);
    
      // Prepare a batch of three events.
      const batch = await producer.createBatch();
      batch.tryAdd({ body: "passwordless First event" });
      batch.tryAdd({ body: "passwordless Second event" });
      batch.tryAdd({ body: "passwordless Third event" });    
    
      // Send the batch to the event hub.
      await producer.sendBatch(batch);
    
      // Close the producer client.
      await producer.close();
    
      console.log("A batch of three events have been sent to the event hub");
    }
    
    main().catch((err) => {
      console.log("Error occurred: ", err);
    });
    
  3. node send.js を実行して、このファイルを実行します。 このコマンドにより、3 つのイベントのバッチがイベント ハブに送信されます

  4. Azure portal で、イベント ハブがメッセージを受信したことを確認します。 ページを最新の情報に更新して、グラフを更新します。 メッセージが受信されたことが示されるまでに数秒かかることがあります。

    イベント ハブがメッセージを受信したことを確認する

    Note

    情報提供を目的とした補足コメントを含む完全なソース コードについては、GitHub の sendEvents.js ページにアクセスしてください。

お疲れさまでした。 これで、イベントをイベント ハブに送信できました。

受信イベント

このセクションでは、JavaScript アプリケーション内で Azure Blob Storage チェックポイント ストアを使用して、イベント ハブからイベントを受信します。 これにより、Azure Storage Blob 内で定期的に受信したメッセージのメタデータ チェックポイントが実行されます。 この手法によって、後で前回終了したところからメッセージを継続して受信しやすくなります。

警告

このコードを Azure Stack Hub で実行すると、特定の Storage API バージョンを対象としている場合を除き、実行時エラーが発生します。 これは、Event Hubs SDK では、Azure で利用できる最新の Azure Storage API が使用されますが、Azure Stack Hub プラットフォームではこれを利用できない可能性があるためです。 Azure Stack Hub でサポートされる Storage Blob SDK のバージョンは、Azure で一般的に利用できるバージョンと異なる場合があります。 チェックポイント ストアとして Azure Blob Storage を使用している場合は、Azure Stack Hub ビルドでサポートされている Azure Storage API バージョンを確認し、コード内でそのバージョンを対象にします。

たとえば、Azure Stack Hub バージョン 2005 上で実行している場合、Storage サービスで利用できる最も高いバージョンは 2019-02-02 となります。 既定では、Event Hubs SDK クライアント ライブラリでは、Azure で利用できる最も高いバージョン (SDK のリリース時点では 2019-07-07) が使用されます。 この場合は、このセクションの手順に従うことに加え、Storage サービス API バージョン 2019-02-02 を対象とするコードを追加する必要もあります。 特定の Storage API バージョンを対象にする方法の例については、GitHub にある JavaScript サンプルと TypeScript サンプルを参照してください。

Azure Storage アカウントと BLOB コンテナーを作成する

Azure Storage アカウントを作成して、そこに BLOB コンテナーを作成するには、次の作業を行います。

  1. Azure Storage アカウントを作成する
  2. ストレージ アカウントに BLOB コンテナーを作成する
  3. BLOB コンテナーに対する認証

ローカルで開発する場合は、BLOB データにアクセスするユーザー アカウントに正しいアクセス許可があることを確認します。 BLOB データの読み取りと書き込みを行うには、ストレージ BLOB データ共同作成者が必要です。 このロールを自分に割り当てるには、ユーザー アクセス管理者ロール、または Microsoft.Authorization/roleAssignments/write アクションを含む別のロールに割り当てられている必要があります。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 ロールの割り当てに使用できるスコープの詳細は、スコープの概要ページでご覧いただけます。

このシナリオでは、最小限の特権の原則に従って、ストレージ アカウントに限定したアクセス許可をユーザー アカウントに割り当てます。 この方法を使って、ユーザーに必要最小限のアクセス許可のみを与え、より安全な運用環境を作成します。

次の例では、ストレージ BLOB データ共同作成者ロールを自分のユーザー アカウントに割り当てます。これにより、そのストレージ アカウント内の BLOB データに対する読み取りと書き込みの両方のアクセス権が付与されます。

重要

ほとんどの場合、ロールの割り当てが Azure に反映されるまでの時間は 1 分から 2 分ですが、まれに 8 分程度までかかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。

  1. Azure portal で、メインの検索バーまたは左側のナビゲーションを使ってストレージ アカウントを見つけます。

  2. ストレージ アカウントの概要ページで、左側のメニューから [アクセス制御 (IAM)] を選びます。

  3. [アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。

  4. 上部のメニューから [+ 追加] を選択し、次に結果のドロップダウン メニューから [ロールの割り当ての追加] を選択します。

    ストレージ アカウント ロールの割り当て方法を示すスクリーンショット。

  5. 検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、ストレージ BLOB データ共同作成者を検索し、一致する結果を選び、[次へ] を選びます。

  6. [アクセスの割り当て先] で、[ユーザー、グループ、またはサービス プリンシパル] を選び、[+ メンバーの選択] を選びます。

  7. ダイアログで、自分の Azure AD ユーザー名 (通常は user@domain メール アドレス) を検索し、ダイアログの下部にある [選択] を選びます。

  8. [レビューと割り当て] を選んで最終ページに移動し、もう一度 [レビューと割り当て] を行ってプロセスを完了します。

イベントを受信するために npm パッケージをインストールする

受信側では、追加で 2 つのパッケージをインストールする必要があります。 このクイックスタートでは、既に読み取ったイベントをプログラムが読み取らないよう、Azure Blob Storage を使用してチェックポイントを永続化します。 BLOB 内で定期的に受信したメッセージのメタデータ チェックポイントが実行されます。 この手法によって、後で前回終了したところからメッセージを継続して受信しやすくなります。

これらのコマンドを実行します。

npm install @azure/storage-blob
npm install @azure/eventhubs-checkpointstore-blob
npm install @azure/identity

イベントを受信するコードを記述する

  1. Visual Studio Code など、お好みのエディターを開きます。

  2. receive.js というファイルを作成し、そこに次のコードを貼り付けます。

    このコード内の次のプレースホルダーは、実際の値に置き換えてください。

    • EVENT HUBS RESOURCE NAME
    • EVENT HUB NAME
    • STORAGE ACCOUNT NAME
    • STORAGE CONTAINER NAME
    const { DefaultAzureCredential } = require("@azure/identity");
    const { EventHubConsumerClient, earliestEventPosition  } = require("@azure/event-hubs");
    const { ContainerClient } = require("@azure/storage-blob");    
    const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");
    
    // Event hubs 
    const eventHubsResourceName = "EVENT HUBS RESOURCE NAME";
    const fullyQualifiedNamespace = `${eventHubsResourceName}.servicebus.windows.net`; 
    const eventHubName = "EVENT HUB NAME";
    const consumerGroup = "$Default"; // name of the default consumer group
    
    // Azure Storage 
    const storageAccountName = "STORAGE ACCOUNT NAME";
    const storageContainerName = "STORAGE CONTAINER NAME";
    const baseUrl = `https://${storageAccountName}.blob.core.windows.net`;
    
    // Azure Identity - passwordless authentication
    const credential = new DefaultAzureCredential();
    
    async function main() {
    
      // Create a blob container client and a blob checkpoint store using the client.
      const containerClient = new ContainerClient(
        `${baseUrl}/${storageContainerName}`,
        credential
      );  
      const checkpointStore = new BlobCheckpointStore(containerClient);
    
      // Create a consumer client for the event hub by specifying the checkpoint store.
      const consumerClient = new EventHubConsumerClient(consumerGroup, fullyQualifiedNamespace, eventHubName, credential, checkpointStore);
    
      // Subscribe to the events, and specify handlers for processing the events and errors.
      const subscription = consumerClient.subscribe({
          processEvents: async (events, context) => {
            if (events.length === 0) {
              console.log(`No events received within wait time. Waiting for next interval`);
              return;
            }
    
            for (const event of events) {
              console.log(`Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'`);
            }
            // Update the checkpoint.
            await context.updateCheckpoint(events[events.length - 1]);
          },
    
          processError: async (err, context) => {
            console.log(`Error : ${err}`);
          }
        },
        { startPosition: earliestEventPosition }
      );
    
      // After 30 seconds, stop processing.
      await new Promise((resolve) => {
        setTimeout(async () => {
          await subscription.close();
          await consumerClient.close();
          resolve();
        }, 30000);
      });
    }
    
    main().catch((err) => {
      console.log("Error occurred: ", err);
    });
    
  3. コマンド プロンプトから node receive.js を実行して、このファイルを実行します。 受信したイベントに関するメッセージがウィンドウに表示されます。

    C:\Self Study\Event Hubs\JavaScript>node receive.js
    Received event: 'First event' from partition: '0' and consumer group: '$Default'
    Received event: 'Second event' from partition: '0' and consumer group: '$Default'
    Received event: 'Third event' from partition: '0' and consumer group: '$Default'
    

    Note

    情報提供を目的とした補足コメントを含む完全なソース コードについては、GitHub の receiveEventsUsingCheckpointStore.js ページにアクセスしてください。

お疲れさまでした。 これで、イベント ハブからイベントを受信できました。 受信側プログラムは、イベント ハブの既定のコンシューマー グループのすべてのパーティションからイベントを受信します。

次のステップ

GitHub で次のサンプルを確認します。