EventHubProducerClient Classe

La classe EventHubProducerClient definisce un'interfaccia di alto livello per l'invio di eventi al servizio Hub eventi di Azure.

Ereditarietà
azure.eventhub._client_base.ClientBase
EventHubProducerClient

Costruttore

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: Literal[False] = False, **kwargs: Any)

Parametri

fully_qualified_namespace
str
Necessario

Nome host completo per lo spazio dei nomi Hub eventi. È probabile che sia simile a .servicebus.windows.net

eventhub_name
str
Necessario

Percorso dell'hub eventi specifico a cui connettere il client.

credential
TokenCredential oppure AzureSasCredential oppure AzureNamedKeyCredential
Necessario

Oggetto credenziale usato per l'autenticazione che implementa un'interfaccia specifica per ottenere i token. EventHubSharedKeyCredentialAccetta o oggetti credenziali generati dalla libreria e dagli oggetti azure-identity che implementano il metodo *get_token(self, ambiti).

buffered_mode
bool

Se True, il client producer raccoglie gli eventi in un buffer, in modo efficiente batch, quindi pubblica. Il valore predefinito è False.

buffer_concurrency
<xref:ThreadPoolExecutor> oppure int oppure None

ThreadPoolExecutor da usare per la pubblicazione di eventi o il numero di ruoli di lavoro per ThreadPoolExecutor. Il valore predefinito è None e un ThreadPoolExecutor con il numero predefinito di ruoli di lavoro verrà creato per ogni https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], None]]

Callback da chiamare dopo la pubblicazione di un batch. Il callback accetta due parametri:

  • eventi: elenco di eventi pubblicati correttamente

  • partition_id: ID partizione a cui sono stati pubblicati gli eventi nell'elenco.

La funzione di callback deve essere definita come: on_success(eventi, partition_id). È necessario quando buffered_mode è True mentre facoltativo se buffered_mode è False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], None]]

Il callback da chiamare una volta che non è stato pubblicato un batch. La funzione di callback deve essere definita come: on_error(eventi, partition_id, errore), dove:

  • eventi: elenco di eventi che non sono stati pubblicati,

  • partition_id: ID partizione a cui sono stati tentati di pubblicare gli eventi nell'elenco e

  • errore: eccezione correlata all'errore di invio.

Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue:

  • Se un callback di on_error viene passato durante l'istanza del client del produttore,

    le informazioni sugli errori verranno quindi passate al callback on_error , che verrà quindi chiamato.

  • Se un callback di on_error non viene passato durante l'istanza del client,

    quindi l'errore verrà generato per impostazione predefinita.

Se buffered_mode è True, è necessario on_error callback e gli errori verranno gestiti come segue:

  • Se gli eventi non riescono a eseguire la sequenza entro il timeout specificato, verrà generato direttamente un errore.

  • Se gli eventi non riescono a inviare dopo aver eseguito correttamente l'esecuzione della sequenza, verrà chiamato il callback on_error .

max_buffer_length
int

Solo modalità memorizzata nel buffer. Numero totale di eventi per partizione che possono essere memorizzati nel buffer prima che venga attivato un download. Il valore predefinito è 1500 in modalità buffer.

max_wait_time
Optional[float]

Solo modalità memorizzata nel buffer. Tempo di attesa per la compilazione di un batch con eventi nel buffer prima della pubblicazione. Il valore predefinito è 1 in modalità buffer.

logging_enable
bool

Indica se restituire i log di traccia di rete al logger. L'impostazione predefinita è False.

auth_timeout
float

Tempo in secondi per attendere che un token venga autorizzato dal servizio. Il valore predefinito è 60 secondi. Se impostato su 0, non verrà applicato alcun timeout dal client.

user_agent
str

Se specificato, verrà aggiunto davanti alla stringa dell'agente utente.

retry_total
int

Numero totale di tentativi di ridistribuire un'operazione non riuscita quando si verifica un errore. Il valore predefinito è 3.

retry_backoff_factor
float

