Bagikan melalui


Menggunakan Azure Event Hubs sebagai sumber data Tabel Langsung Delta

Artikel ini menjelaskan cara menggunakan Tabel Langsung Delta untuk memproses pesan dari Azure Event Hubs. Anda tidak dapat menggunakan konektor Hub Peristiwa Streaming Terstruktur karena pustaka ini tidak tersedia sebagai bagian dari Databricks Runtime, dan Tabel Langsung Delta tidak memungkinkan Anda menggunakan pustaka JVM pihak ketiga.

Bagaimana Cara Menyambungkan Tabel Langsung Delta ke Azure Event Hubs?

Azure Event Hubs menyediakan titik akhir yang kompatibel dengan Apache Kafka yang dapat Anda gunakan dengan konektor Kafka Streaming Terstruktur, tersedia di Databricks Runtime, untuk memproses pesan dari Azure Event Hubs. Untuk informasi selengkapnya tentang Azure Event Hubs dan kompatibilitas Apache Kafka, lihat Menggunakan Azure Event Hubs dari aplikasi Apache Kafka.

Langkah-langkah berikut menjelaskan menyambungkan alur Tabel Langsung Delta ke instans Azure Event Hubs yang ada dan menggunakan peristiwa dari topik. Untuk menyelesaikan langkah-langkah ini, Anda memerlukan nilai koneksi Azure Event Hubs berikut:

  • Nama namespace layanan Azure Event Hubs.
  • Nama instans Pusat Aktivitas di namespace Layanan Azure Event Hubs.
  • Nama kebijakan akses bersama dan kunci kebijakan untuk Azure Event Hubs. Secara default, RootManageSharedAccessKey Kebijakan dibuat untuk setiap namespace Azure Event Hubs. Kebijakan ini memiliki manageizin , send dan listen . Jika alur Anda hanya membaca dari Azure Event Hubs, Databricks merekomendasikan untuk membuat kebijakan baru hanya dengan izin mendengarkan.

Untuk informasi selengkapnya tentang string koneksi Azure Event Hubs, lihat Mendapatkan string koneksi Azure Event Hubs.

Catatan

  • Azure Event Hubs menyediakan opsi OAuth 2.0 dan tanda tangan akses bersama (SAS) untuk mengotorisasi akses ke sumber daya aman Anda. Instruksi ini menggunakan autentikasi berbasis SAS.
  • Jika Anda mendapatkan string koneksi Azure Event Hubs dari portal Azure, itu mungkin tidak berisi EntityPath nilai . Nilai EntityPath hanya diperlukan saat menggunakan konektor Structured Streaming Event Hubs. Menggunakan Konektor Kafka Streaming Terstruktur hanya memerlukan nama topik.

Menyimpan kunci kebijakan dalam rahasia Azure Databricks

Karena kunci kebijakan adalah informasi sensitif, Databricks merekomendasikan untuk tidak mengodekan nilai dalam kode alur Anda. Sebagai gantinya, gunakan rahasia Azure Databricks untuk menyimpan dan mengelola akses ke kunci.

Contoh berikut menggunakan Databricks CLI untuk membuat cakupan rahasia dan menyimpan kunci dalam cakupan rahasia tersebut. Dalam kode alur Anda, gunakan dbutils.secrets.get() fungsi dengan scope-name dan shared-policy-name untuk mengambil nilai kunci.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Untuk informasi selengkapnya tentang rahasia Azure Databricks, lihat Manajemen rahasia.

Membuat buku catatan dan menambahkan kode alur untuk mengonsumsi peristiwa

Contoh berikut membaca peristiwa IoT dari topik, tetapi Anda dapat menyesuaikan contoh untuk persyaratan aplikasi Anda. Sebagai praktik terbaik, Databricks merekomendasikan penggunaan pengaturan alur Delta Live Tables untuk mengonfigurasi variabel aplikasi. Kode alur Anda kemudian menggunakan spark.conf.get() fungsi untuk mengambil nilai. Untuk informasi selengkapnya tentang menggunakan pengaturan alur untuk membuat parameter alur Anda, lihat Menggunakan parameter dengan alur Tabel Langsung Delta.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Membuat alur

Buat alur baru dengan pengaturan berikut, ganti nilai tempat penampung dengan nilai yang sesuai untuk lingkungan Anda.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Menggantikan

  • <container-name> dengan nama kontainer akun penyimpanan Azure.
  • <storage-account-name> dengan nama akun penyimpanan ADLS Gen2.
  • <eh-namespace> dengan nama namespace layanan Pusat Aktivitas Anda.
  • <eh-policy-name> dengan kunci cakupan rahasia untuk kunci kebijakan Azure Event Hubs.
  • <eventhub> dengan nama instans Azure Event Hubs Anda.
  • <secret-scope-name> dengan nama cakupan rahasia Azure Databricks yang berisi kunci kebijakan Azure Event Hubs.

Sebagai praktik terbaik, alur ini tidak menggunakan jalur penyimpanan DBFS default tetapi menggunakan akun penyimpanan Azure Data Lake Storage Gen2 (ADLS Gen2). Untuk informasi selengkapnya tentang mengonfigurasi autentikasi untuk akun penyimpanan ADLS Gen2, lihat Mengakses kredensial penyimpanan dengan aman dengan rahasia dalam alur.