Bagikan melalui


Pustaka klien Azure EventHubs Checkpoint Store untuk Python - versi 1.1.4

menggunakan Blob Penyimpanan

Penyimpanan Titik Pemeriksaan Azure EventHubs digunakan untuk menyimpan titik pemeriksaan saat memproses peristiwa dari Azure Event Hubs. Paket Penyimpanan Titik Pemeriksaan ini berfungsi sebagai paket plug-in ke EventHubConsumerClient. Ini menggunakan Azure Storage Blob sebagai penyimpanan persisten untuk mempertahankan titik pemeriksaan dan informasi kepemilikan partisi.

Harap dicatat bahwa ini adalah pustaka asinkron, untuk versi sinkronisasi pustaka klien Azure EventHubs Checkpoint Store, silakan lihat azure-eventhub-checkpointstoreblob.

Kode sumber | Paket (PyPi) | Dokumentasi | referensi API Dokumentasi | Azure EventhubsDokumentasi Azure Storage

Memulai

Prasyarat

  • Python 3.6 atau yang lebih baru.

  • Langganan Microsoft Azure: Untuk menggunakan layanan Azure, termasuk Azure Event Hubs, Anda memerlukan langganan. Jika Anda tidak memiliki akun Azure yang sudah ada, Anda dapat mendaftar untuk uji coba gratis atau menggunakan manfaat pelanggan MSDN saat membuat akun.

  • Namespace layanan Azure Event Hubs dengan Pusat Aktivitas: Untuk berinteraksi dengan Azure Event Hubs, Anda juga harus memiliki namespace layanan dan Pusat Aktivitas yang tersedia. Jika Anda tidak terbiasa membuat sumber daya Azure, Anda mungkin ingin mengikuti panduan langkah demi langkah untuk membuat Pusat Aktivitas menggunakan portal Azure. Di sana, Anda juga dapat menemukan instruksi terperinci untuk menggunakan templat Azure CLI, Azure PowerShell, atau Azure Resource Manager (ARM) untuk membuat Pusat Aktivitas.

  • Akun Azure Storage: Anda harus memiliki Akun Azure Storage dan membuat Kontainer Blok Azure Blob Storage untuk menyimpan data titik pemeriksaan dengan blob. Anda dapat mengikuti panduan membuat Akun Azure Block Blob Storage.

Instal paketnya

$ pip install azure-eventhub-checkpointstoreblob-aio

Konsep utama

Titik pemeriksaan

Titik pemeriksaan adalah proses di mana pembaca menandai atau melakukan posisi mereka dalam urutan peristiwa partisi. Titik pemeriksaan adalah tanggung jawab konsumen dan terjadi berbasis partisi dalam kelompok konsumen. Tanggung jawab ini berarti bahwa untuk setiap kelompok konsumen, setiap pembaca partisi harus melacak posisinya saat ini di aliran peristiwa, dan dapat menginformasikan layanan ketika menganggap aliran data selesai. Jika pembaca memutuskan sambungan dari partisi, ketika menghubungkannya kembali, ia mulai membaca di titik pemeriksaan yang sebelumnya dikirimkan oleh pembaca terakhir partisi dalam kelompok konsumen itu. Ketika pembaca terhubung, itu menyalurkan offset ke pusat aktivitas untuk menentukan lokasi untuk mulai membaca. Dengan cara ini, Anda dapat menggunakan titik pemeriksaan untuk menandai peristiwa sebagai "lengkap" oleh aplikasi hilir, dan untuk memberikan ketahanan jika kegagalan dengan pembaca yang menjalankan komputer yang berbeda terjadi. Anda dapat kembali ke data yang lebih lama dengan menentukan offset yang lebih rendah dari proses titik pemeriksaan ini. Melalui mekanisme ini, titik pemeriksaan memungkinkan ketahanan kegagalan dan pemutaran ulang aliran peristiwa.

Mengimbangi & nomor urut

Kedua nomor urutan offset & mengacu pada posisi peristiwa dalam partisi. Anda dapat menganggapnya sebagai kursor sisi klien. Offset adalah numbering byte dari peristiwa tersebut. Nomor offset/urutan memungkinkan konsumen peristiwa (pembaca) untuk menentukan titik dalam aliran peristiwa tempat mereka ingin mulai membaca peristiwa. Anda dapat menentukan tanda waktu sehingga Anda menerima peristiwa yang diantrekan hanya setelah tanda waktu yang diberikan. Konsumen bertanggung jawab untuk menyimpan nilai offset mereka sendiri di luar layanan Pusat Aktivitas. Dalam partisi, setiap peristiwa menyertakan offset, nomor urutan, dan tanda waktu saat diantrekan.

Contoh

Buat EventHubConsumerClient

Cara term mudah untuk membuat adalah EventHubConsumerClient dengan menggunakan string koneksi.

from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

Untuk cara lain membuat EventHubConsumerClient, lihat pustaka EventHubs untuk detail selengkapnya.

Mengonsumsi peristiwa menggunakan BlobCheckpointStore titik pemeriksaan untuk dilakukan

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'

async def on_event(partition_context, event):
    # Put your code here.
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    async with client:
        await client.receive(on_event)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Gunakan BlobCheckpointStore dengan versi AZURE Storage Service API yang berbeda

Beberapa lingkungan memiliki versi AZURE Storage Service API yang berbeda. BlobCheckpointStore secara default menggunakan Storage Service API versi 2019-07-07. Untuk menggunakannya terhadap versi yang berbeda, tentukan api_version kapan Anda membuat BlobCheckpointStore objek.

Pemecahan Masalah

Umum

Mengaktifkan pengelogan akan sangat membantu untuk melakukan pemotretan masalah.

Pembuatan Log

  • Aktifkan azure.eventhub.extensions.checkpointstoreblobaio pencatat untuk mengumpulkan jejak dari pustaka.
  • Aktifkan azure.eventhub pencatat untuk mengumpulkan jejak dari pustaka azure-eventhub utama.
  • Aktifkan pencatat azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage untuk mengumpulkan jejak dari pustaka blob penyimpanan azure.
  • Aktifkan uamqp pencatat untuk mengumpulkan jejak dari pustaka uAMQP yang mendasar.
  • Aktifkan pelacakan tingkat bingkai AMQP dengan mengatur logging_enable=True saat membuat klien.

Langkah berikutnya

Lebih banyak kode sampel

Mulai menggunakan sampel asinkron EventHubs Checkpoint Store kami.

Dokumentasi

Dokumentasi referensi tersedia di sini.

Berikan Umpan Balik

Jika Anda menemukan bug atau memiliki saran, silakan ajukan masalah di bagian Masalah proyek.

Berkontribusi

Proyek ini menyambut baik kontribusi dan saran. Sebagian besar kontribusi mengharuskan Anda menyetujui Perjanjian Lisensi Kontributor (CLA) yang menyatakan bahwa Anda memiliki hak untuk, dan benar-benar melakukannya, memberi kami hak untuk menggunakan kontribusi Anda. Untuk detailnya, kunjungi https://cla.microsoft.com.

Ketika Anda mengirimkan permintaan tarik, CLA-bot akan secara otomatis menentukan apakah Anda perlu memberikan CLA dan menghias PR dengan tepat (misalnya, label, komentar). Cukup ikuti instruksi yang diberikan oleh bot. Anda hanya perlu melakukan ini sekali di semua repos menggunakan CLA kami.

Proyek ini telah mengadopsi Kode Etik Sumber Terbuka Microsoft. Untuk informasi selengkapnya, lihat Tanya Jawab Umum Tata Tertib atau hubungi opencode@microsoft.com untuk pertanyaan atau komentar lainnya.

Tayangan