Condividi tramite


EventHubConsumerClient Classe

La classe EventHubConsumerClient definisce un'interfaccia di alto livello per la ricezione di eventi dal servizio Hub eventi di Azure.

L'obiettivo principale di EventHubConsumerClient è ricevere eventi da tutte le partizioni di un EventHub con bilanciamento del carico e checkpoint.

Quando più istanze di EventHubConsumerClient vengono eseguite nello stesso hub eventi, nel gruppo di consumer e nella posizione di checkpoint, le partizioni verranno distribuite in modo uniforme tra di esse.

Per abilitare il bilanciamento del carico e i checkpoint persistenti, è necessario impostare checkpoint_store durante la creazione di EventHubConsumerClient. Se non viene fornito un archivio checkpoint, il checkpoint verrà mantenuto internamente in memoria.

Un oggetto EventHubConsumerClient può anche ricevere da una partizione specifica quando si chiama il metodo receive() o receive_batch() e specificare la partition_id. Il bilanciamento del carico non funzionerà in modalità partizione singola. Tuttavia, gli utenti possono comunque salvare i checkpoint se il checkpoint_store è impostato.

Ereditarietà
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubConsumerClient

Costruttore

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parametri

fully_qualified_namespace
str
Necessario

Nome host completo per lo spazio dei nomi Hub eventi. Il formato dello spazio dei nomi è: .servicebus.windows.net.

eventhub_name
str
Necessario

Percorso dell'hub eventi specifico a cui connettere il client.

consumer_group
str
Necessario

Ricevere eventi dall'hub eventi per questo gruppo di consumer.

credential
AsyncTokenCredential oppure AzureSasCredential oppure AzureNamedKeyCredential
Necessario

Oggetto credenziale usato per l'autenticazione che implementa un'interfaccia specifica per ottenere i token. EventHubSharedKeyCredentialAccetta o oggetti credenziali generati dalla libreria e dagli oggetti azure-identity che implementano il metodo *get_token(self, ambiti).

logging_enable
bool

Indica se restituire i log di traccia di rete al logger. L'impostazione predefinita è False.

auth_timeout
float

Tempo in secondi per attendere che un token venga autorizzato dal servizio. Il valore predefinito è 60 secondi. Se impostato su 0, non verrà applicato alcun timeout dal client.

user_agent
str

Se specificato, verrà aggiunto davanti alla stringa dell'agente utente.

retry_total
int

Numero totale di tentativi di ridistribuire un'operazione non riuscita quando si verifica un errore. Il valore predefinito è 3. Il contesto dell'retry_total nella ricezione è speciale: il metodo di ricezione viene implementato da un metodo di ricezione interna chiamante del ciclo durante l'iterazione. Nel caso di ricezione , retry_total specifica i numeri di ripetizione dei tentativi dopo l'errore generato dal metodo di ricezione interno nel ciclo while-loop. Se i tentativi di ripetizione vengono esauriti, il callback on_error verrà chiamato (se specificato) con le informazioni sull'errore. Il consumer di partizione interna non riuscito verrà chiuso (on_partition_close verrà chiamato se specificato) e verrà creato un nuovo consumer di partizione interna (on_partition_initialize verrà chiamato se specificato) per riprendere la ricezione.

retry_backoff_factor
float

Fattore di backoff da applicare tra i tentativi dopo il secondo tentativo (la maggior parte degli errori viene risolta immediatamente da un secondo tentativo senza ritardo). In modalità fissa, i criteri di ripetizione dei tentativi saranno sempre in sospensione per {fattore backoff}. In modalità 'esponenziale', i criteri di ripetizione dei tentativi verranno interrotti per: {backoff factor} * (2 ** ({numero di tentativi totali} - 1)) secondi. Se l'backoff_factor è 0,1, il tentativo verrà riposato per [0,0, 0,2s, 0,4s, ...] tra i tentativi. Il valore predefinito è 0,8.

retry_backoff_max
float

Tempo massimo di off off. Il valore predefinito è 120 secondi (2 minuti).

retry_mode
str

Comportamento di ritardo tra tentativi di ripetizione. I valori supportati sono "fissi" o "esponenziali", dove il valore predefinito è "esponenziale".

idle_timeout
float

Timeout, in secondi, dopo il quale questo client chiuderà la connessione sottostante se non è presente un'ulteriore attività. Per impostazione predefinita, il valore non è Nessuno, ovvero che il client non verrà arrestato a causa dell'inattività a meno che non venga avviato dal servizio.

transport_type
TransportType

Tipo di protocollo di trasporto che verrà usato per comunicare con il servizio Hub eventi. Il valore predefinito è TransportType.Amqp in cui viene usata la porta 5671. Se la porta 5671 non è disponibile/bloccata nell'ambiente di rete, TransportType.AmqpOverWebsocket può essere usata invece che usa la porta 443 per la comunicazione.

http_proxy

Impostazioni proxy HTTP. Deve essere un dizionario con le chiavi seguenti: 'proxy_hostname' (valore str) e 'proxy_port' (valore int).

checkpoint_store
Optional[CheckpointStore]

Gestione che archivia i dati del bilanciamento del carico di partizione e checkpoint durante la ricezione di eventi. L'archivio checkpoint verrà usato in entrambi i casi di ricezione da tutte le partizioni o da una singola partizione. In quest'ultimo caso il bilanciamento del carico non si applica. Se non viene fornito un archivio checkpoint, il checkpoint verrà mantenuto internamente in memoria e l'istanza di EventHubConsumerClient riceverà eventi senza bilanciamento del carico.

load_balancing_interval
float

Quando si esegue il bilanciamento del carico. Questo è l'intervallo, in secondi, tra due valutazioni di bilanciamento del carico. Il valore predefinito è 30 secondi.

partition_ownership_expiration_interval
float

Una proprietà della partizione scade dopo questo numero di secondi. Ogni valutazione di bilanciamento del carico estende automaticamente la scadenza della proprietà. Il valore predefinito è 6 * load_balancing_interval, ad esempio 180 secondi quando si usa il load_balancing_interval predefinito di 30 secondi.

load_balancing_strategy
str oppure LoadBalancingStrategy

Quando il bilanciamento del carico viene eseguito, userà questa strategia per richiedere e bilanciare la proprietà della partizione. Usare "greedy" o LoadBalancingStrategy.GREEDY per la strategia greedy, che, per ogni valutazione di bilanciamento del carico, afferrarà quante partizioni non dichiarate necessarie per bilanciare il carico. Usare "bilanciato" o LoadBalancingStrategy.BALANCE per la strategia bilanciata, che, per ogni valutazione di bilanciamento del carico, richiede solo una partizione non richiesta da altri EventHubConsumerClient. Se tutte le partizioni di un Oggetto EventHub sono richieste da altri EventHubConsumerClient e questo client ha richiesto troppe partizioni, questo client ruberà una partizione da altri client per ogni valutazione di bilanciamento del carico indipendentemente dalla strategia di bilanciamento del carico. La strategia Greedy viene usata per impostazione predefinita.

custom_endpoint_address
Optional[str]

Indirizzo dell'endpoint personalizzato da usare per stabilire una connessione al servizio Hub eventi, consentendo l'instradamento delle richieste di rete tramite qualsiasi gateway applicazione o altri percorsi necessari per l'ambiente host. L'impostazione predefinita è None. Il formato sarà "sb:// custom_endpoint_hostname>:<custom_endpoint_port<>". Se la porta non è specificata nella custom_endpoint_address, verrà usata per impostazione predefinita la porta 443.

connection_verify
Optional[str]

Percorso del file personalizzato CA_BUNDLE del certificato SSL usato per autenticare l'identità dell'endpoint di connessione. Il valore predefinito è None nel caso in cui verrà usato certifi.where().

uamqp_transport
bool

Se usare la libreria uamqp come trasporto sottostante. Il valore predefinito è False e la libreria AMQP Pure Python verrà usata come trasporto sottostante.

socket_timeout
float

Tempo in secondi in cui il socket sottostante sulla connessione deve attendere quando si inviano e ricevono dati prima del timeout. Il valore predefinito è 0.2 per TransportType.Amqp e 1 per TransportType.AmqpOverWebsocket. Se si verificano errori EventHubsConnectionError a causa del timeout di scrittura, potrebbe essere necessario passare un valore maggiore di quello predefinito. Si tratta di scenari di utilizzo avanzati e il valore predefinito deve essere sufficiente.

Esempio

Creare una nuova istanza di EventHubConsumerClient.


   import os
   from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     consumer_group='$Default',
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Metodi

close

Arrestare il recupero di eventi dall'hub eventi e chiudere la connessione e i collegamenti AMQP sottostanti.

from_connection_string

Creare un oggetto EventHubConsumerClient da un stringa di connessione.

get_eventhub_properties

Ottenere le proprietà dell'hub eventi.

Le chiavi nel dizionario restituito includono:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Ottenere gli ID di partizione dell'hub eventi.

get_partition_properties

Ottenere le proprietà della partizione specificata.

Le chiavi nel dizionario delle proprietà includono:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Ricevere eventi dalle partizioni, con bilanciamento del carico e checkpoint facoltativi.

receive_batch

Ricevere eventi da partizioni in batch, con bilanciamento del carico e checkpoint facoltativi.

close

Arrestare il recupero di eventi dall'hub eventi e chiudere la connessione e i collegamenti AMQP sottostanti.

async close() -> None

Tipo restituito

Esempio

Chiudere il client.


   import os

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub.aio import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, async will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The receive method is a coroutine which will be blocking when awaited.
   # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.

   recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
   await asyncio.sleep(3)  # keep receiving for 3 seconds
   recv_task.cancel()  # stop receiving

   # Close down the consumer handler explicitly.
   await consumer.close()

from_connection_string

Creare un oggetto EventHubConsumerClient da un stringa di connessione.

from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient

Parametri

conn_str
str
Necessario

Stringa di connessione di un hub eventi.

consumer_group
str
Necessario

Ricevere eventi dall'hub eventi per questo gruppo di consumer.

eventhub_name
str

Percorso dell'hub eventi specifico a cui connettere il client.

logging_enable
bool

Indica se restituire i log di traccia di rete al logger. L'impostazione predefinita è False.

http_proxy
dict

Impostazioni proxy HTTP. Deve essere un dizionario con le chiavi seguenti: 'proxy_hostname' (valore str) e 'proxy_port' (valore int). Inoltre, le chiavi seguenti possono essere presenti: 'username', 'password'.

auth_timeout
float

Tempo in secondi per attendere che un token venga autorizzato dal servizio. Il valore predefinito è 60 secondi. Se impostato su 0, non verrà applicato alcun timeout dal client.

user_agent
str

Se specificato, verrà aggiunto davanti alla stringa dell'agente utente.

retry_total
int

Numero totale di tentativi di ridistribuire un'operazione non riuscita quando si verifica un errore. Il valore predefinito è 3. Il contesto dell'retry_total nella ricezione è speciale: il metodo di ricezione viene implementato da un metodo di ricezione interna chiamante del ciclo durante l'iterazione. Nel caso di ricezione , retry_total specifica i numeri di ripetizione dei tentativi dopo l'errore generato dal metodo di ricezione interno nel ciclo while-loop. Se i tentativi di ripetizione vengono esauriti, il callback on_error verrà chiamato (se specificato) con le informazioni sull'errore. Il consumer di partizione interna non riuscito verrà chiuso (on_partition_close verrà chiamato se specificato) e verrà creato un nuovo consumer di partizione interna (on_partition_initialize verrà chiamato se specificato) per riprendere la ricezione.

retry_backoff_factor
float

Fattore di backoff da applicare tra i tentativi dopo il secondo tentativo (la maggior parte degli errori viene risolta immediatamente da un secondo tentativo senza ritardo). In modalità fissa, i criteri di ripetizione dei tentativi saranno sempre in sospensione per {fattore backoff}. In modalità 'esponenziale', i criteri di ripetizione dei tentativi verranno interrotti per: {backoff factor} * (2 ** ({numero di tentativi totali} - 1)) secondi. Se l'backoff_factor è 0,1, il tentativo verrà riposato per [0,0, 0,2s, 0,4s, ...] tra i tentativi. Il valore predefinito è 0,8.

retry_backoff_max
float

Tempo massimo di off off. Il valore predefinito è 120 secondi (2 minuti).

retry_mode
str

Comportamento di ritardo tra tentativi di ripetizione. I valori supportati sono "fissi" o "esponenziali", dove il valore predefinito è "esponenziale".

idle_timeout
float

Timeout, in secondi, dopo il quale questo client chiuderà la connessione sottostante se non è presente un'ulteriore attività. Per impostazione predefinita, il valore non è Nessuno, ovvero che il client non verrà arrestato a causa dell'inattività a meno che non venga avviato dal servizio.

transport_type
TransportType

Tipo di protocollo di trasporto che verrà usato per comunicare con il servizio Hub eventi. Il valore predefinito è TransportType.Amqp in cui viene usata la porta 5671. Se la porta 5671 non è disponibile/bloccata nell'ambiente di rete, TransportType.AmqpOverWebsocket può essere usata invece che usa la porta 443 per la comunicazione.

checkpoint_store
Optional[CheckpointStore]

Gestione che archivia i dati del bilanciamento del carico di partizione e checkpoint durante la ricezione di eventi. L'archivio checkpoint verrà usato in entrambi i casi di ricezione da tutte le partizioni o da una singola partizione. In quest'ultimo caso il bilanciamento del carico non si applica. Se non viene fornito un archivio checkpoint, il checkpoint verrà mantenuto internamente in memoria e l'istanza di EventHubConsumerClient riceverà eventi senza bilanciamento del carico.

load_balancing_interval
float

Quando si esegue il bilanciamento del carico. Questo è l'intervallo, in secondi, tra due valutazioni di bilanciamento del carico. Il valore predefinito è 30 secondi.

partition_ownership_expiration_interval
float

Una proprietà della partizione scade dopo questo numero di secondi. Ogni valutazione di bilanciamento del carico estende automaticamente la scadenza della proprietà. Il valore predefinito è 6 * load_balancing_interval, ad esempio 180 secondi quando si usa il load_balancing_interval predefinito di 30 secondi.

load_balancing_strategy
str oppure LoadBalancingStrategy

Quando il bilanciamento del carico viene eseguito, userà questa strategia per richiedere e bilanciare la proprietà della partizione. Usare "greedy" o LoadBalancingStrategy.GREEDY per la strategia greedy, che, per ogni valutazione di bilanciamento del carico, afferrarà quante partizioni non dichiarate necessarie per bilanciare il carico. Usare "bilanciato" o LoadBalancingStrategy.BALANCE per la strategia bilanciata, che, per ogni valutazione di bilanciamento del carico, richiede solo una partizione non richiesta da altri EventHubConsumerClient. Se tutte le partizioni di un Oggetto EventHub sono richieste da altri EventHubConsumerClient e questo client ha richiesto troppe partizioni, questo client ruberà una partizione da altri client per ogni valutazione di bilanciamento del carico indipendentemente dalla strategia di bilanciamento del carico. La strategia Greedy viene usata per impostazione predefinita.

custom_endpoint_address
Optional[str]

Indirizzo dell'endpoint personalizzato da usare per stabilire una connessione al servizio Hub eventi, consentendo l'instradamento delle richieste di rete tramite qualsiasi gateway applicazione o altri percorsi necessari per l'ambiente host. L'impostazione predefinita è None. Il formato sarà "sb:// custom_endpoint_hostname>:<custom_endpoint_port<>". Se la porta non è specificata nella custom_endpoint_address, verrà usata per impostazione predefinita la porta 443.

connection_verify
Optional[str]

Percorso del file personalizzato CA_BUNDLE del certificato SSL usato per autenticare l'identità dell'endpoint di connessione. Il valore predefinito è None nel caso in cui verrà usato certifi.where().

uamqp_transport
bool

Se usare la libreria uamqp come trasporto sottostante. Il valore predefinito è False e la libreria AMQP Pure Python verrà usata come trasporto sottostante.

Tipo restituito

Esempio

Creare una nuova istanza di EventHubConsumerClient da stringa di connessione.


   import os
   from azure.eventhub.aio import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Ottenere le proprietà dell'hub eventi.

Le chiavi nel dizionario restituito includono:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Restituisce

Dizionario contenente informazioni sull'hub eventi.

Tipo restituito

Eccezioni

get_partition_ids

Ottenere gli ID di partizione dell'hub eventi.

async get_partition_ids() -> List[str]

Restituisce

Elenco di ID di partizione.

Tipo restituito

Eccezioni

get_partition_properties

Ottenere le proprietà della partizione specificata.

Le chiavi nel dizionario delle proprietà includono:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parametri

partition_id
str
Necessario

ID partizione di destinazione.

Restituisce

Dizionario contenente proprietà di partizione.

Tipo restituito

Eccezioni

receive

Ricevere eventi dalle partizioni, con bilanciamento del carico e checkpoint facoltativi.

async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parametri

on_event
Callable[PartitionContext, Optional[EventData]]
Necessario

Funzione di callback per la gestione di un evento ricevuto. Il callback accetta due parametri: partition_context che contiene il contesto di partizione e l'evento che è l'evento ricevuto. La funzione di callback deve essere definita come: on_event(partition_context, evento). Per informazioni dettagliate sul contesto della partizione, vedere PartitionContext.

max_wait_time
float

Intervallo massimo in secondi in cui il processore di eventi attenderà prima di chiamare il callback. Se non vengono ricevuti eventi entro questo intervallo, il callback on_event verrà chiamato con Nessuno. Se questo valore è impostato su Nessuno o 0 (impostazione predefinita), il callback non verrà chiamato finché non viene ricevuto un evento.

partition_id
str

Se specificato, il client riceverà solo da questa partizione. In caso contrario, il client riceverà da tutte le partizioni.

owner_level
int

Priorità per un consumatore esclusivo. Se owner_level è impostato, verrà creato un consumer esclusivo. Un consumer con un owner_level superiore ha priorità più elevata. Il livello di proprietario è noto anche come "valore epoca" del consumer.

prefetch
int

Numero di eventi da prefetch dal servizio per l'elaborazione. Il valore predefinito è 300.

track_last_enqueued_event_properties
bool

Indica se il consumer deve richiedere informazioni sull'ultimo evento accodato nella partizione associata e tenere traccia di tali informazioni come eventi. Quando vengono monitorate le informazioni sulle partizioni dell'ultimo evento, ogni evento ricevuto dal servizio Hub eventi conterrà metadati sulla partizione. Ciò comporta una piccola quantità di utilizzo aggiuntivo della larghezza di banda di rete che è in genere un compromesso favorevole quando considerato contro le richieste periodicamente per le proprietà di partizione usando il client hub eventi. È impostato su False per impostazione predefinita.

starting_position
str, int, datetime oppure dict[str,any]

Iniziare a ricevere da questa posizione evento se non sono presenti dati di checkpoint per una partizione. I dati del checkpoint verranno usati se disponibili. Questo può essere un dict con ID di partizione come chiave e posizione come valore per singole partizioni o un singolo valore per tutte le partizioni. Il tipo di valore può essere str, int o datetime.datetime. Sono supportati anche i valori "-1" per la ricezione dall'inizio del flusso e "@latest" per ricevere solo nuovi eventi.

starting_position_inclusive
bool oppure dict[str,bool]

Determinare se il starting_position specificato è inclusivo(=) o meno (>>). True per inclusivo e False per l'esclusiva. Può trattarsi di un id di partizione come chiave e bool come valore che indica se il starting_position per una partizione specifica è inclusivo o meno. Può anche essere un singolo valore bool per tutti i starting_position. Il valore predefinito è False.

on_error
Callable[[PartitionContext, Exception]]

Funzione di callback che verrà chiamata quando viene generato un errore durante la ricezione dopo l'esaurimento dei tentativi di ripetizione o durante il processo di bilanciamento del carico. Il callback accetta due parametri: partition_context che contiene le informazioni di partizione e l'errore che rappresenta l'eccezione. partition_context potrebbe essere Nessuna se l'errore viene generato durante il processo di bilanciamento del carico. Il callback deve essere definito come: on_error(partition_context, errore). Il callback on_error verrà chiamato anche se viene generata un'eccezione non gestita durante il callback on_event.

on_partition_initialize
Callable[[PartitionContext]]

La funzione di callback che verrà chiamata dopo che un consumer per una determinata partizione termina l'inizializzazione. Verrà chiamato anche quando viene creato un nuovo consumer di partizioni interne per eseguire il processo di ricezione per un consumer di partizione interno non riuscito e chiuso. Il callback accetta un singolo parametro: partition_context che contiene le informazioni sulla partizione. Il callback deve essere definito come: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

La funzione di callback che verrà chiamata dopo la chiusura di un consumer per una determinata partizione. Verrà chiamato anche quando viene generato un errore durante la ricezione dopo l'esaurimento dei tentativi. Il callback accetta due parametri: partition_context che contiene informazioni di partizione e motivo della chiusura. Il callback deve essere definito come: on_partition_close(partition_context, motivo). Fare riferimento a CloseReason per i vari motivi di chiusura.

Tipo restituito

Esempio

Ricevere eventi da EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       async with consumer:
           await consumer.receive(
               on_event=on_event,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )

receive_batch

Ricevere eventi da partizioni in batch, con bilanciamento del carico e checkpoint facoltativi.

async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parametri

on_event_batch
Callable[PartitionContext, List[EventData]]
Necessario

Funzione di callback per la gestione di un batch di eventi ricevuti. Il callback accetta due parametri: partition_context che contiene il contesto di partizione e event_batch, ovvero gli eventi ricevuti. La funzione di callback deve essere definita come: on_event_batch(partition_context, event_batch). event_batch potrebbe essere un elenco vuoto se max_wait_time non è nessuna né 0 e non viene ricevuto alcun evento dopo max_wait_time. Per informazioni dettagliate sul contesto della partizione, vedere PartitionContext.

max_batch_size
int

Numero massimo di eventi in un batch passato al callback on_event_batch. Se il numero effettivo di eventi ricevuti è maggiore di max_batch_size, gli eventi ricevuti vengono suddivisi in batch e chiamano il callback per ogni batch con un massimo di max_batch_size eventi .

max_wait_time
float

Intervallo massimo in secondi in cui il processore di eventi attenderà prima di chiamare il callback. Se non vengono ricevuti eventi entro questo intervallo, il callback on_event_batch verrà chiamato con un elenco vuoto. Se questo valore è impostato su Nessuno o 0 (impostazione predefinita), il callback non verrà chiamato finché non vengono ricevuti gli eventi.

partition_id
str

Se specificato, il client riceverà solo da questa partizione. In caso contrario, il client riceverà da tutte le partizioni.

owner_level
int

Priorità per un consumatore esclusivo. Se owner_level è impostato, verrà creato un consumer esclusivo. Un consumer con un owner_level superiore ha priorità più elevata. Il livello di proprietario è noto anche come "valore epoca" del consumer.

prefetch
int

Numero di eventi da prefetch dal servizio per l'elaborazione. Il valore predefinito è 300.

track_last_enqueued_event_properties
bool

Indica se il consumer deve richiedere informazioni sull'ultimo evento accodato nella partizione associata e tenere traccia di tali informazioni come eventi. Quando vengono monitorate le informazioni sulle partizioni dell'ultimo evento, ogni evento ricevuto dal servizio Hub eventi conterrà metadati sulla partizione. Ciò comporta una piccola quantità di utilizzo aggiuntivo della larghezza di banda di rete che è in genere un compromesso favorevole quando considerato contro le richieste periodicamente per le proprietà di partizione usando il client hub eventi. È impostato su False per impostazione predefinita.

starting_position
str, int, datetime oppure dict[str,any]

Iniziare a ricevere da questa posizione evento se non sono presenti dati di checkpoint per una partizione. I dati del checkpoint verranno usati se disponibili. Questo può essere un dict con ID di partizione come chiave e posizione come valore per singole partizioni o un singolo valore per tutte le partizioni. Il tipo di valore può essere str, int o datetime.datetime. Sono supportati anche i valori "-1" per la ricezione dall'inizio del flusso e "@latest" per ricevere solo nuovi eventi.

starting_position_inclusive
bool oppure dict[str,bool]

Determinare se il starting_position specificato è inclusivo(=) o meno (>>). True per inclusivo e False per l'esclusiva. Può trattarsi di un id di partizione come chiave e bool come valore che indica se il starting_position per una partizione specifica è inclusivo o meno. Può anche essere un singolo valore bool per tutti i starting_position. Il valore predefinito è False.

on_error
Callable[[PartitionContext, Exception]]

Funzione di callback che verrà chiamata quando viene generato un errore durante la ricezione dopo l'esaurimento dei tentativi di ripetizione o durante il processo di bilanciamento del carico. Il callback accetta due parametri: partition_context che contiene le informazioni di partizione e l'errore che rappresenta l'eccezione. partition_context potrebbe essere Nessuna se l'errore viene generato durante il processo di bilanciamento del carico. Il callback deve essere definito come: on_error(partition_context, errore). Il callback on_error verrà chiamato anche se viene generata un'eccezione non gestita durante il callback on_event.

on_partition_initialize
Callable[[PartitionContext]]

La funzione di callback che verrà chiamata dopo che un consumer per una determinata partizione termina l'inizializzazione. Verrà chiamato anche quando viene creato un nuovo consumer di partizioni interne per eseguire il processo di ricezione per un consumer di partizione interno non riuscito e chiuso. Il callback accetta un singolo parametro: partition_context che contiene le informazioni sulla partizione. Il callback deve essere definito come: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

La funzione di callback che verrà chiamata dopo la chiusura di un consumer per una determinata partizione. Verrà chiamato anche quando viene generato un errore durante la ricezione dopo l'esaurimento dei tentativi. Il callback accetta due parametri: partition_context che contiene informazioni di partizione e motivo della chiusura. Il callback deve essere definito come: on_partition_close(partition_context, motivo). Fare riferimento a CloseReason per i vari motivi di chiusura.

Tipo restituito

Esempio

Ricevere eventi in batch da EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info(
               "{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
           )

       async with consumer:
           await consumer.receive_batch(
               on_event_batch=on_event_batch,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )