EventHubProducerClient Classe
La classe EventHubProducerClient definisce un'interfaccia di alto livello per l'invio di eventi al servizio Hub eventi di Azure.
- Ereditarietà
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubProducerClient
Costruttore
EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)
Parametri
- fully_qualified_namespace
- str
Nome host completo per lo spazio dei nomi di Hub eventi. È probabile che sia simile a .servicebus.windows.net
- credential
- AsyncTokenCredential oppure AzureSasCredential oppure AzureNamedKeyCredential
Oggetto credenziale usato per l'autenticazione che implementa una particolare interfaccia per ottenere i token. Accetta EventHubSharedKeyCredentialoggetti credenziali o generati dalla libreria azure-identity e dagli oggetti che implementano il metodo *get_token(self, scopes).
- buffered_mode
- bool
Se True, il client producer raccoglierà gli eventi in un buffer, in modo efficiente batch, quindi pubblicherà. Il valore predefinito è False.
Callback da chiamare dopo la pubblicazione di un batch. Il callback accetta due parametri:
events: elenco di eventi che sono stati pubblicati correttamente
partition_id: ID di partizione in cui sono stati pubblicati gli eventi nell'elenco.
La funzione di callback deve essere definita come: on_success(eventi, partition_id). Obbligatorio quando buffered_mode è True se facoltativo se buffered_mode è False.
Callback da chiamare dopo la pubblicazione di un batch. Obbligatorio quando in buffered_mode è True se facoltativo se buffered_mode è False. La funzione di callback deve essere definita come : on_error(eventi, partition_id, errore), dove:
events: elenco di eventi che non è stato possibile pubblicare,
partition_id: ID partizione in cui sono stati tentati di pubblicare gli eventi nell'elenco e
error: eccezione correlata all'errore di invio.
Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue:
Se un callback on_error viene passato durante la creazione di istanze del client producer,
le informazioni sull'errore verranno quindi passate al callback on_error , che verrà quindi chiamato.
Se un callback on_error non viene passato durante la creazione di istanze del client,
l'errore verrà generato per impostazione predefinita.
Se buffered_mode è True, è necessario on_error callback e gli errori verranno gestiti come segue:
Se gli eventi non vengono accodati entro il timeout specificato, verrà generato direttamente un errore.
Se gli eventi non vengono inviati dopo l'accodamento, verrà chiamato il callback on_error .
- max_buffer_length
- int
Solo modalità memorizzata nel buffer. Numero totale di eventi per partizione che possono essere memorizzati nel buffer prima dell'attivazione di uno scaricamento. Il valore predefinito è 1500 in modalità memorizzata nel buffer.
Solo modalità memorizzata nel buffer. Quantità di tempo di attesa per la compilazione di un batch con eventi nel buffer prima della pubblicazione. Il valore predefinito è 1 in modalità memorizzata nel buffer.
- logging_enable
- bool
Indica se restituire i log di traccia di rete al logger. L'impostazione predefinita è False.
- auth_timeout
- float
Tempo in secondi di attesa dell'autorizzazione di un token da parte del servizio. Il valore predefinito è 60 secondi. Se impostato su 0, non verrà applicato alcun timeout dal client.
- user_agent
- str
Se specificato, verrà aggiunto davanti alla stringa dell'agente utente.
- retry_total
- int
Numero totale di tentativi di rollforward di un'operazione non riuscita quando si verifica un errore. Il valore predefinito è 3.
- retry_backoff_factor
- float
Fattore di backoff da applicare tra i tentativi dopo il secondo tentativo (la maggior parte degli errori viene risolta immediatamente da un secondo tentativo senza alcun ritardo). In modalità fissa, i criteri di ripetizione dei tentativi verranno sempre in sospensione per {fattore backoff}. In modalità "esponenziale", i criteri di ripetizione dei tentativi verranno interrotti per: {backoff factor} * (2 ** ({numero di tentativi totali} - 1)) secondi. Se il backoff_factor è 0,1, il nuovo tentativo verrà interrotto per [0,0s, 0,2s, 0,4s, ...] tra i tentativi. Il valore predefinito è 0,8.
- retry_backoff_max
- float
Tempo massimo di back off. Il valore predefinito è 120 secondi (2 minuti).
- retry_mode
- str
Comportamento di ritardo tra i tentativi. I valori supportati sono "fixed" o "esponenziale", dove il valore predefinito è "esponenziale".
- idle_timeout
- float
Timeout, in secondi, dopo il quale il client chiuderà la connessione sottostante se non è presente alcuna attività. Per impostazione predefinita, il valore è None, il che significa che il client non verrà arrestato a causa dell'inattività a meno che non venga avviato dal servizio.
- transport_type
- TransportType
Tipo di protocollo di trasporto che verrà usato per comunicare con il servizio Hub eventi. Il valore predefinito è TransportType.Amqp nel qual caso viene usata la porta 5671. Se la porta 5671 non è disponibile/bloccata nell'ambiente di rete, è possibile usare TransportType.AmqpOverWebsocket che usa la porta 443 per la comunicazione.
- http_proxy
- dict
Impostazioni proxy HTTP. Deve essere un dizionario con le chiavi seguenti: 'proxy_hostname' (valore str) e 'proxy_port' (valore int). Inoltre, possono essere presenti le chiavi seguenti: 'username', 'password'.
Indirizzo dell'endpoint personalizzato da usare per stabilire una connessione al servizio Hub eventi, consentendo l'instradamento delle richieste di rete tramite qualsiasi gateway applicazione o altri percorsi necessari per l'ambiente host. L'impostazione predefinita è None. Il formato sarà simile a "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se la porta non è specificata nel custom_endpoint_address, per impostazione predefinita verrà usata la porta 443.
Percorso del file di CA_BUNDLE personalizzato del certificato SSL usato per autenticare l'identità dell'endpoint di connessione. Il valore predefinito è Nessuno nel qual caso verrà usato certifi.where().
- uamqp_transport
- bool
Indica se usare la libreria uamqp come trasporto sottostante. Il valore predefinito è False e la libreria AMQP Pure Python verrà usata come trasporto sottostante.
- socket_timeout
- float
Tempo in secondi in cui il socket sottostante sulla connessione deve attendere quando si inviano e ricevono dati prima del timeout. Il valore predefinito è 0.2 per TransportType.Amqp e 1 per TransportType.AmqpOverWebsocket. Se si verificano errori EventHubsConnectionError a causa del timeout della scrittura, potrebbe essere necessario passare un valore maggiore del valore predefinito. Si tratta di scenari di utilizzo avanzati e in genere il valore predefinito deve essere sufficiente.
Esempio
Creare una nuova istanza di EventHubProducerClient.
import os
from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
Metodi
close |
Chiudere il client Producer sottostante la connessione AMQP e i collegamenti. |
create_batch |
Creare un oggetto EventDataBatch con la dimensione massima di tutto il contenuto vincolato da max_size_in_bytes. Il max_size_in_bytes non deve essere maggiore della dimensione massima consentita del messaggio definita dal servizio. |
flush |
Solo modalità memorizzata nel buffer. Scarica gli eventi nel buffer da inviare immediatamente se il client funziona in modalità memorizzata nel buffer. |
from_connection_string |
Creare un oggetto EventHubProducerClient da un stringa di connessione. |
get_buffered_event_count |
Numero di eventi memorizzati nel buffer e in attesa di pubblicazione per una determinata partizione. Restituisce Nessuno in modalità non memorizzata nel buffer. NOTA: il buffer eventi viene elaborato in una coroutine in background, pertanto il numero di eventi nel buffer segnalato da questa API deve essere considerato solo un'approssimazione ed è consigliato solo per l'uso nel debug. Per un ID di partizione senza eventi memorizzati nel buffer, verrà restituito 0 indipendentemente dal fatto che tale ID di partizione esista effettivamente all'interno dell'hub eventi. |
get_eventhub_properties |
Ottenere le proprietà dell'hub eventi. Le chiavi nel dizionario restituito includono:
|
get_partition_ids |
Ottiene gli ID di partizione dell'hub eventi. |
get_partition_properties |
Ottiene le proprietà della partizione specificata. Le chiavi nel dizionario delle proprietà includono:
|
send_batch |
Invia un batch di dati dell'evento. Per impostazione predefinita, il metodo bloccherà fino al timeout dell'acknowledgement o del timeout dell'operazione. Se EventHubProducerClient è configurato per l'esecuzione in modalità memorizzata nel buffer, il metodo accoderà gli eventi nel buffer locale e restituirà. Il producer eseguirà l'invio automatico in background. Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue:
Se buffered_mode è True, è necessario on_error callback e gli errori verranno gestiti come segue:
In modalità memorizzata nel buffer, l'invio di un batch rimarrà intatto e inviato come singola unità. Il batch non verrà riorganiato. Ciò può comportare l'inefficienza dell'invio di eventi. Se si invia un elenco finito di EventData o AmqpAnnotatedMessage e si sa che è entro il limite di dimensioni del frame dell'hub eventi, è possibile inviarli con una chiamata send_batch . In caso contrario, usare create_batch per creare EventDataBatch e aggiungere EventData o AmqpAnnotatedMessage nel batch uno per uno fino al limite di dimensioni e quindi chiamare questo metodo per inviare il batch. |
send_event |
Invia i dati di un evento. Per impostazione predefinita, il metodo bloccherà fino al timeout dell'acknowledgement o del timeout dell'operazione. Se EventHubProducerClient è configurato per l'esecuzione in modalità memorizzata nel buffer, il metodo accoderà l'evento nel buffer locale e restituirà. Il producer eseguirà l'invio in batch automatico e l'invio in background. Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue: * Se viene passato un callback on_error durante la creazione di istanze del client producer,
Se buffered_mode è True, on_error callback è obbligatorio e gli errori verranno gestiti nel modo seguente: * Se gli eventi non riescono a accodare entro il timeout specificato, verrà generato un errore direttamente.
|
close
Chiudere il client Producer sottostante la connessione AMQP e i collegamenti.
async close(*, flush: bool = True, **kwargs: Any) -> None
Parametri
- flush
- bool
Solo modalità memorizzata nel buffer. Se impostato su True, gli eventi nel buffer verranno inviati immediatamente. Il valore predefinito è true.
Solo modalità memorizzata nel buffer. Timeout per chiudere il producer. Il valore predefinito è Nessuno, ovvero nessun timeout.
Tipo restituito
Eccezioni
Se si verifica un errore durante lo scaricamento del buffer se lo scaricamento è impostato su True o la chiusura delle connessioni AMQP sottostanti in modalità memorizzata nel buffer.
Esempio
Chiudere il gestore.
import os
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
try:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
finally:
# Close down the producer handler.
await producer.close()
create_batch
Creare un oggetto EventDataBatch con la dimensione massima di tutto il contenuto vincolato da max_size_in_bytes.
Il max_size_in_bytes non deve essere maggiore della dimensione massima consentita del messaggio definita dal servizio.
async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch
Tipo restituito
Eccezioni
Se si verifica un errore durante lo scaricamento del buffer se lo scaricamento è impostato su True o la chiusura delle connessioni AMQP sottostanti in modalità memorizzata nel buffer.
Esempio
Creare un oggetto EventDataBatch con dimensioni limitate
from azure.eventhub import EventData
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
flush
Solo modalità memorizzata nel buffer. Scarica gli eventi nel buffer da inviare immediatamente se il client funziona in modalità memorizzata nel buffer.
async flush(**kwargs: Any) -> None
Parametri
Timeout per scaricare gli eventi memorizzati nel buffer, il valore predefinito è Nessuno, ovvero nessun timeout.
Tipo restituito
Eccezioni
Se il producer non riesce a scaricare il buffer entro il timeout specificato in modalità memorizzata nel buffer.
from_connection_string
Creare un oggetto EventHubProducerClient da un stringa di connessione.
from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient
Parametri
- eventhub_name
- str
Percorso dell'hub eventi specifico a cui connettere il client.
- buffered_mode
- bool
Se True, il client producer raccoglierà gli eventi in un buffer, in modo efficiente batch, quindi pubblicherà. Il valore predefinito è False.
Callback da chiamare dopo la pubblicazione di un batch. Il callback accetta due parametri:
events: elenco di eventi che sono stati pubblicati correttamente
partition_id: ID di partizione in cui sono stati pubblicati gli eventi nell'elenco.
La funzione di callback deve essere definita come: on_success(eventi, partition_id). È obbligatorio quando buffered_mode è True se facoltativo se buffered_mode è False.
Callback da chiamare dopo la pubblicazione di un batch. La funzione di callback deve essere definita come : on_error(eventi, partition_id, errore), dove:
events: elenco di eventi che non è stato possibile pubblicare,
partition_id: ID partizione in cui sono stati tentati di pubblicare gli eventi nell'elenco e
error: eccezione correlata all'errore di invio.
Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue:
Se un callback on_error viene passato durante la creazione di istanze del client producer,
le informazioni sull'errore verranno quindi passate al callback on_error , che verrà quindi chiamato.
Se un callback on_error non viene passato durante la creazione di istanze del client,
l'errore verrà generato per impostazione predefinita.
Se buffered_mode è True, è necessario on_error callback e gli errori verranno gestiti come segue:
Se gli eventi non vengono accodati entro il timeout specificato, verrà generato direttamente un errore.
Se gli eventi non vengono inviati dopo l'accodamento, verrà chiamato il callback on_error .
- max_buffer_length
- int
Solo modalità memorizzata nel buffer. Numero totale di eventi per partizione che possono essere memorizzati nel buffer prima dell'attivazione di uno scaricamento. Il valore predefinito è 1500 in modalità memorizzata nel buffer.
Solo modalità memorizzata nel buffer. Quantità di tempo di attesa per la compilazione di un batch con eventi nel buffer prima della pubblicazione. Il valore predefinito è 1 in modalità memorizzata nel buffer.
- logging_enable
- bool
Indica se restituire i log di traccia di rete al logger. L'impostazione predefinita è False.
- http_proxy
- dict
Impostazioni proxy HTTP. Deve essere un dizionario con le chiavi seguenti: 'proxy_hostname' (valore str) e 'proxy_port' (valore int). Inoltre, possono essere presenti le chiavi seguenti: 'username', 'password'.
- auth_timeout
- float
Tempo in secondi di attesa dell'autorizzazione di un token da parte del servizio. Il valore predefinito è 60 secondi. Se impostato su 0, non verrà applicato alcun timeout dal client.
- user_agent
- str
Se specificato, verrà aggiunto davanti alla stringa dell'agente utente.
- retry_total
- int
Numero totale di tentativi di rollforward di un'operazione non riuscita quando si verifica un errore. Il valore predefinito è 3.
- retry_backoff_factor
- float
Fattore di backoff da applicare tra i tentativi dopo il secondo tentativo (la maggior parte degli errori viene risolta immediatamente da un secondo tentativo senza alcun ritardo). In modalità fissa, i criteri di ripetizione dei tentativi verranno sempre in sospensione per {fattore backoff}. In modalità "esponenziale", i criteri di ripetizione dei tentativi verranno interrotti per: {backoff factor} * (2 ** ({numero di tentativi totali} - 1)) secondi. Se il backoff_factor è 0,1, il nuovo tentativo verrà interrotto per [0,0s, 0,2s, 0,4s, ...] tra i tentativi. Il valore predefinito è 0,8.
- retry_backoff_max
- float
Tempo massimo di back off. Il valore predefinito è 120 secondi (2 minuti).
- retry_mode
- str
Comportamento di ritardo tra i tentativi. I valori supportati sono "fixed" o "esponenziale", dove il valore predefinito è "esponenziale".
- idle_timeout
- float
Timeout, in secondi, dopo il quale il client chiuderà la connessione sottostante se non è presente alcuna attività. Per impostazione predefinita, il valore è None, il che significa che il client non verrà arrestato a causa dell'inattività a meno che non venga avviato dal servizio.
- transport_type
- TransportType
Tipo di protocollo di trasporto che verrà usato per comunicare con il servizio Hub eventi. Il valore predefinito è TransportType.Amqp nel qual caso viene usata la porta 5671. Se la porta 5671 non è disponibile/bloccata nell'ambiente di rete, è possibile usare TransportType.AmqpOverWebsocket che usa la porta 443 per la comunicazione.
Indirizzo dell'endpoint personalizzato da usare per stabilire una connessione al servizio Hub eventi, consentendo l'instradamento delle richieste di rete tramite qualsiasi gateway applicazione o altri percorsi necessari per l'ambiente host. L'impostazione predefinita è None. Il formato sarà simile a "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se la porta non è specificata nel custom_endpoint_address, per impostazione predefinita verrà usata la porta 443.
Percorso del file di CA_BUNDLE personalizzato del certificato SSL usato per autenticare l'identità dell'endpoint di connessione. Il valore predefinito è Nessuno nel qual caso verrà usato certifi.where().
- uamqp_transport
- bool
Indica se usare la libreria uamqp come trasporto sottostante. Il valore predefinito è False e la libreria AMQP Pure Python verrà usata come trasporto sottostante.
Tipo restituito
Eccezioni
Se si verifica un errore durante lo scaricamento del buffer se lo scaricamento è impostato su True o la chiusura delle connessioni AMQP sottostanti in modalità memorizzata nel buffer.
Esempio
Creare una nuova istanza di EventHubProducerClient da stringa di connessione.
import os
from azure.eventhub.aio import EventHubProducerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_buffered_event_count
Numero di eventi memorizzati nel buffer e in attesa di pubblicazione per una determinata partizione. Restituisce Nessuno in modalità non memorizzata nel buffer. NOTA: il buffer eventi viene elaborato in una coroutine in background, pertanto il numero di eventi nel buffer segnalato da questa API deve essere considerato solo un'approssimazione ed è consigliato solo per l'uso nel debug. Per un ID di partizione senza eventi memorizzati nel buffer, verrà restituito 0 indipendentemente dal fatto che tale ID di partizione esista effettivamente all'interno dell'hub eventi.
get_buffered_event_count(partition_id: str) -> int | None
Parametri
Tipo restituito
Eccezioni
Se si verifica un errore durante lo scaricamento del buffer se lo scaricamento è impostato su True o la chiusura delle connessioni AMQP sottostanti in modalità memorizzata nel buffer.
get_eventhub_properties
Ottenere le proprietà dell'hub eventi.
Le chiavi nel dizionario restituito includono:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
Restituisce
Dizionario contenente informazioni sull'hub eventi.
Tipo restituito
Eccezioni
get_partition_ids
Ottiene gli ID di partizione dell'hub eventi.
async get_partition_ids() -> List[str]
Restituisce
Elenco di ID partizione.
Tipo restituito
Eccezioni
get_partition_properties
Ottiene le proprietà della partizione specificata.
Le chiavi nel dizionario delle proprietà includono:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
Parametri
Restituisce
Dict delle proprietà della partizione.
Tipo restituito
Eccezioni
send_batch
Invia un batch di dati dell'evento. Per impostazione predefinita, il metodo bloccherà fino al timeout dell'acknowledgement o del timeout dell'operazione. Se EventHubProducerClient è configurato per l'esecuzione in modalità memorizzata nel buffer, il metodo accoderà gli eventi nel buffer locale e restituirà. Il producer eseguirà l'invio automatico in background.
Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue:
Se un callback on_error viene passato durante la creazione di istanze del client producer,
le informazioni sull'errore verranno quindi passate al callback on_error , che verrà quindi chiamato.
Se un callback on_error non viene passato durante la creazione di istanze del client,
l'errore verrà generato per impostazione predefinita.
Se buffered_mode è True, è necessario on_error callback e gli errori verranno gestiti come segue:
Se gli eventi non vengono accodati entro il timeout specificato, verrà generato direttamente un errore.
Se gli eventi non vengono inviati dopo l'accodamento, verrà chiamato il callback on_error .
In modalità memorizzata nel buffer, l'invio di un batch rimarrà intatto e inviato come singola unità. Il batch non verrà riorganiato. Ciò può comportare l'inefficienza dell'invio di eventi.
Se si invia un elenco finito di EventData o AmqpAnnotatedMessage e si sa che è entro il limite di dimensioni del frame dell'hub eventi, è possibile inviarli con una chiamata send_batch . In caso contrario, usare create_batch per creare EventDataBatch e aggiungere EventData o AmqpAnnotatedMessage nel batch uno per uno fino al limite di dimensioni e quindi chiamare questo metodo per inviare il batch.
async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None
Parametri
- event_data_batch
- Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Oggetto EventDataBatch da inviare o un elenco di EventData da inviare in un batch. Tutti gli EventData o AmqpAnnotatedMessage nell'elenco o EventDataBatch verranno inseriti nella stessa partizione.
- timeout
- float
Tempo di attesa massimo per inviare i dati dell'evento in modalità non memorizzata nel buffer o il tempo di attesa massimo per accodare i dati dell'evento nel buffer in modalità memorizzata nel buffer. In modalità non memorizzata nel buffer, verrà utilizzato il tempo di attesa predefinito specificato al momento della creazione del producer. In modalità memorizzata nel buffer il tempo di attesa predefinito è Nessuno.
- partition_id
- str
ID di partizione specifico a cui inviare. Il valore predefinito è Nessuno, nel qual caso il servizio verrà assegnato a tutte le partizioni usando il round robin. Se si specifica partition_id e event_data_batch è un EventDataBatch perché EventDataBatch ha partition_id, verrà generato un typeError.
- partition_key
- str
Con il partition_key specificato, i dati dell'evento verranno inviati a una determinata partizione dell'hub eventi deciso dal servizio. Se si specifica partition_key e event_data_batch è un oggetto EventDataBatch, verrà generato un typeerror perché EventDataBatch ha partition_key. Se vengono forniti sia partition_id che partition_key, il partition_id avrà la precedenza. AVVISO: l'impostazione di partition_key di valore non stringa sugli eventi da inviare è sconsigliata perché il partition_key verrà ignorato dal servizio Hub eventi e gli eventi verranno assegnati a tutte le partizioni tramite round robin. Sono inoltre disponibili SDK per l'utilizzo di eventi che prevedono che partition_key solo come tipo stringa, potrebbero non riuscire ad analizzare il valore non stringa.
Tipo restituito
Eccezioni
Se il valore specificato dal parametro di timeout è trascorso prima che l'evento possa essere inviato in modalità non memorizzata nel buffer o gli eventi possono essere accodati nel buffer in modalità memorizzata nel buffer.
Esempio
Invia in modo asincrono i dati degli eventi
async with producer:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
send_event
Invia i dati di un evento. Per impostazione predefinita, il metodo bloccherà fino al timeout dell'acknowledgement o del timeout dell'operazione. Se EventHubProducerClient è configurato per l'esecuzione in modalità memorizzata nel buffer, il metodo accoderà l'evento nel buffer locale e restituirà. Il producer eseguirà l'invio in batch automatico e l'invio in background.
Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue: * Se viene passato un callback on_error durante la creazione di istanze del client producer,
then error information will be passed to the *on_error* callback, which will then be called.
* If an *on_error* callback is not passed in during client instantiation,
then the error will be raised by default.
Se buffered_mode è True, on_error callback è obbligatorio e gli errori verranno gestiti nel modo seguente: * Se gli eventi non riescono a accodare entro il timeout specificato, verrà generato un errore direttamente.
* If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None
Parametri
- timeout
- float
Tempo di attesa massimo per inviare i dati dell'evento in modalità non memorizzata nel buffer o il tempo di attesa massimo per accodare i dati dell'evento nel buffer in modalità memorizzata nel buffer. In modalità non memorizzata nel buffer, verrà utilizzato il tempo di attesa predefinito specificato al momento della creazione del producer. In modalità memorizzata nel buffer il tempo di attesa predefinito è Nessuno.
- partition_id
- str
ID di partizione specifico a cui inviare. Il valore predefinito è Nessuno, nel qual caso il servizio verrà assegnato a tutte le partizioni usando il round robin. Se si specifica partition_id e event_data_batch è un EventDataBatch perché EventDataBatch ha partition_id, verrà generato un typeError.
- partition_key
- str
Con il partition_key specificato, i dati dell'evento verranno inviati a una determinata partizione dell'hub eventi deciso dal servizio. Se si specifica partition_key e event_data_batch è un oggetto EventDataBatch, verrà generato un typeerror perché EventDataBatch ha partition_key. Se vengono forniti sia partition_id che partition_key, il partition_id avrà la precedenza. AVVISO: l'impostazione di partition_key di valore non stringa sugli eventi da inviare è sconsigliata perché il partition_key verrà ignorato dal servizio Hub eventi e gli eventi verranno assegnati a tutte le partizioni tramite round robin. Sono inoltre disponibili SDK per l'utilizzo di eventi che prevedono che partition_key solo come tipo stringa, potrebbero non riuscire ad analizzare il valore non stringa.
Tipo restituito
Eccezioni
Se il valore specificato dal parametro di timeout è trascorso prima che l'evento possa essere inviato in modalità non memorizzata nel buffer o gli eventi non possono essere accodati nel buffer in modalità memorizzata nel buffer.
Attributi
total_buffered_event_count
Numero totale di eventi attualmente memorizzati nel buffer e in attesa di pubblicazione in tutte le partizioni. Restituisce Nessuno in modalità non memorizzata nel buffer. NOTA: il buffer eventi viene elaborato in una coroutine in background, pertanto il numero di eventi nel buffer segnalato da questa API deve essere considerato solo un'approssimazione ed è consigliato solo per l'uso nel debug.
Tipo restituito
Azure SDK for Python
Commenti e suggerimenti
https://aka.ms/ContentUserFeedback.
Presto disponibile: Nel corso del 2024 verranno gradualmente disattivati i problemi di GitHub come meccanismo di feedback per il contenuto e ciò verrà sostituito con un nuovo sistema di feedback. Per altre informazioni, vedereInvia e visualizza il feedback per