EventHubConsumerClient Classe
La classe EventHubConsumerClient definisce un'interfaccia di alto livello per la ricezione di eventi dal servizio Hub eventi di Azure.
L'obiettivo principale di EventHubConsumerClient è ricevere eventi da tutte le partizioni di un EventHub con bilanciamento del carico e checkpoint.
Quando più istanze di EventHubConsumerClient vengono eseguite nello stesso hub eventi, nel gruppo di consumer e nella posizione di checkpoint, le partizioni verranno distribuite in modo uniforme tra di esse.
Per abilitare il bilanciamento del carico e i checkpoint persistenti, è necessario impostare checkpoint_store durante la creazione di EventHubConsumerClient. Se non viene fornito un archivio checkpoint, il checkpoint verrà mantenuto internamente in memoria.
Un oggetto EventHubConsumerClient può anche ricevere da una partizione specifica quando si chiama il metodo receive() o receive_batch() e specificare la partition_id. Il bilanciamento del carico non funzionerà in modalità partizione singola. Tuttavia, gli utenti possono comunque salvare i checkpoint se il checkpoint_store è impostato.
- Ereditarietà
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubConsumerClient
Costruttore
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Parametri
- fully_qualified_namespace
- str
Nome host completo per lo spazio dei nomi Hub eventi. Il formato dello spazio dei nomi è: .servicebus.windows.net.
- credential
- AsyncTokenCredential oppure AzureSasCredential oppure AzureNamedKeyCredential
Oggetto credenziale usato per l'autenticazione che implementa un'interfaccia specifica per ottenere i token. EventHubSharedKeyCredentialAccetta o oggetti credenziali generati dalla libreria e dagli oggetti azure-identity che implementano il metodo *get_token(self, ambiti).
- logging_enable
- bool
Indica se restituire i log di traccia di rete al logger. L'impostazione predefinita è False.
- auth_timeout
- float
Tempo in secondi per attendere che un token venga autorizzato dal servizio. Il valore predefinito è 60 secondi. Se impostato su 0, non verrà applicato alcun timeout dal client.
- user_agent
- str
Se specificato, verrà aggiunto davanti alla stringa dell'agente utente.
- retry_total
- int
Numero totale di tentativi di ridistribuire un'operazione non riuscita quando si verifica un errore. Il valore predefinito è 3. Il contesto dell'retry_total nella ricezione è speciale: il metodo di ricezione viene implementato da un metodo di ricezione interna chiamante del ciclo durante l'iterazione. Nel caso di ricezione , retry_total specifica i numeri di ripetizione dei tentativi dopo l'errore generato dal metodo di ricezione interno nel ciclo while-loop. Se i tentativi di ripetizione vengono esauriti, il callback on_error verrà chiamato (se specificato) con le informazioni sull'errore. Il consumer di partizione interna non riuscito verrà chiuso (on_partition_close verrà chiamato se specificato) e verrà creato un nuovo consumer di partizione interna (on_partition_initialize verrà chiamato se specificato) per riprendere la ricezione.
- retry_backoff_factor
- float
Fattore di backoff da applicare tra i tentativi dopo il secondo tentativo (la maggior parte degli errori viene risolta immediatamente da un secondo tentativo senza ritardo). In modalità fissa, i criteri di ripetizione dei tentativi saranno sempre in sospensione per {fattore backoff}. In modalità 'esponenziale', i criteri di ripetizione dei tentativi verranno interrotti per: {backoff factor} * (2 ** ({numero di tentativi totali} - 1)) secondi. Se l'backoff_factor è 0,1, il tentativo verrà riposato per [0,0, 0,2s, 0,4s, ...] tra i tentativi. Il valore predefinito è 0,8.
- retry_backoff_max
- float
Tempo massimo di off off. Il valore predefinito è 120 secondi (2 minuti).
- retry_mode
- str
Comportamento di ritardo tra tentativi di ripetizione. I valori supportati sono "fissi" o "esponenziali", dove il valore predefinito è "esponenziale".
- idle_timeout
- float
Timeout, in secondi, dopo il quale questo client chiuderà la connessione sottostante se non è presente un'ulteriore attività. Per impostazione predefinita, il valore non è Nessuno, ovvero che il client non verrà arrestato a causa dell'inattività a meno che non venga avviato dal servizio.
- transport_type
- TransportType
Tipo di protocollo di trasporto che verrà usato per comunicare con il servizio Hub eventi. Il valore predefinito è TransportType.Amqp in cui viene usata la porta 5671. Se la porta 5671 non è disponibile/bloccata nell'ambiente di rete, TransportType.AmqpOverWebsocket può essere usata invece che usa la porta 443 per la comunicazione.
- http_proxy
Impostazioni proxy HTTP. Deve essere un dizionario con le chiavi seguenti: 'proxy_hostname' (valore str) e 'proxy_port' (valore int).
- checkpoint_store
- Optional[CheckpointStore]
Gestione che archivia i dati del bilanciamento del carico di partizione e checkpoint durante la ricezione di eventi. L'archivio checkpoint verrà usato in entrambi i casi di ricezione da tutte le partizioni o da una singola partizione. In quest'ultimo caso il bilanciamento del carico non si applica. Se non viene fornito un archivio checkpoint, il checkpoint verrà mantenuto internamente in memoria e l'istanza di EventHubConsumerClient riceverà eventi senza bilanciamento del carico.
- load_balancing_interval
- float
Quando si esegue il bilanciamento del carico. Questo è l'intervallo, in secondi, tra due valutazioni di bilanciamento del carico. Il valore predefinito è 30 secondi.
- partition_ownership_expiration_interval
- float
Una proprietà della partizione scade dopo questo numero di secondi. Ogni valutazione di bilanciamento del carico estende automaticamente la scadenza della proprietà. Il valore predefinito è 6 * load_balancing_interval, ad esempio 180 secondi quando si usa il load_balancing_interval predefinito di 30 secondi.
- load_balancing_strategy
- str oppure LoadBalancingStrategy
Quando il bilanciamento del carico viene eseguito, userà questa strategia per richiedere e bilanciare la proprietà della partizione. Usare "greedy" o LoadBalancingStrategy.GREEDY per la strategia greedy, che, per ogni valutazione di bilanciamento del carico, afferrarà quante partizioni non dichiarate necessarie per bilanciare il carico. Usare "bilanciato" o LoadBalancingStrategy.BALANCE per la strategia bilanciata, che, per ogni valutazione di bilanciamento del carico, richiede solo una partizione non richiesta da altri EventHubConsumerClient. Se tutte le partizioni di un Oggetto EventHub sono richieste da altri EventHubConsumerClient e questo client ha richiesto troppe partizioni, questo client ruberà una partizione da altri client per ogni valutazione di bilanciamento del carico indipendentemente dalla strategia di bilanciamento del carico. La strategia Greedy viene usata per impostazione predefinita.
Indirizzo dell'endpoint personalizzato da usare per stabilire una connessione al servizio Hub eventi, consentendo l'instradamento delle richieste di rete tramite qualsiasi gateway applicazione o altri percorsi necessari per l'ambiente host. L'impostazione predefinita è None. Il formato sarà "sb:// custom_endpoint_hostname>:<custom_endpoint_port<>". Se la porta non è specificata nella custom_endpoint_address, verrà usata per impostazione predefinita la porta 443.
Percorso del file personalizzato CA_BUNDLE del certificato SSL usato per autenticare l'identità dell'endpoint di connessione. Il valore predefinito è None nel caso in cui verrà usato certifi.where().
- uamqp_transport
- bool
Se usare la libreria uamqp come trasporto sottostante. Il valore predefinito è False e la libreria AMQP Pure Python verrà usata come trasporto sottostante.
- socket_timeout
- float
Tempo in secondi in cui il socket sottostante sulla connessione deve attendere quando si inviano e ricevono dati prima del timeout. Il valore predefinito è 0.2 per TransportType.Amqp e 1 per TransportType.AmqpOverWebsocket. Se si verificano errori EventHubsConnectionError a causa del timeout di scrittura, potrebbe essere necessario passare un valore maggiore di quello predefinito. Si tratta di scenari di utilizzo avanzati e il valore predefinito deve essere sufficiente.
Esempio
Creare una nuova istanza di EventHubConsumerClient.
import os
from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
consumer_group='$Default',
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
Metodi
close |
Arrestare il recupero di eventi dall'hub eventi e chiudere la connessione e i collegamenti AMQP sottostanti. |
from_connection_string |
Creare un oggetto EventHubConsumerClient da un stringa di connessione. |
get_eventhub_properties |
Ottenere le proprietà dell'hub eventi. Le chiavi nel dizionario restituito includono:
|
get_partition_ids |
Ottenere gli ID di partizione dell'hub eventi. |
get_partition_properties |
Ottenere le proprietà della partizione specificata. Le chiavi nel dizionario delle proprietà includono:
|
receive |
Ricevere eventi dalle partizioni, con bilanciamento del carico e checkpoint facoltativi. |
receive_batch |
Ricevere eventi da partizioni in batch, con bilanciamento del carico e checkpoint facoltativi. |
close
Arrestare il recupero di eventi dall'hub eventi e chiudere la connessione e i collegamenti AMQP sottostanti.
async close() -> None
Tipo restituito
Esempio
Chiudere il client.
import os
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3) # keep receiving for 3 seconds
recv_task.cancel() # stop receiving
# Close down the consumer handler explicitly.
await consumer.close()
from_connection_string
Creare un oggetto EventHubConsumerClient da un stringa di connessione.
from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient
Parametri
- eventhub_name
- str
Percorso dell'hub eventi specifico a cui connettere il client.
- logging_enable
- bool
Indica se restituire i log di traccia di rete al logger. L'impostazione predefinita è False.
- http_proxy
- dict
Impostazioni proxy HTTP. Deve essere un dizionario con le chiavi seguenti: 'proxy_hostname' (valore str) e 'proxy_port' (valore int). Inoltre, le chiavi seguenti possono essere presenti: 'username', 'password'.
- auth_timeout
- float
Tempo in secondi per attendere che un token venga autorizzato dal servizio. Il valore predefinito è 60 secondi. Se impostato su 0, non verrà applicato alcun timeout dal client.
- user_agent
- str
Se specificato, verrà aggiunto davanti alla stringa dell'agente utente.
- retry_total
- int
Numero totale di tentativi di ridistribuire un'operazione non riuscita quando si verifica un errore. Il valore predefinito è 3. Il contesto dell'retry_total nella ricezione è speciale: il metodo di ricezione viene implementato da un metodo di ricezione interna chiamante del ciclo durante l'iterazione. Nel caso di ricezione , retry_total specifica i numeri di ripetizione dei tentativi dopo l'errore generato dal metodo di ricezione interno nel ciclo while-loop. Se i tentativi di ripetizione vengono esauriti, il callback on_error verrà chiamato (se specificato) con le informazioni sull'errore. Il consumer di partizione interna non riuscito verrà chiuso (on_partition_close verrà chiamato se specificato) e verrà creato un nuovo consumer di partizione interna (on_partition_initialize verrà chiamato se specificato) per riprendere la ricezione.
- retry_backoff_factor
- float
Fattore di backoff da applicare tra i tentativi dopo il secondo tentativo (la maggior parte degli errori viene risolta immediatamente da un secondo tentativo senza ritardo). In modalità fissa, i criteri di ripetizione dei tentativi saranno sempre in sospensione per {fattore backoff}. In modalità 'esponenziale', i criteri di ripetizione dei tentativi verranno interrotti per: {backoff factor} * (2 ** ({numero di tentativi totali} - 1)) secondi. Se l'backoff_factor è 0,1, il tentativo verrà riposato per [0,0, 0,2s, 0,4s, ...] tra i tentativi. Il valore predefinito è 0,8.
- retry_backoff_max
- float
Tempo massimo di off off. Il valore predefinito è 120 secondi (2 minuti).
- retry_mode
- str
Comportamento di ritardo tra tentativi di ripetizione. I valori supportati sono "fissi" o "esponenziali", dove il valore predefinito è "esponenziale".
- idle_timeout
- float
Timeout, in secondi, dopo il quale questo client chiuderà la connessione sottostante se non è presente un'ulteriore attività. Per impostazione predefinita, il valore non è Nessuno, ovvero che il client non verrà arrestato a causa dell'inattività a meno che non venga avviato dal servizio.
- transport_type
- TransportType
Tipo di protocollo di trasporto che verrà usato per comunicare con il servizio Hub eventi. Il valore predefinito è TransportType.Amqp in cui viene usata la porta 5671. Se la porta 5671 non è disponibile/bloccata nell'ambiente di rete, TransportType.AmqpOverWebsocket può essere usata invece che usa la porta 443 per la comunicazione.
- checkpoint_store
- Optional[CheckpointStore]
Gestione che archivia i dati del bilanciamento del carico di partizione e checkpoint durante la ricezione di eventi. L'archivio checkpoint verrà usato in entrambi i casi di ricezione da tutte le partizioni o da una singola partizione. In quest'ultimo caso il bilanciamento del carico non si applica. Se non viene fornito un archivio checkpoint, il checkpoint verrà mantenuto internamente in memoria e l'istanza di EventHubConsumerClient riceverà eventi senza bilanciamento del carico.
- load_balancing_interval
- float
Quando si esegue il bilanciamento del carico. Questo è l'intervallo, in secondi, tra due valutazioni di bilanciamento del carico. Il valore predefinito è 30 secondi.
- partition_ownership_expiration_interval
- float
Una proprietà della partizione scade dopo questo numero di secondi. Ogni valutazione di bilanciamento del carico estende automaticamente la scadenza della proprietà. Il valore predefinito è 6 * load_balancing_interval, ad esempio 180 secondi quando si usa il load_balancing_interval predefinito di 30 secondi.
- load_balancing_strategy
- str oppure LoadBalancingStrategy
Quando il bilanciamento del carico viene eseguito, userà questa strategia per richiedere e bilanciare la proprietà della partizione. Usare "greedy" o LoadBalancingStrategy.GREEDY per la strategia greedy, che, per ogni valutazione di bilanciamento del carico, afferrarà quante partizioni non dichiarate necessarie per bilanciare il carico. Usare "bilanciato" o LoadBalancingStrategy.BALANCE per la strategia bilanciata, che, per ogni valutazione di bilanciamento del carico, richiede solo una partizione non richiesta da altri EventHubConsumerClient. Se tutte le partizioni di un Oggetto EventHub sono richieste da altri EventHubConsumerClient e questo client ha richiesto troppe partizioni, questo client ruberà una partizione da altri client per ogni valutazione di bilanciamento del carico indipendentemente dalla strategia di bilanciamento del carico. La strategia Greedy viene usata per impostazione predefinita.
Indirizzo dell'endpoint personalizzato da usare per stabilire una connessione al servizio Hub eventi, consentendo l'instradamento delle richieste di rete tramite qualsiasi gateway applicazione o altri percorsi necessari per l'ambiente host. L'impostazione predefinita è None. Il formato sarà "sb:// custom_endpoint_hostname>:<custom_endpoint_port<>". Se la porta non è specificata nella custom_endpoint_address, verrà usata per impostazione predefinita la porta 443.
Percorso del file personalizzato CA_BUNDLE del certificato SSL usato per autenticare l'identità dell'endpoint di connessione. Il valore predefinito è None nel caso in cui verrà usato certifi.where().
- uamqp_transport
- bool
Se usare la libreria uamqp come trasporto sottostante. Il valore predefinito è False e la libreria AMQP Pure Python verrà usata come trasporto sottostante.
Tipo restituito
Esempio
Creare una nuova istanza di EventHubConsumerClient da stringa di connessione.
import os
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Ottenere le proprietà dell'hub eventi.
Le chiavi nel dizionario restituito includono:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
Restituisce
Dizionario contenente informazioni sull'hub eventi.
Tipo restituito
Eccezioni
get_partition_ids
Ottenere gli ID di partizione dell'hub eventi.
async get_partition_ids() -> List[str]
Restituisce
Elenco di ID di partizione.
Tipo restituito
Eccezioni
get_partition_properties
Ottenere le proprietà della partizione specificata.
Le chiavi nel dizionario delle proprietà includono:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
Parametri
Restituisce
Dizionario contenente proprietà di partizione.
Tipo restituito
Eccezioni
receive
Ricevere eventi dalle partizioni, con bilanciamento del carico e checkpoint facoltativi.
async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Parametri
- on_event
- Callable[PartitionContext, Optional[EventData]]
Funzione di callback per la gestione di un evento ricevuto. Il callback accetta due parametri: partition_context che contiene il contesto di partizione e l'evento che è l'evento ricevuto. La funzione di callback deve essere definita come: on_event(partition_context, evento). Per informazioni dettagliate sul contesto della partizione, vedere PartitionContext.
- max_wait_time
- float
Intervallo massimo in secondi in cui il processore di eventi attenderà prima di chiamare il callback. Se non vengono ricevuti eventi entro questo intervallo, il callback on_event verrà chiamato con Nessuno. Se questo valore è impostato su Nessuno o 0 (impostazione predefinita), il callback non verrà chiamato finché non viene ricevuto un evento.
- partition_id
- str
Se specificato, il client riceverà solo da questa partizione. In caso contrario, il client riceverà da tutte le partizioni.
- owner_level
- int
Priorità per un consumatore esclusivo. Se owner_level è impostato, verrà creato un consumer esclusivo. Un consumer con un owner_level superiore ha priorità più elevata. Il livello di proprietario è noto anche come "valore epoca" del consumer.
- prefetch
- int
Numero di eventi da prefetch dal servizio per l'elaborazione. Il valore predefinito è 300.
- track_last_enqueued_event_properties
- bool
Indica se il consumer deve richiedere informazioni sull'ultimo evento accodato nella partizione associata e tenere traccia di tali informazioni come eventi. Quando vengono monitorate le informazioni sulle partizioni dell'ultimo evento, ogni evento ricevuto dal servizio Hub eventi conterrà metadati sulla partizione. Ciò comporta una piccola quantità di utilizzo aggiuntivo della larghezza di banda di rete che è in genere un compromesso favorevole quando considerato contro le richieste periodicamente per le proprietà di partizione usando il client hub eventi. È impostato su False per impostazione predefinita.
Iniziare a ricevere da questa posizione evento se non sono presenti dati di checkpoint per una partizione. I dati del checkpoint verranno usati se disponibili. Questo può essere un dict con ID di partizione come chiave e posizione come valore per singole partizioni o un singolo valore per tutte le partizioni. Il tipo di valore può essere str, int o datetime.datetime. Sono supportati anche i valori "-1" per la ricezione dall'inizio del flusso e "@latest" per ricevere solo nuovi eventi.
Determinare se il starting_position specificato è inclusivo(=) o meno (>>). True per inclusivo e False per l'esclusiva. Può trattarsi di un id di partizione come chiave e bool come valore che indica se il starting_position per una partizione specifica è inclusivo o meno. Può anche essere un singolo valore bool per tutti i starting_position. Il valore predefinito è False.
- on_error
- Callable[[PartitionContext, Exception]]
Funzione di callback che verrà chiamata quando viene generato un errore durante la ricezione dopo l'esaurimento dei tentativi di ripetizione o durante il processo di bilanciamento del carico. Il callback accetta due parametri: partition_context che contiene le informazioni di partizione e l'errore che rappresenta l'eccezione. partition_context potrebbe essere Nessuna se l'errore viene generato durante il processo di bilanciamento del carico. Il callback deve essere definito come: on_error(partition_context, errore). Il callback on_error verrà chiamato anche se viene generata un'eccezione non gestita durante il callback on_event.
- on_partition_initialize
- Callable[[PartitionContext]]
La funzione di callback che verrà chiamata dopo che un consumer per una determinata partizione termina l'inizializzazione. Verrà chiamato anche quando viene creato un nuovo consumer di partizioni interne per eseguire il processo di ricezione per un consumer di partizione interno non riuscito e chiuso. Il callback accetta un singolo parametro: partition_context che contiene le informazioni sulla partizione. Il callback deve essere definito come: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
La funzione di callback che verrà chiamata dopo la chiusura di un consumer per una determinata partizione. Verrà chiamato anche quando viene generato un errore durante la ricezione dopo l'esaurimento dei tentativi. Il callback accetta due parametri: partition_context che contiene informazioni di partizione e motivo della chiusura. Il callback deve essere definito come: on_partition_close(partition_context, motivo). Fare riferimento a CloseReason per i vari motivi di chiusura.
Tipo restituito
Esempio
Ricevere eventi da EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
async with consumer:
await consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
receive_batch
Ricevere eventi da partizioni in batch, con bilanciamento del carico e checkpoint facoltativi.
async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Parametri
- on_event_batch
- Callable[PartitionContext, List[EventData]]
Funzione di callback per la gestione di un batch di eventi ricevuti. Il callback accetta due parametri: partition_context che contiene il contesto di partizione e event_batch, ovvero gli eventi ricevuti. La funzione di callback deve essere definita come: on_event_batch(partition_context, event_batch). event_batch potrebbe essere un elenco vuoto se max_wait_time non è nessuna né 0 e non viene ricevuto alcun evento dopo max_wait_time. Per informazioni dettagliate sul contesto della partizione, vedere PartitionContext.
- max_batch_size
- int
Numero massimo di eventi in un batch passato al callback on_event_batch. Se il numero effettivo di eventi ricevuti è maggiore di max_batch_size, gli eventi ricevuti vengono suddivisi in batch e chiamano il callback per ogni batch con un massimo di max_batch_size eventi .
- max_wait_time
- float
Intervallo massimo in secondi in cui il processore di eventi attenderà prima di chiamare il callback. Se non vengono ricevuti eventi entro questo intervallo, il callback on_event_batch verrà chiamato con un elenco vuoto. Se questo valore è impostato su Nessuno o 0 (impostazione predefinita), il callback non verrà chiamato finché non vengono ricevuti gli eventi.
- partition_id
- str
Se specificato, il client riceverà solo da questa partizione. In caso contrario, il client riceverà da tutte le partizioni.
- owner_level
- int
Priorità per un consumatore esclusivo. Se owner_level è impostato, verrà creato un consumer esclusivo. Un consumer con un owner_level superiore ha priorità più elevata. Il livello di proprietario è noto anche come "valore epoca" del consumer.
- prefetch
- int
Numero di eventi da prefetch dal servizio per l'elaborazione. Il valore predefinito è 300.
- track_last_enqueued_event_properties
- bool
Indica se il consumer deve richiedere informazioni sull'ultimo evento accodato nella partizione associata e tenere traccia di tali informazioni come eventi. Quando vengono monitorate le informazioni sulle partizioni dell'ultimo evento, ogni evento ricevuto dal servizio Hub eventi conterrà metadati sulla partizione. Ciò comporta una piccola quantità di utilizzo aggiuntivo della larghezza di banda di rete che è in genere un compromesso favorevole quando considerato contro le richieste periodicamente per le proprietà di partizione usando il client hub eventi. È impostato su False per impostazione predefinita.
Iniziare a ricevere da questa posizione evento se non sono presenti dati di checkpoint per una partizione. I dati del checkpoint verranno usati se disponibili. Questo può essere un dict con ID di partizione come chiave e posizione come valore per singole partizioni o un singolo valore per tutte le partizioni. Il tipo di valore può essere str, int o datetime.datetime. Sono supportati anche i valori "-1" per la ricezione dall'inizio del flusso e "@latest" per ricevere solo nuovi eventi.
Determinare se il starting_position specificato è inclusivo(=) o meno (>>). True per inclusivo e False per l'esclusiva. Può trattarsi di un id di partizione come chiave e bool come valore che indica se il starting_position per una partizione specifica è inclusivo o meno. Può anche essere un singolo valore bool per tutti i starting_position. Il valore predefinito è False.
- on_error
- Callable[[PartitionContext, Exception]]
Funzione di callback che verrà chiamata quando viene generato un errore durante la ricezione dopo l'esaurimento dei tentativi di ripetizione o durante il processo di bilanciamento del carico. Il callback accetta due parametri: partition_context che contiene le informazioni di partizione e l'errore che rappresenta l'eccezione. partition_context potrebbe essere Nessuna se l'errore viene generato durante il processo di bilanciamento del carico. Il callback deve essere definito come: on_error(partition_context, errore). Il callback on_error verrà chiamato anche se viene generata un'eccezione non gestita durante il callback on_event.
- on_partition_initialize
- Callable[[PartitionContext]]
La funzione di callback che verrà chiamata dopo che un consumer per una determinata partizione termina l'inizializzazione. Verrà chiamato anche quando viene creato un nuovo consumer di partizioni interne per eseguire il processo di ricezione per un consumer di partizione interno non riuscito e chiuso. Il callback accetta un singolo parametro: partition_context che contiene le informazioni sulla partizione. Il callback deve essere definito come: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
La funzione di callback che verrà chiamata dopo la chiusura di un consumer per una determinata partizione. Verrà chiamato anche quando viene generato un errore durante la ricezione dopo l'esaurimento dei tentativi. Il callback accetta due parametri: partition_context che contiene informazioni di partizione e motivo della chiusura. Il callback deve essere definito come: on_partition_close(partition_context, motivo). Fare riferimento a CloseReason per i vari motivi di chiusura.
Tipo restituito
Esempio
Ricevere eventi in batch da EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info(
"{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
)
async with consumer:
await consumer.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
Azure SDK for Python