Szybki start: przechwytywanie danych usługi Event Hubs w usłudze Azure Storage i odczytywanie ich przy użyciu języka Python (azure-eventhub)
Centrum zdarzeń można skonfigurować tak, aby dane wysyłane do centrum zdarzeń zostały przechwycone na koncie usługi Azure Storage lub w usłudze Azure Data Lake Storage Gen 1 lub Gen 2. W tym artykule pokazano, jak napisać kod w języku Python w celu wysyłania zdarzeń do centrum zdarzeń i odczytywania przechwyconych danych z usługi Azure Blob Storage. Aby uzyskać więcej informacji na temat tej funkcji, zobacz Omówienie funkcji przechwytywania usługi Event Hubs.
W tym przewodniku Szybki start użyto zestawu SDK języka Python platformy Azure do zademonstrowania funkcji przechwytywania. Aplikacja sender.py wysyła symulowane dane telemetryczne środowiska do centrów zdarzeń w formacie JSON. Centrum zdarzeń jest skonfigurowane do używania funkcji przechwytywania do zapisywania tych danych w usłudze Blob Storage w partiach. Aplikacja capturereader.py odczytuje te obiekty blob i tworzy plik dołączania dla każdego urządzenia. Następnie aplikacja zapisuje dane w plikach CSV.
W ramach tego przewodnika Szybki start wykonasz następujące czynności:
- Utwórz konto i kontener usługi Azure Blob Storage w witrynie Azure Portal.
- Utwórz przestrzeń nazw usługi Event Hubs przy użyciu witryny Azure Portal.
- Utwórz centrum zdarzeń z włączoną funkcją Przechwytywanie i połącz je z kontem magazynu.
- Wysyłanie danych do centrum zdarzeń przy użyciu skryptu języka Python.
- Odczytywanie i przetwarzanie plików z funkcji przechwytywania usługi Event Hubs przy użyciu innego skryptu języka Python.
Wymagania wstępne
Środowisko Python w wersji 3.8 lub nowszej z zainstalowanym i zaktualizowanym programem pip.
Subskrypcja Azure. Jeśli nie masz subskrypcji, przed rozpoczęciem utwórz bezpłatne konto.
Aktywna przestrzeń nazw usługi Event Hubs i centrum zdarzeń. Utwórz przestrzeń nazw usługi Event Hubs i centrum zdarzeń w przestrzeni nazw. Zarejestruj nazwę przestrzeni nazw usługi Event Hubs, nazwę centrum zdarzeń i podstawowy klucz dostępu dla przestrzeni nazw. Aby uzyskać klucz dostępu, zobacz Pobieranie parametry połączenia usługi Event Hubs. Domyślna nazwa klucza to RootManageSharedAccessKey. W tym przewodniku Szybki start potrzebujesz tylko klucza podstawowego. Nie potrzebujesz parametry połączenia.
Konto usługi Azure Storage, kontener obiektów blob na koncie magazynu i parametry połączenia do konta magazynu. Jeśli nie masz tych elementów, wykonaj następujące czynności:
- Tworzenie konta usługi Azure Storage
- Tworzenie kontenera obiektów blob na koncie magazynu
- Pobieranie parametry połączenia do konta magazynu
Pamiętaj, aby zarejestrować parametry połączenia i nazwę kontenera do późniejszego użycia w tym przewodniku Szybki start.
Włączanie funkcji przechwytywania dla centrum zdarzeń
Włącz funkcję Przechwytywanie dla centrum zdarzeń. W tym celu postępuj zgodnie z instrukcjami w artykule Włączanie funkcji przechwytywania usługi Event Hubs przy użyciu witryny Azure Portal. Wybierz konto magazynu i kontener obiektów blob utworzony w poprzednim kroku. Wybierz pozycję Avro w polu Format serializacji zdarzeń wyjściowych.
Tworzenie skryptu języka Python w celu wysyłania zdarzeń do centrum zdarzeń
W tej sekcji utworzysz skrypt języka Python, który wysyła 200 zdarzeń (10 urządzeń * 20 zdarzeń) do centrum zdarzeń. Te zdarzenia to przykładowe odczyty środowiska wysyłane w formacie JSON.
Otwórz ulubiony edytor języka Python, taki jak Visual Studio Code.
Utwórz skrypt o nazwie sender.py.
Wklej następujący kod do sender.py.
import time import os import uuid import datetime import random import json from azure.eventhub import EventHubProducerClient, EventData # This script simulates the production of events for 10 devices. devices = [] for x in range(0, 10): devices.append(str(uuid.uuid4())) # Create a producer client to produce and publish events to the event hub. producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME") for y in range(0,20): # For each device, produce 20 events. event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. for dev in devices: # Create a dummy reading. reading = { 'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100) } s = json.dumps(reading) # Convert the reading into a JSON string. event_data_batch.add(EventData(s)) # Add event data to the batch. producer.send_batch(event_data_batch) # Send the batch of events to the event hub. # Close the producer. producer.close()
Zastąp następujące wartości w skryptach:
- Zastąp
EVENT HUBS NAMESPACE CONNECTION STRING
element parametry połączenia przestrzeni nazw usługi Event Hubs. - Zastąp
EVENT HUB NAME
ciąg nazwą centrum zdarzeń.
- Zastąp
Uruchom skrypt, aby wysyłać zdarzenia do centrum zdarzeń.
W witrynie Azure Portal możesz sprawdzić, czy centrum zdarzeń odebrało komunikaty. Przejdź do widoku Komunikaty w sekcji Metryki . Odśwież stronę, aby zaktualizować wykres. Wyświetlenie komunikatów odebranych przez stronę może potrwać kilka sekund.
Tworzenie skryptu języka Python w celu odczytania plików przechwytywania
W tym przykładzie przechwycone dane są przechowywane w usłudze Azure Blob Storage. Skrypt w tej sekcji odczytuje przechwycone pliki danych z konta usługi Azure Storage i generuje pliki CSV w celu łatwego otwierania i wyświetlania. W bieżącym katalogu roboczym aplikacji zostanie wyświetlonych 10 plików. Te pliki zawierają odczyty środowiska dla 10 urządzeń.
W edytorze języka Python utwórz skrypt o nazwie capturereader.py. Ten skrypt odczytuje przechwycone pliki i tworzy plik dla każdego urządzenia w celu zapisania danych tylko dla tego urządzenia.
Wklej następujący kod do capturereader.py.
import os import string import json import uuid import avro.schema from azure.storage.blob import ContainerClient, BlobClient from avro.datafile import DataFileReader, DataFileWriter from avro.io import DatumReader, DatumWriter def processBlob2(filename): reader = DataFileReader(open(filename, 'rb'), DatumReader()) dict = {} for reading in reader: parsed_json = json.loads(reading["Body"]) if not 'id' in parsed_json: return if not parsed_json['id'] in dict: list = [] dict[parsed_json['id']] = list else: list = dict[parsed_json['id']] list.append(parsed_json) reader.close() for device in dict.keys(): filename = os.getcwd() + '\\' + str(device) + '.csv' deviceFile = open(filename, "a") for r in dict[device]: deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n') def startProcessing(): print('Processor started using path: ' + os.getcwd()) # Create a blob container client. container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME") blob_list = container.list_blobs() # List all the blobs in the container. for blob in blob_list: # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files). if blob.size > 508: print('Downloaded a non empty blob: ' + blob.name) # Create a blob client for the blob. blob_client = ContainerClient.get_blob_client(container, blob=blob.name) # Construct a file name based on the blob name. cleanName = str.replace(blob.name, '/', '_') cleanName = os.getcwd() + '\\' + cleanName with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file. processBlob2(cleanName) # Convert the file into a CSV file. os.remove(cleanName) # Remove the original downloaded file. # Delete the blob from the container after it's read. container.delete_blob(blob.name) startProcessing()
Zastąp
AZURE STORAGE CONNECTION STRING
ciąg parametry połączenia dla konta usługi Azure Storage. Nazwa kontenera utworzonego w tym przewodniku Szybki start to przechwytywanie. Jeśli użyto innej nazwy kontenera, zastąp ciąg capture nazwą kontenera na koncie magazynu.
Uruchamianie skryptów
Otwórz wiersz polecenia z językiem Python w swojej ścieżce, a następnie uruchom następujące polecenia, aby zainstalować pakiety wymagań wstępnych języka Python:
pip install azure-storage-blob pip install azure-eventhub pip install avro-python3
Zmień katalog na katalog, w którym zapisano sender.py i capturereader.py, a następnie uruchom następujące polecenie:
python sender.py
To polecenie uruchamia nowy proces języka Python w celu uruchomienia nadawcy.
Poczekaj kilka minut na uruchomienie przechwytywania, a następnie wprowadź następujące polecenie w oryginalnym oknie polecenia:
python capturereader.py
Ten procesor przechwytywania używa katalogu lokalnego do pobrania wszystkich obiektów blob z konta magazynu i kontenera. Przetwarza pliki, które nie są puste, i zapisuje wyniki jako pliki CSV w katalogu lokalnym.
Następne kroki
Zapoznaj się z przykładami języka Python w witrynie GitHub.