Rövid útmutató: Event Hubs-adatok rögzítése az Azure Storage-ban, és a Python (azure-eventhub) használatával történő olvasása

Az eseményközpontot úgy konfigurálhatja, hogy az eseményközpontba küldött adatok egy Azure Storage-fiókban vagy az 1. generációs Azure Data Lake Storage-ban vagy a Gen 2-ben lesznek rögzítve. Ez a cikk bemutatja, hogyan írhat Python-kódot események eseményközpontba való küldéséhez, és hogyan olvashatja be a rögzített adatokat az Azure Blob Storage-ból. A funkcióval kapcsolatos további információkért tekintse meg az Event Hubs Capture funkció áttekintését.

Ez a rövid útmutató az Azure Python SDK használatával mutatja be a Rögzítés funkciót. A sender.py alkalmazás szimulált környezeti telemetriát küld az eseményközpontoknak JSON formátumban. Az eseményközpont úgy van konfigurálva, hogy a Rögzítés funkcióval írja ezeket az adatokat a Blob Storage-ba kötegekben. A capturereader.py alkalmazás felolvassa ezeket a blobokat, és létrehoz egy hozzáfűző fájlt minden eszközhöz. Az alkalmazás ezután CSV-fájlokba írja az adatokat.

Ebben a rövid útmutatóban a következőket hajtja végre:

  • Hozzon létre egy Azure Blob Storage-fiókot és -tárolót az Azure Portalon.
  • Hozzon létre egy Event Hubs-névteret az Azure Portal használatával.
  • Hozzon létre egy eseményközpontot, amelyen engedélyezve van a Rögzítés funkció, és csatlakoztassa a tárfiókhoz.
  • Adatok küldése az eseményközpontba Egy Python-szkript használatával.
  • Fájlok olvasása és feldolgozása az Event Hubs Captureből egy másik Python-szkript használatával.

Előfeltételek

A Rögzítés funkció engedélyezése az eseményközponthoz

Engedélyezze a Rögzítés funkciót az eseményközponthoz. Ehhez kövesse az Event Hubs Capture engedélyezése az Azure Portal használatával című témakör utasításait. Válassza ki az előző lépésben létrehozott tárfiókot és blobtárolót. Válassza az Avro lehetőséget a kimeneti esemény szerializálási formátumához.

Python-szkript létrehozása események eseményközpontba való küldéséhez

Ebben a szakaszban létrehoz egy Python-szkriptet, amely 200 eseményt (10 eszközt * 20 eseményt) küld egy eseményközpontba. Ezek az események JSON formátumban küldött környezeti mintaolvasások.

  1. Nyissa meg a kedvenc Python-szerkesztőt, például a Visual Studio Code-ot.

  2. Hozzon létre egy sender.py nevű szkriptet.

  3. Illessze be a következő kódot a sender.py.

    import time
    import os
    import uuid
    import datetime
    import random
    import json
    
    from azure.eventhub import EventHubProducerClient, EventData
    
    # This script simulates the production of events for 10 devices.
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))
    
    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    
    for y in range(0,20):    # For each device, produce 20 events. 
        event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 
        for dev in devices:
            # Create a dummy reading.
        reading = {
                'id': dev, 
                'timestamp': str(datetime.datetime.utcnow()), 
                'uv': random.random(), 
                'temperature': random.randint(70, 100), 
                'humidity': random.randint(70, 100)
            }
            s = json.dumps(reading) # Convert the reading into a JSON string.
            event_data_batch.add(EventData(s)) # Add event data to the batch.
        producer.send_batch(event_data_batch) # Send the batch of events to the event hub.
    
    # Close the producer.    
    producer.close()
    
  4. Cserélje le a következő értékeket a szkriptekben:

    • Cserélje le EVENT HUBS NAMESPACE CONNECTION STRING az Event Hubs-névtér kapcsolati sztring.
    • Cserélje le EVENT HUB NAME az eseményközpont nevére.
  5. Futtassa a szkriptet, hogy eseményeket küldjön az eseményközpontba.

  6. Az Azure Portalon ellenőrizheti, hogy az eseményközpont megkapta-e az üzeneteket. Váltson Üzenetek nézetre a Metrikák szakaszban. Frissítse a lapot a diagram frissítéséhez. Eltarthat néhány másodpercig, hogy a lap megjelenítse az üzenetek fogadását.

    Verify that the event hub received the messages

Python-szkript létrehozása a Capture-fájlok olvasásához

Ebben a példában a rögzített adatokat az Azure Blob Storage tárolja. Az ebben a szakaszban található szkript beolvassa a rögzített adatfájlokat az Azure Storage-fiókból, és CSV-fájlokat hoz létre a könnyű megnyitáshoz és megtekintéshez. Az alkalmazás aktuális munkakönyvtárában 10 fájl látható. Ezek a fájlok tartalmazzák a 10 eszköz környezeti leolvasását.

  1. A Python-szerkesztőben hozzon létre egy capturereader.py nevű szkriptet. Ez a szkript beolvassa a rögzített fájlokat, és létrehoz egy fájlt minden eszközhöz, hogy csak az adott eszköz adatait írja.

  2. Illessze be a következő kódot capturereader.py.

    import os
    import string
    import json
    import uuid
    import avro.schema
    
    from azure.storage.blob import ContainerClient, BlobClient
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    
    
    def processBlob2(filename):
        reader = DataFileReader(open(filename, 'rb'), DatumReader())
        dict = {}
        for reading in reader:
            parsed_json = json.loads(reading["Body"])
            if not 'id' in parsed_json:
                return
            if not parsed_json['id'] in dict:
                list = []
                dict[parsed_json['id']] = list
            else:
                list = dict[parsed_json['id']]
                list.append(parsed_json)
        reader.close()
        for device in dict.keys():
            filename = os.getcwd() + '\\' + str(device) + '.csv'
            deviceFile = open(filename, "a")
            for r in dict[device]:
                deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')
    
    def startProcessing():
        print('Processor started using path: ' + os.getcwd())
        # Create a blob container client.
        container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME")
        blob_list = container.list_blobs() # List all the blobs in the container.
        for blob in blob_list:
            # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).        
            if blob.size > 508:
                print('Downloaded a non empty blob: ' + blob.name)
                # Create a blob client for the blob.
                blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
                # Construct a file name based on the blob name.
                cleanName = str.replace(blob.name, '/', '_')
                cleanName = os.getcwd() + '\\' + cleanName 
                with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                    my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
                processBlob2(cleanName) # Convert the file into a CSV file.
                os.remove(cleanName) # Remove the original downloaded file.
                # Delete the blob from the container after it's read.
                container.delete_blob(blob.name)
    
    startProcessing()    
    
  3. Cserélje le AZURE STORAGE CONNECTION STRING az Azure Storage-fiók kapcsolati sztring. A rövid útmutatóban létrehozott tároló neve a rögzítés. Ha más nevet használt a tárolóhoz, cserélje le a rögzítést a tároló nevére a tárfiókban.

A szkriptek futtatása

  1. Nyisson meg egy parancssort, amelyben a Python szerepel, majd futtassa ezeket a parancsokat a Python előfeltételcsomagjainak telepítéséhez:

    pip install azure-storage-blob
    pip install azure-eventhub
    pip install avro-python3
    
  2. Módosítsa a könyvtárat arra a könyvtárra, ahová sender.py és capturereader.py mentette, és futtassa a következő parancsot:

    python sender.py
    

    Ez a parancs elindít egy új Python-folyamatot a feladó futtatásához.

  3. Várjon néhány percet a rögzítés futtatására, majd írja be a következő parancsot az eredeti parancsablakba:

    python capturereader.py
    

    Ez a rögzítési processzor a helyi könyvtár használatával tölti le az összes blobot a tárfiókból és a tárolóból. Feldolgozza a nem üres fájlokat, és CSV-fájlokként írja az eredményeket a helyi könyvtárba.

Következő lépések

Tekintse meg a Python-mintákat a GitHubon.