Rövid útmutató: Event Hubs-adatok rögzítése az Azure Storage-ban, és a Python (azure-eventhub) használatával történő olvasása
Az eseményközpontot úgy konfigurálhatja, hogy az eseményközpontba küldött adatok egy Azure Storage-fiókban vagy az 1. generációs Azure Data Lake Storage-ban vagy a Gen 2-ben lesznek rögzítve. Ez a cikk bemutatja, hogyan írhat Python-kódot események eseményközpontba való küldéséhez, és hogyan olvashatja be a rögzített adatokat az Azure Blob Storage-ból. A funkcióval kapcsolatos további információkért tekintse meg az Event Hubs Capture funkció áttekintését.
Ez a rövid útmutató az Azure Python SDK használatával mutatja be a Rögzítés funkciót. A sender.py alkalmazás szimulált környezeti telemetriát küld az eseményközpontoknak JSON formátumban. Az eseményközpont úgy van konfigurálva, hogy a Rögzítés funkcióval írja ezeket az adatokat a Blob Storage-ba kötegekben. A capturereader.py alkalmazás felolvassa ezeket a blobokat, és létrehoz egy hozzáfűző fájlt minden eszközhöz. Az alkalmazás ezután CSV-fájlokba írja az adatokat.
Ebben a rövid útmutatóban a következőket hajtja végre:
- Hozzon létre egy Azure Blob Storage-fiókot és -tárolót az Azure Portalon.
- Hozzon létre egy Event Hubs-névteret az Azure Portal használatával.
- Hozzon létre egy eseményközpontot, amelyen engedélyezve van a Rögzítés funkció, és csatlakoztassa a tárfiókhoz.
- Adatok küldése az eseményközpontba Egy Python-szkript használatával.
- Fájlok olvasása és feldolgozása az Event Hubs Captureből egy másik Python-szkript használatával.
Előfeltételek
Python 3.8 vagy újabb, pip telepítve és frissítve.
Azure-előfizetés. Ha még nincs előfizetése, hozzon létre egy ingyenes fiókot, mielőtt hozzákezd.
Aktív Event Hubs-névtér és eseményközpont. Hozzon létre egy Event Hubs-névteret és egy eseményközpontot a névtérben. Jegyezze fel az Event Hubs-névtér nevét, az eseményközpont nevét és a névtér elsődleges hozzáférési kulcsát. A hozzáférési kulcs lekéréséhez lásd: Event Hubs-kapcsolati sztring. Az alapértelmezett kulcsnév a RootManageSharedAccessKey. Ehhez a rövid útmutatóhoz csak az elsődleges kulcsra van szüksége. Nincs szüksége a kapcsolati sztring.
Egy Azure Storage-fiók, egy blobtároló a tárfiókban, és egy kapcsolati sztring a tárfiókhoz. Ha nem rendelkezik ezekkel az elemekkel, hajtsa végre a következő lépéseket:
- Azure Storage-fiók létrehozása
- Blobtároló létrehozása a tárfiókban
- A kapcsolati sztring lekérése a tárfiókhoz
Ebben a rövid útmutatóban mindenképpen rögzítse a kapcsolati sztring és a tároló nevét.
A Rögzítés funkció engedélyezése az eseményközponthoz
Engedélyezze a Rögzítés funkciót az eseményközponthoz. Ehhez kövesse az Event Hubs Capture engedélyezése az Azure Portal használatával című témakör utasításait. Válassza ki az előző lépésben létrehozott tárfiókot és blobtárolót. Válassza az Avro lehetőséget a kimeneti esemény szerializálási formátumához.
Python-szkript létrehozása események eseményközpontba való küldéséhez
Ebben a szakaszban létrehoz egy Python-szkriptet, amely 200 eseményt (10 eszközt * 20 eseményt) küld egy eseményközpontba. Ezek az események JSON formátumban küldött környezeti mintaolvasások.
Nyissa meg a kedvenc Python-szerkesztőt, például a Visual Studio Code-ot.
Hozzon létre egy sender.py nevű szkriptet.
Illessze be a következő kódot a sender.py.
import time import os import uuid import datetime import random import json from azure.eventhub import EventHubProducerClient, EventData # This script simulates the production of events for 10 devices. devices = [] for x in range(0, 10): devices.append(str(uuid.uuid4())) # Create a producer client to produce and publish events to the event hub. producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME") for y in range(0,20): # For each device, produce 20 events. event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. for dev in devices: # Create a dummy reading. reading = { 'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100) } s = json.dumps(reading) # Convert the reading into a JSON string. event_data_batch.add(EventData(s)) # Add event data to the batch. producer.send_batch(event_data_batch) # Send the batch of events to the event hub. # Close the producer. producer.close()
Cserélje le a következő értékeket a szkriptekben:
- Cserélje le
EVENT HUBS NAMESPACE CONNECTION STRING
az Event Hubs-névtér kapcsolati sztring. - Cserélje le
EVENT HUB NAME
az eseményközpont nevére.
- Cserélje le
Futtassa a szkriptet, hogy eseményeket küldjön az eseményközpontba.
Az Azure Portalon ellenőrizheti, hogy az eseményközpont megkapta-e az üzeneteket. Váltson Üzenetek nézetre a Metrikák szakaszban. Frissítse a lapot a diagram frissítéséhez. Eltarthat néhány másodpercig, hogy a lap megjelenítse az üzenetek fogadását.
Python-szkript létrehozása a Capture-fájlok olvasásához
Ebben a példában a rögzített adatokat az Azure Blob Storage tárolja. Az ebben a szakaszban található szkript beolvassa a rögzített adatfájlokat az Azure Storage-fiókból, és CSV-fájlokat hoz létre a könnyű megnyitáshoz és megtekintéshez. Az alkalmazás aktuális munkakönyvtárában 10 fájl látható. Ezek a fájlok tartalmazzák a 10 eszköz környezeti leolvasását.
A Python-szerkesztőben hozzon létre egy capturereader.py nevű szkriptet. Ez a szkript beolvassa a rögzített fájlokat, és létrehoz egy fájlt minden eszközhöz, hogy csak az adott eszköz adatait írja.
Illessze be a következő kódot capturereader.py.
import os import string import json import uuid import avro.schema from azure.storage.blob import ContainerClient, BlobClient from avro.datafile import DataFileReader, DataFileWriter from avro.io import DatumReader, DatumWriter def processBlob2(filename): reader = DataFileReader(open(filename, 'rb'), DatumReader()) dict = {} for reading in reader: parsed_json = json.loads(reading["Body"]) if not 'id' in parsed_json: return if not parsed_json['id'] in dict: list = [] dict[parsed_json['id']] = list else: list = dict[parsed_json['id']] list.append(parsed_json) reader.close() for device in dict.keys(): filename = os.getcwd() + '\\' + str(device) + '.csv' deviceFile = open(filename, "a") for r in dict[device]: deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n') def startProcessing(): print('Processor started using path: ' + os.getcwd()) # Create a blob container client. container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME") blob_list = container.list_blobs() # List all the blobs in the container. for blob in blob_list: # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files). if blob.size > 508: print('Downloaded a non empty blob: ' + blob.name) # Create a blob client for the blob. blob_client = ContainerClient.get_blob_client(container, blob=blob.name) # Construct a file name based on the blob name. cleanName = str.replace(blob.name, '/', '_') cleanName = os.getcwd() + '\\' + cleanName with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file. processBlob2(cleanName) # Convert the file into a CSV file. os.remove(cleanName) # Remove the original downloaded file. # Delete the blob from the container after it's read. container.delete_blob(blob.name) startProcessing()
Cserélje le
AZURE STORAGE CONNECTION STRING
az Azure Storage-fiók kapcsolati sztring. A rövid útmutatóban létrehozott tároló neve a rögzítés. Ha más nevet használt a tárolóhoz, cserélje le a rögzítést a tároló nevére a tárfiókban.
A szkriptek futtatása
Nyisson meg egy parancssort, amelyben a Python szerepel, majd futtassa ezeket a parancsokat a Python előfeltételcsomagjainak telepítéséhez:
pip install azure-storage-blob pip install azure-eventhub pip install avro-python3
Módosítsa a könyvtárat arra a könyvtárra, ahová sender.py és capturereader.py mentette, és futtassa a következő parancsot:
python sender.py
Ez a parancs elindít egy új Python-folyamatot a feladó futtatásához.
Várjon néhány percet a rögzítés futtatására, majd írja be a következő parancsot az eredeti parancsablakba:
python capturereader.py
Ez a rögzítési processzor a helyi könyvtár használatával tölti le az összes blobot a tárfiókból és a tárolóból. Feldolgozza a nem üres fájlokat, és CSV-fájlokként írja az eredményeket a helyi könyvtárba.
Következő lépések
Tekintse meg a Python-mintákat a GitHubon.