EventHubConsumerClient Klasa
Klasa EventHubConsumerClient definiuje interfejs wysokiego poziomu do odbierania zdarzeń z usługi Azure Event Hubs.
Głównym celem klasy EventHubConsumerClient jest odbieranie zdarzeń ze wszystkich partycji usługi EventHub z równoważeniem obciążenia i punktami kontrolnymi.
Gdy wiele wystąpień EventHubConsumerClient działa w tym samym centrum zdarzeń, grupa odbiorców i lokalizacja punktów kontrolnych, partycje będą równomiernie dystrybuowane między nimi.
Aby włączyć równoważenie obciążenia i utrwalone punkty kontrolne, checkpoint_store należy ustawić podczas tworzenia klasy EventHubConsumerClient. Jeśli nie podano magazynu punktów kontrolnych, punkt kontrolny będzie utrzymywany wewnętrznie w pamięci.
Obiekt EventHubConsumerClient może również odbierać dane z określonej partycji podczas wywoływania metody receive() lub receive_batch() i określić partition_id. Równoważenie obciążenia nie będzie działać w trybie pojedynczej partycji. Jednak użytkownicy nadal mogą zapisywać punkty kontrolne, jeśli ustawiono checkpoint_store.
- Dziedziczenie
-
azure.eventhub._client_base.ClientBaseEventHubConsumerClient
Konstruktor
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Parametry
- fully_qualified_namespace
- str
W pełni kwalifikowana nazwa hosta dla przestrzeni nazw usługi Event Hubs. Format przestrzeni nazw to: .servicebus.windows.net.
- credential
- TokenCredential lub AzureSasCredential lub AzureNamedKeyCredential
Obiekt poświadczeń używany do uwierzytelniania, który implementuje określony interfejs do pobierania tokenów. Akceptuje EventHubSharedKeyCredentialobiekty , lub poświadczenia generowane przez bibliotekę azure-identity i obiekty, które implementują metodę *get_token(self, scopes).
- logging_enable
- bool
Określa, czy dane wyjściowe dzienników śledzenia sieci do rejestratora. Wartość domyślna to False.
- auth_timeout
- float
Czas w sekundach oczekiwania na autoryzowanie tokenu przez usługę. Wartość domyślna to 60 sekund. W przypadku ustawienia wartości 0 nie zostanie wymuszony limit czasu od klienta.
- user_agent
- str
Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.
- retry_total
- int
Całkowita liczba prób ponownego wykonania operacji zakończonych niepowodzeniem w przypadku wystąpienia błędu. Wartość domyślna to 3. Kontekst retry_total odbierania jest specjalny: metoda odbierania jest implementowana przez pętlę czasową wywołującą wewnętrzną metodę odbierania w każdej iteracji. W przypadku odbieraniaretry_total określa liczbę ponownych prób po błędzie zgłoszonych przez wewnętrzną metodę odbierania w pętli while. Jeśli próby ponawiania zostaną wyczerpane, wywołanie zwrotne on_error zostanie wywołane (jeśli podano) z informacjami o błędzie. Klient partycji wewnętrznej, który zakończył się niepowodzeniem, zostanie zamknięty (on_partition_close zostanie wywołany, jeśli zostanie podany), a nowy odbiorca partycji wewnętrznej zostanie utworzony (on_partition_initialize zostanie wywołany, jeśli zostanie podany), aby wznowić odbieranie.
- retry_backoff_factor
- float
Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnień). W trybie stałym zasady ponawiania zawsze będą w stanie uśpienia dla składnika {backoff}. W trybie wykładniczym zasady ponawiania zostaną uśpine dla: {współczynnik wycofywania} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie zostanie spane dla wartości [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.
- retry_backoff_max
- float
Maksymalny czas wycofywania. Wartość domyślna to 120 sekund (2 minuty).
- retry_mode
- str
Zachowanie opóźnienia między ponownymi próbami. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".
- idle_timeout
- float
Limit czasu (w sekundach), po którym klient zamknie połączenie bazowe, jeśli nie będzie żadnych dalszych działań. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.
- transport_type
- TransportType
Typ protokołu transportowego, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.
Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość ciągu) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".
- checkpoint_store
- CheckpointStore lub None
Menedżer, który przechowuje dane punktu kontrolnego i równoważenia obciążenia partycji podczas odbierania zdarzeń. Magazyn punktów kontrolnych będzie używany w obu przypadkach odbierania ze wszystkich partycji lub jednej partycji. W drugim przypadku równoważenie obciążenia nie ma zastosowania. Jeśli magazyn punktów kontrolnych nie zostanie podany, punkt kontrolny zostanie zachowany wewnętrznie w pamięci, a wystąpienie EventHubConsumerClient będzie odbierać zdarzenia bez równoważenia obciążenia.
- load_balancing_interval
- float
Po rozpoczęciu równoważenia obciążenia. Jest to interwał w sekundach między dwiema ocenami równoważenia obciążenia. Wartość domyślna to 30 sekund.
- partition_ownership_expiration_interval
- float
Własność partycji wygaśnie po tej liczbie sekund. Każda ocena równoważenia obciążenia automatycznie wydłuży czas wygaśnięcia własności. Wartość domyślna to 6 * load_balancing_interval, tj. 180 sekund podczas korzystania z domyślnego load_balancing_interval 30 sekund.
- load_balancing_strategy
- str lub LoadBalancingStrategy
Podczas uruchamiania równoważenia obciążenia ta strategia będzie używana do oświadczeń i równoważenia własności partycji. Użyj "chciwości" lub LoadBalancingStrategy.GREEDY dla chciwej strategii, która dla każdej oceny równoważenia obciążenia będzie pobierać tyle nieodebranych partycji wymaganych do równoważenia obciążenia. Użyj wartości "balanced" lub LoadBalancingStrategy.BALANCED dla strategii zrównoważonej, która dla każdej oceny równoważenia obciążenia twierdzi, że tylko jedna partycja, która nie jest przejmowana przez inną wartość EventHubConsumerClient. Jeśli wszystkie partycje usługi EventHub są żądane przez inne klasy EventHubConsumerClient , a ten klient twierdził, że zbyt mało partycji, ten klient ukradnie jedną partycję z innych klientów na potrzeby oceny równoważenia obciążenia niezależnie od strategii równoważenia obciążenia. Chciwa strategia jest używana domyślnie.
Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiający kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie wyglądać następująco: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie będzie używany port 443.
Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to Brak, w którym przypadku zostanie użyty element certifi.where().
- uamqp_transport
- bool
Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.
- socket_timeout
- float
Czas w sekundach oczekiwania bazowego gniazda na połączeniu podczas wysyłania i odbierania danych przed przekroczeniem limitu czasu. Wartość domyślna to 0,2 dla transportType.Amqp i 1 dla transportType.AmqpOverWebsocket. Jeśli występują błędy EventHubsConnectionError spowodowane przekroczeniem limitu czasu zapisu, może być konieczne przekazanie większej niż wartość domyślna. Jest to przeznaczone dla zaawansowanych scenariuszy użycia i zwykle wartość domyślna powinna być wystarczająca.
Przykłady
Utwórz nowe wystąpienie klasy EventHubConsumerClient.
import os
from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
consumer = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group='$Default',
credential=credential)
Metody
close |
Zatrzymaj pobieranie zdarzeń z centrum zdarzeń i zamknij bazowe połączenie i łącza protokołu AMQP. |
from_connection_string |
Utwórz element EventHubConsumerClient na podstawie parametry połączenia. |
get_eventhub_properties |
Pobierz właściwości centrum zdarzeń. Klucze w zwracanym słowniku obejmują:
|
get_partition_ids |
Pobierz identyfikatory partycji centrum zdarzeń. |
get_partition_properties |
Pobierz właściwości określonej partycji. Klucze w słowniku właściwości obejmują:
|
receive |
Odbieranie zdarzeń z partycji z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi. |
receive_batch |
Odbieranie zdarzeń z partycji z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi. |
close
Zatrzymaj pobieranie zdarzeń z centrum zdarzeń i zamknij bazowe połączenie i łącza protokołu AMQP.
close() -> None
Typ zwracany
Przykłady
Zamknij klienta.
import os
import threading
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group="$Default",
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The 'receive' method is a blocking call, it can be executed in a thread for
# non-blocking behavior, and combined with the 'close' method.
worker = threading.Thread(
target=consumer.receive,
kwargs={
"on_event": on_event,
"starting_position": "-1", # "-1" is from the beginning of the partition.
}
)
worker.start()
time.sleep(10) # Keep receiving for 10s then close.
# Close down the consumer handler explicitly.
consumer.close()
from_connection_string
Utwórz element EventHubConsumerClient na podstawie parametry połączenia.
from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient
Parametry
- eventhub_name
- str
Ścieżka określonego centrum zdarzeń w celu połączenia klienta z.
- logging_enable
- bool
Określa, czy dane wyjściowe dzienników śledzenia sieci do rejestratora. Wartość domyślna to False.
- auth_timeout
- float
Czas w sekundach oczekiwania na autoryzowanie tokenu przez usługę. Wartość domyślna to 60 sekund. W przypadku ustawienia wartości 0 nie zostanie wymuszony limit czasu od klienta.
- user_agent
- str
Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.
- retry_total
- int
Całkowita liczba prób ponownego wykonania operacji zakończonych niepowodzeniem w przypadku wystąpienia błędu. Wartość domyślna to 3. Kontekst retry_total odbierania jest specjalny: metoda odbierania jest implementowana przez pętlę czasową wywołującą wewnętrzną metodę odbierania w każdej iteracji. W przypadku odbieraniaretry_total określa liczbę ponownych prób po błędzie zgłoszonych przez wewnętrzną metodę odbierania w pętli while. Jeśli próby ponawiania zostaną wyczerpane, wywołanie zwrotne on_error zostanie wywołane (jeśli podano) z informacjami o błędzie. Klient partycji wewnętrznej, który zakończył się niepowodzeniem, zostanie zamknięty (on_partition_close zostanie wywołany, jeśli zostanie podany), a nowy odbiorca partycji wewnętrznej zostanie utworzony (on_partition_initialize zostanie wywołany, jeśli zostanie podany), aby wznowić odbieranie.
- retry_backoff_factor
- float
Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnień). W trybie stałym zasady ponawiania zawsze będą w stanie uśpienia dla składnika {backoff}. W trybie wykładniczym zasady ponawiania zostaną uśpine dla: {współczynnik wycofywania} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie próby będzie spać dla [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.
- retry_backoff_max
- float
Maksymalny czas wolny od pracy. Wartość domyślna to 120 sekund (2 minuty).
- retry_mode
- str
Opóźnienie między próbami ponawiania prób. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".
- idle_timeout
- float
Limit czasu, w sekundach, po którym ten klient zamknie połączenie bazowe, jeśli nie ma działania furthur. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.
- transport_type
- TransportType
Typ protokołu transportu, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.
- http_proxy
- dict
Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość str) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".
- checkpoint_store
- CheckpointStore lub None
Menedżer, który przechowuje dane modułu równoważenia obciążenia partycji i punktu kontrolnego podczas odbierania zdarzeń. Magazyn punktów kontrolnych będzie używany w obu przypadkach odbierania ze wszystkich partycji lub jednej partycji. W drugim przypadku równoważenie obciążenia nie ma zastosowania. Jeśli nie podano magazynu punktów kontrolnych, punkt kontrolny będzie utrzymywany wewnętrznie w pamięci, a wystąpienie EventHubConsumerClient będzie odbierać zdarzenia bez równoważenia obciążenia.
- load_balancing_interval
- float
Po rozpoczęciu równoważenia obciążenia. Jest to interwał w sekundach między dwoma ocenami równoważenia obciążenia. Wartość domyślna to 10 sekund.
- partition_ownership_expiration_interval
- float
Własność partycji wygaśnie po tej liczbie sekund. Każda ocena równoważenia obciążenia automatycznie wydłuży czas wygaśnięcia własności. Wartość domyślna to 6 * load_balancing_interval, czyli 60 sekund podczas korzystania z domyślnej load_balancing_interval 30 sekund.
- load_balancing_strategy
- str lub LoadBalancingStrategy
Po rozpoczęciu równoważenia obciążenia ta strategia będzie używana do oświadczeń i równoważenia własności partycji. Użyj "chciwości" lub LoadBalancingStrategy.GREEDY dla strategii chciwości, która dla każdej oceny równoważenia obciążenia będzie pobierać tyle nie odzyskanych partycji wymaganych do równoważenia obciążenia. Użyj wartości "zrównoważony" lub LoadBalancingStrategy.BALANCED dla strategii zrównoważonej, która w przypadku każdej oceny równoważenia obciążenia twierdzi tylko jedną partycję, która nie jest zgłaszana przez inną usługę EventHubConsumerClient. Jeśli wszystkie partycje usługi EventHub są obsługiwane przez inne klasy EventHubConsumerClient , a ten klient twierdził zbyt mało partycji, ten klient ukradnie jedną partycję z innych klientów na potrzeby każdej oceny równoważenia obciążenia niezależnie od strategii równoważenia obciążenia. Strategia chciwości jest domyślnie używana.
Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiając kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie taki jak "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie zostanie użyty port 443.
Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to None w tym przypadku certifi.where() będzie używany.
- uamqp_transport
- bool
Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.
Typ zwracany
Przykłady
Utwórz nowe wystąpienie klasy EventHubConsumerClient z parametry połączenia.
import os
from azure.eventhub import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Pobierz właściwości centrum zdarzeń.
Klucze w zwracanym słowniku obejmują:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (lista[str])
get_eventhub_properties() -> Dict[str, Any]
Zwraca
Słownik zawierający informacje o centrum zdarzeń.
Typ zwracany
Wyjątki
get_partition_ids
Pobierz identyfikatory partycji centrum zdarzeń.
get_partition_ids() -> List[str]
Zwraca
Lista identyfikatorów partycji.
Typ zwracany
Wyjątki
get_partition_properties
Pobierz właściwości określonej partycji.
Klucze w słowniku właściwości obejmują:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
get_partition_properties(partition_id: str) -> Dict[str, Any]
Parametry
Zwraca
Słownik zawierający właściwości partycji.
Typ zwracany
Wyjątki
receive
Odbieranie zdarzeń z partycji z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi.
receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None
Parametry
- on_event
- callable[PartitionContext, EventData lub None]
Funkcja wywołania zwrotnego do obsługi odebranego zdarzenia. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera kontekst partycji i zdarzenie , które jest odebrane zdarzenie. Funkcja wywołania zwrotnego powinna być zdefiniowana na przykład: on_event(partition_context, event). Aby uzyskać szczegółowe informacje o kontekście partycji, zobacz PartitionContext.
- max_wait_time
- float
Maksymalny interwał w sekundach oczekiwania procesora zdarzeń przed wywołaniem wywołania zwrotnego. Jeśli w tym interwale nie zostaną odebrane żadne zdarzenia, wywołanie zwrotne on_event zostanie wywołane z brakiem. Jeśli ta wartość jest ustawiona na Wartość Brak lub 0 (wartość domyślna), wywołanie zwrotne nie będzie wywoływane do momentu odebrania zdarzenia.
- partition_id
- str
Jeśli zostanie określony, klient otrzyma tylko z tej partycji. W przeciwnym razie klient otrzyma od wszystkich partycji.
- owner_level
- int
Priorytet dla wyłącznego konsumenta. W przypadku ustawienia owner_level zostanie utworzony wyłączny konsument. Konsument o wyższym owner_level ma wyższy wyłączny priorytet. Poziom właściciela jest również znany jako "wartość epoki" konsumenta.
- prefetch
- int
Liczba zdarzeń do wstępnego pobrania z usługi na potrzeby przetwarzania. Wartość domyślna to 300.
- track_last_enqueued_event_properties
- bool
Wskazuje, czy odbiorca powinien zażądać informacji o ostatnim zdarzeniu w kolejce na skojarzonej partycji i śledzić te informacje w miarę odbierania zdarzeń. Gdy są śledzone informacje o zdarzeniach w kolejce partycji, każde zdarzenie odebrane z usługi Event Hubs będzie zawierać metadane dotyczące partycji. Powoduje to niewielkie użycie dodatkowej przepustowości sieci, które jest zazwyczaj korzystnym kompromisem w przypadku okresowego podejmowania żądań dotyczących właściwości partycji przy użyciu klienta centrum zdarzeń. Jest ona domyślnie ustawiona na wartość Fałsz .
Rozpocznij odbieranie z tej pozycji zdarzenia, jeśli nie ma danych punktu kontrolnego dla partycji. Dane punktu kontrolnego będą używane, jeśli są dostępne. Może to być dykt z identyfikatorem partycji jako kluczem i pozycją jako wartością poszczególnych partycji lub pojedynczą wartością dla wszystkich partycji. Typ wartości może być str, int lub datetime.datetime. Obsługiwane są również wartości "-1" do odbierania od początku strumienia i "@latest" do odbierania tylko nowych zdarzeń. Wartość domyślna to "@latest".
Ustal, czy dana starting_position jest inkluzywna(>=) czy nie (>). Prawda dla inkluzywnego i fałszu dla wyłącznych. Może to być dykt z identyfikatorem partycji jako kluczem i wartością logiczną wskazującą, czy starting_position dla określonej partycji jest włącznie, czy nie. Może to być również pojedyncza wartość logiczna dla wszystkich starting_position. Wartość domyślna to False.
- on_error
- callable[[PartitionContext, Exception]]
Funkcja wywołania zwrotnego, która zostanie wywołana w przypadku wystąpienia błędu podczas odbierania po ponowieniu prób, zostanie wyczerpana lub podczas procesu równoważenia obciążenia. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera informacje o partycji i błąd jest wyjątkiem. partition_context może być brakiem, jeśli błąd jest zgłaszany podczas procesu równoważenia obciążenia. Wywołanie zwrotne powinno być zdefiniowane tak, jak: on_error(partition_context, błąd). Wywołanie zwrotne on_error będzie również wywoływane, jeśli podczas wywołania zwrotnego on_event zostanie zgłoszony nieobsługiwany wyjątek.
- on_partition_initialize
- callable[[PartitionContext]]
Funkcja wywołania zwrotnego, która będzie wywoływana po odbiorcy dla określonej partycji, kończy inicjowanie. Zostanie również wywołana, gdy zostanie utworzony nowy wewnętrzny odbiorca partycji, aby przejąć proces odbierania dla użytkownika partycji wewnętrznej i zakończonego niepowodzeniem. Wywołanie zwrotne przyjmuje jeden parametr: partition_context który zawiera informacje o partycji. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
Funkcja wywołania zwrotnego, która zostanie wywołana po zamknięciu odbiorcy dla określonej partycji. Zostanie również wywołana w przypadku wystąpienia błędu podczas odbierania po wyczerpaniu prób ponawiania prób. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera informacje o partycji i przyczynę zamknięcia. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_close(partition_context, przyczyna). Zapoznaj się z CloseReason różnymi przyczynami zamknięcia.
Typ zwracany
Przykłady
Odbieranie zdarzeń z usługi EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive(on_event=on_event)
receive_batch
Odbieranie zdarzeń z partycji z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi.
receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None
Parametry
- on_event_batch
- callable[PartitionContext, list[EventData]]
Funkcja wywołania zwrotnego do obsługi partii odebranych zdarzeń. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera kontekst partycji i event_batch, czyli odebrane zdarzenia. Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_event_batch(partition_context, event_batch). event_batch może być pustą listą, jeśli max_wait_time nie ma wartości None ani 0 i żadne zdarzenie nie zostanie odebrane po max_wait_time. Aby uzyskać szczegółowe informacje o kontekście partycji, zobacz PartitionContext.
- max_batch_size
- int
Maksymalna liczba zdarzeń w partii przekazanych do wywołania zwrotnego on_event_batch. Jeśli rzeczywista liczba odebranych zdarzeń jest większa niż max_batch_size, odebrane zdarzenia są podzielone na partie i wywoływanie wywołania zwrotnego dla każdej partii z maksymalnie max_batch_size zdarzeniami.
- max_wait_time
- float
Maksymalny interwał w sekundach oczekiwania procesora zdarzeń przed wywołaniem wywołania zwrotnego. Jeśli w tym interwale nie zostaną odebrane żadne zdarzenia, wywołanie zwrotne on_event_batch zostanie wywołane z pustą listą.
- partition_id
- str
Jeśli zostanie określony, klient otrzyma tylko z tej partycji. W przeciwnym razie klient otrzyma od wszystkich partycji.
- owner_level
- int
Priorytet dla wyłącznego konsumenta. W przypadku ustawienia owner_level zostanie utworzony wyłączny konsument. Konsument o wyższym owner_level ma wyższy wyłączny priorytet. Poziom właściciela jest również znany jako "wartość epoki" konsumenta.
- prefetch
- int
Liczba zdarzeń do wstępnego pobrania z usługi na potrzeby przetwarzania. Wartość domyślna to 300.
- track_last_enqueued_event_properties
- bool
Wskazuje, czy odbiorca powinien zażądać informacji o ostatnim zdarzeniu w kolejce na skojarzonej partycji i śledzić te informacje w miarę odbierania zdarzeń. Gdy są śledzone informacje o zdarzeniach w kolejce partycji, każde zdarzenie odebrane z usługi Event Hubs będzie zawierać metadane dotyczące partycji. Powoduje to niewielkie użycie dodatkowej przepustowości sieci, które jest zazwyczaj korzystnym kompromisem w przypadku okresowego podejmowania żądań dotyczących właściwości partycji przy użyciu klienta centrum zdarzeń. Jest ona domyślnie ustawiona na wartość Fałsz .
Rozpocznij odbieranie z tej pozycji zdarzenia, jeśli nie ma danych punktu kontrolnego dla partycji. Dane punktu kontrolnego będą używane, jeśli są dostępne. Może to być dykt z identyfikatorem partycji jako kluczem i pozycją jako wartością poszczególnych partycji lub pojedynczą wartością dla wszystkich partycji. Typ wartości może być str, int lub datetime.datetime. Obsługiwane są również wartości "-1" do odbierania od początku strumienia i "@latest" do odbierania tylko nowych zdarzeń. Wartość domyślna to "@latest".
Ustal, czy dana starting_position jest inkluzywna(>=) czy nie (>). Prawda dla inkluzywnego i fałszu dla wyłącznych. Może to być dykt z identyfikatorem partycji jako kluczem i wartością logiczną wskazującą, czy starting_position dla określonej partycji jest włącznie, czy nie. Może to być również pojedyncza wartość logiczna dla wszystkich starting_position. Wartość domyślna to False.
- on_error
- callable[[PartitionContext, Exception]]
Funkcja wywołania zwrotnego, która zostanie wywołana w przypadku wystąpienia błędu podczas odbierania po ponowieniu prób, zostanie wyczerpana lub podczas procesu równoważenia obciążenia. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera informacje o partycji i błąd jest wyjątkiem. partition_context może być brakiem, jeśli błąd jest zgłaszany podczas procesu równoważenia obciążenia. Wywołanie zwrotne powinno być zdefiniowane tak, jak: on_error(partition_context, błąd). Wywołanie zwrotne on_error będzie również wywoływane, jeśli podczas wywołania zwrotnego on_event zostanie zgłoszony nieobsługiwany wyjątek.
- on_partition_initialize
- callable[[PartitionContext]]
Funkcja wywołania zwrotnego, która będzie wywoływana po odbiorcy dla określonej partycji, kończy inicjowanie. Zostanie również wywołana, gdy zostanie utworzony nowy wewnętrzny odbiorca partycji, aby przejąć proces odbierania dla użytkownika partycji wewnętrznej i zakończonego niepowodzeniem. Wywołanie zwrotne przyjmuje jeden parametr: partition_context który zawiera informacje o partycji. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
Funkcja wywołania zwrotnego, która zostanie wywołana po zamknięciu odbiorcy dla określonej partycji. Zostanie również wywołana w przypadku wystąpienia błędu podczas odbierania po wyczerpaniu prób ponawiania prób. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera informacje o partycji i przyczynę zamknięcia. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_close(partition_context, przyczyna). Zapoznaj się z CloseReason różnymi przyczynami zamknięcia.
Typ zwracany
Przykłady
Odbieranie zdarzeń w partiach z usługi EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received events from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive_batch(on_event_batch=on_event_batch)
Azure SDK for Python