Condividi tramite


Guida introduttiva: Inviare o ricevere eventi da hub eventi con Python

In questa guida introduttiva si apprenderà come inviare e ricevere eventi da un hub eventi usando il pacchetto Python azure-eventhub .

Prerequisiti

Se non si ha familiarità con Hub eventi di Azure, vedere Panoramica di Hub eventi prima di procedere con questa guida di avvio rapido.

Per completare questa guida introduttiva, assicurarsi di avere i prerequisiti seguenti:

  • Sottoscrizione di Microsoft Azure: iscriversi per ottenere una versione di valutazione gratuita , se non è disponibile.
  • Python 3.8 o versione successiva: assicurarsi che pip sia installato e aggiornato.
  • Visual Studio Code (scelta consigliata): oppure usare qualsiasi altro IDE preferito.
  • Namespace e hub eventi di Event Hubs: seguire questa guida per crearli nel portale di Azure.

Installare i pacchetti per l'invio di eventi

Per installare i pacchetti Python per Hub eventi, aprire un prompt dei comandi con Python nel percorso. Modificare la directory nella cartella in cui si desidera mantenere gli esempi.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Autenticare l'app in Azure

Questa guida introduttiva illustra due modi per connettersi a Hub eventi di Azure:

  • Senza password. Utilizzare l'entità di sicurezza in Microsoft Entra ID e il controllo degli accessi in base al ruolo per connettersi a uno spazio dei nomi di Hub eventi. Non è necessario preoccuparsi di avere stringhe di connessione hardcoded nel codice, in un file di configurazione o in un'archiviazione sicura come Azure Key Vault.
  • Stringa di connessione. Usare una stringa di connessione per connettersi a un namespace di Event Hubs. Se non si ha familiarità con Azure, è possibile trovare l'opzione stringa di connessione più semplice da seguire.

È consigliabile usare l'opzione senza password in applicazioni e ambienti di produzione reali. Per altre informazioni, vedere Autenticazione e autorizzazione del bus di servizio econnessioni senza password per i servizi di Azure.

Assegnare ruoli all'utente di Microsoft Entra

Quando si sviluppa in locale, assicurarsi che l'account utente che si connette a Hub eventi di Azure disponga delle autorizzazioni corrette. È necessario il ruolo Proprietario dati di Hub eventi di Azure per inviare e ricevere messaggi. Per assegnare a se stessi questo ruolo, è necessario il ruolo Amministratore accesso utenti o un altro ruolo che include l'azione Microsoft.Authorization/roleAssignments/write . È possibile assegnare ruoli controllo degli accessi in base al ruolo di Azure a un utente usando il portale di Azure, l'interfaccia della riga di comando di Azure o Azure PowerShell. Per altre informazioni, vedere la pagina Informazioni sull'ambito per il controllo degli accessi in base al ruolo di Azure.

L'esempio seguente assegna il Azure Event Hubs Data Owner ruolo all'account utente, che fornisce l'accesso completo alle risorse di Hub eventi di Azure. In uno scenario reale, seguire il Principio dei privilegi minimi per concedere agli utenti solo le autorizzazioni minime necessarie per un ambiente di produzione più sicuro.

Ruoli predefiniti di Azure per Hub eventi di Azure

Per Hub eventi di Azure, la gestione degli spazi dei nomi e di tutte le risorse correlate tramite il portale di Azure e l'API di gestione delle risorse di Azure è già protetta usando il modello di controllo degli accessi in base al ruolo di Azure. Azure offre i seguenti ruoli predefiniti per autorizzare l'accesso a uno spazio dei nomi di Event Hubs:

Per creare un ruolo personalizzato, vedere Diritti necessari per le operazioni di Hub eventi.

Importante

Nella maggior parte dei casi, la propagazione dell'assegnazione di ruolo in Azure richiede un minuto o due. In rari casi, potrebbero essere necessari fino a otto minuti. Se si ricevono errori di autenticazione quando si esegue il codice per la prima volta, attendere alcuni istanti e riprovare.

  1. Nella portale di Azure individuare lo spazio dei nomi di Hub eventi usando la barra di ricerca principale o lo spostamento a sinistra.

  2. Nella pagina di panoramica selezionare Controllo di accesso (IAM) nel menu a sinistra.

  3. Nella pagina Controllo di accesso (IAM), selezionare la scheda Assegnazioni di ruolo.

  4. Selezionare + Aggiungi dal menu in alto. Selezionare quindi Aggiungi assegnazione di ruolo.

    Screenshot che mostra come assegnare un ruolo.

  5. Usare la casella di ricerca per filtrare i risultati in base al ruolo desiderato. Per questo esempio, cercare Azure Event Hubs Data Owner e selezionare il risultato corrispondente. Scegliere quindi Avanti.

  6. In Assegna accesso a selezionare Utente, gruppo o entità servizio. Scegliere quindi + Seleziona membri.

  7. Nella finestra di dialogo, cerca il nome utente di Microsoft Entra (di solito il tuo indirizzo email user@domain). Scegliere Seleziona nella parte inferiore della finestra di dialogo.

  8. Selezionare Rivedi e assegna per passare alla pagina finale. Selezionare di nuovo Rivedi e assegna per completare il processo.

Inviare gli eventi

In questa sezione creare uno script Python per inviare eventi all'hub eventi creato in precedenza.

  1. Aprire l'editor preferito di Python, ad esempioVisual Studio Code.

  2. Creare uno script denominato send.py. Questo script invia un batch di eventi all'hub eventi creato in precedenza.

  3. Incollare il codice seguente in send.py:

    Nel codice usare valori reali per sostituire i segnaposto seguenti:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - Il nome completo viene visualizzato nella pagina Panoramica dello spazio dei nomi. Deve essere nel formato : <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - Nome dell'hub eventi.
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        print("Producer client created successfully.") 
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Nota

    Per esempi di altre opzioni per l'invio di eventi a un hub eventi in modo asincrono usando una stringa di connessione, vedere la pagina gitHub send_async.py. I modelli illustrati sono applicabili anche all'invio di eventi senza password.

Ricevere eventi

In questo argomento di avvio rapido si usa archiviazione BLOB di Azure come archivio di checkpoint. L'archivio di checkpoint viene usato per rendere persistenti i checkpoint (ovvero le ultime posizioni di lettura).

Segui queste raccomandazioni quando usi Azure Blob Storage come archivio dei punti di controllo:

  • Usare un contenitore separato per ogni gruppo di consumer. È possibile usare lo stesso account di archiviazione, ma occorre usare un contenitore per ogni gruppo.
  • Non usare l'account di archiviazione per altro.
  • Non usare il contenitore per altro.
  • Creare l'account di archiviazione nella stessa area dell'applicazione distribuita. Se l'applicazione è locale, provare a scegliere l'area più vicina possibile.

Nella pagina Account di archiviazione del portale di Azure verificare che le impostazioni seguenti siano disabilitate nella sezione Servizio BLOB.

  • Spazio dei nomi gerarchico
  • Eliminazione temporanea dei BLOB
  • Controllo delle versioni

Creare un account di archiviazione di Azure e un contenitore BLOB

Per creare un account di archiviazione di Azure e un contenitore BLOB al suo interno, eseguire queste operazioni:

  1. Creare un account di archiviazione di Azure
  2. Creare un contenitore BLOB.
  3. Eseguire l'autenticazione nel contenitore BLOB.

Assicurarsi di prendere nota della stringa di connessione e del nome del contenitore per un uso successivo nel codice di ricezione.

Quando si sviluppa in locale, assicurarsi che l'account utente che accede ai dati BLOB disponga delle autorizzazioni corrette. Per leggere e scrivere dati BLOB, è necessario disporre del ruolo Collaboratore ai dati dei BLOB di archiviazione. Per assegnare a se stessi questo ruolo, è necessario essere assegnati al ruolo Amministratore accesso utenti o a un altro ruolo che include l'azione Microsoft.Authorization/roleAssignments/write . È possibile assegnare ruoli controllo degli accessi in base al ruolo di Azure a un utente usando il portale di Azure, l'interfaccia della riga di comando di Azure o Azure PowerShell. Per ulteriori informazioni, vedere Comprendere l'ambito di Azure RBAC.

In questo scenario, assegni le autorizzazioni al tuo account utente, con ambito limitato all'account di archiviazione, per seguire il principio del minimo privilegio. Questa procedura offre agli utenti solo le autorizzazioni minime necessarie e crea ambienti di produzione più sicuri.

Nell'esempio seguente verrà assegnato il ruolo Collaboratore ai dati dei BLOB di archiviazione all'account utente, in modo da fornire l'accesso in lettura e scrittura ai dati BLOB nell'account di archiviazione.

Importante

Nella maggior parte dei casi, la propagazione dell'assegnazione di ruolo in Azure richiede un minuto o due. In rari casi, potrebbero essere necessari fino a otto minuti. Se si ricevono errori di autenticazione quando si esegue il codice per la prima volta, attendere alcuni istanti e riprovare.

  1. Nel portale di Azure, individuare l'account di archiviazione usando la barra di ricerca principale o lo spostamento a sinistra.

  2. Nella pagina account di archiviazione selezionare Controllo di accesso (IAM) dal menu a sinistra.

  3. Nella pagina Controllo di accesso (IAM), selezionare la scheda Assegnazioni di ruolo.

  4. Selezionare + Aggiungi dal menu in alto. Selezionare quindi Aggiungi assegnazione di ruolo.

    Screenshot che mostra come assegnare un ruolo dell'account di archiviazione.

  5. Usare la casella di ricerca per filtrare i risultati in base al ruolo desiderato. Per questo esempio, cercare Storage Blob Data Contributor. Selezionare il risultato corrispondente e quindi scegliere Avanti.

  6. In Assegna accesso a selezionare Utente, gruppo o entità servizio e quindi scegliere + Seleziona membri.

  7. Nella finestra di dialogo cercare il nome utente di Microsoft Entra (in genere l'indirizzo di posta elettronica user@domain) e quindi scegliere Selezionare nella parte inferiore della finestra di dialogo.

  8. Selezionare Rivedi e assegna per passare alla pagina finale. Selezionare di nuovo Rivedi e assegna per completare il processo.

Installare i pacchetti per ricevere gli eventi

Per il lato ricevente, è necessario installare uno o più pacchetti. In questa guida introduttiva si usa l'archiviazione BLOB di Azure per rendere persistenti i checkpoint in modo che il programma non legga gli eventi già letti. Vengono eseguiti a intervalli regolari i checkpoint dei metadati sui messaggi ricevuti in un BLOB. Con questo approccio risulta facile continuare a ricevere messaggi dal punto in cui si era interrotto in un momento successivo.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Creazione di uno script Python per ricevere gli eventi

In questa sezione viene creato uno script Python per ricevere eventi dall'hub eventi:

  1. Aprire l'editor preferito di Python, ad esempioVisual Studio Code.

  2. Creare uno script denominato recv.py.

  3. Incollare il codice seguente in recv.py:

    Nel codice usare valori reali per sostituire i segnaposto seguenti:

    • BLOB_STORAGE_ACCOUNT_URL - Questo valore deve essere nel formato: https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
    • BLOB_CONTAINER_NAME - Nome del contenitore BLOB nell'account di archiviazione di Azure.
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - Il nome completo viene visualizzato nella pagina Panoramica dello spazio dei nomi. Deve essere nel formato : <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - Nome dell'hub eventi.
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Nota

    Per esempi di altre opzioni per la ricezione di eventi da un hub eventi in modo asincrono usando una stringa di connessione, vedere la pagina recv_with_checkpoint_store_async.py GitHub. I modelli mostrati sono applicabili anche alla ricezione di eventi senza password.

Eseguire l'app ricevente

  1. Avviare un prompt dei comandi.

  2. Esegui il seguente comando e accedi usando l'account che è stato aggiunto al ruolo Proprietario dati di Event Hubs di Azure nel namespace di Event Hubs e al ruolo Collaboratore dati dei BLOB di archiviazione nell'account di archiviazione di Azure.

    az login
    
  3. Passare alla cartella con il file receive.py ed eseguire il comando seguente:

    python recv.py
    

Eseguire l'app mittente

  1. Avviare un prompt dei comandi.

  2. Esegui il seguente comando e accedi usando l'account che è stato aggiunto al ruolo Proprietario dati di Event Hubs di Azure nel namespace di Event Hubs e al ruolo Collaboratore dati dei BLOB di archiviazione nell'account di archiviazione di Azure.

    az login
    
  3. Passare alla cartella con il send.py e quindi eseguire questo comando:

    python send.py
    

La finestra ricevente dovrebbe visualizzare i messaggi inviati all'hub eventi.

Risoluzione dei problemi

Se non vengono visualizzati eventi nella finestra del ricevitore o il codice segnala un errore, provare i suggerimenti per la risoluzione dei problemi seguenti:

  • Se i risultati di recy.py non vengono visualizzati, eseguire send.py più volte.

  • Se vengono visualizzati errori relativi alla "coroutine" quando si usa il codice senza password (con credenziali), assicurarsi di usare l'importazione da azure.identity.aio.

  • Se viene visualizzato "Sessione client non chiusa" con codice senza password (con credenziali), assicurarsi di chiudere le credenziali al termine. Per altre informazioni, vedere Credenziali asincrone.

  • Se vengono visualizzati errori di autorizzazione con recv.py durante l'accesso all'archiviazione, assicurarsi di seguire i passaggi descritti in Creare un account di archiviazione di Azure e un contenitore BLOB e assegnare il ruolo Collaboratore ai dati del BLOB di archiviazione all'entità servizio.

  • Se si ricevono eventi con ID di partizione diversi, questo risultato è previsto. Le partizioni sono un meccanismo di organizzazione dei dati correlato al parallelismo downstream necessario per utilizzare le applicazioni. Il numero di partizioni in un hub eventi è direttamente correlato al numero di lettori simultanei previsti. Per altre informazioni, vedere Altre informazioni sulle partizioni.

Passaggi successivi

In questo avvio rapido, hai inviato e ricevuto eventi in modo asincrono. Per informazioni su come inviare e ricevere eventi in modo sincrono, passare alla pagina sync_samples di GitHub.

Esplorare altri esempi e scenari avanzati nella libreria client di Hub eventi di Azure per esempi python.