EventHubConsumerClient Sınıf
EventHubConsumerClient sınıfı, Azure Event Hubs hizmetinden olay almak için üst düzey bir arabirim tanımlar.
EventHubConsumerClient'ın temel amacı, yük dengeleme ve denetim noktası oluşturma ile bir EventHub'ın tüm bölümlerinden olayları almaktır.
Aynı olay hub'ına, tüketici grubuna ve denetim noktası konumuna karşı birden çok EventHubConsumerClient örneği çalıştığında, bölümler bunlar arasında eşit olarak dağıtılır.
Yük dengelemeyi ve kalıcı denetim noktalarını etkinleştirmek için EventHubConsumerClient oluşturulurken checkpoint_store ayarlanmalıdır. Denetim noktası deposu sağlanmazsa denetim noktası dahili olarak bellekte tutulur.
Bir EventHubConsumerClient, yöntemi receive() veya receive_batch() çağırıp partition_id belirttiğinizde belirli bir bölümden de alabilir. Yük dengeleme tek bölümlü modda çalışmaz. Ancak checkpoint_store ayarlandıysa kullanıcılar denetim noktalarını kaydetmeye devam edebilir.
- Devralma
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubConsumerClient
Oluşturucu
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Parametreler
- fully_qualified_namespace
- str
Event Hubs ad alanı için tam konak adı. Ad alanı biçimi: .servicebus.windows.net.
- credential
- AsyncTokenCredential veya AzureSasCredential veya AzureNamedKeyCredential
Belirteçleri almak için belirli bir arabirim uygulayan kimlik doğrulaması için kullanılan kimlik bilgisi nesnesi. Azure-identity kitaplığı tarafından oluşturulan veya kimlik bilgisi nesnelerini ve *get_token(self, scopes) yöntemini uygulayan nesneleri kabul EventHubSharedKeyCredentialeder.
- logging_enable
- bool
Günlükçüye ağ izleme günlüklerinin çıkışının yapılıp yapılmaydığı. Varsayılan değer False'tur.
- auth_timeout
- float
Bir belirtecin hizmet tarafından yetkilendirilmesini beklemek için saniye cinsinden süre. Varsayılan değer 60 saniyedir. 0 olarak ayarlanırsa, istemciden zaman aşımı uygulanmaz.
- user_agent
- str
Belirtilirse, bu kullanıcı aracısı dizesinin önüne eklenir.
- retry_total
- int
Hata oluştuğunda başarısız olan bir işlemi yineleme denemelerinin toplam sayısı. Varsayılan değer 3’tür. Almadaki retry_total bağlamı özeldir: Alma yöntemi, her yinelemede iç alma yöntemini çağıran bir while döngüsü tarafından uygulanır. Alma durumunda, retry_total while döngüsünde iç alma yöntemi tarafından tetiklenen hatadan sonra yeniden deneme sayısını belirtir. Yeniden deneme girişimleri tükenirse, hata bilgileriyle on_error geri çağırma çağrılır (sağlanmışsa). Başarısız iç bölüm tüketicisi kapatılır (on_partition_close sağlanırsa çağrılır) ve almaya devam etmek için yeni iç bölüm tüketicisi oluşturulur (on_partition_initialize sağlanırsa çağrılır).
- retry_backoff_factor
- float
İkinci denemeden sonraki denemeler arasında uygulanacak geri alma faktörü (çoğu hata gecikme olmadan ikinci bir denemeyle hemen çözülür). Sabit modda, yeniden deneme ilkesi {backoff factor} için her zaman uyku moduna geçer. 'Üstel' modda, yeniden deneme ilkesi şu süre boyunca uyku moduna geçer: {backoff factor} * (2 ** ({toplam yeniden deneme sayısı} - 1)) saniye. backoff_factor 0,1 ise yeniden denemeler arasında [0,0s, 0,2s, 0,4s, ...] için yeniden deneme uyku moduna girer. Varsayılan değer 0,8'dir.
- retry_backoff_max
- float
İzin süresi üst sınırı. Varsayılan değer 120 saniyedir (2 dakika).
- retry_mode
- str
Yeniden deneme girişimleri arasındaki gecikme davranışı. Desteklenen değerler 'sabit' veya 'üstel'dir; burada varsayılan değer 'üstel'dir.
- idle_timeout
- float
Saniyeler içinde zaman aşımı, bundan sonra başka etkinlik yoksa bu istemci temel bağlantıyı kapatır. Varsayılan olarak değer Yok'tır, yani hizmet tarafından başlatılmadığı sürece istemcinin etkinlik dışı kalma nedeniyle kapanmayacağı anlamına gelir.
- transport_type
- TransportType
Event Hubs hizmetiyle iletişim kurmak için kullanılacak aktarım protokolünün türü. Varsayılan değer TransportType.Amqp'tir ve bu durumda 5671 numaralı bağlantı noktası kullanılır. Ağ ortamında 5671 numaralı bağlantı noktası kullanılamıyorsa/engelleniyorsa, iletişim için 443 numaralı bağlantı noktasını kullanan TransportType.AmqpOverWebsocket kullanılabilir.
- http_proxy
HTTP proxy ayarları. Bu, şu anahtarlara sahip bir sözlük olmalıdır: 'proxy_hostname' (str değeri) ve 'proxy_port' (int değeri).
- checkpoint_store
- Optional[CheckpointStore]
Olayları alırken bölüm yük dengeleme ve denetim noktası verilerini depolayan bir yönetici. Denetim noktası deposu her iki durumda da tüm bölümlerden veya tek bir bölümden alma durumunda kullanılır. İkinci durumda yük dengeleme uygulanmaz. Denetim noktası deposu sağlanmazsa denetim noktası dahili olarak bellekte tutulur ve EventHubConsumerClient örneği yük dengelemesi olmadan olayları alır.
- load_balancing_interval
- float
Yük dengeleme devreye giriyorsa. Bu, iki yük dengeleme değerlendirmesi arasındaki saniye cinsinden aralıktır. Varsayılan değer 30 saniyedir.
- partition_ownership_expiration_interval
- float
Bölüm sahipliğinin süresi bu kadar saniye sonra dolar. Her yük dengeleme değerlendirmesi, sahipliğin sona erme süresini otomatik olarak uzatır. Varsayılan değer 6 * load_balancing_interval, yani 30 saniyelik varsayılan load_balancing_interval kullanıldığında 180 saniyedir.
- load_balancing_strategy
- str veya LoadBalancingStrategy
Yük dengeleme devreye giriyorsa, bölüm sahipliğini talep etmek ve dengelemek için bu stratejiyi kullanır. Doyumsuz strateji için "greedy" veya LoadBalancingStrategy.GREEDY kullanın. Bu strateji, her yük dengeleme değerlendirmesi için yükü dengelemek için gereken sahipsiz bölümlerin sayısını alır. Her yük dengeleme değerlendirmesi için diğer EventHubConsumerClient tarafından talep edilmeyen yalnızca bir bölümü talep eden dengeli strateji için "balanced" veya LoadBalancingStrategy.BALANCED kullanın. Bir EventHub'ın tüm bölümleri diğer EventHubConsumerClient tarafından talep edilirse ve bu istemci çok az bölüm talep ettiyse, bu istemci yük dengeleme stratejisinden bağımsız olarak her yük dengeleme değerlendirmesi için diğer istemcilerden bir bölümü çalar. Doyumsuz strateji varsayılan olarak kullanılır.
Ağ isteklerinin herhangi bir uygulama ağ geçidi veya konak ortamı için gereken diğer yollar üzerinden yönlendirilmesine olanak tanıyarak Event Hubs hizmetine bağlantı kurmak için kullanılacak özel uç nokta adresi. Varsayılan değer Yok'tır. Biçim "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" gibi olabilir. bağlantı noktası custom_endpoint_address belirtilmezse, varsayılan olarak 443 numaralı bağlantı noktası kullanılır.
Bağlantı uç noktasının kimliğini doğrulamak için kullanılan SSL sertifikasının özel CA_BUNDLE dosyasının yolu. Varsayılan değer Hiçbiri'dir ve bu durumda certifi.where() kullanılır.
- uamqp_transport
- bool
Temel alınan aktarım olarak uamqp kitaplığının kullanılıp kullanılmaymayacağı. Varsayılan değer False'tur ve temel alınan aktarım olarak Pure Python AMQP kitaplığı kullanılır.
- socket_timeout
- float
Zaman aşımına uğramadan önce bağlantıdaki temel yuvanın veri gönderirken ve alırken beklemesi gereken saniye cinsinden süre. Varsayılan değer TransportType.Amqp için 0.2 ve TransportType.AmqpOverWebsocket için 1'dir. Yazma zaman aşımı nedeniyle EventHubsConnectionError hataları oluşuyorsa, varsayılan değerden daha büyük bir değerin geçirilmesi gerekebilir. Bu, gelişmiş kullanım senaryoları içindir ve normalde varsayılan değerin yeterli olması gerekir.
Örnekler
EventHubConsumerClient'ın yeni bir örneğini oluşturun.
import os
from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
consumer_group='$Default',
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
Yöntemler
close |
Olay Hub'ından olayları almayı durdurun ve temel alınan AMQP bağlantısını ve bağlantılarını kapatın. |
from_connection_string |
bir bağlantı dizesi EventHubConsumerClient oluşturun. |
get_eventhub_properties |
Olay Hub'ının özelliklerini alma. Döndürülen sözlükteki anahtarlar şunlardır:
|
get_partition_ids |
Olay Hub'ının bölüm kimliklerini alın. |
get_partition_properties |
Belirtilen bölümün özelliklerini alın. Özellikler sözlüğündeki anahtarlar şunlardır:
|
receive |
İsteğe bağlı yük dengeleme ve denetim noktası oluşturma ile bölümlerden olayları alın. |
receive_batch |
İsteğe bağlı yük dengeleme ve denetim noktası oluşturma ile bölümlerin olaylarını toplu olarak alın. |
close
Olay Hub'ından olayları almayı durdurun ve temel alınan AMQP bağlantısını ve bağlantılarını kapatın.
async close() -> None
Dönüş türü
Örnekler
İstemciyi kapatın.
import os
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3) # keep receiving for 3 seconds
recv_task.cancel() # stop receiving
# Close down the consumer handler explicitly.
await consumer.close()
from_connection_string
bir bağlantı dizesi EventHubConsumerClient oluşturun.
from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient
Parametreler
- eventhub_name
- str
İstemcinin bağlanacak olay hub'ının yolu.
- logging_enable
- bool
Günlükçüye ağ izleme günlüklerinin çıkışının yapılıp yapılmaydığı. Varsayılan değer False'tur.
- http_proxy
- dict
HTTP proxy ayarları. Bu, şu anahtarlara sahip bir sözlük olmalıdır: 'proxy_hostname' (str değeri) ve 'proxy_port' (int değeri). Ayrıca aşağıdaki anahtarlar da mevcut olabilir: 'kullanıcı adı', 'parola'.
- auth_timeout
- float
Bir belirtecin hizmet tarafından yetkilendirilmesini beklemek için saniye cinsinden süre. Varsayılan değer 60 saniyedir. 0 olarak ayarlanırsa, istemciden zaman aşımı uygulanmaz.
- user_agent
- str
Belirtilirse, bu kullanıcı aracısı dizesinin önüne eklenir.
- retry_total
- int
Hata oluştuğunda başarısız olan bir işlemi yineleme denemelerinin toplam sayısı. Varsayılan değer 3’tür. Almadaki retry_total bağlamı özeldir: Alma yöntemi, her yinelemede iç alma yöntemini çağıran bir while döngüsü tarafından uygulanır. Alma durumunda, retry_total while döngüsünde iç alma yöntemi tarafından tetiklenen hatadan sonra yeniden deneme sayısını belirtir. Yeniden deneme girişimleri tükenirse, hata bilgileriyle on_error geri çağırma çağrılır (sağlanmışsa). Başarısız iç bölüm tüketicisi kapatılır (on_partition_close sağlanırsa çağrılır) ve almaya devam etmek için yeni iç bölüm tüketicisi oluşturulur (on_partition_initialize sağlanırsa çağrılır).
- retry_backoff_factor
- float
İkinci denemeden sonraki denemeler arasında uygulanacak geri alma faktörü (çoğu hata gecikme olmadan ikinci bir denemeyle hemen çözülür). Sabit modda, yeniden deneme ilkesi {backoff factor} için her zaman uyku moduna geçer. 'Üstel' modda, yeniden deneme ilkesi şu süre boyunca uyku moduna geçer: {backoff factor} * (2 ** ({toplam yeniden deneme sayısı} - 1)) saniye. backoff_factor 0,1 ise yeniden denemeler arasında [0,0s, 0,2s, 0,4s, ...] için yeniden deneme uyku moduna girer. Varsayılan değer 0,8'dir.
- retry_backoff_max
- float
İzin süresi üst sınırı. Varsayılan değer 120 saniyedir (2 dakika).
- retry_mode
- str
Yeniden deneme girişimleri arasındaki gecikme davranışı. Desteklenen değerler 'sabit' veya 'üstel'dir; burada varsayılan değer 'üstel'dir.
- idle_timeout
- float
Saniyeler içinde zaman aşımı, bundan sonra başka etkinlik yoksa bu istemci temel bağlantıyı kapatır. Varsayılan olarak değer Yok'tır, yani hizmet tarafından başlatılmadığı sürece istemcinin etkinlik dışı kalma nedeniyle kapanmayacağı anlamına gelir.
- transport_type
- TransportType
Event Hubs hizmetiyle iletişim kurmak için kullanılacak aktarım protokolünün türü. Varsayılan değer TransportType.Amqp'tir ve bu durumda 5671 numaralı bağlantı noktası kullanılır. Ağ ortamında 5671 numaralı bağlantı noktası kullanılamıyorsa/engelleniyorsa, iletişim için 443 numaralı bağlantı noktasını kullanan TransportType.AmqpOverWebsocket kullanılabilir.
- checkpoint_store
- Optional[CheckpointStore]
Olayları alırken bölüm yük dengeleme ve denetim noktası verilerini depolayan bir yönetici. Denetim noktası deposu her iki durumda da tüm bölümlerden veya tek bir bölümden alma durumunda kullanılır. İkinci durumda yük dengeleme uygulanmaz. Denetim noktası deposu sağlanmazsa denetim noktası dahili olarak bellekte tutulur ve EventHubConsumerClient örneği yük dengelemesi olmadan olayları alır.
- load_balancing_interval
- float
Yük dengeleme devreye giriyorsa. Bu, iki yük dengeleme değerlendirmesi arasındaki saniye cinsinden aralıktır. Varsayılan değer 30 saniyedir.
- partition_ownership_expiration_interval
- float
Bölüm sahipliğinin süresi bu kadar saniye sonra dolar. Her yük dengeleme değerlendirmesi, sahipliğin sona erme süresini otomatik olarak uzatır. Varsayılan değer 6 * load_balancing_interval, yani 30 saniyelik varsayılan load_balancing_interval kullanıldığında 180 saniyedir.
- load_balancing_strategy
- str veya LoadBalancingStrategy
Yük dengeleme devreye giriyorsa, bölüm sahipliğini talep etmek ve dengelemek için bu stratejiyi kullanır. Doyumsuz strateji için "greedy" veya LoadBalancingStrategy.GREEDY kullanın. Bu strateji, her yük dengeleme değerlendirmesi için yükü dengelemek için gereken sahipsiz bölümlerin sayısını alır. Her yük dengeleme değerlendirmesi için diğer EventHubConsumerClient tarafından talep edilmeyen yalnızca bir bölümü talep eden dengeli strateji için "balanced" veya LoadBalancingStrategy.BALANCED kullanın. Bir EventHub'ın tüm bölümleri diğer EventHubConsumerClient tarafından talep edilirse ve bu istemci çok az bölüm talep ettiyse, bu istemci yük dengeleme stratejisinden bağımsız olarak her yük dengeleme değerlendirmesi için diğer istemcilerden bir bölümü çalar. Doyumsuz strateji varsayılan olarak kullanılır.
Ağ isteklerinin herhangi bir uygulama ağ geçidi veya konak ortamı için gereken diğer yollar üzerinden yönlendirilmesine olanak tanıyarak Event Hubs hizmetine bağlantı kurmak için kullanılacak özel uç nokta adresi. Varsayılan değer Yok'tır. Biçim "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" gibi olabilir. bağlantı noktası custom_endpoint_address belirtilmezse, varsayılan olarak 443 numaralı bağlantı noktası kullanılır.
Bağlantı uç noktasının kimliğini doğrulamak için kullanılan SSL sertifikasının özel CA_BUNDLE dosyasının yolu. Varsayılan değer Hiçbiri'dir ve bu durumda certifi.where() kullanılır.
- uamqp_transport
- bool
Temel alınan aktarım olarak uamqp kitaplığının kullanılıp kullanılmaymayacağı. Varsayılan değer False'tur ve temel alınan aktarım olarak Pure Python AMQP kitaplığı kullanılır.
Dönüş türü
Örnekler
bağlantı dizesi'dan EventHubConsumerClient'ın yeni bir örneğini oluşturun.
import os
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Olay Hub'ının özelliklerini alma.
Döndürülen sözlükteki anahtarlar şunlardır:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
Döndürülenler
Olay Hub'ı hakkında bilgi içeren bir sözlük.
Dönüş türü
Özel durumlar
get_partition_ids
Olay Hub'ının bölüm kimliklerini alın.
async get_partition_ids() -> List[str]
Döndürülenler
Bölüm kimliklerinin listesi.
Dönüş türü
Özel durumlar
get_partition_properties
Belirtilen bölümün özelliklerini alın.
Özellikler sözlüğündeki anahtarlar şunlardır:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
Parametreler
Döndürülenler
Bölüm özelliklerini içeren bir sözlük.
Dönüş türü
Özel durumlar
receive
İsteğe bağlı yük dengeleme ve denetim noktası oluşturma ile bölümlerden olayları alın.
async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Parametreler
- on_event
- Callable[PartitionContext, Optional[EventData]]
Alınan olayı işlemek için geri çağırma işlevi. Geri çağırma iki parametre alır: bölüm bağlamını ve alınan olay olan olayı içeren partition_context. Geri çağırma işlevi şöyle tanımlanmalıdır: on_event(partition_context, olay). Ayrıntılı bölüm bağlamı bilgileri için PartitionContextbkz. .
- max_wait_time
- float
Olay işlemcisinin geri çağırmayı çağırmadan önce bekleyeceği saniye cinsinden maksimum aralık. Bu aralık içinde hiçbir olay alınmazsa, geri çağırma on_eventHiçbiri ile çağrılır. Bu değer Yok veya 0 (varsayılan) olarak ayarlanırsa, bir olay alınana kadar geri çağırma çağrılmayacak.
- partition_id
- str
Belirtilirse, istemci yalnızca bu bölümden alır. Aksi takdirde istemci tüm bölümlerden alır.
- owner_level
- int
Özel bir tüketici için öncelik. owner_level ayarlanırsa özel bir tüketici oluşturulur. Daha yüksek owner_level sahip bir tüketicinin özel önceliği daha yüksektir. Sahip düzeyi, tüketicinin 'dönem değeri' olarak da bilinmektedir.
- prefetch
- int
İşlenmek üzere hizmetten ön eklenmek üzere olayların sayısı. Varsayılan değer 300'dür.
- track_last_enqueued_event_properties
- bool
Tüketicinin ilişkili bölümünde son sıraya alınan olayla ilgili bilgi isteyip istememesi ve olaylar alınırken bu bilgileri izlemesi gerekip gerekmediğini gösterir. Bölümler hakkında son sıraya alınan olay hakkındaki bilgiler izlendiğinde, Event Hubs hizmetinden alınan her olay bölümle ilgili meta verileri taşır. Bu, Event Hub istemcisini kullanarak bölüm özellikleri için düzenli aralıklarla istekte bulunmaya karşı göz önünde bulundurulduğunda genellikle olumlu bir dengeleme olan az miktarda ek ağ bant genişliği tüketimine neden olur. Varsayılan olarak False olarak ayarlanır.
Bir bölüm için denetim noktası verisi yoksa bu olay konumundan almaya başlayın. Varsa denetim noktası verileri kullanılır. Bu, anahtar olarak bölüm kimliğine ve tek tek bölümlerin değeri olarak konuma sahip bir dikte veya tüm bölümler için tek bir değer olabilir. Değer türü str, int veya datetime.datetime olabilir. Ayrıca, akışın başından almaya yönelik "-1" ve yalnızca yeni olayları almak için "@latest" değerleri de desteklenir.
Verilen starting_position kapsayıcı (>=) olup olmadığını (>) belirleyin. Kapsayıcı için True ve özel kullanım için False. Bu, anahtar olarak bölüm kimliğine ve belirli bir bölüm için starting_position kapsayıcı olup olmadığını belirten değer olarak bool değerine sahip bir dikte olabilir. Bu, tüm starting_position için tek bir bool değeri de olabilir. Varsayılan değer False'tur.
- on_error
- Callable[[PartitionContext, Exception]]
Yeniden deneme girişimleri tükendikten sonra veya yük dengeleme işlemi sırasında alma sırasında bir hata oluştuğunda çağrılan geri çağırma işlevi. Geri çağırma iki parametre alır: bölüm bilgilerini içeren partition_context ve özel durum olan hata . yük dengeleme işlemi sırasında hata oluşursa partition_context Hiçbiri olabilir. Geri çağırma şu şekilde tanımlanmalıdır: on_error(partition_context, hata). geri çağırma on_event sırasında işlenmeyen bir özel durum oluşursa, on_error geri çağırması da çağrılır.
- on_partition_initialize
- Callable[[PartitionContext]]
Belirli bir bölüm için tüketicinin başlatılmasını tamamladıktan sonra çağrılan geri çağırma işlevi. Başarısız ve kapalı bir iç bölüm tüketicisi için alma işlemini devralmak üzere yeni bir iç bölüm tüketicisi oluşturulduğunda da çağrılır. Geri çağırma tek bir parametre alır: bölüm bilgilerini içeren partition_context . Geri çağırma şu şekilde tanımlanmalıdır: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
Belirli bir bölüm için tüketici kapatıldıktan sonra çağrılacak geri çağırma işlevi. Yeniden deneme girişimleri tükendikten sonra alma sırasında hata oluştuğunda da çağrılabilir. Geri çağırma iki parametre alır: bölüm bilgilerini ve kapatma nedenini içeren partition_context. Geri çağırma şu şekilde tanımlanmalıdır: on_partition_close(partition_context, neden). Lütfen çeşitli kapanış nedenleri için adresine başvurun CloseReason .
Dönüş türü
Örnekler
EventHub'dan olayları alma.
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
async with consumer:
await consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
receive_batch
İsteğe bağlı yük dengeleme ve denetim noktası oluşturma ile bölümlerin olaylarını toplu olarak alın.
async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Parametreler
- on_event_batch
- Callable[PartitionContext, List[EventData]]
Alınan olayların toplu işlemini işlemek için geri çağırma işlevi. Geri çağırma iki parametre alır: bölüm bağlamını içeren partition_context ve alınan olaylar olan event_batch. Geri çağırma işlevi şöyle tanımlanmalıdır: on_event_batch(partition_context, event_batch). event_batch max_wait_time Hiçbiri veya 0 değilse ve max_wait_time sonra hiçbir olay alınmadıysa boş bir liste olabilir. Ayrıntılı bölüm bağlamı bilgileri için PartitionContextbkz. .
- max_batch_size
- int
Geri çağırma on_event_batch bir toplu işlemde geçirilen en fazla olay sayısı. Alınan gerçek olay sayısı max_batch_size'den büyükse, alınan olaylar toplu işlere bölünür ve en fazla max_batch_size olay içeren her toplu iş için geri çağırmayı çağırır.
- max_wait_time
- float
Olay işlemcisinin geri çağırmayı çağırmadan önce bekleyeceği saniye cinsinden maksimum aralık. Bu aralık içinde hiçbir olay alınmazsa, on_event_batch geri çağırma boş bir listeyle çağrılır. Bu değer Yok veya 0 (varsayılan) olarak ayarlanırsa, olaylar alınana kadar geri çağırma çağrılmayacak.
- partition_id
- str
Belirtilirse, istemci yalnızca bu bölümden alır. Aksi takdirde istemci tüm bölümlerden alır.
- owner_level
- int
Özel bir tüketici için öncelik. owner_level ayarlanırsa özel bir tüketici oluşturulur. Daha yüksek owner_level sahip bir tüketicinin özel önceliği daha yüksektir. Sahip düzeyi, tüketicinin 'dönem değeri' olarak da bilinmektedir.
- prefetch
- int
İşlenmek üzere hizmetten ön eklenmek üzere olayların sayısı. Varsayılan değer 300'dür.
- track_last_enqueued_event_properties
- bool
Tüketicinin ilişkili bölümünde son sıraya alınan olayla ilgili bilgi isteyip istememesi ve olaylar alınırken bu bilgileri izlemesi gerekip gerekmediğini gösterir. Bölümler hakkında son sıraya alınan olay hakkındaki bilgiler izlendiğinde, Event Hubs hizmetinden alınan her olay bölümle ilgili meta verileri taşır. Bu, Event Hub istemcisini kullanarak bölüm özellikleri için düzenli aralıklarla istekte bulunmaya karşı göz önünde bulundurulduğunda genellikle olumlu bir dengeleme olan az miktarda ek ağ bant genişliği tüketimine neden olur. Varsayılan olarak False olarak ayarlanır.
Bir bölüm için denetim noktası verisi yoksa bu olay konumundan almaya başlayın. Varsa denetim noktası verileri kullanılır. Bu, anahtar olarak bölüm kimliğine ve tek tek bölümlerin değeri olarak konuma sahip bir dikte veya tüm bölümler için tek bir değer olabilir. Değer türü str, int veya datetime.datetime olabilir. Ayrıca, akışın başından almaya yönelik "-1" ve yalnızca yeni olayları almak için "@latest" değerleri de desteklenir.
Verilen starting_position kapsayıcı (>=) olup olmadığını (>) belirleyin. Kapsayıcı için True ve özel kullanım için False. Bu, anahtar olarak bölüm kimliğine ve belirli bir bölüm için starting_position kapsayıcı olup olmadığını belirten değer olarak bool değerine sahip bir dikte olabilir. Bu, tüm starting_position için tek bir bool değeri de olabilir. Varsayılan değer False'tur.
- on_error
- Callable[[PartitionContext, Exception]]
Yeniden deneme girişimleri tükendikten sonra veya yük dengeleme işlemi sırasında alma sırasında bir hata oluştuğunda çağrılan geri çağırma işlevi. Geri çağırma iki parametre alır: bölüm bilgilerini içeren partition_context ve özel durum olan hata . yük dengeleme işlemi sırasında hata oluşursa partition_context Hiçbiri olabilir. Geri çağırma şu şekilde tanımlanmalıdır: on_error(partition_context, hata). geri çağırma on_event sırasında işlenmeyen bir özel durum oluşursa, on_error geri çağırması da çağrılır.
- on_partition_initialize
- Callable[[PartitionContext]]
Belirli bir bölüm için tüketicinin başlatılmasını tamamladıktan sonra çağrılan geri çağırma işlevi. Başarısız ve kapalı bir iç bölüm tüketicisi için alma işlemini devralmak üzere yeni bir iç bölüm tüketicisi oluşturulduğunda da çağrılır. Geri çağırma tek bir parametre alır: bölüm bilgilerini içeren partition_context . Geri çağırma şu şekilde tanımlanmalıdır: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
Belirli bir bölüm için tüketici kapatıldıktan sonra çağrılacak geri çağırma işlevi. Yeniden deneme girişimleri tükendikten sonra alma sırasında hata oluştuğunda da çağrılabilir. Geri çağırma iki parametre alır: bölüm bilgilerini ve kapatma nedenini içeren partition_context. Geri çağırma şu şekilde tanımlanmalıdır: on_partition_close(partition_context, neden). Lütfen çeşitli kapanış nedenleri için adresine başvurun CloseReason .
Dönüş türü
Örnekler
Olayları EventHub'dan toplu olarak alın.
logger = logging.getLogger("azure.eventhub")
async def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info(
"{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
)
async with consumer:
await consumer.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
Azure SDK for Python