Sdílet prostřednictvím


EventHubConsumerClient Třída

Třída EventHubConsumerClient definuje rozhraní vysoké úrovně pro příjem událostí ze služby Azure Event Hubs.

Hlavním cílem EventHubConsumerClient je přijímat události ze všech oddílů EventHubu s vyrovnáváním zatížení a kontrolními body.

Pokud je ve stejném centru událostí, skupině příjemců a umístění kontrolních bodů spuštěno více instancí EventHubConsumerClient , rozdělí se mezi ně oddíly rovnoměrně.

Pokud chcete povolit vyrovnávání zatížení a trvalé kontrolní body, musí být při vytváření EventHubConsumerClient nastavena checkpoint_store. Pokud úložiště kontrolních bodů není k dispozici, bude kontrolní bod interně udržován v paměti.

EventHubConsumerClient může také přijímat z konkrétního oddílu, když zavoláte jeho metodu receive() nebo receive_batch() a zadáte partition_id. Vyrovnávání zatížení nebude fungovat v režimu s jedním oddílem. Pokud je ale checkpoint_store nastavená, můžou uživatelé kontrolní body pořád ukládat.

Dědičnost
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubConsumerClient

Konstruktor

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parametry

fully_qualified_namespace
str
Vyžadováno

Plně kvalifikovaný název hostitele pro obor názvů služby Event Hubs. Formát oboru názvů: .servicebus.windows.net.

eventhub_name
str
Vyžadováno

Cesta ke konkrétnímu centru událostí, ke kterému se má klient připojit.

consumer_group
str
Vyžadováno

Příjem událostí z centra událostí pro tuto skupinu příjemců

credential
AsyncTokenCredential nebo AzureSasCredential nebo AzureNamedKeyCredential
Vyžadováno

Objekt přihlašovacích údajů používaný k ověřování, který implementuje konkrétní rozhraní pro získávání tokenů. Přijímá EventHubSharedKeyCredentialobjekty přihlašovacích údajů nebo vygenerované knihovnou azure-identity a objekty, které implementují metodu *get_token(self, scopes).

logging_enable
bool

Určuje, zda se mají protokoly trasování sítě vypisovat do protokolovacího nástroje. Výchozí hodnota je False.

auth_timeout
float

Doba v sekundách čekání na autorizaci tokenu službou. Výchozí hodnota je 60 sekund. Pokud je nastavená hodnota 0, nebude se z klienta vynucovat žádný časový limit.

user_agent
str

Pokud je zadaný, přidá se před řetězec uživatelského agenta.

retry_total
int

Celkový počet pokusů o opakování neúspěšné operace, když dojde k chybě. Výchozí hodnota je 3. Kontext retry_total při příjmu je zvláštní: Metoda příjmu je implementována smyčkou while, která volá interní metodu příjmu v každé iteraci. V případě příjmuretry_total určuje počet opakování po chybě vyvolané interní metodou příjmu ve smyčce while. Pokud jsou opakované pokusy vyčerpány, zavolá se zpětné volání on_error (pokud je k dispozici) s informacemi o chybě. Neúspěšný příjemce interního oddílu se zavře (on_partition_close se bude volat, pokud je zadaný) a vytvoří se nový interní příjemce oddílu (on_partition_initialize bude volána, pokud bude zadáno), aby bylo možné pokračovat v příjmu.

retry_backoff_factor
float

Faktor zpomalování, který se použije mezi pokusy po druhém pokusu (většina chyb se vyřeší okamžitě druhým pokusem bez zpoždění). V pevném režimu zásady opakování vždy přejdou do režimu spánku pro {backoff factor}. V exponenciálním režimu budou zásady opakování v režimu spánku po dobu: {backoff factor} * (2 ** ({počet celkového počtu opakování} – 1)) sekund. Pokud je backoff_factor 0,1, pak bude opakování mezi opakováními [0,0 s, 0,2 s, 0,4 s, ...] v režimu spánku. Výchozí hodnota je 0,8.

retry_backoff_max
float

Maximální doba návratu. Výchozí hodnota je 120 sekund (2 minuty).

retry_mode
str

Chování zpoždění mezi opakovanými pokusy. Podporované hodnoty jsou "pevné" nebo "exponenciální", kde výchozí hodnota je exponenciální.

idle_timeout
float

Časový limit v sekundách, po jehož uplynutí klient ukončí základní připojení, pokud nedojde k žádné další aktivitě. Ve výchozím nastavení je hodnota None, což znamená, že klient se z důvodu nečinnosti nevystaví, dokud ho služba neaktivuje.

transport_type
TransportType

Typ přenosového protokolu, který se použije ke komunikaci se službou Event Hubs. Výchozí hodnota je TransportType.Amqp , v takovém případě se použije port 5671. Pokud je port 5671 v síťovém prostředí nedostupný nebo blokovaný, je možné místo toho použít transportType.AmqpOverWebsocket , který ke komunikaci používá port 443.

http_proxy

Nastavení proxy serveru HTTP. Musí se jednat o slovník s následujícími klíči: "proxy_hostname" (hodnota str) a "proxy_port" (hodnota int).

checkpoint_store
Optional[CheckpointStore]

Správce, který při přijímání událostí ukládá data o vyrovnávání zatížení oddílu a kontrolních bodech. Úložiště kontrolních bodů se použije v obou případech příjmu ze všech oddílů nebo jednoho oddílu. V druhém případě vyrovnávání zatížení neplatí. Pokud není k dispozici úložiště kontrolních bodů, bude se kontrolní bod uchovávat interně v paměti a instance EventHubConsumerClient bude přijímat události bez vyrovnávání zatížení.

load_balancing_interval
float

Když se spustí vyrovnávání zatížení. Jedná se o interval v sekundách mezi dvěma vyhodnoceními vyrovnávání zatížení. Výchozí hodnota je 30 sekund.

partition_ownership_expiration_interval
float

Platnost vlastnictví oddílu vyprší po tomto počtu sekund. Každé vyhodnocení vyrovnávání zatížení automaticky prodlouží dobu vypršení platnosti vlastnictví. Výchozí hodnota je 6 * load_balancing_interval, tj. 180 sekund při použití výchozího load_balancing_interval 30 sekund.

load_balancing_strategy
str nebo LoadBalancingStrategy

Když se spustí vyrovnávání zatížení, použije tuto strategii k deklarace a vyrovnávání vlastnictví oddílu. Pro strategii greedy použijte "greedy" nebo LoadBalancingStrategy.GREEDY , která při každém vyhodnocení vyrovnávání zatížení zachytí tolik nepovolených oddílů potřebných k vyrovnávání zatížení. Pro vyváženou strategii použijte "balanced" nebo LoadBalancingStrategy.BALANCED , která pro každé vyhodnocení vyrovnávání zatížení deklaruje pouze jeden oddíl, který není deklarován jiným eventHubConsumerClient. Pokud jsou všechny oddíly EventHubu deklarované jiným eventHubConsumerClient a tento klient má nárok na příliš málo oddílů, tento klient odcizí jeden oddíl od ostatních klientů pro každé vyhodnocení vyrovnávání zatížení bez ohledu na strategii vyrovnávání zatížení. Strategie Greedy se používá ve výchozím nastavení.

custom_endpoint_address
Optional[str]

Vlastní adresa koncového bodu, která se má použít k navázání připojení ke službě Event Hubs, což umožňuje směrování síťových požadavků přes všechny aplikační brány nebo jiné cesty potřebné pro hostitelské prostředí. Výchozí hodnota je Žádná. Formát by vypadal takto: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Pokud není port v custom_endpoint_address zadaný, použije se ve výchozím nastavení port 443.

connection_verify
Optional[str]

Cesta k souboru vlastního CA_BUNDLE certifikátu SSL, který slouží k ověření identity koncového bodu připojení. Výchozí hodnota je None, v takovém případě se použije certifi.where().

uamqp_transport
bool

Určuje, jestli se má jako podkladový přenos použít knihovna uamqp . Výchozí hodnota je False a jako podkladový přenos se použije knihovna AMQP pure Pythonu.

socket_timeout
float

Čas v sekundách, kdy by měl podkladový soket na připojení čekat při odesílání a přijímání dat před vypršením časového limitu. Výchozí hodnota je 0,2 pro TransportType.Amqp a 1 pro TransportType.AmqpOverWebsocket. Pokud k chybám EventHubsConnectionError dochází kvůli vypršení časového limitu zápisu, může být potřeba předat větší než výchozí hodnotu. To je pro pokročilé scénáře použití a obvykle by měla stačit výchozí hodnota.

Příklady

Vytvořte novou instanci EventHubConsumerClient.


   import os
   from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     consumer_group='$Default',
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Metody

close

Zastavte načítání událostí z centra událostí a zavřete základní připojení AMQP a odkazy.

from_connection_string

Vytvořte EventHubConsumerClient z připojovací řetězec.

get_eventhub_properties

Získejte vlastnosti centra událostí.

Mezi klíče ve vráceném slovníku patří:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Získejte ID oddílů centra událostí.

get_partition_properties

Získá vlastnosti zadaného oddílu.

Mezi klíče ve slovníku vlastností patří:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Příjem událostí z oddílů s volitelným vyrovnáváním zatížení a kontrolními body

receive_batch

Příjem událostí z oddílů v dávkách s volitelným vyrovnáváním zatížení a kontrolními body

close

Zastavte načítání událostí z centra událostí a zavřete základní připojení AMQP a odkazy.

async close() -> None

Návratový typ

Příklady

Zavřete klienta.


   import os

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub.aio import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, async will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The receive method is a coroutine which will be blocking when awaited.
   # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.

   recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
   await asyncio.sleep(3)  # keep receiving for 3 seconds
   recv_task.cancel()  # stop receiving

   # Close down the consumer handler explicitly.
   await consumer.close()

from_connection_string

Vytvořte EventHubConsumerClient z připojovací řetězec.

from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient

Parametry

conn_str
str
Vyžadováno

Připojovací řetězec centra událostí.

consumer_group
str
Vyžadováno

Příjem událostí z centra událostí pro tuto skupinu příjemců

eventhub_name
str

Cesta ke konkrétnímu centru událostí, ke kterému se má klient připojit.

logging_enable
bool

Určuje, zda se mají protokoly trasování sítě vypisovat do protokolovacího nástroje. Výchozí hodnota je False.

http_proxy
dict

Nastavení proxy serveru HTTP. Musí se jednat o slovník s následujícími klíči: "proxy_hostname" (hodnota str) a "proxy_port" (hodnota int). Kromě toho mohou být k dispozici také následující klíče: 'username', 'password'.

auth_timeout
float

Doba v sekundách čekání na autorizaci tokenu službou. Výchozí hodnota je 60 sekund. Pokud je nastavená hodnota 0, nebude se z klienta vynucovat žádný časový limit.

user_agent
str

Pokud je zadaný, přidá se před řetězec uživatelského agenta.

retry_total
int

Celkový počet pokusů o opakování neúspěšné operace, když dojde k chybě. Výchozí hodnota je 3. Kontext retry_total při příjmu je zvláštní: Metoda příjmu je implementována smyčkou while, která volá interní metodu příjmu v každé iteraci. V případě příjmuretry_total určuje počet opakování po chybě vyvolané interní metodou příjmu ve smyčce while. Pokud jsou opakované pokusy vyčerpány, zavolá se zpětné volání on_error (pokud je k dispozici) s informacemi o chybě. Neúspěšný příjemce interního oddílu se zavře (on_partition_close se bude volat, pokud je zadaný) a vytvoří se nový interní příjemce oddílu (on_partition_initialize bude volána, pokud bude zadáno), aby bylo možné pokračovat v příjmu.

retry_backoff_factor
float

Faktor zpomalování, který se použije mezi pokusy po druhém pokusu (většina chyb se vyřeší okamžitě druhým pokusem bez zpoždění). V pevném režimu zásady opakování vždy přejdou do režimu spánku pro {backoff factor}. V exponenciálním režimu budou zásady opakování v režimu spánku po dobu: {backoff factor} * (2 ** ({počet celkového počtu opakování} – 1)) sekund. Pokud je backoff_factor 0,1, pak bude opakování mezi opakováními [0,0 s, 0,2 s, 0,4 s, ...] v režimu spánku. Výchozí hodnota je 0,8.

retry_backoff_max
float

Maximální doba návratu. Výchozí hodnota je 120 sekund (2 minuty).

retry_mode
str

Chování zpoždění mezi opakovanými pokusy. Podporované hodnoty jsou "pevné" nebo "exponenciální", kde výchozí hodnota je exponenciální.

idle_timeout
float

Časový limit v sekundách, po jehož uplynutí klient ukončí základní připojení, pokud nedojde k žádné další aktivitě. Ve výchozím nastavení je hodnota None, což znamená, že klient se z důvodu nečinnosti nevystaví, dokud ho služba neaktivuje.

transport_type
TransportType

Typ přenosového protokolu, který se použije ke komunikaci se službou Event Hubs. Výchozí hodnota je TransportType.Amqp , v takovém případě se použije port 5671. Pokud je port 5671 v síťovém prostředí nedostupný nebo blokovaný, je možné místo toho použít transportType.AmqpOverWebsocket , který ke komunikaci používá port 443.

checkpoint_store
Optional[CheckpointStore]

Správce, který při přijímání událostí ukládá data o vyrovnávání zatížení oddílu a kontrolních bodech. Úložiště kontrolních bodů se použije v obou případech příjmu ze všech oddílů nebo jednoho oddílu. V druhém případě vyrovnávání zatížení neplatí. Pokud není k dispozici úložiště kontrolních bodů, bude se kontrolní bod uchovávat interně v paměti a instance EventHubConsumerClient bude přijímat události bez vyrovnávání zatížení.

load_balancing_interval
float

Když se spustí vyrovnávání zatížení. Jedná se o interval v sekundách mezi dvěma vyhodnoceními vyrovnávání zatížení. Výchozí hodnota je 30 sekund.

partition_ownership_expiration_interval
float

Platnost vlastnictví oddílu vyprší po tomto počtu sekund. Každé vyhodnocení vyrovnávání zatížení automaticky prodlouží dobu vypršení platnosti vlastnictví. Výchozí hodnota je 6 * load_balancing_interval, tj. 180 sekund při použití výchozího load_balancing_interval 30 sekund.

load_balancing_strategy
str nebo LoadBalancingStrategy

Když se spustí vyrovnávání zatížení, použije tuto strategii k deklarace a vyrovnávání vlastnictví oddílu. Pro strategii greedy použijte "greedy" nebo LoadBalancingStrategy.GREEDY , která při každém vyhodnocení vyrovnávání zatížení zachytí tolik nepovolených oddílů potřebných k vyrovnávání zatížení. Pro vyváženou strategii použijte "balanced" nebo LoadBalancingStrategy.BALANCED , která pro každé vyhodnocení vyrovnávání zatížení deklaruje pouze jeden oddíl, který není deklarován jiným eventHubConsumerClient. Pokud jsou všechny oddíly EventHubu deklarované jiným eventHubConsumerClient a tento klient má nárok na příliš málo oddílů, tento klient odcizí jeden oddíl od ostatních klientů pro každé vyhodnocení vyrovnávání zatížení bez ohledu na strategii vyrovnávání zatížení. Strategie Greedy se používá ve výchozím nastavení.

custom_endpoint_address
Optional[str]

Vlastní adresa koncového bodu, která se má použít k navázání připojení ke službě Event Hubs, což umožňuje směrování síťových požadavků přes všechny aplikační brány nebo jiné cesty potřebné pro hostitelské prostředí. Výchozí hodnota je Žádná. Formát by vypadal takto: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Pokud není port v custom_endpoint_address zadaný, použije se ve výchozím nastavení port 443.

connection_verify
Optional[str]

Cesta k souboru vlastního CA_BUNDLE certifikátu SSL, který slouží k ověření identity koncového bodu připojení. Výchozí hodnota je None, v takovém případě se použije certifi.where().

uamqp_transport
bool

Určuje, jestli se má jako podkladový přenos použít knihovna uamqp . Výchozí hodnota je False a jako podkladový přenos se použije knihovna AMQP pure Pythonu.

Návratový typ

Příklady

Vytvořte novou instanci EventHubConsumerClient z připojovací řetězec.


   import os
   from azure.eventhub.aio import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Získejte vlastnosti centra událostí.

Mezi klíče ve vráceném slovníku patří:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Návraty

Slovník obsahující informace o centru událostí.

Návratový typ

Výjimky

get_partition_ids

Získejte ID oddílů centra událostí.

async get_partition_ids() -> List[str]

Návraty

Seznam ID oddílů.

Návratový typ

Výjimky

get_partition_properties

Získá vlastnosti zadaného oddílu.

Mezi klíče ve slovníku vlastností patří:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parametry

partition_id
str
Vyžadováno

ID cílového oddílu.

Návraty

Slovník obsahující vlastnosti oddílu.

Návratový typ

Výjimky

receive

Příjem událostí z oddílů s volitelným vyrovnáváním zatížení a kontrolními body

async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parametry

on_event
Callable[PartitionContext, Optional[EventData]]
Vyžadováno

Funkce zpětného volání pro zpracování přijaté události. Zpětné volání má dva parametry: partition_context , který obsahuje kontext oddílu a událost , která je přijatou událostí. Funkce zpětného volání by měla být definována takto: on_event(partition_context, event). Podrobné informace o kontextu oddílu najdete v tématu PartitionContext.

max_wait_time
float

Maximální interval v sekundách, kdy procesor událostí počká před voláním zpětného volání. Pokud v tomto intervalu nejsou přijaty žádné události, zpětné volání on_event bude volána s žádnou. Pokud je tato hodnota nastavená na Hodnotu None nebo 0 (výchozí), zpětné volání se nebude volat, dokud nebude přijata událost.

partition_id
str

Pokud je zadaný, klient bude přijímat pouze z tohoto oddílu. V opačném případě klient obdrží ze všech oddílů.

owner_level
int

Priorita výhradního spotřebitele. Pokud je nastavená owner_level, vytvoří se výhradní příjemce. Spotřebitel s vyšší owner_level má vyšší výhradní prioritu. Úroveň vlastníka se také označuje jako "epochová hodnota" příjemce.

prefetch
int

Počet událostí, které se mají předem načíst ze služby ke zpracování. Výchozí hodnota je 300.

track_last_enqueued_event_properties
bool

Určuje, jestli má příjemce požadovat informace o poslední události ve frontě v přidruženém oddílu a sledovat, jak se události přijímají. Když se sledují informace o události posledního zařazení do fronty oddílů, každá událost přijatá ze služby Event Hubs bude obsahovat metadata o oddílu. Výsledkem je malá dodatečná spotřeba šířky pásma sítě, která je obecně příznivým kompromisem, když se uvažuje o pravidelném provádění požadavků na vlastnosti oddílu pomocí klienta centra událostí. Ve výchozím nastavení je nastavená na Hodnotu False .

starting_position
str, int, datetime nebo dict[str,any]

Pokud pro oddíl neexistují žádná data kontrolního bodu, začněte přijímat z této pozice události. Pokud jsou k dispozici, použijí se data kontrolního bodu. Může to být dict s ID oddílu jako klíčem a pozicí jako hodnotou pro jednotlivé oddíly nebo jedinou hodnotou pro všechny oddíly. Typ hodnoty může být str, int nebo datetime.datetime. Podporované jsou také hodnoty -1 pro příjem od začátku datového proudu a "@latest" pro příjem pouze nových událostí.

starting_position_inclusive
bool nebo dict[str,bool]

Určete, zda je daná starting_position inkluzivní(>=) nebo ne (>). True pro inkluzivní a False pro výhradní. Může to být dict s ID oddílu jako klíčem a logickou hodnotou označující, jestli je starting_position pro konkrétní oddíl inkluzivní nebo ne. Může to být také jedna logická hodnota pro všechny starting_position. Výchozí hodnota je Nepravda.

on_error
Callable[[PartitionContext, Exception]]

Funkce zpětného volání, která bude volána při vyvolání chyby během příjmu po vyčerpání opakovaných pokusů nebo během procesu vyrovnávání zatížení. Zpětné volání má dva parametry: partition_context , který obsahuje informace o oddílu a chyba je výjimkou. partition_context může být Žádná, pokud dojde k chybě během procesu vyrovnávání zatížení. Zpětné volání by mělo být definováno takto: on_error(partition_context, chyba). Zpětné volání on_error bude také voláno, pokud je během zpětného volání on_event vyvolána neošetřená výjimka.

on_partition_initialize
Callable[[PartitionContext]]

Funkce zpětného volání, která bude volána po příjemci pro určitý oddíl, dokončí inicializaci. Zavolá se také při vytvoření nového interního příjemce oddílu, který převezme proces přijímání pro neúspěšného a uzavřeného příjemce interního oddílu. Zpětné volání má jeden parametr: partition_context , který obsahuje informace o oddílu. Zpětné volání by mělo být definováno takto: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

Funkce zpětného volání, která bude volána po uzavření příjemce pro určitý oddíl. Volá se také při vyvolání chyby během příjmu po vyčerpání opakovaných pokusů. Zpětné volání má dva parametry: partition_context , který obsahuje informace o oddílu a důvod uzavření. Zpětné volání by mělo být definováno takto: on_partition_close(partition_context, důvod). CloseReason Informace o různých závěrech najdete v tématu .

Návratový typ

Příklady

Příjem událostí z EventHubu


       logger = logging.getLogger("azure.eventhub")

       async def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       async with consumer:
           await consumer.receive(
               on_event=on_event,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )

receive_batch

Příjem událostí z oddílů v dávkách s volitelným vyrovnáváním zatížení a kontrolními body

async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parametry

on_event_batch
Callable[PartitionContext, List[EventData]]
Vyžadováno

Funkce zpětného volání pro zpracování dávky přijatých událostí. Zpětné volání má dva parametry: partition_context , který obsahuje kontext oddílu a event_batch, což jsou přijaté události. Funkce zpětného volání by měla být definovaná takto: on_event_batch(partition_context, event_batch). event_batch může být prázdný seznam, pokud max_wait_time není Žádný ani 0 a po max_wait_time se nepřijí žádná událost. Podrobné informace o kontextu oddílu najdete v tématu PartitionContext.

max_batch_size
int

Maximální počet událostí v dávce předaných zpětnému volání on_event_batch. Pokud je skutečný přijatý počet událostí větší než max_batch_size, jsou přijaté události rozděleny do dávek a volání zpětného volání pro každou dávku s až max_batch_size událostmi.

max_wait_time
float

Maximální interval v sekundách, který bude procesor událostí čekat před voláním zpětného volání. Pokud v tomto intervalu nepřijdou žádné události, bude zpětné volání on_event_batch voláno s prázdným seznamem. Pokud je tato hodnota nastavená na Hodnotu Žádný nebo 0 (výchozí), nebude zpětné volání volána, dokud nebudou přijaty události.

partition_id
str

Pokud je tato hodnota zadána, klient bude přijímat pouze z tohoto oddílu. V opačném případě klient obdrží ze všech oddílů.

owner_level
int

Priorita výhradního spotřebitele. Pokud je nastavená owner_level, vytvoří se výhradní příjemce. Spotřebitel s vyšší owner_level má vyšší výhradní prioritu. Úroveň vlastníka je také známa jako "epochová hodnota" spotřebitele.

prefetch
int

Počet událostí, které se mají předem načíst ze služby ke zpracování. Výchozí hodnota je 300.

track_last_enqueued_event_properties
bool

Určuje, zda by měl příjemce požadovat informace o události posledního zařazení do fronty v přidruženém oddílu a sledovat informace při přijetí událostí. Když se sledují informace o události oddílů s posledním zařazením do fronty, každá událost přijatá ze služby Event Hubs bude obsahovat metadata o oddílu. To vede k malému množství dodatečné spotřeby šířky pásma sítě, což je obecně příznivá kompromis při zvažování proti pravidelnému provádění požadavků na vlastnosti oddílu pomocí klienta centra událostí. Ve výchozím nastavení je nastavená na Hodnotu False .

starting_position
str, int, datetime nebo dict[str,any]

Pokud pro oddíl neexistují žádná data kontrolního bodu, začněte přijímat z této pozice události. Pokud jsou k dispozici, použijí se data kontrolního bodu. Může to být diktát s ID oddílu jako klíčem a pozicí jako hodnotou pro jednotlivé oddíly nebo jedinou hodnotou pro všechny oddíly. Typ hodnoty může být str, int nebo datetime.datetime. Podporované jsou také hodnoty -1 pro příjem od začátku streamu a "@latest" pro příjem pouze nových událostí.

starting_position_inclusive
bool nebo dict[str,bool]

Určete, jestli je daná starting_position inkluzivní(>=) (>). Platí pro inkluzivní a Nepravda pro výhradní. Může se jednat o diktování s ID oddílu jako klíčem a logická hodnota označující, jestli je starting_position pro konkrétní oddíl inkluzivní nebo ne. Může to být také jedna logická hodnota pro všechny starting_position. Výchozí hodnota je False.

on_error
Callable[[PartitionContext, Exception]]

Funkce zpětného volání, která bude volána při vyvolání chyby při příjmu po vyčerpání opakovaných pokusů nebo během procesu vyrovnávání zatížení. Zpětné volání má dva parametry: partition_context , který obsahuje informace o oddílu a výjimkou je chyba . partition_context může být Žádná, pokud dojde k chybě během procesu vyrovnávání zatížení. Zpětné volání by mělo být definováno takto: on_error(partition_context, chyba). Zpětné volání on_error bude voláno také v případě, že se během zpětného volání on_event vyvolá neošetřená výjimka.

on_partition_initialize
Callable[[PartitionContext]]

Funkce zpětného volání, která bude volána po příjemci pro určitý oddíl, dokončí inicializaci. Volá se také při vytvoření nového příjemce interního oddílu, který převezme proces přijímání pro neúspěšného a uzavřeného příjemce interního oddílu. Zpětné volání přebírá jeden parametr: partition_context , který obsahuje informace o oddílu. Zpětné volání by mělo být definováno takto: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

Funkce zpětného volání, která bude volána po příjemci pro určitý oddíl, je uzavřena. Volá se také v případě, že dojde k chybě během příjmu po vyčerpání opakovaných pokusů. Zpětné volání má dva parametry: partition_context , který obsahuje informace o oddílu a důvod uzavření. Zpětné volání by mělo být definováno takto: on_partition_close(partition_context, důvod). Informace o různých důvodech uzavření najdete CloseReason v tomto článku.

Návratový typ

Příklady

Příjem událostí v dávkách z EventHubu


       logger = logging.getLogger("azure.eventhub")

       async def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info(
               "{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
           )

       async with consumer:
           await consumer.receive_batch(
               on_event_batch=on_event_batch,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )