EventHubConsumerClient Osztály
Az EventHubConsumerClient osztály egy magas szintű felületet határoz meg az események fogadásához a Azure Event Hubs szolgáltatásból.
Az EventHubConsumerClient fő célja, hogy az EventHub összes partíciójáról fogadjon eseményeket terheléselosztással és ellenőrzőpontokkal.
Ha több EventHubConsumerClient-példány fut ugyanazon az eseményközponton, fogyasztói csoporton és ellenőrzőpont-helyen, a partíciók egyenletesen lesznek elosztva közöttük.
A terheléselosztás és a megőrzött ellenőrzőpontok engedélyezéséhez checkpoint_store kell beállítani az EventHubConsumerClient létrehozásakor. Ha nincs megadva ellenőrzőpont-tároló, az ellenőrzőpont a memóriában marad.
Az EventHubConsumerClient egy adott partícióról is fogadhat, amikor meghívja annak metódusát receive() vagy receive_batch(), és megadja a partition_id. A terheléselosztás nem működik egypartíciós módban. A felhasználók azonban továbbra is menthetik az ellenőrzőpontokat, ha a checkpoint_store be van állítva.
- Öröklődés
-
azure.eventhub._client_base.ClientBaseEventHubConsumerClient
Konstruktor
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Paraméterek
- fully_qualified_namespace
- str
Az Event Hubs-névtér teljes gazdagépneve. A névtér formátuma: .servicebus.windows.net.
- consumer_group
- str
Események fogadása ehhez a fogyasztói csoporthoz tartozó eseményközpontból.
- credential
- TokenCredential vagy AzureSasCredential vagy AzureNamedKeyCredential
A hitelesítéshez használt hitelesítőadat-objektum, amely egy adott felületet implementál a jogkivonatok lekéréséhez. Elfogadja az EventHubSharedKeyCredentialazure-identity kódtár által létrehozott , vagy hitelesítő adatokat tartalmazó objektumokat, valamint azokat az objektumokat, amelyek a *get_token(saját, hatókörök) metódust implementálják.
- logging_enable
- bool
Hálózati nyomkövetési naplók kimenete a naplózónak. Az alapértelmezett érték Hamis.
- auth_timeout
- float
A jogkivonat szolgáltatás általi engedélyezésének időtartama másodpercben. Az alapértelmezett érték 60 másodperc. Ha 0 értékre van állítva, az ügyfél nem kényszerít időtúllépést.
- user_agent
- str
Ha meg van adva, ez a felhasználói ügynök sztringje előtt lesz hozzáadva.
- retry_total
- int
A sikertelen művelet ismételt végrehajtására tett kísérletek teljes száma hiba esetén. Az alapértelmezett érték 3. A fogadási retry_total kontextusa különleges: A fogadási metódust az egyes iterációkban egy időhurkos, belső fogadási metódust hív meg. A fogadási esetben retry_total adja meg az újrapróbálkozások számát a belső fogadó metódus által a while-loopban kiváltott hiba után. Ha az újrapróbálkozási kísérletek kimerülnek, a rendszer meghívja a on_error visszahívást (ha meg van adva) a hibainformációkkal. A rendszer bezárja a sikertelen belső partíció-fogyasztót (on_partition_close ha meg van adva), és új belső partíciófelhasználó jön létre (on_partition_initialize lesz meghívva) a fogadás folytatásához.
- retry_backoff_factor
- float
A második próbálkozás utáni kísérletek közötti visszalépési tényező (a legtöbb hibát azonnal, egy második próbálkozással, késedelem nélkül meg kell oldani). Rögzített módban az újrapróbálkozási szabályzat mindig alvó állapotba kerül a(z) {backoff factor} esetében. "Exponenciális" módban az újrapróbálkozási szabályzat a következőhöz fog aludni: {backoff factor} * (2 ** ({az újrapróbálkozások száma} – 1)) másodperc. Ha a backoff_factor 0,1, akkor az újrapróbálkozás az újrapróbálkozások között [0.0s, 0.2s, 0.4s, ...] állapotban lesz. Az alapértelmezett érték 0,8.
- retry_backoff_max
- float
A maximális visszaállási idő. Az alapértelmezett érték 120 másodperc (2 perc).
- retry_mode
- str
Az újrapróbálkozási kísérletek közötti késleltetési viselkedés. A támogatott értékek "rögzítettek" vagy "exponenciálisak", ahol az alapértelmezett érték az "exponenciális".
- idle_timeout
- float
Időtúllépés másodpercben, amely után az ügyfél bezárja a mögöttes kapcsolatot, ha nincs további tevékenység. Alapértelmezés szerint az érték Nincs, ami azt jelenti, hogy az ügyfél nem fog leállni inaktivitás miatt, hacsak a szolgáltatás nem kezdeményezi.
- transport_type
- TransportType
Az Event Hubs szolgáltatással való kommunikációhoz használt átviteli protokoll típusa. Az alapértelmezett érték a TransportType.Amqp , amely esetben az 5671-es portot használja. Ha az 5671-es port nem érhető el/blokkolva van a hálózati környezetben, akkor a TransportType.AmqpOverWebsocket használható, amely a 443-es portot használja a kommunikációhoz.
HTTP-proxybeállítások. Ennek a szótárnak a következő kulcsokkal kell rendelkeznie: "proxy_hostname" (str érték) és "proxy_port" (int érték). Emellett a következő kulcsok is lehetnek jelen: "felhasználónév", "jelszó".
- checkpoint_store
- CheckpointStore vagy None
Egy kezelő, amely a partíció terheléselosztási és ellenőrzőpont-adatait tárolja az események fogadásakor. Az ellenőrzőpont-tároló mindkét esetben az összes partícióból vagy egyetlen partícióból való fogadáskor használatos. Az utóbbi esetben a terheléselosztás nem érvényes. Ha nincs megadva ellenőrzőpont-tároló, az ellenőrzőpont belsőleg megmarad a memóriában, és az EventHubConsumerClient-példány terheléselosztás nélkül fogadja az eseményeket.
- load_balancing_interval
- float
Amikor a terheléselosztás beindul. Ez a két terheléselosztási értékelés közötti időköz másodpercben. Az alapértelmezett érték 30 másodperc.
- partition_ownership_expiration_interval
- float
A partíció tulajdonjoga ennyi másodperc elteltével lejár. Minden terheléselosztási kiértékelés automatikusan meghosszabbítja a tulajdonjog lejárati idejét. Az alapértelmezett érték 6 * load_balancing_interval, azaz 180 másodperc az alapértelmezett 30 másodperces load_balancing_interval használatakor.
- load_balancing_strategy
- str vagy LoadBalancingStrategy
Amikor beindul a terheléselosztás, ezzel a stratégiával igényelheti és kiegyensúlyozza a partíció tulajdonjogát. Használja a "kapzsi" vagy a LoadBalancingStrategy.GREEDY parancsot a kapzsi stratégiához, amely minden terheléselosztási értékeléshez annyi nem igényelt partíciót fog megragadni, amennyi a terhelés kiegyensúlyozásához szükséges. Használja a "balanced" vagy a LoadBalancingStrategy.BALANCED függvényt a kiegyensúlyozott stratégiához, amely minden terheléselosztási értékeléshez csak egy olyan partíciót igényel, amelyet más EventHubConsumerClient nem igényel. Ha egy EventHub összes partícióját más EventHubConsumerClient követeli, és ez az ügyfél túl kevés partíciót igényelt, az ügyfél a terheléselosztási stratégiától függetlenül ellop egy partíciót a többi ügyféltől minden terheléselosztási értékeléshez. A kapzsi stratégia alapértelmezés szerint használatos.
Az Event Hubs szolgáltatáshoz való csatlakozáshoz használandó egyéni végpontcím, amely lehetővé teszi a hálózati kérések átirányítását a gazdagépkörnyezethez szükséges bármely alkalmazásátjárón vagy más útvonalon. Az alapértelmezett érték Nincs. A formátum a következő: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Ha a port nincs megadva a custom_endpoint_address, a rendszer alapértelmezés szerint a 443-at használja.
A kapcsolati végpont identitásának hitelesítéséhez használt SSL-tanúsítvány egyéni CA_BUNDLE fájljának elérési útja. Alapértelmezés szerint Nincs, ebben az esetben a certifi.where() lesz használva.
- uamqp_transport
- bool
Azt, hogy az uamqp kódtárat használja-e alapul szolgáló átvitelként. Az alapértelmezett érték Hamis, és a rendszer a Pure Python AMQP-kódtárat használja alapul szolgáló átvitelként.
- socket_timeout
- float
Az az idő másodpercben, amíg a kapcsolat mögöttes szoftvercsatornájának várnia kell az adatok küldésekor és fogadásakor, mielőtt túllépi az időkorlátot. Az alapértelmezett érték a TransportType.Amqp esetében 0.2, a TransportType.AmqpOverWebsocket esetében pedig 1. Ha az EventHubsConnectionError hiba az írási időtúllépés miatt jelentkezik, előfordulhat, hogy az alapértelmezettnél nagyobb értéket kell megadni. Ez speciális használati forgatókönyvekhez készült, és általában az alapértelmezett értéknek elegendőnek kell lennie.
Példák
Hozza létre az EventHubConsumerClient új példányát.
import os
from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
consumer = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group='$Default',
credential=credential)
Metódusok
close |
Állítsa le az eseményeket az Event Hubról, és zárja be a mögöttes AMQP-kapcsolatot és hivatkozásokat. |
from_connection_string |
EventHubConsumerClient létrehozása egy kapcsolati karakterlánc. |
get_eventhub_properties |
Az eseményközpont tulajdonságainak lekérése. A visszaadott szótár kulcsai a következők:
|
get_partition_ids |
Kérje le az eseményközpont partícióazonosítóit. |
get_partition_properties |
A megadott partíció tulajdonságainak lekérése. A tulajdonságok szótárában található kulcsok a következők:
|
receive |
Események fogadása partíció(k)ról, opcionális terheléselosztással és ellenőrzőpont-készítéssel. |
receive_batch |
Események fogadása partíció(k)ról, opcionális terheléselosztással és ellenőrzőpont-készítéssel. |
close
Állítsa le az eseményeket az Event Hubról, és zárja be a mögöttes AMQP-kapcsolatot és hivatkozásokat.
close() -> None
Visszatérési típus
Példák
Zárja be az ügyfelet.
import os
import threading
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group="$Default",
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The 'receive' method is a blocking call, it can be executed in a thread for
# non-blocking behavior, and combined with the 'close' method.
worker = threading.Thread(
target=consumer.receive,
kwargs={
"on_event": on_event,
"starting_position": "-1", # "-1" is from the beginning of the partition.
}
)
worker.start()
time.sleep(10) # Keep receiving for 10s then close.
# Close down the consumer handler explicitly.
consumer.close()
from_connection_string
EventHubConsumerClient létrehozása egy kapcsolati karakterlánc.
from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient
Paraméterek
- eventhub_name
- str
Az adott eseményközpont elérési útja az ügyfél csatlakoztatásához.
- logging_enable
- bool
Hálózati nyomkövetési naplók kimenete a naplózónak. Az alapértelmezett érték Hamis.
- auth_timeout
- float
A jogkivonat szolgáltatás általi engedélyezésének időtartama másodpercben. Az alapértelmezett érték 60 másodperc. Ha 0 értékre van állítva, az ügyfél nem kényszerít időtúllépést.
- user_agent
- str
Ha meg van adva, ez a felhasználói ügynök sztringje előtt lesz hozzáadva.
- retry_total
- int
A sikertelen művelet ismételt végrehajtására tett kísérletek teljes száma hiba esetén. Az alapértelmezett érték 3. A fogadási retry_total kontextusa különleges: A fogadási metódust az egyes iterációkban egy időhurkos, belső fogadási metódust hív meg. A fogadási esetben retry_total adja meg az újrapróbálkozások számát a belső fogadó metódus által a while-loopban kiváltott hiba után. Ha az újrapróbálkozási kísérletek kimerülnek, a rendszer meghívja a on_error visszahívást (ha meg van adva) a hibainformációkkal. A rendszer bezárja a sikertelen belső partíció-fogyasztót (on_partition_close ha meg van adva), és új belső partíciófelhasználó jön létre (on_partition_initialize lesz meghívva) a fogadás folytatásához.
- retry_backoff_factor
- float
A második próbálkozás utáni kísérletek közötti visszalépési tényező (a legtöbb hibát azonnal, egy második próbálkozással, késedelem nélkül meg kell oldani). Rögzített módban az újrapróbálkozási szabályzat mindig alvó állapotba kerül a(z) {backoff factor} esetében. "Exponenciális" módban az újrapróbálkozási szabályzat a következőhöz fog aludni: {backoff factor} * (2 ** ({az újrapróbálkozások száma} – 1)) másodperc. Ha a backoff_factor 0,1, akkor az újrapróbálkozás az újrapróbálkozások között [0.0s, 0.2s, 0.4s, ...] esetén alvó állapotba kerül. Az alapértelmezett érték 0,8.
- retry_backoff_max
- float
A maximális visszalépési idő. Az alapértelmezett érték 120 másodperc (2 perc).
- retry_mode
- str
Az újrapróbálkozási kísérletek közötti késleltetési viselkedés. A támogatott értékek "rögzítettek" vagy "exponenciálisak", ahol az alapértelmezett érték az "exponenciális".
- idle_timeout
- float
Időtúllépés másodpercben, amely után az ügyfél lezárja a mögöttes kapcsolatot, ha nincs furthur-tevékenység. Alapértelmezés szerint a Nincs érték, ami azt jelenti, hogy az ügyfél nem fog leállni inaktivitás miatt, hacsak a szolgáltatás nem kezdeményezi.
- transport_type
- TransportType
Az Event Hubs szolgáltatással való kommunikációhoz használt átviteli protokoll típusa. Az alapértelmezett érték a TransportType.Amqp , amely esetben az 5671-es portot használja. Ha az 5671-es port nem érhető el/le van tiltva a hálózati környezetben, a TransportType.AmqpOverWebsocket használható helyette, amely a 443-es portot használja a kommunikációhoz.
- http_proxy
- dict
HTTP-proxybeállítások. Ennek a szótárnak a következő kulcsokkal kell rendelkeznie: "proxy_hostname" (str érték) és "proxy_port" (int érték). Emellett a következő kulcsok is lehetnek jelen: "felhasználónév", "jelszó".
- checkpoint_store
- CheckpointStore vagy None
Egy kezelő, amely a partíció terheléselosztási és ellenőrzőpont-adatait tárolja az események fogadásakor. Az ellenőrzőpont-tárolót mindkét esetben az összes partícióról vagy egyetlen partícióról fogadjuk. Az utóbbi esetben a terheléselosztás nem alkalmazható. Ha nincs megadva ellenőrzőpont-tároló, az ellenőrzőpont a memóriában marad, és az EventHubConsumerClient példány terheléselosztás nélkül fogadja az eseményeket.
- load_balancing_interval
- float
Amikor a terheléselosztás beindul. Ez a két terheléselosztási kiértékelés közötti időköz másodpercben. Az alapértelmezett érték 10 másodperc.
- partition_ownership_expiration_interval
- float
A partíció tulajdonjoga ennyi másodperc elteltével lejár. Minden terheléselosztási kiértékelés automatikusan meghosszabbítja a tulajdonjog lejárati idejét. Az alapértelmezett érték 6 * load_balancing_interval, azaz 60 másodperc az alapértelmezett 30 másodperces load_balancing_interval használatakor.
- load_balancing_strategy
- str vagy LoadBalancingStrategy
Amikor beindul a terheléselosztás, ezzel a stratégiával igényelheti és egyensúlyba állíthatja a partíció tulajdonjogát. Használja a "kapzsi" vagy a LoadBalancingStrategy.GREEDY értéket a kapzsi stratégiához, amely minden terheléselosztási kiértékeléshez annyi felszabadítatlan partíciót fog megragadni, amennyi a terhelés elosztásához szükséges. Használja a "balanced" vagy a LoadBalancingStrategy.BALANCED függvényt az kiegyensúlyozott stratégiához, amely minden terheléselosztási értékeléshez csak egy olyan partíciót igényel, amelyet más EventHubConsumerClient nem igényel. Ha egy EventHub összes partícióját egy másik EventHubConsumerClient igényelte, és az ügyfél túl kevés partíciót igényelt, az ügyfél a terheléselosztási stratégiától függetlenül minden terheléselosztási kiértékeléshez ellop egy partíciót a többi ügyféltől. A kapzsi stratégia alapértelmezés szerint használatban van.
Az Event Hubs szolgáltatással való kapcsolat létrehozásához használandó egyéni végpontcím, amely lehetővé teszi a hálózati kérések átirányítását a gazdakörnyezethez szükséges bármely alkalmazásátjárón vagy más útvonalon. Az alapértelmezett érték a Nincs. A formátum a következő: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Ha a port nincs megadva a custom_endpoint_address, a rendszer alapértelmezés szerint a 443-at használja.
A kapcsolati végpont identitásának hitelesítéséhez használt SSL-tanúsítvány egyéni CA_BUNDLE fájljának elérési útja. Az alapértelmezett érték Nincs, ebben az esetben a certifi.where() lesz használva.
- uamqp_transport
- bool
Azt határozza meg, hogy az uamqp kódtárat használja-e alapul szolgáló átvitelként. Az alapértelmezett érték Hamis, és a rendszer a Pure Python AMQP-kódtárat használja alapul szolgáló átvitelként.
Visszatérési típus
Példák
Hozzon létre egy új EventHubConsumerClient-példányt a kapcsolati karakterlánc.
import os
from azure.eventhub import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Az eseményközpont tulajdonságainak lekérése.
A visszaadott szótár kulcsai a következők:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
get_eventhub_properties() -> Dict[str, Any]
Válaszok
Az eseményközpontra vonatkozó információkat tartalmazó szótár.
Visszatérési típus
Kivételek
get_partition_ids
Kérje le az eseményközpont partícióazonosítóit.
get_partition_ids() -> List[str]
Válaszok
A partícióazonosítók listája.
Visszatérési típus
Kivételek
get_partition_properties
A megadott partíció tulajdonságainak lekérése.
A tulajdonságok szótárában található kulcsok a következők:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
get_partition_properties(partition_id: str) -> Dict[str, Any]
Paraméterek
Válaszok
Partíciótulajdonságokat tartalmazó szótár.
Visszatérési típus
Kivételek
receive
Események fogadása partíció(k)ról, opcionális terheléselosztással és ellenőrzőpont-készítéssel.
receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None
Paraméterek
- on_event
- callable[PartitionContext, EventData vagy None]
A fogadott események kezelésére szolgáló visszahívási függvény. A visszahívás két paramétert vesz igénybe: partition_context , amely partíciókörnyezetet és a fogadott eseményt tartalmazó eseményt tartalmaz. A visszahívási függvényt a következőképpen kell definiálni: on_event(partition_context, esemény). A partíciókörnyezettel kapcsolatos részletes információkért tekintse meg a következőt: PartitionContext.
- max_wait_time
- float
Az eseményfeldolgozó által a visszahívás meghívása előtt várakozó maximális időköz másodpercben. Ha ezen az időtartamon belül nem érkeznek események, a on_eventvisszahívást a Rendszer a Nincs beállítással hívja meg. Ha ez az érték Nincs vagy 0 (alapértelmezett) értékre van állítva, a visszahívás nem lesz meghívva, amíg egy esemény nem érkezik.
- partition_id
- str
Ha meg van adva, az ügyfél csak erről a partícióról fog fogadni. Ellenkező esetben az ügyfél minden partícióról megkapja a kódot.
- owner_level
- int
A kizárólagos fogyasztó prioritása. Ha owner_level be van állítva, létrejön egy kizárólagos fogyasztó. A magasabb owner_level rendelkező fogyasztók magasabb kizárólagos prioritást élveznek. A tulajdonosi szint a fogyasztó "alapértékeként" is ismert.
- prefetch
- int
A szolgáltatásból feldolgozásra elővetendő események száma. Az alapértelmezett érték 300.
- track_last_enqueued_event_properties
- bool
Azt jelzi, hogy a fogyasztónak adatokat kell-e kérnie a társított partíción az utolsóként lekért eseményről, és nyomon követheti ezeket az információkat az események fogadása során. A partíciók utolsó lekérdezett eseményével kapcsolatos információk nyomon követésekor az Event Hubs szolgáltatástól kapott minden esemény metaadatokat fog hordozni a partícióról. Ez kis mennyiségű további hálózati sávszélesség-használatot eredményez, amely általában kedvező kompromisszumot jelent, ha figyelembe vesszük, hogy a partíciótulajdonságokra vonatkozó kéréseket rendszeres időközönként az Event Hub-ügyfél használatával kell elvégezni. Alapértelmezés szerint False ( Hamis ) értékre van állítva.
Kezdje el fogadni ezt az eseményhelyzetet, ha nincs ellenőrzőpont-adat egy partícióhoz. A rendszer ellenőrzőpont-adatokat használ, ha elérhetők. Ez lehet egy partícióazonosítóval rendelkező diktálás kulcsként és pozícióként az egyes partíciók értékeként, vagy egyetlen érték az összes partícióhoz. Az érték típusa lehet str, int vagy datetime.datetime. Szintén támogatottak a "-1" értékek a stream elejétől való fogadáshoz, és "@latest" csak az új események fogadásához. Az alapértelmezett érték a "@latest".
Állapítsa meg, hogy a megadott starting_position befogadó(>=) vagy sem (>). Igaz a befogadó és a hamis kizárólagos. Ez lehet egy partícióazonosítóval rendelkező diktálás kulcsként és logikai értékként, amely azt jelzi, hogy egy adott partíció starting_position befogadó-e vagy sem. Ez az összes starting_position egyetlen bool értéke is lehet. Az alapértelmezett érték a Hamis.
- on_error
- callable[[PartitionContext, Exception]]
A visszahívási függvény, amely akkor lesz meghívva, ha az újrapróbálkozási kísérletek kimerítése vagy a terheléselosztás folyamata során hiba keletkezik a fogadás során. A visszahívás két paramétert vesz igénybe: partition_context , amely partícióinformációkat tartalmaz, és kivételként a hiba . partition_context lehet Nincs, ha a hiba a terheléselosztási folyamat során jelentkezik. A visszahívást a következőképpen kell definiálni: on_error(partition_context, hiba). A on_error visszahívást is meghívja a rendszer, ha a on_event visszahívás során nem kezelt kivétel lép fel.
- on_partition_initialize
- callable[[PartitionContext]]
A visszahívási függvény, amely egy adott partíció fogyasztója után lesz meghívva, befejezi az inicializálást. A rendszer akkor is meghívja, ha egy új belső partíciófelhasználót hoznak létre, hogy átvegye a sikertelen és lezárt belső partíciófelhasználók fogadási folyamatát. A visszahívás egyetlen paramétert vesz fel: partition_context , amely tartalmazza a partíció adatait. A visszahívást a következőképpen kell definiálni: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
A visszahívási függvény, amely egy adott partíció fogyasztójának bezárása után lesz meghívva. A rendszer akkor is meghívja, ha az újrapróbálkozási kísérletek kimerítése után hiba keletkezik a fogadás során. A visszahívás két paramétert vesz fel: partition_context , amely partícióinformációkat és a lezárás okát tartalmazza. A visszahívást a következőképpen kell definiálni: on_partition_close(partition_context, ok). CloseReason Tekintse meg a különböző záró okokat.
Visszatérési típus
Példák
Események fogadása az EventHubról.
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive(on_event=on_event)
receive_batch
Események fogadása partíció(k)ról, opcionális terheléselosztással és ellenőrzőpont-készítéssel.
receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None
Paraméterek
- on_event_batch
- callable[PartitionContext, list[EventData]]
A fogadott események kötegének kezelésére szolgáló visszahívási függvény. A visszahívás két paramétert vesz igénybe: partition_context , amely partíciókörnyezetet és event_batch tartalmaz, amely a fogadott események. A visszahívási függvényt a következőképpen kell definiálni: on_event_batch(partition_context, event_batch). event_batch lehet egy üres lista, ha max_wait_time nem Nincs vagy 0, és nem érkezik esemény max_wait_time után. A partíciókörnyezettel kapcsolatos részletes információkért tekintse meg a következőt: PartitionContext.
- max_batch_size
- int
A visszahívási on_event_batch átadott kötegben lévő események maximális száma. Ha a tényleges fogadott események száma nagyobb , mint max_batch_size, a fogadott események kötegekre vannak osztva, és az egyes kötegek visszahívását legfeljebb max_batch_size eseményekkel hívja meg.
- max_wait_time
- float
Az eseményfeldolgozó által a visszahívás meghívása előtt várakozó maximális időköz másodpercben. Ha ezen az időtartamon belül nem érkeznek események, a on_event_batch visszahívás egy üres listával lesz meghívva.
- partition_id
- str
Ha meg van adva, az ügyfél csak erről a partícióról fog fogadni. Ellenkező esetben az ügyfél minden partícióról megkapja a kódot.
- owner_level
- int
A kizárólagos fogyasztó prioritása. Ha owner_level be van állítva, létrejön egy kizárólagos fogyasztó. A magasabb owner_level rendelkező fogyasztók magasabb kizárólagos prioritást élveznek. A tulajdonosi szint a fogyasztó "alapértékeként" is ismert.
- prefetch
- int
A szolgáltatásból feldolgozásra elővetendő események száma. Az alapértelmezett érték 300.
- track_last_enqueued_event_properties
- bool
Azt jelzi, hogy a fogyasztónak adatokat kell-e kérnie a társított partíción az utolsóként lekért eseményről, és nyomon követheti ezeket az információkat az események fogadása során. A partíciók utolsó lekérdezett eseményével kapcsolatos információk nyomon követésekor az Event Hubs szolgáltatástól kapott minden esemény metaadatokat fog hordozni a partícióról. Ez kis mennyiségű további hálózati sávszélesség-használatot eredményez, amely általában kedvező kompromisszumot jelent, ha figyelembe vesszük, hogy a partíciótulajdonságokra vonatkozó kéréseket rendszeres időközönként az Event Hub-ügyfél használatával kell elvégezni. Alapértelmezés szerint False ( Hamis ) értékre van állítva.
Kezdje el fogadni ezt az eseményhelyzetet, ha nincs ellenőrzőpont-adat egy partícióhoz. A rendszer ellenőrzőpont-adatokat használ, ha elérhetők. Ez lehet egy partícióazonosítóval rendelkező diktálás kulcsként és pozícióként az egyes partíciók értékeként, vagy egyetlen érték az összes partícióhoz. Az érték típusa lehet str, int vagy datetime.datetime. Szintén támogatottak a "-1" értékek a stream elejétől való fogadáshoz, és "@latest" csak az új események fogadásához. Az alapértelmezett érték a "@latest".
Állapítsa meg, hogy a megadott starting_position befogadó(>=) vagy sem (>). Igaz a befogadó és a hamis kizárólagos. Ez lehet egy partícióazonosítóval rendelkező diktálás kulcsként és logikai értékként, amely azt jelzi, hogy egy adott partíció starting_position befogadó-e vagy sem. Ez az összes starting_position egyetlen bool értéke is lehet. Az alapértelmezett érték a Hamis.
- on_error
- callable[[PartitionContext, Exception]]
A visszahívási függvény, amely akkor lesz meghívva, ha az újrapróbálkozási kísérletek kimerítése vagy a terheléselosztás folyamata során hiba keletkezik a fogadás során. A visszahívás két paramétert vesz igénybe: partition_context , amely partícióinformációkat tartalmaz, és kivételként a hiba . partition_context lehet Nincs, ha a hiba a terheléselosztási folyamat során jelentkezik. A visszahívást a következőképpen kell definiálni: on_error(partition_context, hiba). A on_error visszahívást is meghívja a rendszer, ha a on_event visszahívás során nem kezelt kivétel lép fel.
- on_partition_initialize
- callable[[PartitionContext]]
A visszahívási függvény, amely egy adott partíció fogyasztója után lesz meghívva, befejezi az inicializálást. A rendszer akkor is meghívja, ha egy új belső partíciófelhasználót hoznak létre, hogy átvegye a sikertelen és lezárt belső partíciófelhasználók fogadási folyamatát. A visszahívás egyetlen paramétert vesz fel: partition_context , amely tartalmazza a partíció adatait. A visszahívást a következőképpen kell definiálni: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
A visszahívási függvény, amely egy adott partíció fogyasztójának bezárása után lesz meghívva. A rendszer akkor is meghívja, ha az újrapróbálkozási kísérletek kimerítése után hiba keletkezik a fogadás során. A visszahívás két paramétert vesz fel: partition_context , amely partícióinformációkat és a lezárás okát tartalmazza. A visszahívást a következőképpen kell definiálni: on_partition_close(partition_context, ok). CloseReason Tekintse meg a különböző záró okokat.
Visszatérési típus
Példák
Események fogadása kötegekben az EventHubról.
logger = logging.getLogger("azure.eventhub")
def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received events from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive_batch(on_event_batch=on_event_batch)
Azure SDK for Python