Поделиться через


клиентская библиотека Сетка событий Azure для Python версии 4.16.0

"Сетка событий Azure" — это полностью управляемая интеллектуальная служба маршрутизации событий, которая обеспечивает равномерное потребление событий с помощью модели "публикация — подписка".

Исходный код | Пакет (PyPI) | Пакет (Conda) | Справочная документация по | APIДокументация по продукту | Образцы | Changelog

Начало работы

Предварительные требования

  • Для использования этого пакета требуется Python 3.7 или более поздней версии.
  • Для использования этого пакета вам потребуется подписка Azure и ресурс раздела Сетки событий. Выполните инструкции из этого пошагового руководства, чтобы зарегистрировать поставщик ресурсов Службы "Сетка событий" и создать разделы Сетки событий с помощью портал Azure. Аналогичное руководство по использованию Azure CLI.

Установка пакета

Установите клиентную библиотеку Сетка событий Azure для Python с помощью pip:

pip install azure-eventgrid
  • Требуется существующий раздел или домен Сетки событий. Вы можете создать ресурс с помощью портала Azure или Azure CLI.

Если вы используете Azure CLI, замените <resource-group-name> и <resource-name> собственными уникальными именами.

Создайте раздел Сетки событий.

az eventgrid topic --create --location <location> --resource-group <resource-group-name> --name <resource-name>

создать Домен в Сетке событий;

az eventgrid domain --create --location <location> --resource-group <resource-group-name> --name <resource-name>

Аутентификация клиента

Чтобы взаимодействовать со службой "Сетка событий", необходимо создать экземпляр клиента. Для создания экземпляра клиентского объекта необходимы конечная точка и учетные данные .

Использование Azure Active Directory (AAD)

Сетка событий Azure обеспечивает интеграцию с Azure Active Directory (Azure AD) для проверки подлинности запросов на основе удостоверений. С помощью Azure AD можно использовать управление доступом на основе ролей (RBAC), чтобы предоставить пользователям, группам или приложениям доступ к Сетка событий Azure ресурсам.

Для отправки событий в раздел или домен с TokenCredentialудостоверением, прошедшему проверку подлинности, должна быть назначена роль "Отправитель данных EventGrid".

azure-identity С помощью пакета можно легко авторизовать запросы как в среде разработки, так и в рабочей среде. Дополнительные сведения об Azure Active Directory см. в файле сведенийazure-identity.

Например, можно использовать для DefaultAzureCredential создания клиента, который будет проходить проверку подлинности с помощью Azure Active Directory:

from azure.identity import DefaultAzureCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent

default_az_credential = DefaultAzureCredential()
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]
client = EventGridPublisherClient(endpoint, default_az_credential)

Поиск конечной точки

Конечную точку раздела можно найти в ресурсе раздела Сетки событий на портал Azure. Это будет выглядеть следующим образом: "https://<event-grid-topic-name>.<topic-location>.eventgrid.azure.net/api/events"

Создание клиента с помощью AzureKeyCredential

Чтобы использовать ключ Доступа в credential качестве параметра, передайте ключ в виде строки в экземпляр AzureKeyCredential.

Примечание: Ключ доступа можно найти на портале Azure в меню "Ключи доступа" ресурса раздела Сетки событий. Их также можно получить с помощью azure CLI или библиотеки azure-mgmt-eventgrid . Руководство по получению ключей доступа можно найти здесь.

import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential

topic_key = os.environ["EVENTGRID_TOPIC_KEY"]
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]

credential_key = AzureKeyCredential(topic_key)
client = EventGridPublisherClient(endpoint, credential_key)

Примечание: Клиент также может пройти проверку подлинности с помощью подписи SAS с помощью AzureSasCredential. Пример, демонстрирующий это, доступен здесь (async_version).

Примечание: Метод generate_sas можно использовать для создания подписанного URL-адреса. Пример, демонстрирующий это, можно увидеть здесь.

