Delen via


EventHubProducerClient Klas

De Klasse EventHubProducerClient definieert een interface op hoog niveau voor het verzenden van gebeurtenissen naar de Azure Event Hubs-service.

Overname
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubProducerClient

Constructor

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)

Parameters

fully_qualified_namespace
str
Vereist

De volledig gekwalificeerde hostnaam voor de Event Hubs-naamruimte. Dit is waarschijnlijk vergelijkbaar met .servicebus.windows.net

eventhub_name
str
Vereist

Het pad van de specifieke Event Hub waarmee de client verbinding moet maken.

credential
AsyncTokenCredential of AzureSasCredential of AzureNamedKeyCredential
Vereist

Het referentieobject dat wordt gebruikt voor verificatie, waarmee een bepaalde interface wordt geïmplementeerd voor het ophalen van tokens. Het accepteert , of referentieobjecten EventHubSharedKeyCredentialdie zijn gegenereerd door de azure-identity-bibliotheek en objecten die de methode *get_token(zelf, bereiken) implementeren.

buffered_mode
bool

Indien waar, verzamelt de producentclient gebeurtenissen in een buffer, efficiënt batchgewijs en publiceert deze vervolgens. De standaardwaarde is Onwaar.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

De callback die moet worden aangeroepen zodra een batch is gepubliceerd. De callback heeft twee parameters:

  • gebeurtenissen: de lijst met gebeurtenissen die zijn gepubliceerd

  • partition_id: de partitie-id waarnaar de gebeurtenissen in de lijst zijn gepubliceerd.

De callback-functie moet als volgt worden gedefinieerd: on_success(gebeurtenissen, partition_id). Vereist wanneer buffered_mode Waar is, maar optioneel als buffered_mode Onwaar is.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

De callback die moet worden aangeroepen zodra een batch niet kan worden gepubliceerd. Vereist wanneer in buffered_mode Waar is, maar optioneel als buffered_mode Onwaar is. De callback-functie moet als volgt worden gedefinieerd: on_error(gebeurtenissen, partition_id, fout) waarbij:

  • gebeurtenissen: de lijst met gebeurtenissen die niet zijn gepubliceerd,

  • partition_id: de partitie-id waarnaar de gebeurtenissen in de lijst zijn gepubliceerd en

  • fout: de uitzondering met betrekking tot de fout bij het verzenden.

Als buffered_mode Onwaar is, is on_error callback optioneel en worden fouten als volgt verwerkt:

  • Als een on_error callback wordt doorgegeven tijdens de instantiëring van de producerclient,

    foutinformatie wordt doorgegeven aan de on_error callback, die vervolgens wordt aangeroepen.

  • Als een on_error callback niet wordt doorgegeven tijdens het maken van de client,

    wordt de fout standaard gegenereerd.

Als buffered_mode Waar is, is on_error callback vereist en worden fouten als volgt verwerkt:

  • Als gebeurtenissen niet binnen de opgegeven time-out kunnen worden weergegeven, wordt er direct een fout gegenereerd.

  • Als gebeurtenissen niet kunnen worden verzonden nadat de enquête is voltooid, wordt de on_error callback aangeroepen.

max_buffer_length
int

Alleen buffermodus. Het totale aantal gebeurtenissen per partitie dat kan worden gebufferd voordat een flush wordt geactiveerd. De standaardwaarde is 1500 in de buffermodus.

max_wait_time
Optional[float]

Alleen buffermodus. De hoeveelheid tijd die moet worden gewacht totdat een batch wordt gemaakt met gebeurtenissen in de buffer voordat deze wordt gepubliceerd. De standaardwaarde is 1 in de buffermodus.

logging_enable
bool

Of netwerktraceringslogboeken naar de logboekregistratie moeten worden uitgevoerd. De standaardwaarde is Onwaar.

auth_timeout
float

De tijd in seconden om te wachten tot een token is geautoriseerd door de service. De standaardwaarde is 60 seconden. Als dit is ingesteld op 0, wordt er geen time-out afgedwongen door de client.

user_agent
str

Indien opgegeven, wordt dit toegevoegd vóór de tekenreeks van de gebruikersagent.

retry_total
int

Het totale aantal pogingen om een mislukte bewerking opnieuw uit te voeren wanneer er een fout optreedt. De standaardwaarde is 3.

retry_backoff_factor
float

Een uitstelfactor die moet worden toegepast tussen pogingen na de tweede poging (de meeste fouten worden onmiddellijk opgelost door een tweede poging zonder vertraging). In de vaste modus wordt beleid voor opnieuw proberen altijd in de slaapstand gezet voor {backoff factor}. In de exponentiële modus wordt beleid voor opnieuw proberen in de slaapstand gezet voor: {backoff factor} * (2 ** ({aantal totaal aantal nieuwe pogingen} - 1)) seconden. Als de backoff_factor 0,1 is, wordt de nieuwe poging tussen nieuwe pogingen in de slaapstand gezet voor [0.0s, 0.2s, 0.4s, ...]. De standaardwaarde is 0,8.

retry_backoff_max
float

De maximale verloftijd. De standaardwaarde is 120 seconden (2 minuten).

retry_mode
str

Het vertragingsgedrag tussen nieuwe pogingen. Ondersteunde waarden zijn 'vast' of 'exponentieel', waarbij standaard 'exponentieel' is.

idle_timeout
float

Time-out, in seconden, waarna deze client de onderliggende verbinding sluit als er geen activiteit is. Standaard is de waarde Geen, wat betekent dat de client niet wordt afgesloten vanwege inactiviteit, tenzij deze is geïnitieerd door de service.

transport_type
TransportType

Het type transportprotocol dat wordt gebruikt voor communicatie met de Event Hubs-service. De standaardwaarde is TransportType.Amqp , in welk geval poort 5671 wordt gebruikt. Als poort 5671 niet beschikbaar/geblokkeerd is in de netwerkomgeving, kan TransportType.AmqpOverWebsocket worden gebruikt in plaats van poort 443 voor communicatie.

http_proxy
dict

HTTP-proxyinstellingen. Dit moet een woordenlijst zijn met de volgende toetsen: 'proxy_hostname' (str-waarde) en 'proxy_port' (int-waarde). Daarnaast kunnen ook de volgende sleutels aanwezig zijn: 'gebruikersnaam', 'wachtwoord'.

custom_endpoint_address
Optional[str]

Het aangepaste eindpuntadres dat moet worden gebruikt voor het tot stand brengen van een verbinding met de Event Hubs-service, zodat netwerkaanvragen kunnen worden gerouteerd via toepassingsgateways of andere paden die nodig zijn voor de hostomgeving. De standaardwaarde is Geen. De indeling ziet er ongeveer als volgt uit: 'sb://< custom_endpoint_hostname>:<custom_endpoint_port>'. Als poort niet is opgegeven in de custom_endpoint_address, wordt standaard poort 443 gebruikt.

connection_verify
Optional[str]

Pad naar het aangepaste CA_BUNDLE-bestand van het SSL-certificaat dat wordt gebruikt om de identiteit van het verbindingseindpunt te verifiëren. De standaardwaarde is Geen, in welk geval certifi.where() wordt gebruikt.

uamqp_transport
bool

Of u de uamqp-bibliotheek wilt gebruiken als het onderliggende transport. De standaardwaarde is False en de Pure Python AMQP-bibliotheek wordt gebruikt als het onderliggende transport.

socket_timeout
float

De tijd in seconden dat de onderliggende socket op de verbinding moet wachten bij het verzenden en ontvangen van gegevens voordat er een time-out optreedt. De standaardwaarde is 0,2 voor TransportType.Amqp en 1 voor TransportType.AmqpOverWebsocket. Als er EventHubsConnectionError-fouten optreden vanwege een time-out van de schrijfbewerking, moet mogelijk een groter dan de standaardwaarde worden doorgegeven. Dit is voor geavanceerde gebruiksscenario's en normaal gesproken moet de standaardwaarde voldoende zijn.

Voorbeelden

Maak een nieuw exemplaar van de EventHubProducerClient.


   import os
   from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Methoden

close

Sluit de onderliggende AMQP-verbinding en koppelingen van de Producer-client.

create_batch

Maak een EventDataBatch-object met de maximale grootte van alle inhoud die wordt beperkt door max_size_in_bytes.

De max_size_in_bytes mag niet groter zijn dan de maximaal toegestane berichtgrootte die door de service is gedefinieerd.

flush

Alleen buffermodus. Gebeurtenissen in de buffer leegmaken die onmiddellijk moeten worden verzonden als de client in de buffermodus werkt.

from_connection_string

Maak een EventHubProducerClient op basis van een verbindingsreeks.

get_buffered_event_count

Het aantal gebeurtenissen dat wordt gebufferd en dat wacht om te worden gepubliceerd voor een bepaalde partitie. Retourneert Geen in de niet-gebufferde modus. OPMERKING: De gebeurtenisbuffer wordt verwerkt in een achtergrondcoroutine. Het aantal gebeurtenissen in de buffer dat door deze API wordt gerapporteerd, moet daarom alleen worden beschouwd als een benadering en wordt alleen aanbevolen voor gebruik bij foutopsporing. Voor een partitie-id waarvoor geen gebeurtenissen zijn gebufferd, wordt 0 geretourneerd, ongeacht of die partitie-id daadwerkelijk bestaat in de Event Hub.

get_eventhub_properties

Eigenschappen van de Event Hub ophalen.

Sleutels in de geretourneerde woordenlijst zijn onder andere:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Partitie-id's van de Event Hub ophalen.

get_partition_properties

Eigenschappen van de opgegeven partitie ophalen.

Sleutels in de eigenschappenwoordenlijst zijn onder andere:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

Hiermee wordt een batch gebeurtenisgegevens verzonden. Standaard wordt de methode geblokkeerd totdat de bevestiging is ontvangen of een time-out voor de bewerking optreedt. Als de EventHubProducerClient is geconfigureerd om te worden uitgevoerd in de gebufferde modus, worden de gebeurtenissen door de methode in de lokale buffer geplaatst en geretourneerd. De producent verzendt automatisch op de achtergrond.

Als buffered_mode False is, is on_error callback optioneel en worden fouten als volgt verwerkt:

  • Als een on_error callback wordt doorgegeven tijdens de instantiëring van de producerclient,

    vervolgens worden foutinformatie doorgegeven aan de on_error callback, die vervolgens wordt aangeroepen.

  • Als een on_error callback niet wordt doorgegeven tijdens het instantiëeren van de client,

    dan wordt de fout standaard gegenereerd.

Als buffered_mode Waar is, is on_error callback vereist en worden fouten als volgt verwerkt:

  • Als gebeurtenissen niet binnen de opgegeven time-out kunnen worden weergegeven, wordt er direct een fout gegenereerd.

  • Als gebeurtenissen niet kunnen worden verzonden nadat de enquête is voltooid, wordt de on_error callback aangeroepen.

In de buffermodus blijft het verzenden van een batch intact en wordt deze verzonden als één eenheid. De batch wordt niet opnieuw gerangschikt. Dit kan leiden tot inefficiëntie van het verzenden van gebeurtenissen.

Als u een eindige lijst met EventData of AmqpAnnotatedMessage verzendt en u weet dat deze binnen de framegroottelimiet van de Event Hub valt, kunt u deze verzenden met een send_batch-aanroep . Gebruik anders create_batch om EventDataBatch te maken en voeg EventData of AmqpAnnotatedMessage één voor één toe aan de batch totdat de groottelimiet is bereikt. Roep vervolgens deze methode aan om de batch te verzenden.

send_event

Hiermee worden gebeurtenisgegevens verzonden. Standaard wordt de methode geblokkeerd totdat de bevestiging is ontvangen of een time-out voor de bewerking optreedt. Als de EventHubProducerClient is geconfigureerd om te worden uitgevoerd in de gebufferde modus, wordt de gebeurtenis door de methode in de lokale buffer geplaatst en geretourneerd. De producent voert automatische batchverwerking en verzending op de achtergrond uit.

Als buffered_mode False is, is on_error callback optioneel en worden fouten als volgt verwerkt: * Als een on_error callback wordt doorgegeven tijdens de instantiëring van de producerclient,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Als buffered_mode Waar is, is on_error callback vereist en worden fouten als volgt verwerkt: * Als gebeurtenissen niet binnen de opgegeven time-out kunnen worden weergegeven, wordt er rechtstreeks een fout gegenereerd.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

Sluit de onderliggende AMQP-verbinding en koppelingen van de Producer-client.

async close(*, flush: bool = True, **kwargs: Any) -> None

Parameters

flush
bool

Alleen buffermodus. Als deze optie is ingesteld op Waar, worden gebeurtenissen in de buffer onmiddellijk verzonden. De standaardwaarde is Waar.

timeout
float of None

Alleen buffermodus. Time-out om de producent te sluiten. De standaardwaarde is Geen, wat betekent dat er geen time-out is.

Retourtype

Uitzonderingen

Als er een fout is opgetreden bij het leegmaken van de buffer als leegmaken is ingesteld op True of het sluiten van de onderliggende AMQP-verbindingen in de buffermodus.

Voorbeelden

Sluit de handler.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   from azure.eventhub import EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break
       await producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       await producer.close()

create_batch

Maak een EventDataBatch-object met de maximale grootte van alle inhoud die wordt beperkt door max_size_in_bytes.

De max_size_in_bytes mag niet groter zijn dan de maximaal toegestane berichtgrootte die door de service is gedefinieerd.

async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch

Retourtype

Uitzonderingen

Als er een fout is opgetreden bij het leegmaken van de buffer als leegmaken is ingesteld op True of het sluiten van de onderliggende AMQP-verbindingen in de buffermodus.

Voorbeelden

EventDataBatch-object binnen beperkte grootte maken


       from azure.eventhub import EventData
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

Alleen buffermodus. Gebeurtenissen in de buffer leegmaken die onmiddellijk moeten worden verzonden als de client in de buffermodus werkt.

async flush(**kwargs: Any) -> None

Parameters

timeout
float of None

Time-out voor het leegmaken van de gebufferde gebeurtenissen. De standaardwaarde is Geen, wat betekent dat er geen time-out is.

Retourtype

Uitzonderingen

Als de producent de buffer niet kan leegmaken binnen de opgegeven time-out in de gebufferde modus.

from_connection_string

Maak een EventHubProducerClient op basis van een verbindingsreeks.

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient

Parameters

conn_str
str
Vereist

De verbindingsreeks van een Event Hub.

eventhub_name
str

Het pad van de specifieke Event Hub waarmee de client verbinding moet maken.

buffered_mode
bool

Als Waar is, verzamelt de producerclient gebeurtenissen in een buffer, efficiënt batchgewijs en publiceert deze vervolgens. De standaardwaarde is False.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

De callback die moet worden aangeroepen zodra een batch is gepubliceerd. De callback heeft twee parameters:

  • gebeurtenissen: de lijst met gebeurtenissen die zijn gepubliceerd

  • partition_id: de partitie-id waarnaar de gebeurtenissen in de lijst zijn gepubliceerd.

De callback-functie moet als volgt worden gedefinieerd: on_success(gebeurtenissen, partition_id). Dit is vereist wanneer buffered_mode Waar is, maar optioneel als buffered_mode Onwaar is.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

De callback die moet worden aangeroepen zodra een batch niet kan worden gepubliceerd. De callback-functie moet als volgt worden gedefinieerd: on_error(gebeurtenissen, partition_id, fout), waarbij:

  • gebeurtenissen: de lijst met gebeurtenissen die niet zijn gepubliceerd,

  • partition_id: de partitie-id waarnaar de gebeurtenissen in de lijst zijn gepubliceerd en

  • fout: de uitzondering met betrekking tot de verzendfout.

Als buffered_mode False is, is on_error callback optioneel en worden fouten als volgt verwerkt:

  • Als een on_error callback wordt doorgegeven tijdens de instantiëring van de producerclient,

    vervolgens worden foutinformatie doorgegeven aan de on_error callback, die vervolgens wordt aangeroepen.

  • Als een on_error callback niet wordt doorgegeven tijdens het instantiëeren van de client,

    dan wordt de fout standaard gegenereerd.

Als buffered_mode Waar is, is on_error callback vereist en worden fouten als volgt verwerkt:

  • Als gebeurtenissen niet binnen de opgegeven time-out kunnen worden weergegeven, wordt er direct een fout gegenereerd.

  • Als gebeurtenissen niet kunnen worden verzonden nadat de enquête is voltooid, wordt de on_error callback aangeroepen.

max_buffer_length
int

Alleen buffermodus. Het totale aantal gebeurtenissen per partitie dat kan worden gebufferd voordat een flush wordt geactiveerd. De standaardwaarde is 1500 in de buffermodus.

max_wait_time
Optional[float]

Alleen buffermodus. De hoeveelheid tijd die moet worden gewacht totdat een batch is gemaakt met gebeurtenissen in de buffer voordat deze wordt gepubliceerd. De standaardwaarde is 1 in de buffermodus.

logging_enable
bool

Of netwerktraceringslogboeken moeten worden uitgevoerd naar de logboekregistratie. De standaardwaarde is False.

http_proxy
dict

HTTP-proxyinstellingen. Dit moet een woordenlijst zijn met de volgende sleutels: 'proxy_hostname' (str-waarde) en 'proxy_port' (int-waarde). Daarnaast kunnen de volgende sleutels ook aanwezig zijn: 'gebruikersnaam', 'wachtwoord'.

auth_timeout
float

De tijd in seconden om te wachten tot een token is geautoriseerd door de service. De standaardwaarde is 60 seconden. Als dit is ingesteld op 0, wordt er geen time-out afgedwongen door de client.

user_agent
str

Indien opgegeven, wordt deze toegevoegd vóór de tekenreeks van de gebruikersagent.

retry_total
int

Het totale aantal pogingen om een mislukte bewerking opnieuw uit te voeren wanneer er een fout optreedt. De standaardwaarde is 3.

retry_backoff_factor
float

Een uitstelfactor die moet worden toegepast tussen pogingen na de tweede poging (de meeste fouten worden onmiddellijk opgelost door een tweede poging zonder vertraging). In de vaste modus wordt beleid voor opnieuw proberen altijd in de slaapstand gezet voor {backoff factor}. In de exponentiële modus wordt het beleid voor opnieuw proberen in de slaapstand gezet voor: {uitstelfactor} * (2 ** ({aantal totale nieuwe pogingen} - 1)) seconden. Als de backoff_factor 0,1 is, wordt de nieuwe poging in de slaapstand gezet voor [0.0s, 0.2s, 0.4s, ...] tussen nieuwe pogingen. De standaardwaarde is 0,8.

retry_backoff_max
float

De maximale verloftijd. De standaardwaarde is 120 seconden (2 minuten).

retry_mode
str

Het vertragingsgedrag tussen nieuwe pogingen. Ondersteunde waarden zijn 'vast' of 'exponentieel', waarbij de standaardwaarde 'exponentieel' is.

idle_timeout
float

