EventHubConsumerClient Sınıf
EventHubConsumerClient sınıfı, Azure Event Hubs hizmetinden olay almak için üst düzey bir arabirim tanımlar.
EventHubConsumerClient'ın temel amacı, yük dengeleme ve denetim noktası oluşturma ile bir EventHub'ın tüm bölümlerinden olayları almaktır.
Aynı olay hub'ına, tüketici grubuna ve denetim noktası konumuna karşı birden çok EventHubConsumerClient örneği çalıştığında, bölümler bunlar arasında eşit olarak dağıtılır.
Yük dengelemeyi ve kalıcı denetim noktalarını etkinleştirmek için EventHubConsumerClient oluşturulurken checkpoint_store ayarlanmalıdır. Denetim noktası deposu sağlanmazsa denetim noktası dahili olarak bellekte tutulur.
Bir EventHubConsumerClient, yöntemi receive() veya receive_batch() çağırıp partition_id belirttiğinizde belirli bir bölümden de alabilir. Yük dengeleme tek bölümlü modda çalışmaz. Ancak checkpoint_store ayarlandıysa kullanıcılar denetim noktalarını kaydetmeye devam edebilir.
- Devralma
-
azure.eventhub._client_base.ClientBaseEventHubConsumerClient
Oluşturucu
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Parametreler
- fully_qualified_namespace
- str
Event Hubs ad alanı için tam konak adı. Ad alanı biçimi: .servicebus.windows.net.
- credential
- TokenCredential veya AzureSasCredential veya AzureNamedKeyCredential
Belirteçleri almak için belirli bir arabirim uygulayan kimlik doğrulaması için kullanılan kimlik bilgisi nesnesi. Azure-identity kitaplığı tarafından oluşturulan , veya kimlik bilgisi nesnelerini ve *get_token(self, scopes) yöntemini uygulayan nesneleri kabul EventHubSharedKeyCredentialeder.
- logging_enable
- bool
Günlükçüye ağ izleme günlüklerinin çıkışının yapılıp yapılmaydığı. Varsayılan değer False'tur.
- auth_timeout
- float
Saniye cinsinden bir belirtecin hizmet tarafından yetkilendirilmesini bekleme süresi. Varsayılan değer 60 saniyedir. 0 olarak ayarlanırsa, istemciden hiçbir zaman aşımı uygulanmaz.
- user_agent
- str
Belirtilirse, bu kullanıcı aracısı dizesinin önüne eklenir.
- retry_total
- int
Hata oluştuğunda başarısız olan bir işlemi yineleme denemelerinin toplam sayısı. Varsayılan değer 3’tür. Almadaki retry_total bağlamı özeldir: Alma yöntemi, her yinelemede iç alma yöntemini çağıran bir while döngüsü tarafından uygulanır. Alma durumunda, retry_total while döngüsündeki iç alma yöntemi tarafından oluşturulan hatadan sonra yeniden deneme sayısını belirtir. Yeniden deneme girişimleri tükenirse, hata bilgileriyle birlikte geri çağırma on_error çağrılır (sağlanırsa). Başarısız iç bölüm tüketicisi kapatılır (on_partition_close sağlanırsa çağrılır) ve almaya devam etmek için yeni iç bölüm tüketicisi oluşturulur (on_partition_initialize sağlanırsa çağrılır).
- retry_backoff_factor
- float
İkinci denemeden sonra girişimler arasında uygulanacak bir geri alma faktörü (çoğu hata gecikme olmadan ikinci bir denemeyle hemen çözülür). Sabit modda, yeniden deneme ilkesi {backoff factor} için her zaman uyku moduna geçer. 'Üstel' modda, yeniden deneme ilkesi şu süre boyunca uyku moduna geçer: {backoff factor} * (2 ** ({toplam yeniden deneme sayısı} - 1)) saniye. backoff_factor 0,1 ise yeniden denemeler arasında [0,0s, 0,2s, 0,4s, ...] için yeniden deneme uyku moduna alır. Varsayılan değer 0,8'dir.
- retry_backoff_max
- float
İzin süresi üst sınırı. Varsayılan değer 120 saniyedir (2 dakika).
- retry_mode
- str
Yeniden deneme girişimleri arasındaki gecikme davranışı. Desteklenen değerler 'sabit' veya 'üstel'dir; burada varsayılan değer 'üstel'dir.
- idle_timeout
- float
Saniyeler içinde zaman aşımı, bundan sonra başka etkinlik yoksa bu istemci temel bağlantıyı kapatır. Varsayılan olarak değer Yok'tur, yani hizmet tarafından başlatılmadığı sürece istemcinin etkinlik dışı kalma nedeniyle kapanmayacağı anlamına gelir.
- transport_type
- TransportType
Event Hubs hizmetiyle iletişim kurmak için kullanılacak aktarım protokolünün türü. Varsayılan değer TransportType.Amqp'dir ve bu durumda bağlantı noktası 5671 kullanılır. Ağ ortamında 5671 numaralı bağlantı noktası kullanılamıyorsa/engelleniyorsa, iletişim için 443 numaralı bağlantı noktasını kullanan TransportType.AmqpOverWebsocket kullanılabilir.
HTTP proxy ayarları. Bu, şu anahtarlara sahip bir sözlük olmalıdır: 'proxy_hostname' (str değeri) ve 'proxy_port' (int değeri). Ayrıca aşağıdaki anahtarlar da mevcut olabilir: 'username', 'password'.
- checkpoint_store
- CheckpointStore veya None
Olayları alırken bölüm yük dengeleme ve denetim noktası verilerini depolayan bir yönetici. Denetim noktası deposu her iki durumda da tüm bölümlerden veya tek bir bölümden alınır. İkinci durumda yük dengeleme uygulanmaz. Denetim noktası deposu sağlanmazsa denetim noktası bellek içinde tutulur ve EventHubConsumerClient örneği yük dengelemesi olmadan olayları alır.
- load_balancing_interval
- float
Yük dengeleme devreye giriyorsa. Bu, iki yük dengeleme değerlendirmesi arasındaki saniye cinsinden aralıktır. Varsayılan değer 30 saniyedir.
- partition_ownership_expiration_interval
- float
Bölüm sahipliğinin süresi bu kadar saniye sonra dolar. Her yük dengeleme değerlendirmesi, sahipliğin sona erme süresini otomatik olarak uzatır. Varsayılan değer 6 * load_balancing_interval,30 saniyelik varsayılan load_balancing_interval kullanılırken 180 saniyedir.
- load_balancing_strategy
- str veya LoadBalancingStrategy
Yük dengeleme devreye giriyorsa, bölüm sahipliğini talep etmek ve dengelemek için bu stratejiyi kullanır. Doyumsuz strateji için "doyumsuz" veya LoadBalancingStrategy.GREEDY kullanın. Bu strateji, her yük dengeleme değerlendirmesi için yükü dengelemek için gereken sayıda sahipsiz bölümü alır. Her yük dengeleme değerlendirmesi için diğer EventHubConsumerClient tarafından talep edilmeyen yalnızca bir bölümü talep eden dengeli strateji için "balanced" veya LoadBalancingStrategy.BALANCED kullanın. Bir EventHub'ın tüm bölümleri diğer EventHubConsumerClient tarafından talep edilirse ve bu istemci çok az bölüm talep ettiyse, bu istemci yük dengeleme stratejisinden bağımsız olarak her yük dengeleme değerlendirmesi için diğer istemcilerden bir bölüm çalar. Doyumsuz strateji varsayılan olarak kullanılır.
Event Hubs hizmetine bağlantı kurmak için kullanılacak özel uç nokta adresi, ağ isteklerinin konak ortamı için gereken tüm uygulama ağ geçitleri veya diğer yollar üzerinden yönlendirilmesine olanak tanır. Varsayılan değer Yok'tur. Biçim "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" şeklinde olacaktır. bağlantı noktası custom_endpoint_address belirtilmezse, varsayılan olarak 443 numaralı bağlantı noktası kullanılır.
Bağlantı uç noktasının kimliğini doğrulamak için kullanılan SSL sertifikasının özel CA_BUNDLE dosyasının yolu. Varsayılan değer Hiçbiri'dir ve bu durumda certifi.where() kullanılır.
- uamqp_transport
- bool
Temel alınan aktarım olarak uamqp kitaplığının kullanılıp kullanılmaymayacağı. Varsayılan değer False'tur ve temel alınan aktarım olarak Pure Python AMQP kitaplığı kullanılır.
- socket_timeout
- float
Zaman aşımına uğramadan önce bağlantıdaki temel yuvanın veri gönderirken ve alırken beklemesi gereken saniye cinsinden süre. Varsayılan değer TransportType.Amqp için 0.2 ve TransportType.AmqpOverWebsocket için 1'dir. Yazma zaman aşımı nedeniyle EventHubsConnectionError hataları oluşuyorsa, varsayılan değerden daha büyük bir değerin geçirilmesi gerekebilir. Bu gelişmiş kullanım senaryoları içindir ve normalde varsayılan değer yeterli olmalıdır.
Örnekler
EventHubConsumerClient'ın yeni bir örneğini oluşturun.
import os
from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
consumer = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group='$Default',
credential=credential)
Yöntemler
close |
Olay Hub'ından olay almayı durdurun ve temel alınan AMQP bağlantısını ve bağlantılarını kapatın. |
from_connection_string |
bir bağlantı dizesi EventHubConsumerClient oluşturun. |
get_eventhub_properties |
Olay Hub'ının özelliklerini alma. Döndürülen sözlükteki anahtarlar şunlardır:
|
get_partition_ids |
Olay Hub'ının bölüm kimliklerini alın. |
get_partition_properties |
Belirtilen bölümün özelliklerini alın. Özellikler sözlüğündeki anahtarlar şunlardır:
|
receive |
İsteğe bağlı yük dengeleme ve denetim noktası oluşturma ile bölümlerden olayları alın. |
receive_batch |
İsteğe bağlı yük dengeleme ve denetim noktası oluşturma ile bölümlerden olayları alın. |
close
Olay Hub'ından olay almayı durdurun ve temel alınan AMQP bağlantısını ve bağlantılarını kapatın.
close() -> None
Dönüş türü
Örnekler
İstemciyi kapatın.
import os
import threading
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group="$Default",
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The 'receive' method is a blocking call, it can be executed in a thread for
# non-blocking behavior, and combined with the 'close' method.
worker = threading.Thread(
target=consumer.receive,
kwargs={
"on_event": on_event,
"starting_position": "-1", # "-1" is from the beginning of the partition.
}
)
worker.start()
time.sleep(10) # Keep receiving for 10s then close.
# Close down the consumer handler explicitly.
consumer.close()
from_connection_string
bir bağlantı dizesi EventHubConsumerClient oluşturun.
from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient
Parametreler
- eventhub_name
- str
İstemcinin bağlanacak belirli Olay Hub'ının yolu.
- logging_enable
- bool
Günlükçüye ağ izleme günlüklerinin çıkışının yapılıp yapılmaydığı. Varsayılan değer False'tur.
- auth_timeout
- float
Saniye cinsinden bir belirtecin hizmet tarafından yetkilendirilmesini bekleme süresi. Varsayılan değer 60 saniyedir. 0 olarak ayarlanırsa, istemciden hiçbir zaman aşımı uygulanmaz.
- user_agent
- str
Belirtilirse, bu kullanıcı aracısı dizesinin önüne eklenir.
- retry_total
- int
Hata oluştuğunda başarısız olan bir işlemi yineleme denemelerinin toplam sayısı. Varsayılan değer 3’tür. Almadaki retry_total bağlamı özeldir: Alma yöntemi, her yinelemede iç alma yöntemini çağıran bir while döngüsü tarafından uygulanır. Alma durumunda, retry_total while döngüsündeki iç alma yöntemi tarafından oluşturulan hatadan sonra yeniden deneme sayısını belirtir. Yeniden deneme girişimleri tükenirse, hata bilgileriyle birlikte geri çağırma on_error çağrılır (sağlanırsa). Başarısız iç bölüm tüketicisi kapatılır (on_partition_close sağlanırsa çağrılır) ve almaya devam etmek için yeni iç bölüm tüketicisi oluşturulur (on_partition_initialize sağlanırsa çağrılır).
- retry_backoff_factor
- float
İkinci denemeden sonra girişimler arasında uygulanacak bir geri alma faktörü (çoğu hata gecikme olmadan ikinci bir denemeyle hemen çözülür). Sabit modda, yeniden deneme ilkesi {backoff factor} için her zaman uyku moduna geçer. 'Üstel' modda, yeniden deneme ilkesi şu süre boyunca uyku moduna geçer: {backoff factor} * (2 ** ({toplam yeniden deneme sayısı} - 1)) saniye. backoff_factor 0,1 ise yeniden denemeler arasında [0,0s, 0,2s, 0,4s, ...] için yeniden deneme uyku moduna girer. Varsayılan değer 0,8'dir.
- retry_backoff_max
- float
İzin süresi üst sınırı. Varsayılan değer 120 saniyedir (2 dakika).
- retry_mode
- str
Yeniden deneme girişimleri arasındaki gecikme davranışı. Desteklenen değerler 'sabit' veya 'üstel'dir; burada varsayılan değer 'üstel'dir.
- idle_timeout
- float
Saniyeler içinde zaman aşımı, daha sonra furthur etkinliği yoksa bu istemci temel bağlantıyı kapatır. Varsayılan olarak değer Yok'tır, yani hizmet tarafından başlatılmadığı sürece istemcinin etkinlik dışı kalma nedeniyle kapanmayacağı anlamına gelir.
- transport_type
- TransportType
Event Hubs hizmetiyle iletişim kurmak için kullanılacak aktarım protokolünün türü. Varsayılan değer TransportType.Amqp'tir ve bu durumda 5671 numaralı bağlantı noktası kullanılır. Ağ ortamında 5671 numaralı bağlantı noktası kullanılamıyorsa/engelleniyorsa, iletişim için 443 numaralı bağlantı noktasını kullanan TransportType.AmqpOverWebsocket kullanılabilir.
- http_proxy
- dict
HTTP proxy ayarları. Bu, şu anahtarlara sahip bir sözlük olmalıdır: 'proxy_hostname' (str değeri) ve 'proxy_port' (int değeri). Ayrıca aşağıdaki anahtarlar da mevcut olabilir: 'kullanıcı adı', 'parola'.
- checkpoint_store
- CheckpointStore veya None
Olayları alırken bölüm yük dengeleme ve denetim noktası verilerini depolayan bir yönetici. Denetim noktası deposu her iki durumda da tüm bölümlerden veya tek bir bölümden alma durumunda kullanılır. İkinci durumda yük dengeleme uygulanmaz. Denetim noktası deposu sağlanmazsa denetim noktası dahili olarak bellekte tutulur ve EventHubConsumerClient örneği yük dengelemesi olmadan olayları alır.
- load_balancing_interval
- float
Yük dengeleme devreye giriyorsa. Bu, iki yük dengeleme değerlendirmesi arasındaki saniye cinsinden aralıktır. Varsayılan değer 10 saniyedir.
- partition_ownership_expiration_interval
- float
Bölüm sahipliğinin süresi bu kadar saniye sonra dolar. Her yük dengeleme değerlendirmesi, sahipliğin sona erme süresini otomatik olarak uzatır. Varsayılan değer 6 * load_balancing_interval, yani 30 saniyelik varsayılan load_balancing_interval kullanıldığında 60 saniyedir.
- load_balancing_strategy
- str veya LoadBalancingStrategy
Yük dengeleme devreye giriyorsa, bölüm sahipliğini talep etmek ve dengelemek için bu stratejiyi kullanır. Doyumsuz strateji için "greedy" veya LoadBalancingStrategy.GREEDY kullanın. Bu strateji, her yük dengeleme değerlendirmesi için yükü dengelemek için gereken sahipsiz bölümlerin sayısını alır. Her yük dengeleme değerlendirmesi için diğer EventHubConsumerClient tarafından talep edilmeyen yalnızca bir bölümü talep eden dengeli strateji için "balanced" veya LoadBalancingStrategy.BALANCED kullanın. Bir EventHub'ın tüm bölümleri diğer EventHubConsumerClient tarafından talep edilirse ve bu istemci çok az bölüm talep ettiyse, bu istemci yük dengeleme stratejisinden bağımsız olarak her yük dengeleme değerlendirmesi için diğer istemcilerden bir bölümü çalar. Doyumsuz strateji varsayılan olarak kullanılır.
Ağ isteklerinin herhangi bir uygulama ağ geçidi veya konak ortamı için gereken diğer yollar üzerinden yönlendirilmesine olanak tanıyarak Event Hubs hizmetine bağlantı kurmak için kullanılacak özel uç nokta adresi. Varsayılan değer Yok'tır. Biçim "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" gibi olabilir. bağlantı noktası custom_endpoint_address belirtilmezse, varsayılan olarak 443 numaralı bağlantı noktası kullanılır.
Bağlantı uç noktasının kimliğini doğrulamak için kullanılan SSL sertifikasının özel CA_BUNDLE dosyasının yolu. Varsayılan değer Hiçbiri'dir ve bu durumda certifi.where() kullanılır.
- uamqp_transport
- bool
Temel alınan aktarım olarak uamqp kitaplığının kullanılıp kullanılmaymayacağı. Varsayılan değer False'tur ve temel alınan aktarım olarak Pure Python AMQP kitaplığı kullanılır.
Dönüş türü
Örnekler
bağlantı dizesi'den EventHubConsumerClient'ın yeni bir örneğini oluşturun.
import os
from azure.eventhub import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Olay Hub'ının özelliklerini alma.
Döndürülen sözlükteki anahtarlar şunlardır:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
get_eventhub_properties() -> Dict[str, Any]
Döndürülenler
Olay Hub'ı hakkında bilgi içeren bir sözlük.
Dönüş türü
Özel durumlar
get_partition_ids
Olay Hub'ının bölüm kimliklerini alın.
get_partition_ids() -> List[str]
Döndürülenler
Bölüm kimliklerinin listesi.
Dönüş türü
Özel durumlar
get_partition_properties
Belirtilen bölümün özelliklerini alın.
Özellikler sözlüğündeki anahtarlar şunlardır:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
get_partition_properties(partition_id: str) -> Dict[str, Any]
Parametreler
Döndürülenler
Bölüm özelliklerini içeren bir sözlük.
Dönüş türü
Özel durumlar
receive
İsteğe bağlı yük dengeleme ve denetim noktası oluşturma ile bölümlerden olayları alın.
receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None
Parametreler
- on_event
- callable[PartitionContext, EventData veya None]
Alınan olayı işlemek için geri çağırma işlevi. Geri çağırma iki parametre alır: bölüm bağlamını ve alınan olay olan olayı içeren partition_context. Geri çağırma işlevi şöyle tanımlanmalıdır: on_event(partition_context, olay). Ayrıntılı bölüm bağlamı bilgileri için PartitionContextbkz. .
- max_wait_time
- float
Olay işlemcisinin geri çağırmayı çağırmadan önce bekleyeceği saniye cinsinden maksimum aralık. Bu aralık içinde hiçbir olay alınmazsa, geri çağırma on_eventHiçbiri ile çağrılır. Bu değer Yok veya 0 (varsayılan) olarak ayarlanırsa, bir olay alınana kadar geri çağırma çağrılmayacak.
- partition_id
- str
Belirtilirse, istemci yalnızca bu bölümden alır. Aksi takdirde istemci tüm bölümlerden alır.
- owner_level
- int
Özel bir tüketici için öncelik. owner_level ayarlanırsa özel bir tüketici oluşturulur. Daha yüksek owner_level sahip bir tüketicinin özel önceliği daha yüksektir. Sahip düzeyi, tüketicinin 'dönem değeri' olarak da bilinmektedir.
- prefetch
- int
İşlenmek üzere hizmetten ön eklenmek üzere olayların sayısı. Varsayılan değer 300'dür.
- track_last_enqueued_event_properties
- bool
Tüketicinin ilişkili bölümünde son sıraya alınan olayla ilgili bilgi isteyip istememesi ve olaylar alınırken bu bilgileri izlemesi gerekip gerekmediğini gösterir. Bölümler hakkında son sıraya alınan olay hakkındaki bilgiler izlendiğinde, Event Hubs hizmetinden alınan her olay bölümle ilgili meta verileri taşır. Bu, Event Hub istemcisini kullanarak bölüm özellikleri için düzenli aralıklarla istekte bulunmaya karşı göz önünde bulundurulduğunda genellikle olumlu bir dengeleme olan az miktarda ek ağ bant genişliği tüketimine neden olur. Varsayılan olarak False olarak ayarlanır.
Bir bölüm için denetim noktası verisi yoksa bu olay konumundan almaya başlayın. Varsa denetim noktası verileri kullanılır. Bu, anahtar olarak bölüm kimliğine ve tek tek bölümlerin değeri olarak konuma sahip bir dikte veya tüm bölümler için tek bir değer olabilir. Değer türü str, int veya datetime.datetime olabilir. Ayrıca, akışın başından almaya yönelik "-1" ve yalnızca yeni olayları almak için "@latest" değerleri de desteklenir. Varsayılan değer "@latest"dir.
Verilen starting_position kapsayıcı (>=) olup olmadığını (>) belirleyin. Kapsayıcı için True ve özel kullanım için False. Bu, anahtar olarak bölüm kimliğine ve belirli bir bölüm için starting_position kapsayıcı olup olmadığını belirten değer olarak bool değerine sahip bir dikte olabilir. Bu, tüm starting_position için tek bir bool değeri de olabilir. Varsayılan değer False'tur.
- on_error
- callable[[PartitionContext, Exception]]
Yeniden deneme girişimleri tükendikten sonra veya yük dengeleme işlemi sırasında alma sırasında bir hata oluştuğunda çağrılan geri çağırma işlevi. Geri çağırma iki parametre alır: bölüm bilgilerini içeren partition_context ve özel durum olan hata . yük dengeleme işlemi sırasında hata oluşursa partition_context Hiçbiri olabilir. Geri çağırma şu şekilde tanımlanmalıdır: on_error(partition_context, hata). geri çağırma on_event sırasında işlenmeyen bir özel durum oluşursa, on_error geri çağırması da çağrılır.
- on_partition_initialize
- callable[[PartitionContext]]
Belirli bir bölüm için tüketicinin başlatılmasını tamamladıktan sonra çağrılan geri çağırma işlevi. Başarısız ve kapalı bir iç bölüm tüketicisi için alma işlemini devralmak üzere yeni bir iç bölüm tüketicisi oluşturulduğunda da çağrılır. Geri çağırma tek bir parametre alır: bölüm bilgilerini içeren partition_context . Geri çağırma şu şekilde tanımlanmalıdır: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
Belirli bir bölüm için tüketici kapatıldıktan sonra çağrılacak geri çağırma işlevi. Yeniden deneme girişimleri tükendikten sonra alma sırasında hata oluştuğunda da çağrılabilir. Geri çağırma iki parametre alır: bölüm bilgilerini ve kapatma nedenini içeren partition_context. Geri çağırma şu şekilde tanımlanmalıdır: on_partition_close(partition_context, neden). Lütfen çeşitli kapanış nedenleri için adresine başvurun CloseReason .
Dönüş türü
Örnekler
EventHub'dan olayları alma.
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive(on_event=on_event)
receive_batch
İsteğe bağlı yük dengeleme ve denetim noktası oluşturma ile bölümlerden olayları alın.
receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None
Parametreler
- on_event_batch
- callable[PartitionContext, list[EventData]]
Alınan olayların toplu işlemini işlemek için geri çağırma işlevi. Geri çağırma iki parametre alır: bölüm bağlamını içeren partition_context ve alınan olaylar olan event_batch. Geri çağırma işlevi şöyle tanımlanmalıdır: on_event_batch(partition_context, event_batch). event_batch max_wait_time Hiçbiri veya 0 değilse ve max_wait_time sonra hiçbir olay alınmadıysa boş bir liste olabilir. Ayrıntılı bölüm bağlamı bilgileri için PartitionContextbkz. .
- max_batch_size
- int
Geri çağırma on_event_batch bir toplu işlemde geçirilen en fazla olay sayısı. Alınan gerçek olay sayısı max_batch_size'den büyükse, alınan olaylar toplu işlere bölünür ve en fazla max_batch_size olay içeren her toplu iş için geri çağırmayı çağırır.
- max_wait_time
- float
Olay işlemcisinin geri çağırmayı çağırmadan önce bekleyeceği saniye cinsinden maksimum aralık. Bu aralık içinde hiçbir olay alınmazsa, on_event_batch geri çağırma boş bir listeyle çağrılır.
- partition_id
- str
Belirtilirse, istemci yalnızca bu bölümden alır. Aksi takdirde istemci tüm bölümlerden alır.
- owner_level
- int
Özel bir tüketici için öncelik. owner_level ayarlanırsa özel bir tüketici oluşturulur. Daha yüksek owner_level sahip bir tüketicinin özel önceliği daha yüksektir. Sahip düzeyi, tüketicinin 'dönem değeri' olarak da bilinmektedir.
- prefetch
- int
İşlenmek üzere hizmetten ön eklenmek üzere olayların sayısı. Varsayılan değer 300'dür.
- track_last_enqueued_event_properties
- bool
Tüketicinin ilişkili bölümünde son sıraya alınan olayla ilgili bilgi isteyip istememesi ve olaylar alınırken bu bilgileri izlemesi gerekip gerekmediğini gösterir. Bölümler hakkında son sıraya alınan olay hakkındaki bilgiler izlendiğinde, Event Hubs hizmetinden alınan her olay bölümle ilgili meta verileri taşır. Bu, Event Hub istemcisini kullanarak bölüm özellikleri için düzenli aralıklarla istekte bulunmaya karşı göz önünde bulundurulduğunda genellikle olumlu bir dengeleme olan az miktarda ek ağ bant genişliği tüketimine neden olur. Varsayılan olarak False olarak ayarlanır.
Bir bölüm için denetim noktası verisi yoksa bu olay konumundan almaya başlayın. Varsa denetim noktası verileri kullanılır. Bu, anahtar olarak bölüm kimliğine ve tek tek bölümlerin değeri olarak konuma sahip bir dikte veya tüm bölümler için tek bir değer olabilir. Değer türü str, int veya datetime.datetime olabilir. Ayrıca, akışın başından almaya yönelik "-1" ve yalnızca yeni olayları almak için "@latest" değerleri de desteklenir. Varsayılan değer "@latest"dir.
Verilen starting_position kapsayıcı (>=) olup olmadığını (>) belirleyin. Kapsayıcı için True ve özel kullanım için False. Bu, anahtar olarak bölüm kimliğine ve belirli bir bölüm için starting_position kapsayıcı olup olmadığını belirten değer olarak bool değerine sahip bir dikte olabilir. Bu, tüm starting_position için tek bir bool değeri de olabilir. Varsayılan değer False'tur.
- on_error
- callable[[PartitionContext, Exception]]
Yeniden deneme girişimleri tükendikten sonra veya yük dengeleme işlemi sırasında alma sırasında bir hata oluştuğunda çağrılan geri çağırma işlevi. Geri çağırma iki parametre alır: bölüm bilgilerini içeren partition_context ve özel durum olan hata . yük dengeleme işlemi sırasında hata oluşursa partition_context Hiçbiri olabilir. Geri çağırma şu şekilde tanımlanmalıdır: on_error(partition_context, hata). geri çağırma on_event sırasında işlenmeyen bir özel durum oluşursa, on_error geri çağırması da çağrılır.
- on_partition_initialize
- callable[[PartitionContext]]
Belirli bir bölüm için tüketicinin başlatılmasını tamamladıktan sonra çağrılan geri çağırma işlevi. Başarısız ve kapalı bir iç bölüm tüketicisi için alma işlemini devralmak üzere yeni bir iç bölüm tüketicisi oluşturulduğunda da çağrılır. Geri çağırma tek bir parametre alır: bölüm bilgilerini içeren partition_context . Geri çağırma şu şekilde tanımlanmalıdır: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
Belirli bir bölüm için tüketici kapatıldıktan sonra çağrılacak geri çağırma işlevi. Yeniden deneme girişimleri tükendikten sonra alma sırasında hata oluştuğunda da çağrılabilir. Geri çağırma iki parametre alır: bölüm bilgilerini ve kapatma nedenini içeren partition_context. Geri çağırma şu şekilde tanımlanmalıdır: on_partition_close(partition_context, neden). Lütfen çeşitli kapanış nedenleri için adresine başvurun CloseReason .
Dönüş türü
Örnekler
Olayları EventHub'dan toplu olarak alın.
logger = logging.getLogger("azure.eventhub")
def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received events from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive_batch(on_event_batch=on_event_batch)
Azure SDK for Python