EventHubConsumerClient Sınıf

EventHubConsumerClient sınıfı, Azure Event Hubs hizmetinden olay almak için üst düzey bir arabirim tanımlar.

EventHubConsumerClient'ın temel amacı, yük dengeleme ve denetim noktası oluşturma ile bir EventHub'ın tüm bölümlerinden olayları almaktır.

Aynı olay hub'ına, tüketici grubuna ve denetim noktası konumuna karşı birden çok EventHubConsumerClient örneği çalıştığında, bölümler bunlar arasında eşit olarak dağıtılır.

Yük dengelemeyi ve kalıcı denetim noktalarını etkinleştirmek için EventHubConsumerClient oluşturulurken checkpoint_store ayarlanmalıdır. Denetim noktası deposu sağlanmazsa denetim noktası dahili olarak bellekte tutulur.

Bir EventHubConsumerClient, yöntemi receive() veya receive_batch() çağırıp partition_id belirttiğinizde belirli bir bölümden de alabilir. Yük dengeleme tek bölümlü modda çalışmaz. Ancak checkpoint_store ayarlandıysa kullanıcılar denetim noktalarını kaydetmeye devam edebilir.

Devralma
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubConsumerClient

Oluşturucu

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parametreler

fully_qualified_namespace
str
Gerekli

Event Hubs ad alanı için tam konak adı. Ad alanı biçimi: .servicebus.windows.net.

eventhub_name
str
Gerekli

İstemcinin bağlanacak olay hub'ının yolu.

consumer_group
str
Gerekli

Bu tüketici grubu için olay hub'ından olayları alın.

credential
AsyncTokenCredential veya AzureSasCredential veya AzureNamedKeyCredential
Gerekli

Belirteçleri almak için belirli bir arabirim uygulayan kimlik doğrulaması için kullanılan kimlik bilgisi nesnesi. Azure-identity kitaplığı tarafından oluşturulan veya kimlik bilgisi nesnelerini ve *get_token(self, scopes) yöntemini uygulayan nesneleri kabul EventHubSharedKeyCredentialeder.

logging_enable
bool

Günlükçüye ağ izleme günlüklerinin çıkışının yapılıp yapılmaydığı. Varsayılan değer False'tur.

auth_timeout
float

Bir belirtecin hizmet tarafından yetkilendirilmesini beklemek için saniye cinsinden süre. Varsayılan değer 60 saniyedir. 0 olarak ayarlanırsa, istemciden zaman aşımı uygulanmaz.

user_agent
str

Belirtilirse, bu kullanıcı aracısı dizesinin önüne eklenir.

retry_total
int

Hata oluştuğunda başarısız olan bir işlemi yineleme denemelerinin toplam sayısı. Varsayılan değer 3’tür. Almadaki retry_total bağlamı özeldir: Alma yöntemi, her yinelemede iç alma yöntemini çağıran bir while döngüsü tarafından uygulanır. Alma durumunda, retry_total while döngüsünde iç alma yöntemi tarafından tetiklenen hatadan sonra yeniden deneme sayısını belirtir. Yeniden deneme girişimleri tükenirse, hata bilgileriyle on_error geri çağırma çağrılır (sağlanmışsa). Başarısız iç bölüm tüketicisi kapatılır (on_partition_close sağlanırsa çağrılır) ve almaya devam etmek için yeni iç bölüm tüketicisi oluşturulur (on_partition_initialize sağlanırsa çağrılır).

retry_backoff_factor
float

İkinci denemeden sonraki denemeler arasında uygulanacak geri alma faktörü (çoğu hata gecikme olmadan ikinci bir denemeyle hemen çözülür). Sabit modda, yeniden deneme ilkesi {backoff factor} için her zaman uyku moduna geçer. 'Üstel' modda, yeniden deneme ilkesi şu süre boyunca uyku moduna geçer: {backoff factor} * (2 ** ({toplam yeniden deneme sayısı} - 1)) saniye. backoff_factor 0,1 ise yeniden denemeler arasında [0,0s, 0,2s, 0,4s, ...] için yeniden deneme uyku moduna girer. Varsayılan değer 0,8'dir.

retry_backoff_max
float

İzin süresi üst sınırı. Varsayılan değer 120 saniyedir (2 dakika).

retry_mode
str