Time-out, in seconden, waarna deze client de onderliggende verbinding sluit als er geen activiteit is. Standaard is de waarde Geen, wat betekent dat de client niet wordt afgesloten vanwege inactiviteit, tenzij gestart door de service.

transport_type
TransportType

Het type transportprotocol dat wordt gebruikt voor de communicatie met de Event Hubs-service. De standaardwaarde is TransportType.Amqp . In dat geval wordt poort 5671 gebruikt. Als poort 5671 niet beschikbaar/geblokkeerd is in de netwerkomgeving, kan TransportType.AmqpOverWebsocket worden gebruikt in plaats daarvan, waarbij poort 443 wordt gebruikt voor communicatie.

custom_endpoint_address
Optional[str]

Het aangepaste eindpuntadres dat moet worden gebruikt voor het tot stand brengen van een verbinding met de Event Hubs-service, zodat netwerkaanvragen kunnen worden gerouteerd via toepassingsgateways of andere paden die nodig zijn voor de hostomgeving. De standaardwaarde is Geen. De indeling ziet er als volgt uit: 'sb://< custom_endpoint_hostname>:<custom_endpoint_port>'. Als poort niet is opgegeven in de custom_endpoint_address, wordt standaard poort 443 gebruikt.

connection_verify
Optional[str]

Pad naar het aangepaste CA_BUNDLE-bestand van het SSL-certificaat dat wordt gebruikt om de identiteit van het verbindingseindpunt te verifiëren. De standaardwaarde is Geen, in welk geval certifi.where() wordt gebruikt.

uamqp_transport
bool

Of u de uamqp-bibliotheek als het onderliggende transport wilt gebruiken. De standaardwaarde is False en de Pure Python AMQP-bibliotheek wordt gebruikt als het onderliggende transport.

Retourtype

Uitzonderingen

Als er een fout is opgetreden bij het leegmaken van de buffer als leegmaken is ingesteld op True of het sluiten van de onderliggende AMQP-verbindingen in de buffermodus.

Voorbeelden

Maak een nieuw exemplaar van de EventHubProducerClient vanuit verbindingsreeks.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

Het aantal gebeurtenissen dat wordt gebufferd en dat wacht om te worden gepubliceerd voor een bepaalde partitie. Retourneert Geen in de niet-gebufferde modus. OPMERKING: De gebeurtenisbuffer wordt verwerkt in een achtergrondcoroutine. Het aantal gebeurtenissen in de buffer dat door deze API wordt gerapporteerd, moet daarom alleen worden beschouwd als een benadering en wordt alleen aanbevolen voor gebruik bij foutopsporing. Voor een partitie-id waarvoor geen gebeurtenissen zijn gebufferd, wordt 0 geretourneerd, ongeacht of die partitie-id daadwerkelijk bestaat in de Event Hub.

get_buffered_event_count(partition_id: str) -> int | None

Parameters

partition_id
str
Vereist

De doelpartitie-id.

Retourtype

int,

Uitzonderingen

Als er een fout is opgetreden bij het leegmaken van de buffer als leegmaken is ingesteld op True of het sluiten van de onderliggende AMQP-verbindingen in de buffermodus.

get_eventhub_properties

Eigenschappen van de Event Hub ophalen.

Sleutels in de geretourneerde woordenlijst zijn onder andere:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Retouren

Een woordenlijst met informatie over de Event Hub.

Retourtype

Uitzonderingen

get_partition_ids

Partitie-id's van de Event Hub ophalen.

async get_partition_ids() -> List[str]

Retouren

Een lijst met partitie-id's.

Retourtype

Uitzonderingen

get_partition_properties

Eigenschappen van de opgegeven partitie ophalen.

Sleutels in de eigenschappenwoordenlijst zijn onder andere:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parameters

partition_id
str
Vereist

De doelpartitie-id.

Retouren

Een dict van partitie-eigenschappen.

Retourtype

Uitzonderingen

send_batch

