Уреди

Делите путем


Send events to or receive events from event hubs by using Python

This quickstart shows how to send events to and receive events from an event hub using the azure-eventhub Python package.

Prerequisites

If you're new to Azure Event Hubs, see Event Hubs overview before you do this quickstart.

To complete this quickstart, you need the following prerequisites:

  • Microsoft Azure subscription. To use Azure services, including Azure Event Hubs, you need a subscription. If you don't have an existing Azure account, sign up for a free trial.
  • Python 3.8 or later, with pip installed and updated.
  • Visual Studio Code (recommended) or any other integrated development environment (IDE).
  • Create an Event Hubs namespace and an event hub. The first step is to use the Azure portal to create an Event Hubs namespace, and obtain the management credentials that your application needs to communicate with the event hub. To create a namespace and an event hub, follow the procedure in this article.

Install the packages to send events

To install the Python packages for Event Hubs, open a command prompt that has Python in its path. Change the directory to the folder where you want to keep your samples.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Authenticate the app to Azure

This quickstart shows you two ways of connecting to Azure Event Hubs: passwordless and connection string. The first option shows you how to use your security principal in Microsoft Entra ID and role-based access control (RBAC) to connect to an Event Hubs namespace. You don't need to worry about having hard-coded connection strings in your code or in a configuration file or in a secure storage like Azure Key Vault. The second option shows you how to use a connection string to connect to an Event Hubs namespace. If you're new to Azure, you may find the connection string option easier to follow. We recommend using the passwordless option in real-world applications and production environments. For more information, see Authentication and authorization. You can also read more about passwordless authentication on the overview page.

Assign roles to your Microsoft Entra user

When developing locally, make sure that the user account that connects to Azure Event Hubs has the correct permissions. You'll need the Azure Event Hubs Data Owner role in order to send and receive messages. To assign yourself this role, you'll need the User Access Administrator role, or another role that includes the Microsoft.Authorization/roleAssignments/write action. You can assign Azure RBAC roles to a user using the Azure portal, Azure CLI, or Azure PowerShell. Learn more about the available scopes for role assignments on the scope overview page.

The following example assigns the Azure Event Hubs Data Owner role to your user account, which provides full access to Azure Event Hubs resources. In a real scenario, follow the Principle of Least Privilege to give users only the minimum permissions needed for a more secure production environment.

Azure built-in roles for Azure Event Hubs

For Azure Event Hubs, the management of namespaces and all related resources through the Azure portal and the Azure resource management API is already protected using the Azure RBAC model. Azure provides the below Azure built-in roles for authorizing access to an Event Hubs namespace:

If you want to create a custom role, see Rights required for Event Hubs operations.

Important

In most cases, it will take a minute or two for the role assignment to propagate in Azure. In rare cases, it may take up to eight minutes. If you receive authentication errors when you first run your code, wait a few moments and try again.

  1. In the Azure portal, locate your Event Hubs namespace using the main search bar or left navigation.

  2. On the overview page, select Access control (IAM) from the left-hand menu.

  3. On the Access control (IAM) page, select the Role assignments tab.

  4. Select + Add from the top menu and then Add role assignment from the resulting drop-down menu.

    A screenshot showing how to assign a role.

  5. Use the search box to filter the results to the desired role. For this example, search for Azure Event Hubs Data Owner and select the matching result. Then choose Next.

  6. Under Assign access to, select User, group, or service principal, and then choose + Select members.

  7. In the dialog, search for your Microsoft Entra username (usually your user@domain email address) and then choose Select at the bottom of the dialog.

  8. Select Review + assign to go to the final page, and then Review + assign again to complete the process.

Send events

In this section, create a Python script to send events to the event hub that you created earlier.

  1. Open your favorite Python editor, such as Visual Studio Code.

  2. Create a script called send.py. This script sends a batch of events to the event hub that you created earlier.

  3. Paste the following code into send.py:

    In the code, use real values to replace the following placeholders:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Note

    For examples of other options for sending events to Event Hub asynchronously using a connection string, see the GitHub send_async.py page. The patterns shown there are also applicable to sending events passwordless.

Receive events

This quickstart uses Azure Blob storage as a checkpoint store. The checkpoint store is used to persist checkpoints (that is, the last read positions).

