你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

适用于 Python 的Azure 事件网格客户端库 - 版本 4.17.0b1

Azure 事件网格是一种完全托管的智能事件路由服务,允许通过发布-订阅模型来一致地使用事件。

源代码 | 包 (PyPI) | 包 (Conda) | API 参考文档 | 产品文档 | 样品 | 更改日志

免责声明

这是 Azure EventGrid 的 EventGridClientBeta 版本,该版本与 GA EventGridPublisherClient一起提供。 EventGridClient 支持 publish_cloud_eventsreceive_cloud_eventsacknowledge_cloud_eventsrelease_cloud_eventsreject_cloud_eventsrenew_cloud_event_locks 操作。 有关详细信息,请参阅 示例

入门

先决条件

安装包

使用 pip 安装适用于 Python 的 Azure 事件网格 客户端库:

pip install azure-eventgrid

如果使用 Azure CLI,请将 和 <resource-name> 替换为<resource-group-name>自己的唯一名称。

创建事件网格主题

az eventgrid topic --create --location <location> --resource-group <resource-group-name> --name <resource-name>

创建事件网格域

az eventgrid domain --create --location <location> --resource-group <resource-group-name> --name <resource-name>

验证客户端

若要与事件网格服务交互,需要创建客户端的实例。 需要 终结点凭据 才能实例化客户端对象。

使用 Azure Active Directory (AAD)

Azure 事件网格提供与 Azure Active Directory (Azure AD) 的集成,以便对请求进行基于标识的身份验证。 借助 Azure AD,可以使用基于角色的访问控制 (RBAC) 向用户、组或应用程序授予对Azure 事件网格资源的访问权限。

若要使用 TokenCredential将事件发送到主题或域,应为经过身份验证的标识分配“EventGrid 数据发送者”角色。

借助该 azure-identity 包,可以在开发和生产环境中无缝授权请求。 若要详细了解 Azure Active Directory,请参阅 azure-identity 自述文件

例如,可以使用 DefaultAzureCredential 构造使用 Azure Active Directory 进行身份验证的客户端:

from azure.identity import DefaultAzureCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent

default_az_credential = DefaultAzureCredential()
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]
client = EventGridPublisherClient(endpoint, default_az_credential)

查找终结点

可以在Azure 门户的事件网格主题资源中找到主题终结点。 这将如下所示: "https://<event-grid-topic-name>.<topic-location>.eventgrid.azure.net/api/events"

使用 AzureKeyCredential 创建客户端

若要使用 Access 密钥作为 credential 参数,请将密钥作为字符串传递到 AzureKeyCredential 实例中。

注意: 可以在 Azure 门户中的事件网格主题资源的“访问密钥”菜单中找到访问密钥。 还可以通过 azure CLI 或 azure-mgmt-eventgrid 库获取它们。 可 在此处找到获取访问密钥的指南。

import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential

topic_key = os.environ["EVENTGRID_TOPIC_KEY"]
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]

credential_key = AzureKeyCredential(topic_key)
client = EventGridPublisherClient(endpoint, credential_key)

注意: 还可以使用 AzureSasCredential通过 SAS 签名对客户端进行身份验证。 此处提供了 (async_version) 的示例

注意: 方法 generate_sas 可用于生成共享访问签名。 此处显示了一个演示这一点的示例。

关键概念

主题

主题是 EventGrid 服务中用于发送事件的通道。 主题接受的事件架构在主题创建时决定。 如果将架构类型的事件发送到需要其他架构类型的主题,则会引发错误。

Domain

事件 是用于大量与同一应用程序相关的事件网格主题的管理工具。 它们可以将事件发布到数千个主题。 域还提供对每个主题的授权和身份验证控制。 有关详细信息,请访问 事件域概述

创建事件域时,会提供此域的发布终结点。 此过程类似于创建事件网格主题。 唯一的区别在于,在发布到域时,必须在想要将事件传递到的域中指定主题。

事件架构

事件是完全描述系统中发生的事件的最小信息量。 创建自定义主题或域时,必须指定发布事件时将使用的架构。

事件网格支持多个架构来编码事件。

事件网格架构

虽然可以将主题配置为使用 自定义架构,但使用已定义的事件网格架构更为常见。 请参阅 此处的规范和要求。

CloudEvents v1.0 架构

