EventHubProducerClient Classe

La classe EventHubProducerClient definisce un'interfaccia di alto livello per l'invio di eventi al servizio Hub eventi di Azure.

Ereditarietà
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubProducerClient

Costruttore

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)

Parametri

fully_qualified_namespace
str
Necessario

Nome host completo per lo spazio dei nomi di Hub eventi. È probabile che sia simile a .servicebus.windows.net

eventhub_name
str
Necessario

Percorso dell'hub eventi specifico a cui connettere il client.

credential
AsyncTokenCredential oppure AzureSasCredential oppure AzureNamedKeyCredential
Necessario

Oggetto credenziale usato per l'autenticazione che implementa una particolare interfaccia per ottenere i token. Accetta EventHubSharedKeyCredentialoggetti credenziali o generati dalla libreria azure-identity e dagli oggetti che implementano il metodo *get_token(self, scopes).

buffered_mode
bool

Se True, il client producer raccoglierà gli eventi in un buffer, in modo efficiente batch, quindi pubblicherà. Il valore predefinito è False.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Callback da chiamare dopo la pubblicazione di un batch. Il callback accetta due parametri:

  • events: elenco di eventi che sono stati pubblicati correttamente

  • partition_id: ID di partizione in cui sono stati pubblicati gli eventi nell'elenco.

La funzione di callback deve essere definita come: on_success(eventi, partition_id). Obbligatorio quando buffered_mode è True se facoltativo se buffered_mode è False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

Callback da chiamare dopo la pubblicazione di un batch. Obbligatorio quando in buffered_mode è True se facoltativo se buffered_mode è False. La funzione di callback deve essere definita come : on_error(eventi, partition_id, errore), dove:

  • events: elenco di eventi che non è stato possibile pubblicare,

  • partition_id: ID partizione in cui sono stati tentati di pubblicare gli eventi nell'elenco e

  • error: eccezione correlata all'errore di invio.

Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue:

  • Se un callback on_error viene passato durante la creazione di istanze del client producer,

    le informazioni sull'errore verranno quindi passate al callback on_error , che verrà quindi chiamato.

  • Se un callback on_error non viene passato durante la creazione di istanze del client,

    l'errore verrà generato per impostazione predefinita.

Se buffered_mode è True, è necessario on_error callback e gli errori verranno gestiti come segue:

  • Se gli eventi non vengono accodati entro il timeout specificato, verrà generato direttamente un errore.

  • Se gli eventi non vengono inviati dopo l'accodamento, verrà chiamato il callback on_error .

max_buffer_length
int

Solo modalità memorizzata nel buffer. Numero totale di eventi per partizione che possono essere memorizzati nel buffer prima dell'attivazione di uno scaricamento. Il valore predefinito è 1500 in modalità memorizzata nel buffer.

max_wait_time
Optional[float]

Solo modalità memorizzata nel buffer. Quantità di tempo di attesa per la compilazione di un batch con eventi nel buffer prima della pubblicazione. Il valore predefinito è 1 in modalità memorizzata nel buffer.

logging_enable
bool

Indica se restituire i log di traccia di rete al logger. L'impostazione predefinita è False.

auth_timeout
float

Tempo in secondi di attesa dell'autorizzazione di un token da parte del servizio. Il valore predefinito è 60 secondi. Se impostato su 0, non verrà applicato alcun timeout dal client.

user_agent
str

Se specificato, verrà aggiunto davanti alla stringa dell'agente utente.

retry_total
int

Numero totale di tentativi di rollforward di un'operazione non riuscita quando si verifica un errore. Il valore predefinito è 3.

retry_backoff_factor
float

Fattore di backoff da applicare tra i tentativi dopo il secondo tentativo (la maggior parte degli errori viene risolta immediatamente da un secondo tentativo senza alcun ritardo). In modalità fissa, i criteri di ripetizione dei tentativi verranno sempre in sospensione per {fattore backoff}. In modalità "esponenziale", i criteri di ripetizione dei tentativi verranno interrotti per: {backoff factor} * (2 ** ({numero di tentativi totali} - 1)) secondi. Se il backoff_factor è 0,1, il nuovo tentativo verrà interrotto per [0,0s, 0,2s, 0,4s, ...] tra i tentativi. Il valore predefinito è 0,8.

retry_backoff_max
float

Tempo massimo di back off. Il valore predefinito è 120 secondi (2 minuti).

retry_mode
str

Comportamento di ritardo tra i tentativi. I valori supportati sono "fixed" o "esponenziale", dove il valore predefinito è "esponenziale".

idle_timeout
float

Timeout, in secondi, dopo il quale il client chiuderà la connessione sottostante se non è presente alcuna attività. Per impostazione predefinita, il valore è None, il che significa che il client non verrà arrestato a causa dell'inattività a meno che non venga avviato dal servizio.

transport_type
TransportType

Tipo di protocollo di trasporto che verrà usato per comunicare con il servizio Hub eventi. Il valore predefinito è TransportType.Amqp nel qual caso viene usata la porta 5671. Se la porta 5671 non è disponibile/bloccata nell'ambiente di rete, è possibile usare TransportType.AmqpOverWebsocket che usa la porta 443 per la comunicazione.

http_proxy
dict

Impostazioni proxy HTTP. Deve essere un dizionario con le chiavi seguenti: 'proxy_hostname' (valore str) e 'proxy_port' (valore int). Inoltre, possono essere presenti le chiavi seguenti: 'username', 'password'.

custom_endpoint_address
Optional[str]

Indirizzo dell'endpoint personalizzato da usare per stabilire una connessione al servizio Hub eventi, consentendo l'instradamento delle richieste di rete tramite qualsiasi gateway applicazione o altri percorsi necessari per l'ambiente host. L'impostazione predefinita è None. Il formato sarà simile a "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se la porta non è specificata nel custom_endpoint_address, per impostazione predefinita verrà usata la porta 443.

connection_verify
Optional[str]

Percorso del file di CA_BUNDLE personalizzato del certificato SSL usato per autenticare l'identità dell'endpoint di connessione. Il valore predefinito è Nessuno nel qual caso verrà usato certifi.where().

uamqp_transport
bool

Indica se usare la libreria uamqp come trasporto sottostante. Il valore predefinito è False e la libreria AMQP Pure Python verrà usata come trasporto sottostante.

socket_timeout
float

Tempo in secondi in cui il socket sottostante sulla connessione deve attendere quando si inviano e ricevono dati prima del timeout. Il valore predefinito è 0.2 per TransportType.Amqp e 1 per TransportType.AmqpOverWebsocket. Se si verificano errori EventHubsConnectionError a causa del timeout della scrittura, potrebbe essere necessario passare un valore maggiore del valore predefinito. Si tratta di scenari di utilizzo avanzati e in genere il valore predefinito deve essere sufficiente.

Esempio

Creare una nuova istanza di EventHubProducerClient.


   import os
   from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Metodi

close

Chiudere il client Producer sottostante la connessione AMQP e i collegamenti.

create_batch

Creare un oggetto EventDataBatch con la dimensione massima di tutto il contenuto vincolato da max_size_in_bytes.

Il max_size_in_bytes non deve essere maggiore della dimensione massima consentita del messaggio definita dal servizio.

flush

Solo modalità memorizzata nel buffer. Scarica gli eventi nel buffer da inviare immediatamente se il client funziona in modalità memorizzata nel buffer.

from_connection_string

Creare un oggetto EventHubProducerClient da un stringa di connessione.

get_buffered_event_count

