EventHubProducerClient Klasse

Die EventHubProducerClient-Klasse definiert eine allgemeine Schnittstelle zum Senden von Ereignissen an den Azure Event Hubs-Dienst.

Vererbung
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubProducerClient

Konstruktor

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)

Parameter

fully_qualified_namespace
str
Erforderlich

Der vollqualifizierte Hostname für den Event Hubs-Namespace. Dies ist wahrscheinlich ähnlich wie .servicebus.windows.net

eventhub_name
str
Erforderlich

Der Pfad des spezifischen Event Hubs, mit dem der Client verbunden werden soll.

credential
AsyncTokenCredential oder AzureSasCredential oder AzureNamedKeyCredential
Erforderlich

Das für die Authentifizierung verwendete Anmeldeinformationsobjekt, das eine bestimmte Schnittstelle zum Abrufen von Token implementiert. Es akzeptiert EventHubSharedKeyCredential, oder Anmeldeinformationsobjekte, die von der azure-identity-Bibliothek generiert werden, und Objekte, die die *get_token(self, scopes) -Methode implementieren.

buffered_mode
bool

True gibt an, dass der Producerclient Ereignisse in einem Puffer sammelt, effizient batchet und dann veröffentlicht. Der Standardwert lautet False.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Der Rückruf, der aufgerufen werden soll, nachdem ein Batch erfolgreich veröffentlicht wurde. Der Rückruf verwendet zwei Parameter:

  • events: Die Liste der Ereignisse, die erfolgreich veröffentlicht wurden

  • partition_id: Die Partitions-ID, in der die Ereignisse in der Liste veröffentlicht wurden.

Die Rückruffunktion sollte wie folgt definiert werden: on_success(events, partition_id). Erforderlich, wenn buffered_mode auf True festgelegt ist, wenn buffered_mode auf False festgelegt ist.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

Der Rückruf, der aufgerufen werden soll, nachdem ein Batch nicht veröffentlicht werden konnte. Erforderlich, wenn in buffered_mode true ist, während optional, wenn buffered_mode false ist. Die Rückruffunktion sollte wie folgt definiert werden: on_error(Ereignisse, partition_id, Fehler), wobei:

  • events: Die Liste der Ereignisse, die nicht veröffentlicht werden konnten,

  • partition_id: Die Partitions-ID, in der die Ereignisse in der Liste veröffentlicht werden sollen, und

  • error: Die Ausnahme im Zusammenhang mit dem Sendefehler.

Wenn buffered_mode false ist, ist on_error Rückruf optional, und Fehler werden wie folgt behandelt:

  • Wenn während der Instanziierung des Producerclients ein on_error Rückruf übergeben wird,

    Dann werden Fehlerinformationen an den on_error Rückruf übergeben, der dann aufgerufen wird.

  • Wenn während der Clientinstanziierung kein on_error Rückruf übergeben wird,

    dann wird der Fehler standardmäßig ausgelöst.

Wenn buffered_mode true ist, ist on_error Rückruf erforderlich, und Fehler werden wie folgt behandelt:

  • Wenn Ereignisse nicht innerhalb des angegebenen Timeouts in die Warteschlange eingereissen werden, wird direkt ein Fehler ausgelöst.

  • Wenn Ereignisse nicht gesendet werden können, nachdem sie erfolgreich in die Warteschlange gestellt wurden, wird der on_error Rückruf aufgerufen.

max_buffer_length
int

Nur gepufferter Modus. Die Gesamtanzahl der Ereignisse pro Partition, die gepuffert werden können, bevor eine Leerung ausgelöst wird. Der Standardwert ist 1500 im gepufferten Modus.

max_wait_time
Optional[float]

Nur gepufferter Modus. Die Zeitspanne, in der vor der Veröffentlichung auf das Erstellen eines Batches mit Ereignissen im Puffer gewartet wird. Der Standardwert ist 1 im gepufferten Modus.

logging_enable
bool

Gibt an, ob Netzwerkablaufverfolgungsprotokolle an die Protokollierung ausgegeben werden sollen. Der Standardwert ist False.

auth_timeout
float

Die Zeit in Sekunden, um auf die Autorisierung eines Tokens durch den Dienst zu warten. Der Standardwert beträgt 60 Sekunden. Wenn dieser Wert auf 0 festgelegt ist, wird kein Timeout vom Client erzwungen.

user_agent
str

Falls angegeben, wird dies vor der Zeichenfolge des Benutzer-Agents hinzugefügt.

retry_total
int

Die Gesamtanzahl der Versuche, einen fehlgeschlagenen Vorgang zu wiederholen, wenn ein Fehler auftritt. Der Standardwert ist 3.

retry_backoff_factor
float

Ein Backofffaktor, der zwischen den Versuchen nach dem zweiten Versuch angewendet werden soll (die meisten Fehler werden sofort durch einen zweiten Versuch ohne Verzögerung behoben). Im festen Modus wird die Wiederholungsrichtlinie für {Backoff-Faktor} immer in den Ruhezustand versetzt. Im Modus "exponentiell" wird die Wiederholungsrichtlinie für: {backoff factor} * (2 ** ({number of total retries} - 1)) sekunden in den Ruhezustand versetzt. Wenn der backoff_factor 0,1 ist, wird der Wiederholungsversuch für [0.0s, 0.2s, 0.4s, ...] zwischen Wiederholungen in den Ruhezustand versetzt. Der Standardwert ist 0,8.

retry_backoff_max
float

Die maximale Backoffzeit. Der Standardwert ist 120 Sekunden (2 Minuten).

retry_mode
str

Das Verzögerungsverhalten zwischen Wiederholungsversuchen. Unterstützte Werte sind "fixed" oder "exponential", wobei der Standardwert "exponentiell" ist.

idle_timeout
float

Timeout in Sekunden, nach dem dieser Client die zugrunde liegende Verbindung schließt, wenn keine Aktivität vorhanden ist. Standardmäßig ist der Wert None, was bedeutet, dass der Client aufgrund von Inaktivität nicht heruntergefahren wird, es sei denn, der Dienst wird initiiert.

transport_type
TransportType

Der Typ des Transportprotokolls, das für die Kommunikation mit dem Event Hubs-Dienst verwendet wird. Der Standardwert ist TransportType.Amqp , in diesem Fall wird Port 5671 verwendet. Wenn der Port 5671 in der Netzwerkumgebung nicht verfügbar/blockiert ist, kann stattdessen TransportType.AmqpOverWebsocket verwendet werden, wobei Port 443 für die Kommunikation verwendet wird.

http_proxy
dict

HTTP-Proxyeinstellungen. Dies muss ein Wörterbuch mit den folgenden Schlüsseln sein: "proxy_hostname" (str-Wert) und "proxy_port" (int-Wert). Darüber hinaus können auch die folgenden Schlüssel vorhanden sein: "Benutzername", "Kennwort".

custom_endpoint_address
Optional[str]

Die benutzerdefinierte Endpunktadresse, die zum Herstellen einer Verbindung mit dem Event Hubs-Dienst verwendet werden soll, sodass Netzwerkanforderungen über alle Anwendungsgateways oder anderen Pfade weitergeleitet werden können, die für die Hostumgebung erforderlich sind. Der Standardwert ist None. Das Format wäre wie "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Wenn port nicht im custom_endpoint_address angegeben ist, wird standardmäßig Port 443 verwendet.

connection_verify
Optional[str]

Pfad zur benutzerdefinierten CA_BUNDLE Datei des SSL-Zertifikats, das zum Authentifizieren der Identität des Verbindungsendpunkts verwendet wird. Der Standardwert ist None. In diesem Fall wird certifi.where() verwendet.

uamqp_transport
bool

Gibt an, ob die uamqp-Bibliothek als zugrunde liegender Transport verwendet werden soll. Der Standardwert ist False, und die Pure Python AMQP-Bibliothek wird als zugrunde liegender Transport verwendet.

socket_timeout
float

Die Zeit in Sekunden, die der zugrunde liegende Socket für die Verbindung beim Senden und Empfangen von Daten warten soll, bevor ein Timeout auftritt. Der Standardwert ist 0,2 für TransportType.Amqp und 1 für TransportType.AmqpOverWebsocket. Wenn EventHubsConnectionError-Fehler aufgrund eines Schreibzeitlimits auftreten, muss möglicherweise ein größer als der Standardwert übergeben werden. Dies ist für erweiterte Verwendungsszenarien vorgesehen, und normalerweise sollte der Standardwert ausreichend sein.

