EventHubConsumerClient Osztály

Az EventHubConsumerClient osztály egy magas szintű felületet határoz meg az események fogadásához a Azure Event Hubs szolgáltatásból.

Az EventHubConsumerClient fő célja, hogy a terheléselosztással és ellenőrzőpontokkal rendelkező EventHub összes partíciójáról fogadjon eseményeket.

Ha több EventHubConsumerClient-példány fut ugyanazon az eseményközponton, a fogyasztói csoporton és az ellenőrzőpont-helyen, a partíciók egyenletesen lesznek elosztva közöttük.

A terheléselosztás és a megőrzött ellenőrzőpontok engedélyezéséhez checkpoint_store be kell állítani az EventHubConsumerClient létrehozásakor. Ha nincs megadva ellenőrzőpont-tároló, az ellenőrzőpont belsőleg megmarad a memóriában.

Az EventHubConsumerClient egy adott partícióról is fogadhat, amikor meghívja a metódusát receive() vagy receive_batch(), és megadja a partition_id. A terheléselosztás nem működik egypartíciós módban. A felhasználók azonban továbbra is menthetik az ellenőrzőpontokat, ha a checkpoint_store be van állítva.

Öröklődés
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubConsumerClient

Konstruktor

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Paraméterek

fully_qualified_namespace
str
Kötelező

Az Event Hubs-névtér teljes állomásneve. A névtér formátuma: .servicebus.windows.net.

eventhub_name
str
Kötelező

Annak az eseményközpontnak az elérési útja, amelyhez az ügyfelet csatlakoztatni szeretné.

consumer_group
str
Kötelező

Események fogadása a fogyasztói csoport eseményközpontjából.

credential
AsyncTokenCredential vagy AzureSasCredential vagy AzureNamedKeyCredential
Kötelező

A hitelesítéshez használt hitelesítőadat-objektum, amely egy adott felületet implementál a jogkivonatok lekéréséhez. Elfogadja az EventHubSharedKeyCredentialazure-identity kódtár által létrehozott , vagy hitelesítőadat-objektumokat, valamint az *get_token(saját, hatókörök) metódust implementáló objektumokat.

logging_enable
bool

Hálózati nyomkövetési naplók kimenete a naplózónak. Az alapértelmezett érték Hamis.

auth_timeout
float

A jogkivonat szolgáltatás általi engedélyezésének időtartama másodpercben. Az alapértelmezett érték 60 másodperc. Ha 0-ra van állítva, az ügyfél nem kényszerít időtúllépést.

user_agent
str

Ha meg van adva, ez a felhasználói ügynök sztringje elé kerül.

retry_total
int

A sikertelen művelet ismételt végrehajtására tett kísérletek teljes száma hiba esetén. Az alapértelmezett érték 3. A fogadási retry_total kontextusa különleges: A fogadó metódust minden iterációban egy kis ciklusú belső fogadó metódus implementálja. A fogadási esetben a retry_total adja meg az újrapróbálkozások számát a belső fogadó metódus által a while-loopban kiváltott hiba után. Ha az újrapróbálkozási kísérletek elfogynak, a rendszer meghívja a on_error visszahívást (ha vannak ilyenek) a hibainformációkkal. A rendszer bezárja a sikertelen belső partíciófelhasználót (on_partition_close ha meg van adva), és új belső partíciófelhasználó jön létre (on_partition_initialize lesz meghívva, ha meg van adva) a fogadás folytatásához.

retry_backoff_factor
float

A második próbálkozás utáni próbálkozások közötti visszalépési tényező (a legtöbb hibát azonnal egy második próbálkozás késlelteti). Rögzített módban az újrapróbálkozási szabályzat mindig alvó állapotba kerül a(z) {backoff factor} esetében. "Exponenciális" módban az újrapróbálkozási szabályzat a következőre fog aludni: {backoff factor} * (2 ** ({az újrapróbálkozások száma} – 1)) másodpercig. Ha a backoff_factor 0,1, akkor az újrapróbálkozás az újrapróbálkozások között [0.0s, 0.2s, 0.4s, ...] esetén alvó állapotba kerül. Az alapértelmezett érték 0,8.

retry_backoff_max
float

A maximális visszalépési idő. Az alapértelmezett érték 120 másodperc (2 perc).

retry_mode
str