另一个选项是使用 CloudEvents v1.0 架构。 CloudEvents 是一个云原生计算基础项目,它生成用于以通用方式描述事件数据的规范。 可 在此处找到 CloudEvents 的服务摘要。

EventGridPublisherClient

EventGridPublisherClient 提供将事件数据发送到客户端初始化期间指定的主题主机名的操作。

无论主题或域配置为使用哪种架构, EventGridPublisherClient 都将用于向主题或域发布事件。 send使用 方法发布事件。

允许发送以下事件格式:

  • 强类型 EventGridEvents 的列表或单个实例。

  • 序列化的 EventGridEvent 对象的听写表示形式。

  • 强类型 CloudEvents 的列表或单个实例。

  • 序列化 CloudEvent 对象的听写表示形式。

  • 任何自定义架构的听写表示形式。

请查看 示例 以获取详细示例。

注意: 发布之前,请务必了解主题是否支持 CloudEvents 或 EventGridEvents。 如果发送到不支持所发送事件的架构的主题,则 send () 将引发异常。

系统主题

事件网格中的系统主题表示 Azure 服务(例如 Azure 存储或Azure 事件中心)发布的一个或多个事件。 例如,系统主题可以表示所有 Blob 事件,或者仅表示为特定存储帐户发布的 Blob 创建和 Blob 删除事件。

发布到 Azure 事件网格 的系统事件的各种事件类型的名称在 中azure.eventgrid.SystemEventNames可用。 有关可识别的系统主题的完整列表,请访问 系统主题

有关事件网格中的关键概念的详细信息,请参阅Azure 事件网格中的概念

具有 Azure Arc 的 Kubernetes 上的事件网格

具有 Azure Arc 的 Kubernetes 上的事件网格是一种产品/服务,可让你在自己的 Kubernetes 群集中运行事件网格。 此功能是通过使用已启用 Azure Arc 的 Kubernetes 启用的。 通过已启用 Azure Arc 的 Kubernetes,可将受支持的 Kubernetes 群集连接到 Azure。 连接后,可在其上安装事件网格。 你可在此处了解相关详细信息。

支持 CNCF 云事件

从 v4.7.0 开始,此包还支持从 https://pypi.org/project/cloudevents/发布 CNCF 云事件。 可以将 CloudEvent 对象从此库传递到 send API。


from cloudevents.http import CloudEvent

event = CloudEvent(...)

client.send(event)

示例

以下部分提供了几个代码片段,涵盖了一些最常见的事件网格任务,包括:

发送事件网格事件

此示例发布事件网格事件。

import os
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent

key = os.environ["EG_ACCESS_KEY"]
endpoint = os.environ["EG_TOPIC_HOSTNAME"]

event = EventGridEvent(
    data={"team": "azure-sdk"},
    subject="Door1",
    event_type="Azure.Sdk.Demo",
    data_version="2.0"
)

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(event)

发送云事件

此示例发布一个 Cloud 事件。

import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient

key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]

event = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team": "azure-sdk"}
)

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(event)

发送多个事件

向主题或域发送多个事件时,可以批量发送事件。 此示例使用 send 方法发送 CloudEvent 列表。

警告: 一次发送多个事件的列表时,循环访问并发送每个事件不会获得最佳性能。 为了获得最佳性能,强烈建议发送事件列表。

import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient

key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]

event0 = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team": "azure-sdk"}
)
event1 = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team2": "azure-eventgrid"}
)

events = [event0, event1]

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(events)

将事件作为字典发送

除了强类型化对象之外,还可以使用相应序列化模型的 dict 表示形式发布 CloudEvent () 或 EventGridEvent () 。

使用类似于 dict 的表示形式发送到具有自定义架构的主题,如下所示。

import os
import uuid
import datetime as dt
from msrest.serialization import UTC
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient

key = os.environ["CUSTOM_SCHEMA_ACCESS_KEY"]
endpoint = os.environ["CUSTOM_SCHEMA_TOPIC_HOSTNAME"]

event = custom_schema_event = {
    "customSubject": "sample",
    "customEventType": "sample.event",
    "customDataVersion": "2.0",
    "customId": uuid.uuid4(),
    "customEventTime": dt.datetime.now(UTC()).isoformat(),
    "customData": "sample data"
    }

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(event)

从存储队列使用

此示例使用从存储队列接收的消息,并将其反序列化为 CloudEvent 对象。

from azure.core.messaging import CloudEvent
from azure.storage.queue import QueueServiceClient, BinaryBase64DecodePolicy
import os
import json

# all types of CloudEvents below produce same DeserializedEvent
connection_str = os.environ['STORAGE_QUEUE_CONN_STR']
queue_name = os.environ['STORAGE_QUEUE_NAME']

with QueueServiceClient.from_connection_string(connection_str) as qsc:
    payload =  qsc.get_queue_client(
        queue=queue_name,
        message_decode_policy=BinaryBase64DecodePolicy()
        ).peek_messages()

    ## deserialize payload into a list of typed Events
    events = [CloudEvent.from_dict(json.loads(msg.content)) for msg in payload]

从 servicebus 使用

此示例使用从 ServiceBus 接收的有效负载消息,并将其反序列化为 EventGridEvent 对象。

from azure.eventgrid import EventGridEvent
from azure.servicebus import ServiceBusClient
import os
import json

# all types of EventGridEvents below produce same DeserializedEvent
connection_str = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connection_str) as sb_client:
    payload =  sb_client.get_queue_receiver(queue_name).receive_messages()

    ## deserialize payload into a list of typed Events
    events = [EventGridEvent.from_dict(json.loads(next(msg.body).decode('utf-8'))) for msg in payload]

使用 EventGrid 进行分布式跟踪

可以像往常一样将 OpenTelemetry for Python 与 EventGrid 配合使用,因为它与 azure 核心跟踪集成兼容。

下面是使用 OpenTelemetry 跟踪发送 CloudEvent 的示例。

首先,将 OpenTelemetry 设置为 EventGrid 的已启用跟踪插件。

from azure.core.settings import settings
from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan

settings.tracing_implementation = OpenTelemetrySpan

从此处定期打开遥测使用情况。 有关详细信息 ,请参阅 OpenTelemetry 。 此示例使用简单的控制台导出程序导出跟踪。 此处可以使用任何导出程序,包括 azure-monitor-opentelemetry-exporterjaegerzipkin 等。

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor  # this requires opentelemetry >= 1.0.0

# Simple console exporter
exporter = ConsoleSpanExporter()

trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
trace.get_tracer_provider().add_span_processor(
    SimpleSpanProcessor(exporter)
)

tracer设置 和 exporter 后,请按照以下示例开始收集跟踪,同时使用 send 方法从 EventGridPublisherClient 发送 CloudEvent 对象。

import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.messaging import CloudEvent
from azure.core.credentials import AzureKeyCredential

hostname = os.environ['CLOUD_TOPIC_HOSTNAME']
key = AzureKeyCredential(os.environ['CLOUD_ACCESS_KEY'])
cloud_event = CloudEvent(
    source = 'demo',
    type = 'sdk.demo',
    data = {'test': 'hello'},
)
with tracer.start_as_current_span(name="MyApplication"):
    client = EventGridPublisherClient(hostname, key)
    client.send(cloud_event)

故障排除

  • 启用 azure.eventgrid 记录器以从库收集跟踪。

常规

事件网格客户端库将引发 Azure Core 中定义的异常。

日志记录

此库使用标准 日志记录 库进行日志记录。 有关 HTTP 会话 (URL、标头等的基本信息,) 在 INFO 级别记录。

可选配置

可选关键字 (keyword) 参数可以在客户端和按操作级别传入。 azure-core 参考文档 介绍了重试、日志记录、传输协议等的可用配置。

后续步骤

以下部分提供了几个代码片段,说明了事件网格 Python API 中使用的常见模式。

更多示例代码

这些代码示例演示了使用 Azure 事件网格 客户端库的常见支持者方案操作。

以下示例介绍了发布和使用 dict EventGridEvents 和 CloudEvents 的表示形式。

此处可以找到更多示例。

  • 可在此处查看与发送方案相关的更多示例。
  • 若要查看与将不同消息服务的有效负载用作类型化对象相关的更多示例,请访问 使用示例

其他文档

有关Azure 事件网格的更多文档,请参阅有关 docs.microsoft.com 的事件网格文档

贡献

本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 cla.microsoft.com

提交拉取请求时,CLA 机器人将自动确定你是否需要提供 CLA,并相应地修饰 PR(例如标签、注释)。 直接按机器人提供的说明操作。 只需使用 CLA 对所有存储库执行一次这样的操作。

此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。