クイックスタート: Event Hubs データを Azure Storage にキャプチャし、Python を使用してそれを読み取る (azure-eventhub)
イベント ハブに送信されたデータが Azure ストレージ アカウントあるいは Azure Data Lake Storage Gen 1 または Gen 2 にキャプチャされるようにイベント ハブを構成できます。 この記事では、イベントをイベント ハブに送信し、キャプチャされたデータを Azure Blob Storage から読み取る Python コードを記述する方法を示します。 この機能の詳細については、Event Hubs Capture 機能の概要に関するページを参照してください。
このクイックスタートでは、Azure Python SDK を使用して、Capture の機能を試してみます。 sender.py アプリは、シミュレートされた環境のテレメトリを JSON 形式でイベント ハブに送信します。 イベント ハブは、Capture 機能を使用して、このデータを数回に分けて Blob Storage に書き込むように構成されています。 capturereader.py アプリは、こうした BLOB を読み取り、デバイスごとに追加ファイルを作成します。 その後、アプリはデータを CSV ファイルに書き込みます。
このクイック スタートでは次の作業を行います。
- Azure portal で Azure Blob Storage アカウントとコンテナーを作成します。
- Azure portal を使用して Event Hubs 名前空間を作成します。
- Capture 機能が有効になっているイベント ハブを作成し、それをストレージ アカウントに接続します。
- Python スクリプトを使用して、利用するイベント ハブへデータを送信します。
- 別の Python スクリプトを使用して、Event Hubs Capture からファイルを読み取り、処理します。
前提条件
Python 3.8 以降 (PIP がインストールおよび更新されている)。
Azure サブスクリプション。 お持ちでない場合は、開始する前に無料アカウントを作成してください。
アクティブな Event Hubs 名前空間とイベント ハブ。 Event Hubs 名前空間を作成し、その名前空間内にイベント ハブを作成します。 Event Hubs 名前空間の名前、イベント ハブの名前、および名前空間のプライマリ アクセス キーを記録しておいてください。 アクセス キーの取得については、「Event Hubs の接続文字列の取得」を参照してください。 既定のキー名は RootManageSharedAccessKey です。 このクイックスタートでは、プライマリ キーのみ必要となります。 接続文字列は必要ありません。
Azure Storage アカウント、ストレージ アカウントの BLOB コンテナー、ストレージ アカウントへの接続文字列。 これらの項目がない場合は、次の手順を実行します。
接続文字列とコンテナー名を記録しておいてください。後でこのクイックスタートの中で使用します。
イベント ハブの Capture 機能を有効化する
イベント ハブの Capture 機能の有効化。 Azure portal を使用して Event Hubs Capture を有効にする方法に関するページの手順に従ってください。 前の手順で作成したストレージ アカウントと BLOB コンテナーを選択します。 [出力イベントのシリアル化形式] として [Avro] を選択します。
イベントをイベント ハブに送信する Python スクリプトを作成する
このセクションでは、イベント ハブに 200 のイベント (10 デバイス * 20 イベント) を送信する Python スクリプトを作成します。 これらのイベントは、JSON 形式で送信されたサンプルの環境測定値です。
Visual Studio Codeなど、お使いの Python エディターを開きます。
sender.pyという名前のスクリプトを作成します。
次のコードを "sender.py" に貼り付けます。
import time import os import uuid import datetime import random import json from azure.eventhub import EventHubProducerClient, EventData # This script simulates the production of events for 10 devices. devices = [] for x in range(0, 10): devices.append(str(uuid.uuid4())) # Create a producer client to produce and publish events to the event hub. producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME") for y in range(0,20): # For each device, produce 20 events. event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. for dev in devices: # Create a dummy reading. reading = { 'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100) } s = json.dumps(reading) # Convert the reading into a JSON string. event_data_batch.add(EventData(s)) # Add event data to the batch. producer.send_batch(event_data_batch) # Send the batch of events to the event hub. # Close the producer. producer.close()
スクリプト内の次の値を置き換えます。
EVENT HUBS NAMESPACE CONNECTION STRING
を Event Hubs 名前空間への接続文字列に置き換えます。EVENT HUB NAME
をイベント ハブの名前に置き換えます。
イベントをイベント ハブに送信するスクリプトを実行します。
Azure portal で、イベント ハブがメッセージを受信したことを確認できます。 [メトリック] セクションで [メッセージ] ビューに切り替えます。 ページを最新の情報に更新して、グラフを更新します。 メッセージが受信されたとページに表示されるまでに数秒かかることがあります。
Capture ファイルを読み取る Python スクリプトを作成する
この例では、キャプチャされたデータは Azure Blob Storage に格納されます。 このセクションのスクリプトは、Azure Storage アカウントからキャプチャ データ ファイルを読み取り、ユーザーが簡単に開いて確認できる CSV ファイルを生成します。 アプリケーションの現在の作業ディレクトリ内に 10 個のファイルが表示されます。 これらのファイルには、10 個のデバイスの環境測定値が含まれています。
Python エディターで、capturereader.py という名前のスクリプトを作成します。 このスクリプトはキャプチャされたファイルを読み取り、それぞれのデバイスにのみデータを書き込むようにデバイスごとのファイルを作成します。
次のコードを "capturereader.py" に貼り付けます。
import os import string import json import uuid import avro.schema from azure.storage.blob import ContainerClient, BlobClient from avro.datafile import DataFileReader, DataFileWriter from avro.io import DatumReader, DatumWriter def processBlob2(filename): reader = DataFileReader(open(filename, 'rb'), DatumReader()) dict = {} for reading in reader: parsed_json = json.loads(reading["Body"]) if not 'id' in parsed_json: return if not parsed_json['id'] in dict: list = [] dict[parsed_json['id']] = list else: list = dict[parsed_json['id']] list.append(parsed_json) reader.close() for device in dict.keys(): filename = os.getcwd() + '\\' + str(device) + '.csv' deviceFile = open(filename, "a") for r in dict[device]: deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n') def startProcessing(): print('Processor started using path: ' + os.getcwd()) # Create a blob container client. container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME") blob_list = container.list_blobs() # List all the blobs in the container. for blob in blob_list: # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files). if blob.size > 508: print('Downloaded a non empty blob: ' + blob.name) # Create a blob client for the blob. blob_client = ContainerClient.get_blob_client(container, blob=blob.name) # Construct a file name based on the blob name. cleanName = str.replace(blob.name, '/', '_') cleanName = os.getcwd() + '\\' + cleanName with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file. processBlob2(cleanName) # Convert the file into a CSV file. os.remove(cleanName) # Remove the original downloaded file. # Delete the blob from the container after it's read. container.delete_blob(blob.name) startProcessing()
AZURE STORAGE CONNECTION STRING
を Azure Storage アカウントへの接続文字列に置き換えます。 このクイックスタートで作成したコンテナーの名前は capture です。 コンテナーに別の名前を使用した場合は、capture をストレージ アカウント内のコンテナーの名前に置き換えます。
スクリプトの実行
Python をパス設定した状態でコマンド プロンプトを開き、これらのコマンドを実行してPython の前提条件となるパッケージをインストールします:
pip install azure-storage-blob pip install azure-eventhub pip install avro-python3
sender.py と capturereader.py の保存先ディレクトリに移動して、次のコマンドを実行します。
python sender.py
このコマンドにより、新しい Python プロセスが起動して sender を実行します。
capture が実行されるまで数分待って、元のコマンド ウィンドウに、次のコマンドを入力します。
python capturereader.py
このキャプチャ プロセッサでは、ローカル ディレクトリが使用され、ストレージ アカウントとコンテナーからすべての BLOB がダウンロードされます。 空ではないファイルがすべて処理され、結果が CSV ファイルとしてローカル ディレクトリに書き込まれます。
次のステップ
GitHub の Python サンプルをご覧ください。