Szybki start: przechwytywanie danych usługi Event Hubs w usłudze Azure Storage i odczytywanie ich przy użyciu języka Python (azure-eventhub)

Centrum zdarzeń można skonfigurować tak, aby dane wysyłane do centrum zdarzeń zostały przechwycone na koncie usługi Azure Storage lub w usłudze Azure Data Lake Storage Gen 1 lub Gen 2. W tym artykule pokazano, jak napisać kod w języku Python w celu wysyłania zdarzeń do centrum zdarzeń i odczytywania przechwyconych danych z usługi Azure Blob Storage. Aby uzyskać więcej informacji na temat tej funkcji, zobacz Omówienie funkcji przechwytywania usługi Event Hubs.

W tym przewodniku Szybki start użyto zestawu SDK języka Python platformy Azure do zademonstrowania funkcji przechwytywania. Aplikacja sender.py wysyła symulowane dane telemetryczne środowiska do centrów zdarzeń w formacie JSON. Centrum zdarzeń jest skonfigurowane do używania funkcji przechwytywania do zapisywania tych danych w usłudze Blob Storage w partiach. Aplikacja capturereader.py odczytuje te obiekty blob i tworzy plik dołączania dla każdego urządzenia. Następnie aplikacja zapisuje dane w plikach CSV.

W ramach tego przewodnika Szybki start wykonasz następujące czynności:

  • Utwórz konto i kontener usługi Azure Blob Storage w witrynie Azure Portal.
  • Utwórz przestrzeń nazw usługi Event Hubs przy użyciu witryny Azure Portal.
  • Utwórz centrum zdarzeń z włączoną funkcją Przechwytywanie i połącz je z kontem magazynu.
  • Wysyłanie danych do centrum zdarzeń przy użyciu skryptu języka Python.
  • Odczytywanie i przetwarzanie plików z funkcji przechwytywania usługi Event Hubs przy użyciu innego skryptu języka Python.

Wymagania wstępne

Włączanie funkcji przechwytywania dla centrum zdarzeń

Włącz funkcję Przechwytywanie dla centrum zdarzeń. W tym celu postępuj zgodnie z instrukcjami w artykule Włączanie funkcji przechwytywania usługi Event Hubs przy użyciu witryny Azure Portal. Wybierz konto magazynu i kontener obiektów blob utworzony w poprzednim kroku. Wybierz pozycję Avro w polu Format serializacji zdarzeń wyjściowych.

Tworzenie skryptu języka Python w celu wysyłania zdarzeń do centrum zdarzeń

W tej sekcji utworzysz skrypt języka Python, który wysyła 200 zdarzeń (10 urządzeń * 20 zdarzeń) do centrum zdarzeń. Te zdarzenia to przykładowe odczyty środowiska wysyłane w formacie JSON.

  1. Otwórz ulubiony edytor języka Python, taki jak Visual Studio Code.

  2. Utwórz skrypt o nazwie sender.py.

  3. Wklej następujący kod do sender.py.

    import time
    import os
    import uuid
    import datetime
    import random
    import json
    
    from azure.eventhub import EventHubProducerClient, EventData
    
    # This script simulates the production of events for 10 devices.
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))
    
    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    
    for y in range(0,20):    # For each device, produce 20 events. 
        event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 
        for dev in devices:
            # Create a dummy reading.
        reading = {
                'id': dev, 
                'timestamp': str(datetime.datetime.utcnow()), 
                'uv': random.random(), 
                'temperature': random.randint(70, 100), 
                'humidity': random.randint(70, 100)
            }
            s = json.dumps(reading) # Convert the reading into a JSON string.
            event_data_batch.add(EventData(s)) # Add event data to the batch.
        producer.send_batch(event_data_batch) # Send the batch of events to the event hub.
    
    # Close the producer.    
    producer.close()
    
  4. Zastąp następujące wartości w skryptach:

    • Zastąp EVENT HUBS NAMESPACE CONNECTION STRING element parametry połączenia przestrzeni nazw usługi Event Hubs.
    • Zastąp EVENT HUB NAME ciąg nazwą centrum zdarzeń.
  5. Uruchom skrypt, aby wysyłać zdarzenia do centrum zdarzeń.

  6. W witrynie Azure Portal możesz sprawdzić, czy centrum zdarzeń odebrało komunikaty. Przejdź do widoku Komunikaty w sekcji Metryki . Odśwież stronę, aby zaktualizować wykres. Wyświetlenie komunikatów odebranych przez stronę może potrwać kilka sekund.

    Verify that the event hub received the messages

Tworzenie skryptu języka Python w celu odczytania plików przechwytywania

W tym przykładzie przechwycone dane są przechowywane w usłudze Azure Blob Storage. Skrypt w tej sekcji odczytuje przechwycone pliki danych z konta usługi Azure Storage i generuje pliki CSV w celu łatwego otwierania i wyświetlania. W bieżącym katalogu roboczym aplikacji zostanie wyświetlonych 10 plików. Te pliki zawierają odczyty środowiska dla 10 urządzeń.

  1. W edytorze języka Python utwórz skrypt o nazwie capturereader.py. Ten skrypt odczytuje przechwycone pliki i tworzy plik dla każdego urządzenia w celu zapisania danych tylko dla tego urządzenia.

  2. Wklej następujący kod do capturereader.py.

    import os
    import string
    import json
    import uuid
    import avro.schema
    
    from azure.storage.blob import ContainerClient, BlobClient
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    
    
    def processBlob2(filename):
        reader = DataFileReader(open(filename, 'rb'), DatumReader())
        dict = {}
        for reading in reader:
            parsed_json = json.loads(reading["Body"])
            if not 'id' in parsed_json:
                return
            if not parsed_json['id'] in dict:
                list = []
                dict[parsed_json['id']] = list
            else:
                list = dict[parsed_json['id']]
                list.append(parsed_json)
        reader.close()
        for device in dict.keys():
            filename = os.getcwd() + '\\' + str(device) + '.csv'
            deviceFile = open(filename, "a")
            for r in dict[device]:
                deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')
    
    def startProcessing():
        print('Processor started using path: ' + os.getcwd())
        # Create a blob container client.
        container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME")
        blob_list = container.list_blobs() # List all the blobs in the container.
        for blob in blob_list:
            # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).        
            if blob.size > 508:
                print('Downloaded a non empty blob: ' + blob.name)
                # Create a blob client for the blob.
                blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
                # Construct a file name based on the blob name.
                cleanName = str.replace(blob.name, '/', '_')
                cleanName = os.getcwd() + '\\' + cleanName 
                with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                    my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
                processBlob2(cleanName) # Convert the file into a CSV file.
                os.remove(cleanName) # Remove the original downloaded file.
                # Delete the blob from the container after it's read.
                container.delete_blob(blob.name)
    
    startProcessing()    
    
  3. Zastąp AZURE STORAGE CONNECTION STRING ciąg parametry połączenia dla konta usługi Azure Storage. Nazwa kontenera utworzonego w tym przewodniku Szybki start to przechwytywanie. Jeśli użyto innej nazwy kontenera, zastąp ciąg capture nazwą kontenera na koncie magazynu.

Uruchamianie skryptów

  1. Otwórz wiersz polecenia z językiem Python w swojej ścieżce, a następnie uruchom następujące polecenia, aby zainstalować pakiety wymagań wstępnych języka Python:

    pip install azure-storage-blob
    pip install azure-eventhub
    pip install avro-python3
    
  2. Zmień katalog na katalog, w którym zapisano sender.py i capturereader.py, a następnie uruchom następujące polecenie:

    python sender.py
    

    To polecenie uruchamia nowy proces języka Python w celu uruchomienia nadawcy.

  3. Poczekaj kilka minut na uruchomienie przechwytywania, a następnie wprowadź następujące polecenie w oryginalnym oknie polecenia:

    python capturereader.py
    

    Ten procesor przechwytywania używa katalogu lokalnego do pobrania wszystkich obiektów blob z konta magazynu i kontenera. Przetwarza pliki, które nie są puste, i zapisuje wyniki jako pliki CSV w katalogu lokalnym.

Następne kroki

Zapoznaj się z przykładami języka Python w witrynie GitHub.