EventHubConsumerClient Klasse
Die EventHubConsumerClient-Klasse definiert eine allgemeine Schnittstelle zum Empfangen von Ereignissen vom Azure Event Hubs-Dienst.
Das Standard Ziel von EventHubConsumerClient besteht darin, Ereignisse von allen Partitionen eines EventHub mit Lastenausgleich und Prüfpunkting zu empfangen.
Wenn mehrere EventHubConsumerClient-Instanzen für denselben Event Hub, dieselbe Consumergruppe und denselben Prüfpunktstandort ausgeführt werden, werden die Partitionen gleichmäßig unter ihnen verteilt.
Um den Lastenausgleich und persistente Prüfpunkte zu aktivieren, müssen beim Erstellen des EventHubConsumerClients checkpoint_store festgelegt werden. Wenn kein Prüfpunktspeicher bereitgestellt wird, wird der Prüfpunkt intern im Arbeitsspeicher verwaltet.
Ein EventHubConsumerClient kann auch von einer bestimmten Partition empfangen werden, wenn Sie die Methode receive() oder receive_batch() aufrufen und die partition_id angeben. Der Lastenausgleich funktioniert nicht im Einzelpartitionsmodus. Benutzer können jedoch weiterhin Prüfpunkte speichern, wenn die checkpoint_store festgelegt ist.
- Vererbung
-
azure.eventhub._client_base.ClientBaseEventHubConsumerClient
Konstruktor
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Parameter
- fully_qualified_namespace
- str
Der vollqualifizierte Hostname für den Event Hubs-Namespace. Das Namespaceformat ist . servicebus.windows.net.
- eventhub_name
- str
Der Pfad des spezifischen Event Hubs, mit dem der Client verbunden werden soll.
- credential
- TokenCredential oder AzureSasCredential oder AzureNamedKeyCredential
Das für die Authentifizierung verwendete Anmeldeinformationsobjekt, das eine bestimmte Schnittstelle zum Abrufen von Token implementiert. Es akzeptiert EventHubSharedKeyCredential, oder Anmeldeinformationsobjekte, die von der azure-identity-Bibliothek generiert werden, und Objekte, die die *get_token(self, scopes) -Methode implementieren.
- logging_enable
- bool
Gibt an, ob Netzwerkablaufverfolgungsprotokolle an die Protokollierung ausgegeben werden sollen. Der Standardwert ist False.
- auth_timeout
- float
Die Zeit in Sekunden, um auf die Autorisierung eines Tokens durch den Dienst zu warten. Der Standardwert beträgt 60 Sekunden. Wenn dieser Wert auf 0 festgelegt ist, wird kein Timeout vom Client erzwungen.
- user_agent
- str
Falls angegeben, wird dies vor der Zeichenfolge des Benutzer-Agents hinzugefügt.
- retry_total
- int
Die Gesamtanzahl der Versuche, einen fehlgeschlagenen Vorgang zu wiederholen, wenn ein Fehler auftritt. Der Standardwert ist 3. Der Kontext der retry_total beim Empfangen ist besonders: Die Empfangsmethode wird in jeder Iteration durch eine While-Schleife implementiert, die die interne Empfangsmethode aufruft. Im Empfangsfall gibt retry_total die Anzahl der Wiederholungsversuche nach einem Fehler an, der von der internen Empfangsmethode in der while-Schleife ausgelöst wurde. Wenn Wiederholungsversuche erschöpft sind, wird der on_error Rückruf (sofern angegeben) mit den Fehlerinformationen aufgerufen. Der fehlerhafte interne Partitionsconsumer wird geschlossen (on_partition_close wird aufgerufen, wenn angegeben), und ein neuer interner Partitionsconsumer wird erstellt (on_partition_initialize wird aufgerufen, wenn angegeben), um den Empfang fortzusetzen.
- retry_backoff_factor
- float
Ein Backofffaktor, der zwischen den Versuchen nach dem zweiten Versuch angewendet werden soll (die meisten Fehler werden sofort durch einen zweiten Versuch ohne Verzögerung behoben). Im festen Modus wird die Wiederholungsrichtlinie für {Backoff-Faktor} immer in den Ruhezustand versetzt. Im Modus "exponentiell" wird die Wiederholungsrichtlinie für: {backoff factor} * (2 ** ({number of total retries} - 1)) sekunden in den Ruhezustand versetzt. Wenn der backoff_factor 0,1 ist, wird der Wiederholungsversuch für [0.0s, 0.2s, 0.4s, ...] zwischen Wiederholungen in den Ruhezustand versetzt. Der Standardwert ist 0,8.
- retry_backoff_max
- float
Die maximale Backoffzeit. Der Standardwert ist 120 Sekunden (2 Minuten).
- retry_mode
- str
Das Verzögerungsverhalten zwischen Wiederholungsversuchen. Unterstützte Werte sind "fixed" oder "exponential", wobei der Standardwert "exponentiell" ist.
- idle_timeout
- float
Timeout in Sekunden, nach dem dieser Client die zugrunde liegende Verbindung schließt, wenn keine weitere Aktivität erfolgt. Standardmäßig ist der Wert None, was bedeutet, dass der Client aufgrund von Inaktivität nicht heruntergefahren wird, es sei denn, der Dienst wird initiiert.
- transport_type
- TransportType
Der Typ des Transportprotokolls, das für die Kommunikation mit dem Event Hubs-Dienst verwendet wird. Der Standardwert ist TransportType.Amqp , in diesem Fall wird Port 5671 verwendet. Wenn der Port 5671 in der Netzwerkumgebung nicht verfügbar/blockiert ist, kann stattdessen TransportType.AmqpOverWebsocket verwendet werden, wobei Port 443 für die Kommunikation verwendet wird.
HTTP-Proxyeinstellungen. Dies muss ein Wörterbuch mit den folgenden Schlüsseln sein: "proxy_hostname" (str-Wert) und "proxy_port" (int-Wert). Darüber hinaus können auch die folgenden Schlüssel vorhanden sein: "Benutzername", "Kennwort".
- checkpoint_store
- CheckpointStore oder None
Ein Manager, der die Partitionslastenausgleichs- und Prüfpunktdaten beim Empfangen von Ereignissen speichert. Der Prüfpunktspeicher wird in beiden Fällen verwendet, in denen alle Partitionen oder eine einzelne Partition empfangen werden. Im letzteren Fall gilt der Lastenausgleich nicht. Wenn kein Prüfpunktspeicher bereitgestellt wird, wird der Prüfpunkt intern im Arbeitsspeicher verwaltet, und der EventHubConsumerClient-instance empfängt Ereignisse ohne Lastenausgleich.
- load_balancing_interval
- float
Wenn der Lastenausgleich einsetzt. Dies ist das Intervall in Sekunden zwischen zwei Lastenausgleichsauswertungen. Der Standardwert ist „30 Sekunden“.
- partition_ownership_expiration_interval
- float
Ein Partitionsbesitz läuft nach dieser Anzahl von Sekunden ab. Bei jeder Lastenausgleichsauswertung wird die Ablaufzeit des Besitzes automatisch verlängert. Der Standardwert ist 6 * load_balancing_interval, d. h. 180 Sekunden, wenn die Standard-load_balancing_interval von 30 Sekunden verwendet wird.
- load_balancing_strategy
- str oder LoadBalancingStrategy
Wenn der Lastenausgleich einsetzt, wird diese Strategie verwendet, um den Partitionsbesitz zu beanspruchen und auszugleichen. Verwenden Sie "gierig" oder LoadBalancingStrategy.GREEDY für die gierige Strategie, die für jede Lastenausgleichsauswertung so viele nicht beanspruchte Partitionen aufnimmt, die zum Lastenausgleich erforderlich sind. Verwenden Sie "balanced" oder LoadBalancingStrategy.BALANCED für die Ausgewogene Strategie, die für jede Lastenausgleichsauswertung nur eine Partition beansprucht, die nicht von einem anderen EventHubConsumerClient beansprucht wird. Wenn alle Partitionen eines EventHubs von einem anderen EventHubConsumerClient beansprucht werden und dieser Client zu wenige Partitionen beansprucht hat, stiehlt dieser Client eine Partition von anderen Clients für jede Lastenausgleichsauswertung, unabhängig von der Lastenausgleichsstrategie. Die Gierstrategie wird standardmäßig verwendet.
Die benutzerdefinierte Endpunktadresse, die zum Herstellen einer Verbindung mit dem Event Hubs-Dienst verwendet werden soll, sodass Netzwerkanforderungen über alle Anwendungsgateways oder anderen Pfade weitergeleitet werden können, die für die Hostumgebung erforderlich sind. Der Standardwert ist None. Das Format wäre wie "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Wenn port nicht im custom_endpoint_address angegeben ist, wird standardmäßig Port 443 verwendet.
Pfad zur benutzerdefinierten CA_BUNDLE Datei des SSL-Zertifikats, das zum Authentifizieren der Identität des Verbindungsendpunkts verwendet wird. Der Standardwert ist None. In diesem Fall wird certifi.where() verwendet.
- uamqp_transport
- bool
Gibt an, ob die uamqp-Bibliothek als zugrunde liegender Transport verwendet werden soll. Der Standardwert ist False, und die Pure Python AMQP-Bibliothek wird als zugrunde liegender Transport verwendet.
- socket_timeout
- float
Die Zeit in Sekunden, die der zugrunde liegende Socket für die Verbindung beim Senden und Empfangen von Daten warten soll, bevor ein Timeout auftritt. Der Standardwert ist 0,2 für TransportType.Amqp und 1 für TransportType.AmqpOverWebsocket. Wenn EventHubsConnectionError-Fehler aufgrund eines Schreibzeitlimits auftreten, muss möglicherweise ein größer als der Standardwert übergeben werden. Dies ist für erweiterte Verwendungsszenarien vorgesehen, und normalerweise sollte der Standardwert ausreichend sein.
Beispiele
Erstellen Sie eine neue instance von EventHubConsumerClient.
import os
from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
consumer = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group='$Default',
credential=credential)
Methoden
close |
Beenden Sie das Abrufen von Ereignissen aus dem Event Hub, und schließen Sie die zugrunde liegende AMQP-Verbindung und -Links. |
from_connection_string |
Erstellen Sie einen EventHubConsumerClient aus einer Verbindungszeichenfolge. |
get_eventhub_properties |
Ruft Die Eigenschaften des Event Hubs ab. Zu den Schlüsseln im zurückgegebenen Wörterbuch gehören:
|
get_partition_ids |
Abrufen von Partitions-IDs des Event Hubs. |
get_partition_properties |
Rufen Sie die Eigenschaften der angegebenen Partition ab. Zu den Schlüsseln im Eigenschaftenwörterbuch gehören:
|
receive |
Empfangen von Ereignissen von Partitionen mit optionalem Lastenausgleich und Prüfpunkten. |
receive_batch |
Empfangen von Ereignissen von Partitionen mit optionalem Lastenausgleich und Prüfpunkten. |
close
Beenden Sie das Abrufen von Ereignissen aus dem Event Hub, und schließen Sie die zugrunde liegende AMQP-Verbindung und -Links.
close() -> None
Rückgabetyp
Beispiele
Schließen Sie den Client.
import os
import threading
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group="$Default",
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The 'receive' method is a blocking call, it can be executed in a thread for
# non-blocking behavior, and combined with the 'close' method.
worker = threading.Thread(
target=consumer.receive,
kwargs={
"on_event": on_event,
"starting_position": "-1", # "-1" is from the beginning of the partition.
}
)
worker.start()
time.sleep(10) # Keep receiving for 10s then close.
# Close down the consumer handler explicitly.
consumer.close()
from_connection_string
Erstellen Sie einen EventHubConsumerClient aus einer Verbindungszeichenfolge.
from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient
Parameter
- eventhub_name
- str
Der Pfad des spezifischen Event Hubs, mit dem der Client verbunden werden soll.
- logging_enable
- bool
Gibt an, ob Netzwerkablaufverfolgungsprotokolle an die Protokollierung ausgegeben werden sollen. Der Standardwert ist False.
- auth_timeout
- float
Die Zeit in Sekunden, um auf die Autorisierung eines Tokens durch den Dienst zu warten. Der Standardwert beträgt 60 Sekunden. Wenn dieser Wert auf 0 festgelegt ist, wird kein Timeout vom Client erzwungen.
- user_agent
- str
Falls angegeben, wird dies vor der Zeichenfolge des Benutzer-Agents hinzugefügt.
- retry_total
- int
Die Gesamtanzahl der Versuche, einen fehlgeschlagenen Vorgang zu wiederholen, wenn ein Fehler auftritt. Der Standardwert ist 3. Der Kontext der retry_total beim Empfangen ist besonders: Die Empfangsmethode wird in jeder Iteration durch eine While-Schleife implementiert, die die interne Empfangsmethode aufruft. Im Empfangsfall gibt retry_total die Anzahl der Wiederholungsversuche nach einem Fehler an, der von der internen Empfangsmethode in der while-Schleife ausgelöst wurde. Wenn Wiederholungsversuche erschöpft sind, wird der on_error Rückruf (sofern angegeben) mit den Fehlerinformationen aufgerufen. Der fehlerhafte interne Partitionsconsumer wird geschlossen (on_partition_close wird aufgerufen, wenn angegeben), und ein neuer interner Partitionsconsumer wird erstellt (on_partition_initialize wird aufgerufen, wenn angegeben), um den Empfang fortzusetzen.
- retry_backoff_factor
- float
Ein Backofffaktor, der zwischen den Versuchen nach dem zweiten Versuch angewendet werden soll (die meisten Fehler werden sofort durch einen zweiten Versuch ohne Verzögerung behoben). Im festen Modus wird die Wiederholungsrichtlinie für {Backoff-Faktor} immer in den Ruhezustand versetzt. Im Modus "exponentiell" wird die Wiederholungsrichtlinie für: {backoff factor} * (2 ** ({number of total retries} - 1)) sekunden in den Ruhezustand versetzt. Wenn der backoff_factor 0,1 ist, wird der Wiederholungsversuch für [0.0s, 0.2s, 0.4s, ...] zwischen Wiederholungen ruhen. Der Standardwert ist 0,8.
- retry_backoff_max
- float
Die maximale Backoffzeit. Der Standardwert ist 120 Sekunden (2 Minuten).
- retry_mode
- str
Das Verzögerungsverhalten zwischen Wiederholungsversuchen. Unterstützte Werte sind "fixed" oder "exponential", wobei der Standardwert "exponentiell" ist.
- idle_timeout
- float
Timeout in Sekunden, nach dem dieser Client die zugrunde liegende Verbindung schließt, wenn keine furthur-Aktivität vorhanden ist. Standardmäßig ist der Wert None, was bedeutet, dass der Client nicht aufgrund von Inaktivität heruntergefahren wird, es sei denn, der Dienst initiiert.
- transport_type
- TransportType
Der Typ des Transportprotokolls, das für die Kommunikation mit dem Event Hubs-Dienst verwendet wird. Der Standardwert ist TransportType.Amqp , in diesem Fall wird Port 5671 verwendet. Wenn port 5671 in der Netzwerkumgebung nicht verfügbar/blockiert ist, könnte stattdessen TransportType.AmqpOverWebsocket verwendet werden, wobei Port 443 für die Kommunikation verwendet wird.
- http_proxy
- dict
HTTP-Proxyeinstellungen. Dies muss ein Wörterbuch mit den folgenden Schlüsseln sein: "proxy_hostname" (Str-Wert) und "proxy_port" (int-Wert). Zusätzlich können die folgenden Schlüssel vorhanden sein: "Benutzername", "Kennwort".
- checkpoint_store
- CheckpointStore oder None
Ein Manager, der die Partitionslastenausgleichs- und Prüfpunktdaten beim Empfangen von Ereignissen speichert. Der Prüfpunktspeicher wird in beiden Fällen des Empfangens von allen Partitionen oder einer einzelnen Partition verwendet. Im letzteren Fall gilt der Lastenausgleich nicht. Wenn kein Prüfpunktspeicher bereitgestellt wird, wird der Prüfpunkt intern im Arbeitsspeicher verwaltet, und der EventHubConsumerClient-instance empfängt Ereignisse ohne Lastenausgleich.
- load_balancing_interval
- float
Wenn der Lastenausgleich einsetzt. Dies ist das Intervall in Sekunden zwischen zwei Lastenausgleichsauswertungen. Der Standardwert ist 10 Sekunden.
- partition_ownership_expiration_interval
- float
Ein Partitionsbesitz läuft nach dieser Anzahl von Sekunden ab. Jede Lastenausgleichsauswertung verlängert automatisch die Ablaufzeit des Besitzes. Der Standardwert ist 6 * load_balancing_interval, d. h. 60 Sekunden bei Verwendung des Standard-load_balancing_interval von 30 Sekunden.
- load_balancing_strategy
- str oder LoadBalancingStrategy
Wenn der Lastenausgleich einsetzt, wird diese Strategie verwendet, um den Partitionsbesitz zu beanspruchen und auszugleichen. Verwenden Sie "greedy" oder LoadBalancingStrategy.GREEDY für die gierige Strategie, die für jede Lastenausgleichsauswertung so viele nicht beanspruchte Partitionen erfasst, die zum Lastenausgleich erforderlich sind. Verwenden Sie "balanced" oder LoadBalancingStrategy.BALANCED für die balanced-Strategie, die für jede Lastenausgleichsauswertung nur eine Partition beansprucht, die nicht von einem anderen EventHubConsumerClient beansprucht wird. Wenn alle Partitionen eines EventHubs von einem anderen EventHubConsumerClient beansprucht werden und dieser Client zu wenige Partitionen beansprucht hat, stiehlt dieser Client eine Partition von anderen Clients für jede Lastenausgleichsauswertung, unabhängig von der Lastenausgleichsstrategie. Die Greedy-Strategie wird standardmäßig verwendet.
Die benutzerdefinierte Endpunktadresse, die zum Herstellen einer Verbindung mit dem Event Hubs-Dienst verwendet werden soll, sodass Netzwerkanforderungen über alle Anwendungsgateways oder andere Pfade weitergeleitet werden können, die für die Hostumgebung erforderlich sind. Der Standardwert ist None. Das Format würde wie "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" aussehen. Wenn port im custom_endpoint_address nicht angegeben ist, wird standardmäßig Port 443 verwendet.
Pfad zur benutzerdefinierten CA_BUNDLE Datei des SSL-Zertifikats, das zum Authentifizieren der Identität des Verbindungsendpunkts verwendet wird. Der Standardwert ist None. In diesem Fall wird certifi.where() verwendet.
- uamqp_transport
- bool
Gibt an, ob die uamqp-Bibliothek als zugrunde liegender Transport verwendet werden soll. Der Standardwert ist False, und die Pure Python AMQP-Bibliothek wird als zugrunde liegender Transport verwendet.
Rückgabetyp
Beispiele
Erstellen Sie eine neue instance von EventHubConsumerClient aus Verbindungszeichenfolge.
import os
from azure.eventhub import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Ruft Die Eigenschaften des Event Hubs ab.
Zu den Schlüsseln im zurückgegebenen Wörterbuch gehören:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
get_eventhub_properties() -> Dict[str, Any]
Gibt zurück
Ein Wörterbuch, das Informationen zum Event Hub enthält.
Rückgabetyp
Ausnahmen
get_partition_ids
Abrufen von Partitions-IDs des Event Hubs.
get_partition_ids() -> List[str]
Gibt zurück
Eine Liste von Partitions-IDs.
Rückgabetyp
Ausnahmen
get_partition_properties
Rufen Sie die Eigenschaften der angegebenen Partition ab.
Zu den Schlüsseln im Eigenschaftenwörterbuch gehören:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
get_partition_properties(partition_id: str) -> Dict[str, Any]
Parameter
Gibt zurück
Ein Wörterbuch, das Partitionseigenschaften enthält.
Rückgabetyp
Ausnahmen
receive
Empfangen von Ereignissen von Partitionen mit optionalem Lastenausgleich und Prüfpunkten.
receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None
Parameter
- on_event
- callable[PartitionContext, EventData oder None]
Die Rückruffunktion für die Behandlung eines empfangenen Ereignisses. Der Rückruf benötigt zwei Parameter: partition_context , das den Partitionskontext und das Ereignis enthält, das das empfangene Ereignis ist. Die Rückruffunktion sollte wie folgt definiert werden: on_event(partition_context, Ereignis). Ausführliche Informationen zum PartitionContextPartitionskontext finden Sie unter .
- max_wait_time
- float
Das maximale Intervall in Sekunden, das der Ereignisprozessor wartet, bevor der Rückruf aufgerufen wird. Wenn innerhalb dieses Intervalls keine Ereignisse empfangen werden, wird der on_event Rückruf mit None aufgerufen. Wenn dieser Wert auf Keine oder 0 (Standardeinstellung) festgelegt ist, wird der Rückruf erst aufgerufen, wenn ein Ereignis empfangen wird.
- partition_id
- str
Falls angegeben, empfängt der Client nur von dieser Partition. Andernfalls empfängt der Client von allen Partitionen.
- owner_level
- int
Die Priorität für einen exklusiven Verbraucher. Ein exklusiver Consumer wird erstellt, wenn owner_level festgelegt ist. Ein Verbraucher mit einem höheren owner_level hat eine höhere exklusive Priorität. Die Besitzerebene wird auch als "Epochenwert" des Consumers bezeichnet.
- prefetch
- int
Die Anzahl der Ereignisse, die vom Dienst für die Verarbeitung vorab ausgelöst werden sollen. Der Standardwert ist 300.
- track_last_enqueued_event_properties
- bool
Gibt an, ob der Consumer Informationen zum zuletzt in die Warteschlange gestellten Ereignis auf der zugeordneten Partition anfordern und diese Informationen nachverfolgen soll, wenn Ereignisse empfangen werden. Wenn Informationen zum zuletzt in Warteschlange gestellten Partitionsereignis nachverfolgt werden, enthält jedes Ereignis, das vom Event Hubs-Dienst empfangen wird, Metadaten über die Partition. Dies führt zu einem geringen Zusätzlichen Netzwerkbandbreitenverbrauch, der im Allgemeinen ein günstiger Kompromiss ist, wenn er bei regelmäßigen Anforderungen für Partitionseigenschaften mit dem Event Hub-Client berücksichtigt wird. Sie ist standardmäßig auf False festgelegt.
Beginnen Sie mit dem Empfang von dieser Ereignisposition, wenn keine Prüfpunktdaten für eine Partition vorhanden sind. Prüfpunktdaten werden verwendet, sofern verfügbar. Dies kann ein Dict mit partitions-ID als Schlüssel und Position als Wert für einzelne Partitionen oder ein einzelner Wert für alle Partitionen sein. Der Werttyp kann str, int oder datetime.datetime sein. Außerdem werden die Werte "-1" für den Empfang vom Anfang des Datenstroms und "@latest" für den Empfang nur neuer Ereignisse unterstützt. Der Standardwert ist "@latest".
Bestimmen Sie, ob der angegebene starting_position inklusive(>=) ist oder nicht (>). True für inklusive und False für exklusiv. Dies kann ein Dict mit der Partitions-ID als Schlüssel und bool als Wert sein, der angibt, ob der starting_position für eine bestimmte Partition inklusive ist oder nicht. Dies kann auch ein einzelner Bool-Wert für alle starting_position sein. Der Standardwert ist False.
- on_error
- callable[[PartitionContext, Exception]]
Die Rückruffunktion, die aufgerufen wird, wenn ein Fehler beim Empfang ausgelöst wird, nachdem Wiederholungsversuche erschöpft sind, oder während des Lastenausgleichs. Der Rückruf benötigt zwei Parameter: partition_context der Partitionsinformationen enthält, und fehler als Ausnahme. partition_context kann Keine sein, wenn der Fehler während des Lastenausgleichs ausgelöst wird. Der Rückruf sollte wie folgt definiert werden: on_error(partition_context, Error). Der on_error Rückrufs wird auch aufgerufen, wenn während des on_event Rückrufs eine nicht behandelte Ausnahme ausgelöst wird.
- on_partition_initialize
- callable[[PartitionContext]]
Die Rückruffunktion, die aufgerufen wird, nachdem ein Consumer für eine bestimmte Partition aufgerufen wird, hat die Initialisierung abgeschlossen. Es wird auch aufgerufen, wenn ein neuer interner Partitionsverbraucher erstellt wird, um den Empfangensprozess für einen fehlerhaften und geschlossenen internen Partitionsverbraucher zu übernehmen. Der Rückruf benötigt einen einzigen Parameter: partition_context der die Partitionsinformationen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
Die Rückruffunktion, die nach einem Consumer für eine bestimmte Partition aufgerufen wird, ist geschlossen. Es wird auch aufgerufen, wenn während des Empfangens ein Fehler ausgelöst wird, nachdem Wiederholungsversuche erschöpft sind. Der Rückruf benötigt zwei Parameter: partition_context der Partitionsinformationen und grund für das Schließen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_close(partition_context, Reason). Bitte beachten Sie die CloseReason verschiedenen Schließgründe.
Rückgabetyp
Beispiele
Empfangen von Ereignissen vom EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive(on_event=on_event)
receive_batch
Empfangen von Ereignissen von Partitionen mit optionalem Lastenausgleich und Prüfpunkten.
receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None
Parameter
- on_event_batch
- callable[PartitionContext, list[EventData]]
Die Rückruffunktion für die Behandlung eines Batches empfangener Ereignisse. Der Rückruf benötigt zwei Parameter: partition_context , der den Partitionskontext enthält, und event_batch, bei dem es sich um die empfangenen Ereignisse handelt. Die Rückruffunktion sollte wie folgt definiert werden: on_event_batch(partition_context, event_batch). event_batch kann eine leere Liste sein, wenn max_wait_time nicht None oder 0 ist und nach max_wait_time kein Ereignis empfangen wird. Ausführliche Informationen zum PartitionContextPartitionskontext finden Sie unter .
- max_batch_size
- int
Die maximale Anzahl von Ereignissen in einem Batch, die an den Rückruf übergeben on_event_batch. Wenn die tatsächliche empfangene Anzahl von Ereignissen größer als max_batch_size ist, werden die empfangenen Ereignisse in Batches unterteilt und rufen den Rückruf für jeden Batch mit bis zu max_batch_size Ereignissen auf.
- max_wait_time
- float
Das maximale Intervall in Sekunden, das der Ereignisprozessor wartet, bevor der Rückruf aufgerufen wird. Wenn innerhalb dieses Intervalls keine Ereignisse empfangen werden, wird der on_event_batch Rückruf mit einer leeren Liste aufgerufen.
- partition_id
- str
Falls angegeben, empfängt der Client nur von dieser Partition. Andernfalls empfängt der Client von allen Partitionen.
- owner_level
- int
Die Priorität für einen exklusiven Verbraucher. Ein exklusiver Consumer wird erstellt, wenn owner_level festgelegt ist. Ein Verbraucher mit einem höheren owner_level hat eine höhere exklusive Priorität. Die Besitzerebene wird auch als "Epochenwert" des Consumers bezeichnet.
- prefetch
- int
Die Anzahl der Ereignisse, die vom Dienst für die Verarbeitung vorab ausgelöst werden sollen. Der Standardwert ist 300.
- track_last_enqueued_event_properties
- bool
Gibt an, ob der Consumer Informationen zum zuletzt in die Warteschlange gestellten Ereignis auf der zugeordneten Partition anfordern und diese Informationen nachverfolgen soll, wenn Ereignisse empfangen werden. Wenn Informationen zum zuletzt in Warteschlange gestellten Partitionsereignis nachverfolgt werden, enthält jedes Ereignis, das vom Event Hubs-Dienst empfangen wird, Metadaten über die Partition. Dies führt zu einem geringen Zusätzlichen Netzwerkbandbreitenverbrauch, der im Allgemeinen ein günstiger Kompromiss ist, wenn er bei regelmäßigen Anforderungen für Partitionseigenschaften mit dem Event Hub-Client berücksichtigt wird. Sie ist standardmäßig auf False festgelegt.
Beginnen Sie mit dem Empfang von dieser Ereignisposition, wenn keine Prüfpunktdaten für eine Partition vorhanden sind. Prüfpunktdaten werden verwendet, sofern verfügbar. Dies kann ein Dict mit partitions-ID als Schlüssel und Position als Wert für einzelne Partitionen oder ein einzelner Wert für alle Partitionen sein. Der Werttyp kann str, int oder datetime.datetime sein. Außerdem werden die Werte "-1" für den Empfang vom Anfang des Datenstroms und "@latest" für den Empfang nur neuer Ereignisse unterstützt. Der Standardwert ist "@latest".
Bestimmen Sie, ob der angegebene starting_position inklusive(>=) ist oder nicht (>). True für inklusive und False für exklusiv. Dies kann ein Dict mit der Partitions-ID als Schlüssel und bool als Wert sein, der angibt, ob der starting_position für eine bestimmte Partition inklusive ist oder nicht. Dies kann auch ein einzelner Bool-Wert für alle starting_position sein. Der Standardwert ist False.
- on_error
- callable[[PartitionContext, Exception]]
Die Rückruffunktion, die aufgerufen wird, wenn ein Fehler beim Empfang ausgelöst wird, nachdem Wiederholungsversuche erschöpft sind, oder während des Lastenausgleichs. Der Rückruf benötigt zwei Parameter: partition_context der Partitionsinformationen enthält, und fehler als Ausnahme. partition_context kann Keine sein, wenn der Fehler während des Lastenausgleichs ausgelöst wird. Der Rückruf sollte wie folgt definiert werden: on_error(partition_context, Error). Der on_error Rückrufs wird auch aufgerufen, wenn während des on_event Rückrufs eine nicht behandelte Ausnahme ausgelöst wird.
- on_partition_initialize
- callable[[PartitionContext]]
Die Rückruffunktion, die aufgerufen wird, nachdem ein Consumer für eine bestimmte Partition aufgerufen wird, hat die Initialisierung abgeschlossen. Es wird auch aufgerufen, wenn ein neuer interner Partitionsverbraucher erstellt wird, um den Empfangensprozess für einen fehlerhaften und geschlossenen internen Partitionsverbraucher zu übernehmen. Der Rückruf benötigt einen einzigen Parameter: partition_context der die Partitionsinformationen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
Die Rückruffunktion, die nach einem Consumer für eine bestimmte Partition aufgerufen wird, ist geschlossen. Es wird auch aufgerufen, wenn während des Empfangens ein Fehler ausgelöst wird, nachdem Wiederholungsversuche erschöpft sind. Der Rückruf benötigt zwei Parameter: partition_context der Partitionsinformationen und grund für das Schließen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_close(partition_context, Reason). Bitte beachten Sie die CloseReason verschiedenen Schließgründe.
Rückgabetyp
Beispiele
Empfangen von Ereignissen in Batches vom EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received events from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive_batch(on_event_batch=on_event_batch)
Azure SDK for Python