EventHubConsumerClient Klasa

Klasa EventHubConsumerClient definiuje interfejs wysokiego poziomu do odbierania zdarzeń z usługi Azure Event Hubs.

Głównym celem klasy EventHubConsumerClient jest odbieranie zdarzeń ze wszystkich partycji usługi EventHub z równoważeniem obciążenia i punktami kontrolnymi.

Gdy wiele wystąpień EventHubConsumerClient działa w tym samym centrum zdarzeń, grupa odbiorców i lokalizacja punktów kontrolnych, partycje będą równomiernie dystrybuowane między nimi.

Aby włączyć równoważenie obciążenia i utrwalone punkty kontrolne, checkpoint_store należy ustawić podczas tworzenia klasy EventHubConsumerClient. Jeśli nie podano magazynu punktów kontrolnych, punkt kontrolny będzie utrzymywany wewnętrznie w pamięci.

Obiekt EventHubConsumerClient może również odbierać dane z określonej partycji podczas wywoływania metody receive() lub receive_batch() i określić partition_id. Równoważenie obciążenia nie będzie działać w trybie pojedynczej partycji. Jednak użytkownicy nadal mogą zapisywać punkty kontrolne, jeśli ustawiono checkpoint_store.

Dziedziczenie
azure.eventhub._client_base.ClientBase
EventHubConsumerClient

Konstruktor

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parametry

fully_qualified_namespace
str
Wymagane

W pełni kwalifikowana nazwa hosta dla przestrzeni nazw usługi Event Hubs. Format przestrzeni nazw to: .servicebus.windows.net.

eventhub_name
str
Wymagane

Ścieżka określonego centrum zdarzeń w celu połączenia klienta z.

consumer_group
str
Wymagane

Odbieranie zdarzeń z centrum zdarzeń dla tej grupy odbiorców.

credential
TokenCredential lub AzureSasCredential lub AzureNamedKeyCredential
Wymagane

Obiekt poświadczeń używany do uwierzytelniania, który implementuje określony interfejs do pobierania tokenów. Akceptuje EventHubSharedKeyCredentialobiekty , lub poświadczenia generowane przez bibliotekę azure-identity i obiekty, które implementują metodę *get_token(self, scopes).

logging_enable
bool

Określa, czy dane wyjściowe dzienników śledzenia sieci do rejestratora. Wartość domyślna to False.

auth_timeout
float

Czas w sekundach oczekiwania na autoryzowanie tokenu przez usługę. Wartość domyślna to 60 sekund. W przypadku ustawienia wartości 0 nie zostanie wymuszony limit czasu od klienta.

user_agent
str

Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.

retry_total
int

Całkowita liczba prób ponownego wykonania operacji zakończonych niepowodzeniem w przypadku wystąpienia błędu. Wartość domyślna to 3. Kontekst retry_total odbierania jest specjalny: metoda odbierania jest implementowana przez pętlę czasową wywołującą wewnętrzną metodę odbierania w każdej iteracji. W przypadku odbieraniaretry_total określa liczbę ponownych prób po błędzie zgłoszonych przez wewnętrzną metodę odbierania w pętli while. Jeśli próby ponawiania zostaną wyczerpane, wywołanie zwrotne on_error zostanie wywołane (jeśli podano) z informacjami o błędzie. Klient partycji wewnętrznej, który zakończył się niepowodzeniem, zostanie zamknięty (on_partition_close zostanie wywołany, jeśli zostanie podany), a nowy odbiorca partycji wewnętrznej zostanie utworzony (on_partition_initialize zostanie wywołany, jeśli zostanie podany), aby wznowić odbieranie.

retry_backoff_factor
float

Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnień). W trybie stałym zasady ponawiania zawsze będą w stanie uśpienia dla składnika {backoff}. W trybie wykładniczym zasady ponawiania zostaną uśpine dla: {współczynnik wycofywania} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie zostanie spane dla wartości [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.

retry_backoff_max
float

Maksymalny czas wycofywania. Wartość domyślna to 120 sekund (2 minuty).

retry_mode
str

Zachowanie opóźnienia między ponownymi próbami. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".

idle_timeout
float

Limit czasu (w sekundach), po którym klient zamknie połączenie bazowe, jeśli nie będzie żadnych dalszych działań. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.

transport_type
TransportType

Typ protokołu transportowego, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.

http_proxy
dict[str, str lub int]

Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość ciągu) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".

