EventHubConsumerClient Klass

Klassen EventHubConsumerClient definierar ett högnivågränssnitt för att ta emot händelser från Azure Event Hubs-tjänsten.

Huvudmålet med EventHubConsumerClient är att ta emot händelser från alla partitioner i en EventHub med belastningsutjämning och kontrollpunkter.

När flera EventHubConsumerClient-instanser körs mot samma händelsehubb, konsumentgrupp och kontrollpunktsplats fördelas partitionerna jämnt mellan dem.

Om du vill aktivera belastningsutjämning och beständiga kontrollpunkter måste checkpoint_store anges när du skapar EventHubConsumerClient. Om det inte finns något kontrollpunktsarkiv behålls kontrollpunkten internt i minnet.

En EventHubConsumerClient kan också ta emot från en specifik partition när du anropar dess metod receive() eller receive_batch() och anger partition_id. Belastningsutjämning fungerar inte i läge med en partition. Men användarna kan fortfarande spara kontrollpunkter om checkpoint_store har angetts.

Arv
azure.eventhub._client_base.ClientBase
EventHubConsumerClient

Konstruktor

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parametrar

fully_qualified_namespace
str
Obligatorisk

Det fullständigt kvalificerade värdnamnet för Event Hubs-namnområdet. Namnområdesformatet är: .servicebus.windows.net.

eventhub_name
str
Obligatorisk

Sökvägen till den specifika händelsehubb som klienten ska anslutas till.

consumer_group
str
Obligatorisk

Ta emot händelser från händelsehubben för den här konsumentgruppen.

credential
TokenCredential eller AzureSasCredential eller AzureNamedKeyCredential
Obligatorisk

Det autentiseringsobjekt som används för autentisering som implementerar ett visst gränssnitt för att hämta token. Den accepterar , eller autentiseringsobjekt som genereras av azure-identity-biblioteket och objekt som implementerar EventHubSharedKeyCredentialmetoden *get_token(self, scopes).

logging_enable
bool

Om nätverksspårningsloggar ska matas ut till loggarna. Standardvärdet är Falskt.

auth_timeout
float

Tiden i sekunder att vänta på att en token ska auktoriseras av tjänsten. Standardvärdet är 60 sekunder. Om värdet är 0 tillämpas ingen tidsgräns från klienten.

user_agent
str

Om detta anges läggs detta till framför användaragentsträngen.

retry_total
int

Det totala antalet försök att göra om en misslyckad åtgärd när ett fel inträffar. Standardvärdet är 3. Kontexten för retry_total i mottagandet är speciell : Mottagningsmetoden implementeras med en "while-loop calling internal receive"-metod i varje iteration. I mottagarfallet anger retry_total antalet återförsök efter fel som genererats av den interna mottagningsmetoden i while-loopen. Om återförsöken är slut anropas on_error återanrop (om så anges) med felinformationen. Den misslyckade interna partitionskonsumenten stängs (on_partition_close anropas om det tillhandahålls) och en ny intern partitionskonsument skapas (on_partition_initialize anropas om det tillhandahålls) för att återuppta mottagandet.

retry_backoff_factor
float

En backoff-faktor som ska tillämpas mellan försök efter det andra försöket (de flesta fel löses omedelbart med ett andra försök utan fördröjning). I fast läge kommer återförsöksprincipen alltid att vara i viloläge för {backoff factor}. I exponentiellt läge försätts återförsöksprincipen i viloläge för: {backoff factor} * (2 ** ({antal totala återförsök} – 1)) sekunder. Om backoff_factor är 0,1 kommer återförsöket att viloläge för [0,0s, 0,2s, 0,4s, ...] mellan återförsök. Standardvärdet är 0,8.

retry_backoff_max
float

Maximal ledighetstid. Standardvärdet är 120 sekunder (2 minuter).

retry_mode
str

Fördröjningsbeteendet mellan återförsök. Värden som stöds är "fasta" eller "exponentiella", där standardvärdet är "exponentiellt".

idle_timeout
float

Timeout, i sekunder, varefter den här klienten stänger den underliggande anslutningen om det inte finns någon ytterligare aktivitet. Som standard är värdet Ingen, vilket innebär att klienten inte stängs av på grund av inaktivitet om det inte initieras av tjänsten.

transport_type
TransportType

