Отправка и получение событий с помощью Python в Центрах событий
В этом кратком руководстве показано, как отправлять события в концентратор событий и получать события из него с помощью пакета Python azure-eventhub.
Необходимые компоненты
Если вы впервые используете Центры событий Azure, ознакомьтесь с общими сведениями, прежде чем приступить к работе с этим руководством.
Для работы с данным руководством необходимо следующее:
- Подписка Microsoft Azure. Чтобы использовать службы Azure, в том числе Центры событий Azure, потребуется действующая подписка. Если у вас нет существующей учетной записи Azure, зарегистрируйтесь для получения бесплатной пробной версии.
- Python 3.8 или более поздней версии с установленным и обновленным pip.
- Visual Studio Code (рекомендовано) или любая другая интегрированная среда разработки (IDE).
- Создайте пространство имен Центров событий и концентратор событий. Первым шагом является использование портал Azure для создания пространства имен Центров событий и получения учетных данных управления, необходимых приложению для взаимодействия с концентратором событий. Чтобы создать пространство имен и концентратор событий, выполните инструкции из этой статьи.
Установка пакетов для отправки событий
Чтобы установить пакеты Python для Центров событий, откройте командную строку с python в пути. Измените каталог на папку, в которой вы хотите сохранить примеры.
pip install azure-eventhub
pip install azure-identity
pip install aiohttp
Проверка подлинности приложения в Azure
В этом кратком руководстве показано два способа подключения к Центры событий Azure: без пароля и строка подключения. Первый вариант показывает, как использовать субъект безопасности в идентификаторе Microsoft Entra ID и управлении доступом на основе ролей (RBAC) для подключения к пространству имен Центров событий. Вам не нужно беспокоиться о наличии жестко закодированных строка подключения в коде или в файле конфигурации или в безопасном хранилище, например Azure Key Vault. Второй вариант показывает, как использовать строка подключения для подключения к пространству имен Центров событий. Если вы не знакомы с Azure, вы можете найти вариант строка подключения проще следовать. Мы рекомендуем использовать параметр без пароля в реальных приложениях и рабочих средах. Дополнительные сведения см. в разделе "Проверка подлинности и авторизация". Дополнительные сведения о проверке подлинности без пароля см. на странице обзора.
Назначение ролей пользователю Microsoft Entra
При локальной разработке убедитесь, что учетная запись пользователя, которая подключается к Центры событий Azure имеет правильные разрешения. Для отправки и получения сообщений вам потребуется роль владельца данных Центры событий Azure. Чтобы назначить себе эту роль, вам потребуется роль администратора доступа пользователей или другая роль, которая включает Microsoft.Authorization/roleAssignments/write
действие. Роли Azure RBAC можно назначить пользователю с помощью портала Azure, Azure CLI или Azure PowerShell. Дополнительные сведения о доступных областях назначения ролей на странице обзора области.
В следующем примере роль назначается Azure Event Hubs Data Owner
учетной записи пользователя, которая предоставляет полный доступ к ресурсам Центры событий Azure. В реальном сценарии следуйте принципу наименьших привилегий , чтобы предоставить пользователям только минимальные разрешения, необходимые для более безопасной рабочей среды.
Встроенные роли Azure для Центров событий Azure
Для Центры событий Azure управление пространствами имен и всеми связанными ресурсами с помощью портал Azure и API управления ресурсами Azure уже защищено с помощью модели Azure RBAC. Azure предоставляет следующие встроенные роли Azure для авторизации доступа к пространству имен Центров событий:
- Центры событий Azure владелец данных: включает доступ к пространству имен Центров событий и его сущностям (очереди, разделы, подписки и фильтры)
- Центры событий Azure отправителя данных: используйте эту роль, чтобы предоставить отправителю доступ к пространству имен Центров событий и его сущностям.
- Центры событий Azure приемник данных. Используйте эту роль, чтобы предоставить получателю доступ к пространству имен Центров событий и его сущностям.
Если вы хотите создать пользовательскую роль, см. раздел "Права", необходимые для операций Центров событий.
Внимание
В большинстве случаев для распространения назначения ролей в Azure потребуется несколько минут. В редких случаях может потребоваться до восьми минут. Если при первом запуске кода возникают ошибки аутентификации, подождите несколько минут и повторите попытку.
В портал Azure найдите пространство имен Центров событий с помощью главной панели поиска или навигации слева.
На странице обзора выберите элемент управления доступом (IAM) в меню слева.
На странице Контроль доступа (IAM) откройте вкладку Назначения ролей.
Выберите + Добавить в верхнем меню, а затем выберите Добавить назначение роли в появившемся раскрывающемся меню.
Используйте поле поиска, чтобы отфильтровать результаты для отображения нужной роли. В этом примере найдите
Azure Event Hubs Data Owner
и выберите соответствующий результат. Теперь щелкните Далее.В разделе Назначение доступа для выберите Пользователь, группа или субъект-служба и + Выбрать членов.
В диалоговом окне найдите имя пользователя Microsoft Entra (обычно ваш user@domain адрес электронной почты), а затем выберите в нижней части диалогового окна.
Нажмите кнопку Проверить и назначить, чтобы перейти на последнюю страницу, а затем еще раз Проверить и назначить, чтобы завершить процесс.
Отправка событий
В этом разделе описано, как создать скрипт Python для отправки событий в созданный ранее концентратор событий.
Откройте предпочитаемый редактор Python, например Visual Studio Code.
Создайте сценарий с именем send.py. Этот сценарий отправляет пакет событий в концентратор событий, созданный ранее.
Скопируйте приведенный ниже код в файл send.py.
В коде используйте реальные значения для замены следующих заполнителей:
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
EVENT_HUB_NAME
import asyncio from azure.eventhub import EventData from azure.eventhub.aio import EventHubProducerClient from azure.identity.aio import DefaultAzureCredential EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE" EVENT_HUB_NAME = "EVENT_HUB_NAME" credential = DefaultAzureCredential() async def run(): # Create a producer client to send messages to the event hub. # Specify a credential that has correct role assigned to access # event hubs namespace and the event hub name. producer = EventHubProducerClient( fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, eventhub_name=EVENT_HUB_NAME, credential=credential, ) async with producer: # Create a batch. event_data_batch = await producer.create_batch() # Add events to the batch. event_data_batch.add(EventData("First event ")) event_data_batch.add(EventData("Second event")) event_data_batch.add(EventData("Third event")) # Send the batch of events to the event hub. await producer.send_batch(event_data_batch) # Close credential when no longer needed. await credential.close() asyncio.run(run())
Примечание.
Примеры других вариантов отправки событий в Концентратор событий асинхронно с помощью строка подключения см. на странице send_async.py GitHub. Приведенные шаблоны также применимы к отправке событий без пароля.
Получение событий
В этом кратком руководстве в качестве хранилища контрольных точек используется хранилище BLOB-объектов Azure. Хранилище контрольных точек используется для сохранения контрольных точек (т. е. последних позиций чтения).
Следуйте этим рекомендациям при использовании Хранилище BLOB-объектов Azure в качестве хранилища контрольных точек:
- Используйте отдельный контейнер для каждой группы потребителей. Вы можете использовать одну и ту же учетную запись хранения, но использовать один контейнер для каждой группы.
- Не используйте контейнер для других компонентов и не используйте учетную запись хранения для других действий.
- Учетная запись хранения должна находиться в том же регионе, в который находится развернутое приложение. Если приложение находится в локальной среде, попробуйте выбрать ближайший регион.
На странице учетной записи хранения в портал Azure в разделе службы BLOB-объектов убедитесь, что следующие параметры отключены.
- Иерархическое пространство имен
- Обратимое удаление BLOB-объекта
- Управление версиями
Создание учетной записи хранения Azure и контейнера больших двоичных объектов
Создайте учетную запись хранения Azure и контейнер больших двоичных объектов в ней, выполнив следующие действия.
- Создайте учетную запись хранения Azure
- Создайте контейнер BLOB объектов.
- Проверка подлинности в контейнере BLOB-объектов.
Обязательно запишите строку подключения и имя контейнера для последующего использования в коде получения.
Если вы выполняете разработку локально, убедитесь, что учетная запись пользователя, через которую осуществляется доступ к данным BLOB-объектов, имеет правильные разрешения. Вам потребуется участник данных BLOB-объектов хранилища для чтения и записи данных BLOB-объектов. Чтобы назначить себе эту роль, вам потребуется назначить роль администратора доступа пользователей или другую роль, включающую действие Microsoft.Authorization/roleAssignments/write . Роли Azure RBAC можно назначить пользователю с помощью портала Azure, Azure CLI или Azure PowerShell. Дополнительные сведения о доступных областях назначения ролей можно узнать на странице обзора области.
В этом сценарии вы назначите разрешения учетной записи пользователя, которая ограничена учетной записью хранения, чтобы обеспечить соблюдение принципа минимальных привилегий. В рамках этой практики пользователям предоставляются только минимальные необходимые разрешения, что позволяет создавать более защищенные рабочие среды.
В следующем примере роль участника данных BLOB-объектов хранилища назначается учетной записи пользователя, которая предоставляет доступ как для чтения, так и записи к данным BLOB-объектов в вашей учетной записи хранения.
Внимание
В большинстве случаев для распространения назначения ролей в Azure потребуется минута или две, но в редких случаях может потребоваться до восьми минут. Если при первом запуске кода возникают ошибки аутентификации, подождите несколько минут и повторите попытку.
На портале Azure найдите свою учетную запись хранения, воспользовавшись основной панелью поиска или областью навигации слева.
На странице обзора учетной записи хранения выберите Контроль доступа (IAM) в меню слева.
На странице Контроль доступа (IAM) откройте вкладку Назначения ролей.
Выберите + Добавить в верхнем меню, а затем выберите Добавить назначение роли в появившемся раскрывающемся меню.
Используйте поле поиска, чтобы отфильтровать результаты для отображения нужной роли. В этом примере найдите участника данных BLOB-объектов хранилища и выберите соответствующий результат, а затем нажмите кнопку Далее.
В разделе Назначение доступа для выберите Пользователь, группа или субъект-служба и + Выбрать членов.
В диалоговом окне найдите имя пользователя Microsoft Entra (обычно ваш user@domain адрес электронной почты), а затем выберите в нижней части диалогового окна.
Нажмите кнопку Проверить и назначить, чтобы перейти на последнюю страницу, а затем еще раз Проверить и назначить, чтобы завершить процесс.
Установка пакетов для получения событий
Для принимающей стороны необходимо установить один или несколько пакетов. В этом кратком руководстве вы используете хранилище BLOB-объектов Azure для сохранения контрольных точек, чтобы программа не читала уже прочитанные события. Оно регулярно выполняет фиксацию метаданных для полученных сообщений в большом двоичном объекте. Благодаря такому подходу позже можно легко продолжить получать сообщения с того места, где вы остановились.
pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity
Создание сценария Python для получения событий
В этом разделе вы создадите скрипт Python для получения событий из концентратора событий.
Откройте предпочитаемый редактор Python, например Visual Studio Code.
Создайте сценарий с именем recv.py.
Вставьте в recv.py следующий код.
В коде используйте реальные значения для замены следующих заполнителей:
BLOB_STORAGE_ACCOUNT_URL
BLOB_CONTAINER_NAME
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
EVENT_HUB_NAME
import asyncio from azure.eventhub.aio import EventHubConsumerClient from azure.eventhub.extensions.checkpointstoreblobaio import ( BlobCheckpointStore, ) from azure.identity.aio import DefaultAzureCredential BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL" BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME" EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE" EVENT_HUB_NAME = "EVENT_HUB_NAME" credential = DefaultAzureCredential() async def on_event(partition_context, event): # Print the event data. print( 'Received the event: "{}" from the partition with ID: "{}"'.format( event.body_as_str(encoding="UTF-8"), partition_context.partition_id ) ) # Update the checkpoint so that the program doesn't read the events # that it has already read when you run it next time. await partition_context.update_checkpoint(event) async def main(): # Create an Azure blob checkpoint store to store the checkpoints. checkpoint_store = BlobCheckpointStore( blob_account_url=BLOB_STORAGE_ACCOUNT_URL, container_name=BLOB_CONTAINER_NAME, credential=credential, ) # Create a consumer client for the event hub. client = EventHubConsumerClient( fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, eventhub_name=EVENT_HUB_NAME, consumer_group="$Default", checkpoint_store=checkpoint_store, credential=credential, ) async with client: # Call the receive method. Read from the beginning of the partition # (starting_position: "-1") await client.receive(on_event=on_event, starting_position="-1") # Close credential when no longer needed. await credential.close() if __name__ == "__main__": # Run the main method. asyncio.run(main())
Примечание.
Примеры других вариантов получения событий из Концентратора событий асинхронно с помощью строка подключения см. на странице recv_with_checkpoint_store_async.py GitHub. Отображаемые шаблоны также применимы к получению событий без пароля.
Запуск приложения получателя
Чтобы выполнить сценарий, откройте окно командной строки с Python в пути, а затем выполните следующую команду.
python recv.py
Запуск приложения отправителя
Чтобы выполнить сценарий, откройте окно командной строки с Python в пути, а затем выполните следующую команду.
python send.py
В окне получателя должны отобразиться сообщения, отправленные в концентратор событий.
Устранение неполадок
Если события не отображаются в окне приемника или код сообщает об ошибке, попробуйте выполнить следующие советы по устранению неполадок:
Если результаты не отображаются из recy.py, выполните send.py несколько раз.
Если при использовании кода без пароля (с учетными данными) возникают ошибки о "coroutine" (с учетными данными), убедитесь, что вы используете импорт из
azure.identity.aio
.Если отображается "Незакрытый сеанс клиента" с кодом без пароля (с учетными данными), убедитесь, что вы закройте учетные данные после завершения. Дополнительные сведения см. в разделе "Асинхронные учетные данные".
Если при доступе к хранилищу возникают ошибки авторизации с recv.py , выполните действия, описанные в разделе "Создание учетной записи хранения Azure" и контейнера BLOB-объектов , а также назначение роли участника данных BLOB-объектов хранилища субъекту-службе.
Если вы получаете события с различными идентификаторами секций, ожидается этот результат. Секции — это способ организации данных, который соотносится со степенью параллелизма подчиненных элементов, требуемой для работы потребляющих приложений. Поэтому количество секций в концентраторе событий непосредственно связано с предполагаемым числом параллельных модулей чтения. Дополнительные сведения см. в разделе "Дополнительные сведения о секциях".
Следующие шаги
В этом кратком руководстве события отправлены и получены асинхронно. Чтобы узнать, как отправлять и получать события синхронно, перейдите на страницу GitHub sync_samples.
Для всех примеров (синхронных и асинхронных) в GitHub перейдите на страницу Azure Event Hubs client library for Python Samples (Клиентская библиотека Центров событий Azure для примеров Python).