Hiermee wordt een batch gebeurtenisgegevens verzonden. Standaard wordt de methode geblokkeerd totdat de bevestiging is ontvangen of een time-out voor de bewerking optreedt. Als de EventHubProducerClient is geconfigureerd om te worden uitgevoerd in de gebufferde modus, worden de gebeurtenissen door de methode in de lokale buffer geplaatst en geretourneerd. De producent verzendt automatisch op de achtergrond.

Als buffered_mode False is, is on_error callback optioneel en worden fouten als volgt verwerkt:

  • Als een on_error callback wordt doorgegeven tijdens de instantiëring van de producerclient,

    vervolgens worden foutinformatie doorgegeven aan de on_error callback, die vervolgens wordt aangeroepen.

  • Als een on_error callback niet wordt doorgegeven tijdens het instantiëeren van de client,

    dan wordt de fout standaard gegenereerd.

Als buffered_mode Waar is, is on_error callback vereist en worden fouten als volgt verwerkt:

  • Als gebeurtenissen niet binnen de opgegeven time-out kunnen worden weergegeven, wordt er direct een fout gegenereerd.

  • Als gebeurtenissen niet kunnen worden verzonden nadat de enquête is voltooid, wordt de on_error callback aangeroepen.

In de buffermodus blijft het verzenden van een batch intact en wordt deze verzonden als één eenheid. De batch wordt niet opnieuw gerangschikt. Dit kan leiden tot inefficiëntie van het verzenden van gebeurtenissen.

Als u een eindige lijst met EventData of AmqpAnnotatedMessage verzendt en u weet dat deze binnen de framegroottelimiet van de Event Hub valt, kunt u deze verzenden met een send_batch-aanroep . Gebruik anders create_batch om EventDataBatch te maken en voeg EventData of AmqpAnnotatedMessage één voor één toe aan de batch totdat de groottelimiet is bereikt. Roep vervolgens deze methode aan om de batch te verzenden.

async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

Parameters

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Vereist

Het EventDataBatch-object dat moet worden verzonden of een lijst met EventData die in een batch moet worden verzonden. Alle EventData of AmqpAnnotatedMessage in de lijst of EventDataBatch komen op dezelfde partitie terecht.

timeout
float

De maximale wachttijd voor het verzenden van de gebeurtenisgegevens in de niet-gebufferde modus of de maximale wachttijd voor het in de buffer plaatsen van de gebeurtenisgegevens in de buffermodus. In de niet-gebufferde modus wordt de standaardwachttijd gebruikt die is opgegeven toen de producent werd gemaakt. In de buffermodus is de standaardwachttijd Geen.

partition_id
str

De specifieke partitie-id die moet worden verzonden. De standaardwaarde is Geen. In dat geval wijst de service toe aan alle partities met behulp van round robin. Er wordt een TypeError gegenereerd als partition_id is opgegeven en event_data_batch een EventDataBatch is omdat EventDataBatch zelf partition_id heeft.

partition_key
str

Met de opgegeven partition_key worden gebeurtenisgegevens verzonden naar een bepaalde partitie van de Event Hub die door de service is bepaald. Er wordt een TypeError gegenereerd als partition_key is opgegeven en event_data_batch een EventDataBatch is omdat EventDataBatch zelf partition_key heeft. Als zowel partition_id als partition_key worden opgegeven, heeft de partition_id voorrang. WAARSCHUWING: het instellen van partition_key van een niet-tekenreekswaarde voor de gebeurtenissen die moeten worden verzonden, wordt afgeraden omdat de partition_key wordt genegeerd door de Event Hub-service en gebeurtenissen worden toegewezen aan alle partities met behulp van round robin. Bovendien zijn er SDK's voor het verbruik van gebeurtenissen die verwachten dat partition_key alleen van het tekenreekstype zijn, ze de niet-tekenreekswaarde mogelijk niet parseren.

Retourtype

Uitzonderingen

Als de waarde die is opgegeven door de time-outparameter, is verstreken voordat de gebeurtenis kan worden verzonden in de niet-buffermodus of als de gebeurtenissen in de gebufferde modus kunnen worden geplaatst.

Voorbeelden

Gebeurtenisgegevens worden asynchroon verzonden


       async with producer:
           event_data_batch = await producer.create_batch()
           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # The EventDataBatch object reaches its max_size.
                   # You can send the full EventDataBatch object and create a new one here.
                   break
           await producer.send_batch(event_data_batch)

send_event

Hiermee worden gebeurtenisgegevens verzonden. Standaard wordt de methode geblokkeerd totdat de bevestiging is ontvangen of een time-out voor de bewerking optreedt. Als de EventHubProducerClient is geconfigureerd om te worden uitgevoerd in de gebufferde modus, wordt de gebeurtenis door de methode in de lokale buffer geplaatst en geretourneerd. De producent voert automatische batchverwerking en verzending op de achtergrond uit.

Als buffered_mode False is, is on_error callback optioneel en worden fouten als volgt verwerkt: * Als een on_error callback wordt doorgegeven tijdens de instantiëring van de producerclient,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Als buffered_mode Waar is, is on_error callback vereist en worden fouten als volgt verwerkt: * Als gebeurtenissen niet binnen de opgegeven time-out kunnen worden weergegeven, wordt er rechtstreeks een fout gegenereerd.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

Parameters

event_data
Union[EventData, AmqpAnnotatedMessage]
Vereist

Het EventData-object dat moet worden verzonden.

timeout
float

De maximale wachttijd voor het verzenden van de gebeurtenisgegevens in de niet-gebufferde modus of de maximale wachttijd voor het in de buffer plaatsen van de gebeurtenisgegevens in de buffermodus. In de niet-gebufferde modus wordt de standaardwachttijd gebruikt die is opgegeven toen de producent werd gemaakt. In de buffermodus is de standaardwachttijd Geen.

partition_id
str

De specifieke partitie-id die moet worden verzonden. De standaardwaarde is Geen. In dat geval wijst de service toe aan alle partities met behulp van round robin. Er wordt een TypeError gegenereerd als partition_id is opgegeven en event_data_batch een EventDataBatch is omdat EventDataBatch zelf partition_id heeft.

partition_key
str

Met de opgegeven partition_key worden gebeurtenisgegevens verzonden naar een bepaalde partitie van de Event Hub die door de service is bepaald. Er wordt een TypeError gegenereerd als partition_key is opgegeven en event_data_batch een EventDataBatch is omdat EventDataBatch zelf partition_key heeft. Als zowel partition_id als partition_key worden opgegeven, heeft de partition_id voorrang. WAARSCHUWING: het instellen van partition_key van een niet-tekenreekswaarde voor de gebeurtenissen die moeten worden verzonden, wordt afgeraden omdat de partition_key wordt genegeerd door de Event Hub-service en gebeurtenissen worden toegewezen aan alle partities met behulp van round robin. Bovendien zijn er SDK's voor het verbruik van gebeurtenissen die verwachten dat partition_key alleen van het tekenreekstype zijn, ze de niet-tekenreekswaarde mogelijk niet parseren.

Retourtype

Uitzonderingen

Als de waarde die is opgegeven door de time-outparameter, is verstreken voordat de gebeurtenis kan worden verzonden in de modus niet-buffer, of als de gebeurtenissen niet in de buffermodus kunnen worden geplaatst.

Kenmerken

total_buffered_event_count

Het totale aantal gebeurtenissen dat momenteel is gebufferd en wacht om te worden gepubliceerd, voor alle partities. Retourneert Geen in de niet-gebufferde modus. OPMERKING: De gebeurtenisbuffer wordt verwerkt in een achtergrondcoroutine. Daarom moet het aantal gebeurtenissen in de buffer dat door deze API wordt gerapporteerd, alleen worden beschouwd als een benadering en wordt alleen aanbevolen voor gebruik bij foutopsporing.

Retourtype

int,