Den typ av transportprotokoll som ska användas för kommunikation med Event Hubs-tjänsten. Standard är TransportType.Amqp i vilket fall port 5671 används. Om port 5671 inte är tillgänglig/blockerad i nätverksmiljön kan TransportType.AmqpOverWebsocket användas i stället som använder port 443 för kommunikation.

http_proxy
dict[str, str eller int]

HTTP-proxyinställningar. Detta måste vara en ordlista med följande nycklar: "proxy_hostname" (str-värde) och "proxy_port" (int-värde). Dessutom kan följande nycklar också finnas: "användarnamn", "lösenord".

checkpoint_store
CheckpointStore eller None

En hanterare som lagrar partitionens belastningsutjämnings- och kontrollpunktsdata när händelser tas emot. Kontrollpunktsarkivet används i båda fallen för att ta emot från alla partitioner eller en enda partition. I det senare fallet gäller inte belastningsutjämning. Om det inte finns något kontrollpunktslager bevaras kontrollpunkten internt i minnet och EventHubConsumerClient-instansen tar emot händelser utan belastningsutjämning.

load_balancing_interval
float

När belastningsutjämningen startar. Det här är intervallet i sekunder mellan två belastningsutjämningsutvärderingar. Standardvärdet är 30 sekunder.

partition_ownership_expiration_interval
float

Ett partitionsägarskap upphör att gälla efter det här antalet sekunder. Varje belastningsutjämningsutvärdering förlänger automatiskt ägarskapets förfallotid. Standardvärdet är 6 * load_balancing_interval, dvs. 180 sekunder när standardvärdet load_balancing_interval på 30 sekunder används.

load_balancing_strategy
str eller LoadBalancingStrategy

När belastningsutjämningen startar använder den den här strategin för att göra anspråk på och balansera partitionsägarskapet. Använd "girig" eller LoadBalancingStrategy.GREEDY för den giriga strategin, som för varje belastningsutjämningsutvärdering tar så många oanvända partitioner som krävs för att balansera belastningen. Använd "balanserad" eller LoadBalancingStrategy.BALANCED för den balanserade strategin, som för varje belastningsutjämningsutvärdering endast gör anspråk på en partition som inte begärs av andra EventHubConsumerClient. Om alla partitioner i en EventHub begärs av andra EventHubConsumerClient och klienten har gjort anspråk på för få partitioner stjäl den här klienten en partition från andra klienter för varje belastningsutjämningsutvärdering oavsett belastningsutjämningsstrategin. Girig strategi används som standard.

custom_endpoint_address
str eller None

Den anpassade slutpunktsadressen som ska användas för att upprätta en anslutning till Event Hubs-tjänsten, så att nätverksbegäranden kan dirigeras via alla programgatewayer eller andra sökvägar som behövs för värdmiljön. Standardvärdet är Ingen. Formatet skulle vara som "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Om porten inte anges i custom_endpoint_address används port 443 som standard.

connection_verify
str eller None

Sökväg till den anpassade CA_BUNDLE-filen för SSL-certifikatet som används för att autentisera anslutningsslutpunktens identitet. Standard är Ingen i vilket fall certifi.where() kommer att användas.

uamqp_transport
bool

Om du vill använda uamqp-biblioteket som den underliggande transporten. Standardvärdet är False och Pure Python AMQP-biblioteket används som underliggande transport.

socket_timeout
float

Tiden i sekunder som den underliggande socketen på anslutningen ska vänta när du skickar och tar emot data innan tidsgränsen nås. Standardvärdet är 0.2 för TransportType.Amqp och 1 för TransportType.AmqpOverWebsocket. Om EventHubsConnectionError-fel inträffar på grund av skrivtidsutgång kan ett större värde än standardvärdet behöva skickas in. Detta gäller för avancerade användningsscenarier och normalt bör standardvärdet vara tillräckligt.

Exempel

Skapa en ny instans av EventHubConsumerClient.


   import os
   from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   consumer = EventHubConsumerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,
       consumer_group='$Default',
       credential=credential)

Metoder

close

Sluta hämta händelser från händelsehubben och stäng den underliggande AMQP-anslutningen och länkarna.

from_connection_string

Skapa en EventHubConsumerClient från en anslutningssträng.

get_eventhub_properties

Hämta egenskaper för händelsehubben.

Nycklar i den returnerade ordlistan är:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Hämta partitions-ID:t för händelsehubben.

get_partition_properties

Hämta egenskaper för den angivna partitionen.

Nycklar i egenskapsordlistan är:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Ta emot händelser från partitioner, med valfri belastningsutjämning och kontrollpunkter.

receive_batch

Ta emot händelser från partitioner, med valfri belastningsutjämning och kontrollpunkter.

close

Sluta hämta händelser från händelsehubben och stäng den underliggande AMQP-anslutningen och länkarna.

close() -> None

Returtyp

Exempel

Stäng klienten.


   import os
   import threading

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group="$Default",
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, multi-thread will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The 'receive' method is a blocking call, it can be executed in a thread for
   # non-blocking behavior, and combined with the 'close' method.

   worker = threading.Thread(
       target=consumer.receive,
       kwargs={
           "on_event": on_event,
           "starting_position": "-1",  # "-1" is from the beginning of the partition.
       }
   )
   worker.start()
   time.sleep(10)  # Keep receiving for 10s then close.
   # Close down the consumer handler explicitly.
   consumer.close()

from_connection_string

Skapa en EventHubConsumerClient från en anslutningssträng.

from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient

Parametrar

conn_str
str
Obligatorisk

Anslutningssträng för en händelsehubb.

consumer_group
str
Obligatorisk

Ta emot händelser från händelsehubben för den här konsumentgruppen.

eventhub_name
str

Sökvägen till den specifika händelsehubb som klienten ska anslutas till.

logging_enable
bool

Om nätverksspårningsloggar ska matas ut till loggarna. Standardvärdet är Falskt.

auth_timeout
float

Tiden i sekunder att vänta på att en token ska auktoriseras av tjänsten. Standardvärdet är 60 sekunder. Om värdet är 0 tillämpas ingen tidsgräns från klienten.

user_agent
str

Om detta anges läggs detta till framför användaragentsträngen.

retry_total
int

Det totala antalet försök att göra om en misslyckad åtgärd när ett fel inträffar. Standardvärdet är 3. Kontexten för retry_total i mottagandet är speciell : Mottagningsmetoden implementeras med en "while-loop calling internal receive"-metod i varje iteration. I mottagarfallet anger retry_total antalet återförsök efter fel som genererats av den interna mottagningsmetoden i while-loopen. Om återförsöken är slut anropas on_error återanrop (om så anges) med felinformationen. Den misslyckade interna partitionskonsumenten stängs (on_partition_close anropas om det tillhandahålls) och en ny intern partitionskonsument skapas (on_partition_initialize anropas om det tillhandahålls) för att återuppta mottagandet.

retry_backoff_factor
float

En backoff-faktor som ska tillämpas mellan försök efter det andra försöket (de flesta fel löses omedelbart med ett andra försök utan fördröjning). I fast läge kommer återförsöksprincipen alltid att vara i viloläge för {backoff factor}. I exponentiellt läge försätts återförsöksprincipen i viloläge för: {backoff factor} * (2 ** ({antal totala återförsök} – 1)) sekunder. Om backoff_factor är 0,1 kommer återförsöket att viloläge för [0,0s, 0,2s, 0,4s, ...] mellan återförsök. Standardvärdet är 0,8.

retry_backoff_max
float

Maximal ledighetstid. Standardvärdet är 120 sekunder (2 minuter).

retry_mode
str

Fördröjningsbeteendet mellan återförsök. Värden som stöds är "fasta" eller "exponentiella", där standardvärdet är "exponentiellt".

idle_timeout
float

Timeout, i sekunder, varefter den här klienten stänger den underliggande anslutningen om det inte finns någon furthur-aktivitet. Som standard är värdet Ingen, vilket innebär att klienten inte stängs av på grund av inaktivitet om det inte initieras av tjänsten.

transport_type
TransportType

Den typ av transportprotokoll som ska användas för kommunikation med Event Hubs-tjänsten. Standard är TransportType.Amqp i vilket fall port 5671 används. Om port 5671 inte är tillgänglig/blockerad i nätverksmiljön kan TransportType.AmqpOverWebsocket användas i stället som använder port 443 för kommunikation.

http_proxy
dict

HTTP-proxyinställningar. Detta måste vara en ordlista med följande nycklar: "proxy_hostname" (str-värde) och "proxy_port" (int-värde). Dessutom kan följande nycklar också finnas: "användarnamn", "lösenord".

checkpoint_store
CheckpointStore eller None

En hanterare som lagrar partitionens belastningsutjämnings- och kontrollpunktsdata när händelser tas emot. Kontrollpunktsarkivet används i båda fallen för att ta emot från alla partitioner eller en enda partition. I det senare fallet gäller inte belastningsutjämning. Om det inte finns något kontrollpunktslager bevaras kontrollpunkten internt i minnet och EventHubConsumerClient-instansen tar emot händelser utan belastningsutjämning.

load_balancing_interval
float

När belastningsutjämningen startar. Det här är intervallet i sekunder mellan två belastningsutjämningsutvärderingar. Standardvärdet är 10 sekunder.

partition_ownership_expiration_interval
float

Ett partitionsägarskap upphör att gälla efter det här antalet sekunder. Varje belastningsutjämningsutvärdering förlänger automatiskt ägarskapets förfallotid. Standardvärdet är 6 * load_balancing_interval, dvs. 60 sekunder när standardvärdet load_balancing_interval på 30 sekunder används.

load_balancing_strategy
str eller LoadBalancingStrategy

När belastningsutjämningen startar använder den den här strategin för att göra anspråk på och balansera partitionsägarskapet. Använd "girig" eller LoadBalancingStrategy.GREEDY för den giriga strategin, som för varje belastningsutjämningsutvärdering tar så många oanvända partitioner som krävs för att balansera belastningen. Använd "balanserad" eller LoadBalancingStrategy.BALANCED för den balanserade strategin, som för varje belastningsutjämningsutvärdering endast gör anspråk på en partition som inte begärs av andra EventHubConsumerClient. Om alla partitioner i en EventHub begärs av andra EventHubConsumerClient och klienten har gjort anspråk på för få partitioner stjäl den här klienten en partition från andra klienter för varje belastningsutjämningsutvärdering oavsett belastningsutjämningsstrategin. Girig strategi används som standard.

custom_endpoint_address
str eller None

Den anpassade slutpunktsadressen som ska användas för att upprätta en anslutning till Event Hubs-tjänsten, så att nätverksbegäranden kan dirigeras via alla programgatewayer eller andra sökvägar som behövs för värdmiljön. Standardvärdet är Ingen. Formatet skulle vara som "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Om porten inte anges i custom_endpoint_address används port 443 som standard.

connection_verify
str eller None

Sökväg till den anpassade CA_BUNDLE-filen för SSL-certifikatet som används för att autentisera anslutningsslutpunktens identitet. Standard är Ingen i vilket fall certifi.where() kommer att användas.

uamqp_transport
bool

Om du vill använda uamqp-biblioteket som den underliggande transporten. Standardvärdet är False och Pure Python AMQP-biblioteket används som underliggande transport.

Returtyp

Exempel

Skapa en ny instans av EventHubConsumerClient från anslutningssträng.


   import os
   from azure.eventhub import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Hämta egenskaper för händelsehubben.

Nycklar i den returnerade ordlistan är:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

Returer

En ordlista som innehåller information om händelsehubben.

Returtyp

Undantag

get_partition_ids

Hämta partitions-ID:t för händelsehubben.

get_partition_ids() -> List[str]

Returer

En lista över partitions-ID:t.

Returtyp

Undantag

get_partition_properties

Hämta egenskaper för den angivna partitionen.

Nycklar i egenskapsordlistan är:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

get_partition_properties(partition_id: str) -> Dict[str, Any]

Parametrar

partition_id
str
Obligatorisk

Målpartitions-ID: t.

Returer

En ordlista som innehåller partitionsegenskaper.

Returtyp

Undantag

receive

Ta emot händelser från partitioner, med valfri belastningsutjämning och kontrollpunkter.

receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None

Parametrar

on_event
callable[PartitionContext, EventData eller None]
Obligatorisk

Återanropsfunktionen för hantering av en mottagen händelse. Återanropet tar två parametrar: partition_context som innehåller partitionskontext och händelse som är den mottagna händelsen. Återanropsfunktionen ska definieras som: on_event(partition_context, händelse). Detaljerad information om partitionskontext finns i PartitionContext.

max_wait_time
float

Det maximala intervallet i sekunder som händelseprocessorn väntar innan återanropet anropas. Om inga händelser tas emot inom det här intervallet anropas on_event motringning med Ingen. Om det här värdet är inställt på Ingen eller 0 (standard) anropas inte återanropet förrän en händelse tas emot.

partition_id
str

Om det anges får klienten endast från den här partitionen. Annars tar klienten emot från alla partitioner.

owner_level
int

Prioritet för en exklusiv konsument. En exklusiv konsument skapas om owner_level anges. En konsument med högre owner_level har högre exklusiv prioritet. Ägarnivån är också känt som konsumentens epokvärde.

prefetch
int

Antalet händelser som ska prefetch från tjänsten för bearbetning. Standardvärdet är 300.

track_last_enqueued_event_properties
bool

Anger om konsumenten ska begära information om den senast efterfrågade händelsen på den associerade partitionen och spåra informationen när händelser tas emot. När information om den senaste lagringshändelsen för partitioner spåras kommer varje händelse som tas emot från Event Hubs-tjänsten att innehålla metadata om partitionen. Detta resulterar i en liten mängd ytterligare nätverksbandbreddsförbrukning som i allmänhet är en god kompromiss när den övervägs mot att regelbundet göra begäranden om partitionsegenskaper med event hub-klienten. Den är inställd på Falskt som standard.

starting_position
str, int, datetime eller dict[str,any]

Börja ta emot från den här händelsepositionen om det inte finns några kontrollpunktsdata för en partition. Kontrollpunktsdata används om de är tillgängliga. Detta kan vara en dikta med partitions-ID som nyckel och position som värde för enskilda partitioner eller ett enda värde för alla partitioner. Värdetypen kan vara str, int eller datetime.datetime. Dessutom stöds värdena "-1" för att ta emot från början av dataströmmen och "@latest" för att endast ta emot nya händelser. Standardvärdet är "@latest".

starting_position_inclusive
bool eller dict[str,bool]

Avgör om den angivna starting_position är inclusive(>=) eller inte (>). Sant för inkluderande och Falskt för exklusivt. Detta kan vara en dikta med partitions-ID som nyckel och bool som värde som anger om starting_position för en viss partition är inkluderande eller inte. Detta kan också vara ett enda bool-värde för alla starting_position. Standardvärdet är Falskt.

on_error
callable[[PartitionContext, Exception]]

Återanropsfunktionen som anropas när ett fel utlöses under mottagandet efter återförsök är uttömd eller under belastningsutjämningsprocessen. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och felet är undantaget. partition_context kan vara Ingen om felet uppstår under belastningsutjämningsprocessen. Återanropet ska definieras som: on_error(partition_context, fel). Det on_error återanropet anropas också om ett ohanterat undantag utlöses under on_event återanrop.

on_partition_initialize
callable[[PartitionContext]]

Återanropsfunktionen som anropas efter att en konsument för en viss partition har slutfört initieringen. Det anropas också när en ny intern partitionskonsument skapas för att ta över mottagandeprocessen för en misslyckad och stängd intern partitionskonsument. Återanropet tar en enda parameter: partition_context som innehåller partitionsinformationen. Återanropet ska definieras som: on_partition_initialize(partition_context).

on_partition_close
callable[[PartitionContext, CloseReason]]

Återanropsfunktionen som anropas efter att en konsument för en viss partition har stängts. Det anropas också när felet uppstår under mottagandet när återförsök har uttömts. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och orsak till stängningen. Återanropet ska definieras så här: on_partition_close(partition_context, orsak). CloseReason Se de olika stängningsorsakerna.

Returtyp

Exempel

Ta emot händelser från EventHub.


       logger = logging.getLogger("azure.eventhub")

       def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive(on_event=on_event)

receive_batch

Ta emot händelser från partitioner, med valfri belastningsutjämning och kontrollpunkter.

receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None

Parametrar

on_event_batch
callable[PartitionContext, list[EventData]]
Obligatorisk

Återanropsfunktionen för hantering av en batch med mottagna händelser. Återanropet tar två parametrar: partition_context som innehåller partitionskontext och event_batch, vilket är de mottagna händelserna. Återanropsfunktionen ska definieras så här: on_event_batch(partition_context, event_batch). event_batch kan vara en tom lista om max_wait_time inte är Ingen eller 0 och ingen händelse tas emot efter max_wait_time. Detaljerad information om partitionskontext finns i PartitionContext.

max_batch_size
int

Det maximala antalet händelser i en batch som skickas till motringning on_event_batch. Om det faktiska mottagna antalet händelser är större än max_batch_size delas de mottagna händelserna in i batchar och anropar återanropet för varje batch med upp till max_batch_size händelser.

max_wait_time
float

Det maximala intervallet i sekunder som händelseprocessorn väntar innan återanropet anropas. Om inga händelser tas emot inom det här intervallet anropas on_event_batch motringning med en tom lista.

partition_id
str

Om detta anges tar klienten endast emot från den här partitionen. Annars tar klienten emot från alla partitioner.

owner_level
int

Prioritet för en exklusiv konsument. En exklusiv konsument skapas om owner_level anges. En konsument med högre owner_level har högre exklusiv prioritet. Ägarnivån kallas även för konsumentens epokvärde.

prefetch
int

Antalet händelser som ska prefetch från tjänsten för bearbetning. Standardvärdet är 300.

track_last_enqueued_event_properties
bool

Anger om konsumenten ska begära information om den senast köade händelsen på den associerade partitionen och spåra informationen när händelser tas emot. När information om den senaste köade partitionshändelsen spåras kommer varje händelse som tas emot från Event Hubs-tjänsten att innehålla metadata om partitionen. Detta resulterar i en liten mängd ytterligare förbrukning av nätverksbandbredd som i allmänhet är en gynnsam kompromiss när den övervägs mot att regelbundet göra begäranden om partitionsegenskaper med hjälp av Event Hub-klienten. Den är inställd på Falskt som standard.

starting_position
str, int, datetime eller dict[str,any]

Börja ta emot från den här händelsepositionen om det inte finns några kontrollpunktsdata för en partition. Kontrollpunktsdata kommer att användas om de är tillgängliga. Detta kan vara en diktamen med partitions-ID som nyckel och position som värde för enskilda partitioner eller ett enda värde för alla partitioner. Värdetypen kan vara str, int eller datetime.datetime. Dessutom stöds värdena "-1" för att ta emot från början av dataströmmen och "@latest" för att endast ta emot nya händelser. Standardvärdet är "@latest".

starting_position_inclusive
bool eller dict[str,bool]

Avgör om den angivna starting_position är inclusive(>=) eller inte (>). Sant för inkluderande och Falskt för exklusivt. Detta kan vara en diktamen med partitions-ID som nyckel och bool som värde som anger om starting_position för en viss partition är inkluderande eller inte. Detta kan också vara ett enda bool-värde för alla starting_position. Standardvärdet är False.

on_error
callable[[PartitionContext, Exception]]

Återanropsfunktionen som anropas när ett fel utlöses under mottagandet efter att återförsök har uttömts eller under belastningsutjämningsprocessen. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och felet är undantaget. partition_context kan vara Ingen om felet utlöses under belastningsutjämningsprocessen. Återanropet ska definieras så här: on_error(partition_context, error). Det on_error återanropet anropas också om ett ohanterat undantag utlöses under on_event återanrop.

on_partition_initialize
callable[[PartitionContext]]

Återanropsfunktionen som anropas när en konsument för en viss partition har slutfört initieringen. Den anropas också när en ny intern partitionskonsument skapas för att ta över den mottagande processen för en misslyckad och stängd intern partitionskonsument. Återanropet tar en enda parameter: partition_context som innehåller partitionsinformationen. Återanropet ska definieras så här: on_partition_initialize(partition_context).

on_partition_close
callable[[PartitionContext, CloseReason]]

Återanropsfunktionen som anropas efter att en konsument för en viss partition har stängts. Det anropas också när felet utlöses under mottagningen efter att återförsök har uttömts. Återanropet tar två parametrar: partition_context som innehåller partitionsinformation och orsaken till stängningen. Återanropet ska definieras så här: on_partition_close(partition_context, orsak). CloseReason Se de olika stängningsorsakerna.

Returtyp

Exempel

Ta emot händelser i batchar från EventHub.


       logger = logging.getLogger("azure.eventhub")

       def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received events from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive_batch(on_event_batch=on_event_batch)