Numero di eventi memorizzati nel buffer e in attesa di pubblicazione per una determinata partizione. Restituisce Nessuno in modalità non memorizzata nel buffer. NOTA: il buffer eventi viene elaborato in una coroutine in background, pertanto il numero di eventi nel buffer segnalato da questa API deve essere considerato solo un'approssimazione ed è consigliato solo per l'uso nel debug. Per un ID di partizione senza eventi memorizzati nel buffer, verrà restituito 0 indipendentemente dal fatto che tale ID di partizione esista effettivamente all'interno dell'hub eventi.

get_eventhub_properties

Ottenere le proprietà dell'hub eventi.

Le chiavi nel dizionario restituito includono:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Ottiene gli ID di partizione dell'hub eventi.

get_partition_properties

Ottiene le proprietà della partizione specificata.

Le chiavi nel dizionario delle proprietà includono:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

Invia un batch di dati dell'evento. Per impostazione predefinita, il metodo bloccherà fino al timeout dell'acknowledgement o del timeout dell'operazione. Se EventHubProducerClient è configurato per l'esecuzione in modalità memorizzata nel buffer, il metodo accoderà gli eventi nel buffer locale e restituirà. Il producer eseguirà l'invio automatico in background.

Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue:

  • Se un callback on_error viene passato durante la creazione di istanze del client producer,

    le informazioni sull'errore verranno quindi passate al callback on_error , che verrà quindi chiamato.

  • Se un callback on_error non viene passato durante la creazione di istanze del client,

    l'errore verrà generato per impostazione predefinita.

Se buffered_mode è True, è necessario on_error callback e gli errori verranno gestiti come segue:

  • Se gli eventi non vengono accodati entro il timeout specificato, verrà generato direttamente un errore.

  • Se gli eventi non vengono inviati dopo l'accodamento, verrà chiamato il callback on_error .

In modalità memorizzata nel buffer, l'invio di un batch rimarrà intatto e inviato come singola unità. Il batch non verrà riorganiato. Ciò può comportare l'inefficienza dell'invio di eventi.

Se si invia un elenco finito di EventData o AmqpAnnotatedMessage e si sa che è entro il limite di dimensioni del frame dell'hub eventi, è possibile inviarli con una chiamata send_batch . In caso contrario, usare create_batch per creare EventDataBatch e aggiungere EventData o AmqpAnnotatedMessage nel batch uno per uno fino al limite di dimensioni e quindi chiamare questo metodo per inviare il batch.

send_event

Invia i dati di un evento. Per impostazione predefinita, il metodo bloccherà fino al timeout dell'acknowledgement o del timeout dell'operazione. Se EventHubProducerClient è configurato per l'esecuzione in modalità memorizzata nel buffer, il metodo accoderà l'evento nel buffer locale e restituirà. Il producer eseguirà l'invio in batch automatico e l'invio in background.

Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue: * Se viene passato un callback on_error durante la creazione di istanze del client producer,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Se buffered_mode è True, on_error callback è obbligatorio e gli errori verranno gestiti nel modo seguente: * Se gli eventi non riescono a accodare entro il timeout specificato, verrà generato un errore direttamente.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

Chiudere il client Producer sottostante la connessione AMQP e i collegamenti.

async close(*, flush: bool = True, **kwargs: Any) -> None

Parametri

flush
bool

Solo modalità memorizzata nel buffer. Se impostato su True, gli eventi nel buffer verranno inviati immediatamente. Il valore predefinito è true.

timeout
float oppure None

Solo modalità memorizzata nel buffer. Timeout per chiudere il producer. Il valore predefinito è Nessuno, ovvero nessun timeout.

Tipo restituito

Eccezioni

Se si verifica un errore durante lo scaricamento del buffer se lo scaricamento è impostato su True o la chiusura delle connessioni AMQP sottostanti in modalità memorizzata nel buffer.

Esempio

Chiudere il gestore.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   from azure.eventhub import EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break
       await producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       await producer.close()

create_batch

Creare un oggetto EventDataBatch con la dimensione massima di tutto il contenuto vincolato da max_size_in_bytes.

