EventHubConsumerClient Klass
Klassen EventHubConsumerClient definierar ett högnivågränssnitt för att ta emot händelser från Azure Event Hubs-tjänsten.
Huvudmålet med EventHubConsumerClient är att ta emot händelser från alla partitioner i en EventHub med belastningsutjämning och kontrollpunkter.
När flera EventHubConsumerClient-instanser körs mot samma händelsehubb, konsumentgrupp och kontrollpunktsplats fördelas partitionerna jämnt mellan dem.
Om du vill aktivera belastningsutjämning och beständiga kontrollpunkter måste checkpoint_store anges när du skapar EventHubConsumerClient. Om det inte finns något kontrollpunktsarkiv behålls kontrollpunkten internt i minnet.
En EventHubConsumerClient kan också ta emot från en specifik partition när du anropar dess metod receive() eller receive_batch() och anger partition_id. Belastningsutjämning fungerar inte i läge med en partition. Men användarna kan fortfarande spara kontrollpunkter om checkpoint_store har angetts.
- Arv
-
azure.eventhub._client_base.ClientBaseEventHubConsumerClient
Konstruktor
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Parametrar
- fully_qualified_namespace
- str
Det fullständigt kvalificerade värdnamnet för Event Hubs-namnområdet. Namnområdesformatet är: .servicebus.windows.net.
- eventhub_name
- str
Sökvägen till den specifika händelsehubb som klienten ska anslutas till.
- credential
- TokenCredential eller AzureSasCredential eller AzureNamedKeyCredential
Det autentiseringsobjekt som används för autentisering som implementerar ett visst gränssnitt för att hämta token. Den accepterar , eller autentiseringsobjekt som genereras av azure-identity-biblioteket och objekt som implementerar EventHubSharedKeyCredentialmetoden *get_token(self, scopes).
- logging_enable
- bool
Om nätverksspårningsloggar ska matas ut till loggarna. Standardvärdet är Falskt.
- auth_timeout
- float
Tiden i sekunder att vänta på att en token ska auktoriseras av tjänsten. Standardvärdet är 60 sekunder. Om värdet är 0 tillämpas ingen tidsgräns från klienten.
- user_agent
- str
Om detta anges läggs detta till framför användaragentsträngen.
- retry_total
- int
Det totala antalet försök att göra om en misslyckad åtgärd när ett fel inträffar. Standardvärdet är 3. Kontexten för retry_total i mottagandet är speciell : Mottagningsmetoden implementeras med en "while-loop calling internal receive"-metod i varje iteration. I mottagarfallet anger retry_total antalet återförsök efter fel som genererats av den interna mottagningsmetoden i while-loopen. Om återförsöken är slut anropas on_error återanrop (om så anges) med felinformationen. Den misslyckade interna partitionskonsumenten stängs (on_partition_close anropas om det tillhandahålls) och en ny intern partitionskonsument skapas (on_partition_initialize anropas om det tillhandahålls) för att återuppta mottagandet.
- retry_backoff_factor
- float
En backoff-faktor som ska tillämpas mellan försök efter det andra försöket (de flesta fel löses omedelbart med ett andra försök utan fördröjning). I fast läge kommer återförsöksprincipen alltid att vara i viloläge för {backoff factor}. I exponentiellt läge försätts återförsöksprincipen i viloläge för: {backoff factor} * (2 ** ({antal totala återförsök} – 1)) sekunder. Om backoff_factor är 0,1 kommer återförsöket att viloläge för [0,0s, 0,2s, 0,4s, ...] mellan återförsök. Standardvärdet är 0,8.
- retry_backoff_max
- float
Maximal ledighetstid. Standardvärdet är 120 sekunder (2 minuter).
- retry_mode
- str
Fördröjningsbeteendet mellan återförsök. Värden som stöds är "fasta" eller "exponentiella", där standardvärdet är "exponentiellt".
- idle_timeout
- float
Timeout, i sekunder, varefter den här klienten stänger den underliggande anslutningen om det inte finns någon ytterligare aktivitet. Som standard är värdet Ingen, vilket innebär att klienten inte stängs av på grund av inaktivitet om det inte initieras av tjänsten.
- transport_type
- TransportType
Den typ av transportprotokoll som ska användas för kommunikation med Event Hubs-tjänsten. Standard är TransportType.Amqp i vilket fall port 5671 används. Om port 5671 inte är tillgänglig/blockerad i nätverksmiljön kan TransportType.AmqpOverWebsocket användas i stället som använder port 443 för kommunikation.
HTTP-proxyinställningar. Detta måste vara en ordlista med följande nycklar: "proxy_hostname" (str-värde) och "proxy_port" (int-värde). Dessutom kan följande nycklar också finnas: "användarnamn", "lösenord".
- checkpoint_store
- CheckpointStore eller None
En hanterare som lagrar partitionens belastningsutjämnings- och kontrollpunktsdata när händelser tas emot. Kontrollpunktsarkivet används i båda fallen för att ta emot från alla partitioner eller en enda partition. I det senare fallet gäller inte belastningsutjämning. Om det inte finns något kontrollpunktslager bevaras kontrollpunkten internt i minnet och EventHubConsumerClient-instansen tar emot händelser utan belastningsutjämning.
- load_balancing_interval
- float
När belastningsutjämningen startar. Det här är intervallet i sekunder mellan två belastningsutjämningsutvärderingar. Standardvärdet är 30 sekunder.
- partition_ownership_expiration_interval
- float
Ett partitionsägarskap upphör att gälla efter det här antalet sekunder. Varje belastningsutjämningsutvärdering förlänger automatiskt ägarskapets förfallotid. Standardvärdet är 6 * load_balancing_interval, dvs. 180 sekunder när standardvärdet load_balancing_interval på 30 sekunder används.
- load_balancing_strategy
- str eller LoadBalancingStrategy
När belastningsutjämningen startar använder den den här strategin för att göra anspråk på och balansera partitionsägarskapet. Använd "girig" eller LoadBalancingStrategy.GREEDY för den giriga strategin, som för varje belastningsutjämningsutvärdering tar så många oanvända partitioner som krävs för att balansera belastningen. Använd "balanserad" eller LoadBalancingStrategy.BALANCED för den balanserade strategin, som för varje belastningsutjämningsutvärdering endast gör anspråk på en partition som inte begärs av andra EventHubConsumerClient. Om alla partitioner i en EventHub begärs av andra EventHubConsumerClient och klienten har gjort anspråk på för få partitioner stjäl den här klienten en partition från andra klienter för varje belastningsutjämningsutvärdering oavsett belastningsutjämningsstrategin. Girig strategi används som standard.
Den anpassade slutpunktsadressen som ska användas för att upprätta en anslutning till Event Hubs-tjänsten, så att nätverksbegäranden kan dirigeras via alla programgatewayer eller andra sökvägar som behövs för värdmiljön. Standardvärdet är Ingen. Formatet skulle vara som "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Om porten inte anges i custom_endpoint_address används port 443 som standard.
Sökväg till den anpassade CA_BUNDLE-filen för SSL-certifikatet som används för att autentisera anslutningsslutpunktens identitet. Standard är Ingen i vilket fall certifi.where() kommer att användas.
- uamqp_transport
- bool
Om du vill använda uamqp-biblioteket som den underliggande transporten. Standardvärdet är False och Pure Python AMQP-biblioteket används som underliggande transport.
- socket_timeout
- float
Tiden i sekunder som den underliggande socketen på anslutningen ska vänta när du skickar och tar emot data innan tidsgränsen nås. Standardvärdet är 0.2 för TransportType.Amqp och 1 för TransportType.AmqpOverWebsocket. Om EventHubsConnectionError-fel inträffar på grund av skrivtidsutgång kan ett större värde än standardvärdet behöva skickas in. Detta gäller för avancerade användningsscenarier och normalt bör standardvärdet vara tillräckligt.
Exempel
Skapa en ny instans av EventHubConsumerClient.
import os
from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
consumer = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group='$Default',
credential=credential)
Metoder
close |
Sluta hämta händelser från händelsehubben och stäng den underliggande AMQP-anslutningen och länkarna. |
from_connection_string |
Skapa en EventHubConsumerClient från en anslutningssträng. |
get_eventhub_properties |
Hämta egenskaper för händelsehubben. Nycklar i den returnerade ordlistan är:
|
get_partition_ids |
Hämta partitions-ID:t för händelsehubben. |
get_partition_properties |
Hämta egenskaper för den angivna partitionen. Nycklar i egenskapsordlistan är:
|
receive |
Ta emot händelser från partitioner, med valfri belastningsutjämning och kontrollpunkter. |
receive_batch |
Ta emot händelser från partitioner, med valfri belastningsutjämning och kontrollpunkter. |
close
Sluta hämta händelser från händelsehubben och stäng den underliggande AMQP-anslutningen och länkarna.
close() -> None
Returtyp
Exempel
Stäng klienten.
import os
import threading
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group="$Default",
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The 'receive' method is a blocking call, it can be executed in a thread for
# non-blocking behavior, and combined with the 'close' method.
worker = threading.Thread(
target=consumer.receive,
kwargs={
"on_event": on_event,
"starting_position": "-1", # "-1" is from the beginning of the partition.
}
)
worker.start()
time.sleep(10) # Keep receiving for 10s then close.
# Close down the consumer handler explicitly.
consumer.close()
from_connection_string
Skapa en EventHubConsumerClient från en anslutningssträng.
from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient
Parametrar
- eventhub_name
- str
Sökvägen till den specifika händelsehubb som klienten ska anslutas till.
- logging_enable
- bool
Om nätverksspårningsloggar ska matas ut till loggarna. Standardvärdet är Falskt.
- auth_timeout
- float
Tiden i sekunder att vänta på att en token ska auktoriseras av tjänsten. Standardvärdet är 60 sekunder. Om värdet är 0 tillämpas ingen tidsgräns från klienten.
- user_agent
- str
Om detta anges läggs detta till framför användaragentsträngen.
- retry_total
- int
Det totala antalet försök att göra om en misslyckad åtgärd när ett fel inträffar. Standardvärdet är 3. Kontexten för retry_total i mottagandet är speciell : Mottagningsmetoden implementeras med en "while-loop calling internal receive"-metod i varje iteration. I mottagarfallet anger retry_total antalet återförsök efter fel som genererats av den interna mottagningsmetoden i while-loopen. Om återförsöken är slut anropas on_error återanrop (om så anges) med felinformationen. Den misslyckade interna partitionskonsumenten stängs (on_partition_close anropas om det tillhandahålls) och en ny intern partitionskonsument skapas (on_partition_initialize anropas om det tillhandahålls) för att återuppta mottagandet.
- retry_backoff_factor
- float
En backoff-faktor som ska tillämpas mellan försök efter det andra försöket (de flesta fel löses omedelbart med ett andra försök utan fördröjning). I fast läge kommer återförsöksprincipen alltid att vara i viloläge för {backoff factor}. I exponentiellt läge försätts återförsöksprincipen i viloläge för: {backoff factor} * (2 ** ({antal totala återförsök} – 1)) sekunder. Om backoff_factor är 0,1 kommer återförsöket att viloläge för [0,0s, 0,2s, 0,4s, ...] mellan återförsök. Standardvärdet är 0,8.
- retry_backoff_max
- float
Maximal ledighetstid. Standardvärdet är 120 sekunder (2 minuter).
- retry_mode
- str
Fördröjningsbeteendet mellan återförsök. Värden som stöds är "fasta" eller "exponentiella", där standardvärdet är "exponentiellt".
- idle_timeout
- float
Timeout, i sekunder, varefter den här klienten stänger den underliggande anslutningen om det inte finns någon furthur-aktivitet. Som standard är värdet Ingen, vilket innebär att klienten inte stängs av på grund av inaktivitet om det inte initieras av tjänsten.
- transport_type
- TransportType
Den typ av transportprotokoll som ska användas för kommunikation med Event Hubs-tjänsten. Standard är TransportType.Amqp i vilket fall port 5671 används. Om port 5671 inte är tillgänglig/blockerad i nätverksmiljön kan TransportType.AmqpOverWebsocket användas i stället som använder port 443 för kommunikation.
- http_proxy
- dict
HTTP-proxyinställningar. Detta måste vara en ordlista med följande nycklar: "proxy_hostname" (str-värde) och "proxy_port" (int-värde). Dessutom kan följande nycklar också finnas: "användarnamn", "lösenord".
- checkpoint_store
- CheckpointStore eller None
En hanterare som lagrar partitionens belastningsutjämnings- och kontrollpunktsdata när händelser tas emot. Kontrollpunktsarkivet används i båda fallen för att ta emot från alla partitioner eller en enda partition. I det senare fallet gäller inte belastningsutjämning. Om det inte finns något kontrollpunktslager bevaras kontrollpunkten internt i minnet och EventHubConsumerClient-instansen tar emot händelser utan belastningsutjämning.
- load_balancing_interval
- float
När belastningsutjämningen startar. Det här är intervallet i sekunder mellan två belastningsutjämningsutvärderingar. Standardvärdet är 10 sekunder.
- partition_ownership_expiration_interval
- float
Ett partitionsägarskap upphör att gälla efter det här antalet sekunder. Varje belastningsutjämningsutvärdering förlänger automatiskt ägarskapets förfallotid. Standardvärdet är 6 * load_balancing_interval, dvs. 60 sekunder när standardvärdet load_balancing_interval på 30 sekunder används.
- load_balancing_strategy
- str eller LoadBalancingStrategy
När belastningsutjämningen startar använder den den här strategin för att göra anspråk på och balansera partitionsägarskapet. Använd "girig" eller LoadBalancingStrategy.GREEDY för den giriga strategin, som för varje belastningsutjämningsutvärdering tar så många oanvända partitioner som krävs för att balansera belastningen. Använd "balanserad" eller LoadBalancingStrategy.BALANCED för den balanserade strategin, som för varje belastningsutjämningsutvärdering endast gör anspråk på en partition som inte begärs av andra EventHubConsumerClient. Om alla partitioner i en EventHub begärs av andra EventHubConsumerClient och klienten har gjort anspråk på för få partitioner stjäl den här klienten en partition från andra klienter för varje belastningsutjämningsutvärdering oavsett belastningsutjämningsstrategin. Girig strategi används som standard.
Den anpassade slutpunktsadressen som ska användas för att upprätta en anslutning till Event Hubs-tjänsten, så att nätverksbegäranden kan dirigeras via alla programgatewayer eller andra sökvägar som behövs för värdmiljön. Standardvärdet är Ingen. Formatet skulle vara som "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Om porten inte anges i custom_endpoint_address används port 443 som standard.
Sökväg till den anpassade CA_BUNDLE-filen för SSL-certifikatet som används för att autentisera anslutningsslutpunktens identitet. Standard är Ingen i vilket fall certifi.where() kommer att användas.
- uamqp_transport
- bool
Om du vill använda uamqp-biblioteket som den underliggande transporten. Standardvärdet är False och Pure Python AMQP-biblioteket används som underliggande transport.
Returtyp
Exempel
Skapa en ny instans av EventHubConsumerClient från anslutningssträng.
import os
from azure.eventhub import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Hämta egenskaper för händelsehubben.
Nycklar i den returnerade ordlistan är:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
get_eventhub_properties() -> Dict[str, Any]
Returer
En ordlista som innehåller information om händelsehubben.
Returtyp
Undantag
get_partition_ids
Hämta partitions-ID:t för händelsehubben.
get_partition_ids() -> List[str]
Returer
En lista över partitions-ID:t.
Returtyp
Undantag
get_partition_properties
Hämta egenskaper för den angivna partitionen.
Nycklar i egenskapsordlistan är:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
get_partition_properties(partition_id: str) -> Dict[str, Any]
Parametrar
Returer
En ordlista som innehåller partitionsegenskaper.
Returtyp
Undantag
receive
Ta emot händelser från partitioner, med valfri belastningsutjämning och kontrollpunkter.
receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None
Parametrar
- on_event
- callable[PartitionContext, EventData eller None]
Återanropsfunktionen för hantering av en mottagen händelse. Återanropet tar två parametrar: partition_context som innehåller partitionskontext och händelse som är den mottagna händelsen. Återanropsfunktionen ska definieras som: on_event(partition_context, händelse). Detaljerad information om partitionskontext finns i PartitionContext.
- max_wait_time
- float
Det maximala intervallet i sekunder som händelseprocessorn väntar innan återanropet anropas. Om inga händelser tas emot inom det här intervallet anropas on_event motringning med Ingen. Om det här värdet är inställt på Ingen eller 0 (standard) anropas inte återanropet förrän en händelse tas emot.
- partition_id
- str
Om det anges får klienten endast från den här partitionen. Annars tar klienten emot från alla partitioner.
- owner_level
- int
Prioritet för en exklusiv konsument. En exklusiv konsument skapas om owner_level anges. En konsument med högre owner_level har högre exklusiv prioritet. Ägarnivån är också känt som konsumentens epokvärde.
- prefetch
- int
Antalet händelser som ska prefetch från tjänsten för bearbetning. Standardvärdet är 300.
- track_last_enqueued_event_properties
- bool
Anger om konsumenten ska begära information om den senast efterfrågade händelsen på den associerade partitionen och spåra informationen när händelser tas emot. När information om den senaste lagringshändelsen för partitioner spåras kommer varje händelse som tas emot från Event Hubs-tjänsten att innehålla metadata om partitionen. Detta resulterar i en liten mängd ytterligare nätverksbandbreddsförbrukning som i allmänhet är en god kompromiss när den övervägs mot att regelbundet göra begäranden om partitionsegenskaper med event hub-klienten. Den är inställd på Falskt som standard.
Börja ta emot från den här händelsepositionen om det inte finns några kontrollpunktsdata för en partition. Kontrollpunktsdata används om de är tillgängliga. Detta kan vara en dikta med partitions-ID som nyckel och position som värde för enskilda partitioner eller ett enda värde för alla partitioner. Värdetypen kan vara str, int eller datetime.datetime. Dessutom stöds värdena "-1" för att ta emot från början av dataströmmen och "@latest" för att endast ta emot nya händelser. Standardvärdet är "@latest".
Avgör om den angivna starting_position är inclusive(>=) eller inte (>). Sant för inkluderande och Falskt för exklusivt. Detta kan vara en dikta med partitions-ID som nyckel och bool som värde som anger om starting_position för en viss partition är inkluderande eller inte. Detta kan också vara ett enda bool-värde för alla starting_position. Standardvärdet är Falskt.
- on_error
- callable[[PartitionContext, Exception]]
Återanropsfunktionen som anropas när ett fel utlöses under mottagandet efter återförsök är uttömd eller under belastningsutjämningsprocessen. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och felet är undantaget. partition_context kan vara Ingen om felet uppstår under belastningsutjämningsprocessen. Återanropet ska definieras som: on_error(partition_context, fel). Det on_error återanropet anropas också om ett ohanterat undantag utlöses under on_event återanrop.
- on_partition_initialize
- callable[[PartitionContext]]
Återanropsfunktionen som anropas efter att en konsument för en viss partition har slutfört initieringen. Det anropas också när en ny intern partitionskonsument skapas för att ta över mottagandeprocessen för en misslyckad och stängd intern partitionskonsument. Återanropet tar en enda parameter: partition_context som innehåller partitionsinformationen. Återanropet ska definieras som: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
Återanropsfunktionen som anropas efter att en konsument för en viss partition har stängts. Det anropas också när felet uppstår under mottagandet när återförsök har uttömts. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och orsak till stängningen. Återanropet ska definieras så här: on_partition_close(partition_context, orsak). CloseReason Se de olika stängningsorsakerna.
Returtyp
Exempel
Ta emot händelser från EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive(on_event=on_event)
receive_batch
Ta emot händelser från partitioner, med valfri belastningsutjämning och kontrollpunkter.
receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None
Parametrar
- on_event_batch
- callable[PartitionContext, list[EventData]]
Återanropsfunktionen för hantering av en batch med mottagna händelser. Återanropet tar två parametrar: partition_context som innehåller partitionskontext och event_batch, vilket är de mottagna händelserna. Återanropsfunktionen ska definieras så här: on_event_batch(partition_context, event_batch). event_batch kan vara en tom lista om max_wait_time inte är Ingen eller 0 och ingen händelse tas emot efter max_wait_time. Detaljerad information om partitionskontext finns i PartitionContext.
- max_batch_size
- int
Det maximala antalet händelser i en batch som skickas till motringning on_event_batch. Om det faktiska mottagna antalet händelser är större än max_batch_size delas de mottagna händelserna in i batchar och anropar återanropet för varje batch med upp till max_batch_size händelser.
- max_wait_time
- float
Det maximala intervallet i sekunder som händelseprocessorn väntar innan återanropet anropas. Om inga händelser tas emot inom det här intervallet anropas on_event_batch motringning med en tom lista.
- partition_id
- str
Om detta anges tar klienten endast emot från den här partitionen. Annars tar klienten emot från alla partitioner.
- owner_level
- int
Prioritet för en exklusiv konsument. En exklusiv konsument skapas om owner_level anges. En konsument med högre owner_level har högre exklusiv prioritet. Ägarnivån kallas även för konsumentens epokvärde.
- prefetch
- int
Antalet händelser som ska prefetch från tjänsten för bearbetning. Standardvärdet är 300.
- track_last_enqueued_event_properties
- bool
Anger om konsumenten ska begära information om den senast köade händelsen på den associerade partitionen och spåra informationen när händelser tas emot. När information om den senaste köade partitionshändelsen spåras kommer varje händelse som tas emot från Event Hubs-tjänsten att innehålla metadata om partitionen. Detta resulterar i en liten mängd ytterligare förbrukning av nätverksbandbredd som i allmänhet är en gynnsam kompromiss när den övervägs mot att regelbundet göra begäranden om partitionsegenskaper med hjälp av Event Hub-klienten. Den är inställd på Falskt som standard.
Börja ta emot från den här händelsepositionen om det inte finns några kontrollpunktsdata för en partition. Kontrollpunktsdata kommer att användas om de är tillgängliga. Detta kan vara en diktamen med partitions-ID som nyckel och position som värde för enskilda partitioner eller ett enda värde för alla partitioner. Värdetypen kan vara str, int eller datetime.datetime. Dessutom stöds värdena "-1" för att ta emot från början av dataströmmen och "@latest" för att endast ta emot nya händelser. Standardvärdet är "@latest".
Avgör om den angivna starting_position är inclusive(>=) eller inte (>). Sant för inkluderande och Falskt för exklusivt. Detta kan vara en diktamen med partitions-ID som nyckel och bool som värde som anger om starting_position för en viss partition är inkluderande eller inte. Detta kan också vara ett enda bool-värde för alla starting_position. Standardvärdet är False.
- on_error
- callable[[PartitionContext, Exception]]
Återanropsfunktionen som anropas när ett fel utlöses under mottagandet efter att återförsök har uttömts eller under belastningsutjämningsprocessen. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och felet är undantaget. partition_context kan vara Ingen om felet utlöses under belastningsutjämningsprocessen. Återanropet ska definieras så här: on_error(partition_context, error). Det on_error återanropet anropas också om ett ohanterat undantag utlöses under on_event återanrop.
- on_partition_initialize
- callable[[PartitionContext]]
Återanropsfunktionen som anropas när en konsument för en viss partition har slutfört initieringen. Den anropas också när en ny intern partitionskonsument skapas för att ta över den mottagande processen för en misslyckad och stängd intern partitionskonsument. Återanropet tar en enda parameter: partition_context som innehåller partitionsinformationen. Återanropet ska definieras så här: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
Återanropsfunktionen som anropas efter att en konsument för en viss partition har stängts. Det anropas också när felet utlöses under mottagningen efter att återförsök har uttömts. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och orsaken till stängningen. Återanropet ska definieras så här: on_partition_close(partition_context, orsak). CloseReason Se de olika stängningsorsakerna.
Returtyp
Exempel
Ta emot händelser i batchar från EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received events from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive_batch(on_event_batch=on_event_batch)
Azure SDK for Python
Feedback
https://aka.ms/ContentUserFeedback.
Kommer snart: Under hela 2024 kommer vi att fasa ut GitHub-problem som feedbackmekanism för innehåll och ersätta det med ett nytt feedbacksystem. Mer information finns i:Skicka och visa feedback för