Delen via


EventHubProducerClient Klas

De Klasse EventHubProducerClient definieert een interface op hoog niveau voor het verzenden van gebeurtenissen naar de Azure Event Hubs-service.

Overname
azure.eventhub._client_base.ClientBase
EventHubProducerClient

Constructor

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: Literal[False] = False, **kwargs: Any)

Parameters

fully_qualified_namespace
str
Vereist

De volledig gekwalificeerde hostnaam voor de Event Hubs-naamruimte. Dit is waarschijnlijk vergelijkbaar met .servicebus.windows.net

eventhub_name
str
Vereist

Het pad van de specifieke Event Hub waarmee de client verbinding moet maken.

credential
TokenCredential of AzureSasCredential of AzureNamedKeyCredential
Vereist

Het referentieobject dat wordt gebruikt voor verificatie, waarmee een bepaalde interface wordt geïmplementeerd voor het ophalen van tokens. Het accepteert , of referentieobjecten EventHubSharedKeyCredentialdie zijn gegenereerd door de azure-identity-bibliotheek en objecten die de methode *get_token(zelf, bereiken) implementeren.

buffered_mode
bool

Als Waar is, verzamelt de producerclient gebeurtenissen in een buffer, efficiënt batchgewijs en publiceert deze vervolgens. De standaardwaarde is False.

buffer_concurrency
<xref:ThreadPoolExecutor> of int of None

De ThreadPoolExecutor die moet worden gebruikt voor het publiceren van gebeurtenissen of het aantal werkrollen voor de ThreadPoolExecutor. De standaardwaarde is Geen en er wordt een ThreadPoolExecutor met het standaardaantal werkrollen gemaakt per https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], None]]

De callback die moet worden aangeroepen zodra een batch is gepubliceerd. De callback heeft twee parameters:

  • gebeurtenissen: de lijst met gebeurtenissen die zijn gepubliceerd

  • partition_id: de partitie-id waarnaar de gebeurtenissen in de lijst zijn gepubliceerd.

De callback-functie moet als volgt worden gedefinieerd: on_success(gebeurtenissen, partition_id). Dit is vereist wanneer buffered_mode Waar is, maar optioneel als buffered_mode Onwaar is.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], None]]

De callback die moet worden aangeroepen zodra een batch niet kan worden gepubliceerd. De callback-functie moet als volgt worden gedefinieerd: on_error(gebeurtenissen, partition_id, fout), waarbij:

  • gebeurtenissen: de lijst met gebeurtenissen die niet zijn gepubliceerd,

  • partition_id: de partitie-id waarnaar de gebeurtenissen in de lijst zijn gepubliceerd en

  • fout: de uitzondering met betrekking tot de verzendfout.

Als buffered_mode False is, is on_error callback optioneel en worden fouten als volgt verwerkt:

  • Als een on_error callback wordt doorgegeven tijdens de instantiëring van de producerclient,

    vervolgens worden foutinformatie doorgegeven aan de on_error callback, die vervolgens wordt aangeroepen.

  • Als een on_error callback niet wordt doorgegeven tijdens het instantiëeren van de client,

    dan wordt de fout standaard gegenereerd.

Als buffered_mode Waar is, is on_error callback vereist en worden fouten als volgt verwerkt:

  • Als gebeurtenissen niet binnen de opgegeven time-out kunnen worden weergegeven, wordt er direct een fout gegenereerd.

  • Als gebeurtenissen niet kunnen worden verzonden nadat de enquête is voltooid, wordt de on_error callback aangeroepen.

max_buffer_length
int

Alleen buffermodus. Het totale aantal gebeurtenissen per partitie dat kan worden gebufferd voordat een flush wordt geactiveerd. De standaardwaarde is 1500 in de buffermodus.

max_wait_time
Optional[float]

Alleen buffermodus. De hoeveelheid tijd die moet worden gewacht totdat een batch is gemaakt met gebeurtenissen in de buffer voordat deze wordt gepubliceerd. De standaardwaarde is 1 in de buffermodus.

logging_enable
bool

Of netwerktraceringslogboeken moeten worden uitgevoerd naar de logboekregistratie. De standaardwaarde is False.

auth_timeout
float

De tijd in seconden om te wachten tot een token is geautoriseerd door de service. De standaardwaarde is 60 seconden. Als dit is ingesteld op 0, wordt er geen time-out afgedwongen door de client.

user_agent
str

Indien opgegeven, wordt deze toegevoegd vóór de tekenreeks van de gebruikersagent.

retry_total
int

Het totale aantal pogingen om een mislukte bewerking opnieuw uit te voeren wanneer er een fout optreedt. De standaardwaarde is 3.

retry_backoff_factor
float

Een uitstelfactor die moet worden toegepast tussen pogingen na de tweede poging (de meeste fouten worden onmiddellijk opgelost door een tweede poging zonder vertraging). In de vaste modus wordt beleid voor opnieuw proberen altijd in de slaapstand gezet voor {backoff factor}. In de exponentiële modus wordt het beleid voor opnieuw proberen in de slaapstand gezet voor: {uitstelfactor} * (2 ** ({aantal totale nieuwe pogingen} - 1)) seconden. Als de backoff_factor 0,1 is, wordt de nieuwe poging in de slaapstand gezet voor [0.0s, 0.2s, 0.4s, ...] tussen nieuwe pogingen. De standaardwaarde is 0,8.

retry_backoff_max
float

De maximale verloftijd. De standaardwaarde is 120 seconden (2 minuten).

retry_mode
str

Het vertragingsgedrag tussen nieuwe pogingen. Ondersteunde waarden zijn 'vast' of 'exponentieel', waarbij de standaardwaarde 'exponentieel' is.

idle_timeout
float

Time-out, in seconden, waarna deze client de onderliggende verbinding sluit als er geen activiteit is. Standaard is de waarde Geen, wat betekent dat de client niet wordt afgesloten vanwege inactiviteit, tenzij gestart door de service.

transport_type
TransportType

Het type transportprotocol dat wordt gebruikt voor de communicatie met de Event Hubs-service. De standaardwaarde is TransportType.Amqp . In dat geval wordt poort 5671 gebruikt. Als poort 5671 niet beschikbaar/geblokkeerd is in de netwerkomgeving, kan TransportType.AmqpOverWebsocket worden gebruikt in plaats daarvan, waarbij poort 443 wordt gebruikt voor communicatie.

http_proxy
Dict

HTTP-proxyinstellingen. Dit moet een woordenlijst zijn met de volgende sleutels: 'proxy_hostname' (str-waarde) en 'proxy_port' (int-waarde). Daarnaast kunnen de volgende sleutels ook aanwezig zijn: 'gebruikersnaam', 'wachtwoord'.

custom_endpoint_address
Optional[str]

Het aangepaste eindpuntadres dat moet worden gebruikt voor het tot stand brengen van een verbinding met de Event Hubs-service, zodat netwerkaanvragen kunnen worden gerouteerd via toepassingsgateways of andere paden die nodig zijn voor de hostomgeving. De standaardwaarde is Geen. De indeling ziet er als volgt uit: 'sb://< custom_endpoint_hostname>:<custom_endpoint_port>'. Als poort niet is opgegeven in de custom_endpoint_address, wordt standaard poort 443 gebruikt.

connection_verify
Optional[str]

Pad naar het aangepaste CA_BUNDLE-bestand van het SSL-certificaat dat wordt gebruikt om de identiteit van het verbindingseindpunt te verifiëren. De standaardwaarde is Geen, in welk geval certifi.where() wordt gebruikt.

uamqp_transport
bool

Of u de uamqp-bibliotheek als het onderliggende transport wilt gebruiken. De standaardwaarde is False en de Pure Python AMQP-bibliotheek wordt gebruikt als het onderliggende transport.

socket_timeout
float

De tijd in seconden dat de onderliggende socket op de verbinding moet wachten bij het verzenden en ontvangen van gegevens voordat er een time-out optreedt. De standaardwaarde is 0,2 voor TransportType.Amqp en 1 voor TransportType.AmqpOverWebsocket. Als er EventHubsConnectionError-fouten optreden vanwege een time-out van schrijfbewerkingen, moet mogelijk een grotere waarde dan de standaardwaarde worden doorgegeven. Dit is voor geavanceerde gebruiksscenario's en normaal gesproken moet de standaardwaarde voldoende zijn.

Voorbeelden

Maak een nieuw exemplaar van de EventHubProducerClient.


   import os
   from azure.eventhub import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   producer = EventHubProducerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,  # EventHub name should be specified if it doesn't show up in connection string.
       credential=credential
   )

Methoden

close

Sluit de onderliggende AMQP-verbinding en koppelingen van de Producer-client.

create_batch

Maak een EventDataBatch-object met de maximale grootte van alle inhoud die wordt beperkt door max_size_in_bytes.

De max_size_in_bytes mag niet groter zijn dan de maximaal toegestane berichtgrootte die door de service is gedefinieerd.

flush

Alleen buffermodus. Gebeurtenissen in de buffer leegmaken die onmiddellijk moeten worden verzonden als de client in de buffermodus werkt.

from_connection_string

Maak een EventHubProducerClient op basis van een verbindingsreeks.

get_buffered_event_count

Het aantal gebeurtenissen dat wordt gebufferd en dat wacht om te worden gepubliceerd voor een bepaalde partitie. Retourneert Geen in de niet-gebufferde modus. OPMERKING: de gebeurtenisbuffer wordt verwerkt in een achtergrondthread. Het aantal gebeurtenissen in de buffer dat door deze API wordt gerapporteerd, moet daarom alleen worden beschouwd als een benadering en wordt alleen aanbevolen voor gebruik bij foutopsporing. Voor een partitie-id waarvoor geen gebeurtenissen zijn gebufferd, wordt 0 geretourneerd, ongeacht of die partitie-id daadwerkelijk bestaat in de Event Hub.

get_eventhub_properties

Eigenschappen van de Event Hub ophalen.

Sleutels in de geretourneerde woordenlijst zijn onder andere:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Partitie-id's van de Event Hub ophalen.

get_partition_properties

Eigenschappen van de opgegeven partitie ophalen.

Sleutels in de eigenschappenwoordenlijst zijn onder andere:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

Hiermee wordt een batch gebeurtenisgegevens verzonden. Standaard wordt de methode geblokkeerd totdat de bevestiging is ontvangen of een time-out voor de bewerking optreedt. Als de EventHubProducerClient is geconfigureerd om te worden uitgevoerd in de gebufferde modus, probeert de methode de gebeurtenissen binnen de opgegeven tijd in de buffer te zetten en te retourneren. De producent voert automatische verzending op de achtergrond uit in de buffermodus.

Als buffered_mode False is, is on_error callback optioneel en worden fouten als volgt verwerkt:

  • Als een on_error callback wordt doorgegeven tijdens de instantiëring van de producerclient,

    vervolgens worden foutinformatie doorgegeven aan de on_error callback, die vervolgens wordt aangeroepen.

  • Als een on_error callback niet wordt doorgegeven tijdens het instantiëeren van de client,

    dan wordt de fout standaard gegenereerd.

Als buffered_mode Waar is, is on_error callback vereist en worden fouten als volgt verwerkt:

  • Als gebeurtenissen niet binnen de opgegeven time-out kunnen worden weergegeven, wordt er direct een fout gegenereerd.

  • Als gebeurtenissen niet kunnen worden verzonden nadat de enquête is voltooid, wordt de on_error callback aangeroepen.

In de buffermodus blijft het verzenden van een batch intact en wordt deze verzonden als één eenheid. De batch wordt niet opnieuw gerangschikt. Dit kan leiden tot inefficiëntie van het verzenden van gebeurtenissen.

Als u een eindige lijst met EventData of AmqpAnnotatedMessage verzendt en u weet dat deze binnen de framegroottelimiet van de Event Hub valt, kunt u deze verzenden met een send_batch-aanroep . Gebruik anders create_batch om EventDataBatch te maken en voeg EventData of AmqpAnnotatedMessage één voor één toe aan de batch totdat de groottelimiet is bereikt. Roep vervolgens deze methode aan om de batch te verzenden.

send_event

Hiermee worden gebeurtenisgegevens verzonden. Standaard wordt de methode geblokkeerd totdat de bevestiging is ontvangen of een time-out voor de bewerking optreedt. Als de EventHubProducerClient is geconfigureerd om te worden uitgevoerd in de gebufferde modus, probeert de methode de gebeurtenissen binnen de opgegeven tijd in de buffer te zetten en te retourneren. De producent voert automatische verzending op de achtergrond uit in de buffermodus.

Als buffered_mode False is, is on_error callback optioneel en worden fouten als volgt verwerkt: * Als een on_error callback wordt doorgegeven tijdens de instantiëring van de producerclient,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Als buffered_mode Waar is, is on_error callback vereist en worden fouten als volgt verwerkt: * Als gebeurtenissen niet binnen de opgegeven time-out kunnen worden weergegeven, wordt er rechtstreeks een fout gegenereerd.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

Sluit de onderliggende AMQP-verbinding en koppelingen van de Producer-client.

close(*, flush: bool = True, **kwargs: Any) -> None

Parameters

flush
bool

Alleen buffermodus. Als deze optie is ingesteld op True, worden gebeurtenissen in de buffer onmiddellijk verzonden. De standaardwaarde is True.

timeout
float of None

Alleen buffermodus. Time-out om de producent te sluiten. De standaardwaarde is Geen, wat betekent dat er geen time-out is.

Retourtype

Uitzonderingen

Als er een fout is opgetreden bij het leegmaken van de buffer als leegmaken is ingesteld op Waar of het sluiten van de onderliggende AMQP-verbindingen in de gebufferde modus.

Voorbeelden

Sluit de client af.


   import os
   from azure.eventhub import EventHubProducerClient, EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = producer.create_batch()

       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # EventDataBatch object reaches max_size.
               # New EventDataBatch object can be created here to send more data
               break

       producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       producer.close()

create_batch

Maak een EventDataBatch-object met de maximale grootte van alle inhoud die wordt beperkt door max_size_in_bytes.

De max_size_in_bytes mag niet groter zijn dan de maximaal toegestane berichtgrootte die door de service is gedefinieerd.

create_batch(**kwargs: Any) -> EventDataBatch

Retourtype

Uitzonderingen

Als er een fout is opgetreden bij het leegmaken van de buffer als leegmaken is ingesteld op Waar of het sluiten van de onderliggende AMQP-verbindingen in de gebufferde modus.

Voorbeelden

EventDataBatch-object maken binnen een beperkte grootte


       event_data_batch = producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

Alleen buffermodus. Gebeurtenissen in de buffer leegmaken die onmiddellijk moeten worden verzonden als de client in de buffermodus werkt.

flush(**kwargs: Any) -> None

Parameters

timeout
Optional[float]

Time-out voor het leegmaken van de gebufferde gebeurtenissen. De standaardwaarde is Geen, wat betekent dat er geen time-out is.

Retourtype

Uitzonderingen

Als de producent de buffer niet kan leegmaken binnen de opgegeven time-out in de gebufferde modus.

from_connection_string

Maak een EventHubProducerClient op basis van een verbindingsreeks.

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: Literal[False] = False, **kwargs: Any) -> EventHubProducerClient

Parameters

conn_str
str
Vereist

De verbindingsreeks van een Event Hub.

eventhub_name
str

Het pad van de specifieke Event Hub waarmee de client verbinding moet maken.

buffered_mode
bool

Als Waar is, verzamelt de producerclient gebeurtenissen in een buffer, efficiënt batchgewijs en publiceert deze vervolgens. De standaardwaarde is False.

buffer_concurrency
<xref:ThreadPoolExecutor> of int of None

De ThreadPoolExecutor die moet worden gebruikt voor het publiceren van gebeurtenissen of het aantal werkrollen voor de ThreadPoolExecutor. De standaardwaarde is Geen en er wordt een ThreadPoolExecutor met het standaardaantal werkrollen gemaakt per https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], None]]

De callback die moet worden aangeroepen zodra een batch is gepubliceerd. De callback heeft twee parameters:

  • gebeurtenissen: de lijst met gebeurtenissen die zijn gepubliceerd

  • partition_id: de partitie-id waarnaar de gebeurtenissen in de lijst zijn gepubliceerd.

De callback-functie moet als volgt worden gedefinieerd: on_success(gebeurtenissen, partition_id). Vereist wanneer buffered_mode Waar is, maar optioneel als buffered_mode Onwaar is.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], None]]

De callback die moet worden aangeroepen zodra een batch niet kan worden gepubliceerd. Vereist wanneer in buffered_mode Waar is en optioneel als buffered_mode Onwaar is. De callback-functie moet als volgt worden gedefinieerd: on_error(gebeurtenissen, partition_id, fout), waarbij:

  • gebeurtenissen: de lijst met gebeurtenissen die niet zijn gepubliceerd,

  • partition_id: de partitie-id waarnaar de gebeurtenissen in de lijst zijn gepubliceerd en

  • fout: de uitzondering met betrekking tot de verzendfout.

Als buffered_mode False is, is on_error callback optioneel en worden fouten als volgt verwerkt:

  • Als een on_error callback wordt doorgegeven tijdens de instantiëring van de producerclient,

    vervolgens worden foutinformatie doorgegeven aan de on_error callback, die vervolgens wordt aangeroepen.

  • Als een on_error callback niet wordt doorgegeven tijdens het instantiëeren van de client,

    dan wordt de fout standaard gegenereerd.

Als buffered_mode Waar is, is on_error callback vereist en worden fouten als volgt verwerkt:

  • Als gebeurtenissen niet binnen de opgegeven time-out kunnen worden weergegeven, wordt er direct een fout gegenereerd.

  • Als gebeurtenissen niet kunnen worden verzonden nadat de enquête is voltooid, wordt de on_error callback aangeroepen.

max_buffer_length
int

Alleen buffermodus. Het totale aantal gebeurtenissen per partitie dat kan worden gebufferd voordat een flush wordt geactiveerd. De standaardwaarde is 1500 in de buffermodus.

max_wait_time
Optional[float]

Alleen buffermodus. De hoeveelheid tijd die moet worden gewacht totdat een batch is gemaakt met gebeurtenissen in de buffer voordat deze wordt gepubliceerd. De standaardwaarde is 1 in de buffermodus.

logging_enable
bool

Of netwerktraceringslogboeken moeten worden uitgevoerd naar de logboekregistratie. De standaardwaarde is False.

http_proxy
Dict

HTTP-proxyinstellingen. Dit moet een woordenlijst zijn met de volgende sleutels: 'proxy_hostname' (str-waarde) en 'proxy_port' (int-waarde). Daarnaast kunnen de volgende sleutels ook aanwezig zijn: 'gebruikersnaam', 'wachtwoord'.

auth_timeout
float

De tijd in seconden om te wachten tot een token is geautoriseerd door de service. De standaardwaarde is 60 seconden. Als dit is ingesteld op 0, wordt er geen time-out afgedwongen door de client.

user_agent
str

Indien opgegeven, wordt deze toegevoegd vóór de tekenreeks van de gebruikersagent.

retry_total
int

Het totale aantal pogingen om een mislukte bewerking opnieuw uit te voeren wanneer er een fout optreedt. De standaardwaarde is 3.

retry_backoff_factor
float

Een uitstelfactor die moet worden toegepast tussen pogingen na de tweede poging (de meeste fouten worden onmiddellijk opgelost door een tweede poging zonder vertraging). In de vaste modus wordt beleid voor opnieuw proberen altijd in de slaapstand gezet voor {backoff factor}. In de exponentiële modus wordt het beleid voor opnieuw proberen in de slaapstand gezet voor: {uitstelfactor} * (2 ** ({aantal totale nieuwe pogingen} - 1)) seconden. Als de backoff_factor 0,1 is, wordt de nieuwe poging in de slaapstand gezet voor [0.0s, 0.2s, 0.4s, ...] tussen nieuwe pogingen. De standaardwaarde is 0,8.

retry_backoff_max
float

De maximale verloftijd. De standaardwaarde is 120 seconden (2 minuten).

retry_mode
str

Het vertragingsgedrag tussen nieuwe pogingen. Ondersteunde waarden zijn 'vast' of 'exponentieel', waarbij de standaardwaarde 'exponentieel' is.

idle_timeout
float

Time-out, in seconden, waarna deze client de onderliggende verbinding sluit als er geen activiteit is. Standaard is de waarde Geen, wat betekent dat de client niet wordt afgesloten vanwege inactiviteit, tenzij gestart door de service.

transport_type
TransportType

Het type transportprotocol dat wordt gebruikt voor de communicatie met de Event Hubs-service. De standaardwaarde is TransportType.Amqp . In dat geval wordt poort 5671 gebruikt. Als poort 5671 niet beschikbaar/geblokkeerd is in de netwerkomgeving, kan TransportType.AmqpOverWebsocket worden gebruikt in plaats daarvan, waarbij poort 443 wordt gebruikt voor communicatie.

http_proxy

HTTP-proxyinstellingen. Dit moet een woordenlijst zijn met de volgende sleutels: 'proxy_hostname' (str-waarde) en 'proxy_port' (int-waarde). Daarnaast kunnen de volgende sleutels ook aanwezig zijn: 'gebruikersnaam', 'wachtwoord'.

custom_endpoint_address
Optional[str]

Het aangepaste eindpuntadres dat moet worden gebruikt voor het tot stand brengen van een verbinding met de Event Hubs-service, zodat netwerkaanvragen kunnen worden gerouteerd via toepassingsgateways of andere paden die nodig zijn voor de hostomgeving. De standaardwaarde is Geen. De indeling ziet er als volgt uit: 'sb://< custom_endpoint_hostname>:<custom_endpoint_port>'. Als poort niet is opgegeven in de custom_endpoint_address, wordt standaard poort 443 gebruikt.

connection_verify
Optional[str]

Pad naar het aangepaste CA_BUNDLE-bestand van het SSL-certificaat dat wordt gebruikt om de identiteit van het verbindingseindpunt te verifiëren. De standaardwaarde is Geen, in welk geval certifi.where() wordt gebruikt.

uamqp_transport
bool

Of u de uamqp-bibliotheek als het onderliggende transport wilt gebruiken. De standaardwaarde is False en de Pure Python AMQP-bibliotheek wordt gebruikt als het onderliggende transport.

Retourtype

Uitzonderingen

Als er een fout is opgetreden bij het leegmaken van de buffer als leegmaken is ingesteld op Waar of het sluiten van de onderliggende AMQP-verbindingen in de gebufferde modus.

Voorbeelden

Maak een nieuw exemplaar van de EventHubProducerClient vanuit verbindingsreeks.


   import os
   from azure.eventhub import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

Het aantal gebeurtenissen dat wordt gebufferd en dat wacht om te worden gepubliceerd voor een bepaalde partitie. Retourneert Geen in de niet-gebufferde modus. OPMERKING: de gebeurtenisbuffer wordt verwerkt in een achtergrondthread. Het aantal gebeurtenissen in de buffer dat door deze API wordt gerapporteerd, moet daarom alleen worden beschouwd als een benadering en wordt alleen aanbevolen voor gebruik bij foutopsporing. Voor een partitie-id waarvoor geen gebeurtenissen zijn gebufferd, wordt 0 geretourneerd, ongeacht of die partitie-id daadwerkelijk bestaat in de Event Hub.

get_buffered_event_count(partition_id: str) -> int | None

Parameters

partition_id
str
Vereist

De doelpartitie-id.

Retourtype

int,

Uitzonderingen

Als er een fout is opgetreden bij het leegmaken van de buffer als leegmaken is ingesteld op Waar of het sluiten van de onderliggende AMQP-verbindingen in de gebufferde modus.

get_eventhub_properties

Eigenschappen van de Event Hub ophalen.

Sleutels in de geretourneerde woordenlijst zijn onder andere:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

Retouren

Een woordenlijst met eventhub-eigenschappen.

Retourtype

Uitzonderingen

get_partition_ids

Partitie-id's van de Event Hub ophalen.

get_partition_ids() -> List[str]

Retouren

Een lijst met partitie-id's.

Retourtype

Uitzonderingen

get_partition_properties

Eigenschappen van de opgegeven partitie ophalen.

Sleutels in de eigenschappenwoordenlijst zijn onder andere:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

get_partition_properties(partition_id: str) -> Dict[str, Any]

Parameters

partition_id
str
Vereist

De doelpartitie-id.

Retouren

Een woordenlijst met partitie-eigenschappen.

Retourtype

Uitzonderingen

send_batch

Hiermee wordt een batch gebeurtenisgegevens verzonden. Standaard wordt de methode geblokkeerd totdat de bevestiging is ontvangen of een time-out voor de bewerking optreedt. Als de EventHubProducerClient is geconfigureerd om te worden uitgevoerd in de gebufferde modus, probeert de methode de gebeurtenissen binnen de opgegeven tijd in de buffer te zetten en te retourneren. De producent voert automatische verzending op de achtergrond uit in de buffermodus.

Als buffered_mode False is, is on_error callback optioneel en worden fouten als volgt verwerkt:

  • Als een on_error callback wordt doorgegeven tijdens de instantiëring van de producerclient,

    vervolgens worden foutinformatie doorgegeven aan de on_error callback, die vervolgens wordt aangeroepen.

  • Als een on_error callback niet wordt doorgegeven tijdens het instantiëeren van de client,

    dan wordt de fout standaard gegenereerd.

Als buffered_mode Waar is, is on_error callback vereist en worden fouten als volgt verwerkt:

  • Als gebeurtenissen niet binnen de opgegeven time-out kunnen worden weergegeven, wordt er direct een fout gegenereerd.

  • Als gebeurtenissen niet kunnen worden verzonden nadat de enquête is voltooid, wordt de on_error callback aangeroepen.

In de buffermodus blijft het verzenden van een batch intact en wordt deze verzonden als één eenheid. De batch wordt niet opnieuw gerangschikt. Dit kan leiden tot inefficiëntie van het verzenden van gebeurtenissen.

Als u een eindige lijst met EventData of AmqpAnnotatedMessage verzendt en u weet dat deze binnen de framegroottelimiet van de Event Hub valt, kunt u deze verzenden met een send_batch-aanroep . Gebruik anders create_batch om EventDataBatch te maken en voeg EventData of AmqpAnnotatedMessage één voor één toe aan de batch totdat de groottelimiet is bereikt. Roep vervolgens deze methode aan om de batch te verzenden.

send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

Parameters

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Vereist

Het EventDataBatch-object dat moet worden verzonden of een lijst met EventData die in een batch moet worden verzonden. Alle EventData of AmqpAnnotatedMessage in de lijst of EventDataBatch komen op dezelfde partitie terecht.

timeout
float

De maximale wachttijd voor het verzenden van de gebeurtenisgegevens in de niet-gebufferde modus of de maximale wachttijd voor het in de buffer plaatsen van de gebeurtenisgegevens in de buffermodus. In de niet-gebufferde modus wordt de standaardwachttijd gebruikt die is opgegeven toen de producent werd gemaakt. In de buffermodus is de standaardwachttijd Geen.

partition_id
str

De specifieke partitie-id die moet worden verzonden. De standaardwaarde is Geen. In dat geval wijst de service toe aan alle partities met behulp van round robin. Er wordt een TypeError gegenereerd als partition_id is opgegeven en event_data_batch een EventDataBatch is omdat EventDataBatch zelf partition_id heeft.

partition_key
str

Met de opgegeven partition_key worden gebeurtenisgegevens verzonden naar een bepaalde partitie van de Event Hub die door de service is bepaald. Er wordt een TypeError gegenereerd als partition_key is opgegeven en event_data_batch een EventDataBatch is omdat EventDataBatch zelf partition_key heeft. Als zowel partition_id als partition_key worden opgegeven, heeft de partition_id voorrang. WAARSCHUWING: het instellen van partition_key van een niet-tekenreekswaarde voor de gebeurtenissen die moeten worden verzonden, wordt afgeraden omdat de partition_key wordt genegeerd door de Event Hub-service en gebeurtenissen worden toegewezen aan alle partities met behulp van round robin. Bovendien zijn er SDK's voor het verbruik van gebeurtenissen die verwachten dat partition_key alleen van het tekenreekstype zijn, ze de niet-tekenreekswaarde mogelijk niet parseren.

Retourtype

Uitzonderingen

Als de waarde die is opgegeven door de time-outparameter, is verstreken voordat de gebeurtenis kan worden verzonden in de modus niet-buffer, of als de gebeurtenissen niet in de buffermodus kunnen worden geplaatst.

Voorbeelden

Gebeurtenisgegevens verzenden


       with producer:
           event_data_batch = producer.create_batch()

           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # EventDataBatch object reaches max_size.
                   # New EventDataBatch object can be created here to send more data
                   break

           producer.send_batch(event_data_batch)

send_event

Hiermee worden gebeurtenisgegevens verzonden. Standaard wordt de methode geblokkeerd totdat de bevestiging is ontvangen of een time-out voor de bewerking optreedt. Als de EventHubProducerClient is geconfigureerd om te worden uitgevoerd in de gebufferde modus, probeert de methode de gebeurtenissen binnen de opgegeven tijd in de buffer te zetten en te retourneren. De producent voert automatische verzending op de achtergrond uit in de buffermodus.

Als buffered_mode False is, is on_error callback optioneel en worden fouten als volgt verwerkt: * Als een on_error callback wordt doorgegeven tijdens de instantiëring van de producerclient,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Als buffered_mode Waar is, is on_error callback vereist en worden fouten als volgt verwerkt: * Als gebeurtenissen niet binnen de opgegeven time-out kunnen worden weergegeven, wordt er rechtstreeks een fout gegenereerd.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

Parameters

event_data
Union[EventData, AmqpAnnotatedMessage]
Vereist

Het EventData-object dat moet worden verzonden.

timeout
float

De maximale wachttijd voor het verzenden van de gebeurtenisgegevens in de niet-gebufferde modus of de maximale wachttijd voor het in de buffer plaatsen van de gebeurtenisgegevens in de buffermodus. In de niet-gebufferde modus wordt de standaardwachttijd gebruikt die is opgegeven toen de producent werd gemaakt. In de buffermodus is de standaardwachttijd Geen.

partition_id
str

De specifieke partitie-id die moet worden verzonden. De standaardwaarde is Geen. In dat geval wijst de service toe aan alle partities met behulp van round robin. Er wordt een TypeError gegenereerd als partition_id is opgegeven en event_data_batch een EventDataBatch is omdat EventDataBatch zelf partition_id heeft.

partition_key
str

Met de opgegeven partition_key worden gebeurtenisgegevens verzonden naar een bepaalde partitie van de Event Hub die door de service is bepaald. Er wordt een TypeError gegenereerd als partition_key is opgegeven en event_data_batch een EventDataBatch is omdat EventDataBatch zelf partition_key heeft. Als zowel partition_id als partition_key worden opgegeven, heeft de partition_id voorrang. WAARSCHUWING: het instellen van partition_key van een niet-tekenreekswaarde voor de gebeurtenissen die moeten worden verzonden, wordt afgeraden omdat de partition_key wordt genegeerd door de Event Hub-service en gebeurtenissen worden toegewezen aan alle partities met behulp van round robin. Bovendien zijn er SDK's voor het verbruik van gebeurtenissen die verwachten dat partition_key alleen van het tekenreekstype zijn, ze de niet-tekenreekswaarde mogelijk niet parseren.

Retourtype

Uitzonderingen

Als de waarde die is opgegeven door de time-outparameter, is verstreken voordat de gebeurtenis kan worden verzonden in de niet-buffermodus of als de gebeurtenissen in de gebufferde modus kunnen worden geplaatst.

Kenmerken

total_buffered_event_count

Het totale aantal gebeurtenissen dat momenteel is gebufferd en wacht om te worden gepubliceerd, voor alle partities. Retourneert Geen in de niet-gebufferde modus. OPMERKING: de gebeurtenisbuffer wordt verwerkt in een achtergrondthread. Het aantal gebeurtenissen in de buffer dat door deze API wordt gerapporteerd, moet daarom alleen worden beschouwd als een benadering en wordt alleen aanbevolen voor gebruik bij foutopsporing.

Retourtype

int,