Fattore di backoff da applicare tra i tentativi dopo il secondo tentativo (la maggior parte degli errori viene risolta immediatamente da un secondo tentativo senza ritardo). In modalità fissa, i criteri di ripetizione dei tentativi saranno sempre in sospensione per {fattore backoff}. In modalità 'esponenziale', i criteri di ripetizione dei tentativi verranno interrotti per: {backoff factor} * (2 ** ({numero di tentativi totali} - 1)) secondi. Se l'backoff_factor è 0,1, il tentativo verrà riposato per [0,0, 0,2s, 0,4s, ...] tra i tentativi. Il valore predefinito è 0,8.

retry_backoff_max
float

Tempo massimo di off off. Il valore predefinito è 120 secondi (2 minuti).

retry_mode
str

Comportamento di ritardo tra tentativi di ripetizione. I valori supportati sono "fissi" o "esponenziali", dove il valore predefinito è "esponenziale".

idle_timeout
float

Timeout, in secondi, dopo il quale questo client chiuderà la connessione sottostante se non è presente alcuna attività. Per impostazione predefinita, il valore non è Nessuno, ovvero che il client non verrà arrestato a causa dell'inattività a meno che non venga avviato dal servizio.

transport_type
TransportType

Tipo di protocollo di trasporto che verrà usato per comunicare con il servizio Hub eventi. Il valore predefinito è TransportType.Amqp in cui viene usata la porta 5671. Se la porta 5671 non è disponibile/bloccata nell'ambiente di rete, TransportType.AmqpOverWebsocket può essere usata invece che usa la porta 443 per la comunicazione.

http_proxy
Dict

Impostazioni proxy HTTP. Deve essere un dizionario con le chiavi seguenti: 'proxy_hostname' (valore str) e 'proxy_port' (valore int). Inoltre, le chiavi seguenti possono essere presenti: 'username', 'password'.

custom_endpoint_address
Optional[str]

Indirizzo dell'endpoint personalizzato da usare per stabilire una connessione al servizio Hub eventi, consentendo l'instradamento delle richieste di rete tramite qualsiasi gateway applicazione o altri percorsi necessari per l'ambiente host. L'impostazione predefinita è None. Il formato sarà "sb:// custom_endpoint_hostname>:<custom_endpoint_port<>". Se la porta non è specificata nella custom_endpoint_address, verrà usata per impostazione predefinita la porta 443.

connection_verify
Optional[str]

Percorso del file personalizzato CA_BUNDLE del certificato SSL usato per autenticare l'identità dell'endpoint di connessione. Il valore predefinito è None nel caso in cui verrà usato certifi.where().

uamqp_transport
bool

Se usare la libreria uamqp come trasporto sottostante. Il valore predefinito è False e la libreria AMQP Pure Python verrà usata come trasporto sottostante.

socket_timeout
float

Tempo in secondi in cui il socket sottostante sulla connessione deve attendere quando si inviano e ricevono dati prima del timeout. Il valore predefinito è 0.2 per TransportType.Amqp e 1 per TransportType.AmqpOverWebsocket. Se si verificano errori EventHubsConnectionError a causa del timeout di scrittura, potrebbe essere necessario passare un valore maggiore di quello predefinito. Si tratta di scenari di utilizzo avanzati e il valore predefinito deve essere sufficiente.

Esempio

Creare una nuova istanza di EventHubProducerClient.


   import os
   from azure.eventhub import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   producer = EventHubProducerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,  # EventHub name should be specified if it doesn't show up in connection string.
       credential=credential
   )

Metodi

close

Chiudere la connessione e i collegamenti AMQP sottostanti al client Producer.

create_batch

Creare un oggetto EventDataBatch con dimensioni massime di tutti i contenuti vincolati da max_size_in_bytes.

Il max_size_in_bytes deve essere diverso dalla dimensione massima dei messaggi consentiti definita dal servizio.

flush

Solo modalità memorizzata nel buffer. Scaricare gli eventi nel buffer da inviare immediatamente se il client funziona in modalità buffer.

from_connection_string

Creare un oggetto EventHubProducerClient da un stringa di connessione.

get_buffered_event_count