Il max_size_in_bytes non deve essere maggiore della dimensione massima consentita del messaggio definita dal servizio.

async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch

Tipo restituito

Eccezioni

Se si verifica un errore durante lo scaricamento del buffer se lo scaricamento è impostato su True o la chiusura delle connessioni AMQP sottostanti in modalità memorizzata nel buffer.

Esempio

Creare un oggetto EventDataBatch con dimensioni limitate


       from azure.eventhub import EventData
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

Solo modalità memorizzata nel buffer. Scarica gli eventi nel buffer da inviare immediatamente se il client funziona in modalità memorizzata nel buffer.

async flush(**kwargs: Any) -> None

Parametri

timeout
float oppure None

Timeout per scaricare gli eventi memorizzati nel buffer, il valore predefinito è Nessuno, ovvero nessun timeout.

Tipo restituito

Eccezioni

Se il producer non riesce a scaricare il buffer entro il timeout specificato in modalità memorizzata nel buffer.

from_connection_string

Creare un oggetto EventHubProducerClient da un stringa di connessione.

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient

Parametri

conn_str
str
Necessario

Il stringa di connessione di un hub eventi.

eventhub_name
str

Percorso dell'hub eventi specifico a cui connettere il client.

buffered_mode
bool

Se True, il client producer raccoglierà gli eventi in un buffer, in modo efficiente batch, quindi pubblicherà. Il valore predefinito è False.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Callback da chiamare dopo la pubblicazione di un batch. Il callback accetta due parametri:

  • events: elenco di eventi che sono stati pubblicati correttamente

  • partition_id: ID di partizione in cui sono stati pubblicati gli eventi nell'elenco.

La funzione di callback deve essere definita come: on_success(eventi, partition_id). È obbligatorio quando buffered_mode è True se facoltativo se buffered_mode è False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

Callback da chiamare dopo la pubblicazione di un batch. La funzione di callback deve essere definita come : on_error(eventi, partition_id, errore), dove:

  • events: elenco di eventi che non è stato possibile pubblicare,

  • partition_id: ID partizione in cui sono stati tentati di pubblicare gli eventi nell'elenco e

  • error: eccezione correlata all'errore di invio.

Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue:

  • Se un callback on_error viene passato durante la creazione di istanze del client producer,

    le informazioni sull'errore verranno quindi passate al callback on_error , che verrà quindi chiamato.

  • Se un callback on_error non viene passato durante la creazione di istanze del client,

    l'errore verrà generato per impostazione predefinita.

Se buffered_mode è True, è necessario on_error callback e gli errori verranno gestiti come segue:

  • Se gli eventi non vengono accodati entro il timeout specificato, verrà generato direttamente un errore.

  • Se gli eventi non vengono inviati dopo l'accodamento, verrà chiamato il callback on_error .

max_buffer_length
int

Solo modalità memorizzata nel buffer. Numero totale di eventi per partizione che possono essere memorizzati nel buffer prima dell'attivazione di uno scaricamento. Il valore predefinito è 1500 in modalità memorizzata nel buffer.

max_wait_time
Optional[float]

Solo modalità memorizzata nel buffer. Quantità di tempo di attesa per la compilazione di un batch con eventi nel buffer prima della pubblicazione. Il valore predefinito è 1 in modalità memorizzata nel buffer.

logging_enable
bool

Indica se restituire i log di traccia di rete al logger. L'impostazione predefinita è False.

http_proxy
dict

Impostazioni proxy HTTP. Deve essere un dizionario con le chiavi seguenti: 'proxy_hostname' (valore str) e 'proxy_port' (valore int). Inoltre, possono essere presenti le chiavi seguenti: 'username', 'password'.

auth_timeout
float

Tempo in secondi di attesa dell'autorizzazione di un token da parte del servizio. Il valore predefinito è 60 secondi. Se impostato su 0, non verrà applicato alcun timeout dal client.

user_agent
str

Se specificato, verrà aggiunto davanti alla stringa dell'agente utente.

retry_total
int

Numero totale di tentativi di rollforward di un'operazione non riuscita quando si verifica un errore. Il valore predefinito è 3.

retry_backoff_factor
float

Fattore di backoff da applicare tra i tentativi dopo il secondo tentativo (la maggior parte degli errori viene risolta immediatamente da un secondo tentativo senza alcun ritardo). In modalità fissa, i criteri di ripetizione dei tentativi verranno sempre in sospensione per {fattore backoff}. In modalità "esponenziale", i criteri di ripetizione dei tentativi verranno interrotti per: {backoff factor} * (2 ** ({numero di tentativi totali} - 1)) secondi. Se il backoff_factor è 0,1, il nuovo tentativo verrà interrotto per [0,0s, 0,2s, 0,4s, ...] tra i tentativi. Il valore predefinito è 0,8.

retry_backoff_max
float

Tempo massimo di back off. Il valore predefinito è 120 secondi (2 minuti).

retry_mode
str

Comportamento di ritardo tra i tentativi. I valori supportati sono "fixed" o "esponenziale", dove il valore predefinito è "esponenziale".

idle_timeout
float

Timeout, in secondi, dopo il quale il client chiuderà la connessione sottostante se non è presente alcuna attività. Per impostazione predefinita, il valore è None, il che significa che il client non verrà arrestato a causa dell'inattività a meno che non venga avviato dal servizio.

transport_type
TransportType

Tipo di protocollo di trasporto che verrà usato per comunicare con il servizio Hub eventi. Il valore predefinito è TransportType.Amqp nel qual caso viene usata la porta 5671. Se la porta 5671 non è disponibile/bloccata nell'ambiente di rete, è possibile usare TransportType.AmqpOverWebsocket che usa la porta 443 per la comunicazione.

custom_endpoint_address
Optional[str]

Indirizzo dell'endpoint personalizzato da usare per stabilire una connessione al servizio Hub eventi, consentendo l'instradamento delle richieste di rete tramite qualsiasi gateway applicazione o altri percorsi necessari per l'ambiente host. L'impostazione predefinita è None. Il formato sarà simile a "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se la porta non è specificata nel custom_endpoint_address, per impostazione predefinita verrà usata la porta 443.

connection_verify
Optional[str]

Percorso del file di CA_BUNDLE personalizzato del certificato SSL usato per autenticare l'identità dell'endpoint di connessione. Il valore predefinito è Nessuno nel qual caso verrà usato certifi.where().

uamqp_transport
bool

Indica se usare la libreria uamqp come trasporto sottostante. Il valore predefinito è False e la libreria AMQP Pure Python verrà usata come trasporto sottostante.

Tipo restituito

Eccezioni

Se si verifica un errore durante lo scaricamento del buffer se lo scaricamento è impostato su True o la chiusura delle connessioni AMQP sottostanti in modalità memorizzata nel buffer.

Esempio

Creare una nuova istanza di EventHubProducerClient da stringa di connessione.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

Numero di eventi memorizzati nel buffer e in attesa di pubblicazione per una determinata partizione. Restituisce Nessuno in modalità non memorizzata nel buffer. NOTA: il buffer eventi viene elaborato in una coroutine in background, pertanto il numero di eventi nel buffer segnalato da questa API deve essere considerato solo un'approssimazione ed è consigliato solo per l'uso nel debug. Per un ID di partizione senza eventi memorizzati nel buffer, verrà restituito 0 indipendentemente dal fatto che tale ID di partizione esista effettivamente all'interno dell'hub eventi.

get_buffered_event_count(partition_id: str) -> int | None

Parametri

partition_id
str
Necessario

ID partizione di destinazione.

Tipo restituito

int,

Eccezioni

Se si verifica un errore durante lo scaricamento del buffer se lo scaricamento è impostato su True o la chiusura delle connessioni AMQP sottostanti in modalità memorizzata nel buffer.

get_eventhub_properties

Ottenere le proprietà dell'hub eventi.

Le chiavi nel dizionario restituito includono:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Restituisce

Dizionario contenente informazioni sull'hub eventi.

Tipo restituito

Eccezioni

get_partition_ids

Ottiene gli ID di partizione dell'hub eventi.

async get_partition_ids() -> List[str]

Restituisce

Elenco di ID partizione.

Tipo restituito

Eccezioni

get_partition_properties

Ottiene le proprietà della partizione specificata.

Le chiavi nel dizionario delle proprietà includono:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parametri

partition_id
str
Necessario

ID partizione di destinazione.

Restituisce

Dict delle proprietà della partizione.

Tipo restituito

Eccezioni

send_batch

Invia un batch di dati dell'evento. Per impostazione predefinita, il metodo bloccherà fino al timeout dell'acknowledgement o del timeout dell'operazione. Se EventHubProducerClient è configurato per l'esecuzione in modalità memorizzata nel buffer, il metodo accoderà gli eventi nel buffer locale e restituirà. Il producer eseguirà l'invio automatico in background.

Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue:

  • Se un callback on_error viene passato durante la creazione di istanze del client producer,

    le informazioni sull'errore verranno quindi passate al callback on_error , che verrà quindi chiamato.

  • Se un callback on_error non viene passato durante la creazione di istanze del client,

    l'errore verrà generato per impostazione predefinita.

Se buffered_mode è True, è necessario on_error callback e gli errori verranno gestiti come segue:

  • Se gli eventi non vengono accodati entro il timeout specificato, verrà generato direttamente un errore.

  • Se gli eventi non vengono inviati dopo l'accodamento, verrà chiamato il callback on_error .

In modalità memorizzata nel buffer, l'invio di un batch rimarrà intatto e inviato come singola unità. Il batch non verrà riorganiato. Ciò può comportare l'inefficienza dell'invio di eventi.

Se si invia un elenco finito di EventData o AmqpAnnotatedMessage e si sa che è entro il limite di dimensioni del frame dell'hub eventi, è possibile inviarli con una chiamata send_batch . In caso contrario, usare create_batch per creare EventDataBatch e aggiungere EventData o AmqpAnnotatedMessage nel batch uno per uno fino al limite di dimensioni e quindi chiamare questo metodo per inviare il batch.

async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

Parametri

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Necessario

Oggetto EventDataBatch da inviare o un elenco di EventData da inviare in un batch. Tutti gli EventData o AmqpAnnotatedMessage nell'elenco o EventDataBatch verranno inseriti nella stessa partizione.

timeout
float

Tempo di attesa massimo per inviare i dati dell'evento in modalità non memorizzata nel buffer o il tempo di attesa massimo per accodare i dati dell'evento nel buffer in modalità memorizzata nel buffer. In modalità non memorizzata nel buffer, verrà utilizzato il tempo di attesa predefinito specificato al momento della creazione del producer. In modalità memorizzata nel buffer il tempo di attesa predefinito è Nessuno.

partition_id
str

ID di partizione specifico a cui inviare. Il valore predefinito è Nessuno, nel qual caso il servizio verrà assegnato a tutte le partizioni usando il round robin. Se si specifica partition_id e event_data_batch è un EventDataBatch perché EventDataBatch ha partition_id, verrà generato un typeError.

partition_key
str

Con il partition_key specificato, i dati dell'evento verranno inviati a una determinata partizione dell'hub eventi deciso dal servizio. Se si specifica partition_key e event_data_batch è un oggetto EventDataBatch, verrà generato un typeerror perché EventDataBatch ha partition_key. Se vengono forniti sia partition_id che partition_key, il partition_id avrà la precedenza. AVVISO: l'impostazione di partition_key di valore non stringa sugli eventi da inviare è sconsigliata perché il partition_key verrà ignorato dal servizio Hub eventi e gli eventi verranno assegnati a tutte le partizioni tramite round robin. Sono inoltre disponibili SDK per l'utilizzo di eventi che prevedono che partition_key solo come tipo stringa, potrebbero non riuscire ad analizzare il valore non stringa.

Tipo restituito

Eccezioni

Se il valore specificato dal parametro di timeout è trascorso prima che l'evento possa essere inviato in modalità non memorizzata nel buffer o gli eventi possono essere accodati nel buffer in modalità memorizzata nel buffer.

Esempio

Invia in modo asincrono i dati degli eventi


       async with producer:
           event_data_batch = await producer.create_batch()
           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # The EventDataBatch object reaches its max_size.
                   # You can send the full EventDataBatch object and create a new one here.
                   break
           await producer.send_batch(event_data_batch)

send_event

Invia i dati di un evento. Per impostazione predefinita, il metodo bloccherà fino al timeout dell'acknowledgement o del timeout dell'operazione. Se EventHubProducerClient è configurato per l'esecuzione in modalità memorizzata nel buffer, il metodo accoderà l'evento nel buffer locale e restituirà. Il producer eseguirà l'invio in batch automatico e l'invio in background.

Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue: * Se viene passato un callback on_error durante la creazione di istanze del client producer,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Se buffered_mode è True, on_error callback è obbligatorio e gli errori verranno gestiti nel modo seguente: * Se gli eventi non riescono a accodare entro il timeout specificato, verrà generato un errore direttamente.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

Parametri

event_data
Union[EventData, AmqpAnnotatedMessage]
Necessario

Oggetto EventData da inviare.

timeout
float

Tempo di attesa massimo per inviare i dati dell'evento in modalità non memorizzata nel buffer o il tempo di attesa massimo per accodare i dati dell'evento nel buffer in modalità memorizzata nel buffer. In modalità non memorizzata nel buffer, verrà utilizzato il tempo di attesa predefinito specificato al momento della creazione del producer. In modalità memorizzata nel buffer il tempo di attesa predefinito è Nessuno.

partition_id
str

ID di partizione specifico a cui inviare. Il valore predefinito è Nessuno, nel qual caso il servizio verrà assegnato a tutte le partizioni usando il round robin. Se si specifica partition_id e event_data_batch è un EventDataBatch perché EventDataBatch ha partition_id, verrà generato un typeError.

partition_key
str

Con il partition_key specificato, i dati dell'evento verranno inviati a una determinata partizione dell'hub eventi deciso dal servizio. Se si specifica partition_key e event_data_batch è un oggetto EventDataBatch, verrà generato un typeerror perché EventDataBatch ha partition_key. Se vengono forniti sia partition_id che partition_key, il partition_id avrà la precedenza. AVVISO: l'impostazione di partition_key di valore non stringa sugli eventi da inviare è sconsigliata perché il partition_key verrà ignorato dal servizio Hub eventi e gli eventi verranno assegnati a tutte le partizioni tramite round robin. Sono inoltre disponibili SDK per l'utilizzo di eventi che prevedono che partition_key solo come tipo stringa, potrebbero non riuscire ad analizzare il valore non stringa.

Tipo restituito

Eccezioni

Se il valore specificato dal parametro di timeout è trascorso prima che l'evento possa essere inviato in modalità non memorizzata nel buffer o gli eventi non possono essere accodati nel buffer in modalità memorizzata nel buffer.

Attributi

total_buffered_event_count

Numero totale di eventi attualmente memorizzati nel buffer e in attesa di pubblicazione in tutte le partizioni. Restituisce Nessuno in modalità non memorizzata nel buffer. NOTA: il buffer eventi viene elaborato in una coroutine in background, pertanto il numero di eventi nel buffer segnalato da questa API deve essere considerato solo un'approssimazione ed è consigliato solo per l'uso nel debug.

Tipo restituito

int,