EventHubConsumerClient Osztály

Az EventHubConsumerClient osztály egy magas szintű felületet határoz meg az események fogadásához a Azure Event Hubs szolgáltatásból.

Az EventHubConsumerClient fő célja, hogy az EventHub összes partíciójáról fogadjon eseményeket terheléselosztással és ellenőrzőpontokkal.

Ha több EventHubConsumerClient-példány fut ugyanazon az eseményközponton, fogyasztói csoporton és ellenőrzőpont-helyen, a partíciók egyenletesen lesznek elosztva közöttük.

A terheléselosztás és a megőrzött ellenőrzőpontok engedélyezéséhez checkpoint_store kell beállítani az EventHubConsumerClient létrehozásakor. Ha nincs megadva ellenőrzőpont-tároló, az ellenőrzőpont a memóriában marad.

Az EventHubConsumerClient egy adott partícióról is fogadhat, amikor meghívja annak metódusát receive() vagy receive_batch(), és megadja a partition_id. A terheléselosztás nem működik egypartíciós módban. A felhasználók azonban továbbra is menthetik az ellenőrzőpontokat, ha a checkpoint_store be van állítva.

Öröklődés
azure.eventhub._client_base.ClientBase
EventHubConsumerClient

Konstruktor

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Paraméterek

fully_qualified_namespace
str
Kötelező

Az Event Hubs-névtér teljes gazdagépneve. A névtér formátuma: .servicebus.windows.net.

eventhub_name
str
Kötelező

Az adott eseményközpont elérési útja az ügyfél csatlakoztatásához.

consumer_group
str
Kötelező

Események fogadása ehhez a fogyasztói csoporthoz tartozó eseményközpontból.

credential
TokenCredential vagy AzureSasCredential vagy AzureNamedKeyCredential
Kötelező

A hitelesítéshez használt hitelesítőadat-objektum, amely egy adott felületet implementál a jogkivonatok lekéréséhez. Elfogadja az EventHubSharedKeyCredentialazure-identity kódtár által létrehozott , vagy hitelesítő adatokat tartalmazó objektumokat, valamint azokat az objektumokat, amelyek a *get_token(saját, hatókörök) metódust implementálják.

logging_enable
bool

Hálózati nyomkövetési naplók kimenete a naplózónak. Az alapértelmezett érték Hamis.

auth_timeout
float

A jogkivonat szolgáltatás általi engedélyezésének időtartama másodpercben. Az alapértelmezett érték 60 másodperc. Ha 0 értékre van állítva, az ügyfél nem kényszerít időtúllépést.

user_agent
str

Ha meg van adva, ez a felhasználói ügynök sztringje előtt lesz hozzáadva.

retry_total
int

A sikertelen művelet ismételt végrehajtására tett kísérletek teljes száma hiba esetén. Az alapértelmezett érték 3. A fogadási retry_total kontextusa különleges: A fogadási metódust az egyes iterációkban egy időhurkos, belső fogadási metódust hív meg. A fogadási esetben retry_total adja meg az újrapróbálkozások számát a belső fogadó metódus által a while-loopban kiváltott hiba után. Ha az újrapróbálkozási kísérletek kimerülnek, a rendszer meghívja a on_error visszahívást (ha meg van adva) a hibainformációkkal. A rendszer bezárja a sikertelen belső partíció-fogyasztót (on_partition_close ha meg van adva), és új belső partíciófelhasználó jön létre (on_partition_initialize lesz meghívva) a fogadás folytatásához.

retry_backoff_factor
float

A második próbálkozás utáni kísérletek közötti visszalépési tényező (a legtöbb hibát azonnal, egy második próbálkozással, késedelem nélkül meg kell oldani). Rögzített módban az újrapróbálkozási szabályzat mindig alvó állapotba kerül a(z) {backoff factor} esetében. "Exponenciális" módban az újrapróbálkozási szabályzat a következőhöz fog aludni: {backoff factor} * (2 ** ({az újrapróbálkozások száma} – 1)) másodperc. Ha a backoff_factor 0,1, akkor az újrapróbálkozás az újrapróbálkozások között [0.0s, 0.2s, 0.4s, ...] állapotban lesz. Az alapértelmezett érték 0,8.

retry_backoff_max
float

A maximális visszaállási idő. Az alapértelmezett érték 120 másodperc (2 perc).

retry_mode
str