Numero di eventi memorizzati nel buffer e in attesa di pubblicazione per una determinata partizione. Restituisce Nessuno in modalità non buffer. NOTA: il buffer eventi viene elaborato in un thread in background, pertanto il numero di eventi nel buffer segnalato da questa API deve essere considerato solo un'approssimazione ed è consigliabile usare solo nel debug. Per un ID di partizione senza buffer eventi, 0 verrà restituito indipendentemente dal fatto che tale ID di partizione esista effettivamente all'interno dell'hub eventi.

get_eventhub_properties

Ottenere le proprietà dell'hub eventi.

Le chiavi nel dizionario restituito includono:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Ottenere gli ID di partizione dell'hub eventi.

get_partition_properties

Ottenere le proprietà della partizione specificata.

Le chiavi nel dizionario delle proprietà includono:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

Invia un batch di dati dell'evento. Per impostazione predefinita, il metodo bloccherà fino al timeout dell'acknowledgement o del timeout dell'operazione. Se EventHubProducerClient è configurato per l'esecuzione in modalità memorizzata nel buffer, il metodo tenterà di accodare gli eventi nel buffer entro il tempo specificato, se specificato e restituito. Il producer eseguirà l'invio automatico in background in modalità memorizzata nel buffer.

Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue:

  • Se un callback on_error viene passato durante la creazione di istanze del client producer,

    le informazioni sull'errore verranno quindi passate al callback on_error , che verrà quindi chiamato.

  • Se un callback on_error non viene passato durante la creazione di istanze del client,

    l'errore verrà generato per impostazione predefinita.

Se buffered_mode è True, è necessario on_error callback e gli errori verranno gestiti come segue:

  • Se gli eventi non vengono accodati entro il timeout specificato, verrà generato direttamente un errore.

  • Se gli eventi non vengono inviati dopo l'accodamento, verrà chiamato il callback on_error .

In modalità memorizzata nel buffer, l'invio di un batch rimarrà intatto e inviato come singola unità. Il batch non verrà riorganiato. Ciò può comportare l'inefficienza dell'invio di eventi.

Se si invia un elenco finito di EventData o AmqpAnnotatedMessage e si sa che è entro il limite di dimensioni del frame dell'hub eventi, è possibile inviarli con una chiamata send_batch . In caso contrario, usare create_batch per creare EventDataBatch e aggiungere EventData o AmqpAnnotatedMessage nel batch uno per uno fino al limite di dimensioni e quindi chiamare questo metodo per inviare il batch.

send_event

Invia i dati di un evento. Per impostazione predefinita, il metodo bloccherà fino al timeout dell'acknowledgement o del timeout dell'operazione. Se EventHubProducerClient è configurato per l'esecuzione in modalità memorizzata nel buffer, il metodo tenterà di accodare gli eventi nel buffer entro il tempo specificato, se specificato e restituito. Il producer eseguirà l'invio automatico in background in modalità memorizzata nel buffer.

Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue: * Se viene passato un callback on_error durante la creazione di istanze del client producer,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Se buffered_mode è True, on_error callback è obbligatorio e gli errori verranno gestiti nel modo seguente: * Se gli eventi non riescono a accodare entro il timeout specificato, verrà generato un errore direttamente.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

Chiudere la connessione e i collegamenti AMQP sottostanti al client Producer.

close(*, flush: bool = True, **kwargs: Any) -> None

Parametri

flush
bool

Solo modalità memorizzata nel buffer. Se impostato su True, gli eventi nel buffer verranno inviati immediatamente. Il valore predefinito è true.

timeout
float oppure None

Solo modalità memorizzata nel buffer. Timeout per chiudere il produttore. Il valore predefinito è Nessuna, ovvero nessun timeout.

Tipo restituito

Eccezioni

Se si verifica un errore quando si scarica il buffer se lo scaricamento è impostato su True o chiude le connessioni AMQP sottostanti in modalità buffer.

Esempio

Chiudere il client.


   import os
   from azure.eventhub import EventHubProducerClient, EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = producer.create_batch()

       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # EventDataBatch object reaches max_size.
               # New EventDataBatch object can be created here to send more data
               break

       producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       producer.close()

create_batch

Creare un oggetto EventDataBatch con dimensioni massime di tutti i contenuti vincolati da max_size_in_bytes.

Il max_size_in_bytes deve essere diverso dalla dimensione massima dei messaggi consentiti definita dal servizio.

create_batch(**kwargs: Any) -> EventDataBatch

Tipo restituito

Eccezioni

Se si verifica un errore quando si scarica il buffer se lo scaricamento è impostato su True o chiude le connessioni AMQP sottostanti in modalità buffer.

Esempio

Creare un oggetto EventDataBatch con dimensioni limitate


       event_data_batch = producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

Solo modalità memorizzata nel buffer. Scaricare gli eventi nel buffer da inviare immediatamente se il client funziona in modalità buffer.

flush(**kwargs: Any) -> None

Parametri

timeout
Optional[float]

Timeout per scaricare gli eventi memorizzati nel buffer, il valore predefinito è Nessuna, ovvero nessun timeout.

Tipo restituito

Eccezioni

Se il produttore non riesce a scaricare il buffer all'interno del timeout specificato in modalità buffer.

from_connection_string

Creare un oggetto EventHubProducerClient da un stringa di connessione.

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: Literal[False] = False, **kwargs: Any) -> EventHubProducerClient

Parametri

conn_str
str
Necessario

Stringa di connessione di un hub eventi.

eventhub_name
str

Percorso dell'hub eventi specifico a cui connettere il client.

buffered_mode
bool

Se True, il client producer raccoglie gli eventi in un buffer, in modo efficiente batch, quindi pubblica. Il valore predefinito è False.

buffer_concurrency
<xref:ThreadPoolExecutor> oppure int oppure None

ThreadPoolExecutor da usare per la pubblicazione di eventi o il numero di ruoli di lavoro per ThreadPoolExecutor. Il valore predefinito è None e un ThreadPoolExecutor con il numero predefinito di ruoli di lavoro verrà creato per ogni https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], None]]

Callback da chiamare dopo la pubblicazione di un batch. Il callback accetta due parametri:

  • eventi: elenco di eventi pubblicati correttamente

  • partition_id: ID partizione a cui sono stati pubblicati gli eventi nell'elenco.

La funzione di callback deve essere definita come: on_success(eventi, partition_id). Obbligatorio quando buffered_mode è True mentre facoltativo se buffered_mode è False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], None]]

Il callback da chiamare una volta che non è stato pubblicato un batch. Obbligatorio quando in buffered_mode è True mentre facoltativo se buffered_mode è False. La funzione di callback deve essere definita come: on_error(eventi, partition_id, errore), dove:

  • eventi: elenco di eventi che non sono stati pubblicati,

  • partition_id: ID partizione a cui sono stati tentati di pubblicare gli eventi nell'elenco e

  • errore: eccezione correlata all'errore di invio.

Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue:

  • Se un callback di on_error viene passato durante l'istanza del client del produttore,

    le informazioni sugli errori verranno quindi passate al callback on_error , che verrà quindi chiamato.

  • Se un callback di on_error non viene passato durante l'istanza del client,

    quindi l'errore verrà generato per impostazione predefinita.

Se buffered_mode è True, è necessario on_error callback e gli errori verranno gestiti come segue:

  • Se gli eventi non riescono a eseguire la sequenza entro il timeout specificato, verrà generato direttamente un errore.

  • Se gli eventi non riescono a inviare dopo aver eseguito correttamente l'esecuzione della sequenza, verrà chiamato il callback on_error .

max_buffer_length
int

Solo modalità memorizzata nel buffer. Numero totale di eventi per partizione che possono essere memorizzati nel buffer prima che venga attivato un download. Il valore predefinito è 1500 in modalità buffer.

max_wait_time
Optional[float]

Solo modalità memorizzata nel buffer. Tempo di attesa per la compilazione di un batch con eventi nel buffer prima della pubblicazione. Il valore predefinito è 1 in modalità buffer.

logging_enable
bool

Indica se restituire i log di traccia di rete al logger. L'impostazione predefinita è False.

http_proxy
Dict

Impostazioni proxy HTTP. Deve essere un dizionario con le chiavi seguenti: 'proxy_hostname' (valore str) e 'proxy_port' (valore int). Inoltre, le chiavi seguenti possono essere presenti: 'username', 'password'.

