Python için Azure EventHubs Checkpoint Store istemci kitaplığı - sürüm 1.1.4
Depolama Bloblarını kullanma
Azure EventHubs Denetim Noktası Deposu, Azure Event Hubs olayları işlenirken denetim noktalarını depolamak için kullanılır.
Bu Checkpoint Store paketi, için EventHubConsumerClient
bir eklenti paketi olarak çalışır. Denetim noktalarını ve bölüm sahipliği bilgilerini korumak için kalıcı depo olarak Azure Depolama Blobunu kullanır.
Bunun zaman uyumsuz bir kitaplık olduğunu unutmayın. Azure EventHubs Checkpoint Store istemci kitaplığının eşitleme sürümü için lütfen azure-eventhub-checkpointstoreblob konusuna bakın.
Kaynak kodu | Paket (PyPi) | API başvuru belgeleri | Azure Eventhubs belgeleri | Azure Depolama belgeleri
Başlarken
Önkoşullar
Python 3.6 veya üzeri.
Microsoft Azure Aboneliği: Azure Event Hubs dahil olmak üzere Azure hizmetlerini kullanmak için bir aboneliğe ihtiyacınız vardır. Mevcut bir Azure hesabınız yoksa ücretsiz deneme sürümüne kaydolabilir veya hesap oluştururken MSDN abone avantajlarınızı kullanabilirsiniz.
Olay Hub'ı ile Event Hubs ad alanı: Azure Event Hubs etkileşime geçmek için bir ad alanı ve Olay Hub'ına da sahip olmanız gerekir. Azure kaynakları oluşturma hakkında bilginiz yoksa, Azure portal kullanarak Olay Hub'ı oluşturmak için adım adım kılavuzu izlemek isteyebilirsiniz. Burada, Olay Hub'ı oluşturmak için Azure CLI, Azure PowerShell veya Azure Resource Manager (ARM) şablonlarını kullanmaya yönelik ayrıntılı yönergeleri de bulabilirsiniz.
Azure Depolama Hesabı: Denetim noktası verilerini bloblarla depolamak için bir Azure Depolama Hesabınız olması ve bir Azure Blob Depolama Engelleme Kapsayıcısı oluşturmanız gerekir. Azure Blok Blob Depolama Hesabı oluşturma kılavuzunu izleyebilirsiniz.
Paketi yükleme
$ pip install azure-eventhub-checkpointstoreblob-aio
Önemli kavramlar
Denetim noktası oluşturma
Denetim noktası oluşturma, okuyucuların bir bölüm olay dizisindeki konumlarını işaretledikleri veya uyguladıkları bir işlemdir. Denetim noktası oluşturma, tüketicinin sorumluluğundadır ve bir tüketici grubunda bölüm başına temelinde gerçekleşir. Bu sorumluluk, her bir tüketici grubu için her bölüm okuyucusunun geçerli konumunu olay akışında izlemesi gerektiği ve veri akışının tamamlandığını düşündüğünde hizmeti bilgilendirebileceği anlamına gelir. Bir okuyucunun bölüm bağlantısı kesilirse yeniden bağlandığında ilgili tüketici grubundaki o bölümün son okuyucusu tarafından daha önce gönderilen denetim noktasında okumaya başlar. Okuyucu bağlandığında, okumaya başlayacağı konumu belirtmek için uzaklığı olay hub'ına geçirir. Bu şekilde, denetim noktası oluşturma özelliğini hem aşağı akış uygulamaları ile olayları "tamamlandı" olarak işaretlemek hem de farklı makinelerde çalışan okuyucular arasında bir yük devretme oluşması durumunda esneklik sağlamak amacıyla kullanabilirsiniz. Bu denetim noktası oluşturma işleminden daha düşük bir uzaklık belirterek daha eski verilere geri dönülebilir. Bu mekanizmayla denetim noktası oluşturma özelliği hem yük devretme esnekliği hem de olay akışı yeniden yürütmesi sağlar.
Sıra numaralarını uzaklıklar &
Her iki uzaklık & dizisi numarası da bir bölümün içindeki bir olayın konumuna başvurur. Bunları istemci tarafı imleç olarak düşünebilirsiniz. Uzaklık, olayın bayt cinsinden numaralandırılmasıdır. Uzaklık/sıra numarası, olay tüketicisinin (okuyucu) olay akışında olayları okumaya başlamak istediği bir nokta belirtmesini sağlar. Yalnızca verilen zaman damgasından sonra sıralanan olayları alacak şekilde bir zaman damgası belirtebilirsiniz. Tüketiciler, kendi uzaklık değerlerini Event Hubs hizmetinin dışında saklamaktan sorumludur. Bir bölüm içinde her olay bir uzaklık, sıra numarası ve sıralandığı zaman damgasını içerir.
Örnekler
- Azure EventHubs oluşturma
EventHubConsumerClient
- Bir kullanarak olayları kullanma
BlobCheckpointStore
Oluşturma EventHubConsumerClient
Oluşturmanın EventHubConsumerClient
en kolay yolu bağlantı dizesi kullanmaktır.
from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")
oluşturmanın EventHubConsumerClient
diğer yolları için daha fazla ayrıntı için EventHubs kitaplığına bakın.
Yapılacaklar denetim noktası kullanarak olayları kullanma BlobCheckpointStore
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'
async def on_event(partition_context, event):
# Put your code here.
await partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
async def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(
storage_connection_str,
container_name
)
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name,
checkpoint_store=checkpoint_store,
)
async with client:
await client.receive(on_event)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Azure Depolama Hizmeti API'sinin farklı bir sürümüyle kullanma BlobCheckpointStore
Bazı ortamlar Azure Depolama Hizmeti API'sinin farklı sürümlerine sahiptir.
BlobCheckpointStore
varsayılan olarak Depolama Hizmeti API'sinin 2019-07-07 sürümünü kullanır. Bunu farklı bir sürümde kullanmak için, nesneyi ne zaman oluşturduğunuzu BlobCheckpointStore
belirtinapi_version
.
Sorun giderme
Genel
Günlüğe kaydetmeyi etkinleştirmek, çekimde sorun yaşama konusunda yardımcı olacaktır.
Günlüğe Kaydetme
- Günlükçü'leri kitaplıktan izlemeleri toplamak için etkinleştirin
azure.eventhub.extensions.checkpointstoreblobaio
. - Ana azure-eventhub kitaplığından izlemeleri toplamak için günlükçü'leri etkinleştirin
azure.eventhub
. - Günlükçü'leri azure depolama blob kitaplığından izlemeleri toplamak için etkinleştirin
azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage
. - Günlükçü'leri temel alınan uAMQP kitaplığından izlemeleri toplamak için etkinleştirin
uamqp
. - İstemciyi oluştururken ayarlayarak
logging_enable=True
AMQP kare düzeyi izlemesini etkinleştirin.
Sonraki adımlar
Daha fazla örnek kod
EventHubs Checkpoint Store zaman uyumsuz örneklerimizi kullanmaya başlayın.
- receive_events_using_checkpoint_store_async.py - Blob denetim noktası deposu ile EventHubConsumerClient örneği
- receive_events_using_checkpoint_store_storage_api_version_async.py - Blob denetim noktası deposu ve depolama sürümü ile EventHubConsumerClient örneği
Belgeler
Başvuru belgelerine buradan ulaşabilirsiniz.
Geri Bildirim Sağlama
Herhangi bir hatayla karşılaşırsanız veya önerileriniz varsa, lütfen projenin Sorunlar bölümünde bir sorun oluşturun.
Katkıda bulunma
Bu proje, katkı ve önerilere açıktır. Çoğu durumda, sağladığınız katkıyı kullanmamız için bize hak tanıma hakkına sahip olduğunuzu ve bu hakkı bize tanıdığınızı bildiren bir Katkıda Bulunan Lisans Sözleşmesi’ni (CLA) kabul etmeniz gerekir. Ayrıntılar için bkz. https://cla.microsoft.com.
Bir çekme isteği gönderdiğinizde, CLA robotu bir CLA sağlamanız gerekip gerekmediğini otomatik olarak belirler ve çekme isteğini uygun şekilde donatır (örn. etiket, açıklama). Robot tarafından sağlanan yönergeleri izlemeniz yeterlidir. Bu işlemi, CLA’mızı kullanarak tüm depolarda yalnızca bir kere yapmanız gerekir.
Bu proje Microsoft Open Source Code of Conduct (Microsoft Açık Kaynak Kullanım Kuralları) belgesinde listelenen kurallara uygundur. Daha fazla bilgi için Kullanım Kuralları SSS bölümüne bakın veya ek sorular veya yorumlarla iletişime geçin opencode@microsoft.com .
Azure SDK for Python