Beispiele

Erstellen Sie eine neue instance von EventHubProducerClient.


   import os
   from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Methoden

close

Schließen Sie die zugrundeliegende AMQP-Verbindung und die Verknüpfungen des Producerclients.

create_batch

Erstellen Sie ein EventDataBatch-Objekt mit der maximalen Größe aller Inhalte, die durch max_size_in_bytes eingeschränkt werden.

Die max_size_in_bytes darf nicht größer als die vom Dienst definierte maximal zulässige Nachrichtengröße sein.

flush

Nur gepufferter Modus. Leerung von Ereignissen im Puffer, die sofort gesendet werden sollen, wenn der Client im gepufferten Modus arbeitet.

from_connection_string

Erstellen Sie einen EventHubProducerClient aus einer Verbindungszeichenfolge.

get_buffered_event_count

Die Anzahl der Ereignisse, die gepuffert werden und auf die Veröffentlichung für eine bestimmte Partition warten. Gibt Im nicht gepufferten Modus Keine zurück. HINWEIS: Der Ereignispuffer wird in einer Hintergrund-Coroutine verarbeitet. Daher sollte die Anzahl der Ereignisse im Puffer, die von dieser API gemeldet werden, nur als Näherung betrachtet werden und nur für die Verwendung beim Debuggen empfohlen werden. Für eine Partitions-ID, für die keine Ereignisse gepuffert sind, wird 0 zurückgegeben, unabhängig davon, ob diese Partitions-ID tatsächlich innerhalb des Event Hubs vorhanden ist.

get_eventhub_properties

Ruft Eigenschaften des Event Hubs ab.

Zu den Schlüsseln im zurückgegebenen Wörterbuch gehören:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Ruft Partitions-IDs des Event Hubs ab.

get_partition_properties

Ruft Eigenschaften der angegebenen Partition ab.

Zu den Schlüsseln im Eigenschaftenverzeichnis gehören:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

Sendet einen Batch mit Ereignisdaten. Standardmäßig wird die -Methode blockiert, bis die Bestätigung empfangen wird oder das Vorgangstimeout auftritt. Wenn eventHubProducerClient für die Ausführung im gepufferten Modus konfiguriert ist, quediert die Methode die Ereignisse in die Warteschlange in den lokalen Puffer und gibt zurück. Der Producer sendet automatisch im Hintergrund.

Wenn buffered_mode false ist, ist on_error Rückruf optional, und Fehler werden wie folgt behandelt:

  • Wenn während der Instanziierung des Producerclients ein on_error Rückruf übergeben wird,

    Dann werden Fehlerinformationen an den on_error Rückruf übergeben, der dann aufgerufen wird.

  • Wenn während der Clientinstanziierung kein on_error Rückruf übergeben wird,

    dann wird der Fehler standardmäßig ausgelöst.

Wenn buffered_mode true ist, ist on_error Rückruf erforderlich, und Fehler werden wie folgt behandelt:

  • Wenn Ereignisse nicht innerhalb des angegebenen Timeouts in die Warteschlange eingereissen werden, wird direkt ein Fehler ausgelöst.

  • Wenn Ereignisse nicht gesendet werden können, nachdem sie erfolgreich in die Warteschlange gestellt wurden, wird der on_error Rückruf aufgerufen.

Im gepufferten Modus bleibt das Senden eines Batches intakt und wird als einzelne Einheit gesendet. Der Batch wird nicht neu angeordnet. Dies kann zu Ineffizienz beim Senden von Ereignissen führen.

Wenn Sie eine endliche Liste von EventData oder AmqpAnnotatedMessage senden und wissen, dass diese innerhalb der Event Hub-Framegröße liegt, können Sie sie mit einem send_batch-Aufruf senden. Verwenden Sie create_batch andernfalls , um EventDataBatch zu erstellen und entweder EventData oder AmqpAnnotatedMessage dem Batch nacheinander bis zur Größenbeschränkung hinzuzufügen, und rufen Sie dann diese Methode auf, um den Batch zu senden.

send_event

