Condividi tramite


EventHubConsumerClient Classe

La classe EventHubConsumerClient definisce un'interfaccia di alto livello per la ricezione di eventi dal servizio Hub eventi di Azure.

L'obiettivo principale di EventHubConsumerClient è ricevere eventi da tutte le partizioni di un EventHub con bilanciamento del carico e checkpoint.

Quando più istanze di EventHubConsumerClient vengono eseguite nello stesso hub eventi, nel gruppo di consumer e nella posizione di checkpoint, le partizioni verranno distribuite in modo uniforme tra di esse.

Per abilitare il bilanciamento del carico e i checkpoint persistenti, è necessario impostare checkpoint_store durante la creazione di EventHubConsumerClient. Se non viene fornito un archivio checkpoint, il checkpoint verrà mantenuto internamente in memoria.

Un oggetto EventHubConsumerClient può anche ricevere da una partizione specifica quando si chiama il metodo receive() o receive_batch() e specificare la partition_id. Il bilanciamento del carico non funzionerà in modalità partizione singola. Tuttavia, gli utenti possono comunque salvare i checkpoint se il checkpoint_store è impostato.

Ereditarietà
azure.eventhub._client_base.ClientBase
EventHubConsumerClient

Costruttore

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parametri

fully_qualified_namespace
str
Necessario

Nome host completo per lo spazio dei nomi Hub eventi. Il formato dello spazio dei nomi è: .servicebus.windows.net.

eventhub_name
str
Necessario

Percorso dell'hub eventi specifico a cui connettere il client.

consumer_group
str
Necessario

Ricevere eventi dall'hub eventi per questo gruppo di consumer.

credential
TokenCredential oppure AzureSasCredential oppure AzureNamedKeyCredential
Necessario

Oggetto credenziale usato per l'autenticazione che implementa un'interfaccia specifica per ottenere i token. EventHubSharedKeyCredentialAccetta o oggetti credenziali generati dalla libreria e dagli oggetti azure-identity che implementano il metodo *get_token(self, ambiti).

logging_enable
bool

Indica se restituire i log di traccia di rete al logger. L'impostazione predefinita è False.

auth_timeout
float

Tempo in secondi per attendere che un token venga autorizzato dal servizio. Il valore predefinito è 60 secondi. Se impostato su 0, non verrà applicato alcun timeout dal client.

user_agent
str

Se specificato, verrà aggiunto davanti alla stringa dell'agente utente.

retry_total
int

Numero totale di tentativi di ridistribuire un'operazione non riuscita quando si verifica un errore. Il valore predefinito è 3. Il contesto dell'retry_total nella ricezione è speciale: il metodo di ricezione viene implementato da un metodo di ricezione interna chiamante del ciclo durante l'iterazione. Nel caso di ricezione , retry_total specifica i numeri di ripetizione dei tentativi dopo l'errore generato dal metodo di ricezione interno nel ciclo while-loop. Se i tentativi di ripetizione vengono esauriti, il callback on_error verrà chiamato (se specificato) con le informazioni sull'errore. Il consumer di partizione interna non riuscito verrà chiuso (on_partition_close verrà chiamato se specificato) e verrà creato un nuovo consumer di partizione interna (on_partition_initialize verrà chiamato se specificato) per riprendere la ricezione.

retry_backoff_factor
float

Fattore di backoff da applicare tra i tentativi dopo il secondo tentativo (la maggior parte degli errori viene risolta immediatamente da un secondo tentativo senza ritardo). In modalità fissa, i criteri di ripetizione dei tentativi saranno sempre in sospensione per {fattore backoff}. In modalità 'esponenziale', i criteri di ripetizione dei tentativi verranno interrotti per: {backoff factor} * (2 ** ({numero di tentativi totali} - 1)) secondi. Se l'backoff_factor è 0,1, il tentativo verrà riposato per [0,0, 0,2s, 0,4s, ...] tra i tentativi. Il valore predefinito è 0,8.

retry_backoff_max
float

Tempo massimo di off off. Il valore predefinito è 120 secondi (2 minuti).

retry_mode
str

Comportamento di ritardo tra tentativi di ripetizione. I valori supportati sono "fissi" o "esponenziali", dove il valore predefinito è "esponenziale".

idle_timeout
float

Timeout, in secondi, dopo il quale questo client chiuderà la connessione sottostante se non è presente un'ulteriore attività. Per impostazione predefinita, il valore non è Nessuno, ovvero che il client non verrà arrestato a causa dell'inattività a meno che non venga avviato dal servizio.

transport_type
TransportType

Tipo di protocollo di trasporto che verrà usato per comunicare con il servizio Hub eventi. Il valore predefinito è TransportType.Amqp in cui viene usata la porta 5671. Se la porta 5671 non è disponibile/bloccata nell'ambiente di rete, TransportType.AmqpOverWebsocket può essere usata invece che usa la porta 443 per la comunicazione.

http_proxy
dict[str, str oppure int]

Impostazioni proxy HTTP. Deve essere un dizionario con le chiavi seguenti: 'proxy_hostname' (valore str) e 'proxy_port' (valore int). Inoltre, le chiavi seguenti possono essere presenti: 'username', 'password'.

checkpoint_store
CheckpointStore oppure None

Gestione che archivia i dati del bilanciamento del carico di partizione e checkpoint durante la ricezione di eventi. L'archivio checkpoint verrà usato in entrambi i casi di ricezione da tutte le partizioni o da una singola partizione. In quest'ultimo caso il bilanciamento del carico non si applica. Se non viene fornito un archivio checkpoint, il checkpoint verrà mantenuto internamente in memoria e l'istanza di EventHubConsumerClient riceverà eventi senza bilanciamento del carico.

load_balancing_interval
float

Quando si esegue il bilanciamento del carico. Questo è l'intervallo, in secondi, tra due valutazioni di bilanciamento del carico. Il valore predefinito è 30 secondi.

partition_ownership_expiration_interval
float

Una proprietà della partizione scade dopo questo numero di secondi. Ogni valutazione di bilanciamento del carico estende automaticamente la scadenza della proprietà. Il valore predefinito è 6 * load_balancing_interval, ad esempio 180 secondi quando si usa il load_balancing_interval predefinito di 30 secondi.

load_balancing_strategy
str oppure LoadBalancingStrategy

Quando il bilanciamento del carico viene eseguito, userà questa strategia per richiedere e bilanciare la proprietà della partizione. Usare "greedy" o LoadBalancingStrategy.GREEDY per la strategia greedy, che, per ogni valutazione di bilanciamento del carico, afferrarà quante partizioni non dichiarate necessarie per bilanciare il carico. Usare "bilanciato" o LoadBalancingStrategy.BALANCE per la strategia bilanciata, che, per ogni valutazione di bilanciamento del carico, richiede solo una partizione non richiesta da altri EventHubConsumerClient. Se tutte le partizioni di un Oggetto EventHub sono richieste da altri EventHubConsumerClient e questo client ha richiesto troppe partizioni, questo client ruberà una partizione da altri client per ogni valutazione di bilanciamento del carico indipendentemente dalla strategia di bilanciamento del carico. La strategia Greedy viene usata per impostazione predefinita.

custom_endpoint_address
str oppure None

Indirizzo dell'endpoint personalizzato da usare per stabilire una connessione al servizio Hub eventi, consentendo l'instradamento delle richieste di rete tramite qualsiasi gateway applicazione o altri percorsi necessari per l'ambiente host. L'impostazione predefinita è None. Il formato sarà "sb:// custom_endpoint_hostname>:<custom_endpoint_port<>". Se la porta non è specificata nella custom_endpoint_address, verrà usata per impostazione predefinita la porta 443.

connection_verify
str oppure None