Основные понятия

Раздел

Раздел — это канал в службе EventGrid для отправки событий. Схема событий, которую принимает тема, определяется во время создания темы. Если события типа схемы отправляются в раздел, требующий другого типа схемы, возникают ошибки.

Домен

Домен событий — это средство управления для большого количества разделов Сетки событий, связанных с тем же приложением. Они позволяют публиковать события во множестве разделов. Домены также предоставляют управление авторизацией и проверкой подлинности для каждого раздела. Дополнительные сведения см. в статье Общие сведения о домене событий.

При создании домена событий конечная точка публикации для этого домена становится доступной. Этот процесс аналогичен созданию раздела Сетки событий. Единственное отличие заключается в том, что при публикации в домене необходимо указать раздел в домене, в который должно быть доставлено событие.

Схемы событий

Событие — это наименьший объем информации, которая полностью описывает то, что произошло в системе. При создании пользовательского раздела или домена необходимо указать схему, которая будет использоваться при публикации событий.

Сетка событий поддерживает несколько схем для кодирования событий.

Схема службы "Сетка событий"

Хотя вы можете настроить раздел для использования пользовательской схемы, чаще всего используется уже определенная схема Сетки событий. См. спецификации и требования здесь.

Схема CloudEvents v1.0

Другой вариант — использовать схему CloudEvents версии 1.0. CloudEvents — это проект Cloud Native Computing Foundation, который создает спецификацию для общего описания данных событий. Сводку по службам CloudEvents можно найти здесь.

EventGridPublisherClient

EventGridPublisherClient предоставляет операции для отправки данных о событиях в имя узла раздела, указанное во время инициализации клиента.

Независимо от схемы, для использования в разделе или домене, EventGridPublisherClient будет использоваться для публикации событий в нем. Используйте события публикации send метода.

Разрешена отправка следующих форматов событий:

  • Список или один экземпляр строго типизированных событий EventGridEvents.

  • Представление диктовки сериализованного объекта EventGridEvent.

  • Список или один экземпляр строго типизированных CloudEvents.

  • Представление сериализованного объекта CloudEvent.

  • Представление любой пользовательской схемы.

Ознакомьтесь с примерами , чтобы получить подробные примеры.

Примечание: Перед публикацией важно знать, поддерживает ли ваш раздел CloudEvents или EventGridEvents. При отправке в раздел, который не поддерживает схему отправляемого события, send() вызовет исключение.

Системные разделы

Системный раздел в Сетке событий представляет одно или несколько событий, опубликованных службами Azure, такими как служба хранилища Azure или Центры событий Azure. Например, системный раздел может представлять все события BLOB-объектов или только события создания и удаления BLOB-объектов, опубликованные для определенной учетной записи хранения.

Имена различных типов событий системы, опубликованных в Сетка событий Azure, доступны в azure.eventgrid.SystemEventNames. Полный список распознаваемых системных разделов см. в разделе Системные разделы.

Дополнительные сведения о ключевых понятиях сетки событий см. в разделе Основные понятия в Сетка событий Azure.

Сетка событий в Kubernetes со службой Azure Arc

Сетка событий в Kubernetes со службой Azure Arc — это предложение, которое позволяет использовать службу "Сетка событий" в собственном кластере Kubernetes. Эта возможность обеспечивается с помощью Kubernetes с включенной службой Azure Arc. Посредством Kubernetes с включенной службой Azure Arc поддерживаемый кластер Kubernetes подключается к Azure. После подключения в нем можно установить Сетку событий. Дополнительные сведения см. здесь.

Поддержка облачных событий CNCF

Начиная с версии 4.7.0 этот пакет также поддерживает публикацию облачного события CNCF из https://pypi.org/project/cloudevents/. Вы сможете передать объект CloudEvent из этой библиотеки в send API.


from cloudevents.http import CloudEvent

event = CloudEvent(...)

client.send(event)

Примеры

В следующих разделах представлено несколько фрагментов кода, охватывающих некоторые из наиболее распространенных задач Сетки событий, в том числе:

Отправка события Сетки событий

В этом примере публикуется событие Сетки событий.

import os
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent

key = os.environ["EG_ACCESS_KEY"]
endpoint = os.environ["EG_TOPIC_HOSTNAME"]

event = EventGridEvent(
    data={"team": "azure-sdk"},
    subject="Door1",
    event_type="Azure.Sdk.Demo",
    data_version="2.0"
)

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(event)

Отправка облачного события

В этом примере публикуется облачное событие.

import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient

key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]

event = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team": "azure-sdk"}
)

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(event)

Отправка нескольких событий

Можно отправить события в виде пакета при отправке нескольких событий в раздел или домен. В этом примере отправляется список cloudEvents с помощью метода send.

ПРЕДУПРЕЖДЕНИЕ: При отправке списка нескольких событий одновременно перебор и отправка каждого события не приведет к оптимальной производительности. Для достижения оптимальной производительности настоятельно рекомендуется отправить список событий.

import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient

key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]

event0 = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team": "azure-sdk"}
)
event1 = CloudEvent(
    type="Azure.Sdk.Sample",
    source="https://egsample.dev/sampleevent",
    data={"team2": "azure-eventgrid"}
)

events = [event0, event1]

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(events)

Отправка событий в виде словарей

Представление диктовки соответствующих сериализованных моделей также можно использовать для публикации CloudEvent(s) или EventGridEvent(s) отдельно от строго типизированных объектов.

Используйте представление, подобное дикту, для отправки в раздел с пользовательской схемой, как показано ниже.

import os
import uuid
import datetime as dt
from msrest.serialization import UTC
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient

key = os.environ["CUSTOM_SCHEMA_ACCESS_KEY"]
endpoint = os.environ["CUSTOM_SCHEMA_TOPIC_HOSTNAME"]

event = custom_schema_event = {
    "customSubject": "sample",
    "customEventType": "sample.event",
    "customDataVersion": "2.0",
    "customId": uuid.uuid4(),
    "customEventTime": dt.datetime.now(UTC()).isoformat(),
    "customData": "sample data"
    }

credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)

client.send(event)

Использование из очереди хранилища

В этом примере используется сообщение, полученное из очереди хранилища, и выполняется десериализация его в объект CloudEvent.

from azure.core.messaging import CloudEvent
from azure.storage.queue import QueueServiceClient, BinaryBase64DecodePolicy
import os
import json

# all types of CloudEvents below produce same DeserializedEvent
connection_str = os.environ['STORAGE_QUEUE_CONN_STR']
queue_name = os.environ['STORAGE_QUEUE_NAME']

with QueueServiceClient.from_connection_string(connection_str) as qsc:
    payload =  qsc.get_queue_client(
        queue=queue_name,
        message_decode_policy=BinaryBase64DecodePolicy()
        ).peek_messages()

    ## deserialize payload into a list of typed Events
    events = [CloudEvent.from_dict(json.loads(msg.content)) for msg in payload]

Использование из служебной шины

В этом примере используется сообщение о полезных данных, полученное из Служебной шины, и десериализует его в объект EventGridEvent.

from azure.eventgrid import EventGridEvent
from azure.servicebus import ServiceBusClient
import os
import json

# all types of EventGridEvents below produce same DeserializedEvent
connection_str = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connection_str) as sb_client:
    payload =  sb_client.get_queue_receiver(queue_name).receive_messages()

    ## deserialize payload into a list of typed Events
    events = [EventGridEvent.from_dict(json.loads(next(msg.body).decode('utf-8'))) for msg in payload]

Распределенная трассировка с помощью EventGrid

OpenTelemetry для Python можно использовать обычным образом с EventGrid, так как он совместим с интеграцией трассировки Azure-Core.

Ниже приведен пример использования OpenTelemetry для трассировки отправки CloudEvent.

Сначала задайте OpenTelemetry в качестве подключаемого модуля трассировки для EventGrid.

from azure.core.settings import settings
from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan

settings.tracing_implementation = OpenTelemetrySpan

Регулярное открытое использование данных телеметрии отсюда. Дополнительные сведения см. в разделе OpenTelemetry . В этом примере для экспорта трассировок используется простое средство экспорта консоли. Здесь можно использовать любой экспортер, включая azure-monitor-opentelemetry-exporter, jaegerи zipkin т. д.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor  # this requires opentelemetry >= 1.0.0

# Simple console exporter
exporter = ConsoleSpanExporter()

trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
trace.get_tracer_provider().add_span_processor(
    SimpleSpanProcessor(exporter)
)

tracer После установки и exporter следуйте приведенному ниже примеру, чтобы начать сбор трассировок при использовании send метода из EventGridPublisherClient для отправки объекта CloudEvent.

import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.messaging import CloudEvent
from azure.core.credentials import AzureKeyCredential

hostname = os.environ['CLOUD_TOPIC_HOSTNAME']
key = AzureKeyCredential(os.environ['CLOUD_ACCESS_KEY'])
cloud_event = CloudEvent(
    source = 'demo',
    type = 'sdk.demo',
    data = {'test': 'hello'},
)
with tracer.start_as_current_span(name="MyApplication"):
    client = EventGridPublisherClient(hostname, key)
    client.send(cloud_event)

Устранение неполадок

  • Включите azure.eventgrid средство ведения журнала для сбора трассировок из библиотеки.

Общее

Клиентская библиотека Сетки событий будет вызывать исключения, определенные в Azure Core.

Ведение журнала

Эта библиотека использует стандартную библиотеку ведения журнала для ведения журнала. Основные сведения о сеансах HTTP (URL-адреса, заголовки и т. д.) регистрируются на уровне INFO.

Дополнительная настройка

Необязательные аргументы ключевое слово можно передавать на уровне клиента и для каждой операции. В справочной документации по azure-core описаны доступные конфигурации для повторных попыток, ведения журнала, транспортных протоколов и многого другого.

Дальнейшие действия

В следующем разделе представлено несколько фрагментов кода, иллюстрирующих общие шаблоны, используемые в API Python службы "Сетка событий".

Больше примеров кода

В этих примерах кода показаны распространенные операции с сценариями чемпиона с Сетка событий Azure клиентской библиотекой.

В следующих примерах рассматривается публикация и использование dict представлений EventGridEvents и CloudEvents.

Дополнительные примеры можно найти здесь.

  • Дополнительные примеры, связанные со сценарием отправки, см. здесь.
  • Дополнительные примеры, связанные с потреблением полезных данных из разных служб обмена сообщениями в качестве типизированного объекта, см. в статье Использование примеров.

Дополнительная документация

Более подробную документацию по Сетка событий Azure см. в документации по Сетке событий на docs.microsoft.com.

Участие

На этом проекте приветствуются публикации и предложения. Для участия в большинстве процессов по разработке документации необходимо принять лицензионное соглашение участника (CLA), в котором указывается, что вы предоставляете нам права на использование ваших публикаций. Дополнительные сведения см. на странице cla.microsoft.com.

При отправке запроса на включение внесенных изменений CLA-бот автоматически определит необходимость предоставления соглашения CLA и соответствующего оформления запроса на включение внесенных изменений (например, добавление метки, комментария). Просто следуйте инструкциям бота. Будет достаточно выполнить их один раз для всех репозиториев, поддерживающих соглашение CLA.

В рамках этого проекта действуют правила поведения в отношении продуктов с открытым исходным кодом Майкрософт. Дополнительные сведения см. в разделе часто задаваемых вопросов о правилах поведения или обратитесь к opencode@microsoft.com с любыми дополнительными вопросами или комментариями.