Enviar ou receber eventos dos Hubs de Eventos usando o Python

Este início rápido mostra como enviar e receber eventos de um hub de eventos usando o pacote Python azure-eventhub.

Pré-requisitos

Se você estiver conhecendo agora os Hubs de Eventos do Azure, confira Visão geral dos Hubs de Eventos antes de prosseguir com este início rápido.

Para concluir este início rápido, você precisará dos seguintes pré-requisitos:

  • Assinatura do Microsoft Azure. Para usar os serviços do Azure, incluindo os Hubs de Eventos do Azure, você precisa ter uma assinatura. Caso não tenha uma conta do Azure existente, inscreva-se para uma avaliação gratuita.
  • Python 3.8 ou posterior com PIP instalado e atualizado.
  • Visual Studio Code (recomendado) ou qualquer outro IDE (ambiente de desenvolvimento integrado).
  • Criar um namespace de Hubs de Eventos e um hub de eventos. A primeira etapa é usar o portal do Azure para criar um namespace dos Hubs de Eventos e obter as credenciais de gerenciamento que seu aplicativo precisa para se comunicar com o hub de eventos. Para criar um namespace e um hub de eventos, siga o procedimento nesse artigo.

Instalar os pacotes para enviar eventos

Para instalar os pacotes do Python para Hubs de Eventos, abra um prompt de comando que tenha Python no caminho. Altere o diretório para a pasta em que você deseja manter os exemplos.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Autenticar o aplicativo no Azure

Este guia de início rápido mostra duas maneiras de se conectar aos Hubs de Eventos do Azure: sem senha e cadeia de conexão. A primeira opção mostra como usar sua entidade de segurança no Microsoft Entra ID e no RBAC (controle de acesso baseado em função) para se conectar a um namespace dos Hubs de Eventos. Você não precisa se preocupar em ter cadeias de conexão embutidas no código, em um arquivo de configuração ou em um armazenamento seguro, como o Azure Key Vault. A segunda opção mostra como usar uma cadeia de conexão para se conectar a um namespace dos Hubs de Eventos. Se você é novo no Azure, talvez ache a opção de cadeia de conexão mais fácil de ser seguida. É recomendável usar a opção sem senha em aplicativos do mundo real e ambientes de produção. Para obter mais informações, confira Autenticação e autorização. Você também pode ler mais sobre a autenticação sem senha na página de visão geral.

Atribuir funções ao usuário do Microsoft Entra

No caso de desenvolvimento local, verifique se a conta de usuário que se conecta aos Hubs de Eventos do Azure tem as permissões corretas. Você precisará da função Proprietário de Dados dos Hubs de Eventos do Azure para enviar e receber mensagens. Para atribuir essa função a si mesmo, você precisará da função Administrador de acesso do usuário ou de outra função que inclua a ação Microsoft.Authorization/roleAssignments/write. É possível atribuir funções RBAC do Azure a um usuário usando o portal do Azure, a CLI do Azure ou o Azure PowerShell. Saiba mais sobre os escopos disponíveis para atribuições de função na página de visão geral do escopo.

O exemplo a seguir atribui a função Azure Event Hubs Data Owner à sua conta de usuário, que fornece acesso completo aos recursos dos Hubs de Eventos do Azure. Em um cenário real, siga o Princípio do Privilégio Mínimo para dar aos usuários apenas as permissões mínimas necessárias para um ambiente de produção mais seguro.

Funções interna do Azure par Hubs de Eventos do Azure

Para os Hubs de Eventos do Azure, o gerenciamento de namespaces e de todos os recursos relacionados por meio do portal do Azure e da API de gerenciamento de recursos do Azure já está protegido pelo modelo Azure RBAC. O Azure fornece as funções internas do Azure abaixo para autorizar o acesso a um namespace dos Hubs de Eventos:

Se você quiser criar uma função personalizada, confira Direitos exigidos para operações dos Hubs de Eventos.

Importante

Na maioria dos casos, levará um ou dois minutos para a atribuição de função se propagar no Azure. Em casos raros, poderá levar até oito minutos. Se você receber erros de autenticação ao executar o código pela primeira vez, aguarde alguns instantes e tente novamente.

  1. No portal do Azure, localize o namespace dos Hubs de Eventos usando a barra de pesquisa principal ou a navegação à esquerda.

  2. Na página de visão geral, selecione Controle de acesso (IAM) no menu à esquerda.

  3. Na página Controle de acesso (IAM), selecione a guia Atribuições de função.

  4. Selecione + Adicionar no menu superior e, em seguida, Adicionar atribuição de função no menu suspenso resultante.

    A screenshot showing how to assign a role.

  5. Use a caixa de pesquisa para filtrar os resultados para a função desejada. Neste exemplo, pesquise Azure Event Hubs Data Owner e selecione o resultado correspondente. Em seguida, escolha Avançar.

  6. Em Atribuir acesso a, selecione Usuário, grupo ou entidade de serviço e, em seguida, selecione + Selecionar membros.

  7. No diálogo, pesquise seu nome de usuário do Microsoft Entra (geralmente seu endereço de email user@domain) e escolha Selecionar na parte inferior do diálogo.

  8. Selecione Revisar + atribuir para ir para a página final e, em seguida, Revisar + atribuir novamente para concluir o processo.

Enviar eventos

Nesta seção, crie um script Python para enviar eventos para o hub de eventos que você criou anteriormente.

  1. Abra seu editor favorito do Python, como o Visual Studio Code.

  2. Crie um script chamado send.py. Esse script envia um lote de eventos para o hub de eventos que você criou anteriormente.

  3. Cole o seguinte código em sender.py:

    No código, use valores reais para substituir os seguintes espaços reservados:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Observação

    Para ver exemplos de outras opções para enviar eventos ao Hub de Eventos de maneira assíncrona usando uma cadeia de conexão, confira a página send_async.py do GitHub. Os padrões mostrados também são aplicáveis ao envio de eventos sem senha.

Receber eventos

Este início rápido usa o Armazenamento de Blobs do Azure como um repositório de pontos de verificação. O repositório de pontos de verificação é usado para persistir pontos de verificação (ou seja, as últimas posições de leitura).

Siga estas recomendações ao usar Armazenamento de Blobs do Azure como um repositório de ponto de verificação:

  • Use um contêiner separado para cada grupo de consumidores. Você pode usar a mesma conta de armazenamento, mas usar um contêiner por cada grupo.
  • Não use o contêiner para mais nada e não use a conta de armazenamento para mais nada.
  • A conta de armazenamento deve estar na mesma região em que o aplicativo implantado está localizado. Se o aplicativo for local, tente escolher a região mais próxima possível.

Na página Conta de armazenamento do portal do Azure, na seção serviço Blob, verifique se as seguintes configurações estão desabilitadas.

  • Namespace hierárquico
  • Exclusão temporária de blobs
  • Controle de versão

Criar uma conta de armazenamento e um contêiner de blobs do Azure

Crie uma conta de armazenamento do Azure e um contêiner de blobs nela executando as seguintes etapas:

  1. Crie uma conta de Armazenamento do Azure
  2. Criar um contêiner de blob.
  3. Autenticar no contêiner de blob.

Registre a cadeia de conexão e o nome do contêiner para usá-los posteriormente no código de recebimento.

Ao desenvolver localmente, verifique se a conta de usuário que está acessando os dados de blob tem as permissões corretas. Você precisará do Colaborador de Dados do Blob de Armazenamento para ler e gravar os dados de blob. Para atribuir essa função a si mesmo, você precisará receber a atribuição da função Administrador de Acesso do Usuário ou de outra função que inclua a ação Microsoft.Authorization/roleAssignments/write. É possível atribuir funções RBAC do Azure a um usuário usando o portal do Azure, a CLI do Azure ou o Azure PowerShell. Você pode saber mais sobre os escopos disponíveis para atribuições de função na página de visão geral do escopo.

Nesse cenário, você atribuirá permissões à sua conta de usuário, no escopo da conta de armazenamento, para seguir o Princípio do Privilégio Mínimo. Essa prática fornece aos usuários apenas as permissões mínimas necessárias e cria ambientes de produção mais seguros.

O exemplo a seguir atribuirá a função de Colaborador de Dados do Blob de Armazenamento à sua conta de usuário, que fornece acesso de leitura e gravação aos dados de blob na sua conta de armazenamento.

Importante

Na maioria dos casos, levará um ou dois minutos para a atribuição de função se propagar no Azure, mas em casos raros pode levar até oito minutos. Se você receber erros de autenticação ao executar o código pela primeira vez, aguarde alguns instantes e tente novamente.

  1. No portal do Azure, localize sua conta de armazenamento usando a barra de pesquisa principal ou a navegação à esquerda.

  2. Na página de visão geral da conta de armazenamento, selecione Controle de acesso (IAM) no menu à esquerda.

  3. Na página Controle de acesso (IAM), selecione a guia Atribuições de função.

  4. Selecione + Adicionar no menu superior e, em seguida, Adicionar atribuição de função no menu suspenso resultante.

    A screenshot showing how to assign a storage account role.

  5. Use a caixa de pesquisa para filtrar os resultados para a função desejada. Para este exemplo, pesquise o Colaborador de Dados do Blob de Armazenamento e selecione o resultado correspondente e, em seguida, escolha Avançar.

  6. Em Atribuir acesso a, selecione Usuário, grupo ou entidade de serviço e, em seguida, selecione + Selecionar membros.

  7. No diálogo, pesquise seu nome de usuário do Microsoft Entra (geralmente seu endereço de email user@domain) e escolha Selecionar na parte inferior do diálogo.

  8. Selecione Revisar + atribuir para ir para a página final e, em seguida, Revisar + atribuir novamente para concluir o processo.

Instalar os pacotes para receber eventos

Para o lado receptor, será necessário instalar mais dois pacotes. Neste início rápido, você usará o Armazenamento de Blobs do Azure para persistir pontos de verificação para que o programa não leia os eventos que ele já leu. Ele executa pontos de verificação de metadados em mensagens recebidas em intervalos regulares em um blob. Essa abordagem facilita continuar a receber mensagens mais tarde exatamente do ponto em que você parou.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Criar um script do Python para receber eventos

Nesta seção, você criará um script Python para receber eventos do hub de eventos:

  1. Abra seu editor favorito do Python, como o Visual Studio Code.

  2. Crie um script chamado recv.py.

  3. Cole o seguinte código em recv.py:

    No código, use valores reais para substituir os seguintes espaços reservados:

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Observação

    Para ver exemplos de outras opções para receber eventos do Hub de Eventos de maneira assíncrona usando uma cadeia de conexão, confira a página recv_with_checkpoint_store_async.py do GitHub. Os padrões mostrados também são aplicáveis ao recebimento de eventos sem senha.

Executar o aplicativo receptor

Para executar o script, abra um prompt de comando que tenha Python no caminho e, em seguida, execute este comando:

python recv.py

Executar o aplicativo remetente

Para executar o script, abra um prompt de comando que tenha Python no caminho e, em seguida, execute este comando:

python send.py

A janela do destinatário deve exibir as mensagens que foram enviadas para o hub de eventos.

Solução de problemas

Se você não vir eventos na janela do receptor ou o código relatar um erro, tente as seguintes dicas de solução de problemas:

  • Se você não vir os resultados de recy.py, execute send.py várias vezes.

  • Se você vir erros sobre "corrotina" ao usar o código sem senha (com credenciais), verifique se está usando a importação de azure.identity.aio.

  • Se você vir "Sessão de cliente não revelada" com código sem senha (com credenciais), lembre-se de fechar a credencial quando terminar. Para saber mais, confira Credenciais assíncronas.

  • Se você vir erros de autorização com recv.py ao acessar o armazenamento, verifique se seguiu as etapas em Criar uma conta de armazenamento do Azure e um contêiner de blobs e atribuiu a função Colaborador de Dados de Blob de Armazenamento à entidade de serviço.

  • Se você receber eventos com IDs de partição diferentes, esse resultado é esperado. As partições são um mecanismo de organização de dados relacionados ao paralelismo de downstream necessário no consumo de aplicativos. O número de partições em um hub de eventos está diretamente relacionado ao número de leitores simultâneos que você espera ter. Para obter mais informações, confira Saiba mais sobre partições.

Próximas etapas

Neste início rápido, você enviou e recebeu eventos de maneira assíncrona. Para saber como enviar e receber eventos de maneira síncrona, acesse a página sync_samples do GitHub.

Para obter todos os exemplos (síncronos e assíncronos) no GitHub, acesse os exemplos da biblioteca de clientes dos Hubs de Eventos do Azure para Python.