Az újrapróbálkozási kísérletek közötti késleltetési viselkedés. A támogatott értékek "rögzítettek" vagy "exponenciálisak", ahol az alapértelmezett érték az "exponenciális".

idle_timeout
float

Időtúllépés másodpercben, amely után az ügyfél bezárja a mögöttes kapcsolatot, ha nincs további tevékenység. Alapértelmezés szerint az érték Nincs, ami azt jelenti, hogy az ügyfél nem fog leállni inaktivitás miatt, hacsak a szolgáltatás nem kezdeményezi.

transport_type
TransportType

Az Event Hubs szolgáltatással való kommunikációhoz használt átviteli protokoll típusa. Az alapértelmezett érték a TransportType.Amqp , amely esetben az 5671-es portot használja. Ha az 5671-es port nem érhető el/blokkolva van a hálózati környezetben, akkor a TransportType.AmqpOverWebsocket használható, amely a 443-es portot használja a kommunikációhoz.

http_proxy
dict[str, str vagy int]

HTTP-proxybeállítások. Ennek a szótárnak a következő kulcsokkal kell rendelkeznie: "proxy_hostname" (str érték) és "proxy_port" (int érték). Emellett a következő kulcsok is lehetnek jelen: "felhasználónév", "jelszó".

checkpoint_store
CheckpointStore vagy None

Egy kezelő, amely a partíció terheléselosztási és ellenőrzőpont-adatait tárolja az események fogadásakor. Az ellenőrzőpont-tároló mindkét esetben az összes partícióból vagy egyetlen partícióból való fogadáskor használatos. Az utóbbi esetben a terheléselosztás nem érvényes. Ha nincs megadva ellenőrzőpont-tároló, az ellenőrzőpont belsőleg megmarad a memóriában, és az EventHubConsumerClient-példány terheléselosztás nélkül fogadja az eseményeket.

load_balancing_interval
float

Amikor a terheléselosztás beindul. Ez a két terheléselosztási értékelés közötti időköz másodpercben. Az alapértelmezett érték 30 másodperc.

partition_ownership_expiration_interval
float

A partíció tulajdonjoga ennyi másodperc elteltével lejár. Minden terheléselosztási kiértékelés automatikusan meghosszabbítja a tulajdonjog lejárati idejét. Az alapértelmezett érték 6 * load_balancing_interval, azaz 180 másodperc az alapértelmezett 30 másodperces load_balancing_interval használatakor.

load_balancing_strategy
str vagy LoadBalancingStrategy

Amikor beindul a terheléselosztás, ezzel a stratégiával igényelheti és kiegyensúlyozza a partíció tulajdonjogát. Használja a "kapzsi" vagy a LoadBalancingStrategy.GREEDY parancsot a kapzsi stratégiához, amely minden terheléselosztási értékeléshez annyi nem igényelt partíciót fog megragadni, amennyi a terhelés kiegyensúlyozásához szükséges. Használja a "balanced" vagy a LoadBalancingStrategy.BALANCED függvényt a kiegyensúlyozott stratégiához, amely minden terheléselosztási értékeléshez csak egy olyan partíciót igényel, amelyet más EventHubConsumerClient nem igényel. Ha egy EventHub összes partícióját más EventHubConsumerClient követeli, és ez az ügyfél túl kevés partíciót igényelt, az ügyfél a terheléselosztási stratégiától függetlenül ellop egy partíciót a többi ügyféltől minden terheléselosztási értékeléshez. A kapzsi stratégia alapértelmezés szerint használatos.

custom_endpoint_address
str vagy None

Az Event Hubs szolgáltatáshoz való csatlakozáshoz használandó egyéni végpontcím, amely lehetővé teszi a hálózati kérések átirányítását a gazdagépkörnyezethez szükséges bármely alkalmazásátjárón vagy más útvonalon. Az alapértelmezett érték Nincs. A formátum a következő: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Ha a port nincs megadva a custom_endpoint_address, a rendszer alapértelmezés szerint a 443-at használja.

connection_verify
str vagy None

A kapcsolati végpont identitásának hitelesítéséhez használt SSL-tanúsítvány egyéni CA_BUNDLE fájljának elérési útja. Alapértelmezés szerint Nincs, ebben az esetben a certifi.where() lesz használva.

uamqp_transport
bool

Azt, hogy az uamqp kódtárat használja-e alapul szolgáló átvitelként. Az alapértelmezett érték Hamis, és a rendszer a Pure Python AMQP-kódtárat használja alapul szolgáló átvitelként.

socket_timeout
float

Az az idő másodpercben, amíg a kapcsolat mögöttes szoftvercsatornájának várnia kell az adatok küldésekor és fogadásakor, mielőtt túllépi az időkorlátot. Az alapértelmezett érték a TransportType.Amqp esetében 0.2, a TransportType.AmqpOverWebsocket esetében pedig 1. Ha az EventHubsConnectionError hiba az írási időtúllépés miatt jelentkezik, előfordulhat, hogy az alapértelmezettnél nagyobb értéket kell megadni. Ez speciális használati forgatókönyvekhez készült, és általában az alapértelmezett értéknek elegendőnek kell lennie.

Példák

Hozza létre az EventHubConsumerClient új példányát.


   import os
   from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   consumer = EventHubConsumerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,
       consumer_group='$Default',
       credential=credential)

Metódusok

close

Állítsa le az eseményeket az Event Hubról, és zárja be a mögöttes AMQP-kapcsolatot és hivatkozásokat.

from_connection_string

EventHubConsumerClient létrehozása egy kapcsolati karakterlánc.

get_eventhub_properties

Az eseményközpont tulajdonságainak lekérése.

A visszaadott szótár kulcsai a következők:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Kérje le az eseményközpont partícióazonosítóit.

get_partition_properties

A megadott partíció tulajdonságainak lekérése.

A tulajdonságok szótárában található kulcsok a következők:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Események fogadása partíció(k)ról, opcionális terheléselosztással és ellenőrzőpont-készítéssel.

receive_batch

Események fogadása partíció(k)ról, opcionális terheléselosztással és ellenőrzőpont-készítéssel.

close

Állítsa le az eseményeket az Event Hubról, és zárja be a mögöttes AMQP-kapcsolatot és hivatkozásokat.

close() -> None

Visszatérési típus

Példák

Zárja be az ügyfelet.


   import os
   import threading

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group="$Default",
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, multi-thread will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The 'receive' method is a blocking call, it can be executed in a thread for
   # non-blocking behavior, and combined with the 'close' method.

   worker = threading.Thread(
       target=consumer.receive,
       kwargs={
           "on_event": on_event,
           "starting_position": "-1",  # "-1" is from the beginning of the partition.
       }
   )
   worker.start()
   time.sleep(10)  # Keep receiving for 10s then close.
   # Close down the consumer handler explicitly.
   consumer.close()

from_connection_string

EventHubConsumerClient létrehozása egy kapcsolati karakterlánc.

from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient

Paraméterek

conn_str
str
Kötelező

Egy eseményközpont kapcsolati karakterlánc.

consumer_group
str
Kötelező

Események fogadása az eseményközpontból ehhez a fogyasztói csoporthoz.

eventhub_name
str

Az adott eseményközpont elérési útja az ügyfél csatlakoztatásához.

logging_enable
bool

Hálózati nyomkövetési naplók kimenete a naplózónak. Az alapértelmezett érték Hamis.

auth_timeout
float

A jogkivonat szolgáltatás általi engedélyezésének időtartama másodpercben. Az alapértelmezett érték 60 másodperc. Ha 0 értékre van állítva, az ügyfél nem kényszerít időtúllépést.

user_agent
str

Ha meg van adva, ez a felhasználói ügynök sztringje előtt lesz hozzáadva.

retry_total
int

A sikertelen művelet ismételt végrehajtására tett kísérletek teljes száma hiba esetén. Az alapértelmezett érték 3. A fogadási retry_total kontextusa különleges: A fogadási metódust az egyes iterációkban egy időhurkos, belső fogadási metódust hív meg. A fogadási esetben retry_total adja meg az újrapróbálkozások számát a belső fogadó metódus által a while-loopban kiváltott hiba után. Ha az újrapróbálkozási kísérletek kimerülnek, a rendszer meghívja a on_error visszahívást (ha meg van adva) a hibainformációkkal. A rendszer bezárja a sikertelen belső partíció-fogyasztót (on_partition_close ha meg van adva), és új belső partíciófelhasználó jön létre (on_partition_initialize lesz meghívva) a fogadás folytatásához.

retry_backoff_factor
float