Az újrapróbálkozási kísérletek közötti késleltetési viselkedés. A támogatott értékek "rögzítettek" vagy "exponenciálisak", ahol az alapértelmezett érték az "exponenciális".

idle_timeout
float

Időtúllépés másodpercben, amely után az ügyfél lezárja a mögöttes kapcsolatot, ha nincs további tevékenység. Alapértelmezés szerint a Nincs érték, ami azt jelenti, hogy az ügyfél nem fog leállni inaktivitás miatt, hacsak a szolgáltatás nem kezdeményezi.

transport_type
TransportType

Az Event Hubs szolgáltatással való kommunikációhoz használt átviteli protokoll típusa. Az alapértelmezett érték a TransportType.Amqp , amely esetben az 5671-es portot használja. Ha az 5671-es port nem érhető el/le van tiltva a hálózati környezetben, a TransportType.AmqpOverWebsocket használható helyette, amely a 443-es portot használja a kommunikációhoz.

http_proxy

HTTP-proxybeállítások. Ennek a szótárnak a következő kulcsokkal kell rendelkeznie: "proxy_hostname" (str érték) és "proxy_port" (int érték).

checkpoint_store
Optional[CheckpointStore]

Egy kezelő, amely a partíció terheléselosztási és ellenőrzőpont-adatait tárolja az események fogadásakor. Az ellenőrzőpont-tárolót mindkét esetben az összes partícióról vagy egyetlen partícióról fogadjuk. Az utóbbi esetben a terheléselosztás nem alkalmazható. Ha nincs megadva ellenőrzőpont-tároló, az ellenőrzőpont a memóriában marad, és az EventHubConsumerClient példány terheléselosztás nélkül fogadja az eseményeket.

load_balancing_interval
float

Amikor a terheléselosztás beindul. Ez a két terheléselosztási kiértékelés közötti időköz másodpercben. Az alapértelmezett érték 30 másodperc.

partition_ownership_expiration_interval
float

A partíció tulajdonjoga ennyi másodperc elteltével lejár. Minden terheléselosztási kiértékelés automatikusan meghosszabbítja a tulajdonjog lejárati idejét. Az alapértelmezett érték 6 * load_balancing_interval, azaz 180 másodperc az alapértelmezett 30 másodperces load_balancing_interval használatakor.

load_balancing_strategy
str vagy LoadBalancingStrategy

Amikor beindul a terheléselosztás, ezzel a stratégiával igényelheti és egyensúlyba állíthatja a partíció tulajdonjogát. Használja a "kapzsi" vagy a LoadBalancingStrategy.GREEDY értéket a kapzsi stratégiához, amely minden terheléselosztási kiértékeléshez annyi felszabadítatlan partíciót fog megragadni, amennyi a terhelés elosztásához szükséges. Használja a "balanced" vagy a LoadBalancingStrategy.BALANCED függvényt az kiegyensúlyozott stratégiához, amely minden terheléselosztási értékeléshez csak egy olyan partíciót igényel, amelyet más EventHubConsumerClient nem igényel. Ha egy EventHub összes partícióját egy másik EventHubConsumerClient igényelte, és az ügyfél túl kevés partíciót igényelt, az ügyfél a terheléselosztási stratégiától függetlenül minden terheléselosztási kiértékeléshez ellop egy partíciót a többi ügyféltől. A kapzsi stratégia alapértelmezés szerint használatban van.

custom_endpoint_address
Optional[str]

Az Event Hubs szolgáltatással való kapcsolat létrehozásához használandó egyéni végpontcím, amely lehetővé teszi a hálózati kérések átirányítását a gazdakörnyezethez szükséges bármely alkalmazásátjárón vagy más útvonalon. Az alapértelmezett érték a Nincs. A formátum a következő: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Ha a port nincs megadva a custom_endpoint_address, a rendszer alapértelmezés szerint a 443-at használja.

connection_verify
Optional[str]

A kapcsolati végpont identitásának hitelesítéséhez használt SSL-tanúsítvány egyéni CA_BUNDLE fájljának elérési útja. Az alapértelmezett érték Nincs, ebben az esetben a certifi.where() lesz használva.

uamqp_transport
bool

Azt határozza meg, hogy az uamqp kódtárat használja-e alapul szolgáló átvitelként. Az alapértelmezett érték Hamis, és a rendszer a Pure Python AMQP-kódtárat használja alapul szolgáló átvitelként.

socket_timeout
float

Az az idő másodpercben, amikor a kapcsolat mögöttes szoftvercsatornájának várnia kell az adatok küldésekor és fogadásakor, mielőtt túllépi az időkorlátot. Az alapértelmezett érték a TransportType.Amqp esetében 0,2, a TransportType.AmqpOverWebsocket esetében pedig 1. Ha az EventHubsConnectionError hibák az írási időtúllépés miatt jelentkeznek, előfordulhat, hogy az alapértelmezettnél nagyobb értéket kell átadni. Ez speciális használati forgatókönyvekhez tartozik, és általában az alapértelmezett értéknek elegendőnek kell lennie.

Példák

Hozza létre az EventHubConsumerClient új példányát.


   import os
   from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     consumer_group='$Default',
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Metódusok

close

Állítsa le az események lekérését az Eseményközpontból, és zárja be a mögöttes AMQP-kapcsolatot és hivatkozásokat.

from_connection_string

EventHubConsumerClient létrehozása egy kapcsolati karakterlánc.

get_eventhub_properties

Az eseményközpont tulajdonságainak lekérése.

A visszaadott szótár kulcsai a következők:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Kérje le az eseményközpont partícióazonosítóit.

get_partition_properties

A megadott partíció tulajdonságainak lekérése.

A tulajdonságok szótárában található kulcsok a következők:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Események fogadása partíció(k)ról, opcionális terheléselosztással és ellenőrzőpontokkal.

receive_batch

Események fogadása partíció(k)ból kötegekben, opcionális terheléselosztással és ellenőrzőpontokkal.

close

Állítsa le az események lekérését az Eseményközpontból, és zárja be a mögöttes AMQP-kapcsolatot és hivatkozásokat.

async close() -> None

Visszatérési típus

Példák

Zárja be az ügyfelet.


   import os

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub.aio import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, async will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The receive method is a coroutine which will be blocking when awaited.
   # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.

   recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
   await asyncio.sleep(3)  # keep receiving for 3 seconds
   recv_task.cancel()  # stop receiving

   # Close down the consumer handler explicitly.
   await consumer.close()

from_connection_string

EventHubConsumerClient létrehozása egy kapcsolati karakterlánc.

from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient

Paraméterek

conn_str
str
Kötelező

Egy eseményközpont kapcsolati karakterlánc.

consumer_group
str
Kötelező

Események fogadása az eseményközpontból ehhez a fogyasztói csoporthoz.

eventhub_name
str

Annak az eseményközpontnak az elérési útja, amelyhez az ügyfelet csatlakoztatni szeretné.

logging_enable
bool

Hálózati nyomkövetési naplók kimenete a naplózónak. Az alapértelmezett érték Hamis.

http_proxy
dict

HTTP-proxybeállítások. Ennek a szótárnak a következő kulcsokkal kell rendelkeznie: "proxy_hostname" (str érték) és "proxy_port" (int érték). Emellett a következő kulcsok is lehetnek jelen: "felhasználónév", "jelszó".

auth_timeout
float

A jogkivonat szolgáltatás általi engedélyezésének időtartama másodpercben. Az alapértelmezett érték 60 másodperc. Ha 0-ra van állítva, az ügyfél nem kényszerít időtúllépést.

user_agent
str

Ha meg van adva, ez a felhasználói ügynök sztringje elé kerül.

retry_total
int

A sikertelen művelet ismételt végrehajtására tett kísérletek teljes száma hiba esetén. Az alapértelmezett érték 3. A fogadási retry_total kontextusa különleges: A fogadó metódust minden iterációban egy kis ciklusú belső fogadó metódus implementálja. A fogadási esetben a retry_total adja meg az újrapróbálkozások számát a belső fogadó metódus által a while-loopban kiváltott hiba után. Ha az újrapróbálkozási kísérletek elfogynak, a rendszer meghívja a on_error visszahívást (ha vannak ilyenek) a hibainformációkkal. A rendszer bezárja a sikertelen belső partíciófelhasználót (on_partition_close ha meg van adva), és új belső partíciófelhasználó jön létre (on_partition_initialize lesz meghívva, ha meg van adva) a fogadás folytatásához.

retry_backoff_factor
float

A második próbálkozás utáni próbálkozások közötti visszalépési tényező (a legtöbb hibát azonnal egy második próbálkozás késlelteti). Rögzített módban az újrapróbálkozási szabályzat mindig alvó állapotba kerül a(z) {backoff factor} esetében. "Exponenciális" módban az újrapróbálkozási szabályzat a következőre fog aludni: {backoff factor} * (2 ** ({az újrapróbálkozások száma} – 1)) másodpercig. Ha a backoff_factor 0,1, akkor az újrapróbálkozás az újrapróbálkozások között [0.0s, 0.2s, 0.4s, ...] esetén alvó állapotba kerül. Az alapértelmezett érték 0,8.

retry_backoff_max
float

A maximális visszalépési idő. Az alapértelmezett érték 120 másodperc (2 perc).

retry_mode
str

Az újrapróbálkozási kísérletek közötti késleltetési viselkedés. A támogatott értékek "rögzítettek" vagy "exponenciálisak", ahol az alapértelmezett érték az "exponenciális".

idle_timeout
float

Időtúllépés másodpercben, amely után az ügyfél lezárja a mögöttes kapcsolatot, ha nincs további tevékenység. Alapértelmezés szerint a Nincs érték, ami azt jelenti, hogy az ügyfél nem fog leállni inaktivitás miatt, hacsak a szolgáltatás nem kezdeményezi.

transport_type
TransportType

Az Event Hubs szolgáltatással való kommunikációhoz használt átviteli protokoll típusa. Az alapértelmezett érték a TransportType.Amqp , amely esetben az 5671-es portot használja. Ha az 5671-es port nem érhető el/le van tiltva a hálózati környezetben, a TransportType.AmqpOverWebsocket használható helyette, amely a 443-es portot használja a kommunikációhoz.

checkpoint_store
Optional[CheckpointStore]

Egy kezelő, amely a partíció terheléselosztási és ellenőrzőpont-adatait tárolja az események fogadásakor. Az ellenőrzőpont-tárolót mindkét esetben az összes partícióról vagy egyetlen partícióról fogadjuk. Az utóbbi esetben a terheléselosztás nem alkalmazható. Ha nincs megadva ellenőrzőpont-tároló, az ellenőrzőpont a memóriában marad, és az EventHubConsumerClient példány terheléselosztás nélkül fogadja az eseményeket.

load_balancing_interval
float

Amikor a terheléselosztás beindul. Ez a két terheléselosztási kiértékelés közötti időköz másodpercben. Az alapértelmezett érték 30 másodperc.

partition_ownership_expiration_interval
float

A partíció tulajdonjoga ennyi másodperc elteltével lejár. Minden terheléselosztási kiértékelés automatikusan meghosszabbítja a tulajdonjog lejárati idejét. Az alapértelmezett érték 6 * load_balancing_interval, azaz 180 másodperc az alapértelmezett 30 másodperces load_balancing_interval használatakor.

load_balancing_strategy
str vagy LoadBalancingStrategy

Amikor beindul a terheléselosztás, ezzel a stratégiával igényelheti és egyensúlyba állíthatja a partíció tulajdonjogát. Használja a "kapzsi" vagy a LoadBalancingStrategy.GREEDY értéket a kapzsi stratégiához, amely minden terheléselosztási kiértékeléshez annyi felszabadítatlan partíciót fog megragadni, amennyi a terhelés elosztásához szükséges. Használja a "balanced" vagy a LoadBalancingStrategy.BALANCED függvényt az kiegyensúlyozott stratégiához, amely minden terheléselosztási értékeléshez csak egy olyan partíciót igényel, amelyet más EventHubConsumerClient nem igényel. Ha egy EventHub összes partícióját egy másik EventHubConsumerClient igényelte, és az ügyfél túl kevés partíciót igényelt, az ügyfél a terheléselosztási stratégiától függetlenül minden terheléselosztási kiértékeléshez ellop egy partíciót a többi ügyféltől. A kapzsi stratégia alapértelmezés szerint használatban van.

custom_endpoint_address
Optional[str]

Az Event Hubs szolgáltatással való kapcsolat létrehozásához használandó egyéni végpontcím, amely lehetővé teszi a hálózati kérések átirányítását a gazdakörnyezethez szükséges bármely alkalmazásátjárón vagy más útvonalon. Az alapértelmezett érték a Nincs. A formátum a következő: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Ha a port nincs megadva a custom_endpoint_address, a rendszer alapértelmezés szerint a 443-at használja.

