Kirim acara ke atau terima acara dari hub acara dengan menggunakan Python

Mulai cepat ini menunjukkan cara mengirim acara ke dan menerima acara dari hub acara menggunakan paket Python azure-eventhub.

Prasyarat

Jika Anda baru menggunakan Azure Event Hubs, lihat Ringkasan Event Hubs sebelum Anda melakukan mulai cepat ini.

Untuk menyelesaikan mulai cepat ini, Anda memerlukan prasyarat berikut:

  • Langganan Microsoft Azure. Untuk menggunakan layanan Azure, termasuk Azure Event Hubs, Anda memerlukan langganan. Jika Anda tidak memiliki akun Azure yang sudah ada, daftar untuk uji coba gratis.
  • Python 3.8 atau yang lebih baru, dengan pip diinstal dan diperbarui.
  • Visual Studio Code (disarankan) atau lingkungan pengembangan terpadu (IDE) lainnya.
  • Membuat ruang nama Azure Event Hubs dan pusat aktivitas. Langkah pertama adalah menggunakan portal Azure untuk membuat namespace Layanan Pusat Aktivitas, dan mendapatkan kredensial manajemen yang perlu dikomunikasikan dengan hub peristiwa. Untuk membuat namespace layanan dan pusat aktivitas, ikuti prosedur dalam artikel ini.

Menginstal paket untuk mengirim peristiwa

Untuk menginstal paket Python untuk Azure Event Hubs, buka prompt perintah yang memiliki Python di jalurnya. Ubah direktori ke folder tempat Anda ingin menyimpan sampel Anda.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Mengautentikasi aplikasi ke Azure

Mulai cepat ini menunjukkan kepada Anda dua cara menyambungkan ke Azure Event Hubs: tanpa kata sandi dan string koneksi. Opsi pertama menunjukkan cara menggunakan prinsip keamanan Anda di ID Microsoft Entra dan kontrol akses berbasis peran (RBAC) untuk menyambungkan ke namespace Layanan Pusat Aktivitas. Anda tidak perlu khawatir tentang memiliki string koneksi yang dikodekan secara permanen dalam kode Anda atau dalam file konfigurasi atau dalam penyimpanan aman seperti Azure Key Vault. Opsi kedua menunjukkan cara menggunakan string koneksi untuk menyambungkan ke namespace Layanan Pusat Aktivitas. Jika Anda baru menggunakan Azure, Anda mungkin menemukan opsi string koneksi lebih mudah diikuti. Sebaiknya gunakan opsi tanpa kata sandi di aplikasi dunia nyata dan lingkungan produksi. Untuk informasi selengkapnya, lihat Autentikasi dan otorisasi. Anda juga dapat membaca selengkapnya tentang autentikasi tanpa kata sandi di halaman gambaran umum.

Menetapkan peran ke pengguna Microsoft Entra Anda

Saat mengembangkan secara lokal, pastikan bahwa akun pengguna yang tersambung ke Azure Event Hubs memiliki izin yang benar. Anda memerlukan peran Pemilik Data Azure Event Hubs untuk mengirim dan menerima pesan. Untuk menetapkan sendiri peran ini, Anda memerlukan peran Administrator Akses Pengguna, atau peran lain yang menyertakan tindakan.Microsoft.Authorization/roleAssignments/write Anda dapat menetapkan peran Azure RBAC kepada pengguna menggunakan portal Azure, Azure CLI, atau Azure PowerShell. Pelajari selengkapnya tentang cakupan yang tersedia untuk penetapan peran di halaman gambaran umum cakupan .

Contoh berikut menetapkan peran ke Azure Event Hubs Data Owner akun pengguna Anda, yang menyediakan akses penuh ke sumber daya Azure Event Hubs. Dalam skenario nyata, ikuti Prinsip Hak Istimewa Paling Sedikit untuk memberi pengguna hanya izin minimum yang diperlukan untuk lingkungan produksi yang lebih aman.

Peran bawaan Azure untuk Azure Event Hubs

Untuk Azure Event Hubs, pengelolaan namespace layanan dan semua sumber daya terkait melalui portal Azure dan API manajemen sumber daya Azure sudah dilindungi menggunakan model Azure RBAC. Azure menyediakan peran bawaan Azure di bawah ini untuk mengotorisasi akses ke namespace Layanan Pusat Aktivitas:

  • Pemilik Data Azure Event Hubs: Memungkinkan akses data ke namespace Layanan Pusat Aktivitas dan entitasnya (antrean, topik, langganan, dan filter)
  • Pengirim Data Azure Event Hubs: Gunakan peran ini untuk memberi pengirim akses ke namespace layanan Azure Event Hubs dan entitasnya.
  • Penerima Data Azure Event Hubs: Gunakan peran ini untuk memberi penerima akses ke namespace Layanan Azure Event Hubs dan entitasnya.

Jika Anda ingin membuat peran kustom, lihat Hak yang diperlukan untuk operasi Azure Event Hubs.

Penting

Dalam kebanyakan kasus, akan memakan waktu satu atau dua menit agar penetapan peran disebarluaskan di Azure. Dalam kasus yang jarang terjadi, mungkin perlu waktu hingga delapan menit. Jika Anda menerima kesalahan autentikasi saat pertama kali menjalankan kode, tunggu beberapa saat dan coba lagi.

  1. Di portal Azure, temukan namespace Layanan Pusat Aktivitas Anda menggunakan bilah pencarian utama atau navigasi kiri.

  2. Pada halaman gambaran umum, pilih Kontrol akses (IAM) dari menu sebelah kiri.

  3. Di halaman Kontrol akses (IAM), pilih tab Penetapan peran.

  4. Pilih + Tambahkan dari menu atas lalu Tambahkan penetapan peran dari menu drop-down yang dihasilkan.

    A screenshot showing how to assign a role.

  5. Gunakan kotak pencarian untuk memfilter hasil ke peran yang diinginkan. Untuk contoh ini, cari Azure Event Hubs Data Owner dan pilih hasil yang cocok. Kemudian pilih Berikutnya.

  6. Di bagian Tetapkan akses ke, pilih Pengguna, grup, atau perwakilan layanan, lalu pilih + Pilih anggota.

  7. Dalam dialog, cari nama pengguna Microsoft Entra Anda (biasanya alamat email user@domain Anda) lalu pilih Pilih di bagian bawah dialog.

  8. Pilih Tinjau + tetapkan untuk masuk ke halaman akhir, lalu Tinjau + tetapkan lagi untuk menyelesaikan proses.

Mengirim aktivitas

Di bagian ini, buat skrip Python untuk mengirim peristiwa ke hub peristiwa yang Anda buat sebelumnya.

  1. Buka editor favorit Anda, seperti Visual Studio Code.

  2. Buat skrip yang send.py. Skrip ini mengirimkan kumpulan peristiwa ke hub acara yang Anda buat sebelumnya.

  3. Tempelkan kode berikut ke send.py:

    Dalam kode, gunakan nilai nyata untuk mengganti tempat penampung berikut:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Catatan

    Untuk contoh opsi lain untuk mengirim peristiwa ke Pusat Aktivitas secara asinkron menggunakan string koneksi, lihat halaman send_async.py GitHub. Pola yang ditunjukkan di sana juga berlaku untuk mengirim peristiwa tanpa kata sandi.

Menerima peristiwa

mulai cepat ini menggunakan Microsoft Azure Storage Blob sebagai penyimpanan pos pemeriksaan. penyimpanan pos pemeriksaan digunakan untuk bertahan di pos pemeriksaan (artinya, posisi baca terakhir).

Ikuti rekomendasi ini saat menggunakan Azure Blob Storage sebagai penyimpanan titik pemeriksaan:

  • Gunakan kontainer terpisah untuk setiap grup konsumen. Anda dapat menggunakan akun penyimpanan yang sama, tetapi menggunakan satu kontainer per setiap grup.
  • Jangan gunakan kontainer untuk hal lain, dan jangan gunakan akun penyimpanan untuk hal lain.
  • Akun penyimpanan harus berada di wilayah yang sama dengan aplikasi yang disebarkan berada. Jika aplikasi lokal, coba pilih wilayah terdekat yang mungkin.

Pada halaman Akun penyimpanan di portal Azure, di bagian Blob service, pastikan bahwa pengaturan berikut dinonaktifkan.

  • Namespace hierarkis
  • Penghapusan sementara blob
  • Penerapan versi

Membuat akun Azure Storage dan kontainer blob

Buat akun Microsoft Azure Storage dan wadah gumpalan di dalamnya dengan melakukan langkah-langkah berikut:

  1. Membuat akun Microsoft Azure Storage
  2. Membuat kontainer blob.
  3. Autentikasi ke kontainer blob.

Pastikan untuk mencatat string koneksi dan nama kontainer untuk digunakan nanti dalam kode terima.

Saat mengembangkan secara lokal, pastikan bahwa akun pengguna yang mengakses data blob memiliki izin yang benar. Anda akan memerlukan Kontributor Data Blob Penyimpanan untuk membaca dan menulis data blob. Untuk menetapkan sendiri peran ini, Anda harus diberi peran Administrator Akses Pengguna, atau peran lain yang menyertakan tindakan Microsoft.Authorization/roleAssignments/write . Anda dapat menetapkan peran Azure RBAC kepada pengguna menggunakan portal Azure, Azure CLI, atau Azure PowerShell. Anda dapat mempelajari selengkapnya tentang cakupan yang tersedia untuk penetapan peran di halaman gambaran umum cakupan.

Dalam skenario ini, Anda akan menetapkan izin ke akun pengguna, yang tercakup ke akun penyimpanan, untuk mengikuti Prinsip Hak Istimewa Paling Rendah. Praktik ini hanya memberi pengguna izin minimum yang diperlukan dan menciptakan lingkungan produksi yang lebih aman.

Contoh berikut akan menetapkan peran Kontributor Data Blob Penyimpanan ke akun pengguna Anda, yang menyediakan akses baca dan tulis ke data blob di akun penyimpanan Anda.

Penting

Dalam kebanyakan kasus, dibutuhkan satu atau dua menit agar penetapan peran disebarluaskan di Azure, tetapi dalam kasus yang jarang terjadi mungkin perlu waktu hingga delapan menit. Jika Anda menerima kesalahan autentikasi saat pertama kali menjalankan kode, tunggu beberapa saat dan coba lagi.

  1. Di portal Azure, temukan akun penyimpanan Anda menggunakan bilah pencarian utama atau navigasi kiri.

  2. Di halaman gambaran umum akun penyimpanan, pilih Kontrol akses (IAM) dari menu kiri.

  3. Di halaman Kontrol akses (IAM), pilih tab Penetapan peran.

  4. Pilih + Tambahkan dari menu atas lalu Tambahkan penetapan peran dari menu drop-down yang dihasilkan.

    A screenshot showing how to assign a storage account role.

  5. Gunakan kotak pencarian untuk memfilter hasil ke peran yang diinginkan. Untuk contoh ini, cari Kontributor Data Blob Penyimpanan dan pilih hasil yang cocok, lalu pilih Berikutnya.

  6. Di bagian Tetapkan akses ke, pilih Pengguna, grup, atau perwakilan layanan, lalu pilih + Pilih anggota.

  7. Dalam dialog, cari nama pengguna Microsoft Entra Anda (biasanya alamat email user@domain Anda) lalu pilih Pilih di bagian bawah dialog.

  8. Pilih Tinjau + tetapkan untuk masuk ke halaman akhir, lalu Tinjau + tetapkan lagi untuk menyelesaikan proses.

Menginstal paket untuk menerima peristiwa

Untuk sisi penerimaan, Anda perlu menginstal satu atau beberapa paket. Dalam mulai cepat ini, Anda menggunakan penyimpanan Azure Blob untuk mempertahankan titik pemeriksaan sehingga program tidak membaca peristiwa yang telah dibacanya. Program menjalankan titik pemeriksaan metadata pada pesan yang diterima dalam interval rutin dalam blob. Pendekatan ini memudahkan untuk terus menerima pesan nanti dari posisi terakhir Anda.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Membuat skrip Python untuk menerima acara

Di bagian ini, Anda membuat skrip Python untuk menerima peristiwa dari hub acara Anda:

  1. Buka editor favorit Anda, seperti Visual Studio Code.

  2. Buat skrip yang recv.py.

  3. Tempelkan kode berikut ke recv.py:

    Dalam kode, gunakan nilai nyata untuk mengganti tempat penampung berikut:

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Catatan

    Untuk contoh opsi lain untuk menerima peristiwa dari Event Hub secara asinkron menggunakan string koneksi, lihat halaman gitHub recv_with_checkpoint_store_async.py. Pola yang ditunjukkan di sana juga berlaku untuk menerima peristiwa tanpa kata sandi.

Jalankan aplikasi penerima

Untuk menjalankan skrip, buka prompt perintah yang memiliki Python di jalurnya, lalu jalankan perintah ini:

python recv.py

Menjalankan aplikasi pengirim

Untuk menjalankan skrip, buka prompt perintah yang memiliki Python di jalurnya, lalu jalankan perintah ini:

python send.py

Jendela penerima harus menampilkan pesan yang dikirim ke hub peristiwa.

Pemecahan Masalah

Jika Anda tidak melihat peristiwa di jendela penerima atau kode melaporkan kesalahan, coba tips pemecahan masalah berikut:

  • Jika Anda tidak melihat hasil dari recy.py, jalankan send.py beberapa kali.

  • Jika Anda melihat kesalahan tentang "coroutine" saat menggunakan kode tanpa kata sandi (dengan kredensial), pastikan Anda menggunakan impor dari azure.identity.aio.

  • Jika Anda melihat "Sesi klien tidak tertutup" dengan kode tanpa kata sandi (dengan kredensial), pastikan Anda menutup kredensial setelah selesai. Untuk informasi selengkapnya, lihat Kredensial asinkron.

  • Jika Anda melihat kesalahan otorisasi dengan recv.py saat mengakses penyimpanan, pastikan Anda mengikuti langkah-langkah dalam Membuat akun penyimpanan Azure dan kontainer blob dan menetapkan peran Kontributor Data Blob Penyimpanan ke perwakilan layanan.

  • Jika Anda menerima peristiwa dengan ID partisi yang berbeda, hasil ini diharapkan. Partisi adalah mekanisme organisasi data yang berkaitan dengan paralelisme hilir yang diperlukan dalam mengonsumsi aplikasi. Jumlah partisi dalam event hub secara langsung berkaitan dengan jumlah pembaca bersamaan yang diharapkan. Untuk informasi selengkapnya, lihat Pelajari selengkapnya tentang partisi.

Langkah berikutnya

Dalam mulai cepat ini, Anda telah mengirim dan menerima acara secara asinkron. Untuk mempelajari cara mengirim dan menerima acara secara sinkron, buka halaman Sync_samples GitHub.

Untuk semua sampel (sinkron dan asinkron) di GitHub, buka pustaka klien Azure Event Hubs untuk sampel Python.