Mulai cepat: Mengambil data Azure Event Hubs di Azure Storage dan membacanya dengan menggunakan Python (azure-eventhub)

Anda dapat mengonfigurasi pusat aktivitas agar data yang dikirim ke pusat aktivitas ditangkap di akun penyimpanan Azure atau Azure Data Lake Storage Gen 1 atau Gen 2. Artikel ini memperlihatkan kepada Anda cara menulis kode Python untuk mengirim peristiwa ke pusat aktivitas dan membaca data yang ditangkap dari penyimpanan Azure Blob. Untuk informasi selengkapnya tentang fitur ini, lihat ringkasan fitur Capture Azure Event Hubs.

Mulai cepat ini menggunakan Azure Python SDK untuk mendemonstrasikan fitur Capture. Aplikasi sender.py mengirimkan simulasi telemetri lingkungan ke pusat aktivitas dalam format JSON. Pusat aktivitas dikonfigurasi untuk menggunakan fitur Capture untuk menulis data ini ke penyimpanan Blob dalam batch. Aplikasi capturereader.py membaca blob ini dan membuat file lampiran untuk setiap perangkat. Aplikasi kemudian menulis data ke dalam file CSV.

Dalam mulai cepat ini, Anda akan:

  • Membuat akun penyimpanan dan kontainer Azure Blob di portal Microsoft Azure.
  • Membuat ruang nama Azure Event Hubs menggunakan portal Microsoft Azure.
  • Memuat pusat aktivitas dengan fitur Capture diaktifkan dan terhubung ke akun penyimpanan Anda.
  • Mengirim data ke pusat aktivitas Anda dengan menggunakan skrip Python.
  • Membaca dan memproses file dari fitur Capture Azure Event Hubs dengan menggunakan skrip Python lain.

Prasyarat

Mengaktifkan fitur Capture untuk pusat aktivitas

Aktifkan fitur Capture untuk pusat aktivitas. Untuk melakukannya, ikuti instruksi di Mengaktifkan Capture Azure Event Hubs menggunakan portal Microsoft Azure. Pilih akun penyimpanan dan kontainer blob yang Anda buat di langkah sebelumnya. Pilih Avro untuk format serialisasi peristiwa Output.

Membuat skrip Python untuk mengirim peristiwa ke pusat aktivitas Anda

Di bagian ini, Anda membuat skrip Python yang mengirim 200 peristiwa (10 perangkat * 20 peristiwa) ke pusat aktivitas. Peristiwa ini adalah contoh pembacaan lingkungan yang dikirim dalam format JSON.

  1. Buka editor favorit Anda, seperti Visual Studio Code.

  2. Buat skrip bernama send.py.

  3. Tempelkan kode berikut ke send.py.

    import time
    import os
    import uuid
    import datetime
    import random
    import json
    
    from azure.eventhub import EventHubProducerClient, EventData
    
    # This script simulates the production of events for 10 devices.
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))
    
    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    
    for y in range(0,20):    # For each device, produce 20 events. 
        event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 
        for dev in devices:
            # Create a dummy reading.
        reading = {
                'id': dev, 
                'timestamp': str(datetime.datetime.utcnow()), 
                'uv': random.random(), 
                'temperature': random.randint(70, 100), 
                'humidity': random.randint(70, 100)
            }
            s = json.dumps(reading) # Convert the reading into a JSON string.
            event_data_batch.add(EventData(s)) # Add event data to the batch.
        producer.send_batch(event_data_batch) # Send the batch of events to the event hub.
    
    # Close the producer.    
    producer.close()
    
  4. Ganti nilai berikut dalam skrip:

    • Ganti EVENT HUBS NAMESPACE CONNECTION STRING dengan string koneksi ke ruang nama Azure Event Hubs.
    • Ganti EVENT HUB NAME dengan nama pusat aktivitas Anda.
  5. Jalankan skrip untuk mengirim peristiwa ke pusat aktivitas.

  6. Di portal Microsoft Azure, Anda dapat memverifikasi bahwa pusat aktivitas telah menerima pesan. Beralih ke tampilan Pesan di bagian Metrik. Refresh halaman untuk memperbarui bagan. Mungkin dibutuhkan waktu beberapa detik agar halaman menunjukkan bahwa pesan telah diterima.

    Verify that the event hub received the messages

Membuat skrip Python untuk membaca file Capture Anda

Dalam contoh ini, data yang ditangkap disimpan di penyimpanan Azure Blob. Skrip di bagian ini membaca file data yang ditangkap dari akun penyimpanan Azure Anda dan membuat file CSV agar Anda dapat dengan mudah membuka dan melihatnya. Anda melihat 10 file di direktori kerja aplikasi saat ini. File-file ini berisi pembacaan lingkungan untuk 10 perangkat.

  1. Di editor Python Anda, buat skrip bernama capturereader.py. Skrip ini membaca file yang ditangkap dan membuat file untuk setiap perangkat untuk menulis data hanya untuk perangkat tersebut.

  2. Tempelkan kode berikut ke capturereader.py.

    import os
    import string
    import json
    import uuid
    import avro.schema
    
    from azure.storage.blob import ContainerClient, BlobClient
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    
    
    def processBlob2(filename):
        reader = DataFileReader(open(filename, 'rb'), DatumReader())
        dict = {}
        for reading in reader:
            parsed_json = json.loads(reading["Body"])
            if not 'id' in parsed_json:
                return
            if not parsed_json['id'] in dict:
                list = []
                dict[parsed_json['id']] = list
            else:
                list = dict[parsed_json['id']]
                list.append(parsed_json)
        reader.close()
        for device in dict.keys():
            filename = os.getcwd() + '\\' + str(device) + '.csv'
            deviceFile = open(filename, "a")
            for r in dict[device]:
                deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')
    
    def startProcessing():
        print('Processor started using path: ' + os.getcwd())
        # Create a blob container client.
        container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME")
        blob_list = container.list_blobs() # List all the blobs in the container.
        for blob in blob_list:
            # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).        
            if blob.size > 508:
                print('Downloaded a non empty blob: ' + blob.name)
                # Create a blob client for the blob.
                blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
                # Construct a file name based on the blob name.
                cleanName = str.replace(blob.name, '/', '_')
                cleanName = os.getcwd() + '\\' + cleanName 
                with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                    my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
                processBlob2(cleanName) # Convert the file into a CSV file.
                os.remove(cleanName) # Remove the original downloaded file.
                # Delete the blob from the container after it's read.
                container.delete_blob(blob.name)
    
    startProcessing()    
    
  3. Ganti AZURE STORAGE CONNECTION STRING dengan string koneksi untuk akun penyimpanan Azure Anda. Nama kontainer yang Anda buat dalam mulai cepat ini adalah capture. Jika Anda menggunakan nama yang berbeda untuk kontainer, ganti capture dengan nama kontainer di akun penyimpanan.

Menjalankan skrip

  1. Buka prompt perintah yang berisi Python pada jalurnya, lalu jalankan perintah ini untuk menginstal paket prasyarat Python:

    pip install azure-storage-blob
    pip install azure-eventhub
    pip install avro-python3
    
  2. Ubah direktori Anda ke direktori tempat Anda menyimpan sender.py dan capturereader.py, dan jalankan perintah ini:

    python sender.py
    

    Perintah ini memulai proses Python baru untuk menjalankan pengirim.

  3. Tunggu beberapa menit hingga penangkapan berjalan, lalu masukkan perintah berikut di jendela perintah asli Anda:

    python capturereader.py
    

    Prosesor penangkap ini menggunakan direktori lokal untuk mengunduh semua blob dari akun penyimpanan dan kontainer. Ini memproses file yang tidak kosong, dan menulis hasilnya sebagai file CSV ke direktori lokal.

Langkah berikutnya

Periksa sampel Python di GitHub.