connection_verify
Optional[str]

A kapcsolati végpont identitásának hitelesítéséhez használt SSL-tanúsítvány egyéni CA_BUNDLE fájljának elérési útja. Az alapértelmezett érték Nincs, ebben az esetben a certifi.where() lesz használva.

uamqp_transport
bool

Azt határozza meg, hogy az uamqp kódtárat használja-e alapul szolgáló átvitelként. Az alapértelmezett érték Hamis, és a rendszer a Pure Python AMQP-kódtárat használja alapul szolgáló átvitelként.

Visszatérési típus

Példák

Hozzon létre egy új EventHubConsumerClient-példányt a kapcsolati karakterlánc.


   import os
   from azure.eventhub.aio import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Az eseményközpont tulajdonságainak lekérése.

A visszaadott szótár kulcsai a következők:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Válaszok

Az eseményközpontra vonatkozó információkat tartalmazó szótár.

Visszatérési típus

Kivételek

get_partition_ids

Kérje le az eseményközpont partícióazonosítóit.

async get_partition_ids() -> List[str]

Válaszok

A partícióazonosítók listája.

Visszatérési típus

Kivételek

get_partition_properties

A megadott partíció tulajdonságainak lekérése.

A tulajdonságok szótárában található kulcsok a következők:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Paraméterek

partition_id
str
Kötelező

A célpartíció azonosítója.

Válaszok

Partíciótulajdonságokat tartalmazó szótár.

Visszatérési típus

Kivételek

receive

Események fogadása partíció(k)ról, opcionális terheléselosztással és ellenőrzőpontokkal.

async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Paraméterek

on_event
Callable[PartitionContext, Optional[EventData]]
Kötelező

A fogadott események kezelésére szolgáló visszahívási függvény. A visszahívás két paramétert vesz igénybe: partition_context , amely partíciókörnyezetet és a fogadott eseményt tartalmazó eseményt tartalmaz. A visszahívási függvényt a következőképpen kell definiálni: on_event(partition_context, esemény). A partíciókörnyezettel kapcsolatos részletes információkért tekintse meg a következőt: PartitionContext.

max_wait_time
float

Az eseményfeldolgozó által a visszahívás meghívása előtt várakozó maximális időköz másodpercben. Ha ezen az időtartamon belül nem érkeznek események, a on_eventvisszahívást a Rendszer a Nincs beállítással hívja meg. Ha ez az érték Nincs vagy 0 (alapértelmezett) értékre van állítva, a visszahívás nem lesz meghívva, amíg egy esemény nem érkezik.

partition_id
str

Ha meg van adva, az ügyfél csak erről a partícióról fog fogadni. Ellenkező esetben az ügyfél minden partícióról megkapja a kódot.

owner_level
int

A kizárólagos fogyasztó prioritása. Ha owner_level be van állítva, létrejön egy kizárólagos fogyasztó. A magasabb owner_level rendelkező fogyasztók magasabb kizárólagos prioritást élveznek. A tulajdonosi szint a fogyasztó "alapértékeként" is ismert.

prefetch
int

A szolgáltatásból feldolgozásra elővetendő események száma. Az alapértelmezett érték 300.

track_last_enqueued_event_properties
bool

Azt jelzi, hogy a fogyasztónak adatokat kell-e kérnie a társított partíción az utolsóként lekért eseményről, és nyomon követheti ezeket az információkat az események fogadása során. A partíciók utolsó lekérdezett eseményével kapcsolatos információk nyomon követésekor az Event Hubs szolgáltatástól kapott minden esemény metaadatokat fog hordozni a partícióról. Ez kis mennyiségű további hálózati sávszélesség-használatot eredményez, amely általában kedvező kompromisszumot jelent, ha figyelembe vesszük, hogy a partíciótulajdonságokra vonatkozó kéréseket rendszeres időközönként az Event Hub-ügyfél használatával kell elvégezni. Alapértelmezés szerint False ( Hamis ) értékre van állítva.

starting_position
str, int, datetime vagy dict[str,any]

Kezdje el fogadni ezt az eseményhelyzetet, ha nincs ellenőrzőpont-adat egy partícióhoz. A rendszer ellenőrzőpont-adatokat használ, ha elérhetők. Ez lehet egy partícióazonosítóval rendelkező diktálás kulcsként és pozícióként az egyes partíciók értékeként, vagy egyetlen érték az összes partícióhoz. Az érték típusa lehet str, int vagy datetime.datetime. Szintén támogatottak a "-1" értékek a stream elejétől való fogadáshoz, és "@latest" csak az új események fogadásához.

starting_position_inclusive
bool vagy dict[str,bool]

Állapítsa meg, hogy a megadott starting_position befogadó(>=) vagy sem (>). Igaz a befogadó és a hamis kizárólagos. Ez lehet egy partícióazonosítóval rendelkező diktálás kulcsként és logikai értékként, amely azt jelzi, hogy egy adott partíció starting_position befogadó-e vagy sem. Ez az összes starting_position egyetlen bool értéke is lehet. Az alapértelmezett érték a Hamis.

on_error
Callable[[PartitionContext, Exception]]

A visszahívási függvény, amely akkor lesz meghívva, ha az újrapróbálkozási kísérletek kimerítése vagy a terheléselosztás folyamata során hiba keletkezik a fogadás során. A visszahívás két paramétert vesz igénybe: partition_context , amely partícióinformációkat tartalmaz, és kivételként a hiba . partition_context lehet Nincs, ha a hiba a terheléselosztási folyamat során jelentkezik. A visszahívást a következőképpen kell definiálni: on_error(partition_context, hiba). A on_error visszahívást is meghívja a rendszer, ha a on_event visszahívás során nem kezelt kivétel lép fel.

on_partition_initialize
Callable[[PartitionContext]]

A visszahívási függvény, amely egy adott partíció fogyasztója után lesz meghívva, befejezi az inicializálást. A rendszer akkor is meghívja, ha egy új belső partíciófelhasználót hoznak létre, hogy átvegye a sikertelen és lezárt belső partíciófelhasználók fogadási folyamatát. A visszahívás egyetlen paramétert vesz fel: partition_context , amely tartalmazza a partíció adatait. A visszahívást a következőképpen kell definiálni: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

A visszahívási függvény, amely egy adott partíció fogyasztójának bezárása után lesz meghívva. A rendszer akkor is meghívja, ha az újrapróbálkozási kísérletek kimerítése után hiba keletkezik a fogadás során. A visszahívás két paramétert vesz fel: partition_context , amely partícióinformációkat és a lezárás okát tartalmazza. A visszahívást a következőképpen kell definiálni: on_partition_close(partition_context, ok). CloseReason Tekintse meg a különböző záró okokat.

Visszatérési típus

Példák

Események fogadása az EventHubról.


       logger = logging.getLogger("azure.eventhub")

       async def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       async with consumer:
           await consumer.receive(
               on_event=on_event,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )

receive_batch

Események fogadása partíció(k)ból kötegekben, opcionális terheléselosztással és ellenőrzőpontokkal.

async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Paraméterek

on_event_batch
Callable[PartitionContext, List[EventData]]
Kötelező

A fogadott események kötegének kezelésére szolgáló visszahívási függvény. A visszahívás két paramétert vesz igénybe: partition_context , amely partíciókörnyezetet és event_batch tartalmaz, amelyek a fogadott események. A visszahívási függvényt a következőképpen kell definiálni: on_event_batch(partition_context, event_batch). event_batch üres lista lehet, ha max_wait_time nem Nincs vagy 0, és nem érkezik esemény max_wait_time után. A partíciókörnyezettel kapcsolatos részletes információkért tekintse meg a következőt PartitionContext: .

max_batch_size
int

A visszahívási on_event_batch átadott kötegben lévő események maximális száma. Ha a tényleges fogadott események száma nagyobb , mint max_batch_size, a kapott események kötegekre vannak osztva, és az egyes kötegek visszahívását akár max_batch_size eseményekkel is meghívják.

max_wait_time
float

A visszahívás meghívása előtt az eseményfeldolgozó által várt maximális időköz másodpercben. Ha ebben az intervallumban nem érkeznek események, a on_event_batch visszahívás üres listával lesz meghívva. Ha ez az érték Nincs vagy 0 (alapértelmezett) értékre van állítva, a visszahívás nem lesz meghívva, amíg az események meg nem érkeznek.

partition_id
str

Ha meg van adva, az ügyfél csak erről a partícióról fog fogadni. Ellenkező esetben az ügyfél az összes partíciótól megkapja a kódot.

