Python kullanarak Event Hubs’daki olayları gönderme veya alma
Bu hızlı başlangıçta azure-eventhub Python paketini kullanarak olay hub'ına olay gönderme ve olay hub'ından olay alma adımları gösterilmektedir.
Önkoşullar
Azure Event Hubs'da yeniyseniz bu hızlı başlangıcı gerçekleştirmeden önce event hubs'a genel bakış konusuna bakın.
Bu hızlı başlangıcı tamamlamak için aşağıdaki önkoşullara ihtiyacınız vardır:
- Microsoft Azure aboneliği. Azure Event Hubs da dahil olmak üzere Azure hizmetlerini kullanmak için bir aboneliğe ihtiyacınız vardır. Mevcut bir Azure hesabınız yoksa ücretsiz deneme sürümüne kaydolun.
- Python 3.8 veya üzeri, pip yüklü ve güncelleştirilmiş.
- Visual Studio Code (önerilir) veya diğer tüm tümleşik geliştirme ortamları (IDE).
- Event Hubs ad alanı ve olay hub'ı oluşturun. İlk adım, Event Hubs ad alanı oluşturmak ve uygulamanızın olay hub'ı ile iletişim kurmak için ihtiyaç duyduğu yönetim kimlik bilgilerini almak için Azure portalını kullanmaktır. Ad alanı ve olay hub'ı oluşturmak için bu makaledeki yordamı izleyin.
Olayları göndermek için paketleri yükleme
Event Hubs için Python paketlerini yüklemek için, yolunda Python bulunan bir komut istemi açın. Dizini, örneklerinizi saklamak istediğiniz klasörle değiştirin.
pip install azure-eventhub
pip install azure-identity
pip install aiohttp
Azure'da uygulamanın kimliğini doğrulama
Bu hızlı başlangıçta Azure Event Hubs'a bağlanmanın iki yolu gösterilmektedir: parolasız ve bağlantı dizesi. İlk seçenek, Bir Event Hubs ad alanına bağlanmak için Microsoft Entra Id ve rol tabanlı erişim denetiminde (RBAC) güvenlik sorumlunuzu nasıl kullanacağınızı gösterir. Kodunuzda, yapılandırma dosyasında veya Azure Key Vault gibi güvenli bir depolama alanında sabit kodlanmış bağlantı dizesi olması konusunda endişelenmeniz gerekmez. İkinci seçenek, event hubs ad alanına bağlanmak için bir bağlantı dizesi nasıl kullanacağınızı gösterir. Azure'da yeniyseniz bağlantı dizesi seçeneğini daha kolay takip edebilirsiniz. Gerçek dünyadaki uygulamalarda ve üretim ortamlarında parolasız seçeneği kullanmanızı öneririz. Daha fazla bilgi için bkz . Kimlik doğrulaması ve yetkilendirme. Ayrıca, genel bakış sayfasında parolasız kimlik doğrulaması hakkında daha fazla bilgi edinebilirsiniz.
Microsoft Entra kullanıcınıza rol atama
Yerel olarak geliştirme yaparken, Azure Event Hubs'a bağlanan kullanıcı hesabının doğru izinlere sahip olduğundan emin olun. İleti gönderip almak için Azure Event Hubs Veri Sahibi rolüne ihtiyacınız vardır. Kendinize bu rolü atamak için Kullanıcı Erişimi Yöneticisi rolüne veya eylemi içeren Microsoft.Authorization/roleAssignments/write
başka bir role ihtiyacınız vardır. Azure portalı, Azure CLI veya Azure PowerShell'i kullanarak kullanıcıya Azure RBAC rolleri atayabilirsiniz. Kapsam genel bakış sayfasında rol atamaları için kullanılabilir kapsamlar hakkında daha fazla bilgi edinin.
Aşağıdaki örnekte rol, Azure Event Hubs kaynaklarına tam erişim sağlayan kullanıcı hesabınıza atanır Azure Event Hubs Data Owner
. Gerçek bir senaryoda, kullanıcılara yalnızca daha güvenli bir üretim ortamı için gereken minimum izinleri vermek için En Az Ayrıcalık İlkesi'ni izleyin.
Azure Event Hubs için Azure yerleşik rolleri
Azure Event Hubs için, Azure portalı ve Azure kaynak yönetimi API'sini kullanarak ad alanlarının ve tüm ilgili kaynakların yönetimi Azure RBAC modeli kullanılarak zaten korunur. Azure, Event Hubs ad alanına erişim yetkisi vermek için aşağıdaki Azure yerleşik rollerini sağlar:
- Azure Event Hubs Veri Sahibi: Event Hubs ad alanına ve varlıklarına (kuyruklar, konular, abonelikler ve filtreler) veri erişimini etkinleştirir
- Azure Event Hubs Veri Göndereni: Gönderene Event Hubs ad alanına ve varlıklarına erişim vermek için bu rolü kullanın.
- Azure Event Hubs Veri Alıcısı: Alıcıya Event Hubs ad alanına ve varlıklarına erişim vermek için bu rolü kullanın.
Özel bir rol oluşturmak istiyorsanız bkz . Event Hubs işlemleri için gereken haklar.
Önemli
Çoğu durumda rol atamasının Azure'a yayılması bir veya iki dakika sürer. Nadir durumlarda, sekiz dakikaya kadar sürebilir. Kodunuzu ilk kez çalıştırdığınızda kimlik doğrulama hataları alıyorsanız, birkaç dakika bekleyin ve yeniden deneyin.
Azure portalında ana arama çubuğunu veya sol gezintiyi kullanarak Event Hubs ad alanınızı bulun.
Genel bakış sayfasında, sol taraftaki menüden Erişim denetimi (IAM) öğesini seçin.
Erişim denetimi (IAM) sayfasında Rol atamaları sekmesini seçin.
Üst menüden + Ekle'yi seçin ve ardından açılan menüden Rol ataması ekle'yi seçin.
Sonuçları istenen role göre filtrelemek için arama kutusunu kullanın. Bu örnek için eşleşen sonucu arayın
Azure Event Hubs Data Owner
ve seçin. Ardından İleri'yi seçin.Erişim ata'nın altında Kullanıcı, grup veya hizmet sorumlusu'na tıklayın ve ardından + Üye seç'e tıklayın.
İletişim kutusunda Microsoft Entra kullanıcı adınızı (genellikle user@domain e-posta adresiniz) arayın ve iletişim kutusunun alt kısmındaki Seç'i seçin.
Son sayfaya gitmek için Gözden geçir + ata'yı seçin ve ardından işlemi tamamlamak için Gözden geçir + yeniden ata'yı seçin.
Olayları gönderme
Bu bölümde, daha önce oluşturduğunuz olay hub'ına olay göndermek için bir Python betiği oluşturun.
Visual Studio Code gibi sık kullandığınız Python düzenleyicisini açın.
send.py adlı bir betik oluşturun. Bu betik, daha önce oluşturduğunuz olay hub'ına bir dizi olay gönderir.
Aşağıdaki kodu send.py yapıştırın:
Kodda, aşağıdaki yer tutucuları değiştirmek için gerçek değerleri kullanın:
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
EVENT_HUB_NAME
import asyncio from azure.eventhub import EventData from azure.eventhub.aio import EventHubProducerClient from azure.identity.aio import DefaultAzureCredential EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE" EVENT_HUB_NAME = "EVENT_HUB_NAME" credential = DefaultAzureCredential() async def run(): # Create a producer client to send messages to the event hub. # Specify a credential that has correct role assigned to access # event hubs namespace and the event hub name. producer = EventHubProducerClient( fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, eventhub_name=EVENT_HUB_NAME, credential=credential, ) async with producer: # Create a batch. event_data_batch = await producer.create_batch() # Add events to the batch. event_data_batch.add(EventData("First event ")) event_data_batch.add(EventData("Second event")) event_data_batch.add(EventData("Third event")) # Send the batch of events to the event hub. await producer.send_batch(event_data_batch) # Close credential when no longer needed. await credential.close() asyncio.run(run())
Olayları alma
Bu hızlı başlangıçta denetim noktası deposu olarak Azure Blob depolama kullanılır. Denetim noktası deposu, denetim noktalarını (yani son okuma konumlarını) kalıcı hale getirmek için kullanılır.
Azure Blob Depolama denetim noktası deposu olarak kullanırken şu önerileri izleyin:
- Her tüketici grubu için ayrı bir kapsayıcı kullanın. Aynı depolama hesabını kullanabilirsiniz, ancak her grup için bir kapsayıcı kullanabilirsiniz.
- Kapsayıcıyı başka hiçbir şey için kullanmayın ve depolama hesabını başka hiçbir şey için kullanmayın.
- Depolama hesabı, dağıtılan uygulamanın bulunduğu bölgede olmalıdır. Uygulama şirket içindeyse, mümkün olan en yakın bölgeyi seçmeyi deneyin.
Azure portalının Depolama hesabı sayfasındaki Blob hizmeti bölümünde aşağıdaki ayarların devre dışı bırakıldığından emin olun.
- Hiyerarşik ad alanı
- Blob geçici silme
- Sürüm oluşturma
Azure depolama hesabı ve blob kapsayıcısı oluşturma
Aşağıdaki adımları uygulayarak içinde bir Azure depolama hesabı ve blob kapsayıcısı oluşturun:
- Azure Depolama hesabı oluşturma
- Blob kapsayıcısı oluşturma.
- Blob kapsayıcısının kimliğini doğrulama.
Daha sonra alma kodunda kullanmak üzere bağlantı dizesi ve kapsayıcı adını kaydettiğinizden emin olun.
Yerel olarak geliştirme yaparken blob verilerine erişen kullanıcı hesabının doğru izinlere sahip olduğundan emin olun. Blob verilerini okumak ve yazmak için Depolama Blob Verileri Katkıda Bulunanı gerekir. Kendinize bu rolü atamak için Kullanıcı Erişimi Yöneticisi rolüne veya Microsoft.Authorization/roleAssignments/write eylemini içeren başka bir role atanmalısınız. Azure portalı, Azure CLI veya Azure PowerShell'i kullanarak kullanıcıya Azure RBAC rolleri atayabilirsiniz. Rol atamaları için kullanılabilir kapsamlar hakkında daha fazla bilgiyi kapsam genel bakış sayfasından öğrenebilirsiniz.
Bu senaryoda, En Az Ayrıcalık İlkesi'ni izlemek için depolama hesabı kapsamındaki kullanıcı hesabınıza izinler atayacaksınız. Bu uygulama kullanıcılara yalnızca gereken minimum izinleri verir ve daha güvenli üretim ortamları oluşturur.
Aşağıdaki örnek, depolama hesabınızdaki blob verilerine hem okuma hem de yazma erişimi sağlayan Depolama Blob Verileri Katkıda Bulunanı rolünü kullanıcı hesabınıza atar.
Önemli
Çoğu durumda rol atamasının Azure'a yayılması bir veya iki dakika sürer, ancak nadir durumlarda sekiz dakikaya kadar sürebilir. Kodunuzu ilk kez çalıştırdığınızda kimlik doğrulama hataları alıyorsanız, birkaç dakika bekleyin ve yeniden deneyin.
Azure portalında ana arama çubuğunu veya sol gezintiyi kullanarak depolama hesabınızı bulun.
Depolama hesabına genel bakış sayfasında sol taraftaki menüden Erişim denetimi (IAM) öğesini seçin.
Erişim denetimi (IAM) sayfasında Rol atamaları sekmesini seçin.
Üst menüden + Ekle'yi seçin ve ardından açılan menüden Rol ataması ekle'yi seçin.
Sonuçları istenen role göre filtrelemek için arama kutusunu kullanın. Bu örnek için Depolama Blobu Veri Katkıda Bulunanı'nı arayın ve eşleşen sonucu seçin ve ardından İleri'yi seçin.
Erişim ata'nın altında Kullanıcı, grup veya hizmet sorumlusu'na tıklayın ve ardından + Üye seç'e tıklayın.
İletişim kutusunda Microsoft Entra kullanıcı adınızı (genellikle user@domain e-posta adresiniz) arayın ve iletişim kutusunun alt kısmındaki Seç'i seçin.
Son sayfaya gitmek için Gözden geçir + ata'yı seçin ve ardından işlemi tamamlamak için Gözden geçir + yeniden ata'yı seçin.
Olayları almak için paketleri yükleme
Alıcı taraf için bir veya daha fazla paket yüklemeniz gerekir. Bu hızlı başlangıçta, programın önceden okuduğu olayları okumaması için denetim noktalarını kalıcı hale getirmek için Azure Blob depolamayı kullanacaksınız. Blobda düzenli aralıklarla alınan iletilerde meta veri denetim noktaları gerçekleştirir. Bu yaklaşım, iletileri daha sonra kaldığınız yerden almaya devam etmenizi kolaylaştırır.
pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity
Olayları almak için Python betiği oluşturma
Bu bölümde, olay hub'ınızdan olayları almak için bir Python betiği oluşturacaksınız:
Visual Studio Code gibi sık kullandığınız Python düzenleyicisini açın.
recv.py adlı bir betik oluşturun.
Aşağıdaki kodu recv.py yapıştırın:
Kodda, aşağıdaki yer tutucuları değiştirmek için gerçek değerleri kullanın:
BLOB_STORAGE_ACCOUNT_URL
BLOB_CONTAINER_NAME
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
EVENT_HUB_NAME
import asyncio from azure.eventhub.aio import EventHubConsumerClient from azure.eventhub.extensions.checkpointstoreblobaio import ( BlobCheckpointStore, ) from azure.identity.aio import DefaultAzureCredential BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL" BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME" EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE" EVENT_HUB_NAME = "EVENT_HUB_NAME" credential = DefaultAzureCredential() async def on_event(partition_context, event): # Print the event data. print( 'Received the event: "{}" from the partition with ID: "{}"'.format( event.body_as_str(encoding="UTF-8"), partition_context.partition_id ) ) # Update the checkpoint so that the program doesn't read the events # that it has already read when you run it next time. await partition_context.update_checkpoint(event) async def main(): # Create an Azure blob checkpoint store to store the checkpoints. checkpoint_store = BlobCheckpointStore( blob_account_url=BLOB_STORAGE_ACCOUNT_URL, container_name=BLOB_CONTAINER_NAME, credential=credential, ) # Create a consumer client for the event hub. client = EventHubConsumerClient( fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, eventhub_name=EVENT_HUB_NAME, consumer_group="$Default", checkpoint_store=checkpoint_store, credential=credential, ) async with client: # Call the receive method. Read from the beginning of the partition # (starting_position: "-1") await client.receive(on_event=on_event, starting_position="-1") # Close credential when no longer needed. await credential.close() if __name__ == "__main__": # Run the main method. asyncio.run(main())
Alıcı uygulamasını çalıştırma
Betiği çalıştırmak için, yolunda Python bulunan bir komut istemi açın ve şu komutu çalıştırın:
python recv.py
Gönderen uygulamasını çalıştırma
Betiği çalıştırmak için, yolunda Python bulunan bir komut istemi açın ve şu komutu çalıştırın:
python send.py
Alıcı penceresinde olay hub'ına gönderilen iletiler görüntülenmelidir.
Sorun giderme
Alıcı penceresinde olayları görmüyorsanız veya kod hata bildiriyorsa aşağıdaki sorun giderme ipuçlarını deneyin:
recy.py sonuçlarını görmüyorsanız send.py birkaç kez çalıştırın.
Parolasız kodu (kimlik bilgileriyle) kullanırken "coroutine" ile ilgili hatalar görürseniz, öğesinden içeri aktarmayı kullandığınızdan
azure.identity.aio
emin olun.Parolasız kodla (kimlik bilgileriyle) "Kapatılmamış istemci oturumu" görüyorsanız, işiniz bittiğinde kimlik bilgilerini kapattığınıza emin olun. Daha fazla bilgi için bkz . Zaman uyumsuz kimlik bilgileri.
Depolamaya erişirken recv.py yetkilendirme hataları görürseniz Azure depolama hesabı ve blob kapsayıcısı oluşturma makalesindeki adımları izlediğinizden ve Hizmet sorumlusuna Depolama Blob Verileri Katkıda Bulunanı rolünü atadığınızdan emin olun.
Farklı bölüm kimliklerine sahip olaylar alırsanız bu sonuç beklenir. Bölümler, tüketen uygulamalarda gerekli aşağı akış paralelliğiyle ilişkili bir veri düzenleme mekanizmasıdır. Bir olay hub'ındaki bölüm sayısı, sahip olmayı beklediğiniz eşzamanlı okuyucu sayısıyla doğrudan ilgilidir. Daha fazla bilgi için bkz . Bölümler hakkında daha fazla bilgi edinin.
Sonraki adımlar
Bu hızlı başlangıçta zaman uyumsuz olarak olaylar gönderip aldınız. Olayları zaman uyumlu olarak göndermeyi ve almayı öğrenmek için GitHub sync_samples sayfasına gidin.
GitHub'daki tüm örnekler (zaman uyumlu ve zaman uyumsuz) için Python örnekleri için Azure Event Hubs istemci kitaplığına gidin.