Follow these recommendations when using Azure Blob Storage as a checkpoint store:

  • Use a separate container for each consumer group. You can use the same storage account, but use one container per each group.
  • Don't use the container for anything else, and don't use the storage account for anything else.
  • Storage account should be in the same region as the deployed application is located in. If the application is on-premises, try to choose the closest region possible.

On the Storage account page in the Azure portal, in the Blob service section, ensure that the following settings are disabled.

  • Hierarchical namespace
  • Blob soft delete
  • Versioning

Create an Azure storage account and a blob container

Create an Azure storage account and a blob container in it by doing the following steps:

  1. Create an Azure Storage account
  2. Create a blob container.
  3. Authenticate to the blob container.

Be sure to record the connection string and container name for later use in the receive code.

When developing locally, make sure that the user account that is accessing blob data has the correct permissions. You'll need Storage Blob Data Contributor to read and write blob data. To assign yourself this role, you'll need to be assigned the User Access Administrator role, or another role that includes the Microsoft.Authorization/roleAssignments/write action. You can assign Azure RBAC roles to a user using the Azure portal, Azure CLI, or Azure PowerShell. You can learn more about the available scopes for role assignments on the scope overview page.

In this scenario, you'll assign permissions to your user account, scoped to the storage account, to follow the Principle of Least Privilege. This practice gives users only the minimum permissions needed and creates more secure production environments.

The following example will assign the Storage Blob Data Contributor role to your user account, which provides both read and write access to blob data in your storage account.

Important

In most cases it will take a minute or two for the role assignment to propagate in Azure, but in rare cases it may take up to eight minutes. If you receive authentication errors when you first run your code, wait a few moments and try again.

  1. In the Azure portal, locate your storage account using the main search bar or left navigation.

  2. On the storage account overview page, select Access control (IAM) from the left-hand menu.

  3. On the Access control (IAM) page, select the Role assignments tab.

  4. Select + Add from the top menu and then Add role assignment from the resulting drop-down menu.

    A screenshot showing how to assign a storage account role.

  5. Use the search box to filter the results to the desired role. For this example, search for Storage Blob Data Contributor and select the matching result and then choose Next.

  6. Under Assign access to, select User, group, or service principal, and then choose + Select members.

  7. In the dialog, search for your Microsoft Entra username (usually your user@domain email address) and then choose Select at the bottom of the dialog.

  8. Select Review + assign to go to the final page, and then Review + assign again to complete the process.

Install the packages to receive events

For the receiving side, you need to install one or more packages. In this quickstart, you use Azure Blob storage to persist checkpoints so that the program doesn't read the events that it has already read. It performs metadata checkpoints on received messages at regular intervals in a blob. This approach makes it easy to continue receiving messages later from where you left off.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Create a Python script to receive events

In this section, you create a Python script to receive events from your event hub:

  1. Open your favorite Python editor, such as Visual Studio Code.

  2. Create a script called recv.py.

  3. Paste the following code into recv.py:

    In the code, use real values to replace the following placeholders:

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Note

    For examples of other options for receiving events from Event Hub asynchronously using a connection string, see the GitHub recv_with_checkpoint_store_async.py page. The patterns shown there are also applicable to receiving events passwordless.

Run the receiver app

To run the script, open a command prompt that has Python in its path, and then run this command:

python recv.py

Run the sender app

To run the script, open a command prompt that has Python in its path, and then run this command:

python send.py

The receiver window should display the messages that were sent to the event hub.

Troubleshooting

If you don't see events in the receiver window or the code reports an error, try the following troubleshooting tips:

  • If you don't see results from recy.py, run send.py several times.

  • If you see errors about "coroutine" when using the passwordless code (with credentials), make sure you're using importing from azure.identity.aio.

  • If you see "Unclosed client session" with passwordless code (with credentials), make sure you close the credential when finished. For more information, see Async credentials.

  • If you see authorization errors with recv.py when accessing storage, make sure you followed the steps in Create an Azure storage account and a blob container and assigned the Storage Blob Data Contributor role to the service principal.

  • If you receive events with different partition IDs, this result is expected. Partitions are a data organization mechanism that relates to the downstream parallelism required in consuming applications. The number of partitions in an event hub directly relates to the number of concurrent readers you expect to have. For more information, see Learn more about partitions.

Next steps

In this quickstart, you've sent and received events asynchronously. To learn how to send and receive events synchronously, go to the GitHub sync_samples page.

For all the samples (both synchronous and asynchronous) on GitHub, go to Azure Event Hubs client library for Python samples.