Hızlı Başlangıç: Python kullanarak olay hub'larına olay gönderme veya olay hub'larından olay alma

Bu Hızlı Başlangıçta azure-eventhub Python paketini kullanarak olay hub'ına olay göndermeyi ve olay hub'ından olay almayı öğreneceksiniz.

Önkoşullar

Azure Event Hubs'da yeniyseniz bu hızlı başlangıcı gerçekleştirmeden önce event hubs'a genel bakış konusuna bakın.

Bu hızlı başlangıcı tamamlamak için aşağıdaki önkoşullara sahip olduğunuzdan emin olun:

  • Microsoft Azure aboneliği: Aboneliğiniz yoksa ücretsiz deneme sürümüne kaydolun.
  • Python 3.8 veya üzeri: Pip'in yüklendiğinden ve güncelleştirildiğinden emin olun.
  • Visual Studio Code (önerilir):Veya istediğiniz başka bir IDE kullanın.
  • Event Hubs ad alanı ve olay hub'ı: Bunları Azure portalında oluşturmak için bu kılavuzu izleyin.

Olayları göndermek için paketleri yükleme

Event Hubs için Python paketlerini yüklemek için, yolunda Python bulunan bir komut istemi açın. Dizini, örneklerinizi saklamak istediğiniz klasörle değiştirin.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Azure'da uygulamanın kimliğini doğrulama

Bu hızlı başlangıçta Azure Event Hubs'a bağlanmanın iki yolu gösterilmektedir:

  • Parolasız. Bir Event Hubs ad alanına bağlanmak için Microsoft Entra Id ve rol tabanlı erişim denetiminde (RBAC) güvenlik sorumlunuzu kullanın. Kodunuzda, yapılandırma dosyasında veya Azure Key Vault gibi güvenli depolama alanında sabit kodlanmış bağlantı dizelerine sahip olma konusunda endişelenmeniz gerekmez.
  • Bağlantı dizesi. Event Hubs ad alanına bağlanmak için bağlantı dizesi kullanın. Azure'da yeniyseniz bağlantı dizesi seçeneğini daha kolay takip edebilirsiniz.

Gerçek dünyadaki uygulamalarda ve üretim ortamlarında parolasız seçeneği kullanmanızı öneririz. Daha fazla bilgi için bkz. Azure hizmetleri için Service Bus kimlik doğrulaması ve yetkilendirmesi ve Parolasız bağlantılar.

Microsoft Entra kullanıcınıza rol atama

Yerel olarak geliştirirken, Azure Event Hubs'a bağlanan kullanıcı hesabının doğru izinlere sahip olduğundan emin olun. İleti göndermek ve almak için Azure Event Hubs Veri Sahibi rolüne ihtiyacınız vardır. Kendinize bu rolü atamak için Kullanıcı Erişimi Yöneticisi rolüne veya eylemi içeren Microsoft.Authorization/roleAssignments/write başka bir role ihtiyacınız vardır. Azure portalı, Azure CLI veya Azure PowerShell'i kullanarak kullanıcıya Azure RBAC rolleri atayabilirsiniz. Daha fazla bilgi için bkz. Azure RBAC kapsamını anlama sayfası.

Aşağıdaki örnekte rol, Azure Event Hubs kaynaklarına tam erişim sağlayan kullanıcı hesabınıza atanır Azure Event Hubs Data Owner . Gerçek bir senaryoda, kullanıcılara yalnızca daha güvenli bir üretim ortamı için gereken minimum izinleri vermek için En Az Ayrıcalık İlkesi'ni izleyin.

Azure Event Hubs için Azure yerleşik rolleri

Azure Event Hubs için, Azure portalı ve Azure kaynak yönetimi API'sini kullanarak ad alanlarının ve tüm ilgili kaynakların yönetimi Azure RBAC modeli kullanılarak zaten korunur. Azure, Event Hubs ad alanına erişim yetkisi vermek için aşağıdaki yerleşik rolleri sağlar:

Özel bir rol oluşturmak istiyorsanız, Event Hubs işlemleri için gereken haklar bölümüne bakın.

Önemli

Çoğu durumda rol atamasının Azure'a yayılması bir veya iki dakika sürer. Nadir durumlarda, sekiz dakikaya kadar sürebilir. Kodunuzu ilk kez çalıştırdığınızda kimlik doğrulama hataları alıyorsanız, birkaç dakika bekleyin ve yeniden deneyin.

  1. Azure portalında ana arama çubuğunu veya sol gezintiyi kullanarak Event Hubs ad alanınızı bulun.

  2. Genel bakış sayfasında, sol taraftaki menüden Erişim denetimi (IAM) öğesini seçin.

  3. Erişim denetimi (IAM) sayfasında Rol atamaları sekmesini seçin.

  4. Üst menüden + Ekle'yi seçin. Ardından Rol ataması ekle'yi seçin.

    Rol atamayı gösteren ekran görüntüsü.

  5. Sonuçları istenen role göre filtrelemek için arama kutusunu kullanın. Bu örnek için eşleşen sonucu arayın Azure Event Hubs Data Owner ve seçin. Ardından İleri'yi seçin.

  6. Erişim ata'nın altında Kullanıcı, grup veya hizmet sorumlusu'na tıklayın. Ardından + Üye seç'i seçin.

  7. İletişim kutusunda Microsoft Entra kullanıcı adınızı (genellikle user@domain e-posta adresiniz) arayın. İletişim kutusunun alt kısmındaki Seç'i seçin.

  8. Son sayfaya gitmek için Gözden geçir ve ata'yı seçin. İşlemi tamamlamak için Gözden geçir ve yeniden ata komutunu seçin.

Olayları gönderme

Bu bölümde, daha önce oluşturduğunuz olay hub'ına olay göndermek için bir Python betiği oluşturun.

  1. Visual Studio Code gibi sık kullandığınız Python düzenleyicisini açın.

  2. send.py adlı bir betik oluşturun. Bu betik, daha önce oluşturduğunuz olay hub'ına bir dizi olay gönderir.

  3. Aşağıdaki kodu send.py yapıştırın:

    Kodda, aşağıdaki yer tutucuları değiştirmek için gerçek değerleri kullanın:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - Ad alanının Genel Bakış sayfasında tam adı görürsünüz. Şu biçimde olmalıdır: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - Olay merkezinin adı.
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        print("Producer client created successfully.") 
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Uyarı

    Bağlantı dizesi kullanarak olay hub'ına zaman uyumsuz olarak olay göndermeye yönelik diğer seçeneklere ilişkin örnekler için GitHub send_async.py sayfasına bakın. Burada gösterilen desenler, olayları parolasız göndermek için de geçerlidir.

Olayları al

Bu hızlı başlangıçta denetim noktası deposu olarak Azure Blob depolama kullanılır. Denetim noktası deposu, denetim noktalarını (yani son okuma konumlarını) kalıcı hale getirmek için kullanılır.

Azure Blob Depolama'yı denetim noktası deposu olarak kullanırken şu önerileri izleyin:

  • Her tüketici grubu için ayrı bir kapsayıcı kullanın. Aynı depolama hesabını kullanabilirsiniz, ancak her grup için bir kapsayıcı kullanabilirsiniz.
  • Depolama hesabını başka bir şey için kullanmayın.
  • Kapsayıcıyı başka hiçbir şey için kullanmayın.
  • Depolama hesabını dağıtılan uygulamayla aynı bölgede oluşturun. Uygulama şirket içindeyse, mümkün olan en yakın bölgeyi seçmeyi deneyin.

Azure portalının Depolama hesabı sayfasındaki Blob hizmeti bölümünde aşağıdaki ayarların devre dışı bırakıldığından emin olun.

  • Hiyerarşik ad alanı
  • Blob geçici silme
  • Sürüm oluşturma

Azure depolama hesabı ve blob kapsayıcısı oluşturma

Aşağıdaki adımları uygulayarak içinde bir Azure depolama hesabı ve blob kapsayıcısı oluşturun:

  1. Azure Depolama hesabı oluşturma
  2. Bir blob kapsayıcısı oluşturun.
  3. Blob kapsayıcısına kimlik doğrula

Alma kodunda daha sonra kullanmak üzere bağlantı dizesini ve kapsayıcı adını kaydettiğinizden emin olun.

Yerel olarak geliştirirken blob verilerine erişen kullanıcı hesabının doğru izinlere sahip olduğundan emin olun. Blob verilerini okumak ve yazmak için Depolama Blob Verisi Katkıcısı rolüne ihtiyacınız var. Kendinize bu rolü atamak için Kullanıcı Erişimi Yöneticisi rolüne veya Microsoft.Authorization/roleAssignments/write eylemini içeren başka bir role atanmalısınız. Azure portalı, Azure CLI veya Azure PowerShell'i kullanarak kullanıcıya Azure RBAC rolleri atayabilirsiniz. Daha fazla bilgi için Azure RBAC kapsamını anlama bölümüne bakın.

Bu senaryoda, En Az Ayrıcalık İlkesi'ni izlemek için depolama hesabı kapsamındaki kullanıcı hesabınıza izinler atarsınız. Bu uygulama kullanıcılara yalnızca gereken minimum izinleri verir ve daha güvenli üretim ortamları oluşturur.

Aşağıdaki örnek, depolama hesabınızdaki blob verilerine hem okuma hem de yazma erişimi sağlayan Depolama Blob Verileri Katkıda Bulunanı rolünü kullanıcı hesabınıza atar.

Önemli

Çoğu durumda rol atamasının Azure'a yayılması bir veya iki dakika sürer. Nadir durumlarda, sekiz dakikaya kadar sürebilir. Kodunuzu ilk kez çalıştırdığınızda kimlik doğrulama hataları alıyorsanız, birkaç dakika bekleyin ve yeniden deneyin.

  1. Azure portalında ana arama çubuğunu veya sol gezintiyi kullanarak depolama hesabınızı bulun.

  2. Depolama hesabı sayfasında sol taraftaki menüden Erişim denetimi (IAM) öğesini seçin.

  3. Erişim denetimi (IAM) sayfasında Rol atamaları sekmesini seçin.

  4. Üst menüden + Ekle'yi seçin. Ardından Rol ataması ekle'yi seçin.

    Depolama hesabı rolü atamayı gösteren ekran görüntüsü.

  5. Sonuçları istenen role göre filtrelemek için arama kutusunu kullanın. Bu örnek için Depolama Blobu Veri Katkıda Bulunanını arayın. Eşleşen sonucu seçin ve ardından Sonrakiseçin.

  6. Erişim atama altında, Kullanıcı, grup veya hizmet sorumlusu seçeneğini seçin ve ardından + Üyeleri seç seçeneğine tıklayın.

  7. İletişim kutusunda Microsoft Entra kullanıcı adınızı (genellikle user@domain e-posta adresiniz) arayın ve iletişim kutusunun alt kısmındaki Seç'i seçin.

  8. Son sayfaya gitmek için Gözden geçir ve ata'yı seçin. İşlemi tamamlamak için Gözden geçir ve yeniden ata komutunu seçin.

Olayları almak için paketleri yükleyin

Alıcı taraf için bir veya daha fazla paket yüklemeniz gerekir. Bu hızlı başlangıçta, programın zaten okuduğu olayları okumaması için denetim noktalarını kalıcı hale getirmek için Azure Blob depolamayı kullanacaksınız. Blobda düzenli aralıklarla alınan iletilerde meta veri denetim noktaları gerçekleştirir. Bu yaklaşım, iletileri daha sonra kaldığınız yerden almaya devam etmenizi kolaylaştırır.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Olayları almak için Python betiği oluşturma

Bu bölümde, olay hub'ınızdan olayları almak için bir Python betiği oluşturacaksınız:

  1. Visual Studio Code gibi sık kullandığınız Python düzenleyicisini açın.

  2. recv.py adlı bir betik oluşturun.

  3. Aşağıdaki kodu recv.py yapıştırın:

    Kodda, aşağıdaki yer tutucuları değiştirmek için gerçek değerleri kullanın:

    • BLOB_STORAGE_ACCOUNT_URL - Bu değer şu biçimde olmalıdır: https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
    • BLOB_CONTAINER_NAME - Azure depolama hesabındaki blob kapsayıcısının adı.
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - Ad alanının Genel Bakış sayfasında tam adı görürsünüz. Şu biçimde olmalıdır: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - Olay merkezinin adı.
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Uyarı

    Bağlantı dizesi kullanarak bir olay hub'ından zaman uyumsuz olarak olay almaya yönelik diğer seçeneklere ilişkin örnekler için GitHub recv_with_checkpoint_store_async.py sayfasına bakın. Burada gösterilen desenler, parolasız olayları almak için de geçerlidir.

Alıcı uygulamasını çalıştırma

  1. Bir komut istemi başlatın.

  2. Aşağıdaki komutu çalıştırın ve Event Hubs ad alanındaki Azure Event Hubs Veri Sahibi rolüne ve Azure depolama hesabındaki Depolama Blob Verilerine Katkıda Bulunan rolüne eklenen hesabı kullanarak oturum açın.

    az login
    
  3. receive.py dosyasını içeren klasöre geçin ve aşağıdaki komutu çalıştırın:

    python recv.py
    

Gönderen uygulamasını çalıştırma

  1. Bir komut istemi başlatın.

  2. Aşağıdaki komutu çalıştırın ve Event Hubs ad alanındaki Azure Event Hubs Veri Sahibi rolüne ve Azure depolama hesabındaki Depolama Blob Verilerine Katkıda Bulunan rolüne eklenen hesabı kullanarak oturum açın.

    az login
    
  3. send.py içeren klasöre geçin ve şu komutu çalıştırın:

    python send.py
    

Alıcı penceresinde olay hub'ına gönderilen iletiler görüntülenmelidir.

Sorun giderme

Alıcı penceresinde olayları görmüyorsanız veya kod hata bildiriyorsa aşağıdaki sorun giderme ipuçlarını deneyin:

  • recy.py sonuçlarını görmüyorsanız send.py birkaç kez çalıştırın.

  • Parolasız kodu (kimlik bilgileriyle) kullanırken "coroutine" ile ilgili hatalar görürseniz, öğesinden içeri aktarmayı kullandığınızdan azure.identity.aioemin olun.

  • Parolasız oturum kodu ve kimlik bilgileriyle birlikte "Kapatılmamış istemci oturumu" görüyorsanız, işlem bittiğinde kimlik bilgisini kapattığınızdan emin olun. Daha fazla bilgi için bkz. Zaman uyumsuz kimlik bilgileri.

  • Depolamaya erişirken recv.py yetkilendirme hataları görürseniz Azure depolama hesabı ve blob kapsayıcısı oluşturma makalesindeki adımları izlediğinize ve Hizmet sorumlusuna Depolama Blob Verileri Katkıda Bulunanı rolünü atadığınızdan emin olun.

  • Farklı bölüm kimliklerine sahip olaylar alırsanız bu sonuç beklenir. Bölümler, uygulamaların tüketilmesinde gereken aşağı akış paralelliği ile ilgili bir veri düzenleme mekanizmasıdır. Bir olay hub'ındaki bölüm sayısı, sahip olmayı beklediğiniz eşzamanlı okuyucu sayısıyla doğrudan ilgilidir. Daha fazla bilgi için bkz. Bölümler hakkında daha fazla bilgi edinin.

Sonraki Adımlar

Bu hızlı başlangıçta olayları asenkron olarak gönderdiniz ve aldınız. Olayları zaman uyumlu olarak göndermeyi ve almayı öğrenmek için GitHub sync_samples sayfasına gidin.

Python örnekleri için Azure Event Hubs istemci kitaplığında daha fazla örnek ve gelişmiş senaryoyu keşfedin.