Dela via


Snabbstart: Samla in Event Hubs-data i Azure Storage och läs dem med hjälp av Python (azure-eventhub)

Du kan konfigurera en händelsehubb så att data som skickas till en händelsehubb samlas in i ett Azure Storage-konto eller Azure Data Lake Storage Gen 1 eller Gen 2. Den här artikeln visar hur du skriver Python-kod för att skicka händelser till en händelsehubb och läsa insamlade data från Azure Blob Storage. Mer information om den här funktionen finns i Översikt över event hubs capture-funktionen.

Den här snabbstarten använder Azure Python SDK för att demonstrera avbildningsfunktionen. Appen sender.py skickar simulerad miljötelemetri till händelsehubbar i JSON-format. Händelsehubben är konfigurerad för att använda avbildningsfunktionen för att skriva dessa data till Blob Storage i batchar. Den capturereader.py appen läser dessa blobar och skapar en tilläggsfil för varje enhet. Appen skriver sedan data till CSV-filer.

I den här snabbstarten kommer du att göra följande:

  • Skapa ett Azure Blob Storage-konto och en container i Azure-portalen.
  • Skapa ett Event Hubs-namnområde med hjälp av Azure-portalen.
  • Skapa en händelsehubb med avbildningsfunktionen aktiverad och anslut den till ditt lagringskonto.
  • Skicka data till din händelsehubb med hjälp av ett Python-skript.
  • Läsa och bearbeta filer från Event Hubs Capture med hjälp av ett annat Python-skript.

Förutsättningar

Aktivera avbildningsfunktionen för händelsehubben

Aktivera avbildningsfunktionen för händelsehubben. Det gör du genom att följa anvisningarna i Aktivera Event Hubs Capture med hjälp av Azure-portalen. Välj lagringskontot och blobcontainern som du skapade i föregående steg. Välj Avro som format för serialisering av utdatahändelser.

Skapa ett Python-skript för att skicka händelser till din händelsehubb

I det här avsnittet skapar du ett Python-skript som skickar 200 händelser (10 enheter * 20 händelser) till en händelsehubb. Dessa händelser är ett exempel på miljöavläsning som skickas i JSON-format.

  1. Öppna din favoritredigerare för Python, till exempel Visual Studio Code.

  2. Skapa ett skript med namnet sender.py.

  3. Klistra in följande kod i sender.py.

    import time
    import os
    import uuid
    import datetime
    import random
    import json
    
    from azure.eventhub import EventHubProducerClient, EventData
    
    # This script simulates the production of events for 10 devices.
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))
    
    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    
    for y in range(0,20):    # For each device, produce 20 events. 
        event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 
        for dev in devices:
            # Create a dummy reading.
        reading = {
                'id': dev, 
                'timestamp': str(datetime.datetime.utcnow()), 
                'uv': random.random(), 
                'temperature': random.randint(70, 100), 
                'humidity': random.randint(70, 100)
            }
            s = json.dumps(reading) # Convert the reading into a JSON string.
            event_data_batch.add(EventData(s)) # Add event data to the batch.
        producer.send_batch(event_data_batch) # Send the batch of events to the event hub.
    
    # Close the producer.    
    producer.close()
    
  4. Ersätt följande värden i skripten:

    • Ersätt EVENT HUBS NAMESPACE CONNECTION STRING med anslutningssträng för Event Hubs-namnområdet.
    • Ersätt EVENT HUB NAME med namnet på din händelsehubb.
  5. Kör skriptet för att skicka händelser till händelsehubben.

  6. I Azure-portalen kan du kontrollera att händelsehubben har tagit emot meddelandena. Växla till vyn Meddelanden i avsnittet Mått . Uppdatera sidan för att uppdatera diagrammet. Det kan ta några sekunder för sidan att visa att meddelandena har tagits emot.

    Verify that the event hub received the messages

Skapa ett Python-skript för att läsa avbildningsfiler

I det här exemplet lagras insamlade data i Azure Blob Storage. Skriptet i det här avsnittet läser de insamlade datafilerna från ditt Azure Storage-konto och genererar CSV-filer som du enkelt kan öppna och visa. Du ser 10 filer i programmets aktuella arbetskatalog. Dessa filer innehåller miljöavläsningarna för de 10 enheterna.

  1. Skapa ett skript med namnet capturereader.py i Python-redigeraren. Det här skriptet läser de insamlade filerna och skapar en fil för varje enhet för att endast skriva data för den enheten.

  2. Klistra in följande kod i capturereader.py.

    import os
    import string
    import json
    import uuid
    import avro.schema
    
    from azure.storage.blob import ContainerClient, BlobClient
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    
    
    def processBlob2(filename):
        reader = DataFileReader(open(filename, 'rb'), DatumReader())
        dict = {}
        for reading in reader:
            parsed_json = json.loads(reading["Body"])
            if not 'id' in parsed_json:
                return
            if not parsed_json['id'] in dict:
                list = []
                dict[parsed_json['id']] = list
            else:
                list = dict[parsed_json['id']]
                list.append(parsed_json)
        reader.close()
        for device in dict.keys():
            filename = os.getcwd() + '\\' + str(device) + '.csv'
            deviceFile = open(filename, "a")
            for r in dict[device]:
                deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')
    
    def startProcessing():
        print('Processor started using path: ' + os.getcwd())
        # Create a blob container client.
        container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME")
        blob_list = container.list_blobs() # List all the blobs in the container.
        for blob in blob_list:
            # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).        
            if blob.size > 508:
                print('Downloaded a non empty blob: ' + blob.name)
                # Create a blob client for the blob.
                blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
                # Construct a file name based on the blob name.
                cleanName = str.replace(blob.name, '/', '_')
                cleanName = os.getcwd() + '\\' + cleanName 
                with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                    my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
                processBlob2(cleanName) # Convert the file into a CSV file.
                os.remove(cleanName) # Remove the original downloaded file.
                # Delete the blob from the container after it's read.
                container.delete_blob(blob.name)
    
    startProcessing()    
    
  3. Ersätt AZURE STORAGE CONNECTION STRING med anslutningssträng för ditt Azure Storage-konto. Namnet på containern som du skapade i den här snabbstarten är capture. Om du använde ett annat namn för containern ersätter du avbildningen med namnet på containern i lagringskontot.

Kör skripten

  1. Öppna en kommandotolk som har Python i sin sökväg och kör sedan dessa kommandon för att installera Python-nödvändiga paket:

    pip install azure-storage-blob
    pip install azure-eventhub
    pip install avro-python3
    
  2. Ändra katalogen till katalogen där du sparade sender.py och capturereader.py och kör det här kommandot:

    python sender.py
    

    Det här kommandot startar en ny Python-process för att köra avsändaren.

  3. Vänta några minuter tills avbildningen körs och ange sedan följande kommando i det ursprungliga kommandofönstret:

    python capturereader.py
    

    Den här insamlingsprocessorn använder den lokala katalogen för att ladda ned alla blobar från lagringskontot och containern. Den bearbetar filer som inte är tomma och skriver resultatet som CSV-filer till den lokala katalogen.

Nästa steg

Kolla in Python-exempel på GitHub.