Menggunakan Azure Event Hubs sebagai sumber data alur

Artikel ini menjelaskan cara memproses pesan dari Azure Event Hubs dalam pipeline. Anda tidak dapat menggunakan konektor Structured Streaming Event Hubs karena pustaka ini tidak tersedia sebagai bagian dari Databricks Runtime, dan Lakeflow Spark Declarative Pipelines tidak memungkinkan Anda menggunakan pustaka JVM pihak ketiga.

Bagaimana alur dapat tersambung ke Azure Event Hubs?

Azure Event Hubs menyediakan titik akhir yang kompatibel dengan Apache Kafka yang dapat Anda gunakan dengan konektor Structured Streaming Kafka, tersedia di Databricks Runtime, untuk memproses pesan dari Azure Event Hubs. Untuk informasi selengkapnya tentang Azure Event Hubs dan kompatibilitas Apache Kafka, lihat Menggunakan Azure Event Hubs dari aplikasi Apache Kafka.

Langkah-langkah berikut menjelaskan menyambungkan pipeline ke instans Azure Event Hubs yang ada dan menggunakan event dari topik. Untuk menyelesaikan langkah-langkah ini, Anda memerlukan nilai koneksi Azure Event Hubs berikut:

  • Nama namespace layanan Azure Event Hubs.
  • Nama instans Event Hub di namespace Event Hubs.
  • Nama kebijakan akses bersama dan kunci kebijakan untuk Event Hubs. Secara default, kebijakan RootManageSharedAccessKey dibuat untuk setiap namespace layanan Azure Event Hubs. Kebijakan ini memiliki izin manage, send, dan listen. Jika alur Anda hanya membaca dari Azure Event Hubs, Databricks merekomendasikan untuk membuat kebijakan baru hanya dengan izin mendengarkan.

Untuk informasi selengkapnya tentang string koneksi Azure Event Hubs, lihat Mendapatkan string koneksi Azure Event Hubs.

Nota

  • Azure Event Hubs menyediakan opsi OAuth 2.0 dan tanda tangan akses bersama (SAS) untuk mengotorisasi akses ke sumber daya aman Anda. Instruksi ini menggunakan autentikasi berbasis SAS.
  • Jika Anda mendapatkan string koneksi Azure Event Hubs dari portal Microsoft Azure, string tersebut mungkin tidak berisi nilai EntityPath. Nilai EntityPath hanya diperlukan saat menggunakan konektor Structured Streaming Event Hubs. Menggunakan Konektor Kafka Streaming Terstruktur hanya memerlukan nama topik.

Menyimpan kunci kebijakan dalam rahasia Azure Databricks

Karena kunci kebijakan adalah informasi sensitif, Databricks merekomendasikan untuk tidak mengodekan nilai dalam kode alur Anda. Sebagai gantinya, gunakan rahasia Azure Databricks untuk menyimpan dan mengelola akses ke kunci.

Contoh berikut menggunakan Databricks CLI untuk membuat cakupan rahasia dan menyimpan kunci dalam cakupan rahasia tersebut. Dalam kode alur Anda, gunakan fungsi dbutils.secrets.get() dengan scope-name dan shared-policy-name untuk mengambil nilai kunci.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Untuk informasi selengkapnya tentang rahasia Azure Databricks, lihat Manajemen rahasia.

Membuat alur dan menambahkan kode untuk mengonsumsi peristiwa

Contoh ini membaca peristiwa IoT dari topik, namun Anda bisa menyesuaikan contoh ini untuk persyaratan aplikasi Anda. Sebagai praktik terbaik, Databricks merekomendasikan penggunaan pengaturan alur untuk mengonfigurasi variabel aplikasi. Kode alur Anda kemudian menggunakan fungsi spark.conf.get() untuk mengambil nilai.

from pyspark import pipelines as dp
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dp.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dp.expect("valid_topic", "topic IS NOT NULL")
@dp.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Membuat alur kerja

Buat alur baru dengan file sumber Python, dan masukkan kode di atas.

Kode mereferensikan parameter yang dikonfigurasi. Gunakan konfigurasi JSON berikut, mengganti nilai penampung dengan nilai yang sesuai untuk lingkungan Anda (lihat daftar yang mengikuti JSON). Anda dapat mengatur parameter dengan menggunakan antarmuka pengguna pengaturan, atau dengan mengedit pengaturan JSON secara langsung. Untuk informasi selengkapnya tentang menggunakan pengaturan alur untuk membuat parameter alur Anda, lihat Menggunakan parameter dengan alur.

File pengaturan ini juga menentukan lokasi penyimpanan untuk akun penyimpanan Azure Data Lake Storage (ADLS). Sebagai praktik terbaik, alur ini tidak menggunakan jalur penyimpanan DBFS default tetapi menggunakan akun penyimpanan ADLS. Untuk informasi selengkapnya tentang mengonfigurasi autentikasi untuk akun penyimpanan ADLS, lihat Mengakses kredensial penyimpanan dengan aman dengan rahasia dalam alur.

{
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  }
}

Ganti tempat penampung berikut:

  • <container-name> sebagai nama dari kontainer pada akun penyimpanan Azure.
  • <storage-account-name> dengan nama akun penyimpanan ADLS.
  • <eh-namespace> dengan nama namespace Event Hubs Anda.
  • <eh-policy-name> dengan kunci lingkup rahasia untuk kunci kebijakan Event Hubs.
  • <eventhub> dengan nama instans Event Hubs Anda.
  • <secret-scope-name> dengan nama cakupan rahasia Azure Databricks yang berisi kunci kebijakan Event Hubs.