استخدام Azure Event Hubs كمصدر بيانات Delta Live Tables

تشرح هذه المقالة كيفية استخدام Delta Live Tables لمعالجة الرسائل من Azure Event Hubs. لا يمكنك استخدام موصل Structured Streaming Event Hubs لأن هذه المكتبة غير متوفرة كجزء من Databricks Runtime، ولا تسمح لك Delta Live Tables باستخدام مكتبات JVM التابعة لجهة خارجية.

كيف يمكن ل Delta Live Tables الاتصال بمراكز أحداث Azure؟

توفر Azure Event Hubs نقطة نهاية متوافقة مع Apache Kafka التي يمكنك استخدامها مع موصل Structured Streaming Kafka، المتوفر في Databricks Runtime، لمعالجة الرسائل من Azure Event Hubs. لمزيد من المعلومات حول Azure Event Hubs وتوافق Apache Kafka، راجع استخدام مراكز أحداث Azure من تطبيقات Apache Kafka.

تصف الخطوات التالية توصيل مسار Delta Live Tables بمثيل Event Hubs موجود واستهلاك الأحداث من موضوع. لإكمال هذه الخطوات، تحتاج إلى قيم اتصال مراكز الأحداث التالية:

  • اسم مساحة اسم مراكز الأحداث.
  • اسم مثيل Event Hub في مساحة اسم مراكز الأحداث.
  • اسم نهج الوصول المشترك ومفتاح النهج لمراكز الأحداث. بشكل افتراضي، RootManageSharedAccessKey يتم إنشاء نهج لكل مساحة اسم لمراكز الأحداث. يحتوي هذا النهج على managesend أذونات وlisten. إذا كانت البنية الأساسية لبرنامج ربط العمليات التجارية الخاصة بك تقرأ فقط من مراكز الأحداث، فإن Databricks توصي بإنشاء نهج جديد بإذن الاستماع فقط.

لمزيد من المعلومات حول سلسلة الاتصال مراكز الأحداث، راجع الحصول على مراكز الأحداث سلسلة الاتصال.

إشعار

  • توفر Azure Event Hubs كلا من خيارات OAuth 2.0 وتوقيع الوصول المشترك (SAS) لتخويل الوصول إلى مواردك الآمنة. تستخدم هذه الإرشادات المصادقة المستندة إلى SAS.
  • إذا حصلت على "مراكز الأحداث" سلسلة الاتصال من مدخل Microsoft Azure، فقد لا تحتوي على EntityPath القيمة. EntityPath القيمة مطلوبة فقط عند استخدام موصل Structured Streaming Event Hubs. يتطلب استخدام موصل Structured Streaming Kafka توفير اسم الموضوع فقط.

تخزين مفتاح النهج في سر Azure Databricks

نظرا لأن مفتاح النهج هو معلومات حساسة، يوصي Databricks بعدم ترميز القيمة في التعليمات البرمجية للبنية الأساسية لبرنامج ربط العمليات التجارية. بدلا من ذلك، استخدم أسرار Azure Databricks لتخزين الوصول إلى المفتاح وإدارته.

يستخدم المثال التالي Databricks CLI لإنشاء نطاق سري وتخزين المفتاح في هذا النطاق السري. في التعليمات البرمجية للبنية الأساسية لبرنامج ربط العمليات التجارية الخاصة بك، استخدم الدالة dbutils.secrets.get() scope-name مع و shared-policy-name لاسترداد قيمة المفتاح.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

لمزيد من المعلومات حول أسرار Azure Databricks، راجع إدارة البيانات السرية.

إنشاء دفتر ملاحظات وإضافة التعليمات البرمجية للبنية الأساسية لبرنامج ربط العمليات التجارية لاستهلاك الأحداث

يقرأ المثال التالي أحداث IoT من موضوع، ولكن يمكنك تكييف المثال لمتطلبات التطبيق الخاص بك. كأفضل ممارسة، توصي Databricks باستخدام إعدادات البنية الأساسية لبرنامج ربط العمليات التجارية Delta Live Tables لتكوين متغيرات التطبيق. ثم تستخدم التعليمات البرمجية للبنية الأساسية لبرنامج ربط العمليات التجارية الدالة spark.conf.get() لاسترداد القيم. لمزيد من المعلومات حول استخدام إعدادات البنية الأساسية لبرنامج ربط العمليات التجارية لوضع معلمات للبنية الأساسية لبرنامج ربط العمليات التجارية الخاصة بك، راجع تحديد معلمات تعريفات مجموعة البيانات في Python أو SQL.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

إنشاء البنية الأساسية لبرنامج ربط العمليات التجارية

أنشئ مسارا جديدا بالإعدادات التالية، واستبدل قيم العنصر النائب بالقيم المناسبة للبيئة الخاصة بك.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

الاستبدال

  • <container-name> باسم حاوية حساب تخزين Azure.
  • <storage-account-name> باسم حساب تخزين ADLS Gen2.
  • <eh-namespace> باسم مساحة اسم Event Hub.
  • <eh-policy-name> مع مفتاح النطاق السري لمفتاح نهج مراكز الأحداث.
  • <eventhub> باسم مثيل مراكز الأحداث.
  • <secret-scope-name> باسم النطاق السري Azure Databricks الذي يحتوي على مفتاح نهج مراكز الأحداث.

كأفضل ممارسة، لا يستخدم هذا المسار مسار تخزين DBFS الافتراضي ولكنه يستخدم بدلا من ذلك حساب تخزين Azure Data Lake Storage Gen2 (ADLS Gen2). لمزيد من المعلومات حول تكوين المصادقة لحساب تخزين ADLS Gen2، راجع الوصول الآمن إلى بيانات اعتماد التخزين باستخدام البيانات السرية في البنية الأساسية لبرنامج ربط العمليات التجارية.