Отправка и получение событий с помощью Python в Центрах событий

В этом кратком руководстве показано, как отправлять события в концентратор событий и получать события из него с помощью пакета Python azure-eventhub.

Необходимые компоненты

Если вы впервые используете Центры событий Azure, ознакомьтесь с общими сведениями, прежде чем приступить к работе с этим руководством.

Для работы с данным руководством необходимо следующее:

  • Подписка Microsoft Azure. Чтобы использовать службы Azure, в том числе Центры событий Azure, потребуется действующая подписка. Если у вас нет существующей учетной записи Azure, зарегистрируйтесь для получения бесплатной пробной версии.
  • Python 3.8 или более поздней версии с установленным и обновленным pip.
  • Visual Studio Code (рекомендовано) или любая другая интегрированная среда разработки (IDE).
  • Создайте пространство имен Центров событий и концентратор событий. Первым шагом является использование портал Azure для создания пространства имен Центров событий и получения учетных данных управления, необходимых приложению для взаимодействия с концентратором событий. Чтобы создать пространство имен и концентратор событий, выполните инструкции из этой статьи.

Установка пакетов для отправки событий

Чтобы установить пакеты Python для Центров событий, откройте командную строку с python в пути. Измените каталог на папку, в которой вы хотите сохранить примеры.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Проверка подлинности приложения в Azure

В этом кратком руководстве показано два способа подключения к Центры событий Azure: без пароля и строка подключения. Первый вариант показывает, как использовать субъект безопасности в идентификаторе Microsoft Entra ID и управлении доступом на основе ролей (RBAC) для подключения к пространству имен Центров событий. Вам не нужно беспокоиться о наличии жестко закодированных строка подключения в коде или в файле конфигурации или в безопасном хранилище, например Azure Key Vault. Второй вариант показывает, как использовать строка подключения для подключения к пространству имен Центров событий. Если вы не знакомы с Azure, вы можете найти вариант строка подключения проще следовать. Мы рекомендуем использовать параметр без пароля в реальных приложениях и рабочих средах. Дополнительные сведения см. в разделе "Проверка подлинности и авторизация". Дополнительные сведения о проверке подлинности без пароля см. на странице обзора.

Назначение ролей пользователю Microsoft Entra

При локальной разработке убедитесь, что учетная запись пользователя, которая подключается к Центры событий Azure имеет правильные разрешения. Для отправки и получения сообщений вам потребуется роль владельца данных Центры событий Azure. Чтобы назначить себе эту роль, вам потребуется роль Администратор istrator пользователя или другую роль, которая включает Microsoft.Authorization/roleAssignments/write действие. Роли Azure RBAC можно назначить пользователю с помощью портала Azure, Azure CLI или Azure PowerShell. Узнайте больше о доступных область для назначений ролей на странице обзора область.

В следующем примере роль назначается Azure Event Hubs Data Owner учетной записи пользователя, которая предоставляет полный доступ к ресурсам Центры событий Azure. В реальном сценарии следуйте принципу наименьших привилегий , чтобы предоставить пользователям только минимальные разрешения, необходимые для более безопасной рабочей среды.

Встроенные роли Azure для Центров событий Azure

Для Центры событий Azure управление пространствами имен и всеми связанными ресурсами с помощью портал Azure и API управления ресурсами Azure уже защищено с помощью модели Azure RBAC. Azure предоставляет следующие встроенные роли Azure для авторизации доступа к пространству имен Центров событий:

Если вы хотите создать пользовательскую роль, см. раздел "Права", необходимые для операций Центров событий.

Важно!

