Python için Azure Event Hubs istemci kitaplığı - sürüm 5.11.5
Azure Event Hubs saniyede milyonlarca olay alıp bunları birden çok tüketiciye aktarabilen yüksek oranda ölçeklenebilir bir yayımlama-abone olma hizmetidir. Bu, bağlı cihazlarınız ve uygulamalarınız tarafından üretilen çok büyük miktarda veriyi işlemenize ve analiz etmenizi sağlar. Event Hubs verileri topladıktan sonra, herhangi bir gerçek zamanlı analiz sağlayıcısını kullanarak veya toplu işlem/depolama bağdaştırıcılarıyla verileri alabilir, dönüştürebilir ve depolayabilirsiniz. Azure Event Hubs hakkında daha fazla bilgi edinmek isterseniz şunları gözden geçirmek isteyebilirsiniz: Event Hubs nedir?
Azure Event Hubs istemci kitaplığı, Azure Event Hubs olaylarını yayımlamaya ve kullanmaya olanak tanır ve şunları yapmak için kullanılabilir:
- İş zekası ve tanılama amaçları için uygulamanız hakkında telemetri yayma.
- İlgili tarafların, eylemde bulunmak için bir tetikleyici olarak gözlemleyip kullanabileceği uygulamanızın durumu hakkında olgular yayımlama.
- İşletmenizin veya diğer ekosistemin içinde gerçekleşen ilginç işlemleri ve etkileşimleri gözlemleyin, böylece gevşek bir şekilde bağlanmış sistemleri birbirine bağlamanıza gerek kalmadan sistemler etkileşime geçebilir.
- Bir veya daha fazla yayımcıdan etkinlikler alın, ekosisteminizin ihtiyaçlarını daha iyi karşılayacak şekilde dönüştürün ve ardından dönüştürülen etkinlikleri, tüketiciler için yeni bir akışta yayımlayın.
Kaynak kodu | Paket (PyPi) | Paket (Conda) | API başvuru belgeleri | Ürün belgeleri | Örnekleri
Başlarken
Önkoşullar
Python 3.7 veya üzeri.
Microsoft Azure Aboneliği: Azure Event Hubs dahil olmak üzere Azure hizmetlerini kullanmak için bir aboneliğe ihtiyacınız vardır. Mevcut bir Azure hesabınız yoksa ücretsiz deneme sürümüne kaydolabilir veya hesap oluştururken MSDN abone avantajlarınızı kullanabilirsiniz.
Olay Hub'ı ile Event Hubs ad alanı: Azure Event Hubs etkileşime geçmek için bir ad alanı ve Olay Hub'ına da sahip olmanız gerekir. Azure kaynakları oluşturma hakkında bilginiz yoksa, Azure portal kullanarak Olay Hub'ı oluşturmak için adım adım kılavuzu izlemek isteyebilirsiniz. Burada, Olay Hub'ı oluşturmak için Azure CLI, Azure PowerShell veya Azure Resource Manager (ARM) şablonlarını kullanmaya yönelik ayrıntılı yönergeleri de bulabilirsiniz.
Paketi yükleme
Pip ile Python için Azure Event Hubs istemci kitaplığını yükleyin:
$ pip install azure-eventhub
İstemcinin kimliğini doğrulama
Event Hubs ile etkileşim, EventHubConsumerClient veya EventHubProducerClient sınıfının bir örneğiyle başlar. İstemci nesnesinin örneğini oluşturmak için konak adı, SAS/AAD kimlik bilgileri ve olay hub'ı adı veya bağlantı dizesi gerekir.
bağlantı dizesi'den istemci oluşturma:
Event Hubs istemci kitaplığının bir Olay Hub'ı ile etkileşim kurması için en kolay araç, Event Hubs ad alanı oluşturulurken otomatik olarak oluşturulan bir bağlantı dizesi kullanmaktır. Azure'da paylaşılan erişim ilkeleri hakkında bilginiz yoksa, Event Hubs bağlantı dizesi almak için adım adım kılavuzu izlemek isteyebilirsiniz.
- yöntemi,
from_connection_string
formEndpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey>
ve varlık adının bağlantı dizesi Olay Hub'ı örneğine götürür. bağlantı dizesi Azure portal alabilirsiniz.
azure-identity kitaplığını kullanarak istemci oluşturma:
Alternatif olarak, azure-identity paketiyle AAD aracılığıyla kimlik doğrulaması yapmak için Credential nesnesini kullanabilirsiniz.
- Yukarıdaki bağlantılı örnekte gösterildiği gibi bu oluşturucu, Event Hub örneğinizin ana bilgisayar adını ve varlık adını ve TokenCredential protokolünün uygulandığı kimlik bilgilerini alır. Azure-identity paketinde
TokenCredential
kullanılabilen protokol uygulamaları vardır. Ana bilgisayar adı biçimindedir<yournamespace.servicebus.windows.net>
. - tarafından
azure-identity
sağlanan kimlik bilgisi türlerini kullanmak için lütfen paketini yükleyin:pip install azure-identity
- Ayrıca, zaman uyumsuz API'yi kullanmak için önce gibi
aiohttp
bir zaman uyumsuz aktarım yüklemeniz gerekir:pip install aiohttp
- Azure Active Directory kullanırken sorumlunuza Event Hubs'a erişim izni veren Azure Event Hubs Veri Sahibi rolü gibi bir rol atanmalıdır. Event Hubs ile Azure Active Directory yetkilendirmesini kullanma hakkında daha fazla bilgi için lütfen ilişkili belgelere bakın.
Önemli kavramlar
EventHubProducerClient, ekli cihaz çözümü, mobil cihaz uygulaması, konsol veya başka bir cihazda çalışan oyun başlığı, bazı istemci veya sunucu tabanlı iş çözümü ya da bir web sitesinin parçası olarak telemetri verileri, tanılama bilgileri, kullanım günlükleri veya diğer günlük verilerinin kaynağıdır.
EventHubConsumerClient bu tür bilgileri Olay Hub'ından alır ve işler. İşleme toplama, karmaşık hesaplama ve filtreleme içerebilir. İşleme, bilgilerin ham veya dönüştürülmüş bir şekilde dağıtılmasını veya depolanmasını da içerebilir. Event Hub tüketicileri genellikle Azure Stream Analytics, Apache Spark veya Apache Storm gibi yerleşik analiz özelliklerine sahip sağlam ve yüksek ölçekli platform altyapı parçalarıdır.
Bölüm, Olay Hub'ında tutulan sıralı bir olay dizisidir. Azure Event Hubs, her tüketicinin ileti akışının yalnızca belirli bir alt kümesini veya bölümünü okuduğu bölümlenmiş bir tüketici deseni üzerinden ileti akışı sağlar. Yeni olaylar geldikçe dizinin sonuna eklenir. Bölüm sayısı bir Olay Hub'ı oluşturulduğunda belirtilir ve değiştirilemez.
Tüketici grubu, Olay Hub'ının tamamının görünümüdür. Tüketici grupları, her biri olay akışının ayrı bir görünümüne sahip olmak ve akışı kendi hızlarında ve kendi konumlarından bağımsız olarak okumak için birden çok kullanan uygulama sağlar. Tüketici grubu başına bir bölümde en fazla 5 eşzamanlı okuyucu olabilir; ancak belirli bir bölüm ve tüketici grubu eşleştirmesi için yalnızca bir etkin tüketici olması önerilir. Her etkin okuyucu, bölümünden tüm olayları alır; Aynı bölümde birden çok okuyucu varsa, yinelenen olayları alırlar.
Daha fazla kavram ve daha ayrıntılı tartışma için bkz. Event Hubs Özellikleri. Ayrıca AMQP kavramları OASIS Advanced Messaging Queuing Protocol (AMQP) Sürüm 1.0'da da belgelenmiştir.
İş parçacığı güvenliği
EventHubProducerClient veya EventHubConsumerClient'ın iş parçacığı güvenli olduğunu garanti etmeyiz. Bu örneklerin iş parçacıkları arasında yeniden kullanılması önerilmez. Bu sınıfları iş parçacığı güvenli bir şekilde kullanmak çalışan uygulamaya bağlı.
Veri modeli türü iş EventDataBatch
parçacığı açısından güvenli değildir. İş parçacıkları arasında paylaşılmamalı veya istemci yöntemleriyle eşzamanlı olarak kullanılmamalıdır.
Örnekler
Aşağıdaki bölümlerde, en yaygın Event Hubs görevlerinden bazılarını kapsayan çeşitli kod parçacıkları sağlanır:
- Olay Hub'larını inceleme
- Olayları Olay Hub'ına yayımlama
- Olay Hub'ından olayları kullanma
- Olay Hub'ından olayları toplu olarak kullanma
- Olay Hub'ına olayları zaman uyumsuz olarak yayımlama
- Olay Hub'ından olayları zaman uyumsuz olarak kullanma
- Olay Hub'ından gelen olayları zaman uyumsuz olarak toplu olarak kullanma
- Olayları kullanma ve denetim noktası deposu kullanarak denetim noktalarını kaydetme
- IoT Hub ile çalışmak için EventHubConsumerClient kullanma
Olay Hub'larını inceleme
Olay Hub'ının bölüm kimliklerini alma.
import os
from azure.eventhub import EventHubConsumerClient
CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']
consumer_client = EventHubConsumerClient.from_connection_string(
conn_str=CONNECTION_STR,
consumer_group='$Default',
eventhub_name=EVENTHUB_NAME,
)
with consumer_client:
pass # consumer_client is now ready to be used.
Olayları Olay Hub'ına yayımlama
create_batch
EventHubProducerClient
Yöntemini kullanarak gönderilebilen send_batch
bir EventDataBatch
nesne oluşturmak için yöntemini kullanın.
Bayt cinsinden en büyük toplu iş boyutu sınırına ulaşılana EventDataBatch
kadar yöntemi kullanılarak add
olaylar eklenebilir.
def send_event_data_batch(producer):
# Without specifying partition_id or partition_key
# the events will be distributed to available partitions via round-robin.
event_data_batch = producer.create_batch()
event_data_batch.add(EventData('Single message'))
producer.send_batch(event_data_batch)
Olay Hub'ından olayları kullanma
EventHub'daki olayları kullanmanın birden çok yolu vardır. Yalnızca bir olay alındığında EventHubConsumerClient.receive
geri çağırma tetikleme yöntemi aşağıdaki gibi kullanılır:
import logging
from azure.eventhub import EventHubConsumerClient
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)
def on_event(partition_context, event):
logger.info("Received event from partition {}".format(partition_context.partition_id))
partition_context.update_checkpoint(event)
with client:
client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# client.receive(on_event=on_event, partition_id='0')
Olay Hub'ından olayları toplu olarak kullanma
Yukarıdaki örnek alınan her ileti için geri çağırmayı tetiklerken, aşağıdaki örnek bir dizi olay üzerinde geri çağırmayı tetikler ve bir kerede bir sayı almaya çalışır.
import logging
from azure.eventhub import EventHubConsumerClient
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)
def on_event_batch(partition_context, events):
logger.info("Received event from partition {}".format(partition_context.partition_id))
partition_context.update_checkpoint()
with client:
client.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# client.receive_batch(on_event_batch=on_event_batch, partition_id='0')
Olay Hub'ına olayları zaman uyumsuz olarak yayımlama
create_batch
EventHubProducer
Yöntemini kullanarak gönderilebilen send_batch
bir EventDataBatch
nesne oluşturmak için yöntemini kullanın.
Bayt cinsinden en büyük toplu iş boyutu sınırına ulaşılana EventDataBatch
kadar yöntemi kullanılarak add
olaylar eklenebilir.
import asyncio
from azure.eventhub.aio import EventHubProducerClient # The package name suffixed with ".aio" for async
from azure.eventhub import EventData
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
async def create_batch(client):
event_data_batch = await client.create_batch()
can_add = True
while can_add:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
can_add = False # EventDataBatch object reaches max_size.
return event_data_batch
async def send():
client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
batch_data = await create_batch(client)
async with client:
await client.send_batch(batch_data)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(send())
Olay Hub'ından olayları zaman uyumsuz olarak kullanma
Bu SDK hem zaman uyumlu hem de zaman uyumsuz tabanlı kodu destekler. Yukarıdaki örneklerde gösterildiği gibi almak için ancak aio içinde aşağıdakiler gerekir:
import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)
async def on_event(partition_context, event):
logger.info("Received event from partition {}".format(partition_context.partition_id))
await partition_context.update_checkpoint(event)
async def receive():
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
async with client:
await client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# await client.receive(on_event=on_event, partition_id='0')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(receive())
Olay Hub'ından gelen olayları zaman uyumsuz olarak toplu olarak kullanma
Tüm zaman uyumlu işlevler aio'da da desteklenir. Zaman uyumlu toplu alındı bilgisi için yukarıda gösterildiği gibi, zaman uyumsuz olarak aşağıdaki gibi aynı işlemi gerçekleştirebilirsiniz:
import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)
async def on_event_batch(partition_context, events):
logger.info("Received event from partition {}".format(partition_context.partition_id))
await partition_context.update_checkpoint()
async def receive_batch():
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
async with client:
await client.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
# receive events from specified partition:
# await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(receive_batch())
Olayları kullanma ve denetim noktası deposu kullanarak denetim noktalarını kaydetme
EventHubConsumerClient
aynı anda birden çok bölümden olay almanıza ve aynı Olay Hub'ını ve tüketici grubunu kullanan diğer tüketicilerle yük dengelemenizi sağlayan üst düzey bir yapıdır.
Bu, kullanıcının olaylar denetim noktaları kullanılarak işlendiğinde ilerleme durumunu izlemesine de olanak tanır.
Denetim noktası, olay hub'ı örneğindeki bir tüketici grubunun belirli bir bölümünden kullanıcı tarafından başarıyla işlenen son olayı temsil etmeye yöneliktir. , EventHubConsumerClient
denetim noktalarını güncelleştirmek ve yük dengeleme algoritmasının gerektirdiği ilgili bilgileri depolamak için bir örneğini CheckpointStore
kullanır.
Bunu destekleyen paketleri bulmak ve böyle bir paketten uygulamayı kullanmak CheckpointStore
için pypi'yi ön ekiyle azure-eventhub-checkpointstore
arayın. Hem eşitleme hem de zaman uyumsuz kitaplıkların sağlandığını lütfen unutmayın.
Aşağıdaki örnekte öğesinin bir örneğini EventHubConsumerClient
oluşturur ve kullanırız BlobCheckpointStore
. Kodu çalıştırmak için bir Azure Depolama hesabı ve blob kapsayıcısı oluşturmanız gerekir.
Azure Blob Depolama Checkpoint Store Async ve Azure Blob Depolama Checkpoint Store Eşitlemesi, kalıcı depo olarak Azure Blob Depolama uygulanan sağladığımız uygulamalardan biridirCheckpointStore
.
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'
async def on_event(partition_context, event):
# do something
await partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
async def receive(client):
await client.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
async def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name,
checkpoint_store=checkpoint_store, # For load balancing and checkpoint. Leave None for no load balancing
)
async with client:
await receive(client)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
IoT Hub ile çalışmak için EventHubConsumerClient kullanma
IoT Hub ile çalışmak için de kullanabilirsinizEventHubConsumerClient
. Bu, bağlı EventHub'dan IoT Hub telemetri verilerini almak için kullanışlıdır. İlişkili bağlantı dizesi gönderme talepleri olmaz, bu nedenle olay göndermek mümkün değildir.
bağlantı dizesi Event Hub uyumlu bir uç nokta için olması gerektiğini unutmayın; örneğin, "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name"
Event Hubs uyumlu uç noktayı almanın iki yolu vardır:
- Azure Portal'da IoT Hub "Yerleşik uç noktaları" el ile alın ve buradan alın.
from azure.eventhub import EventHubConsumerClient
connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)
partition_ids = client.get_partition_ids()
- Yerleşik Event Hubs uyumlu uç noktayı program aracılığıyla alın. IoT Hub Bağlantı Dizesi Örneğine bakın.
Sorun giderme
azure-eventhubs
Çeşitli hata senaryolarını tanılama hakkında ayrıntılı bilgi için sorun giderme kılavuzuna bakın.
Sonraki adımlar
Daha fazla örnek kod
Event Hubs'a /Event Hubs'a olay gönderip almak için bu kitaplığın nasıl kullanılacağına ilişkin ayrıntılı örnekler için samples dizinine göz atın.
Belgeler
Başvuru belgelerine buradan ulaşabilirsiniz.
Şema Kayıt Defteri ve Avro Kodlayıcısı
EventHubs SDK'sı Schema Registry hizmeti ve Avro ile sorunsuz bir şekilde tümleşir. Daha fazla bilgi için bkz . Şema Kayıt Defteri SDK'sı ve Şema Kayıt Defteri Avro Kodlayıcı SDK'sı.
Pure Python AMQP Aktarım ve Geriye Dönük Uyumluluk Desteği
Azure Event Hubs istemci kitaplığı artık saf Python AMQP uygulamasını temel alır. uAMQP
gerekli bağımlılık olarak kaldırıldı.
Temel alınan aktarım olarak kullanmak uAMQP
için:
- pip ile yükleyin
uamqp
.
$ pip install uamqp
- İstemci oluşturma sırasında geçin
uamqp_transport=True
.
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubProducerClient.from_connection_string(
connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)
Not: message
üzerinde daha önce uamqp.Message
öğesini kullanıma sunan özniteliği EventData
/EventDataBatch
kullanım dışı bırakılmıştır.
tarafından EventData.message
/EventDataBatch.message
döndürülen "Eski" nesneler geçişi kolaylaştırmaya yardımcı olmak için kullanıma sunulmuştur.
Kaynaktan uAMQP tekerleği oluşturma
uAMQP, için azure-eventhub
temel AMQP protokolü uygulaması olarak kullanılması amaçlanıyorsa, çoğu büyük işletim sistemi için uAMQP tekerlekleri bulunabilir.
Kullanmayı uAMQP
planlıyorsanız ve uAMQP tekerleklerinin sağlanmadığı bir platformda çalışıyorsanız, kaynaktan yüklemek için lütfen uAMQP Yükleme yönergelerini izleyin.
Geri Bildirim Sağlama
Hatalarla karşılaşırsanız veya önerileriniz varsa lütfen projenin Sorunlar bölümünde bir sorun oluşturun.
Katkıda bulunma
Bu proje, katkı ve önerilere açıktır. Çoğu durumda, sağladığınız katkıyı kullanmamız için bize hak tanıma hakkına sahip olduğunuzu ve bu hakkı bize tanıdığınızı bildiren bir Katkıda Bulunan Lisans Sözleşmesi’ni (CLA) kabul etmeniz gerekir. Ayrıntılar için bkz. https://cla.microsoft.com.
Bir çekme isteği gönderdiğinizde, CLA robotu bir CLA sağlamanız gerekip gerekmediğini otomatik olarak belirler ve çekme isteğini uygun şekilde donatır (örn. etiket, açıklama). Robot tarafından sağlanan yönergeleri izlemeniz yeterlidir. Bu işlemi, CLA’mızı kullanarak tüm depolarda yalnızca bir kere yapmanız gerekir.
Bu proje Microsoft Open Source Code of Conduct (Microsoft Açık Kaynak Kullanım Kuralları) belgesinde listelenen kurallara uygundur. Daha fazla bilgi için Kullanım Kuralları SSS bölümüne bakın veya ek sorularınız veya yorumlarınızla iletişime geçin opencode@microsoft.com .
Azure SDK for Python