Yeniden deneme girişimleri arasındaki gecikme davranışı. Desteklenen değerler 'sabit' veya 'üstel'dir; burada varsayılan değer 'üstel'dir.

idle_timeout
float

Saniyeler içinde zaman aşımı, bundan sonra başka etkinlik yoksa bu istemci temel bağlantıyı kapatır. Varsayılan olarak değer Yok'tır, yani hizmet tarafından başlatılmadığı sürece istemcinin etkinlik dışı kalma nedeniyle kapanmayacağı anlamına gelir.

transport_type
TransportType

Event Hubs hizmetiyle iletişim kurmak için kullanılacak aktarım protokolünün türü. Varsayılan değer TransportType.Amqp'tir ve bu durumda 5671 numaralı bağlantı noktası kullanılır. Ağ ortamında 5671 numaralı bağlantı noktası kullanılamıyorsa/engelleniyorsa, iletişim için 443 numaralı bağlantı noktasını kullanan TransportType.AmqpOverWebsocket kullanılabilir.

http_proxy

HTTP proxy ayarları. Bu, şu anahtarlara sahip bir sözlük olmalıdır: 'proxy_hostname' (str değeri) ve 'proxy_port' (int değeri).

checkpoint_store
Optional[CheckpointStore]

Olayları alırken bölüm yük dengeleme ve denetim noktası verilerini depolayan bir yönetici. Denetim noktası deposu her iki durumda da tüm bölümlerden veya tek bir bölümden alma durumunda kullanılır. İkinci durumda yük dengeleme uygulanmaz. Denetim noktası deposu sağlanmazsa denetim noktası dahili olarak bellekte tutulur ve EventHubConsumerClient örneği yük dengelemesi olmadan olayları alır.

load_balancing_interval
float

Yük dengeleme devreye giriyorsa. Bu, iki yük dengeleme değerlendirmesi arasındaki saniye cinsinden aralıktır. Varsayılan değer 30 saniyedir.

partition_ownership_expiration_interval
float

Bölüm sahipliğinin süresi bu kadar saniye sonra dolar. Her yük dengeleme değerlendirmesi, sahipliğin sona erme süresini otomatik olarak uzatır. Varsayılan değer 6 * load_balancing_interval, yani 30 saniyelik varsayılan load_balancing_interval kullanıldığında 180 saniyedir.

load_balancing_strategy
str veya LoadBalancingStrategy

Yük dengeleme devreye giriyorsa, bölüm sahipliğini talep etmek ve dengelemek için bu stratejiyi kullanır. Doyumsuz strateji için "greedy" veya LoadBalancingStrategy.GREEDY kullanın. Bu strateji, her yük dengeleme değerlendirmesi için yükü dengelemek için gereken sahipsiz bölümlerin sayısını alır. Her yük dengeleme değerlendirmesi için diğer EventHubConsumerClient tarafından talep edilmeyen yalnızca bir bölümü talep eden dengeli strateji için "balanced" veya LoadBalancingStrategy.BALANCED kullanın. Bir EventHub'ın tüm bölümleri diğer EventHubConsumerClient tarafından talep edilirse ve bu istemci çok az bölüm talep ettiyse, bu istemci yük dengeleme stratejisinden bağımsız olarak her yük dengeleme değerlendirmesi için diğer istemcilerden bir bölümü çalar. Doyumsuz strateji varsayılan olarak kullanılır.

custom_endpoint_address
Optional[str]

Ağ isteklerinin herhangi bir uygulama ağ geçidi veya konak ortamı için gereken diğer yollar üzerinden yönlendirilmesine olanak tanıyarak Event Hubs hizmetine bağlantı kurmak için kullanılacak özel uç nokta adresi. Varsayılan değer Yok'tır. Biçim "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" gibi olabilir. bağlantı noktası custom_endpoint_address belirtilmezse, varsayılan olarak 443 numaralı bağlantı noktası kullanılır.

connection_verify
Optional[str]

Bağlantı uç noktasının kimliğini doğrulamak için kullanılan SSL sertifikasının özel CA_BUNDLE dosyasının yolu. Varsayılan değer Hiçbiri'dir ve bu durumda certifi.where() kullanılır.

uamqp_transport
bool

Temel alınan aktarım olarak uamqp kitaplığının kullanılıp kullanılmaymayacağı. Varsayılan değer False'tur ve temel alınan aktarım olarak Pure Python AMQP kitaplığı kullanılır.

socket_timeout
float

Zaman aşımına uğramadan önce bağlantıdaki temel yuvanın veri gönderirken ve alırken beklemesi gereken saniye cinsinden süre. Varsayılan değer TransportType.Amqp için 0.2 ve TransportType.AmqpOverWebsocket için 1'dir. Yazma zaman aşımı nedeniyle EventHubsConnectionError hataları oluşuyorsa, varsayılan değerden daha büyük bir değerin geçirilmesi gerekebilir. Bu, gelişmiş kullanım senaryoları içindir ve normalde varsayılan değerin yeterli olması gerekir.

Örnekler

EventHubConsumerClient'ın yeni bir örneğini oluşturun.


   import os
   from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     consumer_group='$Default',
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Yöntemler

close

Olay Hub'ından olayları almayı durdurun ve temel alınan AMQP bağlantısını ve bağlantılarını kapatın.

from_connection_string

bir bağlantı dizesi EventHubConsumerClient oluşturun.

get_eventhub_properties

Olay Hub'ının özelliklerini alma.

Döndürülen sözlükteki anahtarlar şunlardır:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Olay Hub'ının bölüm kimliklerini alın.

get_partition_properties

Belirtilen bölümün özelliklerini alın.

Özellikler sözlüğündeki anahtarlar şunlardır:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

İsteğe bağlı yük dengeleme ve denetim noktası oluşturma ile bölümlerden olayları alın.

receive_batch

İsteğe bağlı yük dengeleme ve denetim noktası oluşturma ile bölümlerin olaylarını toplu olarak alın.

close

Olay Hub'ından olayları almayı durdurun ve temel alınan AMQP bağlantısını ve bağlantılarını kapatın.

async close() -> None

Dönüş türü

Örnekler

İstemciyi kapatın.


   import os

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub.aio import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, async will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The receive method is a coroutine which will be blocking when awaited.
   # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.

   recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
   await asyncio.sleep(3)  # keep receiving for 3 seconds
   recv_task.cancel()  # stop receiving

   # Close down the consumer handler explicitly.
   await consumer.close()

from_connection_string

bir bağlantı dizesi EventHubConsumerClient oluşturun.

from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient

Parametreler

conn_str
str
Gerekli

Olay Hub'ının bağlantı dizesi.

consumer_group
str
Gerekli

Bu tüketici grubu için Olay Hub'ından olayları alın.

eventhub_name
str

İstemcinin bağlanacak olay hub'ının yolu.

logging_enable
bool

Günlükçüye ağ izleme günlüklerinin çıkışının yapılıp yapılmaydığı. Varsayılan değer False'tur.

http_proxy
dict

HTTP proxy ayarları. Bu, şu anahtarlara sahip bir sözlük olmalıdır: 'proxy_hostname' (str değeri) ve 'proxy_port' (int değeri). Ayrıca aşağıdaki anahtarlar da mevcut olabilir: 'kullanıcı adı', 'parola'.

auth_timeout
float

Bir belirtecin hizmet tarafından yetkilendirilmesini beklemek için saniye cinsinden süre. Varsayılan değer 60 saniyedir. 0 olarak ayarlanırsa, istemciden zaman aşımı uygulanmaz.

user_agent
str

Belirtilirse, bu kullanıcı aracısı dizesinin önüne eklenir.

retry_total
int

Hata oluştuğunda başarısız olan bir işlemi yineleme denemelerinin toplam sayısı. Varsayılan değer 3’tür. Almadaki retry_total bağlamı özeldir: Alma yöntemi, her yinelemede iç alma yöntemini çağıran bir while döngüsü tarafından uygulanır. Alma durumunda, retry_total while döngüsünde iç alma yöntemi tarafından tetiklenen hatadan sonra yeniden deneme sayısını belirtir. Yeniden deneme girişimleri tükenirse, hata bilgileriyle on_error geri çağırma çağrılır (sağlanmışsa). Başarısız iç bölüm tüketicisi kapatılır (on_partition_close sağlanırsa çağrılır) ve almaya devam etmek için yeni iç bölüm tüketicisi oluşturulur (on_partition_initialize sağlanırsa çağrılır).

retry_backoff_factor
float

İkinci denemeden sonraki denemeler arasında uygulanacak geri alma faktörü (çoğu hata gecikme olmadan ikinci bir denemeyle hemen çözülür). Sabit modda, yeniden deneme ilkesi {backoff factor} için her zaman uyku moduna geçer. 'Üstel' modda, yeniden deneme ilkesi şu süre boyunca uyku moduna geçer: {backoff factor} * (2 ** ({toplam yeniden deneme sayısı} - 1)) saniye. backoff_factor 0,1 ise yeniden denemeler arasında [0,0s, 0,2s, 0,4s, ...] için yeniden deneme uyku moduna girer. Varsayılan değer 0,8'dir.

retry_backoff_max
float

İzin süresi üst sınırı. Varsayılan değer 120 saniyedir (2 dakika).

retry_mode
str

Yeniden deneme girişimleri arasındaki gecikme davranışı. Desteklenen değerler 'sabit' veya 'üstel'dir; burada varsayılan değer 'üstel'dir.

idle_timeout
float

Saniyeler içinde zaman aşımı, bundan sonra başka etkinlik yoksa bu istemci temel bağlantıyı kapatır. Varsayılan olarak değer Yok'tır, yani hizmet tarafından başlatılmadığı sürece istemcinin etkinlik dışı kalma nedeniyle kapanmayacağı anlamına gelir.

transport_type
TransportType

Event Hubs hizmetiyle iletişim kurmak için kullanılacak aktarım protokolünün türü. Varsayılan değer TransportType.Amqp'tir ve bu durumda 5671 numaralı bağlantı noktası kullanılır. Ağ ortamında 5671 numaralı bağlantı noktası kullanılamıyorsa/engelleniyorsa, iletişim için 443 numaralı bağlantı noktasını kullanan TransportType.AmqpOverWebsocket kullanılabilir.

checkpoint_store
Optional[CheckpointStore]

Olayları alırken bölüm yük dengeleme ve denetim noktası verilerini depolayan bir yönetici. Denetim noktası deposu her iki durumda da tüm bölümlerden veya tek bir bölümden alma durumunda kullanılır. İkinci durumda yük dengeleme uygulanmaz. Denetim noktası deposu sağlanmazsa denetim noktası dahili olarak bellekte tutulur ve EventHubConsumerClient örneği yük dengelemesi olmadan olayları alır.

load_balancing_interval
float

Yük dengeleme devreye giriyorsa. Bu, iki yük dengeleme değerlendirmesi arasındaki saniye cinsinden aralıktır. Varsayılan değer 30 saniyedir.

partition_ownership_expiration_interval
float

Bölüm sahipliğinin süresi bu kadar saniye sonra dolar. Her yük dengeleme değerlendirmesi, sahipliğin sona erme süresini otomatik olarak uzatır. Varsayılan değer 6 * load_balancing_interval, yani 30 saniyelik varsayılan load_balancing_interval kullanıldığında 180 saniyedir.

load_balancing_strategy
str veya LoadBalancingStrategy

Yük dengeleme devreye giriyorsa, bölüm sahipliğini talep etmek ve dengelemek için bu stratejiyi kullanır. Doyumsuz strateji için "greedy" veya LoadBalancingStrategy.GREEDY kullanın. Bu strateji, her yük dengeleme değerlendirmesi için yükü dengelemek için gereken sahipsiz bölümlerin sayısını alır. Her yük dengeleme değerlendirmesi için diğer EventHubConsumerClient tarafından talep edilmeyen yalnızca bir bölümü talep eden dengeli strateji için "balanced" veya LoadBalancingStrategy.BALANCED kullanın. Bir EventHub'ın tüm bölümleri diğer EventHubConsumerClient tarafından talep edilirse ve bu istemci çok az bölüm talep ettiyse, bu istemci yük dengeleme stratejisinden bağımsız olarak her yük dengeleme değerlendirmesi için diğer istemcilerden bir bölümü çalar. Doyumsuz strateji varsayılan olarak kullanılır.

custom_endpoint_address
Optional[str]

Ağ isteklerinin herhangi bir uygulama ağ geçidi veya konak ortamı için gereken diğer yollar üzerinden yönlendirilmesine olanak tanıyarak Event Hubs hizmetine bağlantı kurmak için kullanılacak özel uç nokta adresi. Varsayılan değer Yok'tır. Biçim "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" gibi olabilir. bağlantı noktası custom_endpoint_address belirtilmezse, varsayılan olarak 443 numaralı bağlantı noktası kullanılır.

