你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
本快速入门介绍如何使用 azure-eventhub Python 包向事件中心发送事件以及从事件中心接收事件。
先决条件
如果不熟悉 Azure 事件中心,请在阅读本快速入门之前参阅事件中心概述。
若要完成本快速入门,请确保满足以下先决条件:
- Microsoft Azure 订阅:注册 免费试用版 (如果没有)。
- Python 3.8 或更高版本:确保安装并更新 pip。
- Visual Studio Code (建议):或使用所选的任何其他 IDE。
- 事件中心命名空间和事件中心:按照 本指南 在 Azure 门户中创建它们。
安装包以发送事件
若要为事件中心安装 Python 包,请打开其路径中包含 Python 的命令提示符。 将目录更改为要在其中保存示例的文件夹。
向 Azure 验证应用
本快速入门介绍如何通过两种方式连接到 Azure 事件中心:
- 无密码。 使用 Microsoft Entra ID 中的安全主体和基于角色的访问控制 (RBAC) 连接到事件中心命名空间。 无需担心在代码、配置文件或安全存储(如 Azure Key Vault)中具有硬编码连接字符串。
- 连接字符串。 使用连接字符串连接到事件中心命名空间。 如果不熟悉 Azure,你可能会感觉“连接字符串”选项更易于使用。
建议在实际应用程序和生产环境中使用无密码选项。 有关详细信息,请参阅 Azure 服务的服务总线身份验证和授权 和 无密码连接。
向 Microsoft Entra 用户分配角色
在本地开发时,请确保连接到 Azure 事件中心的用户帐户具有正确的权限。 需要 Azure 事件中心数据所有者 角色来发送和接收消息。 若要将此角色分配给自己,您需要 "User Access Administrator" 角色或其他包含 Microsoft.Authorization/roleAssignments/write
操作的角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 有关详细信息,请参阅 “了解 Azure RBAC”页的范围 。
以下示例将 Azure Event Hubs Data Owner
角色分配给用户帐户,该角色提供对 Azure 事件中心资源的完全访问权限。 在实际方案中,遵循最小特权原则,仅向用户提供更安全的生产环境所需的最小权限。
Azure 事件中心的内置 Azure 角色
对于 Azure 事件中心,通过 Azure 门户和 Azure 资源管理 API 对命名空间和所有相关资源进行的管理已使用 Azure RBAC 模型进行了保护。 Azure 提供以下内置角色,用于授权访问事件中心命名空间:
- Azure 事件中心数据所有者:允许数据访问事件中心命名空间及其实体(队列、主题、订阅和筛选器)。
- Azure 事件中心数据发送者:使用此角色授予对事件中心命名空间及其实体的发送者访问权限。
- Azure 事件中心数据接收者:使用此角色授予对事件中心命名空间及其实体的接收者访问权限。
如果要创建自定义角色,请参阅执行事件中心操作所需的权限。
重要说明
在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,可能需要长达 8 分钟的时间。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。
在 Azure 门户中,使用主搜索栏或左侧导航找到你的事件中心命名空间。
在概述页面上,从左侧菜单中选择“访问控制(IAM)”。
在“访问控制(IAM)”页上,选择“角色分配”选项卡。
从顶部菜单中选择 “+ 添加 ”。 然后选择“ 添加角色分配”。
使用搜索框筛选结果来找到所需角色。 对于此示例,请搜索
Azure Event Hubs Data Owner
并选择匹配的结果。 然后选择“下一步”。在“ 分配访问权限”下,选择“ 用户”、“组”或服务主体。 然后选择 “+ 选择成员”。
在对话框中,搜索Microsoft Entra 用户名(通常是 user@domain 电子邮件地址)。 选择对话框底部的 “选择 ”。
选择 “审阅 + 分配 ”以转到最后一页。 再次选择 “查看 + 分配 ”以完成该过程。
发送事件
在本部分,创建一个 Python 脚本,用于将事件发送到前面创建的事件中心。
打开常用的 Python 编辑器,如 Visual Studio Code。
创建名为 send.py 的脚本。 此脚本将一批事件发送到前面创建的事件中心。
将以下代码粘贴到 send.py 中:
在代码中,使用实际值替换以下占位符:
-
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
- 在命名空间的 “概述 ”页上看到完全限定的名称。 它应采用以下格式:<NAMESPACENAME>>.servicebus.windows.net
-
EVENT_HUB_NAME
- 事件中心的名称。
import asyncio from azure.eventhub import EventData from azure.eventhub.aio import EventHubProducerClient from azure.identity.aio import DefaultAzureCredential EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE" EVENT_HUB_NAME = "EVENT_HUB_NAME" credential = DefaultAzureCredential() async def run(): # Create a producer client to send messages to the event hub. # Specify a credential that has correct role assigned to access # event hubs namespace and the event hub name. producer = EventHubProducerClient( fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, eventhub_name=EVENT_HUB_NAME, credential=credential, ) print("Producer client created successfully.") async with producer: # Create a batch. event_data_batch = await producer.create_batch() # Add events to the batch. event_data_batch.add(EventData("First event ")) event_data_batch.add(EventData("Second event")) event_data_batch.add(EventData("Third event")) # Send the batch of events to the event hub. await producer.send_batch(event_data_batch) # Close credential when no longer needed. await credential.close() asyncio.run(run())
注意
有关使用连接字符串异步将事件发送到事件中心的其他选项示例,请参阅 GitHub send_async.py 页。 显示的模式也适用于以无密码方式发送事件。
-
接收事件
本快速入门使用 Azure Blob 存储作为检查点存储。 检查点存储用于保存检查点(即,上次读取位置)。
使用 Azure Blob 存储作为检查点存储时,请遵循以下建议:
- 对每个使用者组使用单独的容器。 可以使用同一存储帐户,但每个组使用一个容器。
- 不要将存储帐户用于任何其他用途。
- 不要将容器用于任何其他用途。
- 在部署的应用程序所在的同一区域中创建存储帐户。 如果应用程序位于本地,请尝试选择最近的区域。
在 Azure 门户的“存储帐户”页上的“Blob 服务”部分,确保禁用以下设置。
- 分层命名空间
- Blob 软删除
- 版本控制
创建 Azure 存储帐户和 Blob 容器
执行以下步骤,创建 Azure 存储帐户并在其中创建 Blob 容器:
- 创建 Azure 存储帐户
- 创建 Blob 容器。
- 对 Blob 容器进行身份验证。
请务必记下连接字符串和容器名称,供稍后在接收代码中使用。
在本地开发时,请确保访问 Blob 数据的用户帐户具有正确的权限。 需要具备存储 Blob 数据参与者角色才能读写 Blob 数据。 若要为自己分配此角色,需要被分配 “用户访问管理员” 角色,或另一个包含 Microsoft.Authorization/roleAssignments/write 操作的角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 有关详细信息,请参阅了解 Azure RBAC 的范围。
在此方案中,将权限分配给用户帐户(作用域为存储帐户)以遵循 最低特权原则。 这种做法只为用户提供所需的最低权限,并创建更安全的生产环境。
以下示例将 存储 Blob 数据参与者 角色分配给用户帐户,该角色提供对存储帐户中 Blob 数据的读取和写入访问权限。
重要说明
在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,可能需要长达 8 分钟的时间。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。
在 Azure 门户中,使用主搜索栏或左侧导航找到你的存储帐户。
在存储帐户页上,从左侧菜单中选择 “访问控制”(IAM )。
在“访问控制(IAM)”页上,选择“角色分配”选项卡。
从顶部菜单中选择 “+ 添加 ”。 然后选择“ 添加角色分配”。
使用搜索框筛选结果来找到所需角色。 对于此示例,搜索“存储 Blob 数据参与者”。 选择匹配的结果,然后选择 “下一步”。
在“将访问权限分配到”下,选择“用户、组或服务主体”,然后选择“+ 选择成员”。
在对话框中,搜索 Microsoft Entra 用户名(通常是 user@domain 电子邮件地址),然后选中对话框底部的“选择”。
选择 “审阅 + 分配 ”以转到最后一页。 再次选择 “查看 + 分配 ”以完成该过程。
安装包以接收事件
对于接收端,需要安装一个或多个包。 在本快速入门中,你将使用 Azure Blob 存储来保留检查点,以便程序不会读取已读取的事件。 它在 Blob 中按固定的时间间隔对收到的消息执行元数据检查点。 使用此方式可以很容易地在以后的某个时间从退出的位置继续接收消息。
创建用于接收事件的 Python 脚本
在本部分,你将创建一个 Python 脚本用于从事件中心接收事件:
打开常用的 Python 编辑器,如 Visual Studio Code。
创建名为 recv.py 的脚本。
将以下代码粘贴到 recv.py 中:
在代码中,使用实际值替换以下占位符:
-
BLOB_STORAGE_ACCOUNT_URL
- 此值应采用以下格式:https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
-
BLOB_CONTAINER_NAME
- Azure 存储帐户中 Blob 容器的名称。 -
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
- 在命名空间的 “概述 ”页上看到完全限定的名称。 它应采用以下格式:<NAMESPACENAME>>.servicebus.windows.net
-
EVENT_HUB_NAME
- 事件中心的名称。
import asyncio from azure.eventhub.aio import EventHubConsumerClient from azure.eventhub.extensions.checkpointstoreblobaio import ( BlobCheckpointStore, ) from azure.identity.aio import DefaultAzureCredential BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL" BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME" EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE" EVENT_HUB_NAME = "EVENT_HUB_NAME" credential = DefaultAzureCredential() async def on_event(partition_context, event): # Print the event data. print( 'Received the event: "{}" from the partition with ID: "{}"'.format( event.body_as_str(encoding="UTF-8"), partition_context.partition_id ) ) # Update the checkpoint so that the program doesn't read the events # that it has already read when you run it next time. await partition_context.update_checkpoint(event) async def main(): # Create an Azure blob checkpoint store to store the checkpoints. checkpoint_store = BlobCheckpointStore( blob_account_url=BLOB_STORAGE_ACCOUNT_URL, container_name=BLOB_CONTAINER_NAME, credential=credential, ) # Create a consumer client for the event hub. client = EventHubConsumerClient( fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, eventhub_name=EVENT_HUB_NAME, consumer_group="$Default", checkpoint_store=checkpoint_store, credential=credential, ) async with client: # Call the receive method. Read from the beginning of the partition # (starting_position: "-1") await client.receive(on_event=on_event, starting_position="-1") # Close credential when no longer needed. await credential.close() if __name__ == "__main__": # Run the main method. asyncio.run(main())
注意
有关使用连接字符串从事件中心异步接收事件的其他选项示例,请参阅 GitHub recv_with_checkpoint_store_async.py 页。 显示的模式也适用于以无密码方式接收事件。
-
运行接收器应用
启动命令提示符。
运行以下命令,并使用已添加到 Azure 事件中心命名空间上的 Azure 事件中心数据所有者角色的帐户和 Azure 存储帐户上的存储 Blob 数据参与者角色登录。
az login
切换到具有 receive.py 文件的文件夹,然后运行以下命令:
python recv.py
运行发送器应用
启动命令提示符。
运行以下命令,并使用已添加到 Azure 事件中心命名空间上的 Azure 事件中心数据所有者角色的帐户和 Azure 存储帐户上的存储 Blob 数据参与者角色登录。
az login
切换到具有 send.py 的文件夹,然后运行以下命令:
python send.py
接收器窗口应会显示已发送到事件中心的消息。
疑难解答
如果在接收端窗口中未看到事件或者代码报告错误,请尝试按照以下故障排除提示操作:
如果未看到 recy.py 的结果,请多次运行 send.py。
如果在使用无密码代码(结合凭据)时看到有关“协同例程”的错误,请确保使用从
azure.identity.aio
导入功能。如果无密码代码(使用凭据)出现“未关闭的客户端会话”错误,请确保在完成后关闭凭据。 有关详细信息,请参阅异步凭据。
如果在访问存储时看到 recv.py 授权错误,请确保按照创建 Azure 存储帐户和 Blob 容器中的步骤操作,并将“存储 Blob 数据参与者”角色分配给服务主体。
如果收到具有不同分区 ID 的事件,则预期会出现这种结果。 分区是一种数据组织机制,与使用方应用程序中所需的下游并行度相关。 事件中心的分区数与预期会有的并发读取者数直接相关。 有关详细信息,请参阅详细了解分区。
后续步骤
在本快速入门教程中,您以异步方式发送和接收事件。 若要了解如何以同步方式发送和接收事件,请参阅 GitHub sync_samples 页。
在 适用于 Python 的 Azure 事件中心客户端库中探索更多示例和高级方案。