owner_level
int

A kizárólagos fogyasztó prioritása. Ha owner_level be van állítva, létrejön egy kizárólagos fogyasztó. A magasabb owner_level rendelkező fogyasztóknak magasabb a kizárólagos prioritásuk. A tulajdonosi szint a fogyasztó "korszakértékének" is tekinthető.

prefetch
int

A szolgáltatásból feldolgozásra elővetendő események száma. Az alapértelmezett érték 300.

track_last_enqueued_event_properties
bool

Azt jelzi, hogy a fogyasztónak adatokat kell-e kérnie az utolsóként lekért eseményről a társított partíción, és nyomon követheti ezeket az információkat események érkezésekor. Az utolsóként lekérdezett partíciókkal kapcsolatos információk nyomon követésekor az Event Hubs szolgáltatástól kapott minden esemény metaadatokat fog hordozni a partícióról. Ez kis mennyiségű további hálózati sávszélesség-használatot eredményez, ami általában kedvező kompromisszum, ha figyelembe vesszük, hogy az Event Hub-ügyfél használatával rendszeres időközönként kérnek partíciótulajdonságokat. Alapértelmezés szerint False ( Hamis ) értékre van állítva.

starting_position
str, int, datetime vagy dict[str,any]

Kezdje el fogadni ezt az eseményhelyzetet, ha nincs ellenőrzőpont-adat egy partícióhoz. Az ellenőrzőpont-adatok akkor lesznek felhasználva, ha elérhetők. Ez lehet egy partícióazonosítóval rendelkező diktálás, amely kulcsként és pozícióként szolgál az egyes partíciókhoz, vagy egyetlen érték az összes partícióhoz. Az érték típusa lehet str, int vagy datetime.datetime. Emellett támogatottak a "-1" értékek a stream elejétől való fogadáshoz, és a "@latest" a csak új események fogadásához.

starting_position_inclusive
bool vagy dict[str,bool]

Határozza meg, hogy az adott starting_position befogadó(>=) vagy sem (>). Igaz a befogadóra, a Hamis pedig a kizárólagosra. Ez lehet kulcsként partícióazonosítóval rendelkező diktálás, az érték pedig bool, amely azt jelzi, hogy egy adott partíció starting_position tartalmazza-e vagy sem. Ez az összes starting_position egyetlen bool értéke is lehet. Az alapértelmezett érték Hamis.

on_error
Callable[[PartitionContext, Exception]]

A visszahívási függvény, amely akkor lesz meghívva, ha az újrapróbálkozási kísérletek kimerülése után vagy a terheléselosztási folyamat során hiba merül fel a fogadás során. A visszahívás két paramétert vesz igénybe: partition_context , amely partícióadatokat tartalmaz, és a kivétel a hiba . partition_context Nem lehet, ha a hiba a terheléselosztási folyamat során merül fel. A visszahívást a következőképpen kell definiálni: on_error(partition_context, hiba). A on_error visszahívást akkor is meghívja a rendszer, ha nem kezelt kivétel lép fel a on_event visszahívás során.

on_partition_initialize
Callable[[PartitionContext]]

A visszahívási függvény, amely egy adott partíció fogyasztója után lesz meghívva, befejezi az inicializálást. A rendszer akkor is meghívja, ha létrehoznak egy új belső partíciófelhasználót, hogy átvegye a sikertelen és zárt belső partíciófelhasználók fogadási folyamatát. A visszahívás egyetlen paramétert vesz igénybe: partition_context , amely tartalmazza a partíció adatait. A visszahívást a következőképpen kell definiálni: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

A visszahívási függvény, amely egy adott partíció fogyasztójának bezárása után lesz meghívva. A rendszer akkor is meghívja, ha az újrapróbálkozási kísérletek kimerítése után hiba keletkezik a fogadás során. A visszahívás két paramétert vesz igénybe: partition_context , amely partícióadatokat és a lezárás okát tartalmazza. A visszahívást a következőképpen kell definiálni: on_partition_close(partition_context, ok). CloseReason Tekintse meg a különböző záró okokat.

Visszatérési típus

Példák

Események fogadása kötegekben az EventHubról.


       logger = logging.getLogger("azure.eventhub")

       async def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info(
               "{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
           )

       async with consumer:
           await consumer.receive_batch(
               on_event_batch=on_event_batch,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )