Pustaka klien Azure EventHubs Checkpoint Store untuk Python - versi 1.1.4
menggunakan Blob Penyimpanan
Penyimpanan Titik Pemeriksaan Azure EventHubs digunakan untuk menyimpan titik pemeriksaan saat memproses peristiwa dari Azure Event Hubs.
Paket Penyimpanan Titik Pemeriksaan ini berfungsi sebagai paket plug-in ke EventHubConsumerClient
. Ini menggunakan Azure Storage Blob sebagai penyimpanan persisten untuk mempertahankan titik pemeriksaan dan informasi kepemilikan partisi.
Harap dicatat bahwa ini adalah pustaka asinkron, untuk versi sinkronisasi pustaka klien Azure EventHubs Checkpoint Store, silakan lihat azure-eventhub-checkpointstoreblob.
Kode sumber | Paket (PyPi) | Dokumentasi | referensi API Dokumentasi | Azure EventhubsDokumentasi Azure Storage
Memulai
Prasyarat
Python 3.6 atau yang lebih baru.
Langganan Microsoft Azure: Untuk menggunakan layanan Azure, termasuk Azure Event Hubs, Anda memerlukan langganan. Jika Anda tidak memiliki akun Azure yang sudah ada, Anda dapat mendaftar untuk uji coba gratis atau menggunakan manfaat pelanggan MSDN saat membuat akun.
Namespace layanan Azure Event Hubs dengan Pusat Aktivitas: Untuk berinteraksi dengan Azure Event Hubs, Anda juga harus memiliki namespace layanan dan Pusat Aktivitas yang tersedia. Jika Anda tidak terbiasa membuat sumber daya Azure, Anda mungkin ingin mengikuti panduan langkah demi langkah untuk membuat Pusat Aktivitas menggunakan portal Azure. Di sana, Anda juga dapat menemukan instruksi terperinci untuk menggunakan templat Azure CLI, Azure PowerShell, atau Azure Resource Manager (ARM) untuk membuat Pusat Aktivitas.
Akun Azure Storage: Anda harus memiliki Akun Azure Storage dan membuat Kontainer Blok Azure Blob Storage untuk menyimpan data titik pemeriksaan dengan blob. Anda dapat mengikuti panduan membuat Akun Azure Block Blob Storage.
Instal paketnya
$ pip install azure-eventhub-checkpointstoreblob-aio
Konsep utama
Titik pemeriksaan
Titik pemeriksaan adalah proses di mana pembaca menandai atau melakukan posisi mereka dalam urutan peristiwa partisi. Titik pemeriksaan adalah tanggung jawab konsumen dan terjadi berbasis partisi dalam kelompok konsumen. Tanggung jawab ini berarti bahwa untuk setiap kelompok konsumen, setiap pembaca partisi harus melacak posisinya saat ini di aliran peristiwa, dan dapat menginformasikan layanan ketika menganggap aliran data selesai. Jika pembaca memutuskan sambungan dari partisi, ketika menghubungkannya kembali, ia mulai membaca di titik pemeriksaan yang sebelumnya dikirimkan oleh pembaca terakhir partisi dalam kelompok konsumen itu. Ketika pembaca terhubung, itu menyalurkan offset ke pusat aktivitas untuk menentukan lokasi untuk mulai membaca. Dengan cara ini, Anda dapat menggunakan titik pemeriksaan untuk menandai peristiwa sebagai "lengkap" oleh aplikasi hilir, dan untuk memberikan ketahanan jika kegagalan dengan pembaca yang menjalankan komputer yang berbeda terjadi. Anda dapat kembali ke data yang lebih lama dengan menentukan offset yang lebih rendah dari proses titik pemeriksaan ini. Melalui mekanisme ini, titik pemeriksaan memungkinkan ketahanan kegagalan dan pemutaran ulang aliran peristiwa.
Mengimbangi & nomor urut
Kedua nomor urutan offset & mengacu pada posisi peristiwa dalam partisi. Anda dapat menganggapnya sebagai kursor sisi klien. Offset adalah numbering byte dari peristiwa tersebut. Nomor offset/urutan memungkinkan konsumen peristiwa (pembaca) untuk menentukan titik dalam aliran peristiwa tempat mereka ingin mulai membaca peristiwa. Anda dapat menentukan tanda waktu sehingga Anda menerima peristiwa yang diantrekan hanya setelah tanda waktu yang diberikan. Konsumen bertanggung jawab untuk menyimpan nilai offset mereka sendiri di luar layanan Pusat Aktivitas. Dalam partisi, setiap peristiwa menyertakan offset, nomor urutan, dan tanda waktu saat diantrekan.
Contoh
- Membuat Azure EventHubs
EventHubConsumerClient
- Mengonsumsi peristiwa menggunakan
BlobCheckpointStore
Buat EventHubConsumerClient
Cara term mudah untuk membuat adalah EventHubConsumerClient
dengan menggunakan string koneksi.
from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")
Untuk cara lain membuat EventHubConsumerClient
, lihat pustaka EventHubs untuk detail selengkapnya.
Mengonsumsi peristiwa menggunakan BlobCheckpointStore
titik pemeriksaan untuk dilakukan
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'
async def on_event(partition_context, event):
# Put your code here.
await partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
async def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(
storage_connection_str,
container_name
)
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name,
checkpoint_store=checkpoint_store,
)
async with client:
await client.receive(on_event)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Gunakan BlobCheckpointStore
dengan versi AZURE Storage Service API yang berbeda
Beberapa lingkungan memiliki versi AZURE Storage Service API yang berbeda.
BlobCheckpointStore
secara default menggunakan Storage Service API versi 2019-07-07. Untuk menggunakannya terhadap versi yang berbeda, tentukan api_version
kapan Anda membuat BlobCheckpointStore
objek.
Pemecahan Masalah
Umum
Mengaktifkan pengelogan akan sangat membantu untuk melakukan pemotretan masalah.
Pembuatan Log
- Aktifkan
azure.eventhub.extensions.checkpointstoreblobaio
pencatat untuk mengumpulkan jejak dari pustaka. - Aktifkan
azure.eventhub
pencatat untuk mengumpulkan jejak dari pustaka azure-eventhub utama. - Aktifkan pencatat
azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage
untuk mengumpulkan jejak dari pustaka blob penyimpanan azure. - Aktifkan
uamqp
pencatat untuk mengumpulkan jejak dari pustaka uAMQP yang mendasar. - Aktifkan pelacakan tingkat bingkai AMQP dengan mengatur
logging_enable=True
saat membuat klien.
Langkah berikutnya
Lebih banyak kode sampel
Mulai menggunakan sampel asinkron EventHubs Checkpoint Store kami.
- receive_events_using_checkpoint_store_async.py - EventHubConsumerClient dengan contoh penyimpanan titik pemeriksaan blob
- receive_events_using_checkpoint_store_storage_api_version_async.py - EventHubConsumerClient dengan penyimpanan titik pemeriksaan blob dan contoh versi penyimpanan
Dokumentasi
Dokumentasi referensi tersedia di sini.
Berikan Umpan Balik
Jika Anda menemukan bug atau memiliki saran, silakan ajukan masalah di bagian Masalah proyek.
Berkontribusi
Proyek ini menyambut baik kontribusi dan saran. Sebagian besar kontribusi mengharuskan Anda menyetujui Perjanjian Lisensi Kontributor (CLA) yang menyatakan bahwa Anda memiliki hak untuk, dan benar-benar melakukannya, memberi kami hak untuk menggunakan kontribusi Anda. Untuk detailnya, kunjungi https://cla.microsoft.com.
Ketika Anda mengirimkan permintaan tarik, CLA-bot akan secara otomatis menentukan apakah Anda perlu memberikan CLA dan menghias PR dengan tepat (misalnya, label, komentar). Cukup ikuti instruksi yang diberikan oleh bot. Anda hanya perlu melakukan ini sekali di semua repos menggunakan CLA kami.
Proyek ini telah mengadopsi Kode Etik Sumber Terbuka Microsoft. Untuk informasi selengkapnya, lihat Tanya Jawab Umum Tata Tertib atau hubungi opencode@microsoft.com untuk pertanyaan atau komentar lainnya.
Azure SDK for Python
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk