Bagikan melalui


pustaka klien Azure Event Hubs untuk Python - versi 5.11.5

Azure Event Hubs adalah layanan terbitkan-berlangganan yang sangat dapat diskalakan yang dapat menyerap jutaan peristiwa per detik dan mengalirkannya ke beberapa konsumen. Ini memungkinkan Anda memproses dan menganalisis sejumlah besar data yang dihasilkan oleh perangkat dan aplikasi Anda yang terhubung. Setelah Azure Event Hubs mengumpulkan data, Anda dapat mengambil, mengubah, dan menyimpannya dengan menggunakan penyedia analitik real-time atau dengan adaptor batching/penyimpanan. Jika Anda ingin tahu lebih banyak tentang Azure Event Hubs, Anda mungkin ingin meninjau: Apa itu Azure Event Hubs?

Pustaka klien Azure Event Hubs memungkinkan Anda untuk menerbitkan dan mengkonsumsi peristiwa Azure Event Hubs dan dapat digunakan untuk:

  • Mengeluarkan telemetri tentang aplikasi Anda untuk kecerdasan bisnis dan tujuan diagnostik.
  • Mempublikasikan fakta tentang kondisi aplikasi Anda yang dapat diamati dan digunakan oleh pihak yang berkepentingan sebagai pemicu untuk mengambil tindakan.
  • Mengamati operasi dan interaksi menarik yang terjadi dalam bisnis Anda atau ekosistem lainnya, memungkinkan sistem yang digabungkan secara longgar untuk berinteraksi tanpa perlu mengikatnya bersama.
  • Menerima peristiwa dari satu penerbit atau lebih, mengubahnya untuk lebih memenuhi kebutuhan ekosistem Anda, kemudian mempublikasikan peristiwa yang diubah ke aliran baru untuk diamati konsumen.

Kode sumber | Paket (PyPi) | Paket (Conda) | Dokumentasi | referensi APIDokumentasi | produkSampel

Memulai

Prasyarat

  • Python 3.7 atau yang lebih baru.

  • Langganan Microsoft Azure: Untuk menggunakan layanan Azure, termasuk Azure Event Hubs, Anda memerlukan langganan. Jika Anda tidak memiliki akun Azure yang sudah ada, Anda dapat mendaftar untuk uji coba gratis atau menggunakan manfaat pelanggan MSDN saat membuat akun.

  • Namespace layanan Azure Event Hubs dengan Pusat Aktivitas: Untuk berinteraksi dengan Azure Event Hubs, Anda juga harus memiliki namespace layanan dan Pusat Aktivitas yang tersedia. Jika Anda tidak terbiasa membuat sumber daya Azure, Anda mungkin ingin mengikuti panduan langkah demi langkah untuk membuat Pusat Aktivitas menggunakan portal Azure. Di sana, Anda juga dapat menemukan instruksi terperinci untuk menggunakan templat Azure CLI, Azure PowerShell, atau Azure Resource Manager (ARM) untuk membuat Pusat Aktivitas.

Instal paketnya

Instal pustaka klien Azure Event Hubs untuk Python dengan pip:

$ pip install azure-eventhub

Mengautentikasi klien

Interaksi dengan Azure Event Hubs dimulai dengan instans kelas EventHubConsumerClient atau EventHubProducerClient. Anda memerlukan nama host, kredensial SAS/AAD, dan nama hub peristiwa atau string koneksi untuk membuat instans objek klien.

Buat klien dari string koneksi:

Agar pustaka klien Azure Event Hubs berinteraksi dengan Pusat Aktivitas, cara termampunya adalah menggunakan string koneksi, yang dibuat secara otomatis saat membuat namespace layanan Azure Event Hubs. Jika Anda tidak terbiasa dengan kebijakan akses bersama di Azure, Anda mungkin ingin mengikuti panduan langkah demi langkah untuk mendapatkan azure Event Hubs string koneksi.

  • Metode ini from_connection_string mengambil string koneksi nama formulir Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey> dan entitas ke instans Pusat Aktivitas Anda. Anda bisa mendapatkan string koneksi dari portal Azure.

Buat klien menggunakan pustaka azure-identity:

Secara bergantian, seseorang dapat menggunakan objek Kredensial untuk mengautentikasi melalui AAD dengan paket azure-identity.

  • Konstruktor ini menunjukkan dalam sampel yang ditautkan di atas mengambil nama host dan nama entitas instans Pusat Aktivitas dan kredensial Anda yang mengimplementasikan protokol TokenCredential . Ada implementasi protokol yang TokenCredential tersedia dalam paket azure-identity. Nama host berformat <yournamespace.servicebus.windows.net>.
  • Untuk menggunakan jenis kredensial yang disediakan oleh azure-identity, silakan instal paket: pip install azure-identity
  • Selain itu, untuk menggunakan API asinkron, Anda harus terlebih dahulu menginstal transportasi asinkron, seperti aiohttp: pip install aiohttp
  • Saat menggunakan Azure Active Directory, prinsipal Anda harus diberi peran yang memungkinkan akses ke Azure Event Hubs, seperti peran Pemilik Data Azure Event Hubs. Untuk informasi selengkapnya tentang menggunakan otorisasi Azure Active Directory dengan Azure Event Hubs, silakan lihat dokumentasi terkait.

Konsep utama

  • EventHubProducerClient adalah sumber data telemetri, informasi diagnostik, log penggunaan, atau data log lainnya, sebagai bagian dari solusi perangkat yang disematkan, aplikasi perangkat seluler, judul game yang berjalan di konsol atau perangkat lain, beberapa solusi bisnis berbasis klien atau server, atau situs web.

  • EventHubConsumerClient mengambil informasi tersebut dari Pusat Aktivitas dan memprosesnya. Pemrosesan dapat melibatkan agregasi, komputasi kompleks, dan pemfilteran. Pemrosesan juga dapat melibatkan distribusi atau penyimpanan informasi secara mentah atau dalam bentuk yang telah diubah. Konsumen Event Hub sering kali merupakan bagian infrastruktur platform yang kuat dan berskala tinggi dengan kemampuan analitik bawaan, seperti Azure Stream Analytics, Apache Spark, atau Apache Storm.

  • Partisi adalah urutan peristiwa yang terjadi di Event Hub. Azure Event Hubs menyediakan streaming pesan melalui pola konsumen yang dipartisi, yang memungkinkan setiap konsumen membaca subset tertentu saja, atau partisi, dari stream pesan. Ketika sudah tiba, peristiwa yang lebih baru ditambahkan ke akhir urutan ini. Jumlah partisi ditentukan pada saat Event Hub dibuat dan tidak dapat diubah.

  • Grup konsumen adalah tampilan dari seluruh Event Hub. Dengan grup konsumen, beberapa aplikasi konsumsi dapat masing-masing memiliki tampilan terpisah dari stream peristiwa, serta membaca stream secara mandiri dengan kecepatan dan dari posisi mereka sendiri. Untuk setiap grup konsumen, bisa ada maksimal 5 pembaca pada waktu yang bersamaan di partisi; tetapi sebaiknya hanya ada satu konsumen aktif untuk pasangan partisi dan grup konsumen tertentu. Setiap pembaca aktif menerima semua peristiwa dari partisinya; jika ada beberapa pembaca pada partisi yang sama, mereka akan menerima acara duplikat.

Untuk konsep dan diskusi yang lebih mendalam, lihat: Fitur Azure Event Hubs. Selain itu, konsep untuk AMQP di dokumentasikan dengan baik dalam OASIS Advanced Messaging Queuing Protocol (AMQP) Versi 1.0.

Keamanan utas

Kami tidak menjamin bahwa EventHubProducerClient atau EventHubConsumerClient aman untuk utas. Kami tidak menyarankan untuk menggunakan kembali instans ini di seluruh utas. Terserah aplikasi yang sedang berjalan untuk menggunakan kelas-kelas ini dengan cara yang aman untuk utas.

Jenis model data, EventDataBatch tidak aman untuk utas. Ini tidak boleh dibagikan di seluruh utas atau digunakan secara bersamaan dengan metode klien.

Contoh

Bagian berikut ini menyediakan beberapa cuplikan kode yang mencakup beberapa tugas Azure Event Hubs yang paling umum, termasuk:

Memeriksa Event Hub

Dapatkan id partisi dari Pusat Aktivitas.

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

with consumer_client:
    pass # consumer_client is now ready to be used.

Menerbitkan peristiwa ke Event Hub

create_batch Gunakan metode pada EventHubProducerClient untuk membuat EventDataBatch objek yang kemudian dapat dikirim menggunakan send_batch metode . Peristiwa dapat ditambahkan ke EventDataBatch menggunakan add metode hingga batas ukuran batch maksimum dalam byte telah tercapai.

def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    producer.send_batch(event_data_batch)

Mengonsumsi peristiwa dari Pusat Aktivitas

Ada beberapa cara untuk menggunakan peristiwa dari EventHub. Untuk hanya memicu panggilan balik saat peristiwa diterima, EventHubConsumerClient.receive metode akan digunakan sebagai berikut:

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

Mengonsumsi peristiwa dari Pusat Aktivitas dalam batch

Sedangkan sampel di atas memicu panggilan balik untuk setiap pesan seperti yang diterima, sampel berikut memicu panggilan balik pada batch peristiwa, mencoba menerima angka pada satu waktu.

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

Menerbitkan peristiwa ke Pusat Aktivitas secara asinkron

create_batch Gunakan metode pada EventHubProducer untuk membuat EventDataBatch objek yang kemudian dapat dikirim menggunakan send_batch metode . Peristiwa dapat ditambahkan ke EventDataBatch menggunakan add metode hingga batas ukuran batch maksimum dalam byte telah tercapai.

import asyncio
from azure.eventhub.aio import EventHubProducerClient  # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
    event_data_batch = await client.create_batch()
    can_add = True
    while can_add:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.
    return event_data_batch

async def send():
    client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    batch_data = await create_batch(client)
    async with client:
        await client.send_batch(batch_data)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send())

Mengonsumsi peristiwa dari Pusat Aktivitas secara asinkron

SDK ini mendukung kode berbasis sinkron dan asinkron. Untuk menerima seperti yang ditunjukkan dalam sampel di atas, tetapi dalam aio, seseorang akan membutuhkan yang berikut:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

Mengonsumsi peristiwa dari Pusat Aktivitas dalam batch secara asinkron

Semua fungsi sinkron juga didukung di aio. Seperti yang ditunjukkan di atas untuk tanda terima batch sinkron, seseorang dapat mencapai hal yang sama dalam asinkron sebagai berikut:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())

Menggunakan peristiwa dan menyimpan titik pemeriksaan menggunakan penyimpanan titik pemeriksaan

EventHubConsumerClient adalah konstruksi tingkat tinggi yang memungkinkan Anda menerima peristiwa dari beberapa partisi sekaligus dan menyeimbangkan beban dengan konsumen lain menggunakan Event Hub dan grup konsumen yang sama.

Ini juga memungkinkan pengguna untuk melacak kemajuan ketika peristiwa diproses menggunakan titik pemeriksaan.

Titik pemeriksaan dimaksudkan untuk mewakili peristiwa terakhir yang berhasil diproses oleh pengguna dari partisi tertentu dari grup konsumen dalam instans Pusat Aktivitas. EventHubConsumerClient menggunakan instans CheckpointStore untuk memperbarui titik pemeriksaan dan untuk menyimpan informasi relevan yang diperlukan oleh algoritma penyeimbangan beban.

Cari pypi dengan awalan azure-eventhub-checkpointstore untuk menemukan paket yang mendukung ini dan menggunakan implementasi dari satu paket tersebut CheckpointStore . Harap dicatat bahwa pustaka sinkronisasi dan asinkron disediakan.

Dalam contoh di bawah ini, kami membuat instans EventHubConsumerClient dan menggunakan BlobCheckpointStore. Anda perlu membuat akun Azure Storage dan Kontainer Blob untuk menjalankan kode.

Azure Blob Storage Checkpoint Store Async dan Azure Blob Storage Checkpoint Store Sync adalah salah CheckpointStore satu implementasi yang kami berikan yang berlaku Azure Blob Storage sebagai penyimpanan persisten.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Gunakan EventHubConsumerClient untuk bekerja dengan IoT Hub

Anda juga dapat menggunakan EventHubConsumerClient untuk bekerja dengan IoT Hub. Ini berguna untuk menerima data telemetri IoT Hub dari EventHub yang ditautkan. string koneksi terkait tidak akan mengirim klaim, sehingga pengiriman peristiwa tidak dimungkinkan.

Perhatikan bahwa string koneksi harus untuk titik akhir yang kompatibel dengan Event Hub, misalnya "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name"

Ada dua cara untuk mendapatkan titik akhir yang kompatibel dengan Azure Event Hubs:

  • Dapatkan "Titik akhir bawaan" secara manual dari IoT Hub di Portal Microsoft Azure dan terima darinya.
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)

partition_ids = client.get_partition_ids()

Pemecahan Masalah

azure-eventhubs Lihat panduan pemecahan masalah untuk detail tentang cara mendiagnosis berbagai skenario kegagalan.

Langkah berikutnya

Lebih banyak kode sampel

Silakan lihat direktori sampel untuk contoh terperinci tentang cara menggunakan pustaka ini untuk mengirim dan menerima peristiwa ke/dari Azure Event Hubs.

Dokumentasi

Dokumentasi referensi tersedia di sini.

Registri Skema dan Encoder Avro

EventHubs SDK terintegrasi dengan baik dengan layanan Schema Registry dan Avro. Untuk informasi lebih lanjut, silakan merujuk ke Schema Registry SDK dan Schema Registry Avro Encoder SDK.

Transportasi AMQP Python Murni dan Dukungan Kompatibilitas Mundur

Pustaka klien Azure Event Hubs sekarang didasarkan pada implementasi AMQP Python murni. uAMQP telah dihapus sebagai dependensi yang diperlukan.

Untuk digunakan uAMQP sebagai transportasi yang mendasar:

  1. Instal uamqp dengan pip.
$ pip install uamqp 
  1. Lulus uamqp_transport=True selama konstruksi klien.
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)

Catatan: Atribut message padaEventDataBatchEventData/ , yang sebelumnya mengekspos uamqp.Message, telah ditolak. Objek "Warisan" yang dikembalikan oleh EventData.message/EventDataBatch.message telah diperkenalkan untuk membantu memfasilitasi transisi.

Membangun roda uAMQP dari sumber

Jika uAMQP dimaksudkan untuk digunakan sebagai implementasi protokol AMQP yang mendasar untuk azure-eventhub, roda uAMQP dapat ditemukan untuk sebagian besar sistem operasi utama.

Jika Anda berniat untuk menggunakan uAMQP dan Anda berjalan pada platform yang roda uAMQP-nya tidak disediakan, silakan ikuti panduan Penginstalan uAMQP untuk menginstal dari sumber.

Berikan Umpan Balik

Jika Anda menemukan bug atau memiliki saran, silakan ajukan masalah di bagian Masalah proyek.

Berkontribusi

Proyek ini menyambut baik kontribusi dan saran. Sebagian besar kontribusi mengharuskan Anda menyetujui Perjanjian Lisensi Kontributor (CLA) yang menyatakan bahwa Anda memiliki hak untuk, dan benar-benar melakukannya, memberi kami hak untuk menggunakan kontribusi Anda. Untuk detailnya, kunjungi https://cla.microsoft.com.

Ketika Anda mengirimkan permintaan tarik, CLA-bot akan secara otomatis menentukan apakah Anda perlu memberikan CLA dan menghias PR dengan tepat (misalnya, label, komentar). Cukup ikuti instruksi yang diberikan oleh bot. Anda hanya perlu melakukan ini sekali di semua repos menggunakan CLA kami.

Proyek ini telah mengadopsi Kode Etik Sumber Terbuka Microsoft. Untuk informasi selengkapnya, lihat Tanya Jawab Umum Tata Tertib atau hubungi opencode@microsoft.com untuk pertanyaan atau komentar lainnya.

Tayangan