auth_timeout
float

Tempo in secondi per attendere che un token venga autorizzato dal servizio. Il valore predefinito è 60 secondi. Se impostato su 0, non verrà applicato alcun timeout dal client.

user_agent
str

Se specificato, verrà aggiunto davanti alla stringa dell'agente utente.

retry_total
int

Numero totale di tentativi di ridistribuire un'operazione non riuscita quando si verifica un errore. Il valore predefinito è 3.

retry_backoff_factor
float

Fattore di backoff da applicare tra i tentativi dopo il secondo tentativo (la maggior parte degli errori viene risolta immediatamente da un secondo tentativo senza ritardo). In modalità fissa, i criteri di ripetizione dei tentativi saranno sempre in sospensione per {fattore backoff}. In modalità 'esponenziale', i criteri di ripetizione dei tentativi verranno interrotti per: {backoff factor} * (2 ** ({numero di tentativi totali} - 1)) secondi. Se l'backoff_factor è 0,1, il tentativo verrà riposato per [0,0, 0,2s, 0,4s, ...] tra i tentativi. Il valore predefinito è 0,8.

retry_backoff_max
float

Tempo massimo di off off. Il valore predefinito è 120 secondi (2 minuti).

retry_mode
str

Comportamento di ritardo tra tentativi di ripetizione. I valori supportati sono "fissi" o "esponenziali", dove il valore predefinito è "esponenziale".

idle_timeout
float

Timeout, in secondi, dopo il quale questo client chiuderà la connessione sottostante se non è presente alcuna attività. Per impostazione predefinita, il valore non è Nessuno, ovvero che il client non verrà arrestato a causa dell'inattività a meno che non venga avviato dal servizio.

transport_type
TransportType

Tipo di protocollo di trasporto che verrà usato per comunicare con il servizio Hub eventi. Il valore predefinito è TransportType.Amqp in cui viene usata la porta 5671. Se la porta 5671 non è disponibile/bloccata nell'ambiente di rete, TransportType.AmqpOverWebsocket può essere usata invece che usa la porta 443 per la comunicazione.

http_proxy

Impostazioni proxy HTTP. Deve essere un dizionario con le chiavi seguenti: 'proxy_hostname' (valore str) e 'proxy_port' (valore int). Inoltre, le chiavi seguenti possono essere presenti: 'username', 'password'.

custom_endpoint_address
Optional[str]

Indirizzo dell'endpoint personalizzato da usare per stabilire una connessione al servizio Hub eventi, consentendo l'instradamento delle richieste di rete tramite qualsiasi gateway applicazione o altri percorsi necessari per l'ambiente host. L'impostazione predefinita è None. Il formato sarà "sb:// custom_endpoint_hostname>:<custom_endpoint_port<>". Se la porta non è specificata nella custom_endpoint_address, verrà usata per impostazione predefinita la porta 443.

connection_verify
Optional[str]

Percorso del file personalizzato CA_BUNDLE del certificato SSL usato per autenticare l'identità dell'endpoint di connessione. Il valore predefinito è None nel caso in cui verrà usato certifi.where().

uamqp_transport
bool

Se usare la libreria uamqp come trasporto sottostante. Il valore predefinito è False e la libreria AMQP Pure Python verrà usata come trasporto sottostante.

Tipo restituito

Eccezioni

Se si verifica un errore quando si scarica il buffer se lo scaricamento è impostato su True o chiude le connessioni AMQP sottostanti in modalità buffer.

Esempio

Creare una nuova istanza di EventHubProducerClient da stringa di connessione.


   import os
   from azure.eventhub import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

Numero di eventi memorizzati nel buffer e in attesa di pubblicazione per una determinata partizione. Restituisce Nessuno in modalità non buffer. NOTA: il buffer eventi viene elaborato in un thread in background, pertanto il numero di eventi nel buffer segnalato da questa API deve essere considerato solo un'approssimazione ed è consigliabile usare solo nel debug. Per un ID di partizione senza buffer eventi, 0 verrà restituito indipendentemente dal fatto che tale ID di partizione esista effettivamente all'interno dell'hub eventi.

