分享方式:


使用 Azure 事件中樞 做為 Delta Live Tables 數據源

本文說明如何使用差異即時資料表處理來自 Azure 事件中樞的訊息。 您無法使用結構化串流事件中 樞連接器 ,因為此連結庫無法作為 Databricks Runtime 的一部分使用,而 Delta 實時數據表 不允許您使用第三方 JVM 連結庫

Delta Live Tables 如何連線到 Azure 事件中樞?

Azure 事件中樞提供與 Apache Kafka 相容的端點,您可搭配使用 Databricks Runtime 中提供的結構化串流 Kafka 連接器,以處理來自 Azure 事件中樞的訊息。 如需 Azure 事件中樞 和 Apache Kafka 相容性的詳細資訊,請參閱從 Apache Kafka 應用程式使用 Azure 事件中樞。

下列步驟說明將 Delta Live Tables 管線連線至現有的事件中樞實例,以及從主題取用事件。 若要完成這些步驟,您需要下列事件中樞連線值:

  • 事件中樞命名空間的名稱。
  • 事件中樞命名空間中事件中樞實例的名稱。
  • 事件中樞的共用存取原則名稱和原則密鑰。 根據預設,會為每個事件中樞命名空間建立原則 RootManageSharedAccessKey 。 此原則具有 managesendlisten 許可權。 如果您的管線只會從事件中樞讀取,Databricks 建議只建立具有接聽許可權的新原則。

如需事件中樞 連接字串 的詳細資訊,請參閱取得事件中樞 連接字串

注意

  • Azure 事件中樞 同時提供 OAuth 2.0 和共用存取簽章 (SAS) 選項,以授權存取您的安全資源。 這些指示會使用SAS型驗證。
  • 如果您從 Azure 入口網站 取得事件中樞 連接字串,它可能不會包含 EntityPath 值。 EntityPath只有在使用結構化串流事件中樞連接器時,才需要此值。 使用結構化串流 Kafka 連接器只需要提供主題名稱。

將原則金鑰儲存在 Azure Databricks 秘密中

因為原則索引鍵是敏感性資訊,因此 Databricks 不建議硬式編碼管線程序代碼中的值。 請改用 Azure Databricks 秘密來儲存和管理密鑰的存取權。

下列範例會使用 Databricks CLI 來建立秘密範圍,並將密鑰儲存在該秘密範圍中。 在您的管線程序代碼中 dbutils.secrets.get() ,使用函式搭配 scope-nameshared-policy-name 來擷取索引鍵值。

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

如需 Azure Databricks 秘密的詳細資訊,請參閱 秘密管理

建立筆記本並新增管線程序代碼以取用事件

下列範例會從主題讀取IoT事件,但您可以針對應用程式的需求調整範例。 最佳做法是 Databricks 建議使用 Delta Live Tables 管線設定來設定應用程式變數。 管線程式代碼接著會使用 函 spark.conf.get() 式來擷取值。 如需使用管線設定來參數化管線的詳細資訊,請參閱 搭配 Delta Live Tables 管線使用參數。

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

建立管線

使用下列設定建立新的管線,並將佔位元值取代為您環境的適當值。

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Replace

  • <container-name> 具有 Azure 記憶體帳戶容器的名稱。
  • <storage-account-name> 具有ADLS Gen2記憶體帳戶的名稱。
  • 以事件中樞命名空間名稱取代 <eh-namespace>
  • <eh-policy-name> 具有事件中樞原則密鑰的秘密範圍金鑰。
  • <eventhub> 具有事件中樞實例的名稱。
  • <secret-scope-name> 包含事件中樞原則密鑰的 Azure Databricks 秘密範圍名稱。

最佳做法是,此管線不會使用預設 DBFS 記憶體路徑,而是改用 Azure Data Lake Storage Gen2 (ADLS Gen2) 記憶體帳戶。 如需設定 ADLS Gen2 記憶體帳戶驗證的詳細資訊,請參閱 在管線中使用秘密安全地存取記憶體認證。