Inviare o ricevere eventi da hub eventi con Python

Questa guida di avvio rapido illustra come inviare e ricevere eventi da un hub eventi con il pacchetto Python azure-eventhub.

Prerequisiti

Se non si ha familiarità con Hub eventi di Azure, vedere Panoramica di Hub eventi prima di procedere con questa guida di avvio rapido.

Per completare questa guida introduttiva è necessario soddisfare i prerequisiti seguenti:

  • Sottoscrizione di Microsoft Azure. Per usare i servizi di Azure, tra cui Hub eventi di Azure, è necessaria una sottoscrizione. Se non si ha un account Azure esistente, iscriversi per ottenere una versione di valutazione gratuita.
  • Python 3.7 o versione successiva, con pip installato e aggiornato.
  • Visual Studio Code (scelta consigliata) o qualsiasi altro IDE (Integrated Development Environment).
  • Creare uno spazio dei nomi di Hub eventi e un hub eventi. Il primo passaggio consiste nell'usare il portale di Azure per creare uno spazio dei nomi di Hub eventi e ottenere le credenziali di gestione necessarie all'applicazione per comunicare con l'hub eventi. Per creare uno spazio dei nomi e un hub eventi, seguire la procedura descritta in questo articolo.

Installare i pacchetti per l'invio di eventi

Per installare i pacchetti Python per Hub eventi, aprire un prompt dei comandi con Python nel percorso. Passare alla cartella in cui si vogliono mantenere gli esempi.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Autenticare l'app in Azure

Questa guida introduttiva illustra due modi per connettersi a Hub eventi di Azure: senza password e stringa di connessione. La prima opzione illustra come usare l'entità di sicurezza in Azure Active Directory e il controllo degli accessi in base al ruolo per connettersi a uno spazio dei nomi di Hub eventi. Non è necessario preoccuparsi di avere stringhe di connessione hardcoded nel codice o in un file di configurazione o in una risorsa di archiviazione sicura come Azure Key Vault. La seconda opzione mostra come usare una stringa di connessione per connettersi a uno spazio dei nomi di Hub eventi. Se non si ha familiarità con Azure, è possibile che l'opzione della stringa di connessione sia più semplice da seguire. È consigliabile usare l'opzione senza password nelle applicazioni reali e negli ambienti di produzione. Per altre informazioni, vedere Autenticazione e autorizzazione. Per altre informazioni sull'autenticazione senza password, vedere la pagina di panoramica.

Assegnare ruoli all'utente di Azure AD

Quando si sviluppa in locale, assicurarsi che l'account utente che si connette a Hub eventi di Azure disponga delle autorizzazioni corrette. Per inviare e ricevere messaggi, è necessario il ruolo di proprietario dei dati Hub eventi di Azure. Per assegnare a se stessi questo ruolo, è necessario il ruolo Amministratore accesso utenti o un altro ruolo che include l'azione Microsoft.Authorization/roleAssignments/write . È possibile assegnare ruoli controllo degli accessi in base al ruolo di Azure a un utente usando il portale di Azure, l'interfaccia della riga di comando di Azure o Azure PowerShell. Altre informazioni sugli ambiti disponibili per le assegnazioni di ruolo nella pagina di panoramica dell'ambito .

L'esempio seguente assegna il Azure Event Hubs Data Owner ruolo all'account utente, che fornisce l'accesso completo alle risorse Hub eventi di Azure. In uno scenario reale, seguire il principio dei privilegi minimi per concedere agli utenti solo le autorizzazioni minime necessarie per un ambiente di produzione più sicuro.

Ruoli predefiniti di Azure per Hub eventi di Azure

Per Hub eventi di Azure, la gestione degli spazi dei nomi e di tutte le risorse correlate tramite il portale di Azure e l'API di gestione delle risorse di Azure è già protetta usando il modello di Controllo degli accessi in base al ruolo di Azure. Azure offre i ruoli predefiniti di Azure seguenti per autorizzare l'accesso a uno spazio dei nomi di Hub eventi:

Per creare un ruolo personalizzato, vedere Diritti necessari per le operazioni di Hub eventi.

Importante

Nella maggior parte dei casi, la propagazione dell'assegnazione di ruolo in Azure richiederà un minuto o due. In rari casi, possono essere necessari fino a otto minuti. Se si ricevono errori di autenticazione quando si esegue il codice per la prima volta, attendere alcuni istanti e riprovare.

  1. Nella portale di Azure individuare lo spazio dei nomi di Hub eventi usando la barra di ricerca principale o lo spostamento a sinistra.

  2. Nella pagina di panoramica selezionare Controllo di accesso (IAM) dal menu a sinistra.

  3. Nella pagina Controllo di accesso (IAM) selezionare la scheda Assegnazioni di ruolo .

  4. Selezionare + Aggiungi dal menu in alto e quindi Aggiungi assegnazione di ruolo dal menu a discesa risultante.

    Screenshot che mostra come assegnare un ruolo.

  5. Usare la casella di ricerca per filtrare i risultati in base al ruolo desiderato. Per questo esempio, cercare Azure Event Hubs Data Owner e selezionare il risultato corrispondente. Scegliere quindi Avanti.

  6. In Assegna accesso a selezionare Utente, gruppo o entità servizio e quindi scegliere + Seleziona membri.

  7. Nella finestra di dialogo cercare il nome utente di Azure AD (in genere l'indirizzo di posta elettronica user@domain ) e quindi scegliere Seleziona nella parte inferiore della finestra di dialogo.

  8. Selezionare Rivedi e assegna per passare alla pagina finale e quindi rivedi e assegna di nuovo per completare il processo.

Inviare eventi

In questa sezione creare uno script Python per inviare eventi all'hub eventi creato in precedenza.

  1. Aprire l'editor preferito di Python, ad esempio Visual Studio Code.

  2. Creare uno script denominato send.py. Questo script invia un batch di eventi all'hub eventi creato in precedenza.

  3. Incollare il codice seguente in send.py:

    Nel codice usare i valori reali per sostituire i segnaposto seguenti:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Nota

    Per esempi di altre opzioni per l'invio di eventi all'hub eventi in modo asincrono usando una stringa di connessione, vedere la pagina GitHub send_async.py. I modelli mostrati sono applicabili anche all'invio di eventi senza password.

Ricevere eventi

In questo argomento di avvio rapido si usa archiviazione BLOB di Azure come archivio di checkpoint. L'archivio di checkpoint viene usato per rendere persistenti i checkpoint (ovvero le ultime posizioni di lettura).

Seguire queste raccomandazioni quando si usano Archiviazione BLOB di Azure come archivio checkpoint:

  • Usare un contenitore separato per ogni gruppo di processori. È possibile usare lo stesso account di archiviazione, ma usare un contenitore per ogni gruppo.
  • Non usare il contenitore per altri elementi e non usare l'account di archiviazione per altri elementi.
  • L'account di archiviazione deve trovarsi nella stessa area in cui si trova l'applicazione distribuita. Se l'applicazione è locale, provare a scegliere l'area più vicina possibile.

Nella pagina Account di archiviazione nella portale di Azure, nella sezione Servizio BLOB verificare che le impostazioni seguenti siano disabilitate.

  • Spazio dei nomi gerarchico
  • Eliminazione temporanea DEL BLOB
  • Controllo delle versioni

Creare un account di archiviazione di Azure e un contenitore BLOB

Per creare un account di archiviazione di Azure e un contenitore BLOB al suo interno, eseguire queste operazioni:

  1. Creare un account di archiviazione di Azure
  2. Creare un contenitore BLOB.
  3. Eseguire l'autenticazione nel contenitore BLOB.

Assicurarsi di prendere nota della stringa di connessione e del nome del contenitore per un uso successivo nel codice di ricezione.

Quando si sviluppa in locale, assicurarsi che l'account utente che accede ai dati BLOB disponga delle autorizzazioni corrette. È necessario collaboratore ai dati BLOB di archiviazione per leggere e scrivere dati BLOB. Per assegnare questo ruolo, è necessario assegnare il ruolo Amministratore accesso utente o un altro ruolo che include l'azione Microsoft.Authorization/roleAssignments/write . È possibile assegnare ruoli di controllo degli accessi in base al ruolo di Azure a un utente usando l'portale di Azure, l'interfaccia della riga di comando di Azure o Azure PowerShell. Per altre informazioni sugli ambiti disponibili per le assegnazioni di ruolo, vedere la pagina panoramica dell'ambito .

In questo scenario si assegnano le autorizzazioni all'account utente, con ambito all'account di archiviazione, per seguire il principio dei privilegi minimi. Questa procedura offre agli utenti solo le autorizzazioni minime necessarie e crea ambienti di produzione più sicuri.

Nell'esempio seguente verrà assegnato il ruolo Collaboratore dati BLOB di archiviazione all'account utente, che fornisce sia l'accesso in lettura che in scrittura ai dati BLOB nell'account di archiviazione.

Importante

Nella maggior parte dei casi l'assegnazione di ruolo richiederà un minuto o due per propagare in Azure, ma in rari casi potrebbe richiedere fino a otto minuti. Se si ricevono errori di autenticazione quando si esegue prima il codice, attendere alcuni momenti e riprovare.

  1. Nella portale di Azure individuare l'account di archiviazione usando la barra di ricerca principale o lo spostamento sinistro.

  2. Nella pagina panoramica dell'account di archiviazione selezionare Controllo di accesso (IAM) dal menu a sinistra.

  3. Nella pagina Controllo di accesso (IAM) selezionare la scheda Assegnazioni di ruolo .

  4. Selezionare + Aggiungi dal menu in alto e quindi Aggiungere l'assegnazione di ruolo dal menu a discesa risultante.

    Screenshot che mostra come assegnare un ruolo dell'account di archiviazione.

  5. Utilizzare la casella di ricerca per filtrare i risultati nel ruolo desiderato. Per questo esempio, cercare Collaboratore dati BLOB di archiviazione e selezionare il risultato corrispondente e quindi scegliere Avanti.

  6. In Assegna accesso selezionare Utente, gruppo o entità servizio e quindi scegliere + Seleziona membri.

  7. Nella finestra di dialogo cercare il nome utente di Azure AD (in genere l'indirizzo di posta elettronica user@domain ) e quindi scegliere Seleziona nella parte inferiore della finestra di dialogo.

  8. Selezionare Rivedi e assegna per passare alla pagina finale e quindi Rivedi e assegna di nuovo per completare il processo.

Installare i pacchetti per ricevere gli eventi

Per il lato ricezione, è necessario installare uno o più pacchetti. In questo argomento di avvio rapido si usa archiviazione BLOB di Azure per rendere persistenti i checkpoint, in modo che il programma non legga gli eventi già letti. Vengono eseguiti a intervalli regolari i checkpoint dei metadati sui messaggi ricevuti in un BLOB. Con questo approccio risulta facile continuare a ricevere messaggi dal punto in cui si era interrotto in un momento successivo.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Creazione di uno script Python per ricevere gli eventi

In questa sezione viene creato uno script Python per ricevere eventi dall'hub eventi:

  1. Aprire l'editor preferito di Python, ad esempio Visual Studio Code.

  2. Creare uno script denominato recv.py.

  3. Incollare il codice seguente in recv.py:

    Nel codice usare valori reali per sostituire i segnaposto seguenti:

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Nota

    Per esempi di altre opzioni per la ricezione di eventi da Hub eventi in modo asincrono usando una stringa di connessione, vedere la pagina GitHub recv_with_checkpoint_store_async.py. I modelli illustrati sono applicabili anche alla ricezione di eventi senza password.

Eseguire l'app ricevente

Per eseguire lo script aprire un prompt dei comandi con un percorso contenente Python, quindi eseguire questo comando:

python recv.py

Eseguire l'app mittente

Per eseguire lo script aprire un prompt dei comandi con un percorso contenente Python, quindi eseguire questo comando:

python send.py

La finestra ricevente dovrebbe visualizzare i messaggi inviati all'hub eventi.

Risoluzione dei problemi

Se non vengono visualizzati eventi nella finestra del ricevitore o il codice segnala un errore, provare i suggerimenti per la risoluzione dei problemi seguenti:

  • Se non vengono visualizzati risultati da recy.py, eseguire send.py più volte.

  • Se vengono visualizzati errori su "coroutine" quando si usa il codice senza password (con credenziali), assicurarsi di usare l'importazione da azure.identity.aio.

  • Se viene visualizzata "Sessione client non chiusa" con codice senza password (con credenziali), assicurarsi di chiudere le credenziali al termine. Per altre informazioni, vedere Credenziali asincrone.

  • Se vengono visualizzati errori di autorizzazione con recv.py durante l'accesso all'archiviazione, assicurarsi di seguire la procedura descritta in Creare un account di archiviazione di Azure e un contenitore BLOB e assegnare il ruolo Collaboratore dati BLOB di archiviazione all'entità servizio.

  • Se si ricevono eventi con ID di partizione diversi, questo risultato è previsto. Le partizioni sono un meccanismo di organizzazione dei dati correlato al parallelismo downstream necessario per utilizzare le applicazioni. Il numero di partizioni in un hub eventi è direttamente correlato al numero di lettori simultanei previsti. Per altre informazioni, vedere Altre informazioni sulle partizioni.

Passaggi successivi

In questo argomento di avvio rapido sono stati inviati e ricevuti eventi in modo asincrono. Per informazioni su come inviare e ricevere eventi in modo sincrono, passare alla pagina sync_samples di GitHub.

Per tutti gli esempi (sincroni e asincroni) di GitHub, passare alla libreria client di Hub eventi di Azure per gli esempi di Python.