Python を使用してイベント ハブとの間でイベントを送受信する
このクイックスタートでは、azure-eventhub Python パッケージを使用して、イベント ハブとの間でイベントを送受信する方法について説明します。
前提条件
Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に Event Hubs の概要を参照してください。
このクイック スタートを完了するには、次の前提条件を用意しておく必要があります。
- Microsoft Azure サブスクリプション。 Azure Event Hubs を含む Azure サービスを使用するには、サブスクリプションが必要です。 Azure アカウントを持っていない場合、無料試用版でサインアップします。
- Python 3.8 以降 (PIP がインストールおよび更新されている)。
- Visual Studio Code (推奨) または他の任意の統合開発環境 (IDE)。
- Event Hubs 名前空間とイベント ハブを作成する。 最初のステップでは、Azure portal を使用して Event Hubs の名前空間を作成し、アプリケーションがイベント ハブと通信するために必要な管理資格情報を取得します。 名前空間とイベント ハブを作成するには、こちらの記事の手順に従います。
イベントを送信するためにパッケージをインストールする
Event Hubs 向けの Python パッケージをインストールするには、Python をパス設定した状態でコマンド プロンプトを開きます。 サンプルを保持するフォルダーにディレクトリを変更します。
pip install azure-eventhub
pip install azure-identity
pip install aiohttp
Azure に対してアプリを認証する
このクイック スタートでは、Azure Event Hubs に接続する 2 つの方法である、パスワードレスと接続文字列について説明します。 1 番目のオプションとして、Microsoft Entra ID のセキュリティ プリンシパルとロールベースのアクセス制御 (RBAC) を使用して Event Hubs 名前空間に接続する方法について説明します。 コードや構成ファイル、または Azure Key Vault などのセキュリティで保護されたストレージに、ハードコーディングされた接続文字列を含める心配はありません。 2 番目のオプションとして、接続文字列を使用して Event Hubs 名前空間に接続する方法について説明します。 Azure を初めて使用する場合は、接続文字列オプションの方が理解しやすいかもしれません。 実際のアプリケーションと運用環境では、パスワードレス オプションを使用することをお勧めします。 詳細については、「認証と承認」を参照してください。 パスワードレス認証の詳細については、概要ページを参照してください。
Microsoft Entra ユーザーにロールを割り当てる
ローカルでの開発時には、Azure Event Hubs に接続するユーザー アカウントに正しいアクセス許可があることを確認してください。 メッセージを送受信するには、Azure Event Hubs データ所有者ロールが必要です。 このロールを自分に割り当てるには、ユーザー アクセス管理者ロール、または Microsoft.Authorization/roleAssignments/write
アクションを含む別のロールが必要です。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 ロールの割り当てに使用できるスコープの詳細は、スコープの概要ページを参照してください。
次の例では、ユーザー アカウントに Azure Event Hubs Data Owner
ロールを割り当てます。これにより、Azure Event Hubs リソースにフル アクセスできます。 実際のシナリオでは、より安全な運用環境を実現するため、最小限の特権の原則に従って、必要な最小限のアクセス許可のみをユーザーに付与します。
Azure Event Hubs の Azure の組み込みロール
Azure Event Hubs の場合、Azure portal および Azure リソース管理 API による名前空間とそれに関連するすべてのリソースの管理は、Azure RBAC モデルを使用して既に保護されています。 Azure では、Event Hubs 名前空間へのアクセス権を付与するために、次の Azure 組み込みロールが提供されています。
- Azure Event Hubs データ所有者: Event Hubs 名前空間とそのエンティティ (キュー、トピック、サブスクリプション、フィルター) へのデータ アクセスが可能です。
- Azure Event Hubs データ送信者: このロールを使用して、Event Hubs 名前空間とそのエンティティへのアクセス権を送信者に付与します。
- Azure Event Hubs データ受信者: このロールを使用して、Event Hubs 名前空間とそのエンティティへのアクセス権を受信者に付与します。
カスタム ロールを作成する場合は、Event Hubs 操作に必要な権限に関するページを参照してください。
重要
ほとんどの場合、ロールの割り当てが Azure に反映されるまでの時間は 1、2 分です。 まれに、最大 8 分かかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。
Azure portal で、メインの検索バーまたは左側のナビゲーションを使用して Event Hubs 名前空間を見つけます。
概要ページで、左側のメニューから [アクセス制御 (IAM)] を選択します。
[アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。
上部のメニューから [+ 追加] を選択し、次に結果のドロップダウン メニューから [ロールの割り当ての追加] を選択します。
検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、
Azure Event Hubs Data Owner
を検索して一致する結果を選択します。 [次へ] を選びます。[アクセスの割り当て先] で、[ユーザー、グループ、またはサービス プリンシパル] を選び、[+ メンバーの選択] を選びます。
ダイアログで、自分の Microsoft Entra ユーザー名 (通常は user@domain メール アドレス) を検索し、ダイアログの下部にある [選択] を選びます。
[レビューと割り当て] を選んで最終ページに移動し、もう一度 [レビューと割り当て] を行ってプロセスを完了します。
送信イベント
このセクションでは、前に作成したイベント ハブにイベントを送信する Python スクリプトを作成します。
Visual Studio Codeなど、お使いの Python エディターを開きます。
send.py という名前のスクリプトを作成します。 このスクリプトは、前に作成したイベント ハブにイベントのバッチを送信します。
次のコードを send.py に貼り付けます。
このコード内の次のプレースホルダーは、実際の値に置き換えてください。
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
EVENT_HUB_NAME
import asyncio from azure.eventhub import EventData from azure.eventhub.aio import EventHubProducerClient from azure.identity.aio import DefaultAzureCredential EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE" EVENT_HUB_NAME = "EVENT_HUB_NAME" credential = DefaultAzureCredential() async def run(): # Create a producer client to send messages to the event hub. # Specify a credential that has correct role assigned to access # event hubs namespace and the event hub name. producer = EventHubProducerClient( fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, eventhub_name=EVENT_HUB_NAME, credential=credential, ) async with producer: # Create a batch. event_data_batch = await producer.create_batch() # Add events to the batch. event_data_batch.add(EventData("First event ")) event_data_batch.add(EventData("Second event")) event_data_batch.add(EventData("Third event")) # Send the batch of events to the event hub. await producer.send_batch(event_data_batch) # Close credential when no longer needed. await credential.close() asyncio.run(run())
Note
接続文字列を使用してイベントをイベント ハブに非同期的に送信するためのその他のオプションの例については、GitHub の send_async.py のページを参照してください。 示されているパターンは、パスワードレスでイベントを送信する場合にも適用できます。
受信イベント
このクイックスタートでは、Azure Blob Storage をチェックポイント ストアとして使用します。 チェックポイント ストアは、チェックポイント (最後の読み取り位置) を保持するために使用されます。
チェックポイント ストアとして Azure Blob Storage を使用する場合は、次の推奨事項に従ってください。
- コンシューマー グループごとに個別のコンテナーを使用します。 同じストレージ アカウントを使用できますが、各グループごとに 1 つのコンテナーを使用します。
- コンテナーを他の何かに使用しないでください。また、ストレージ アカウントも他の何かに使用しないでください。
- ストレージ アカウントは、デプロイされたアプリケーションが配置されているのと同じリージョンに存在する必要があります。 アプリケーションがオンプレミスの場合は、可能な中で最も近いリージョンを選択することを試みてください。
Azure portal の [ストレージ アカウント] ページの [Blob service] セクションで、次の設定が無効になっていることを確認してください。
- 階層型名前空間
- BLOB の論理的な削除
- バージョン管理
Azure Storage アカウントと BLOB コンテナーを作成する
次の手順に従って、Azure Storage アカウントと BLOB コンテナーを作成します。
- Azure ストレージ アカウントの作成
- BLOB コンテナーを作成します。
- BLOB コンテナーを認証します。
接続文字列とコンテナー名を記録しておいてください。後から受信コードで使用します。
ローカルで開発する場合は、BLOB データにアクセスするユーザー アカウントに正しいアクセス許可があることを確認します。 BLOB データの読み取りと書き込みを行うには、ストレージ BLOB データ共同作成者が必要です。 このロールを自分に割り当てるには、ユーザー アクセス管理者ロール、または Microsoft.Authorization/roleAssignments/write アクションを含む別のロールに割り当てられている必要があります。 Azure portal、Azure CLI、または Azure PowerShell を使用して、ユーザーに Azure RBAC ロールを割り当てることができます。 ロールの割り当てに使用できるスコープの詳細は、スコープの概要ページでご覧いただけます。
このシナリオでは、最小限の特権の原則に従って、ストレージ アカウントに限定したアクセス許可をユーザー アカウントに割り当てます。 この方法を使って、ユーザーに必要最小限のアクセス許可のみを与え、より安全な運用環境を作成します。
次の例では、ストレージ BLOB データ共同作成者ロールを自分のユーザー アカウントに割り当てます。これにより、そのストレージ アカウント内の BLOB データに対する読み取りと書き込みの両方のアクセス権が付与されます。
重要
ほとんどの場合、ロールの割り当てが Azure に反映されるまでの時間は 1 分から 2 分ですが、まれに 8 分程度までかかる場合があります。 初めてコードを実行したときに認証エラーを受け取る場合は、しばらく待ってから再試行してください。
Azure portal で、メインの検索バーまたは左側のナビゲーションを使ってストレージ アカウントを見つけます。
ストレージ アカウントの概要ページで、左側のメニューから [アクセス制御 (IAM)] を選びます。
[アクセス制御 (IAM)] ページで、[ロールの割り当て] タブを選びます。
上部のメニューから [+ 追加] を選択し、次に結果のドロップダウン メニューから [ロールの割り当ての追加] を選択します。
検索ボックスを使って、結果を目的のロールに絞り込みます。 この例では、ストレージ BLOB データ共同作成者を検索し、一致する結果を選び、[次へ] を選びます。
[アクセスの割り当て先] で、[ユーザー、グループ、またはサービス プリンシパル] を選び、[+ メンバーの選択] を選びます。
ダイアログで、自分の Microsoft Entra ユーザー名 (通常は user@domain メール アドレス) を検索し、ダイアログの下部にある [選択] を選びます。
[レビューと割り当て] を選んで最終ページに移動し、もう一度 [レビューと割り当て] を行ってプロセスを完了します。
イベントを受信するためにパッケージをインストールする
受信側では、追加で 1 つ以上のパッケージをインストールする必要があります。 このクイックスタートでは、既に読み取ったイベントをプログラムが読み取らないよう、Azure Blob Storage を使用してチェックポイントを永続化します。 BLOB 内で定期的に受信したメッセージのメタデータ チェックポイントが実行されます。 この手法によって、後で前回終了したところからメッセージを継続して受信しやすくなります。
pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity
イベントを受信する Python スクリプトの作成
このセクションでは、イベント ハブからイベントを受信する Python スクリプトを作成します。
Visual Studio Codeなど、お使いの Python エディターを開きます。
recv.py という名前のスクリプトを作成します。
次のコードを recv.py に貼り付けます。
このコード内の次のプレースホルダーは、実際の値に置き換えてください。
BLOB_STORAGE_ACCOUNT_URL
BLOB_CONTAINER_NAME
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
EVENT_HUB_NAME
import asyncio from azure.eventhub.aio import EventHubConsumerClient from azure.eventhub.extensions.checkpointstoreblobaio import ( BlobCheckpointStore, ) from azure.identity.aio import DefaultAzureCredential BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL" BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME" EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE" EVENT_HUB_NAME = "EVENT_HUB_NAME" credential = DefaultAzureCredential() async def on_event(partition_context, event): # Print the event data. print( 'Received the event: "{}" from the partition with ID: "{}"'.format( event.body_as_str(encoding="UTF-8"), partition_context.partition_id ) ) # Update the checkpoint so that the program doesn't read the events # that it has already read when you run it next time. await partition_context.update_checkpoint(event) async def main(): # Create an Azure blob checkpoint store to store the checkpoints. checkpoint_store = BlobCheckpointStore( blob_account_url=BLOB_STORAGE_ACCOUNT_URL, container_name=BLOB_CONTAINER_NAME, credential=credential, ) # Create a consumer client for the event hub. client = EventHubConsumerClient( fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, eventhub_name=EVENT_HUB_NAME, consumer_group="$Default", checkpoint_store=checkpoint_store, credential=credential, ) async with client: # Call the receive method. Read from the beginning of the partition # (starting_position: "-1") await client.receive(on_event=on_event, starting_position="-1") # Close credential when no longer needed. await credential.close() if __name__ == "__main__": # Run the main method. asyncio.run(main())
Note
接続文字列を使用してイベント ハブから非同期的にイベントを受信するためのその他のオプションの例については、GitHub の recv_with_checkpoint_store_async.py のページを参照してください。 示されているパターンは、パスワードレスでイベントを受信する場合にも適用できます。
受信側アプリを実行する
スクリプトを実行するには、Python をパス設定した状態でコマンド プロンプトを開き、次のコマンドを実行します。
python recv.py
送信側アプリを実行する
スクリプトを実行するには、Python をパス設定した状態でコマンド プロンプトを開き、次のコマンドを実行します。
python send.py
イベント ハブに送信されたメッセージが受信側ウィンドウに表示されます。
トラブルシューティング
受信側ウィンドウにイベントが表示されない場合、またはコードでエラーが報告される場合は、次のトラブルシューティングのヒントを試してください。
recy.py からの結果が表示されない場合は、send.py 複数回実行します。
パスワードなしのコード (資格情報を使用) を使用しているときに "コルーチン" に関するエラーが表示される場合は、
azure.identity.aio
からのインポートを使用していることを確認してください。パスワードなしのコード (資格情報を含む) で "閉じていないクライアント セッション" が表示される場合は、完了したら資格情報を閉じてください。 詳細については、非同期資格情報に関するページをご覧ください。
ストレージにアクセスするときに recv.py の認可エラーが表示される場合は、「Azure ストレージ アカウントと BLOB コンテナーを作成する」の手順に従い、ストレージ BLOB データ共同作成者 ロールをサービス プリンシパルに割り当ててください。
パーティション ID が異なるイベントを受け取った場合、この結果が予想されます。 パーティションはデータ編成メカニズムであり、コンシューマー アプリケーションで必要とされるダウンストリーム並列処理に関連します。 イベント ハブでのパーティションの数は、予想される同時接続のリーダー数に直接関連します。 詳細については、パーティションの詳細に関するページを参照してください。
次のステップ
このクイックスタートでは、イベントを非同期的に送受信しました。 イベントを同期的に送受信する方法については、GitHub の sync_samples ページにあるサンプルを参照してください。
GitHub にあるすべての (同期と非同期の両方の) サンプルについては、Python 用 Azure Event Hubs クライアント ライブラリ サンプル ページにアクセスしてください。