Enviar eventos para ou receber eventos de hubs de eventos com Python

Este guia de início rápido mostra como enviar e receber eventos de um hub de eventos usando o pacote Python azure-eventhub .

Pré-requisitos

Se você é novo nos Hubs de Eventos do Azure, consulte Visão geral dos Hubs de Eventos antes de fazer este início rápido.

Para concluir este início rápido, você precisa dos seguintes pré-requisitos:

Instale os pacotes para enviar eventos

Para instalar os pacotes Python para Hubs de Eventos, abra um prompt de comando que tenha Python em seu caminho. Altere o diretório para a pasta onde você deseja manter suas amostras.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Autenticar o aplicativo no Azure

Este guia de início rápido mostra duas maneiras de se conectar aos Hubs de Eventos do Azure: sem senha e cadeia de conexão. A primeira opção mostra como usar sua entidade de segurança no Microsoft Entra ID e RBAC (controle de acesso baseado em função) para se conectar a um namespace de Hubs de Eventos. Você não precisa se preocupar em ter cadeias de conexão codificadas em seu código ou em um arquivo de configuração ou em um armazenamento seguro como o Azure Key Vault. A segunda opção mostra como usar uma cadeia de conexão para se conectar a um namespace de Hubs de Eventos. Se você for novo no Azure, poderá achar a opção de cadeia de conexão mais fácil de seguir. Recomendamos o uso da opção sem senha em aplicativos e ambientes de produção do mundo real. Para obter mais informações, consulte Autenticação e autorização. Você também pode ler mais sobre autenticação sem senha na página de visão geral.

Atribuir funções ao usuário do Microsoft Entra

Ao desenvolver localmente, verifique se a conta de usuário que se conecta aos Hubs de Eventos do Azure tem as permissões corretas. Você precisará da função Proprietário de Dados dos Hubs de Eventos do Azure para enviar e receber mensagens. Para atribuir essa função a si mesmo, você precisará da função de Administrador de Acesso de Usuário ou de outra função que inclua a Microsoft.Authorization/roleAssignments/write ação. Você pode atribuir funções do RBAC do Azure a um usuário usando o portal do Azure, a CLI do Azure ou o Azure PowerShell. Saiba mais sobre os escopos disponíveis para atribuições de função na página de visão geral do escopo.

O exemplo a seguir atribui a Azure Event Hubs Data Owner função à sua conta de usuário, que fornece acesso total aos recursos dos Hubs de Eventos do Azure. Em um cenário real, siga o Princípio do Menor Privilégio para dar aos usuários apenas as permissões mínimas necessárias para um ambiente de produção mais seguro.

Funções internas do Azure para Hubs de Eventos do Azure

Para Hubs de Eventos do Azure, o gerenciamento de namespaces e todos os recursos relacionados por meio do portal do Azure e da API de gerenciamento de recursos do Azure já está protegido usando o modelo RBAC do Azure. O Azure fornece as seguintes funções internas do Azure para autorizar o acesso a um namespace de Hubs de Eventos:

Se quiser criar uma função personalizada, consulte Direitos necessários para operações de Hubs de Eventos.

Importante

Na maioria dos casos, levará um ou dois minutos para que a atribuição de função se propague no Azure. Em casos raros, pode demorar até oito minutos. Se você receber erros de autenticação quando executar o código pela primeira vez, aguarde alguns momentos e tente novamente.

  1. No portal do Azure, localize seu namespace de Hubs de Eventos usando a barra de pesquisa principal ou a navegação à esquerda.

  2. Na página de visão geral, selecione Controle de acesso (IAM) no menu à esquerda.

  3. Na página Controle de acesso (IAM), selecione a guia Atribuições de função.

  4. Selecione + Adicionar no menu superior e, em seguida, Adicionar atribuição de função no menu suspenso resultante.

    A screenshot showing how to assign a role.

  5. Use a caixa de pesquisa para filtrar os resultados para a função desejada. Para este exemplo, procure Azure Event Hubs Data Owner e selecione o resultado correspondente. Em seguida, escolha Avançar.

  6. Em Atribuir acesso a, selecione Utilizador, grupo ou entidade de serviço e, em seguida, selecione + Selecionar membros.

  7. Na caixa de diálogo, procure seu nome de usuário do Microsoft Entra (geralmente seu endereço de e-mail user@domain ) e escolha Selecionar na parte inferior da caixa de diálogo.

  8. Selecione Rever + atribuir para ir para a página final e, em seguida, Rever + atribuir novamente para concluir o processo.

Enviar eventos

Nesta seção, crie um script Python para enviar eventos para o hub de eventos que você criou anteriormente.

  1. Abra seu editor Python favorito, como o Visual Studio Code.

  2. Crie um script chamado send.py. Esse script envia um lote de eventos para o hub de eventos que você criou anteriormente.

  3. Cole o seguinte código em send.py:

    No código, use valores reais para substituir os seguintes espaços reservados:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Nota

    Para obter exemplos de outras opções para enviar eventos para o Hub de Eventos de forma assíncrona usando uma cadeia de conexão, consulte a página send_async.py do GitHub. Os padrões mostrados também são aplicáveis ao envio de eventos sem senha.

Receber eventos

Este guia de início rápido usa o armazenamento de Blob do Azure como um armazenamento de ponto de verificação. O armazenamento de pontos de verificação é usado para persistir pontos de verificação (ou seja, as últimas posições lidas).

Siga estas recomendações ao usar o Armazenamento de Blobs do Azure como um armazenamento de ponto de verificação:

  • Use um contêiner separado para cada grupo de consumidores. Você pode usar a mesma conta de armazenamento, mas usar um contêiner por cada grupo.
  • Não use o contêiner para mais nada e não use a conta de armazenamento para mais nada.
  • A conta de armazenamento deve estar na mesma região em que o aplicativo implantado está localizado. Se o aplicativo for local, tente escolher a região mais próxima possível.

Na página Conta de armazenamento no portal do Azure, na seção Serviço de Blob, verifique se as configurações a seguir estão desabilitadas.

  • Espaço de nomes hierárquico
  • Eliminação de forma recuperável de blobs
  • Controlo de Versão

Criar uma conta de armazenamento do Azure e um contêiner de blob

Crie uma conta de armazenamento do Azure e um contêiner de blob nela executando as seguintes etapas:

  1. Criar uma conta de Armazenamento do Azure
  2. Crie um contêiner de blob.
  3. Autentique-se no contêiner de blob.

Certifique-se de registrar a cadeia de conexão e o nome do contêiner para uso posterior no código de recebimento.

Ao desenvolver localmente, certifique-se de que a conta de usuário que está acessando dados de blob tem as permissões corretas. Você precisará do Storage Blob Data Contributor para ler e gravar dados de blob. Para atribuir essa função a si mesmo, você precisará receber a função de Administrador de Acesso de Usuário ou outra função que inclua a ação Microsoft.Authorization/roleAssignments/write. Você pode atribuir funções do RBAC do Azure a um usuário usando o portal do Azure, a CLI do Azure ou o Azure PowerShell. Você pode saber mais sobre os escopos disponíveis para atribuições de função na página de visão geral do escopo.

Nesse cenário, você atribuirá permissões à sua conta de usuário, com escopo para a conta de armazenamento, para seguir o Princípio do Menor Privilégio. Essa prática oferece aos usuários apenas as permissões mínimas necessárias e cria ambientes de produção mais seguros.

O exemplo a seguir atribuirá a função de Colaborador de Dados de Blob de Armazenamento à sua conta de usuário, que fornece acesso de leitura e gravação aos dados de blob em sua conta de armazenamento.

Importante

Na maioria dos casos, levará um ou dois minutos para que a atribuição de função se propague no Azure, mas, em casos raros, pode levar até oito minutos. Se você receber erros de autenticação quando executar o código pela primeira vez, aguarde alguns momentos e tente novamente.

  1. No portal do Azure, localize sua conta de armazenamento usando a barra de pesquisa principal ou a navegação à esquerda.

  2. Na página de visão geral da conta de armazenamento, selecione Controle de acesso (IAM) no menu à esquerda.

  3. Na página Controle de acesso (IAM), selecione a guia Atribuições de função.

  4. Selecione + Adicionar no menu superior e, em seguida, Adicionar atribuição de função no menu suspenso resultante.

    A screenshot showing how to assign a storage account role.

  5. Use a caixa de pesquisa para filtrar os resultados para a função desejada. Para este exemplo, procure por Storage Blob Data Contributor e selecione o resultado correspondente e, em seguida, escolha Next.

  6. Em Atribuir acesso a, selecione Utilizador, grupo ou entidade de serviço e, em seguida, selecione + Selecionar membros.

  7. Na caixa de diálogo, procure seu nome de usuário do Microsoft Entra (geralmente seu endereço de e-mail user@domain ) e escolha Selecionar na parte inferior da caixa de diálogo.

  8. Selecione Rever + atribuir para ir para a página final e, em seguida, Rever + atribuir novamente para concluir o processo.

Instalar os pacotes para receber eventos

Para o lado recetor, você precisa instalar um ou mais pacotes. Neste início rápido, você usa o armazenamento de Blob do Azure para persistir pontos de verificação para que o programa não leia os eventos que já leu. Ele executa pontos de verificação de metadados em mensagens recebidas em intervalos regulares em um blob. Essa abordagem facilita continuar recebendo mensagens mais tarde de onde você parou.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Criar um script Python para receber eventos

Nesta seção, você cria um script Python para receber eventos do seu hub de eventos:

  1. Abra seu editor Python favorito, como o Visual Studio Code.

  2. Crie um script chamado recv.py.

  3. Cole o seguinte código no recv.py:

    No código, use valores reais para substituir os seguintes espaços reservados:

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Nota

    Para obter exemplos de outras opções para receber eventos do Hub de Eventos de forma assíncrona usando uma cadeia de conexão, consulte a página recv_with_checkpoint_store_async.py do GitHub. Os padrões mostrados também são aplicáveis ao recebimento de eventos sem senha.

Execute o aplicativo recetor

Para executar o script, abra um prompt de comando que tenha Python em seu caminho e, em seguida, execute este comando:

python recv.py

Executar o aplicativo de remetente

Para executar o script, abra um prompt de comando que tenha Python em seu caminho e, em seguida, execute este comando:

python send.py

A janela do recetor deve exibir as mensagens que foram enviadas para o hub de eventos.

Resolução de Problemas

Se você não vir eventos na janela do recetor ou se o código relatar um erro, tente as seguintes dicas de solução de problemas:

  • Se não vir os resultados do recy.py, execute send.py várias vezes.

  • Se você vir erros sobre "co-rotina" ao usar o código sem senha (com credenciais), verifique se está usando a importação do azure.identity.aio.

  • Se vir "Sessão de cliente não fechada" com código sem palavra-passe (com credenciais), certifique-se de que fecha a credencial quando terminar. Para obter mais informações, consulte Credenciais assíncronas.

  • Se você vir erros de autorização com recv.py ao acessar o armazenamento, siga as etapas em Criar uma conta de armazenamento do Azure e um contêiner de blob e atribua a função de Colaborador de Dados de Blob de Armazenamento à entidade de serviço.

  • Se você receber eventos com IDs de partição diferentes, esse resultado é esperado. As partições são um mecanismo de organização de dados relacionado com o paralelismo a jusante necessário nas aplicações de consumo. O número de partições num hub de eventos está diretamente relacionado com o número de leitores simultâneos que espera ter. Para obter mais informações, consulte Saiba mais sobre partições.

Próximos passos

Neste início rápido, você enviou e recebeu eventos de forma assíncrona. Para saber como enviar e receber eventos de forma síncrona, vá para a página sync_samples do GitHub.

Para todos os exemplos (síncronos e assíncronos) no GitHub, vá para a biblioteca de cliente dos Hubs de Eventos do Azure para exemplos de Python.