A második próbálkozás utáni kísérletek közötti visszalépési tényező (a legtöbb hibát azonnal, egy második próbálkozással, késedelem nélkül meg kell oldani). Rögzített módban az újrapróbálkozási szabályzat mindig alvó állapotba kerül a(z) {backoff factor} esetében. "Exponenciális" módban az újrapróbálkozási szabályzat a következőhöz fog aludni: {backoff factor} * (2 ** ({az újrapróbálkozások száma} – 1)) másodperc. Ha a backoff_factor 0,1, akkor az újrapróbálkozás az újrapróbálkozások között [0.0s, 0.2s, 0.4s, ...] esetén alvó állapotba kerül. Az alapértelmezett érték 0,8.

retry_backoff_max
float

A maximális visszalépési idő. Az alapértelmezett érték 120 másodperc (2 perc).

retry_mode
str

Az újrapróbálkozási kísérletek közötti késleltetési viselkedés. A támogatott értékek "rögzítettek" vagy "exponenciálisak", ahol az alapértelmezett érték az "exponenciális".

idle_timeout
float

Időtúllépés másodpercben, amely után az ügyfél lezárja a mögöttes kapcsolatot, ha nincs furthur-tevékenység. Alapértelmezés szerint a Nincs érték, ami azt jelenti, hogy az ügyfél nem fog leállni inaktivitás miatt, hacsak a szolgáltatás nem kezdeményezi.

transport_type
TransportType

Az Event Hubs szolgáltatással való kommunikációhoz használt átviteli protokoll típusa. Az alapértelmezett érték a TransportType.Amqp , amely esetben az 5671-es portot használja. Ha az 5671-es port nem érhető el/le van tiltva a hálózati környezetben, a TransportType.AmqpOverWebsocket használható helyette, amely a 443-es portot használja a kommunikációhoz.

http_proxy
dict

HTTP-proxybeállítások. Ennek a szótárnak a következő kulcsokkal kell rendelkeznie: "proxy_hostname" (str érték) és "proxy_port" (int érték). Emellett a következő kulcsok is lehetnek jelen: "felhasználónév", "jelszó".

checkpoint_store
CheckpointStore vagy None

Egy kezelő, amely a partíció terheléselosztási és ellenőrzőpont-adatait tárolja az események fogadásakor. Az ellenőrzőpont-tárolót mindkét esetben az összes partícióról vagy egyetlen partícióról fogadjuk. Az utóbbi esetben a terheléselosztás nem alkalmazható. Ha nincs megadva ellenőrzőpont-tároló, az ellenőrzőpont a memóriában marad, és az EventHubConsumerClient példány terheléselosztás nélkül fogadja az eseményeket.

load_balancing_interval
float

Amikor a terheléselosztás beindul. Ez a két terheléselosztási kiértékelés közötti időköz másodpercben. Az alapértelmezett érték 10 másodperc.

partition_ownership_expiration_interval
float

A partíció tulajdonjoga ennyi másodperc elteltével lejár. Minden terheléselosztási kiértékelés automatikusan meghosszabbítja a tulajdonjog lejárati idejét. Az alapértelmezett érték 6 * load_balancing_interval, azaz 60 másodperc az alapértelmezett 30 másodperces load_balancing_interval használatakor.

load_balancing_strategy
str vagy LoadBalancingStrategy

Amikor beindul a terheléselosztás, ezzel a stratégiával igényelheti és egyensúlyba állíthatja a partíció tulajdonjogát. Használja a "kapzsi" vagy a LoadBalancingStrategy.GREEDY értéket a kapzsi stratégiához, amely minden terheléselosztási kiértékeléshez annyi felszabadítatlan partíciót fog megragadni, amennyi a terhelés elosztásához szükséges. Használja a "balanced" vagy a LoadBalancingStrategy.BALANCED függvényt az kiegyensúlyozott stratégiához, amely minden terheléselosztási értékeléshez csak egy olyan partíciót igényel, amelyet más EventHubConsumerClient nem igényel. Ha egy EventHub összes partícióját egy másik EventHubConsumerClient igényelte, és az ügyfél túl kevés partíciót igényelt, az ügyfél a terheléselosztási stratégiától függetlenül minden terheléselosztási kiértékeléshez ellop egy partíciót a többi ügyféltől. A kapzsi stratégia alapértelmezés szerint használatban van.

custom_endpoint_address
str vagy None

Az Event Hubs szolgáltatással való kapcsolat létrehozásához használandó egyéni végpontcím, amely lehetővé teszi a hálózati kérések átirányítását a gazdakörnyezethez szükséges bármely alkalmazásátjárón vagy más útvonalon. Az alapértelmezett érték a Nincs. A formátum a következő: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Ha a port nincs megadva a custom_endpoint_address, a rendszer alapértelmezés szerint a 443-at használja.

connection_verify
str vagy None

A kapcsolati végpont identitásának hitelesítéséhez használt SSL-tanúsítvány egyéni CA_BUNDLE fájljának elérési útja. Az alapértelmezett érték Nincs, ebben az esetben a certifi.where() lesz használva.

uamqp_transport
bool

Azt határozza meg, hogy az uamqp kódtárat használja-e alapul szolgáló átvitelként. Az alapértelmezett érték Hamis, és a rendszer a Pure Python AMQP-kódtárat használja alapul szolgáló átvitelként.

Visszatérési típus

Példák

Hozzon létre egy új EventHubConsumerClient-példányt a kapcsolati karakterlánc.


   import os
   from azure.eventhub import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Az eseményközpont tulajdonságainak lekérése.

A visszaadott szótár kulcsai a következők:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

Válaszok

Az eseményközpontra vonatkozó információkat tartalmazó szótár.

Visszatérési típus

Kivételek

get_partition_ids

Kérje le az eseményközpont partícióazonosítóit.

get_partition_ids() -> List[str]

Válaszok

A partícióazonosítók listája.

Visszatérési típus

Kivételek

get_partition_properties

A megadott partíció tulajdonságainak lekérése.

A tulajdonságok szótárában található kulcsok a következők:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

get_partition_properties(partition_id: str) -> Dict[str, Any]

Paraméterek

partition_id
str
Kötelező

A célpartíció azonosítója.

Válaszok

Partíciótulajdonságokat tartalmazó szótár.

Visszatérési típus

Kivételek

receive

Események fogadása partíció(k)ról, opcionális terheléselosztással és ellenőrzőpont-készítéssel.

receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None

Paraméterek

on_event
callable[PartitionContext, EventData vagy None]
Kötelező

A fogadott események kezelésére szolgáló visszahívási függvény. A visszahívás két paramétert vesz igénybe: partition_context , amely partíciókörnyezetet és a fogadott eseményt tartalmazó eseményt tartalmaz. A visszahívási függvényt a következőképpen kell definiálni: on_event(partition_context, esemény). A partíciókörnyezettel kapcsolatos részletes információkért tekintse meg a következőt: PartitionContext.

max_wait_time
float

Az eseményfeldolgozó által a visszahívás meghívása előtt várakozó maximális időköz másodpercben. Ha ezen az időtartamon belül nem érkeznek események, a on_eventvisszahívást a Rendszer a Nincs beállítással hívja meg. Ha ez az érték Nincs vagy 0 (alapértelmezett) értékre van állítva, a visszahívás nem lesz meghívva, amíg egy esemény nem érkezik.

partition_id
str

Ha meg van adva, az ügyfél csak erről a partícióról fog fogadni. Ellenkező esetben az ügyfél minden partícióról megkapja a kódot.

owner_level
int

A kizárólagos fogyasztó prioritása. Ha owner_level be van állítva, létrejön egy kizárólagos fogyasztó. A magasabb owner_level rendelkező fogyasztók magasabb kizárólagos prioritást élveznek. A tulajdonosi szint a fogyasztó "alapértékeként" is ismert.

prefetch
int

A szolgáltatásból feldolgozásra elővetendő események száma. Az alapértelmezett érték 300.

track_last_enqueued_event_properties
bool

Azt jelzi, hogy a fogyasztónak adatokat kell-e kérnie a társított partíción az utolsóként lekért eseményről, és nyomon követheti ezeket az információkat az események fogadása során. A partíciók utolsó lekérdezett eseményével kapcsolatos információk nyomon követésekor az Event Hubs szolgáltatástól kapott minden esemény metaadatokat fog hordozni a partícióról. Ez kis mennyiségű további hálózati sávszélesség-használatot eredményez, amely általában kedvező kompromisszumot jelent, ha figyelembe vesszük, hogy a partíciótulajdonságokra vonatkozó kéréseket rendszeres időközönként az Event Hub-ügyfél használatával kell elvégezni. Alapértelmezés szerint False ( Hamis ) értékre van állítva.

starting_position
str, int, datetime vagy dict[str,any]

Kezdje el fogadni ezt az eseményhelyzetet, ha nincs ellenőrzőpont-adat egy partícióhoz. A rendszer ellenőrzőpont-adatokat használ, ha elérhetők. Ez lehet egy partícióazonosítóval rendelkező diktálás kulcsként és pozícióként az egyes partíciók értékeként, vagy egyetlen érték az összes partícióhoz. Az érték típusa lehet str, int vagy datetime.datetime. Szintén támogatottak a "-1" értékek a stream elejétől való fogadáshoz, és "@latest" csak az új események fogadásához. Az alapértelmezett érték a "@latest".

starting_position_inclusive
bool vagy dict[str,bool]

Állapítsa meg, hogy a megadott starting_position befogadó(>=) vagy sem (>). Igaz a befogadó és a hamis kizárólagos. Ez lehet egy partícióazonosítóval rendelkező diktálás kulcsként és logikai értékként, amely azt jelzi, hogy egy adott partíció starting_position befogadó-e vagy sem. Ez az összes starting_position egyetlen bool értéke is lehet. Az alapértelmezett érték a Hamis.

on_error
callable[[PartitionContext, Exception]]

A visszahívási függvény, amely akkor lesz meghívva, ha az újrapróbálkozási kísérletek kimerítése vagy a terheléselosztás folyamata során hiba keletkezik a fogadás során. A visszahívás két paramétert vesz igénybe: partition_context , amely partícióinformációkat tartalmaz, és kivételként a hiba . partition_context lehet Nincs, ha a hiba a terheléselosztási folyamat során jelentkezik. A visszahívást a következőképpen kell definiálni: on_error(partition_context, hiba). A on_error visszahívást is meghívja a rendszer, ha a on_event visszahívás során nem kezelt kivétel lép fel.

on_partition_initialize
callable[[PartitionContext]]

A visszahívási függvény, amely egy adott partíció fogyasztója után lesz meghívva, befejezi az inicializálást. A rendszer akkor is meghívja, ha egy új belső partíciófelhasználót hoznak létre, hogy átvegye a sikertelen és lezárt belső partíciófelhasználók fogadási folyamatát. A visszahívás egyetlen paramétert vesz fel: partition_context , amely tartalmazza a partíció adatait. A visszahívást a következőképpen kell definiálni: on_partition_initialize(partition_context).

on_partition_close
callable[[PartitionContext, CloseReason]]

A visszahívási függvény, amely egy adott partíció fogyasztójának bezárása után lesz meghívva. A rendszer akkor is meghívja, ha az újrapróbálkozási kísérletek kimerítése után hiba keletkezik a fogadás során. A visszahívás két paramétert vesz fel: partition_context , amely partícióinformációkat és a lezárás okát tartalmazza. A visszahívást a következőképpen kell definiálni: on_partition_close(partition_context, ok). CloseReason Tekintse meg a különböző záró okokat.

Visszatérési típus

Példák

Események fogadása az EventHubról.


       logger = logging.getLogger("azure.eventhub")

       def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive(on_event=on_event)

receive_batch

Események fogadása partíció(k)ról, opcionális terheléselosztással és ellenőrzőpont-készítéssel.

receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None

Paraméterek

on_event_batch
callable[PartitionContext, list[EventData]]
Kötelező

A fogadott események kötegének kezelésére szolgáló visszahívási függvény. A visszahívás két paramétert vesz igénybe: partition_context , amely partíciókörnyezetet és event_batch tartalmaz, amely a fogadott események. A visszahívási függvényt a következőképpen kell definiálni: on_event_batch(partition_context, event_batch). event_batch lehet egy üres lista, ha max_wait_time nem Nincs vagy 0, és nem érkezik esemény max_wait_time után. A partíciókörnyezettel kapcsolatos részletes információkért tekintse meg a következőt: PartitionContext.

max_batch_size
int

A visszahívási on_event_batch átadott kötegben lévő események maximális száma. Ha a tényleges fogadott események száma nagyobb , mint max_batch_size, a fogadott események kötegekre vannak osztva, és az egyes kötegek visszahívását legfeljebb max_batch_size eseményekkel hívja meg.

max_wait_time
float

Az eseményfeldolgozó által a visszahívás meghívása előtt várakozó maximális időköz másodpercben. Ha ezen az időtartamon belül nem érkeznek események, a on_event_batch visszahívás egy üres listával lesz meghívva.

partition_id
str

Ha meg van adva, az ügyfél csak erről a partícióról fog fogadni. Ellenkező esetben az ügyfél minden partícióról megkapja a kódot.

