Aracılığıyla paylaş


Python için Azure Event Hubs istemci kitaplığı - sürüm 5.11.5

Azure Event Hubs saniyede milyonlarca olay alıp bunları birden çok tüketiciye aktarabilen yüksek oranda ölçeklenebilir bir yayımlama-abone olma hizmetidir. Bu, bağlı cihazlarınız ve uygulamalarınız tarafından üretilen çok büyük miktarda veriyi işlemenize ve analiz etmenizi sağlar. Event Hubs verileri topladıktan sonra, herhangi bir gerçek zamanlı analiz sağlayıcısını kullanarak veya toplu işlem/depolama bağdaştırıcılarıyla verileri alabilir, dönüştürebilir ve depolayabilirsiniz. Azure Event Hubs hakkında daha fazla bilgi edinmek isterseniz şunları gözden geçirmek isteyebilirsiniz: Event Hubs nedir?

Azure Event Hubs istemci kitaplığı, Azure Event Hubs olaylarını yayımlamaya ve kullanmaya olanak tanır ve şunları yapmak için kullanılabilir:

  • İş zekası ve tanılama amaçları için uygulamanız hakkında telemetri yayma.
  • İlgili tarafların, eylemde bulunmak için bir tetikleyici olarak gözlemleyip kullanabileceği uygulamanızın durumu hakkında olgular yayımlama.
  • İşletmenizin veya diğer ekosistemin içinde gerçekleşen ilginç işlemleri ve etkileşimleri gözlemleyin, böylece gevşek bir şekilde bağlanmış sistemleri birbirine bağlamanıza gerek kalmadan sistemler etkileşime geçebilir.
  • Bir veya daha fazla yayımcıdan etkinlikler alın, ekosisteminizin ihtiyaçlarını daha iyi karşılayacak şekilde dönüştürün ve ardından dönüştürülen etkinlikleri, tüketiciler için yeni bir akışta yayımlayın.

Kaynak kodu | Paket (PyPi) | Paket (Conda) | API başvuru belgeleri | Ürün belgeleri | Örnekleri

Başlarken

Önkoşullar

  • Python 3.7 veya üzeri.

  • Microsoft Azure Aboneliği: Azure Event Hubs dahil olmak üzere Azure hizmetlerini kullanmak için bir aboneliğe ihtiyacınız vardır. Mevcut bir Azure hesabınız yoksa ücretsiz deneme sürümüne kaydolabilir veya hesap oluştururken MSDN abone avantajlarınızı kullanabilirsiniz.

  • Olay Hub'ı ile Event Hubs ad alanı: Azure Event Hubs etkileşime geçmek için bir ad alanı ve Olay Hub'ına da sahip olmanız gerekir. Azure kaynakları oluşturma hakkında bilginiz yoksa, Azure portal kullanarak Olay Hub'ı oluşturmak için adım adım kılavuzu izlemek isteyebilirsiniz. Burada, Olay Hub'ı oluşturmak için Azure CLI, Azure PowerShell veya Azure Resource Manager (ARM) şablonlarını kullanmaya yönelik ayrıntılı yönergeleri de bulabilirsiniz.

Paketi yükleme

Pip ile Python için Azure Event Hubs istemci kitaplığını yükleyin:

$ pip install azure-eventhub

İstemcinin kimliğini doğrulama

Event Hubs ile etkileşim, EventHubConsumerClient veya EventHubProducerClient sınıfının bir örneğiyle başlar. İstemci nesnesinin örneğini oluşturmak için konak adı, SAS/AAD kimlik bilgileri ve olay hub'ı adı veya bağlantı dizesi gerekir.

bağlantı dizesi'den istemci oluşturma:

Event Hubs istemci kitaplığının bir Olay Hub'ı ile etkileşim kurması için en kolay araç, Event Hubs ad alanı oluşturulurken otomatik olarak oluşturulan bir bağlantı dizesi kullanmaktır. Azure'da paylaşılan erişim ilkeleri hakkında bilginiz yoksa, Event Hubs bağlantı dizesi almak için adım adım kılavuzu izlemek isteyebilirsiniz.

  • yöntemi, from_connection_string form Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey> ve varlık adının bağlantı dizesi Olay Hub'ı örneğine götürür. bağlantı dizesi Azure portal alabilirsiniz.

azure-identity kitaplığını kullanarak istemci oluşturma:

Alternatif olarak, azure-identity paketiyle AAD aracılığıyla kimlik doğrulaması yapmak için Credential nesnesini kullanabilirsiniz.

  • Yukarıdaki bağlantılı örnekte gösterildiği gibi bu oluşturucu, Event Hub örneğinizin ana bilgisayar adını ve varlık adını ve TokenCredential protokolünün uygulandığı kimlik bilgilerini alır. Azure-identity paketindeTokenCredential kullanılabilen protokol uygulamaları vardır. Ana bilgisayar adı biçimindedir <yournamespace.servicebus.windows.net>.
  • tarafından azure-identitysağlanan kimlik bilgisi türlerini kullanmak için lütfen paketini yükleyin: pip install azure-identity
  • Ayrıca, zaman uyumsuz API'yi kullanmak için önce gibi aiohttpbir zaman uyumsuz aktarım yüklemeniz gerekir: pip install aiohttp
  • Azure Active Directory kullanırken sorumlunuza Event Hubs'a erişim izni veren Azure Event Hubs Veri Sahibi rolü gibi bir rol atanmalıdır. Event Hubs ile Azure Active Directory yetkilendirmesini kullanma hakkında daha fazla bilgi için lütfen ilişkili belgelere bakın.

Önemli kavramlar

  • EventHubProducerClient, ekli cihaz çözümü, mobil cihaz uygulaması, konsol veya başka bir cihazda çalışan oyun başlığı, bazı istemci veya sunucu tabanlı iş çözümü ya da bir web sitesinin parçası olarak telemetri verileri, tanılama bilgileri, kullanım günlükleri veya diğer günlük verilerinin kaynağıdır.

  • EventHubConsumerClient bu tür bilgileri Olay Hub'ından alır ve işler. İşleme toplama, karmaşık hesaplama ve filtreleme içerebilir. İşleme, bilgilerin ham veya dönüştürülmüş bir şekilde dağıtılmasını veya depolanmasını da içerebilir. Event Hub tüketicileri genellikle Azure Stream Analytics, Apache Spark veya Apache Storm gibi yerleşik analiz özelliklerine sahip sağlam ve yüksek ölçekli platform altyapı parçalarıdır.

  • Bölüm, Olay Hub'ında tutulan sıralı bir olay dizisidir. Azure Event Hubs, her tüketicinin ileti akışının yalnızca belirli bir alt kümesini veya bölümünü okuduğu bölümlenmiş bir tüketici deseni üzerinden ileti akışı sağlar. Yeni olaylar geldikçe dizinin sonuna eklenir. Bölüm sayısı bir Olay Hub'ı oluşturulduğunda belirtilir ve değiştirilemez.

  • Tüketici grubu, Olay Hub'ının tamamının görünümüdür. Tüketici grupları, her biri olay akışının ayrı bir görünümüne sahip olmak ve akışı kendi hızlarında ve kendi konumlarından bağımsız olarak okumak için birden çok kullanan uygulama sağlar. Tüketici grubu başına bir bölümde en fazla 5 eşzamanlı okuyucu olabilir; ancak belirli bir bölüm ve tüketici grubu eşleştirmesi için yalnızca bir etkin tüketici olması önerilir. Her etkin okuyucu, bölümünden tüm olayları alır; Aynı bölümde birden çok okuyucu varsa, yinelenen olayları alırlar.

Daha fazla kavram ve daha ayrıntılı tartışma için bkz. Event Hubs Özellikleri. Ayrıca AMQP kavramları OASIS Advanced Messaging Queuing Protocol (AMQP) Sürüm 1.0'da da belgelenmiştir.

İş parçacığı güvenliği

EventHubProducerClient veya EventHubConsumerClient'ın iş parçacığı güvenli olduğunu garanti etmeyiz. Bu örneklerin iş parçacıkları arasında yeniden kullanılması önerilmez. Bu sınıfları iş parçacığı güvenli bir şekilde kullanmak çalışan uygulamaya bağlı.

Veri modeli türü iş EventDataBatch parçacığı açısından güvenli değildir. İş parçacıkları arasında paylaşılmamalı veya istemci yöntemleriyle eşzamanlı olarak kullanılmamalıdır.

Örnekler

Aşağıdaki bölümlerde, en yaygın Event Hubs görevlerinden bazılarını kapsayan çeşitli kod parçacıkları sağlanır:

Olay Hub'larını inceleme

Olay Hub'ının bölüm kimliklerini alma.

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

with consumer_client:
    pass # consumer_client is now ready to be used.

Olayları Olay Hub'ına yayımlama

create_batchEventHubProducerClient Yöntemini kullanarak gönderilebilen send_batch bir EventDataBatch nesne oluşturmak için yöntemini kullanın. Bayt cinsinden en büyük toplu iş boyutu sınırına ulaşılana EventDataBatch kadar yöntemi kullanılarak add olaylar eklenebilir.

def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    producer.send_batch(event_data_batch)

Olay Hub'ından olayları kullanma

EventHub'daki olayları kullanmanın birden çok yolu vardır. Yalnızca bir olay alındığında EventHubConsumerClient.receive geri çağırma tetikleme yöntemi aşağıdaki gibi kullanılır:

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

Olay Hub'ından olayları toplu olarak kullanma

Yukarıdaki örnek alınan her ileti için geri çağırmayı tetiklerken, aşağıdaki örnek bir dizi olay üzerinde geri çağırmayı tetikler ve bir kerede bir sayı almaya çalışır.

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

Olay Hub'ına olayları zaman uyumsuz olarak yayımlama

create_batchEventHubProducer Yöntemini kullanarak gönderilebilen send_batch bir EventDataBatch nesne oluşturmak için yöntemini kullanın. Bayt cinsinden en büyük toplu iş boyutu sınırına ulaşılana EventDataBatch kadar yöntemi kullanılarak add olaylar eklenebilir.

import asyncio
from azure.eventhub.aio import EventHubProducerClient  # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
    event_data_batch = await client.create_batch()
    can_add = True
    while can_add:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.
    return event_data_batch

async def send():
    client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    batch_data = await create_batch(client)
    async with client:
        await client.send_batch(batch_data)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send())

Olay Hub'ından olayları zaman uyumsuz olarak kullanma

Bu SDK hem zaman uyumlu hem de zaman uyumsuz tabanlı kodu destekler. Yukarıdaki örneklerde gösterildiği gibi almak için ancak aio içinde aşağıdakiler gerekir:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

Olay Hub'ından gelen olayları zaman uyumsuz olarak toplu olarak kullanma

Tüm zaman uyumlu işlevler aio'da da desteklenir. Zaman uyumlu toplu alındı bilgisi için yukarıda gösterildiği gibi, zaman uyumsuz olarak aşağıdaki gibi aynı işlemi gerçekleştirebilirsiniz:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())

Olayları kullanma ve denetim noktası deposu kullanarak denetim noktalarını kaydetme

EventHubConsumerClient aynı anda birden çok bölümden olay almanıza ve aynı Olay Hub'ını ve tüketici grubunu kullanan diğer tüketicilerle yük dengelemenizi sağlayan üst düzey bir yapıdır.

Bu, kullanıcının olaylar denetim noktaları kullanılarak işlendiğinde ilerleme durumunu izlemesine de olanak tanır.

Denetim noktası, olay hub'ı örneğindeki bir tüketici grubunun belirli bir bölümünden kullanıcı tarafından başarıyla işlenen son olayı temsil etmeye yöneliktir. , EventHubConsumerClient denetim noktalarını güncelleştirmek ve yük dengeleme algoritmasının gerektirdiği ilgili bilgileri depolamak için bir örneğini CheckpointStore kullanır.

Bunu destekleyen paketleri bulmak ve böyle bir paketten uygulamayı kullanmak CheckpointStore için pypi'yi ön ekiyle azure-eventhub-checkpointstore arayın. Hem eşitleme hem de zaman uyumsuz kitaplıkların sağlandığını lütfen unutmayın.

Aşağıdaki örnekte öğesinin bir örneğini EventHubConsumerClient oluşturur ve kullanırız BlobCheckpointStore. Kodu çalıştırmak için bir Azure Depolama hesabı ve blob kapsayıcısı oluşturmanız gerekir.

Azure Blob Depolama Checkpoint Store Async ve Azure Blob Depolama Checkpoint Store Eşitlemesi, kalıcı depo olarak Azure Blob Depolama uygulanan sağladığımız uygulamalardan biridirCheckpointStore.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

IoT Hub ile çalışmak için EventHubConsumerClient kullanma

IoT Hub ile çalışmak için de kullanabilirsinizEventHubConsumerClient. Bu, bağlı EventHub'dan IoT Hub telemetri verilerini almak için kullanışlıdır. İlişkili bağlantı dizesi gönderme talepleri olmaz, bu nedenle olay göndermek mümkün değildir.

bağlantı dizesi Event Hub uyumlu bir uç nokta için olması gerektiğini unutmayın; örneğin, "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name"

Event Hubs uyumlu uç noktayı almanın iki yolu vardır:

  • Azure Portal'da IoT Hub "Yerleşik uç noktaları" el ile alın ve buradan alın.
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)

partition_ids = client.get_partition_ids()

Sorun giderme

azure-eventhubs Çeşitli hata senaryolarını tanılama hakkında ayrıntılı bilgi için sorun giderme kılavuzuna bakın.

Sonraki adımlar

Daha fazla örnek kod

Event Hubs'a /Event Hubs'a olay gönderip almak için bu kitaplığın nasıl kullanılacağına ilişkin ayrıntılı örnekler için samples dizinine göz atın.

Belgeler

Başvuru belgelerine buradan ulaşabilirsiniz.

Şema Kayıt Defteri ve Avro Kodlayıcısı

EventHubs SDK'sı Schema Registry hizmeti ve Avro ile sorunsuz bir şekilde tümleşir. Daha fazla bilgi için bkz . Şema Kayıt Defteri SDK'sı ve Şema Kayıt Defteri Avro Kodlayıcı SDK'sı.

Pure Python AMQP Aktarım ve Geriye Dönük Uyumluluk Desteği

Azure Event Hubs istemci kitaplığı artık saf Python AMQP uygulamasını temel alır. uAMQP gerekli bağımlılık olarak kaldırıldı.

Temel alınan aktarım olarak kullanmak uAMQP için:

  1. pip ile yükleyin uamqp .
$ pip install uamqp 
  1. İstemci oluşturma sırasında geçin uamqp_transport=True .
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)

Not: message üzerinde daha önce uamqp.Messageöğesini kullanıma sunan özniteliği EventData/EventDataBatchkullanım dışı bırakılmıştır. tarafından EventData.message/EventDataBatch.message döndürülen "Eski" nesneler geçişi kolaylaştırmaya yardımcı olmak için kullanıma sunulmuştur.

Kaynaktan uAMQP tekerleği oluşturma

uAMQP, için azure-eventhubtemel AMQP protokolü uygulaması olarak kullanılması amaçlanıyorsa, çoğu büyük işletim sistemi için uAMQP tekerlekleri bulunabilir.

Kullanmayı uAMQP planlıyorsanız ve uAMQP tekerleklerinin sağlanmadığı bir platformda çalışıyorsanız, kaynaktan yüklemek için lütfen uAMQP Yükleme yönergelerini izleyin.

Geri Bildirim Sağlama

Hatalarla karşılaşırsanız veya önerileriniz varsa lütfen projenin Sorunlar bölümünde bir sorun oluşturun.

Katkıda bulunma

Bu proje, katkı ve önerilere açıktır. Çoğu durumda, sağladığınız katkıyı kullanmamız için bize hak tanıma hakkına sahip olduğunuzu ve bu hakkı bize tanıdığınızı bildiren bir Katkıda Bulunan Lisans Sözleşmesi’ni (CLA) kabul etmeniz gerekir. Ayrıntılar için bkz. https://cla.microsoft.com.

Bir çekme isteği gönderdiğinizde, CLA robotu bir CLA sağlamanız gerekip gerekmediğini otomatik olarak belirler ve çekme isteğini uygun şekilde donatır (örn. etiket, açıklama). Robot tarafından sağlanan yönergeleri izlemeniz yeterlidir. Bu işlemi, CLA’mızı kullanarak tüm depolarda yalnızca bir kere yapmanız gerekir.

Bu proje Microsoft Open Source Code of Conduct (Microsoft Açık Kaynak Kullanım Kuralları) belgesinde listelenen kurallara uygundur. Daha fazla bilgi için Kullanım Kuralları SSS bölümüne bakın veya ek sorularınız veya yorumlarınızla iletişime geçin opencode@microsoft.com .

İzlenimler