Percorso del file personalizzato CA_BUNDLE del certificato SSL usato per autenticare l'identità dell'endpoint di connessione. Il valore predefinito è None nel caso in cui verrà usato certifi.where().

uamqp_transport
bool

Se usare la libreria uamqp come trasporto sottostante. Il valore predefinito è False e la libreria AMQP Pure Python verrà usata come trasporto sottostante.

socket_timeout
float

Tempo in secondi in cui il socket sottostante sulla connessione deve attendere quando si inviano e ricevono dati prima del timeout. Il valore predefinito è 0.2 per TransportType.Amqp e 1 per TransportType.AmqpOverWebsocket. Se si verificano errori EventHubsConnectionError a causa del timeout di scrittura, potrebbe essere necessario passare un valore maggiore di quello predefinito. Si tratta di scenari di utilizzo avanzati e il valore predefinito deve essere sufficiente.

Esempio

Creare una nuova istanza di EventHubConsumerClient.


   import os
   from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   consumer = EventHubConsumerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,
       consumer_group='$Default',
       credential=credential)

Metodi

close

Arrestare il recupero di eventi dall'hub eventi e chiudere la connessione e i collegamenti AMQP sottostanti.

from_connection_string

Creare un oggetto EventHubConsumerClient da un stringa di connessione.

get_eventhub_properties

Ottenere le proprietà dell'hub eventi.

Le chiavi nel dizionario restituito includono:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Ottiene gli ID di partizione dell'hub eventi.

get_partition_properties

Ottiene le proprietà della partizione specificata.

Le chiavi nel dizionario delle proprietà includono:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Ricevere eventi dalle partizioni, con bilanciamento del carico e checkpoint facoltativi.

receive_batch

Ricevere eventi dalle partizioni, con bilanciamento del carico e checkpoint facoltativi.

close

Arrestare il recupero di eventi dall'hub eventi e chiudere la connessione e i collegamenti AMQP sottostanti.

close() -> None

Tipo restituito

Esempio

Chiudere il client.


   import os
   import threading

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group="$Default",
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, multi-thread will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The 'receive' method is a blocking call, it can be executed in a thread for
   # non-blocking behavior, and combined with the 'close' method.

   worker = threading.Thread(
       target=consumer.receive,
       kwargs={
           "on_event": on_event,
           "starting_position": "-1",  # "-1" is from the beginning of the partition.
       }
   )
   worker.start()
   time.sleep(10)  # Keep receiving for 10s then close.
   # Close down the consumer handler explicitly.
   consumer.close()

from_connection_string

Creare un oggetto EventHubConsumerClient da un stringa di connessione.

from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient

Parametri

conn_str
str
Necessario

Stringa di connessione di un hub eventi.

consumer_group
str
Necessario

Ricevere eventi dall'hub eventi per questo gruppo di consumer.

eventhub_name
str

Percorso dell'hub eventi specifico a cui connettere il client.

logging_enable
bool

Indica se restituire i log di traccia di rete al logger. L'impostazione predefinita è False.

auth_timeout
float

Tempo in secondi per attendere che un token venga autorizzato dal servizio. Il valore predefinito è 60 secondi. Se impostato su 0, non verrà applicato alcun timeout dal client.

user_agent
str

Se specificato, verrà aggiunto davanti alla stringa dell'agente utente.

retry_total
int

Numero totale di tentativi di ridistribuire un'operazione non riuscita quando si verifica un errore. Il valore predefinito è 3. Il contesto dell'retry_total nella ricezione è speciale: il metodo di ricezione viene implementato da un metodo di ricezione interna chiamante del ciclo durante l'iterazione. Nel caso di ricezione , retry_total specifica i numeri di ripetizione dei tentativi dopo l'errore generato dal metodo di ricezione interno nel ciclo while-loop. Se i tentativi di ripetizione vengono esauriti, il callback on_error verrà chiamato (se specificato) con le informazioni sull'errore. Il consumer di partizione interna non riuscito verrà chiuso (on_partition_close verrà chiamato se specificato) e verrà creato un nuovo consumer di partizione interna (on_partition_initialize verrà chiamato se specificato) per riprendere la ricezione.

retry_backoff_factor
float

Fattore di backoff da applicare tra i tentativi dopo il secondo tentativo (la maggior parte degli errori viene risolta immediatamente da un secondo tentativo senza ritardo). In modalità fissa, i criteri di ripetizione dei tentativi saranno sempre in sospensione per {fattore backoff}. In modalità 'esponenziale', i criteri di ripetizione dei tentativi verranno interrotti per: {backoff factor} * (2 ** ({numero di tentativi totali} - 1)) secondi. Se il backoff_factor è 0,1, il nuovo tentativo verrà interrotto per [0,0s, 0,2s, 0,4s, ...] tra i tentativi. Il valore predefinito è 0,8.

retry_backoff_max
float

Tempo massimo di back off. Il valore predefinito è 120 secondi (2 minuti).

retry_mode
str

Comportamento di ritardo tra i tentativi. I valori supportati sono "fixed" o "esponenziale", dove il valore predefinito è "esponenziale".

idle_timeout
float

Timeout, in secondi, dopo il quale il client chiuderà la connessione sottostante se non è presente alcuna attività di distanza. Per impostazione predefinita, il valore è None, il che significa che il client non verrà arrestato a causa dell'inattività a meno che non venga avviato dal servizio.

transport_type
TransportType

Tipo di protocollo di trasporto che verrà usato per comunicare con il servizio Hub eventi. Il valore predefinito è TransportType.Amqp nel qual caso viene usata la porta 5671. Se la porta 5671 non è disponibile/bloccata nell'ambiente di rete, è possibile usare TransportType.AmqpOverWebsocket che usa la porta 443 per la comunicazione.

http_proxy
dict

Impostazioni proxy HTTP. Deve essere un dizionario con le chiavi seguenti: 'proxy_hostname' (valore str) e 'proxy_port' (valore int). Inoltre, possono essere presenti le chiavi seguenti: 'username', 'password'.

checkpoint_store
CheckpointStore oppure None

Gestore che archivia i dati del bilanciamento del carico della partizione e del checkpoint durante la ricezione di eventi. L'archivio checkpoint verrà usato in entrambi i casi di ricezione da tutte le partizioni o da una singola partizione. Nel secondo caso il bilanciamento del carico non si applica. Se non viene fornito un archivio di checkpoint, il checkpoint verrà mantenuto internamente in memoria e l'istanza eventHubConsumerClient riceverà eventi senza bilanciamento del carico.

load_balancing_interval
float

Quando si esegue il bilanciamento del carico. Questo è l'intervallo, espresso in secondi, tra due valutazioni di bilanciamento del carico. Il valore predefinito è 10 secondi.

partition_ownership_expiration_interval
float

Una proprietà della partizione scadrà dopo questo numero di secondi. Ogni valutazione del bilanciamento del carico estenderà automaticamente la scadenza della proprietà. Il valore predefinito è 6 * load_balancing_interval, ovvero 60 secondi quando si usa il load_balancing_interval predefinito di 30 secondi.

load_balancing_strategy
str oppure LoadBalancingStrategy

Quando si attiva il bilanciamento del carico, questa strategia verrà usata per richiedere e bilanciare la proprietà della partizione. Usare "greedy" o LoadBalancingStrategy.GREEDY per la strategia greedy , che, per ogni valutazione di bilanciamento del carico, afferra il numero di partizioni non richieste per bilanciare il carico. Usare " bilanciato" o LoadBalancingStrategy.BALANCED per la strategia bilanciata, che, per ogni valutazione di bilanciamento del carico, richiede solo una partizione non richiesta da altri EventHubConsumerClient. Se tutte le partizioni di un hub eventi sono richieste da altri EventHubConsumerClient e questo client ha richiesto troppe partizioni, questo client ruberà una partizione da altri client per ogni valutazione del bilanciamento del carico indipendentemente dalla strategia di bilanciamento del carico. La strategia Greedy viene usata per impostazione predefinita.

custom_endpoint_address
str oppure None

Indirizzo dell'endpoint personalizzato da usare per stabilire una connessione al servizio Hub eventi, consentendo l'instradamento delle richieste di rete tramite qualsiasi gateway applicazione o altri percorsi necessari per l'ambiente host. L'impostazione predefinita è None. Il formato sarà simile a "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se la porta non è specificata nel custom_endpoint_address, per impostazione predefinita verrà usata la porta 443.

connection_verify
str oppure None

Percorso del file di CA_BUNDLE personalizzato del certificato SSL usato per autenticare l'identità dell'endpoint di connessione. Il valore predefinito è Nessuno nel qual caso verrà usato certifi.where().

uamqp_transport
bool

Indica se usare la libreria uamqp come trasporto sottostante. Il valore predefinito è False e la libreria AMQP Pure Python verrà usata come trasporto sottostante.

Tipo restituito

Esempio

Creare una nuova istanza di EventHubConsumerClient da stringa di connessione.


   import os
   from azure.eventhub import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Ottenere le proprietà dell'hub eventi.

Le chiavi nel dizionario restituito includono:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

Restituisce

Dizionario contenente informazioni sull'hub eventi.

Tipo restituito

Eccezioni

get_partition_ids

Ottiene gli ID di partizione dell'hub eventi.

get_partition_ids() -> List[str]

Restituisce

Elenco di ID partizione.

Tipo restituito

Eccezioni

get_partition_properties

Ottiene le proprietà della partizione specificata.

Le chiavi nel dizionario delle proprietà includono:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

get_partition_properties(partition_id: str) -> Dict[str, Any]

Parametri

partition_id
str
Necessario

ID partizione di destinazione.

Restituisce

Dizionario contenente le proprietà della partizione.

Tipo restituito

Eccezioni

receive

Ricevere eventi dalle partizioni, con bilanciamento del carico e checkpoint facoltativi.

receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None

Parametri

on_event
callable[PartitionContext, EventData oppure None]
Necessario

Funzione di callback per la gestione di un evento ricevuto. Il callback accetta due parametri: partition_context che contiene il contesto di partizione e l'evento che è l'evento ricevuto. La funzione di callback deve essere definita come: on_event(partition_context, evento). Per informazioni dettagliate sul contesto della partizione, vedere PartitionContext.

max_wait_time
float

Intervallo massimo in secondi di attesa del processore di eventi prima di chiamare il callback. Se non vengono ricevuti eventi entro questo intervallo, il callback on_event verrà chiamato con Nessuno. Se questo valore è impostato su Nessuno o 0 (impostazione predefinita), il callback non verrà chiamato finché non viene ricevuto un evento.

partition_id
str

Se specificato, il client riceverà solo da questa partizione. In caso contrario, il client riceverà da tutte le partizioni.

owner_level
int

Priorità per un consumer esclusivo. Se owner_level è impostato, verrà creato un consumer esclusivo. Un consumer con un owner_level superiore ha una priorità esclusiva più elevata. Il livello di proprietario è noto anche come "valore del periodo" del consumer.

prefetch
int

Numero di eventi da preletturare dal servizio per l'elaborazione. Il valore predefinito è 300.

track_last_enqueued_event_properties
bool

Indica se il consumer deve richiedere informazioni sull'ultimo evento accodato nella partizione associata e tenere traccia delle informazioni ricevute come eventi. Quando vengono rilevate informazioni sulle partizioni dell'ultimo evento accodato, ogni evento ricevuto dal servizio Hub eventi conterrà i metadati sulla partizione. Ciò comporta una piccola quantità di utilizzo aggiuntivo della larghezza di banda di rete che è in genere un compromesso favorevole quando viene considerato contro l'esecuzione periodica di richieste per le proprietà di partizione usando il client hub eventi. Per impostazione predefinita, è impostata su False .

starting_position
str, int, datetime oppure dict[str,any]

Iniziare a ricevere da questa posizione evento se non sono presenti dati di checkpoint per una partizione. I dati del checkpoint verranno usati se disponibili. Può trattarsi di un dict con ID partizione come chiave e posizione come valore per singole partizioni o un singolo valore per tutte le partizioni. Il tipo di valore può essere str, int o datetime.datetime. Sono supportati anche i valori "-1" per la ricezione dall'inizio del flusso e "@latest" per la ricezione solo di nuovi eventi. Il valore predefinito è "@latest".

starting_position_inclusive
bool oppure dict[str,bool]

Determinare se il starting_position specificato è inclusivo(>=) o meno (>). True per inclusivo e False per l'esclusivo. Può trattarsi di un dict con ID partizione come chiave e bool come valore che indica se il starting_position per una partizione specifica è inclusivo o meno. Può anche essere un singolo valore bool per tutti i starting_position. Il valore predefinito è False.

on_error
callable[[PartitionContext, Exception]]

Funzione di callback che verrà chiamata quando viene generato un errore durante la ricezione dopo l'esaurimento dei tentativi o durante il processo di bilanciamento del carico. Il callback accetta due parametri: partition_context che contiene le informazioni sulla partizione e l'errore che rappresenta l'eccezione. partition_context potrebbe essere Nessuno se l'errore viene generato durante il processo di bilanciamento del carico. Il callback deve essere definito come : on_error(partition_context, errore). Il callback on_error verrà chiamato anche se viene generata un'eccezione non gestita durante il callback on_event .

on_partition_initialize
callable[[PartitionContext]]

La funzione di callback che verrà chiamata dopo che un consumer per una determinata partizione termina l'inizializzazione. Viene chiamato anche quando viene creato un nuovo consumer di partizione interno per assumere il controllo del processo di ricezione per un consumer di partizione interna chiuso e non riuscito. Il callback accetta un singolo parametro: partition_context che contiene le informazioni sulla partizione. Il callback deve essere definito come: on_partition_initialize(partition_context).

on_partition_close
callable[[PartitionContext, CloseReason]]

Funzione di callback che verrà chiamata dopo la chiusura di un consumer per una determinata partizione. Viene chiamato anche quando viene generato un errore durante la ricezione dopo che i tentativi di ripetizione vengono esauriti. Il callback accetta due parametri: partition_context che contiene informazioni sulla partizione e motivo della chiusura. Il callback deve essere definito come: on_partition_close(partition_context, motivo). Fare riferimento a CloseReason per i vari motivi di chiusura.

Tipo restituito

Esempio

Ricevere eventi da EventHub.


       logger = logging.getLogger("azure.eventhub")

       def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive(on_event=on_event)

receive_batch

Ricevere eventi dalle partizioni, con bilanciamento del carico e checkpoint facoltativi.

receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None

Parametri

on_event_batch
callable[PartitionContext, list[EventData]]
Necessario

Funzione di callback per la gestione di un batch di eventi ricevuti. Il callback accetta due parametri: partition_context che contiene il contesto di partizione e event_batch, ovvero gli eventi ricevuti. La funzione di callback deve essere definita come: on_event_batch(partition_context, event_batch). event_batch potrebbe essere un elenco vuoto se max_wait_time non è nessuna né 0 e non viene ricevuto alcun evento dopo max_wait_time. Per informazioni dettagliate sul contesto della partizione, vedere PartitionContext.

max_batch_size
int

Numero massimo di eventi in un batch passato al callback on_event_batch. Se il numero effettivo di eventi ricevuti è maggiore di max_batch_size, gli eventi ricevuti vengono suddivisi in batch e chiamano il callback per ogni batch con un massimo di max_batch_size eventi .

max_wait_time
float

Intervallo massimo in secondi in cui il processore di eventi attenderà prima di chiamare il callback. Se non vengono ricevuti eventi entro questo intervallo, il callback on_event_batch verrà chiamato con un elenco vuoto.

partition_id
str

Se specificato, il client riceverà solo da questa partizione. In caso contrario, il client riceverà da tutte le partizioni.

owner_level
int

Priorità per un consumatore esclusivo. Se owner_level è impostato, verrà creato un consumer esclusivo. Un consumer con un owner_level superiore ha priorità più elevata. Il livello di proprietario è noto anche come "valore epoca" del consumer.

prefetch
int

Numero di eventi da prefetch dal servizio per l'elaborazione. Il valore predefinito è 300.

track_last_enqueued_event_properties
bool

Indica se il consumer deve richiedere informazioni sull'ultimo evento accodato nella partizione associata e tenere traccia di tali informazioni come eventi. Quando vengono monitorate le informazioni sulle partizioni dell'ultimo evento, ogni evento ricevuto dal servizio Hub eventi conterrà metadati sulla partizione. Ciò comporta una piccola quantità di utilizzo aggiuntivo della larghezza di banda di rete che è in genere un compromesso favorevole quando considerato contro le richieste periodicamente per le proprietà di partizione usando il client hub eventi. È impostato su False per impostazione predefinita.

starting_position
str, int, datetime oppure dict[str,any]

Iniziare a ricevere da questa posizione evento se non sono presenti dati di checkpoint per una partizione. I dati del checkpoint verranno usati se disponibili. Questo può essere un dict con ID di partizione come chiave e posizione come valore per singole partizioni o un singolo valore per tutte le partizioni. Il tipo di valore può essere str, int o datetime.datetime. Sono supportati anche i valori "-1" per la ricezione dall'inizio del flusso e "@latest" per ricevere solo nuovi eventi. Il valore predefinito è "@latest".

starting_position_inclusive
bool oppure dict[str,bool]

Determinare se il starting_position specificato è inclusivo(=) o meno (>>). True per inclusivo e False per l'esclusiva. Può trattarsi di un id di partizione come chiave e bool come valore che indica se il starting_position per una partizione specifica è inclusivo o meno. Può anche essere un singolo valore bool per tutti i starting_position. Il valore predefinito è False.

on_error
callable[[PartitionContext, Exception]]

Funzione di callback che verrà chiamata quando viene generato un errore durante la ricezione dopo l'esaurimento dei tentativi di ripetizione o durante il processo di bilanciamento del carico. Il callback accetta due parametri: partition_context che contiene le informazioni di partizione e l'errore che rappresenta l'eccezione. partition_context potrebbe essere Nessuna se l'errore viene generato durante il processo di bilanciamento del carico. Il callback deve essere definito come: on_error(partition_context, errore). Il callback on_error verrà chiamato anche se viene generata un'eccezione non gestita durante il callback on_event.

on_partition_initialize
callable[[PartitionContext]]

La funzione di callback che verrà chiamata dopo che un consumer per una determinata partizione termina l'inizializzazione. Verrà chiamato anche quando viene creato un nuovo consumer di partizioni interne per eseguire il processo di ricezione per un consumer di partizione interno non riuscito e chiuso. Il callback accetta un singolo parametro: partition_context che contiene le informazioni sulla partizione. Il callback deve essere definito come: on_partition_initialize(partition_context).

on_partition_close
callable[[PartitionContext, CloseReason]]

La funzione di callback che verrà chiamata dopo la chiusura di un consumer per una determinata partizione. Verrà chiamato anche quando viene generato un errore durante la ricezione dopo l'esaurimento dei tentativi. Il callback accetta due parametri: partition_context che contiene informazioni di partizione e motivo della chiusura. Il callback deve essere definito come: on_partition_close(partition_context, motivo). Fare riferimento a CloseReason per i vari motivi di chiusura.

Tipo restituito

Esempio

Ricevere eventi in batch da EventHub.


       logger = logging.getLogger("azure.eventhub")

       def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received events from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive_batch(on_event_batch=on_event_batch)