owner_level
int

A kizárólagos fogyasztó prioritása. Ha owner_level be van állítva, létrejön egy kizárólagos fogyasztó. A magasabb owner_level rendelkező fogyasztók magasabb kizárólagos prioritást élveznek. A tulajdonosi szint a fogyasztó "alapértékeként" is ismert.

prefetch
int

A szolgáltatásból feldolgozásra elővetendő események száma. Az alapértelmezett érték 300.

track_last_enqueued_event_properties
bool

Azt jelzi, hogy a fogyasztónak adatokat kell-e kérnie a társított partíción az utolsóként lekért eseményről, és nyomon követheti ezeket az információkat az események fogadása során. A partíciók utolsó lekérdezett eseményével kapcsolatos információk nyomon követésekor az Event Hubs szolgáltatástól kapott minden esemény metaadatokat fog hordozni a partícióról. Ez kis mennyiségű további hálózati sávszélesség-használatot eredményez, amely általában kedvező kompromisszumot jelent, ha figyelembe vesszük, hogy a partíciótulajdonságokra vonatkozó kéréseket rendszeres időközönként az Event Hub-ügyfél használatával kell elvégezni. Alapértelmezés szerint False ( Hamis ) értékre van állítva.

starting_position
str, int, datetime vagy dict[str,any]

Kezdje el fogadni ezt az eseményhelyzetet, ha nincs ellenőrzőpont-adat egy partícióhoz. A rendszer ellenőrzőpont-adatokat használ, ha elérhetők. Ez lehet egy partícióazonosítóval rendelkező diktálás kulcsként és pozícióként az egyes partíciók értékeként, vagy egyetlen érték az összes partícióhoz. Az érték típusa lehet str, int vagy datetime.datetime. Szintén támogatottak a "-1" értékek a stream elejétől való fogadáshoz, és "@latest" csak az új események fogadásához. Az alapértelmezett érték a "@latest".

starting_position_inclusive
bool vagy dict[str,bool]

Állapítsa meg, hogy a megadott starting_position befogadó(>=) vagy sem (>). Igaz a befogadó és a hamis kizárólagos. Ez lehet egy partícióazonosítóval rendelkező diktálás kulcsként és logikai értékként, amely azt jelzi, hogy egy adott partíció starting_position befogadó-e vagy sem. Ez az összes starting_position egyetlen bool értéke is lehet. Az alapértelmezett érték a Hamis.

on_error
callable[[PartitionContext, Exception]]

A visszahívási függvény, amely akkor lesz meghívva, ha az újrapróbálkozási kísérletek kimerítése vagy a terheléselosztás folyamata során hiba keletkezik a fogadás során. A visszahívás két paramétert vesz igénybe: partition_context , amely partícióinformációkat tartalmaz, és kivételként a hiba . partition_context lehet Nincs, ha a hiba a terheléselosztási folyamat során jelentkezik. A visszahívást a következőképpen kell definiálni: on_error(partition_context, hiba). A on_error visszahívást is meghívja a rendszer, ha a on_event visszahívás során nem kezelt kivétel lép fel.

on_partition_initialize
callable[[PartitionContext]]

A visszahívási függvény, amely egy adott partíció fogyasztója után lesz meghívva, befejezi az inicializálást. A rendszer akkor is meghívja, ha egy új belső partíciófelhasználót hoznak létre, hogy átvegye a sikertelen és lezárt belső partíciófelhasználók fogadási folyamatát. A visszahívás egyetlen paramétert vesz fel: partition_context , amely tartalmazza a partíció adatait. A visszahívást a következőképpen kell definiálni: on_partition_initialize(partition_context).

on_partition_close
callable[[PartitionContext, CloseReason]]

A visszahívási függvény, amely egy adott partíció fogyasztójának bezárása után lesz meghívva. A rendszer akkor is meghívja, ha az újrapróbálkozási kísérletek kimerítése után hiba keletkezik a fogadás során. A visszahívás két paramétert vesz fel: partition_context , amely partícióinformációkat és a lezárás okát tartalmazza. A visszahívást a következőképpen kell definiálni: on_partition_close(partition_context, ok). CloseReason Tekintse meg a különböző záró okokat.

Visszatérési típus

Példák

Események fogadása kötegekben az EventHubról.


       logger = logging.getLogger("azure.eventhub")

       def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received events from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive_batch(on_event_batch=on_event_batch)