checkpoint_store
CheckpointStore lub None

Menedżer, który przechowuje dane punktu kontrolnego i równoważenia obciążenia partycji podczas odbierania zdarzeń. Magazyn punktów kontrolnych będzie używany w obu przypadkach odbierania ze wszystkich partycji lub jednej partycji. W drugim przypadku równoważenie obciążenia nie ma zastosowania. Jeśli magazyn punktów kontrolnych nie zostanie podany, punkt kontrolny zostanie zachowany wewnętrznie w pamięci, a wystąpienie EventHubConsumerClient będzie odbierać zdarzenia bez równoważenia obciążenia.

load_balancing_interval
float

Po rozpoczęciu równoważenia obciążenia. Jest to interwał w sekundach między dwiema ocenami równoważenia obciążenia. Wartość domyślna to 30 sekund.

partition_ownership_expiration_interval
float

Własność partycji wygaśnie po tej liczbie sekund. Każda ocena równoważenia obciążenia automatycznie wydłuży czas wygaśnięcia własności. Wartość domyślna to 6 * load_balancing_interval, tj. 180 sekund podczas korzystania z domyślnego load_balancing_interval 30 sekund.

load_balancing_strategy
str lub LoadBalancingStrategy

Podczas uruchamiania równoważenia obciążenia ta strategia będzie używana do oświadczeń i równoważenia własności partycji. Użyj "chciwości" lub LoadBalancingStrategy.GREEDY dla chciwej strategii, która dla każdej oceny równoważenia obciążenia będzie pobierać tyle nieodebranych partycji wymaganych do równoważenia obciążenia. Użyj wartości "balanced" lub LoadBalancingStrategy.BALANCED dla strategii zrównoważonej, która dla każdej oceny równoważenia obciążenia twierdzi, że tylko jedna partycja, która nie jest przejmowana przez inną wartość EventHubConsumerClient. Jeśli wszystkie partycje usługi EventHub są żądane przez inne klasy EventHubConsumerClient , a ten klient twierdził, że zbyt mało partycji, ten klient ukradnie jedną partycję z innych klientów na potrzeby oceny równoważenia obciążenia niezależnie od strategii równoważenia obciążenia. Chciwa strategia jest używana domyślnie.

custom_endpoint_address
str lub None

Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiający kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie wyglądać następująco: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie będzie używany port 443.

connection_verify
str lub None

Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to Brak, w którym przypadku zostanie użyty element certifi.where().

uamqp_transport
bool

Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.

socket_timeout
float

Czas w sekundach oczekiwania bazowego gniazda na połączeniu podczas wysyłania i odbierania danych przed przekroczeniem limitu czasu. Wartość domyślna to 0,2 dla transportType.Amqp i 1 dla transportType.AmqpOverWebsocket. Jeśli występują błędy EventHubsConnectionError spowodowane przekroczeniem limitu czasu zapisu, może być konieczne przekazanie większej niż wartość domyślna. Jest to przeznaczone dla zaawansowanych scenariuszy użycia i zwykle wartość domyślna powinna być wystarczająca.

Przykłady

Utwórz nowe wystąpienie klasy EventHubConsumerClient.


   import os
   from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   consumer = EventHubConsumerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,
       consumer_group='$Default',
       credential=credential)

Metody

close

Zatrzymaj pobieranie zdarzeń z centrum zdarzeń i zamknij bazowe połączenie i łącza protokołu AMQP.

from_connection_string

Utwórz element EventHubConsumerClient na podstawie parametry połączenia.

get_eventhub_properties

Pobierz właściwości centrum zdarzeń.

Klucze w zwracanym słowniku obejmują:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (lista[str])

get_partition_ids

Pobierz identyfikatory partycji centrum zdarzeń.

get_partition_properties

Pobierz właściwości określonej partycji.

Klucze w słowniku właściwości obejmują:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Odbieranie zdarzeń z partycji z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi.

receive_batch

Odbieranie zdarzeń z partycji z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi.

close

Zatrzymaj pobieranie zdarzeń z centrum zdarzeń i zamknij bazowe połączenie i łącza protokołu AMQP.

close() -> None

Typ zwracany

Przykłady

Zamknij klienta.


   import os
   import threading

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group="$Default",
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, multi-thread will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The 'receive' method is a blocking call, it can be executed in a thread for
   # non-blocking behavior, and combined with the 'close' method.

   worker = threading.Thread(
       target=consumer.receive,
       kwargs={
           "on_event": on_event,
           "starting_position": "-1",  # "-1" is from the beginning of the partition.
       }
   )
   worker.start()
   time.sleep(10)  # Keep receiving for 10s then close.
   # Close down the consumer handler explicitly.
   consumer.close()

from_connection_string

Utwórz element EventHubConsumerClient na podstawie parametry połączenia.

from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient

Parametry

conn_str
str
Wymagane

Parametry połączenia centrum zdarzeń.

consumer_group
str
Wymagane

Odbieranie zdarzeń z centrum zdarzeń dla tej grupy odbiorców.

eventhub_name
str

Ścieżka określonego centrum zdarzeń w celu połączenia klienta z.

logging_enable
bool

Określa, czy dane wyjściowe dzienników śledzenia sieci do rejestratora. Wartość domyślna to False.

auth_timeout
float

Czas w sekundach oczekiwania na autoryzowanie tokenu przez usługę. Wartość domyślna to 60 sekund. W przypadku ustawienia wartości 0 nie zostanie wymuszony limit czasu od klienta.

user_agent
str

Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.

retry_total
int

Całkowita liczba prób ponownego wykonania operacji zakończonych niepowodzeniem w przypadku wystąpienia błędu. Wartość domyślna to 3. Kontekst retry_total odbierania jest specjalny: metoda odbierania jest implementowana przez pętlę czasową wywołującą wewnętrzną metodę odbierania w każdej iteracji. W przypadku odbieraniaretry_total określa liczbę ponownych prób po błędzie zgłoszonych przez wewnętrzną metodę odbierania w pętli while. Jeśli próby ponawiania zostaną wyczerpane, wywołanie zwrotne on_error zostanie wywołane (jeśli podano) z informacjami o błędzie. Klient partycji wewnętrznej, który zakończył się niepowodzeniem, zostanie zamknięty (on_partition_close zostanie wywołany, jeśli zostanie podany), a nowy odbiorca partycji wewnętrznej zostanie utworzony (on_partition_initialize zostanie wywołany, jeśli zostanie podany), aby wznowić odbieranie.

retry_backoff_factor
float

Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnień). W trybie stałym zasady ponawiania zawsze będą w stanie uśpienia dla składnika {backoff}. W trybie wykładniczym zasady ponawiania zostaną uśpine dla: {współczynnik wycofywania} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie próby będzie spać dla [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.

retry_backoff_max
float

Maksymalny czas wolny od pracy. Wartość domyślna to 120 sekund (2 minuty).

retry_mode
str

Opóźnienie między próbami ponawiania prób. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".

idle_timeout
float

Limit czasu, w sekundach, po którym ten klient zamknie połączenie bazowe, jeśli nie ma działania furthur. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.

transport_type
TransportType

Typ protokołu transportu, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.

http_proxy
dict

Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość str) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".

checkpoint_store
CheckpointStore lub None

Menedżer, który przechowuje dane modułu równoważenia obciążenia partycji i punktu kontrolnego podczas odbierania zdarzeń. Magazyn punktów kontrolnych będzie używany w obu przypadkach odbierania ze wszystkich partycji lub jednej partycji. W drugim przypadku równoważenie obciążenia nie ma zastosowania. Jeśli nie podano magazynu punktów kontrolnych, punkt kontrolny będzie utrzymywany wewnętrznie w pamięci, a wystąpienie EventHubConsumerClient będzie odbierać zdarzenia bez równoważenia obciążenia.

load_balancing_interval
float

Po rozpoczęciu równoważenia obciążenia. Jest to interwał w sekundach między dwoma ocenami równoważenia obciążenia. Wartość domyślna to 10 sekund.

partition_ownership_expiration_interval
float

Własność partycji wygaśnie po tej liczbie sekund. Każda ocena równoważenia obciążenia automatycznie wydłuży czas wygaśnięcia własności. Wartość domyślna to 6 * load_balancing_interval, czyli 60 sekund podczas korzystania z domyślnej load_balancing_interval 30 sekund.

load_balancing_strategy
str lub LoadBalancingStrategy

Po rozpoczęciu równoważenia obciążenia ta strategia będzie używana do oświadczeń i równoważenia własności partycji. Użyj "chciwości" lub LoadBalancingStrategy.GREEDY dla strategii chciwości, która dla każdej oceny równoważenia obciążenia będzie pobierać tyle nie odzyskanych partycji wymaganych do równoważenia obciążenia. Użyj wartości "zrównoważony" lub LoadBalancingStrategy.BALANCED dla strategii zrównoważonej, która w przypadku każdej oceny równoważenia obciążenia twierdzi tylko jedną partycję, która nie jest zgłaszana przez inną usługę EventHubConsumerClient. Jeśli wszystkie partycje usługi EventHub są obsługiwane przez inne klasy EventHubConsumerClient , a ten klient twierdził zbyt mało partycji, ten klient ukradnie jedną partycję z innych klientów na potrzeby każdej oceny równoważenia obciążenia niezależnie od strategii równoważenia obciążenia. Strategia chciwości jest domyślnie używana.

custom_endpoint_address
str lub None

Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiając kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie taki jak "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie zostanie użyty port 443.

connection_verify
str lub None

Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to None w tym przypadku certifi.where() będzie używany.

uamqp_transport
bool

Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.

Typ zwracany

Przykłady

Utwórz nowe wystąpienie klasy EventHubConsumerClient z parametry połączenia.


   import os
   from azure.eventhub import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Pobierz właściwości centrum zdarzeń.

Klucze w zwracanym słowniku obejmują:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (lista[str])

get_eventhub_properties() -> Dict[str, Any]

Zwraca

Słownik zawierający informacje o centrum zdarzeń.

Typ zwracany

Wyjątki

get_partition_ids

Pobierz identyfikatory partycji centrum zdarzeń.

get_partition_ids() -> List[str]

Zwraca

Lista identyfikatorów partycji.

Typ zwracany

Wyjątki

get_partition_properties

Pobierz właściwości określonej partycji.

Klucze w słowniku właściwości obejmują:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

get_partition_properties(partition_id: str) -> Dict[str, Any]

Parametry

partition_id
str
Wymagane

Identyfikator partycji docelowej.

Zwraca

Słownik zawierający właściwości partycji.

Typ zwracany

Wyjątki

receive

Odbieranie zdarzeń z partycji z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi.

receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None

Parametry

on_event
callable[PartitionContext, EventData lub None]
Wymagane

Funkcja wywołania zwrotnego do obsługi odebranego zdarzenia. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera kontekst partycji i zdarzenie , które jest odebrane zdarzenie. Funkcja wywołania zwrotnego powinna być zdefiniowana na przykład: on_event(partition_context, event). Aby uzyskać szczegółowe informacje o kontekście partycji, zobacz PartitionContext.

max_wait_time
float

Maksymalny interwał w sekundach oczekiwania procesora zdarzeń przed wywołaniem wywołania zwrotnego. Jeśli w tym interwale nie zostaną odebrane żadne zdarzenia, wywołanie zwrotne on_event zostanie wywołane z brakiem. Jeśli ta wartość jest ustawiona na Wartość Brak lub 0 (wartość domyślna), wywołanie zwrotne nie będzie wywoływane do momentu odebrania zdarzenia.

partition_id
str

Jeśli zostanie określony, klient otrzyma tylko z tej partycji. W przeciwnym razie klient otrzyma od wszystkich partycji.

owner_level
int

Priorytet dla wyłącznego konsumenta. W przypadku ustawienia owner_level zostanie utworzony wyłączny konsument. Konsument o wyższym owner_level ma wyższy wyłączny priorytet. Poziom właściciela jest również znany jako "wartość epoki" konsumenta.

prefetch
int

Liczba zdarzeń do wstępnego pobrania z usługi na potrzeby przetwarzania. Wartość domyślna to 300.

track_last_enqueued_event_properties
bool

Wskazuje, czy odbiorca powinien zażądać informacji o ostatnim zdarzeniu w kolejce na skojarzonej partycji i śledzić te informacje w miarę odbierania zdarzeń. Gdy są śledzone informacje o zdarzeniach w kolejce partycji, każde zdarzenie odebrane z usługi Event Hubs będzie zawierać metadane dotyczące partycji. Powoduje to niewielkie użycie dodatkowej przepustowości sieci, które jest zazwyczaj korzystnym kompromisem w przypadku okresowego podejmowania żądań dotyczących właściwości partycji przy użyciu klienta centrum zdarzeń. Jest ona domyślnie ustawiona na wartość Fałsz .

starting_position
str, int, datetime lub dict[str,any]

Rozpocznij odbieranie z tej pozycji zdarzenia, jeśli nie ma danych punktu kontrolnego dla partycji. Dane punktu kontrolnego będą używane, jeśli są dostępne. Może to być dykt z identyfikatorem partycji jako kluczem i pozycją jako wartością poszczególnych partycji lub pojedynczą wartością dla wszystkich partycji. Typ wartości może być str, int lub datetime.datetime. Obsługiwane są również wartości "-1" do odbierania od początku strumienia i "@latest" do odbierania tylko nowych zdarzeń. Wartość domyślna to "@latest".

starting_position_inclusive
bool lub dict[str,bool]

Ustal, czy dana starting_position jest inkluzywna(>=) czy nie (>). Prawda dla inkluzywnego i fałszu dla wyłącznych. Może to być dykt z identyfikatorem partycji jako kluczem i wartością logiczną wskazującą, czy starting_position dla określonej partycji jest włącznie, czy nie. Może to być również pojedyncza wartość logiczna dla wszystkich starting_position. Wartość domyślna to False.

on_error
callable[[PartitionContext, Exception]]

Funkcja wywołania zwrotnego, która zostanie wywołana w przypadku wystąpienia błędu podczas odbierania po ponowieniu prób, zostanie wyczerpana lub podczas procesu równoważenia obciążenia. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera informacje o partycji i błąd jest wyjątkiem. partition_context może być brakiem, jeśli błąd jest zgłaszany podczas procesu równoważenia obciążenia. Wywołanie zwrotne powinno być zdefiniowane tak, jak: on_error(partition_context, błąd). Wywołanie zwrotne on_error będzie również wywoływane, jeśli podczas wywołania zwrotnego on_event zostanie zgłoszony nieobsługiwany wyjątek.

on_partition_initialize
callable[[PartitionContext]]

Funkcja wywołania zwrotnego, która będzie wywoływana po odbiorcy dla określonej partycji, kończy inicjowanie. Zostanie również wywołana, gdy zostanie utworzony nowy wewnętrzny odbiorca partycji, aby przejąć proces odbierania dla użytkownika partycji wewnętrznej i zakończonego niepowodzeniem. Wywołanie zwrotne przyjmuje jeden parametr: partition_context który zawiera informacje o partycji. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_initialize(partition_context).

on_partition_close
callable[[PartitionContext, CloseReason]]

Funkcja wywołania zwrotnego, która zostanie wywołana po zamknięciu odbiorcy dla określonej partycji. Zostanie również wywołana w przypadku wystąpienia błędu podczas odbierania po wyczerpaniu prób ponawiania prób. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera informacje o partycji i przyczynę zamknięcia. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_close(partition_context, przyczyna). Zapoznaj się z CloseReason różnymi przyczynami zamknięcia.

Typ zwracany

Przykłady

Odbieranie zdarzeń z usługi EventHub.


       logger = logging.getLogger("azure.eventhub")

       def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive(on_event=on_event)

receive_batch

Odbieranie zdarzeń z partycji z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi.

receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None

Parametry

on_event_batch
callable[PartitionContext, list[EventData]]
Wymagane

Funkcja wywołania zwrotnego do obsługi partii odebranych zdarzeń. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera kontekst partycji i event_batch, czyli odebrane zdarzenia. Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_event_batch(partition_context, event_batch). event_batch może być pustą listą, jeśli max_wait_time nie ma wartości None ani 0 i żadne zdarzenie nie zostanie odebrane po max_wait_time. Aby uzyskać szczegółowe informacje o kontekście partycji, zobacz PartitionContext.

max_batch_size
int

Maksymalna liczba zdarzeń w partii przekazanych do wywołania zwrotnego on_event_batch. Jeśli rzeczywista liczba odebranych zdarzeń jest większa niż max_batch_size, odebrane zdarzenia są podzielone na partie i wywoływanie wywołania zwrotnego dla każdej partii z maksymalnie max_batch_size zdarzeniami.

max_wait_time
float

Maksymalny interwał w sekundach oczekiwania procesora zdarzeń przed wywołaniem wywołania zwrotnego. Jeśli w tym interwale nie zostaną odebrane żadne zdarzenia, wywołanie zwrotne on_event_batch zostanie wywołane z pustą listą.

partition_id
str

Jeśli zostanie określony, klient otrzyma tylko z tej partycji. W przeciwnym razie klient otrzyma od wszystkich partycji.

owner_level
int

Priorytet dla wyłącznego konsumenta. W przypadku ustawienia owner_level zostanie utworzony wyłączny konsument. Konsument o wyższym owner_level ma wyższy wyłączny priorytet. Poziom właściciela jest również znany jako "wartość epoki" konsumenta.

prefetch
int

Liczba zdarzeń do wstępnego pobrania z usługi na potrzeby przetwarzania. Wartość domyślna to 300.

track_last_enqueued_event_properties
bool

Wskazuje, czy odbiorca powinien zażądać informacji o ostatnim zdarzeniu w kolejce na skojarzonej partycji i śledzić te informacje w miarę odbierania zdarzeń. Gdy są śledzone informacje o zdarzeniach w kolejce partycji, każde zdarzenie odebrane z usługi Event Hubs będzie zawierać metadane dotyczące partycji. Powoduje to niewielkie użycie dodatkowej przepustowości sieci, które jest zazwyczaj korzystnym kompromisem w przypadku okresowego podejmowania żądań dotyczących właściwości partycji przy użyciu klienta centrum zdarzeń. Jest ona domyślnie ustawiona na wartość Fałsz .

starting_position
str, int, datetime lub dict[str,any]

Rozpocznij odbieranie z tej pozycji zdarzenia, jeśli nie ma danych punktu kontrolnego dla partycji. Dane punktu kontrolnego będą używane, jeśli są dostępne. Może to być dykt z identyfikatorem partycji jako kluczem i pozycją jako wartością poszczególnych partycji lub pojedynczą wartością dla wszystkich partycji. Typ wartości może być str, int lub datetime.datetime. Obsługiwane są również wartości "-1" do odbierania od początku strumienia i "@latest" do odbierania tylko nowych zdarzeń. Wartość domyślna to "@latest".

starting_position_inclusive
bool lub dict[str,bool]

Ustal, czy dana starting_position jest inkluzywna(>=) czy nie (>). Prawda dla inkluzywnego i fałszu dla wyłącznych. Może to być dykt z identyfikatorem partycji jako kluczem i wartością logiczną wskazującą, czy starting_position dla określonej partycji jest włącznie, czy nie. Może to być również pojedyncza wartość logiczna dla wszystkich starting_position. Wartość domyślna to False.

on_error
callable[[PartitionContext, Exception]]

Funkcja wywołania zwrotnego, która zostanie wywołana w przypadku wystąpienia błędu podczas odbierania po ponowieniu prób, zostanie wyczerpana lub podczas procesu równoważenia obciążenia. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera informacje o partycji i błąd jest wyjątkiem. partition_context może być brakiem, jeśli błąd jest zgłaszany podczas procesu równoważenia obciążenia. Wywołanie zwrotne powinno być zdefiniowane tak, jak: on_error(partition_context, błąd). Wywołanie zwrotne on_error będzie również wywoływane, jeśli podczas wywołania zwrotnego on_event zostanie zgłoszony nieobsługiwany wyjątek.

on_partition_initialize
callable[[PartitionContext]]

Funkcja wywołania zwrotnego, która będzie wywoływana po odbiorcy dla określonej partycji, kończy inicjowanie. Zostanie również wywołana, gdy zostanie utworzony nowy wewnętrzny odbiorca partycji, aby przejąć proces odbierania dla użytkownika partycji wewnętrznej i zakończonego niepowodzeniem. Wywołanie zwrotne przyjmuje jeden parametr: partition_context który zawiera informacje o partycji. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_initialize(partition_context).

on_partition_close
callable[[PartitionContext, CloseReason]]

Funkcja wywołania zwrotnego, która zostanie wywołana po zamknięciu odbiorcy dla określonej partycji. Zostanie również wywołana w przypadku wystąpienia błędu podczas odbierania po wyczerpaniu prób ponawiania prób. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera informacje o partycji i przyczynę zamknięcia. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_close(partition_context, przyczyna). Zapoznaj się z CloseReason różnymi przyczynami zamknięcia.

Typ zwracany

Przykłady

Odbieranie zdarzeń w partiach z usługi EventHub.


       logger = logging.getLogger("azure.eventhub")

       def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received events from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive_batch(on_event_batch=on_event_batch)