クイックスタート: JavaScript を使用してイベント ハブとの間でイベントを送受信する

このクイックスタートでは、@azure/event-hubs npm パッケージを使用して、イベント ハブとの間でイベントを送受信する方法について説明します。

前提条件

Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に Event Hubs の概要を参照してください。

このクイック スタートを完了するには、次の前提条件を用意しておく必要があります。

  • Microsoft Azure サブスクリプション。 Azure Event Hubs を含む Azure サービスを使用するには、サブスクリプションが必要です。 Azure アカウントを持っていない場合、無料試用版でサインアップできます。
  • Node.js LTS。 最新の長期サポート (LTS) バージョンをダウンロードしてください。
  • Visual Studio Code (推奨) または他の任意の統合開発環境 (IDE)。
  • Event Hubs 名前空間とイベント ハブを作成する。 最初の手順では、Azure Portal を使用して Event Hubs 型の名前空間を作成し、アプリケーションがイベント ハブと通信するために必要な管理資格情報を取得します。 名前空間とイベント ハブを作成するには、こちらの記事の手順に従います。

イベントを送信するために npm パッケージをインストールする

Event Hubs 用の Node Package Manager (npm) パッケージをインストールするには、パスに npm を設定してコマンド プロンプトを開き、サンプルを格納するフォルダーにディレクトリを変更します。

これらのコマンドを実行します。

npm install @azure/event-hubs
npm install @azure/identity

Azure に対してアプリを認証する

このクイック スタートでは、Azure Event Hubs に接続する 2 つの方法である、パスワードレスと接続文字列について説明します。 1 番目のオプションとして、Microsoft Entra ID のセキュリティ プリンシパルとロールベースのアクセス制御 (RBAC) を使用して Event Hubs 名前空間に接続する方法について説明します。 コードや構成ファイル、または Azure Key Vault などのセキュリティで保護されたストレージに、ハードコーディングされた接続文字列を含める心配はありません。 2 番目のオプションとして、接続文字列を使用して Event Hubs 名前空間に接続する方法について説明します。 Azure を初めて使用する場合は、接続文字列オプションの方が理解しやすいかもしれません。 実際のアプリケーションと運用環境では、パスワードレス オプションを使用することをお勧めします。 詳細については、「認証と承認」を参照してください。 パスワードレス認証の詳細については、概要ページを参照してください。

Microsoft Entra ユーザーにロールを割り当てる

ローカルでの開発時には、Azure Event Hubs に接続するユーザー アカウントに正しいアクセス許可があることを確認してください。 メッセージを送受信するには、Azure Event Hubs データ所有者ロールが必要です。 このロールを自分に割り当てるには、ユーザー アクセス管理者ロール、または Microsoft.Authorization/roleAssignments/write アクションを含む別のロールが必要です。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 ロールの割り当てに使用できるスコープの詳細は、スコープの概要ページを参照してください。

次の例では、ユーザー アカウントに Azure Event Hubs Data Owner ロールを割り当てます。これにより、Azure Event Hubs リソースにフル アクセスできます。 実際のシナリオでは、より安全な運用環境を実現するため、最小限の特権の原則に従って、必要な最小限のアクセス許可のみをユーザーに付与します。

Azure Event Hubs の Azure の組み込みロール

Azure Event Hubs の場合、Azure portal および Azure リソース管理 API による名前空間とそれに関連するすべてのリソースの管理は、Azure RBAC モデルを使用して既に保護されています。 Azure では、Event Hubs 名前空間へのアクセス権を付与するために、次の Azure 組み込みロールが提供されています。

  • Azure Event Hubs データ所有者: Event Hubs 名前空間とそのエンティティ (キュー、トピック、サブスクリプション、フィルター) へのデータ アクセスが可能です。
  • Azure Event Hubs データ送信者: このロールを使用して、Event Hubs 名前空間とそのエンティティへのアクセス権を送信者に付与します。
  • Azure Event Hubs データ受信者: このロールを使用して、Event Hubs 名前空間とそのエンティティへのアクセス権を受信者に付与します。

カスタム ロールを作成する場合は、Event Hubs 操作に必要な権限に関するページを参照してください。

重要

ほとんどの場合、ロールの割り当てが Azure に反映されるまでの時間は 1、2 分です。 まれに、最大 8 分かかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。

  1. Azure portal で、メインの検索バーまたは左側のナビゲーションを使用して Event Hubs 名前空間を見つけます。

  2. 概要ページで、左側のメニューから [アクセス制御 (IAM)] を選択します。

  3. [アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。

  4. 上部のメニューから [+ 追加] を選択し、次に結果のドロップダウン メニューから [ロールの割り当ての追加] を選択します。

    ロールを割り当てる方法を示すスクリーンショット。

  5. 検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、Azure Event Hubs Data Owner を検索して一致する結果を選択します。 [次へ] を選びます。

  6. [アクセスの割り当て先] で、[ユーザー、グループ、またはサービス プリンシパル] を選び、[+ メンバーの選択] を選びます。

  7. ダイアログで、自分の Microsoft Entra ユーザー名 (通常は user@domain メール アドレス) を検索し、ダイアログの下部にある [選択] を選びます。

  8. [レビューと割り当て] を選んで最終ページに移動し、もう一度 [レビューと割り当て] を行ってプロセスを完了します。

送信イベント

このセクションでは、イベント ハブにイベントを送信する JavaScript アプリケーションを作成します。

  1. Visual Studio Code など、お好みのエディターを開きます。

  2. send.js というファイルを作成し、そこに次のコードを貼り付けます。

    このコード内の次のプレースホルダーは、実際の値に置き換えてください。

    • EVENT HUBS NAMESPACE NAME
    • EVENT HUB NAME
    const { EventHubProducerClient } = require("@azure/event-hubs");
    const { DefaultAzureCredential } = require("@azure/identity");
    
    // Event hubs 
    const eventHubsResourceName = "EVENT HUBS NAMESPACE NAME";
    const fullyQualifiedNamespace = `${eventHubsResourceName}.servicebus.windows.net`; 
    const eventHubName = "EVENT HUB NAME";
    
    // Azure Identity - passwordless authentication
    const credential = new DefaultAzureCredential();
    
    async function main() {
    
      // Create a producer client to send messages to the event hub.
      const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);
    
      // Prepare a batch of three events.
      const batch = await producer.createBatch();
      batch.tryAdd({ body: "passwordless First event" });
      batch.tryAdd({ body: "passwordless Second event" });
      batch.tryAdd({ body: "passwordless Third event" });    
    
      // Send the batch to the event hub.
      await producer.sendBatch(batch);
    
      // Close the producer client.
      await producer.close();
    
      console.log("A batch of three events have been sent to the event hub");
    }
    
    main().catch((err) => {
      console.log("Error occurred: ", err);
    });
    
  3. node send.js を実行して、このファイルを実行します。 このコマンドにより、3 つのイベントのバッチがイベント ハブに送信されます パスワードレス (Azure Active Directory のロールベースのアクセス制御) 認証を使用している場合は、az login を実行してAzure Event Hubs データ所有者ロールに追加されたアカウントを使用して Azure にサインインすることができます。

  4. Azure portal で、イベント ハブがメッセージを受信したことを確認します。 ページを最新の情報に更新して、グラフを更新します。 メッセージが受信されたことが示されるまでに数秒かかることがあります。

    イベント ハブがメッセージを受信したことを確認する

    Note

    情報提供を目的とした補足コメントを含む完全なソース コードについては、GitHub の sendEvents.js ページにアクセスしてください。

受信イベント

このセクションでは、JavaScript アプリケーション内で Azure Blob Storage チェックポイント ストアを使用して、イベント ハブからイベントを受信します。 これにより、Azure Storage Blob 内で定期的に受信したメッセージのメタデータ チェックポイントが実行されます。 この手法によって、後で前回終了したところからメッセージを継続して受信しやすくなります。

チェックポイント ストアとして Azure Blob Storage を使用する場合は、次の推奨事項に従ってください。

  • コンシューマー グループごとに個別のコンテナーを使用します。 同じストレージ アカウントを使用できますが、各グループごとに 1 つのコンテナーを使用します。
  • コンテナーを他の何かに使用しないでください。また、ストレージ アカウントも他の何かに使用しないでください。
  • ストレージ アカウントは、デプロイされたアプリケーションが配置されているのと同じリージョンに存在する必要があります。 アプリケーションがオンプレミスの場合は、可能な中で最も近いリージョンを選択することを試みてください。

Azure portal の [ストレージ アカウント] ページの [Blob service] セクションで、次の設定が無効になっていることを確認してください。

  • 階層型名前空間
  • BLOB の論理的な削除
  • バージョン管理

Azure Storage アカウントと BLOB コンテナーを作成する

Azure Storage アカウントを作成して、そこに BLOB コンテナーを作成するには、次の作業を行います。

  1. Azure Storage アカウントを作成する
  2. ストレージ アカウントに BLOB コンテナーを作成する
  3. BLOB コンテナーに対する認証

ローカルで開発する場合は、BLOB データにアクセスするユーザー アカウントに正しいアクセス許可があることを確認します。 BLOB データの読み取りと書き込みを行うには、ストレージ BLOB データ共同作成者が必要です。 このロールを自分に割り当てるには、ユーザー アクセス管理者ロール、または Microsoft.Authorization/roleAssignments/write アクションを含む別のロールに割り当てられている必要があります。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 ロールの割り当てに使用できるスコープの詳細は、スコープの概要ページでご覧いただけます。

このシナリオでは、最小限の特権の原則に従って、ストレージ アカウントに限定したアクセス許可をユーザー アカウントに割り当てます。 この方法を使って、ユーザーに必要最小限のアクセス許可のみを与え、より安全な運用環境を作成します。

次の例では、ストレージ BLOB データ共同作成者ロールを自分のユーザー アカウントに割り当てます。これにより、そのストレージ アカウント内の BLOB データに対する読み取りと書き込みの両方のアクセス権が付与されます。

重要

ほとんどの場合、ロールの割り当てが Azure に反映されるまでの時間は 1 分から 2 分ですが、まれに 8 分程度までかかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。

  1. Azure portal で、メインの検索バーまたは左側のナビゲーションを使ってストレージ アカウントを見つけます。

  2. ストレージ アカウントの概要ページで、左側のメニューから [アクセス制御 (IAM)] を選びます。

  3. [アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。

  4. 上部のメニューから [+ 追加] を選択し、次に結果のドロップダウン メニューから [ロールの割り当ての追加] を選択します。

    ストレージ アカウント ロールの割り当て方法を示すスクリーンショット。

  5. 検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、ストレージ BLOB データ共同作成者を検索し、一致する結果を選び、[次へ] を選びます。

  6. [アクセスの割り当て先] で、[ユーザー、グループ、またはサービス プリンシパル] を選び、[+ メンバーの選択] を選びます。

  7. ダイアログで、自分の Microsoft Entra ユーザー名 (通常は user@domain メール アドレス) を検索し、ダイアログの下部にある [選択] を選びます。

  8. [レビューと割り当て] を選んで最終ページに移動し、もう一度 [レビューと割り当て] を行ってプロセスを完了します。

イベントを受信するために npm パッケージをインストールする

受信側では、追加で 2 つのパッケージをインストールする必要があります。 このクイックスタートでは、既に読み取ったイベントをプログラムが読み取らないよう、Azure Blob Storage を使用してチェックポイントを永続化します。 BLOB 内で定期的に受信したメッセージのメタデータ チェックポイントが実行されます。 この手法によって、後で前回終了したところからメッセージを継続して受信しやすくなります。

これらのコマンドを実行します。

npm install @azure/storage-blob
npm install @azure/eventhubs-checkpointstore-blob
npm install @azure/identity

イベントを受信するコードを記述する

  1. Visual Studio Code など、お好みのエディターを開きます。

  2. receive.js というファイルを作成し、そこに次のコードを貼り付けます。

    このコード内の次のプレースホルダーは、実際の値に置き換えてください。

    • EVENT HUBS NAMESPACE NAME
    • EVENT HUB NAME
    • STORAGE ACCOUNT NAME
    • STORAGE CONTAINER NAME
    const { DefaultAzureCredential } = require("@azure/identity");
    const { EventHubConsumerClient, earliestEventPosition  } = require("@azure/event-hubs");
    const { ContainerClient } = require("@azure/storage-blob");    
    const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");
    
    // Event hubs 
    const eventHubsResourceName = "EVENT HUBS NAMESPACE NAME";
    const fullyQualifiedNamespace = `${eventHubsResourceName}.servicebus.windows.net`; 
    const eventHubName = "EVENT HUB NAME";
    const consumerGroup = "$Default"; // name of the default consumer group
    
    // Azure Storage 
    const storageAccountName = "STORAGE ACCOUNT NAME";
    const storageContainerName = "STORAGE CONTAINER NAME";
    const baseUrl = `https://${storageAccountName}.blob.core.windows.net`;
    
    // Azure Identity - passwordless authentication
    const credential = new DefaultAzureCredential();
    
    async function main() {
    
      // Create a blob container client and a blob checkpoint store using the client.
      const containerClient = new ContainerClient(
        `${baseUrl}/${storageContainerName}`,
        credential
      );  
      const checkpointStore = new BlobCheckpointStore(containerClient);
    
      // Create a consumer client for the event hub by specifying the checkpoint store.
      const consumerClient = new EventHubConsumerClient(consumerGroup, fullyQualifiedNamespace, eventHubName, credential, checkpointStore);
    
      // Subscribe to the events, and specify handlers for processing the events and errors.
      const subscription = consumerClient.subscribe({
          processEvents: async (events, context) => {
            if (events.length === 0) {
              console.log(`No events received within wait time. Waiting for next interval`);
              return;
            }
    
            for (const event of events) {
              console.log(`Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'`);
            }
            // Update the checkpoint.
            await context.updateCheckpoint(events[events.length - 1]);
          },
    
          processError: async (err, context) => {
            console.log(`Error : ${err}`);
          }
        },
        { startPosition: earliestEventPosition }
      );
    
      // After 30 seconds, stop processing.
      await new Promise((resolve) => {
        setTimeout(async () => {
          await subscription.close();
          await consumerClient.close();
          resolve();
        }, 30000);
      });
    }
    
    main().catch((err) => {
      console.log("Error occurred: ", err);
    });
    
  3. コマンド プロンプトから node receive.js を実行して、このファイルを実行します。 受信したイベントに関するメッセージがウィンドウに表示されます。

    C:\Self Study\Event Hubs\JavaScript>node receive.js
    Received event: 'First event' from partition: '0' and consumer group: '$Default'
    Received event: 'Second event' from partition: '0' and consumer group: '$Default'
    Received event: 'Third event' from partition: '0' and consumer group: '$Default'
    

    Note

    情報提供を目的とした補足コメントを含む完全なソース コードについては、GitHub の receiveEventsUsingCheckpointStore.js ページにアクセスしてください。

    受信側プログラムは、イベント ハブの既定のコンシューマー グループのすべてのパーティションからイベントを受信します。

リソースをクリーンアップする

Event Hubs 名前空間を持つリソース グループを削除するか、リソース グループを保持する場合は名前空間のみを削除します。

GitHub で次のサンプルを確認します。