Hızlı Başlangıç: Azure Depolama Event Hubs verilerini yakalama ve Python (azure-eventhub) kullanarak okuma
Olay hub'ını yapılandırarak olay hub'ına gönderilen verilerin bir Azure depolama hesabında veya Azure Data Lake Depolama 1. Nesil veya 2. Nesil'de yakalanmasını sağlayabilirsiniz. Bu makalede, bir olay hub'ına olay göndermek ve Azure Blob depolamadan yakalanan verileri okumak için Python kodu yazma işlemi gösterilmektedir. Bu özellik hakkında daha fazla bilgi için bkz . Event Hubs Yakalama özelliğine genel bakış.
Bu hızlı başlangıçta Yakalama özelliğini göstermek için Azure Python SDK'sı kullanılmaktadır. sender.py uygulaması JSON biçiminde olay hub'larına sanal ortam telemetrisi gönderir. Olay hub'ı, bu verileri toplu olarak Blob depolamaya yazmak için Yakalama özelliğini kullanacak şekilde yapılandırılmıştır. capturereader.py uygulaması bu blobları okur ve her cihaz için bir ekleme dosyası oluşturur. Uygulama daha sonra verileri CSV dosyalarına yazar.
Bu hızlı başlangıçta:
- Azure portalında bir Azure Blob depolama hesabı ve kapsayıcısı oluşturun.
- Azure portalını kullanarak bir Event Hubs ad alanı oluşturun.
- Yakalama özelliğinin etkinleştirildiği bir olay hub'ı oluşturun ve depolama hesabınıza bağlayın.
- Python betiği kullanarak olay hub'ınıza veri gönderin.
- Başka bir Python betiği kullanarak Event Hubs Capture'tan dosyaları okuyun ve işleyin.
Önkoşullar
Python 3.8 veya üzeri, pip yüklü ve güncelleştirilmiş.
Azure aboneliği. Aboneliğiniz yoksa başlamadan önce ücretsiz bir hesap oluşturun.
Etkin bir Event Hubs ad alanı ve olay hub'ı. Bir Event Hubs ad alanı ve ad alanında bir olay hub'ı oluşturun. Event Hubs ad alanının adını, olay hub'ının adını ve ad alanının birincil erişim anahtarını kaydedin. Erişim anahtarını almak için bkz. Event Hubs bağlantı dizesi alma. Varsayılan anahtar adı RootManageSharedAccessKey'dir. Bu hızlı başlangıç için yalnızca birincil anahtara ihtiyacınız vardır. bağlantı dizesi ihtiyacın yok.
Bir Azure depolama hesabı, depolama hesabında bir blob kapsayıcısı ve depolama hesabına bir bağlantı dizesi. Bu öğelere sahip değilseniz aşağıdaki adımları uygulayın:
- Azure depolama hesabı oluşturma
- Depolama hesabında blob kapsayıcısı oluşturma
- Depolama hesabına bağlantı dizesi alma
Bu hızlı başlangıçta daha sonra kullanmak üzere bağlantı dizesi ve kapsayıcı adını kaydettiğinizden emin olun.
Olay hub'ı için Yakalama özelliğini etkinleştirme
Olay hub'ı için Yakalama özelliğini etkinleştirin. Bunu yapmak için Azure portalını kullanarak Event Hubs Yakalamayı Etkinleştirme başlığındaki yönergeleri izleyin. Önceki adımda oluşturduğunuz depolama hesabını ve blob kapsayıcısını seçin. Çıkış olayı serileştirme biçimi için Avro'ya tıklayın.
Olay hub'ınıza olay göndermek için Python betiği oluşturma
Bu bölümde, bir olay hub'ına 200 olay (10 cihaz * 20 olay) gönderen bir Python betiği oluşturacaksınız. Bu olaylar, JSON biçiminde gönderilen örnek bir ortam okumasıdır.
Visual Studio Code gibi sık kullandığınız Python düzenleyicisini açın.
sender.py adlı bir betik oluşturun.
Aşağıdaki kodu sender.py yapıştırın.
import time import os import uuid import datetime import random import json from azure.eventhub import EventHubProducerClient, EventData # This script simulates the production of events for 10 devices. devices = [] for x in range(0, 10): devices.append(str(uuid.uuid4())) # Create a producer client to produce and publish events to the event hub. producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME") for y in range(0,20): # For each device, produce 20 events. event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. for dev in devices: # Create a dummy reading. reading = { 'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100) } s = json.dumps(reading) # Convert the reading into a JSON string. event_data_batch.add(EventData(s)) # Add event data to the batch. producer.send_batch(event_data_batch) # Send the batch of events to the event hub. # Close the producer. producer.close()
Betiklerde aşağıdaki değerleri değiştirin:
- değerini Event Hubs ad alanınızın bağlantı dizesi ile değiştirin
EVENT HUBS NAMESPACE CONNECTION STRING
. - değerini olay hub'ınızın adıyla değiştirin
EVENT HUB NAME
.
- değerini Event Hubs ad alanınızın bağlantı dizesi ile değiştirin
Olay hub'ına olay göndermek için betiği çalıştırın.
Azure portalında, olay hub'ında iletilerin alındığını doğrulayabilirsiniz. Ölçümler bölümünde İletiler görünümüne geçin. Grafiği güncelleştirmek için sayfayı yenileyin. Sayfanın iletilerin alındığını görüntülemesi birkaç saniye sürebilir.
Capture dosyalarınızı okumak için Python betiği oluşturma
Bu örnekte, yakalanan veriler Azure Blob depolamada depolanır. Bu bölümdeki betik, Azure depolama hesabınızdan yakalanan veri dosyalarını okur ve kolayca açıp görüntüleyebilmek için CSV dosyaları oluşturur. Uygulamanın geçerli çalışma dizininde 10 dosya görürsünüz. Bu dosyalar 10 cihaz için ortam okumalarını içerir.
Python düzenleyicinizde capturereader.py adlı bir betik oluşturun. Bu betik, yakalanan dosyaları okur ve her cihaz için yalnızca bu cihaz için veri yazmak üzere bir dosya oluşturur.
Aşağıdaki kodu capturereader.py yapıştırın.
import os import string import json import uuid import avro.schema from azure.storage.blob import ContainerClient, BlobClient from avro.datafile import DataFileReader, DataFileWriter from avro.io import DatumReader, DatumWriter def processBlob2(filename): reader = DataFileReader(open(filename, 'rb'), DatumReader()) dict = {} for reading in reader: parsed_json = json.loads(reading["Body"]) if not 'id' in parsed_json: return if not parsed_json['id'] in dict: list = [] dict[parsed_json['id']] = list else: list = dict[parsed_json['id']] list.append(parsed_json) reader.close() for device in dict.keys(): filename = os.getcwd() + '\\' + str(device) + '.csv' deviceFile = open(filename, "a") for r in dict[device]: deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n') def startProcessing(): print('Processor started using path: ' + os.getcwd()) # Create a blob container client. container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME") blob_list = container.list_blobs() # List all the blobs in the container. for blob in blob_list: # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files). if blob.size > 508: print('Downloaded a non empty blob: ' + blob.name) # Create a blob client for the blob. blob_client = ContainerClient.get_blob_client(container, blob=blob.name) # Construct a file name based on the blob name. cleanName = str.replace(blob.name, '/', '_') cleanName = os.getcwd() + '\\' + cleanName with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file. processBlob2(cleanName) # Convert the file into a CSV file. os.remove(cleanName) # Remove the original downloaded file. # Delete the blob from the container after it's read. container.delete_blob(blob.name) startProcessing()
değerini Azure depolama hesabınızın bağlantı dizesi ile değiştirin
AZURE STORAGE CONNECTION STRING
. Bu hızlı başlangıçta oluşturduğunuz kapsayıcının adı yakalamadır. Kapsayıcı için farklı bir ad kullandıysanız capture değerini depolama hesabındaki kapsayıcının adıyla değiştirin.
Betikleri çalıştırma
Yolunda Python bulunan bir komut istemi açın ve python önkoşul paketlerini yüklemek için şu komutları çalıştırın:
pip install azure-storage-blob pip install azure-eventhub pip install avro-python3
Dizininizi, sender.py ve capturereader.py kaydettiğiniz dizinle değiştirin ve şu komutu çalıştırın:
python sender.py
Bu komut, göndereni çalıştırmak için yeni bir Python işlemi başlatır.
Yakalamanın çalışması için birkaç dakika bekleyin ve ardından özgün komut pencerenize aşağıdaki komutu girin:
python capturereader.py
Bu yakalama işlemcisi, depolama hesabından ve kapsayıcısından tüm blobları indirmek için yerel dizini kullanır. Boş olmayan dosyaları işler ve sonuçları yerel dizine CSV dosyaları olarak yazar.
Sonraki adımlar
GitHub'da Python örneklerine göz atın.