EventHubConsumerClient Osztály
Az EventHubConsumerClient osztály egy magas szintű felületet határoz meg az események fogadásához a Azure Event Hubs szolgáltatásból.
Az EventHubConsumerClient fő célja, hogy a terheléselosztással és ellenőrzőpontokkal rendelkező EventHub összes partíciójáról fogadjon eseményeket.
Ha több EventHubConsumerClient-példány fut ugyanazon az eseményközponton, a fogyasztói csoporton és az ellenőrzőpont-helyen, a partíciók egyenletesen lesznek elosztva közöttük.
A terheléselosztás és a megőrzött ellenőrzőpontok engedélyezéséhez checkpoint_store be kell állítani az EventHubConsumerClient létrehozásakor. Ha nincs megadva ellenőrzőpont-tároló, az ellenőrzőpont belsőleg megmarad a memóriában.
Az EventHubConsumerClient egy adott partícióról is fogadhat, amikor meghívja a metódusát receive() vagy receive_batch(), és megadja a partition_id. A terheléselosztás nem működik egypartíciós módban. A felhasználók azonban továbbra is menthetik az ellenőrzőpontokat, ha a checkpoint_store be van állítva.
- Öröklődés
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubConsumerClient
Konstruktor
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Paraméterek
- fully_qualified_namespace
- str
Az Event Hubs-névtér teljes állomásneve. A névtér formátuma: .servicebus.windows.net.
- eventhub_name
- str
Annak az eseményközpontnak az elérési útja, amelyhez az ügyfelet csatlakoztatni szeretné.
- credential
- AsyncTokenCredential vagy AzureSasCredential vagy AzureNamedKeyCredential
A hitelesítéshez használt hitelesítőadat-objektum, amely egy adott felületet implementál a jogkivonatok lekéréséhez. Elfogadja az EventHubSharedKeyCredentialazure-identity kódtár által létrehozott , vagy hitelesítőadat-objektumokat, valamint az *get_token(saját, hatókörök) metódust implementáló objektumokat.
- logging_enable
- bool
Hálózati nyomkövetési naplók kimenete a naplózónak. Az alapértelmezett érték Hamis.
- auth_timeout
- float
A jogkivonat szolgáltatás általi engedélyezésének időtartama másodpercben. Az alapértelmezett érték 60 másodperc. Ha 0-ra van állítva, az ügyfél nem kényszerít időtúllépést.
- user_agent
- str
Ha meg van adva, ez a felhasználói ügynök sztringje elé kerül.
- retry_total
- int
A sikertelen művelet ismételt végrehajtására tett kísérletek teljes száma hiba esetén. Az alapértelmezett érték 3. A fogadási retry_total kontextusa különleges: A fogadó metódust minden iterációban egy kis ciklusú belső fogadó metódus implementálja. A fogadási esetben a retry_total adja meg az újrapróbálkozások számát a belső fogadó metódus által a while-loopban kiváltott hiba után. Ha az újrapróbálkozási kísérletek elfogynak, a rendszer meghívja a on_error visszahívást (ha vannak ilyenek) a hibainformációkkal. A rendszer bezárja a sikertelen belső partíciófelhasználót (on_partition_close ha meg van adva), és új belső partíciófelhasználó jön létre (on_partition_initialize lesz meghívva, ha meg van adva) a fogadás folytatásához.
- retry_backoff_factor
- float
A második próbálkozás utáni próbálkozások közötti visszalépési tényező (a legtöbb hibát azonnal egy második próbálkozás késlelteti). Rögzített módban az újrapróbálkozási szabályzat mindig alvó állapotba kerül a(z) {backoff factor} esetében. "Exponenciális" módban az újrapróbálkozási szabályzat a következőre fog aludni: {backoff factor} * (2 ** ({az újrapróbálkozások száma} – 1)) másodpercig. Ha a backoff_factor 0,1, akkor az újrapróbálkozás az újrapróbálkozások között [0.0s, 0.2s, 0.4s, ...] esetén alvó állapotba kerül. Az alapértelmezett érték 0,8.
- retry_backoff_max
- float
A maximális visszalépési idő. Az alapértelmezett érték 120 másodperc (2 perc).
- retry_mode
- str
Az újrapróbálkozási kísérletek közötti késleltetési viselkedés. A támogatott értékek "rögzítettek" vagy "exponenciálisak", ahol az alapértelmezett érték az "exponenciális".
- idle_timeout
- float
Időtúllépés másodpercben, amely után az ügyfél lezárja a mögöttes kapcsolatot, ha nincs további tevékenység. Alapértelmezés szerint a Nincs érték, ami azt jelenti, hogy az ügyfél nem fog leállni inaktivitás miatt, hacsak a szolgáltatás nem kezdeményezi.
- transport_type
- TransportType
Az Event Hubs szolgáltatással való kommunikációhoz használt átviteli protokoll típusa. Az alapértelmezett érték a TransportType.Amqp , amely esetben az 5671-es portot használja. Ha az 5671-es port nem érhető el/le van tiltva a hálózati környezetben, a TransportType.AmqpOverWebsocket használható helyette, amely a 443-es portot használja a kommunikációhoz.
- http_proxy
HTTP-proxybeállítások. Ennek a szótárnak a következő kulcsokkal kell rendelkeznie: "proxy_hostname" (str érték) és "proxy_port" (int érték).
- checkpoint_store
- Optional[CheckpointStore]
Egy kezelő, amely a partíció terheléselosztási és ellenőrzőpont-adatait tárolja az események fogadásakor. Az ellenőrzőpont-tárolót mindkét esetben az összes partícióról vagy egyetlen partícióról fogadjuk. Az utóbbi esetben a terheléselosztás nem alkalmazható. Ha nincs megadva ellenőrzőpont-tároló, az ellenőrzőpont a memóriában marad, és az EventHubConsumerClient példány terheléselosztás nélkül fogadja az eseményeket.
- load_balancing_interval
- float
Amikor a terheléselosztás beindul. Ez a két terheléselosztási kiértékelés közötti időköz másodpercben. Az alapértelmezett érték 30 másodperc.
- partition_ownership_expiration_interval
- float
A partíció tulajdonjoga ennyi másodperc elteltével lejár. Minden terheléselosztási kiértékelés automatikusan meghosszabbítja a tulajdonjog lejárati idejét. Az alapértelmezett érték 6 * load_balancing_interval, azaz 180 másodperc az alapértelmezett 30 másodperces load_balancing_interval használatakor.
- load_balancing_strategy
- str vagy LoadBalancingStrategy
Amikor beindul a terheléselosztás, ezzel a stratégiával igényelheti és egyensúlyba állíthatja a partíció tulajdonjogát. Használja a "kapzsi" vagy a LoadBalancingStrategy.GREEDY értéket a kapzsi stratégiához, amely minden terheléselosztási kiértékeléshez annyi felszabadítatlan partíciót fog megragadni, amennyi a terhelés elosztásához szükséges. Használja a "balanced" vagy a LoadBalancingStrategy.BALANCED függvényt az kiegyensúlyozott stratégiához, amely minden terheléselosztási értékeléshez csak egy olyan partíciót igényel, amelyet más EventHubConsumerClient nem igényel. Ha egy EventHub összes partícióját egy másik EventHubConsumerClient igényelte, és az ügyfél túl kevés partíciót igényelt, az ügyfél a terheléselosztási stratégiától függetlenül minden terheléselosztási kiértékeléshez ellop egy partíciót a többi ügyféltől. A kapzsi stratégia alapértelmezés szerint használatban van.
Az Event Hubs szolgáltatással való kapcsolat létrehozásához használandó egyéni végpontcím, amely lehetővé teszi a hálózati kérések átirányítását a gazdakörnyezethez szükséges bármely alkalmazásátjárón vagy más útvonalon. Az alapértelmezett érték a Nincs. A formátum a következő: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Ha a port nincs megadva a custom_endpoint_address, a rendszer alapértelmezés szerint a 443-at használja.
A kapcsolati végpont identitásának hitelesítéséhez használt SSL-tanúsítvány egyéni CA_BUNDLE fájljának elérési útja. Az alapértelmezett érték Nincs, ebben az esetben a certifi.where() lesz használva.
- uamqp_transport
- bool
Azt határozza meg, hogy az uamqp kódtárat használja-e alapul szolgáló átvitelként. Az alapértelmezett érték Hamis, és a rendszer a Pure Python AMQP-kódtárat használja alapul szolgáló átvitelként.
- socket_timeout
- float
Az az idő másodpercben, amikor a kapcsolat mögöttes szoftvercsatornájának várnia kell az adatok küldésekor és fogadásakor, mielőtt túllépi az időkorlátot. Az alapértelmezett érték a TransportType.Amqp esetében 0,2, a TransportType.AmqpOverWebsocket esetében pedig 1. Ha az EventHubsConnectionError hibák az írási időtúllépés miatt jelentkeznek, előfordulhat, hogy az alapértelmezettnél nagyobb értéket kell átadni. Ez speciális használati forgatókönyvekhez tartozik, és általában az alapértelmezett értéknek elegendőnek kell lennie.
Példák
Hozza létre az EventHubConsumerClient új példányát.
import os
from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
consumer_group='$Default',
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
Metódusok
close |
Állítsa le az események lekérését az Eseményközpontból, és zárja be a mögöttes AMQP-kapcsolatot és hivatkozásokat. |
from_connection_string |
EventHubConsumerClient létrehozása egy kapcsolati karakterlánc. |
get_eventhub_properties |
Az eseményközpont tulajdonságainak lekérése. A visszaadott szótár kulcsai a következők:
|
get_partition_ids |
Kérje le az eseményközpont partícióazonosítóit. |
get_partition_properties |
A megadott partíció tulajdonságainak lekérése. A tulajdonságok szótárában található kulcsok a következők:
|
receive |
Események fogadása partíció(k)ról, opcionális terheléselosztással és ellenőrzőpontokkal. |
receive_batch |
Események fogadása partíció(k)ból kötegekben, opcionális terheléselosztással és ellenőrzőpontokkal. |
close
Állítsa le az események lekérését az Eseményközpontból, és zárja be a mögöttes AMQP-kapcsolatot és hivatkozásokat.
async close() -> None
Visszatérési típus
Példák
Zárja be az ügyfelet.
import os
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3) # keep receiving for 3 seconds
recv_task.cancel() # stop receiving
# Close down the consumer handler explicitly.
await consumer.close()
from_connection_string
EventHubConsumerClient létrehozása egy kapcsolati karakterlánc.
from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient
Paraméterek
- eventhub_name
- str
Annak az eseményközpontnak az elérési útja, amelyhez az ügyfelet csatlakoztatni szeretné.
- logging_enable
- bool
Hálózati nyomkövetési naplók kimenete a naplózónak. Az alapértelmezett érték Hamis.
- http_proxy
- dict
HTTP-proxybeállítások. Ennek a szótárnak a következő kulcsokkal kell rendelkeznie: "proxy_hostname" (str érték) és "proxy_port" (int érték). Emellett a következő kulcsok is lehetnek jelen: "felhasználónév", "jelszó".
- auth_timeout
- float
A jogkivonat szolgáltatás általi engedélyezésének időtartama másodpercben. Az alapértelmezett érték 60 másodperc. Ha 0-ra van állítva, az ügyfél nem kényszerít időtúllépést.
- user_agent
- str
Ha meg van adva, ez a felhasználói ügynök sztringje elé kerül.
- retry_total
- int
A sikertelen művelet ismételt végrehajtására tett kísérletek teljes száma hiba esetén. Az alapértelmezett érték 3. A fogadási retry_total kontextusa különleges: A fogadó metódust minden iterációban egy kis ciklusú belső fogadó metódus implementálja. A fogadási esetben a retry_total adja meg az újrapróbálkozások számát a belső fogadó metódus által a while-loopban kiváltott hiba után. Ha az újrapróbálkozási kísérletek elfogynak, a rendszer meghívja a on_error visszahívást (ha vannak ilyenek) a hibainformációkkal. A rendszer bezárja a sikertelen belső partíciófelhasználót (on_partition_close ha meg van adva), és új belső partíciófelhasználó jön létre (on_partition_initialize lesz meghívva, ha meg van adva) a fogadás folytatásához.
- retry_backoff_factor
- float
A második próbálkozás utáni próbálkozások közötti visszalépési tényező (a legtöbb hibát azonnal egy második próbálkozás késlelteti). Rögzített módban az újrapróbálkozási szabályzat mindig alvó állapotba kerül a(z) {backoff factor} esetében. "Exponenciális" módban az újrapróbálkozási szabályzat a következőre fog aludni: {backoff factor} * (2 ** ({az újrapróbálkozások száma} – 1)) másodpercig. Ha a backoff_factor 0,1, akkor az újrapróbálkozás az újrapróbálkozások között [0.0s, 0.2s, 0.4s, ...] esetén alvó állapotba kerül. Az alapértelmezett érték 0,8.
- retry_backoff_max
- float
A maximális visszalépési idő. Az alapértelmezett érték 120 másodperc (2 perc).
- retry_mode
- str
Az újrapróbálkozási kísérletek közötti késleltetési viselkedés. A támogatott értékek "rögzítettek" vagy "exponenciálisak", ahol az alapértelmezett érték az "exponenciális".
- idle_timeout
- float
Időtúllépés másodpercben, amely után az ügyfél lezárja a mögöttes kapcsolatot, ha nincs további tevékenység. Alapértelmezés szerint a Nincs érték, ami azt jelenti, hogy az ügyfél nem fog leállni inaktivitás miatt, hacsak a szolgáltatás nem kezdeményezi.
- transport_type
- TransportType
Az Event Hubs szolgáltatással való kommunikációhoz használt átviteli protokoll típusa. Az alapértelmezett érték a TransportType.Amqp , amely esetben az 5671-es portot használja. Ha az 5671-es port nem érhető el/le van tiltva a hálózati környezetben, a TransportType.AmqpOverWebsocket használható helyette, amely a 443-es portot használja a kommunikációhoz.
- checkpoint_store
- Optional[CheckpointStore]
Egy kezelő, amely a partíció terheléselosztási és ellenőrzőpont-adatait tárolja az események fogadásakor. Az ellenőrzőpont-tárolót mindkét esetben az összes partícióról vagy egyetlen partícióról fogadjuk. Az utóbbi esetben a terheléselosztás nem alkalmazható. Ha nincs megadva ellenőrzőpont-tároló, az ellenőrzőpont a memóriában marad, és az EventHubConsumerClient példány terheléselosztás nélkül fogadja az eseményeket.
- load_balancing_interval
- float
Amikor a terheléselosztás beindul. Ez a két terheléselosztási kiértékelés közötti időköz másodpercben. Az alapértelmezett érték 30 másodperc.
- partition_ownership_expiration_interval
- float
A partíció tulajdonjoga ennyi másodperc elteltével lejár. Minden terheléselosztási kiértékelés automatikusan meghosszabbítja a tulajdonjog lejárati idejét. Az alapértelmezett érték 6 * load_balancing_interval, azaz 180 másodperc az alapértelmezett 30 másodperces load_balancing_interval használatakor.
- load_balancing_strategy
- str vagy LoadBalancingStrategy
Amikor beindul a terheléselosztás, ezzel a stratégiával igényelheti és egyensúlyba állíthatja a partíció tulajdonjogát. Használja a "kapzsi" vagy a LoadBalancingStrategy.GREEDY értéket a kapzsi stratégiához, amely minden terheléselosztási kiértékeléshez annyi felszabadítatlan partíciót fog megragadni, amennyi a terhelés elosztásához szükséges. Használja a "balanced" vagy a LoadBalancingStrategy.BALANCED függvényt az kiegyensúlyozott stratégiához, amely minden terheléselosztási értékeléshez csak egy olyan partíciót igényel, amelyet más EventHubConsumerClient nem igényel. Ha egy EventHub összes partícióját egy másik EventHubConsumerClient igényelte, és az ügyfél túl kevés partíciót igényelt, az ügyfél a terheléselosztási stratégiától függetlenül minden terheléselosztási kiértékeléshez ellop egy partíciót a többi ügyféltől. A kapzsi stratégia alapértelmezés szerint használatban van.
Az Event Hubs szolgáltatással való kapcsolat létrehozásához használandó egyéni végpontcím, amely lehetővé teszi a hálózati kérések átirányítását a gazdakörnyezethez szükséges bármely alkalmazásátjárón vagy más útvonalon. Az alapértelmezett érték a Nincs. A formátum a következő: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Ha a port nincs megadva a custom_endpoint_address, a rendszer alapértelmezés szerint a 443-at használja.
A kapcsolati végpont identitásának hitelesítéséhez használt SSL-tanúsítvány egyéni CA_BUNDLE fájljának elérési útja. Az alapértelmezett érték Nincs, ebben az esetben a certifi.where() lesz használva.
- uamqp_transport
- bool
Azt határozza meg, hogy az uamqp kódtárat használja-e alapul szolgáló átvitelként. Az alapértelmezett érték Hamis, és a rendszer a Pure Python AMQP-kódtárat használja alapul szolgáló átvitelként.
Visszatérési típus
Példák
Hozzon létre egy új EventHubConsumerClient-példányt a kapcsolati karakterlánc.
import os
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Az eseményközpont tulajdonságainak lekérése.
A visszaadott szótár kulcsai a következők:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
Válaszok
Az eseményközpontra vonatkozó információkat tartalmazó szótár.
Visszatérési típus
Kivételek
get_partition_ids
Kérje le az eseményközpont partícióazonosítóit.
async get_partition_ids() -> List[str]
Válaszok
A partícióazonosítók listája.
Visszatérési típus
Kivételek
get_partition_properties
A megadott partíció tulajdonságainak lekérése.
A tulajdonságok szótárában található kulcsok a következők:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
Paraméterek
Válaszok
Partíciótulajdonságokat tartalmazó szótár.
Visszatérési típus
Kivételek
receive
Események fogadása partíció(k)ról, opcionális terheléselosztással és ellenőrzőpontokkal.
async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Paraméterek
- on_event
- Callable[PartitionContext, Optional[EventData]]
A fogadott események kezelésére szolgáló visszahívási függvény. A visszahívás két paramétert vesz igénybe: partition_context , amely partíciókörnyezetet és a fogadott eseményt tartalmazó eseményt tartalmaz. A visszahívási függvényt a következőképpen kell definiálni: on_event(partition_context, esemény). A partíciókörnyezettel kapcsolatos részletes információkért tekintse meg a következőt: PartitionContext.
- max_wait_time
- float
Az eseményfeldolgozó által a visszahívás meghívása előtt várakozó maximális időköz másodpercben. Ha ezen az időtartamon belül nem érkeznek események, a on_eventvisszahívást a Rendszer a Nincs beállítással hívja meg. Ha ez az érték Nincs vagy 0 (alapértelmezett) értékre van állítva, a visszahívás nem lesz meghívva, amíg egy esemény nem érkezik.
- partition_id
- str
Ha meg van adva, az ügyfél csak erről a partícióról fog fogadni. Ellenkező esetben az ügyfél minden partícióról megkapja a kódot.
- owner_level
- int
A kizárólagos fogyasztó prioritása. Ha owner_level be van állítva, létrejön egy kizárólagos fogyasztó. A magasabb owner_level rendelkező fogyasztók magasabb kizárólagos prioritást élveznek. A tulajdonosi szint a fogyasztó "alapértékeként" is ismert.
- prefetch
- int
A szolgáltatásból feldolgozásra elővetendő események száma. Az alapértelmezett érték 300.
- track_last_enqueued_event_properties
- bool
Azt jelzi, hogy a fogyasztónak adatokat kell-e kérnie a társított partíción az utolsóként lekért eseményről, és nyomon követheti ezeket az információkat az események fogadása során. A partíciók utolsó lekérdezett eseményével kapcsolatos információk nyomon követésekor az Event Hubs szolgáltatástól kapott minden esemény metaadatokat fog hordozni a partícióról. Ez kis mennyiségű további hálózati sávszélesség-használatot eredményez, amely általában kedvező kompromisszumot jelent, ha figyelembe vesszük, hogy a partíciótulajdonságokra vonatkozó kéréseket rendszeres időközönként az Event Hub-ügyfél használatával kell elvégezni. Alapértelmezés szerint False ( Hamis ) értékre van állítva.
Kezdje el fogadni ezt az eseményhelyzetet, ha nincs ellenőrzőpont-adat egy partícióhoz. A rendszer ellenőrzőpont-adatokat használ, ha elérhetők. Ez lehet egy partícióazonosítóval rendelkező diktálás kulcsként és pozícióként az egyes partíciók értékeként, vagy egyetlen érték az összes partícióhoz. Az érték típusa lehet str, int vagy datetime.datetime. Szintén támogatottak a "-1" értékek a stream elejétől való fogadáshoz, és "@latest" csak az új események fogadásához.
Állapítsa meg, hogy a megadott starting_position befogadó(>=) vagy sem (>). Igaz a befogadó és a hamis kizárólagos. Ez lehet egy partícióazonosítóval rendelkező diktálás kulcsként és logikai értékként, amely azt jelzi, hogy egy adott partíció starting_position befogadó-e vagy sem. Ez az összes starting_position egyetlen bool értéke is lehet. Az alapértelmezett érték a Hamis.
- on_error
- Callable[[PartitionContext, Exception]]
A visszahívási függvény, amely akkor lesz meghívva, ha az újrapróbálkozási kísérletek kimerítése vagy a terheléselosztás folyamata során hiba keletkezik a fogadás során. A visszahívás két paramétert vesz igénybe: partition_context , amely partícióinformációkat tartalmaz, és kivételként a hiba . partition_context lehet Nincs, ha a hiba a terheléselosztási folyamat során jelentkezik. A visszahívást a következőképpen kell definiálni: on_error(partition_context, hiba). A on_error visszahívást is meghívja a rendszer, ha a on_event visszahívás során nem kezelt kivétel lép fel.
- on_partition_initialize
- Callable[[PartitionContext]]
A visszahívási függvény, amely egy adott partíció fogyasztója után lesz meghívva, befejezi az inicializálást. A rendszer akkor is meghívja, ha egy új belső partíciófelhasználót hoznak létre, hogy átvegye a sikertelen és lezárt belső partíciófelhasználók fogadási folyamatát. A visszahívás egyetlen paramétert vesz fel: partition_context , amely tartalmazza a partíció adatait. A visszahívást a következőképpen kell definiálni: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
A visszahívási függvény, amely egy adott partíció fogyasztójának bezárása után lesz meghívva. A rendszer akkor is meghívja, ha az újrapróbálkozási kísérletek kimerítése után hiba keletkezik a fogadás során. A visszahívás két paramétert vesz fel: partition_context , amely partícióinformációkat és a lezárás okát tartalmazza. A visszahívást a következőképpen kell definiálni: on_partition_close(partition_context, ok). CloseReason Tekintse meg a különböző záró okokat.
Visszatérési típus
Példák
Események fogadása az EventHubról.
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
async with consumer:
await consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
receive_batch
Események fogadása partíció(k)ból kötegekben, opcionális terheléselosztással és ellenőrzőpontokkal.
async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Paraméterek
- on_event_batch
- Callable[PartitionContext, List[EventData]]
A fogadott események kötegének kezelésére szolgáló visszahívási függvény. A visszahívás két paramétert vesz igénybe: partition_context , amely partíciókörnyezetet és event_batch tartalmaz, amelyek a fogadott események. A visszahívási függvényt a következőképpen kell definiálni: on_event_batch(partition_context, event_batch). event_batch üres lista lehet, ha max_wait_time nem Nincs vagy 0, és nem érkezik esemény max_wait_time után. A partíciókörnyezettel kapcsolatos részletes információkért tekintse meg a következőt PartitionContext: .
- max_batch_size
- int
A visszahívási on_event_batch átadott kötegben lévő események maximális száma. Ha a tényleges fogadott események száma nagyobb , mint max_batch_size, a kapott események kötegekre vannak osztva, és az egyes kötegek visszahívását akár max_batch_size eseményekkel is meghívják.
- max_wait_time
- float
A visszahívás meghívása előtt az eseményfeldolgozó által várt maximális időköz másodpercben. Ha ebben az intervallumban nem érkeznek események, a on_event_batch visszahívás üres listával lesz meghívva. Ha ez az érték Nincs vagy 0 (alapértelmezett) értékre van állítva, a visszahívás nem lesz meghívva, amíg az események meg nem érkeznek.
- partition_id
- str
Ha meg van adva, az ügyfél csak erről a partícióról fog fogadni. Ellenkező esetben az ügyfél az összes partíciótól megkapja a kódot.
- owner_level
- int
A kizárólagos fogyasztó prioritása. Ha owner_level be van állítva, létrejön egy kizárólagos fogyasztó. A magasabb owner_level rendelkező fogyasztóknak magasabb a kizárólagos prioritásuk. A tulajdonosi szint a fogyasztó "korszakértékének" is tekinthető.
- prefetch
- int
A szolgáltatásból feldolgozásra elővetendő események száma. Az alapértelmezett érték 300.
- track_last_enqueued_event_properties
- bool
Azt jelzi, hogy a fogyasztónak adatokat kell-e kérnie az utolsóként lekért eseményről a társított partíción, és nyomon követheti ezeket az információkat események érkezésekor. Az utolsóként lekérdezett partíciókkal kapcsolatos információk nyomon követésekor az Event Hubs szolgáltatástól kapott minden esemény metaadatokat fog hordozni a partícióról. Ez kis mennyiségű további hálózati sávszélesség-használatot eredményez, ami általában kedvező kompromisszum, ha figyelembe vesszük, hogy az Event Hub-ügyfél használatával rendszeres időközönként kérnek partíciótulajdonságokat. Alapértelmezés szerint False ( Hamis ) értékre van állítva.
Kezdje el fogadni ezt az eseményhelyzetet, ha nincs ellenőrzőpont-adat egy partícióhoz. Az ellenőrzőpont-adatok akkor lesznek felhasználva, ha elérhetők. Ez lehet egy partícióazonosítóval rendelkező diktálás, amely kulcsként és pozícióként szolgál az egyes partíciókhoz, vagy egyetlen érték az összes partícióhoz. Az érték típusa lehet str, int vagy datetime.datetime. Emellett támogatottak a "-1" értékek a stream elejétől való fogadáshoz, és a "@latest" a csak új események fogadásához.
Határozza meg, hogy az adott starting_position befogadó(>=) vagy sem (>). Igaz a befogadóra, a Hamis pedig a kizárólagosra. Ez lehet kulcsként partícióazonosítóval rendelkező diktálás, az érték pedig bool, amely azt jelzi, hogy egy adott partíció starting_position tartalmazza-e vagy sem. Ez az összes starting_position egyetlen bool értéke is lehet. Az alapértelmezett érték Hamis.
- on_error
- Callable[[PartitionContext, Exception]]
A visszahívási függvény, amely akkor lesz meghívva, ha az újrapróbálkozási kísérletek kimerülése után vagy a terheléselosztási folyamat során hiba merül fel a fogadás során. A visszahívás két paramétert vesz igénybe: partition_context , amely partícióadatokat tartalmaz, és a kivétel a hiba . partition_context Nem lehet, ha a hiba a terheléselosztási folyamat során merül fel. A visszahívást a következőképpen kell definiálni: on_error(partition_context, hiba). A on_error visszahívást akkor is meghívja a rendszer, ha nem kezelt kivétel lép fel a on_event visszahívás során.
- on_partition_initialize
- Callable[[PartitionContext]]
A visszahívási függvény, amely egy adott partíció fogyasztója után lesz meghívva, befejezi az inicializálást. A rendszer akkor is meghívja, ha létrehoznak egy új belső partíciófelhasználót, hogy átvegye a sikertelen és zárt belső partíciófelhasználók fogadási folyamatát. A visszahívás egyetlen paramétert vesz igénybe: partition_context , amely tartalmazza a partíció adatait. A visszahívást a következőképpen kell definiálni: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
A visszahívási függvény, amely egy adott partíció fogyasztójának bezárása után lesz meghívva. A rendszer akkor is meghívja, ha az újrapróbálkozási kísérletek kimerítése után hiba keletkezik a fogadás során. A visszahívás két paramétert vesz igénybe: partition_context , amely partícióadatokat és a lezárás okát tartalmazza. A visszahívást a következőképpen kell definiálni: on_partition_close(partition_context, ok). CloseReason Tekintse meg a különböző záró okokat.
Visszatérési típus
Példák
Események fogadása kötegekben az EventHubról.
logger = logging.getLogger("azure.eventhub")
async def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info(
"{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
)
async with consumer:
await consumer.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
Azure SDK for Python
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: