Mulai cepat: Mengambil data Azure Event Hubs di Azure Storage dan membacanya dengan menggunakan Python (azure-eventhub)
Anda dapat mengonfigurasi pusat aktivitas agar data yang dikirim ke pusat aktivitas ditangkap di akun penyimpanan Azure atau Azure Data Lake Storage Gen 1 atau Gen 2. Artikel ini memperlihatkan kepada Anda cara menulis kode Python untuk mengirim peristiwa ke pusat aktivitas dan membaca data yang ditangkap dari penyimpanan Azure Blob. Untuk informasi selengkapnya tentang fitur ini, lihat ringkasan fitur Capture Azure Event Hubs.
Mulai cepat ini menggunakan Azure Python SDK untuk mendemonstrasikan fitur Capture. Aplikasi sender.py mengirimkan simulasi telemetri lingkungan ke pusat aktivitas dalam format JSON. Pusat aktivitas dikonfigurasi untuk menggunakan fitur Capture untuk menulis data ini ke penyimpanan Blob dalam batch. Aplikasi capturereader.py membaca blob ini dan membuat file lampiran untuk setiap perangkat. Aplikasi kemudian menulis data ke dalam file CSV.
Dalam mulai cepat ini, Anda akan:
- Membuat akun penyimpanan dan kontainer Azure Blob di portal Microsoft Azure.
- Membuat ruang nama Azure Event Hubs menggunakan portal Microsoft Azure.
- Memuat pusat aktivitas dengan fitur Capture diaktifkan dan terhubung ke akun penyimpanan Anda.
- Mengirim data ke pusat aktivitas Anda dengan menggunakan skrip Python.
- Membaca dan memproses file dari fitur Capture Azure Event Hubs dengan menggunakan skrip Python lain.
Prasyarat
Python 3.8 atau yang lebih baru, dengan pip diinstal dan diperbarui.
Langganan Azure. Jika Anda tidak memilikinya, buat akun gratis sebelum memulai.
Namespace layanan dan pusat aktivitas Azure Event Hubs. Membuat ruang nama Azure Event Hubs dan pusat aktivitas di namespace layanan. Catat nama ruang nama Azure Event Hubs, nama pusat aktivitas, dan kunci akses primer untuk namespace layanan. Untuk mendapatkan kunci akses, lihat Mendapatkan string koneksi Azure Event Hubs. Nama kunci default adalah: RootManageSharedAccessKey. Untuk mulai cepat ini, Anda hanya perlu kunci primer. Anda tidak perlu string koneksi.
Akun penyimpanan Azure, kontainer blob di akun penyimpanan, dan string koneksi ke akun penyimpanan. Jika Anda tidak memiliki item ini, lakukan langkah-langkah berikut:
- Membuat akun Azure Storage
- Membuat kontainer blob di akun penyimpanan
- Mendapatkan string koneksi ke akun penyimpanan
Pastikan untuk mencatat string koneksi dan nama kontainer untuk digunakan nanti dalam mulai cepat ini.
Mengaktifkan fitur Capture untuk pusat aktivitas
Aktifkan fitur Capture untuk pusat aktivitas. Untuk melakukannya, ikuti instruksi di Mengaktifkan Capture Azure Event Hubs menggunakan portal Microsoft Azure. Pilih akun penyimpanan dan kontainer blob yang Anda buat di langkah sebelumnya. Pilih Avro untuk format serialisasi peristiwa Output.
Membuat skrip Python untuk mengirim peristiwa ke pusat aktivitas Anda
Di bagian ini, Anda membuat skrip Python yang mengirim 200 peristiwa (10 perangkat * 20 peristiwa) ke pusat aktivitas. Peristiwa ini adalah contoh pembacaan lingkungan yang dikirim dalam format JSON.
Buka editor favorit Anda, seperti Visual Studio Code.
Buat skrip bernama send.py.
Tempelkan kode berikut ke send.py.
import time import os import uuid import datetime import random import json from azure.eventhub import EventHubProducerClient, EventData # This script simulates the production of events for 10 devices. devices = [] for x in range(0, 10): devices.append(str(uuid.uuid4())) # Create a producer client to produce and publish events to the event hub. producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME") for y in range(0,20): # For each device, produce 20 events. event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. for dev in devices: # Create a dummy reading. reading = { 'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100) } s = json.dumps(reading) # Convert the reading into a JSON string. event_data_batch.add(EventData(s)) # Add event data to the batch. producer.send_batch(event_data_batch) # Send the batch of events to the event hub. # Close the producer. producer.close()
Ganti nilai berikut dalam skrip:
- Ganti
EVENT HUBS NAMESPACE CONNECTION STRING
dengan string koneksi ke ruang nama Azure Event Hubs. - Ganti
EVENT HUB NAME
dengan nama pusat aktivitas Anda.
- Ganti
Jalankan skrip untuk mengirim peristiwa ke pusat aktivitas.
Di portal Microsoft Azure, Anda dapat memverifikasi bahwa pusat aktivitas telah menerima pesan. Beralih ke tampilan Pesan di bagian Metrik. Refresh halaman untuk memperbarui bagan. Mungkin dibutuhkan waktu beberapa detik agar halaman menunjukkan bahwa pesan telah diterima.
Membuat skrip Python untuk membaca file Capture Anda
Dalam contoh ini, data yang ditangkap disimpan di penyimpanan Azure Blob. Skrip di bagian ini membaca file data yang ditangkap dari akun penyimpanan Azure Anda dan membuat file CSV agar Anda dapat dengan mudah membuka dan melihatnya. Anda melihat 10 file di direktori kerja aplikasi saat ini. File-file ini berisi pembacaan lingkungan untuk 10 perangkat.
Di editor Python Anda, buat skrip bernama capturereader.py. Skrip ini membaca file yang ditangkap dan membuat file untuk setiap perangkat untuk menulis data hanya untuk perangkat tersebut.
Tempelkan kode berikut ke capturereader.py.
import os import string import json import uuid import avro.schema from azure.storage.blob import ContainerClient, BlobClient from avro.datafile import DataFileReader, DataFileWriter from avro.io import DatumReader, DatumWriter def processBlob2(filename): reader = DataFileReader(open(filename, 'rb'), DatumReader()) dict = {} for reading in reader: parsed_json = json.loads(reading["Body"]) if not 'id' in parsed_json: return if not parsed_json['id'] in dict: list = [] dict[parsed_json['id']] = list else: list = dict[parsed_json['id']] list.append(parsed_json) reader.close() for device in dict.keys(): filename = os.getcwd() + '\\' + str(device) + '.csv' deviceFile = open(filename, "a") for r in dict[device]: deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n') def startProcessing(): print('Processor started using path: ' + os.getcwd()) # Create a blob container client. container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME") blob_list = container.list_blobs() # List all the blobs in the container. for blob in blob_list: # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files). if blob.size > 508: print('Downloaded a non empty blob: ' + blob.name) # Create a blob client for the blob. blob_client = ContainerClient.get_blob_client(container, blob=blob.name) # Construct a file name based on the blob name. cleanName = str.replace(blob.name, '/', '_') cleanName = os.getcwd() + '\\' + cleanName with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file. processBlob2(cleanName) # Convert the file into a CSV file. os.remove(cleanName) # Remove the original downloaded file. # Delete the blob from the container after it's read. container.delete_blob(blob.name) startProcessing()
Ganti
AZURE STORAGE CONNECTION STRING
dengan string koneksi untuk akun penyimpanan Azure Anda. Nama kontainer yang Anda buat dalam mulai cepat ini adalah capture. Jika Anda menggunakan nama yang berbeda untuk kontainer, ganti capture dengan nama kontainer di akun penyimpanan.
Menjalankan skrip
Buka prompt perintah yang berisi Python pada jalurnya, lalu jalankan perintah ini untuk menginstal paket prasyarat Python:
pip install azure-storage-blob pip install azure-eventhub pip install avro-python3
Ubah direktori Anda ke direktori tempat Anda menyimpan sender.py dan capturereader.py, dan jalankan perintah ini:
python sender.py
Perintah ini memulai proses Python baru untuk menjalankan pengirim.
Tunggu beberapa menit hingga penangkapan berjalan, lalu masukkan perintah berikut di jendela perintah asli Anda:
python capturereader.py
Prosesor penangkap ini menggunakan direktori lokal untuk mengunduh semua blob dari akun penyimpanan dan kontainer. Ini memproses file yang tidak kosong, dan menulis hasilnya sebagai file CSV ke direktori lokal.
Langkah berikutnya
Periksa sampel Python di GitHub.