В большинстве случаев для распространения назначения ролей в Azure потребуется несколько минут. В редких случаях может потребоваться до восьми минут. Если при первом запуске кода возникают ошибки аутентификации, подождите несколько минут и повторите попытку.

  1. В портал Azure найдите пространство имен Центров событий с помощью главной панели поиска или навигации слева.

  2. На странице обзора выберите элемент управления доступом (IAM) в меню слева.

  3. На странице Контроль доступа (IAM) откройте вкладку Назначения ролей.

  4. Выберите + Добавить в верхнем меню, а затем выберите Добавить назначение роли в появившемся раскрывающемся меню.

    A screenshot showing how to assign a role.

  5. Используйте поле поиска, чтобы отфильтровать результаты для отображения нужной роли. В этом примере найдите Azure Event Hubs Data Owner и выберите соответствующий результат. Теперь щелкните Далее.

  6. В разделе Назначение доступа для выберите Пользователь, группа или субъект-служба и + Выбрать членов.

  7. В диалоговом окне найдите имя пользователя Microsoft Entra (обычно ваш user@domain адрес электронной почты), а затем выберите в нижней части диалогового окна.

  8. Нажмите кнопку Проверить и назначить, чтобы перейти на последнюю страницу, а затем еще раз Проверить и назначить, чтобы завершить процесс.

Отправка событий

В этом разделе описано, как создать скрипт Python для отправки событий в созданный ранее концентратор событий.

  1. Откройте предпочитаемый редактор Python, например Visual Studio Code.

  2. Создайте сценарий с именем send.py. Этот сценарий отправляет пакет событий в концентратор событий, созданный ранее.

  3. Скопируйте приведенный ниже код в файл send.py.

    В коде используйте реальные значения для замены следующих заполнителей:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Примечание.

    Примеры других вариантов отправки событий в Концентратор событий асинхронно с помощью строка подключения см. на странице send_async.py GitHub. Приведенные шаблоны также применимы к отправке событий без пароля.

Получение событий

В этом кратком руководстве в качестве хранилища контрольных точек используется хранилище BLOB-объектов Azure. Хранилище контрольных точек используется для сохранения контрольных точек (т. е. последних позиций чтения).

Следуйте этим рекомендациям при использовании Хранилище BLOB-объектов Azure в качестве хранилища проверка point:

  • Используйте отдельный контейнер для каждой группы потребителей. Вы можете использовать одну и ту же учетную запись хранения, но использовать один контейнер для каждой группы.
  • Не используйте контейнер для других компонентов и не используйте учетную запись хранения для других действий.
  • служба хранилища учетная запись должна находиться в том же регионе, в который находится развернутое приложение. Если приложение находится в локальной среде, попробуйте выбрать ближайший регион.

На странице учетной записи служба хранилища в портал Azure в разделе службы BLOB-объектов убедитесь, что следующие параметры отключены.

  • Иерархическое пространство имен
  • Обратимое удаление BLOB-объекта
  • Управление версиями

Создание учетной записи хранения Azure и контейнера больших двоичных объектов

Создайте учетную запись хранения Azure и контейнер больших двоичных объектов в ней, выполнив следующие действия.

  1. Создайте учетную запись хранения Azure
  2. Создайте контейнер BLOB объектов.
  3. Проверка подлинности в контейнере BLOB-объектов.

Обязательно запишите строку подключения и имя контейнера для последующего использования в коде получения.

Если вы выполняете разработку локально, убедитесь, что учетная запись пользователя, через которую осуществляется доступ к данным BLOB-объектов, имеет правильные разрешения. Вам потребуется служба хранилища участник данных BLOB-объектов для чтения и записи данных BLOB-объектов. Чтобы назначить себе эту роль, вам потребуется назначить роль Администратор istrator для доступа пользователей или другую роль, включающую действие Microsoft.Authorization/roleAssignments/write. Роли Azure RBAC можно назначить пользователю с помощью портала Azure, Azure CLI или Azure PowerShell. Дополнительные сведения о доступных областях назначения ролей можно узнать на странице обзора области.

В этом сценарии вы назначите разрешения учетной записи пользователя, которая ограничена учетной записью хранения, чтобы обеспечить соблюдение принципа минимальных привилегий. В рамках этой практики пользователям предоставляются только минимальные необходимые разрешения, что позволяет создавать более защищенные рабочие среды.

В следующем примере будет назначена роль участника данных BLOB-объектов служба хранилища учетной записи пользователя, которая предоставляет доступ для чтения и записи к данным BLOB-объектов в вашей учетной записи хранения.

Важно!

В большинстве случаев для распространения назначения ролей в Azure потребуется минута или две, но в редких случаях может потребоваться до восьми минут. Если при первом запуске кода возникают ошибки аутентификации, подождите несколько минут и повторите попытку.

  1. На портале Azure найдите свою учетную запись хранения, воспользовавшись основной панелью поиска или областью навигации слева.

  2. На странице обзора учетной записи хранения выберите Контроль доступа (IAM) в меню слева.

  3. На странице Контроль доступа (IAM) откройте вкладку Назначения ролей.

  4. Выберите + Добавить в верхнем меню, а затем выберите Добавить назначение роли в появившемся раскрывающемся меню.

    A screenshot showing how to assign a storage account role.

  5. Используйте поле поиска, чтобы отфильтровать результаты для отображения нужной роли. В этом примере найдите участника данных BLOB-объектов хранилища и выберите соответствующий результат, а затем нажмите кнопку Далее.

  6. В разделе Назначение доступа для выберите Пользователь, группа или субъект-служба и + Выбрать членов.

  7. В диалоговом окне найдите имя пользователя Microsoft Entra (обычно ваш user@domain адрес электронной почты), а затем выберите в нижней части диалогового окна.

  8. Нажмите кнопку Проверить и назначить, чтобы перейти на последнюю страницу, а затем еще раз Проверить и назначить, чтобы завершить процесс.

Установка пакетов для получения событий

Для принимающей стороны необходимо установить один или несколько пакетов. В этом кратком руководстве вы используете хранилище BLOB-объектов Azure для сохранения контрольных точек, чтобы программа не читала уже прочитанные события. Оно регулярно выполняет фиксацию метаданных для полученных сообщений в большом двоичном объекте. Благодаря такому подходу позже можно легко продолжить получать сообщения с того места, где вы остановились.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Создание сценария Python для получения событий

В этом разделе вы создадите скрипт Python для получения событий из концентратора событий.

  1. Откройте предпочитаемый редактор Python, например Visual Studio Code.

  2. Создайте сценарий с именем recv.py.

  3. Вставьте в recv.py следующий код.

    В коде используйте реальные значения для замены следующих заполнителей:

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Примечание.

    Примеры других вариантов получения событий из Концентратора событий асинхронно с помощью строка подключения см. на странице recv_with_проверка point_store_async.py GitHub. Отображаемые шаблоны также применимы к получению событий без пароля.

Запуск приложения получателя

Чтобы выполнить сценарий, откройте окно командной строки с Python в пути, а затем выполните следующую команду.

python recv.py

Запуск приложения отправителя

Чтобы выполнить сценарий, откройте окно командной строки с Python в пути, а затем выполните следующую команду.

python send.py

В окне получателя должны отобразиться сообщения, отправленные в концентратор событий.

Устранение неполадок

Если события не отображаются в окне приемника или код сообщает об ошибке, попробуйте выполнить следующие советы по устранению неполадок:

  • Если результаты не отображаются из recy.py, выполните send.py несколько раз.

  • Если при использовании кода без пароля (с учетными данными) возникают ошибки о "coroutine" (с учетными данными), убедитесь, что вы используете импорт из azure.identity.aio.

  • Если отображается "Незакрытый сеанс клиента" с кодом без пароля (с учетными данными), убедитесь, что вы закройте учетные данные после завершения. Дополнительные сведения см. в разделе "Асинхронные учетные данные".

  • Если при доступе к хранилищу возникают ошибки авторизации с recv.py, выполните действия, описанные в статье "Создание учетной записи хранения Azure" и контейнера BLOB-объектов, а также назначение роли участника данных BLOB-объектов служба хранилища субъекту-службе.

  • Если вы получаете события с различными идентификаторами секций, ожидается этот результат. Секции — это способ организации данных, который соотносится со степенью параллелизма подчиненных элементов, требуемой для работы потребляющих приложений. Поэтому количество секций в концентраторе событий непосредственно связано с предполагаемым числом параллельных модулей чтения. Дополнительные сведения см. в разделе "Дополнительные сведения о секциях".

Следующие шаги

В этом кратком руководстве события отправлены и получены асинхронно. Чтобы узнать, как отправлять и получать события синхронно, перейдите на страницу GitHub sync_samples.

Для всех примеров (синхронных и асинхронных) в GitHub перейдите на страницу Azure Event Hubs client library for Python Samples (Клиентская библиотека Центров событий Azure для примеров Python).