get_buffered_event_count(partition_id: str) -> int | None

Parametri

partition_id
str
Necessario

ID partizione di destinazione.

Tipo restituito

int,

Eccezioni

Se si verifica un errore quando si scarica il buffer se lo scaricamento è impostato su True o chiude le connessioni AMQP sottostanti in modalità buffer.

get_eventhub_properties

Ottenere le proprietà dell'hub eventi.

Le chiavi nel dizionario restituito includono:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

Restituisce

Dizionario contenente proprietà eventhub.

Tipo restituito

Eccezioni

get_partition_ids

Ottenere gli ID di partizione dell'hub eventi.

get_partition_ids() -> List[str]

Restituisce

Elenco di ID di partizione.

Tipo restituito

Eccezioni

get_partition_properties

Ottenere le proprietà della partizione specificata.

Le chiavi nel dizionario delle proprietà includono:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

get_partition_properties(partition_id: str) -> Dict[str, Any]

Parametri

partition_id
str
Necessario

ID partizione di destinazione.

Restituisce

Dizionario delle proprietà della partizione.

Tipo restituito

Eccezioni

send_batch

Invia un batch di dati dell'evento. Per impostazione predefinita, il metodo bloccherà fino al timeout dell'acknowledgement o del timeout dell'operazione. Se EventHubProducerClient è configurato per l'esecuzione in modalità memorizzata nel buffer, il metodo tenterà di accodare gli eventi nel buffer entro il tempo specificato, se specificato e restituito. Il producer eseguirà l'invio automatico in background in modalità memorizzata nel buffer.

Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue:

  • Se un callback on_error viene passato durante la creazione di istanze del client producer,

    le informazioni sull'errore verranno quindi passate al callback on_error , che verrà quindi chiamato.

  • Se un callback on_error non viene passato durante la creazione di istanze del client,

    l'errore verrà generato per impostazione predefinita.

Se buffered_mode è True, è necessario on_error callback e gli errori verranno gestiti come segue:

  • Se gli eventi non vengono accodati entro il timeout specificato, verrà generato direttamente un errore.

  • Se gli eventi non vengono inviati dopo l'accodamento, verrà chiamato il callback on_error .

In modalità memorizzata nel buffer, l'invio di un batch rimarrà intatto e inviato come singola unità. Il batch non verrà riorganiato. Ciò può comportare l'inefficienza dell'invio di eventi.

Se si invia un elenco finito di EventData o AmqpAnnotatedMessage e si sa che è entro il limite di dimensioni del frame dell'hub eventi, è possibile inviarli con una chiamata send_batch . In caso contrario, usare create_batch per creare EventDataBatch e aggiungere EventData o AmqpAnnotatedMessage nel batch uno per uno fino al limite di dimensioni e quindi chiamare questo metodo per inviare il batch.

send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

Parametri

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Necessario

Oggetto EventDataBatch da inviare o un elenco di EventData da inviare in un batch. Tutti EventData o AmqpAnnotatedMessage nell'elenco o EventDataBatch verranno inseriti nella stessa partizione.

timeout
float

Tempo di attesa massimo per inviare i dati dell'evento in modalità non memorizzata nel buffer o il tempo di attesa massimo per assegnare i dati dell'evento al buffer in modalità buffer. In modalità non buffer, il tempo di attesa predefinito specificato al momento della creazione del produttore verrà usato. In modalità memorizzata nel buffer il tempo di attesa predefinito è Nessuno.

partition_id
str

ID di partizione specifico a cui inviare. Il valore predefinito è Nessuno, nel qual caso il servizio verrà assegnato a tutte le partizioni usando il round robin. Se si specifica partition_id e event_data_batch è un EventDataBatch perché EventDataBatch ha partition_id, verrà generato un typeError.

partition_key
str

Con il partition_key specificato, i dati dell'evento verranno inviati a una determinata partizione dell'hub eventi deciso dal servizio. Se si specifica partition_key e event_data_batch è un oggetto EventDataBatch, verrà generato un typeerror perché EventDataBatch ha partition_key. Se vengono forniti sia partition_id che partition_key, il partition_id avrà la precedenza. AVVISO: l'impostazione di partition_key di valore non stringa sugli eventi da inviare è sconsigliata perché il partition_key verrà ignorato dal servizio Hub eventi e gli eventi verranno assegnati a tutte le partizioni tramite round robin. Sono inoltre disponibili SDK per l'utilizzo di eventi che prevedono che partition_key solo come tipo stringa, potrebbero non riuscire ad analizzare il valore non stringa.

Tipo restituito

Eccezioni

Se il valore specificato dal parametro timeout viene trascorso prima che l'evento possa essere inviato in modalità non buffered o gli eventi non possono essere accodati nel buffer in modalità buffer.

Esempio

Invia i dati dell'evento


       with producer:
           event_data_batch = producer.create_batch()

           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # EventDataBatch object reaches max_size.
                   # New EventDataBatch object can be created here to send more data
                   break

           producer.send_batch(event_data_batch)

send_event

Invia i dati di un evento. Per impostazione predefinita, il metodo bloccherà fino al timeout dell'acknowledgement o del timeout dell'operazione. Se EventHubProducerClient è configurato per l'esecuzione in modalità memorizzata nel buffer, il metodo tenterà di accodare gli eventi nel buffer entro il tempo specificato, se specificato e restituito. Il producer eseguirà l'invio automatico in background in modalità memorizzata nel buffer.

Se buffered_mode è False, on_error callback è facoltativo e gli errori verranno gestiti come segue: * Se viene passato un callback on_error durante la creazione di istanze del client producer,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Se buffered_mode è True, on_error callback è obbligatorio e gli errori verranno gestiti nel modo seguente: * Se gli eventi non riescono a accodare entro il timeout specificato, verrà generato un errore direttamente.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

Parametri

event_data
Union[EventData, AmqpAnnotatedMessage]
Necessario

Oggetto EventData da inviare.

timeout
float

Tempo di attesa massimo per inviare i dati dell'evento in modalità non memorizzata nel buffer o il tempo di attesa massimo per accodare i dati dell'evento nel buffer in modalità memorizzata nel buffer. In modalità non memorizzata nel buffer, verrà utilizzato il tempo di attesa predefinito specificato al momento della creazione del producer. In modalità memorizzata nel buffer il tempo di attesa predefinito è Nessuno.

partition_id
str

ID di partizione specifico a cui inviare. Il valore predefinito è Nessuno, nel qual caso il servizio verrà assegnato a tutte le partizioni usando il round robin. Se si specifica partition_id e event_data_batch è un EventDataBatch perché EventDataBatch ha partition_id, verrà generato un typeError.

partition_key
str

Con il partition_key specificato, i dati dell'evento verranno inviati a una determinata partizione dell'hub eventi deciso dal servizio. Se si specifica partition_key e event_data_batch è un oggetto EventDataBatch, verrà generato un typeerror perché EventDataBatch ha partition_key. Se vengono forniti sia partition_id che partition_key, il partition_id avrà la precedenza. AVVISO: l'impostazione di partition_key di valore non stringa sugli eventi da inviare è sconsigliata perché il partition_key verrà ignorato dal servizio Hub eventi e gli eventi verranno assegnati a tutte le partizioni tramite round robin. Sono inoltre disponibili SDK per l'utilizzo di eventi che prevedono che partition_key solo come tipo stringa, potrebbero non riuscire ad analizzare il valore non stringa.

Tipo restituito

Eccezioni

Se il valore specificato dal parametro di timeout è trascorso prima che l'evento possa essere inviato in modalità non memorizzata nel buffer o gli eventi possono essere accodati nel buffer in modalità memorizzata nel buffer.

Attributi

total_buffered_event_count

Numero totale di eventi attualmente memorizzati nel buffer e in attesa di pubblicazione, in tutte le partizioni. Restituisce Nessuno in modalità non buffer. NOTA: il buffer eventi viene elaborato in un thread in background, pertanto il numero di eventi nel buffer segnalato da questa API deve essere considerato solo un'approssimazione ed è consigliabile usare solo nel debug.

Tipo restituito

int,