Sendet Ereignisdaten. Standardmäßig wird die -Methode blockiert, bis die Bestätigung empfangen wird oder das Vorgangstimeout auftritt. Wenn der EventHubProducerClient für die Ausführung im gepufferten Modus konfiguriert ist, wird das Ereignis von der Methode in die Warteschlange in den lokalen Puffer gestellt und zurückgegeben. Der Producer führt die automatische Batchverarbeitung und das Senden im Hintergrund durch.

Wenn buffered_mode false ist, ist on_error Rückruf optional, und Fehler werden wie folgt behandelt: * Wenn während der Instanziierung des Producerclients ein on_error Rückruf übergeben wird,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Wenn buffered_mode true ist, ist on_error Rückruf erforderlich, und Fehler werden wie folgt behandelt: * Wenn Ereignisse nicht innerhalb des angegebenen Timeouts in die Warteschlange eingereissen werden, wird direkt ein Fehler ausgelöst.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

Schließen Sie die zugrundeliegende AMQP-Verbindung und die Verknüpfungen des Producerclients.

async close(*, flush: bool = True, **kwargs: Any) -> None

Parameter

flush
bool

Nur gepufferter Modus. Wenn dieser Wert auf True festgelegt ist, werden Ereignisse im Puffer sofort gesendet. Der Standardwert lautet "True".

timeout
float oder None

Nur gepufferter Modus. Timeout zum Schließen des Producers. Der Standardwert ist Keine, was bedeutet, dass kein Timeout auftritt.

Rückgabetyp

Ausnahmen

Wenn beim Leeren des Puffers ein Fehler aufgetreten ist, wenn das Leeren auf True festgelegt ist oder die zugrunde liegenden AMQP-Verbindungen im gepufferten Modus geschlossen werden.

Beispiele

Schließen Sie den Handler.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   from azure.eventhub import EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break
       await producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       await producer.close()

create_batch

Erstellen Sie ein EventDataBatch-Objekt mit der maximalen Größe aller Inhalte, die durch max_size_in_bytes eingeschränkt werden.

Die max_size_in_bytes darf nicht größer als die vom Dienst definierte maximal zulässige Nachrichtengröße sein.

async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch

Rückgabetyp

Ausnahmen

Wenn beim Leeren des Puffers ein Fehler aufgetreten ist, wenn das Leeren auf True festgelegt ist oder die zugrunde liegenden AMQP-Verbindungen im gepufferten Modus geschlossen werden.

Beispiele

Erstellen eines EventDataBatch-Objekts in begrenzter Größe


       from azure.eventhub import EventData
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

Nur gepufferter Modus. Leerung von Ereignissen im Puffer, die sofort gesendet werden sollen, wenn der Client im gepufferten Modus arbeitet.

async flush(**kwargs: Any) -> None

Parameter

timeout
float oder None

Timeout zum Leeren der gepufferten Ereignisse. Der Standardwert ist Keine, was bedeutet, dass kein Timeout auftritt.

Rückgabetyp

Ausnahmen

Wenn der Producer den Puffer nicht innerhalb des angegebenen Timeouts im gepufferten Modus leeren kann.

from_connection_string

Erstellen Sie einen EventHubProducerClient aus einer Verbindungszeichenfolge.

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient

Parameter

conn_str
str
Erforderlich

Die Verbindungszeichenfolge eines Event Hubs.

eventhub_name
str

Der Pfad des spezifischen Event Hubs, mit dem der Client verbunden werden soll.

buffered_mode
bool

True gibt an, dass der Producerclient Ereignisse in einem Puffer sammelt, effizient batchet und dann veröffentlicht. Der Standardwert lautet False.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Der Rückruf, der aufgerufen werden soll, nachdem ein Batch erfolgreich veröffentlicht wurde. Der Rückruf verwendet zwei Parameter:

  • events: Die Liste der Ereignisse, die erfolgreich veröffentlicht wurden

  • partition_id: Die Partitions-ID, in der die Ereignisse in der Liste veröffentlicht wurden.

Die Rückruffunktion sollte wie folgt definiert werden: on_success(events, partition_id). Dies ist erforderlich, wenn buffered_mode auf True festgelegt ist, wenn buffered_mode auf False festgelegt ist.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

Der Rückruf, der aufgerufen werden soll, nachdem ein Batch nicht veröffentlicht werden konnte. Die Rückruffunktion sollte wie folgt definiert werden: on_error(Ereignisse, partition_id, Fehler), wobei:

  • events: Die Liste der Ereignisse, die nicht veröffentlicht werden konnten,

  • partition_id: Die Partitions-ID, in der die Ereignisse in der Liste veröffentlicht werden sollen, und

  • error: Die Ausnahme im Zusammenhang mit dem Sendefehler.

Wenn buffered_mode false ist, ist on_error Rückruf optional, und Fehler werden wie folgt behandelt:

  • Wenn während der Instanziierung des Producerclients ein on_error Rückruf übergeben wird,

    Dann werden Fehlerinformationen an den on_error Rückruf übergeben, der dann aufgerufen wird.

  • Wenn während der Clientinstanziierung kein on_error Rückruf übergeben wird,

    dann wird der Fehler standardmäßig ausgelöst.

Wenn buffered_mode true ist, ist on_error Rückruf erforderlich, und Fehler werden wie folgt behandelt:

  • Wenn Ereignisse nicht innerhalb des angegebenen Timeouts in die Warteschlange eingereissen werden, wird direkt ein Fehler ausgelöst.

  • Wenn Ereignisse nicht gesendet werden können, nachdem sie erfolgreich in die Warteschlange gestellt wurden, wird der on_error Rückruf aufgerufen.

max_buffer_length
int

Nur gepufferter Modus. Die Gesamtanzahl der Ereignisse pro Partition, die gepuffert werden können, bevor eine Leerung ausgelöst wird. Der Standardwert ist 1500 im gepufferten Modus.

max_wait_time
Optional[float]

Nur gepufferter Modus. Die Zeitspanne, in der vor der Veröffentlichung auf das Erstellen eines Batches mit Ereignissen im Puffer gewartet wird. Der Standardwert ist 1 im gepufferten Modus.

logging_enable
bool

Gibt an, ob Netzwerkablaufverfolgungsprotokolle an die Protokollierung ausgegeben werden sollen. Der Standardwert ist False.

http_proxy
dict

HTTP-Proxyeinstellungen. Dies muss ein Wörterbuch mit den folgenden Schlüsseln sein: "proxy_hostname" (str-Wert) und "proxy_port" (int-Wert). Darüber hinaus können auch die folgenden Schlüssel vorhanden sein: "Benutzername", "Kennwort".

auth_timeout
float

Die Zeit in Sekunden, um auf die Autorisierung eines Tokens durch den Dienst zu warten. Der Standardwert beträgt 60 Sekunden. Wenn dieser Wert auf 0 festgelegt ist, wird kein Timeout vom Client erzwungen.

user_agent
str

Falls angegeben, wird dies vor der Zeichenfolge des Benutzer-Agents hinzugefügt.

retry_total
int

Die Gesamtanzahl der Versuche, einen fehlgeschlagenen Vorgang zu wiederholen, wenn ein Fehler auftritt. Der Standardwert ist 3.

retry_backoff_factor
float

Ein Backofffaktor, der zwischen den Versuchen nach dem zweiten Versuch angewendet werden soll (die meisten Fehler werden sofort durch einen zweiten Versuch ohne Verzögerung behoben). Im festen Modus wird die Wiederholungsrichtlinie für {Backoff-Faktor} immer in den Ruhezustand versetzt. Im Modus "exponentiell" wird die Wiederholungsrichtlinie für: {backoff factor} * (2 ** ({number of total retries} - 1)) sekunden in den Ruhezustand versetzt. Wenn der backoff_factor 0,1 ist, wird der Wiederholungsversuch für [0.0s, 0.2s, 0.4s, ...] zwischen Wiederholungen in den Ruhezustand versetzt. Der Standardwert ist 0,8.

retry_backoff_max
float

Die maximale Backoffzeit. Der Standardwert ist 120 Sekunden (2 Minuten).

retry_mode
str

Das Verzögerungsverhalten zwischen Wiederholungsversuchen. Unterstützte Werte sind "fixed" oder "exponential", wobei der Standardwert "exponentiell" ist.

idle_timeout
float

Timeout in Sekunden, nach dem dieser Client die zugrunde liegende Verbindung schließt, wenn keine Aktivität vorhanden ist. Standardmäßig ist der Wert None, was bedeutet, dass der Client aufgrund von Inaktivität nicht heruntergefahren wird, es sei denn, der Dienst wird initiiert.

transport_type
TransportType

Der Typ des Transportprotokolls, das für die Kommunikation mit dem Event Hubs-Dienst verwendet wird. Der Standardwert ist TransportType.Amqp , in diesem Fall wird Port 5671 verwendet. Wenn der Port 5671 in der Netzwerkumgebung nicht verfügbar/blockiert ist, kann stattdessen TransportType.AmqpOverWebsocket verwendet werden, wobei Port 443 für die Kommunikation verwendet wird.

custom_endpoint_address
Optional[str]

Die benutzerdefinierte Endpunktadresse, die zum Herstellen einer Verbindung mit dem Event Hubs-Dienst verwendet werden soll, sodass Netzwerkanforderungen über alle Anwendungsgateways oder anderen Pfade weitergeleitet werden können, die für die Hostumgebung erforderlich sind. Der Standardwert ist None. Das Format wäre wie "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Wenn port nicht im custom_endpoint_address angegeben ist, wird standardmäßig Port 443 verwendet.

connection_verify
Optional[str]

Pfad zur benutzerdefinierten CA_BUNDLE Datei des SSL-Zertifikats, das zum Authentifizieren der Identität des Verbindungsendpunkts verwendet wird. Der Standardwert ist None. In diesem Fall wird certifi.where() verwendet.

uamqp_transport
bool

Gibt an, ob die uamqp-Bibliothek als zugrunde liegender Transport verwendet werden soll. Der Standardwert ist False, und die Pure Python AMQP-Bibliothek wird als zugrunde liegender Transport verwendet.

Rückgabetyp

Ausnahmen

Wenn beim Leeren des Puffers ein Fehler aufgetreten ist, wenn das Leeren auf True festgelegt ist oder die zugrunde liegenden AMQP-Verbindungen im gepufferten Modus geschlossen werden.

Beispiele

Erstellen Sie eine neue instance von EventHubProducerClient aus Verbindungszeichenfolge.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

Die Anzahl der Ereignisse, die gepuffert werden und auf die Veröffentlichung für eine bestimmte Partition warten. Gibt Im nicht gepufferten Modus Keine zurück. HINWEIS: Der Ereignispuffer wird in einer Hintergrund-Coroutine verarbeitet. Daher sollte die Anzahl der Ereignisse im Puffer, die von dieser API gemeldet werden, nur als Näherung betrachtet werden und nur für die Verwendung beim Debuggen empfohlen werden. Für eine Partitions-ID, für die keine Ereignisse gepuffert sind, wird 0 zurückgegeben, unabhängig davon, ob diese Partitions-ID tatsächlich innerhalb des Event Hubs vorhanden ist.

get_buffered_event_count(partition_id: str) -> int | None

Parameter

partition_id
str
Erforderlich

Die Zielpartitions-ID.

Rückgabetyp

int,

Ausnahmen

Wenn beim Leeren des Puffers ein Fehler aufgetreten ist, wenn das Leeren auf True festgelegt ist oder die zugrunde liegenden AMQP-Verbindungen im gepufferten Modus geschlossen werden.

get_eventhub_properties

Ruft Eigenschaften des Event Hubs ab.

Zu den Schlüsseln im zurückgegebenen Wörterbuch gehören:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Gibt zurück

Ein Wörterbuch, das Informationen zum Event Hub enthält.

Rückgabetyp

Ausnahmen

get_partition_ids

Ruft Partitions-IDs des Event Hubs ab.

async get_partition_ids() -> List[str]

Gibt zurück

Eine Liste der Partitions-IDs.

Rückgabetyp

Ausnahmen

get_partition_properties

Ruft Eigenschaften der angegebenen Partition ab.

Zu den Schlüsseln im Eigenschaftenverzeichnis gehören:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parameter

partition_id
str
Erforderlich

Die Zielpartitions-ID.

Gibt zurück

Ein Diktat der Partitionseigenschaften.

Rückgabetyp

Ausnahmen

send_batch

Sendet einen Batch mit Ereignisdaten. Standardmäßig wird die -Methode blockiert, bis die Bestätigung empfangen wird oder das Vorgangstimeout auftritt. Wenn eventHubProducerClient für die Ausführung im gepufferten Modus konfiguriert ist, quediert die Methode die Ereignisse in die Warteschlange in den lokalen Puffer und gibt zurück. Der Producer sendet automatisch im Hintergrund.

Wenn buffered_mode false ist, ist on_error Rückruf optional, und Fehler werden wie folgt behandelt:

  • Wenn während der Instanziierung des Producerclients ein on_error Rückruf übergeben wird,

    Dann werden Fehlerinformationen an den on_error Rückruf übergeben, der dann aufgerufen wird.

  • Wenn während der Clientinstanziierung kein on_error Rückruf übergeben wird,

    dann wird der Fehler standardmäßig ausgelöst.

Wenn buffered_mode true ist, ist on_error Rückruf erforderlich, und Fehler werden wie folgt behandelt:

  • Wenn Ereignisse nicht innerhalb des angegebenen Timeouts in die Warteschlange eingereissen werden, wird direkt ein Fehler ausgelöst.

  • Wenn Ereignisse nicht gesendet werden können, nachdem sie erfolgreich in die Warteschlange gestellt wurden, wird der on_error Rückruf aufgerufen.

Im gepufferten Modus bleibt das Senden eines Batches intakt und wird als einzelne Einheit gesendet. Der Batch wird nicht neu angeordnet. Dies kann zu Ineffizienz beim Senden von Ereignissen führen.

Wenn Sie eine endliche Liste von EventData oder AmqpAnnotatedMessage senden und wissen, dass diese innerhalb der Event Hub-Framegröße liegt, können Sie sie mit einem send_batch-Aufruf senden. Verwenden Sie create_batch andernfalls , um EventDataBatch zu erstellen und entweder EventData oder AmqpAnnotatedMessage dem Batch nacheinander bis zur Größenbeschränkung hinzuzufügen, und rufen Sie dann diese Methode auf, um den Batch zu senden.

async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

Parameter

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Erforderlich

Das zu sendende EventDataBatch-Objekt oder eine Liste mit EventData , die in einem Batch gesendet werden soll. Alle EventData - oder AmqpAnnotatedMessage-Elemente in der Liste oder EventDataBatch landen auf derselben Partition.

timeout
float

Die maximale Wartezeit zum Senden der Ereignisdaten im nicht gepufferten Modus oder die maximale Wartezeit zum Quedieren der Ereignisdaten in den Puffer im gepufferten Modus. Im nicht gepufferten Modus wird die Standardwartezeit verwendet, die bei der Erstellung des Producers angegeben wurde. Im gepufferten Modus ist die Standardwartezeit Keine.

partition_id
str

Die spezifische Partitions-ID, an die gesendet werden soll. Der Standardwert ist Keine. In diesem Fall weist der Dienst alle Partitionen mithilfe von Roundrobin zu. Ein TypeError wird ausgelöst, wenn partition_id angegeben ist und event_data_batch ein EventDataBatch ist, da EventDataBatch selbst über partition_id verfügt.

partition_key
str

Mit dem angegebenen partition_key werden Ereignisdaten an eine bestimmte Partition des Event Hubs gesendet, die vom Dienst festgelegt wird. Ein TypeError wird ausgelöst, wenn partition_key angegeben ist und event_data_batch ein EventDataBatch ist, da EventDataBatch selbst über partition_key verfügt. Wenn sowohl partition_id als auch partition_key bereitgestellt werden, hat die partition_id Vorrang. WARNUNG: Das Festlegen partition_key des Nicht-Zeichenfolgenwerts für die zu sendenden Ereignisse wird abgeraten, da die partition_key vom Event Hub-Dienst ignoriert wird und Ereignisse mithilfe von Roundrobin allen Partitionen zugewiesen werden. Darüber hinaus gibt es SDKs für die Nutzung von Ereignissen, die davon ausgehen, dass partition_key nur Zeichenfolgentyp sind. Möglicherweise können sie den Nicht-Zeichenfolgenwert nicht analysieren.

Rückgabetyp

Ausnahmen

Wenn der durch den Timeoutparameter angegebene Wert verstrichen ist, bevor das Ereignis im nicht gepufferten Modus gesendet werden kann oder die Ereignisse im gepufferten Modus in die Warteschlange eingereiht werden können.

Beispiele

Asynchrones Senden von Ereignisdaten


       async with producer:
           event_data_batch = await producer.create_batch()
           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # The EventDataBatch object reaches its max_size.
                   # You can send the full EventDataBatch object and create a new one here.
                   break
           await producer.send_batch(event_data_batch)

send_event

Sendet Ereignisdaten. Standardmäßig wird die -Methode blockiert, bis die Bestätigung empfangen wird oder das Vorgangstimeout auftritt. Wenn der EventHubProducerClient für die Ausführung im gepufferten Modus konfiguriert ist, wird das Ereignis von der Methode in die Warteschlange in den lokalen Puffer gestellt und zurückgegeben. Der Producer führt die automatische Batchverarbeitung und das Senden im Hintergrund durch.

Wenn buffered_mode false ist, ist on_error Rückruf optional, und Fehler werden wie folgt behandelt: * Wenn während der Instanziierung des Producerclients ein on_error Rückruf übergeben wird,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Wenn buffered_mode true ist, ist on_error Rückruf erforderlich, und Fehler werden wie folgt behandelt: * Wenn Ereignisse nicht innerhalb des angegebenen Timeouts in die Warteschlange eingereissen werden, wird direkt ein Fehler ausgelöst.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

Parameter

event_data
Union[EventData, AmqpAnnotatedMessage]
Erforderlich

Das zu sendende EventData-Objekt .

timeout
float

Die maximale Wartezeit zum Senden der Ereignisdaten im nicht gepufferten Modus oder die maximale Wartezeit zum Quedieren der Ereignisdaten in den Puffer im gepufferten Modus. Im nicht gepufferten Modus wird die Standardwartezeit verwendet, die bei der Erstellung des Producers angegeben wurde. Im gepufferten Modus ist die Standardwartezeit Keine.

partition_id
str

Die spezifische Partitions-ID, an die gesendet werden soll. Der Standardwert ist Keine. In diesem Fall weist der Dienst alle Partitionen mithilfe von Roundrobin zu. Ein TypeError wird ausgelöst, wenn partition_id angegeben ist und event_data_batch ein EventDataBatch ist, da EventDataBatch selbst über partition_id verfügt.

partition_key
str

Mit dem angegebenen partition_key werden Ereignisdaten an eine bestimmte Partition des Event Hubs gesendet, die vom Dienst festgelegt wird. Ein TypeError wird ausgelöst, wenn partition_key angegeben ist und event_data_batch ein EventDataBatch ist, da EventDataBatch selbst über partition_key verfügt. Wenn sowohl partition_id als auch partition_key bereitgestellt werden, hat die partition_id Vorrang. WARNUNG: Das Festlegen partition_key des Nicht-Zeichenfolgenwerts für die zu sendenden Ereignisse wird abgeraten, da die partition_key vom Event Hub-Dienst ignoriert wird und Ereignisse mithilfe von Roundrobin allen Partitionen zugewiesen werden. Darüber hinaus gibt es SDKs für die Nutzung von Ereignissen, die davon ausgehen, dass partition_key nur Zeichenfolgentyp sind. Möglicherweise können sie den Nicht-Zeichenfolgenwert nicht analysieren.

Rückgabetyp

Ausnahmen

Wenn der durch den Timeoutparameter angegebene Wert verstrichen ist, bevor das Ereignis im nicht gepufferten Modus gesendet werden kann oder die Ereignisse nicht in die gepufferte Warteschlange eingereiht werden können.

Attribute

total_buffered_event_count

Die Gesamtanzahl der Ereignisse, die derzeit gepuffert sind und auf die Veröffentlichung warten, über alle Partitionen hinweg. Gibt Im nicht gepufferten Modus Keine zurück. HINWEIS: Der Ereignispuffer wird in einer Hintergrund-Coroutine verarbeitet. Daher sollte die Anzahl der Ereignisse im Puffer, die von dieser API gemeldet werden, nur als Näherung betrachtet werden und nur für die Verwendung beim Debuggen empfohlen werden.

Rückgabetyp

int,