connection_verify
Optional[str]

Bağlantı uç noktasının kimliğini doğrulamak için kullanılan SSL sertifikasının özel CA_BUNDLE dosyasının yolu. Varsayılan değer Hiçbiri'dir ve bu durumda certifi.where() kullanılır.

uamqp_transport
bool

Temel alınan aktarım olarak uamqp kitaplığının kullanılıp kullanılmaymayacağı. Varsayılan değer False'tur ve temel alınan aktarım olarak Pure Python AMQP kitaplığı kullanılır.

Dönüş türü

Örnekler

bağlantı dizesi'dan EventHubConsumerClient'ın yeni bir örneğini oluşturun.


   import os
   from azure.eventhub.aio import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Olay Hub'ının özelliklerini alma.

Döndürülen sözlükteki anahtarlar şunlardır:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Döndürülenler

Olay Hub'ı hakkında bilgi içeren bir sözlük.

Dönüş türü

Özel durumlar

get_partition_ids

Olay Hub'ının bölüm kimliklerini alın.

async get_partition_ids() -> List[str]

Döndürülenler

Bölüm kimliklerinin listesi.

Dönüş türü

Özel durumlar

get_partition_properties

Belirtilen bölümün özelliklerini alın.

Özellikler sözlüğündeki anahtarlar şunlardır:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parametreler

partition_id
str
Gerekli

Hedef bölüm kimliği.

Döndürülenler

Bölüm özelliklerini içeren bir sözlük.

Dönüş türü

Özel durumlar

receive

İsteğe bağlı yük dengeleme ve denetim noktası oluşturma ile bölümlerden olayları alın.

async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parametreler

on_event
Callable[PartitionContext, Optional[EventData]]
Gerekli

Alınan olayı işlemek için geri çağırma işlevi. Geri çağırma iki parametre alır: bölüm bağlamını ve alınan olay olan olayı içeren partition_context. Geri çağırma işlevi şöyle tanımlanmalıdır: on_event(partition_context, olay). Ayrıntılı bölüm bağlamı bilgileri için PartitionContextbkz. .

max_wait_time
float

Olay işlemcisinin geri çağırmayı çağırmadan önce bekleyeceği saniye cinsinden maksimum aralık. Bu aralık içinde hiçbir olay alınmazsa, geri çağırma on_eventHiçbiri ile çağrılır. Bu değer Yok veya 0 (varsayılan) olarak ayarlanırsa, bir olay alınana kadar geri çağırma çağrılmayacak.

partition_id
str

Belirtilirse, istemci yalnızca bu bölümden alır. Aksi takdirde istemci tüm bölümlerden alır.

owner_level
int

Özel bir tüketici için öncelik. owner_level ayarlanırsa özel bir tüketici oluşturulur. Daha yüksek owner_level sahip bir tüketicinin özel önceliği daha yüksektir. Sahip düzeyi, tüketicinin 'dönem değeri' olarak da bilinmektedir.

prefetch
int

İşlenmek üzere hizmetten ön eklenmek üzere olayların sayısı. Varsayılan değer 300'dür.

track_last_enqueued_event_properties
bool

Tüketicinin ilişkili bölümünde son sıraya alınan olayla ilgili bilgi isteyip istememesi ve olaylar alınırken bu bilgileri izlemesi gerekip gerekmediğini gösterir. Bölümler hakkında son sıraya alınan olay hakkındaki bilgiler izlendiğinde, Event Hubs hizmetinden alınan her olay bölümle ilgili meta verileri taşır. Bu, Event Hub istemcisini kullanarak bölüm özellikleri için düzenli aralıklarla istekte bulunmaya karşı göz önünde bulundurulduğunda genellikle olumlu bir dengeleme olan az miktarda ek ağ bant genişliği tüketimine neden olur. Varsayılan olarak False olarak ayarlanır.

starting_position
str, int, datetime veya dict[str,any]

Bir bölüm için denetim noktası verisi yoksa bu olay konumundan almaya başlayın. Varsa denetim noktası verileri kullanılır. Bu, anahtar olarak bölüm kimliğine ve tek tek bölümlerin değeri olarak konuma sahip bir dikte veya tüm bölümler için tek bir değer olabilir. Değer türü str, int veya datetime.datetime olabilir. Ayrıca, akışın başından almaya yönelik "-1" ve yalnızca yeni olayları almak için "@latest" değerleri de desteklenir.

starting_position_inclusive
bool veya dict[str,bool]

Verilen starting_position kapsayıcı (>=) olup olmadığını (>) belirleyin. Kapsayıcı için True ve özel kullanım için False. Bu, anahtar olarak bölüm kimliğine ve belirli bir bölüm için starting_position kapsayıcı olup olmadığını belirten değer olarak bool değerine sahip bir dikte olabilir. Bu, tüm starting_position için tek bir bool değeri de olabilir. Varsayılan değer False'tur.

on_error
Callable[[PartitionContext, Exception]]

Yeniden deneme girişimleri tükendikten sonra veya yük dengeleme işlemi sırasında alma sırasında bir hata oluştuğunda çağrılan geri çağırma işlevi. Geri çağırma iki parametre alır: bölüm bilgilerini içeren partition_context ve özel durum olan hata . yük dengeleme işlemi sırasında hata oluşursa partition_context Hiçbiri olabilir. Geri çağırma şu şekilde tanımlanmalıdır: on_error(partition_context, hata). geri çağırma on_event sırasında işlenmeyen bir özel durum oluşursa, on_error geri çağırması da çağrılır.

on_partition_initialize
Callable[[PartitionContext]]

Belirli bir bölüm için tüketicinin başlatılmasını tamamladıktan sonra çağrılan geri çağırma işlevi. Başarısız ve kapalı bir iç bölüm tüketicisi için alma işlemini devralmak üzere yeni bir iç bölüm tüketicisi oluşturulduğunda da çağrılır. Geri çağırma tek bir parametre alır: bölüm bilgilerini içeren partition_context . Geri çağırma şu şekilde tanımlanmalıdır: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

Belirli bir bölüm için tüketici kapatıldıktan sonra çağrılacak geri çağırma işlevi. Yeniden deneme girişimleri tükendikten sonra alma sırasında hata oluştuğunda da çağrılabilir. Geri çağırma iki parametre alır: bölüm bilgilerini ve kapatma nedenini içeren partition_context. Geri çağırma şu şekilde tanımlanmalıdır: on_partition_close(partition_context, neden). Lütfen çeşitli kapanış nedenleri için adresine başvurun CloseReason .

Dönüş türü

Örnekler

EventHub'dan olayları alma.


       logger = logging.getLogger("azure.eventhub")

       async def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       async with consumer:
           await consumer.receive(
               on_event=on_event,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )

receive_batch

İsteğe bağlı yük dengeleme ve denetim noktası oluşturma ile bölümlerin olaylarını toplu olarak alın.

async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parametreler

on_event_batch
Callable[PartitionContext, List[EventData]]
Gerekli

Alınan olayların toplu işlemini işlemek için geri çağırma işlevi. Geri çağırma iki parametre alır: bölüm bağlamını içeren partition_context ve alınan olaylar olan event_batch. Geri çağırma işlevi şöyle tanımlanmalıdır: on_event_batch(partition_context, event_batch). event_batch max_wait_time Hiçbiri veya 0 değilse ve max_wait_time sonra hiçbir olay alınmadıysa boş bir liste olabilir. Ayrıntılı bölüm bağlamı bilgileri için PartitionContextbkz. .

max_batch_size
int

Geri çağırma on_event_batch bir toplu işlemde geçirilen en fazla olay sayısı. Alınan gerçek olay sayısı max_batch_size'den büyükse, alınan olaylar toplu işlere bölünür ve en fazla max_batch_size olay içeren her toplu iş için geri çağırmayı çağırır.

max_wait_time
float

Olay işlemcisinin geri çağırmayı çağırmadan önce bekleyeceği saniye cinsinden maksimum aralık. Bu aralık içinde hiçbir olay alınmazsa, on_event_batch geri çağırma boş bir listeyle çağrılır. Bu değer Yok veya 0 (varsayılan) olarak ayarlanırsa, olaylar alınana kadar geri çağırma çağrılmayacak.

partition_id
str

Belirtilirse, istemci yalnızca bu bölümden alır. Aksi takdirde istemci tüm bölümlerden alır.

owner_level
int

Özel bir tüketici için öncelik. owner_level ayarlanırsa özel bir tüketici oluşturulur. Daha yüksek owner_level sahip bir tüketicinin özel önceliği daha yüksektir. Sahip düzeyi, tüketicinin 'dönem değeri' olarak da bilinmektedir.

prefetch
int

İşlenmek üzere hizmetten ön eklenmek üzere olayların sayısı. Varsayılan değer 300'dür.

track_last_enqueued_event_properties
bool

Tüketicinin ilişkili bölümünde son sıraya alınan olayla ilgili bilgi isteyip istememesi ve olaylar alınırken bu bilgileri izlemesi gerekip gerekmediğini gösterir. Bölümler hakkında son sıraya alınan olay hakkındaki bilgiler izlendiğinde, Event Hubs hizmetinden alınan her olay bölümle ilgili meta verileri taşır. Bu, Event Hub istemcisini kullanarak bölüm özellikleri için düzenli aralıklarla istekte bulunmaya karşı göz önünde bulundurulduğunda genellikle olumlu bir dengeleme olan az miktarda ek ağ bant genişliği tüketimine neden olur. Varsayılan olarak False olarak ayarlanır.

starting_position
str, int, datetime veya dict[str,any]

Bir bölüm için denetim noktası verisi yoksa bu olay konumundan almaya başlayın. Varsa denetim noktası verileri kullanılır. Bu, anahtar olarak bölüm kimliğine ve tek tek bölümlerin değeri olarak konuma sahip bir dikte veya tüm bölümler için tek bir değer olabilir. Değer türü str, int veya datetime.datetime olabilir. Ayrıca, akışın başından almaya yönelik "-1" ve yalnızca yeni olayları almak için "@latest" değerleri de desteklenir.

starting_position_inclusive
bool veya dict[str,bool]

Verilen starting_position kapsayıcı (>=) olup olmadığını (>) belirleyin. Kapsayıcı için True ve özel kullanım için False. Bu, anahtar olarak bölüm kimliğine ve belirli bir bölüm için starting_position kapsayıcı olup olmadığını belirten değer olarak bool değerine sahip bir dikte olabilir. Bu, tüm starting_position için tek bir bool değeri de olabilir. Varsayılan değer False'tur.

on_error
Callable[[PartitionContext, Exception]]

Yeniden deneme girişimleri tükendikten sonra veya yük dengeleme işlemi sırasında alma sırasında bir hata oluştuğunda çağrılan geri çağırma işlevi. Geri çağırma iki parametre alır: bölüm bilgilerini içeren partition_context ve özel durum olan hata . yük dengeleme işlemi sırasında hata oluşursa partition_context Hiçbiri olabilir. Geri çağırma şu şekilde tanımlanmalıdır: on_error(partition_context, hata). geri çağırma on_event sırasında işlenmeyen bir özel durum oluşursa, on_error geri çağırması da çağrılır.

on_partition_initialize
Callable[[PartitionContext]]

Belirli bir bölüm için tüketicinin başlatılmasını tamamladıktan sonra çağrılan geri çağırma işlevi. Başarısız ve kapalı bir iç bölüm tüketicisi için alma işlemini devralmak üzere yeni bir iç bölüm tüketicisi oluşturulduğunda da çağrılır. Geri çağırma tek bir parametre alır: bölüm bilgilerini içeren partition_context . Geri çağırma şu şekilde tanımlanmalıdır: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

Belirli bir bölüm için tüketici kapatıldıktan sonra çağrılacak geri çağırma işlevi. Yeniden deneme girişimleri tükendikten sonra alma sırasında hata oluştuğunda da çağrılabilir. Geri çağırma iki parametre alır: bölüm bilgilerini ve kapatma nedenini içeren partition_context. Geri çağırma şu şekilde tanımlanmalıdır: on_partition_close(partition_context, neden). Lütfen çeşitli kapanış nedenleri için adresine başvurun CloseReason .

Dönüş türü

Örnekler

Olayları EventHub'dan toplu olarak alın.


       logger = logging.getLogger("azure.eventhub")

       async def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info(
               "{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
           )

       async with consumer:
           await consumer.receive_batch(
               on_event_batch=on_event_batch,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )