Использование Центры событий Azure в качестве источника данных Delta Live Tables

В этой статье объясняется, как использовать разностные динамические таблицы для обработки сообщений из Центры событий Azure. Соединитель Центров событий структурированной потоковой передачи нельзя использовать, так как эта библиотека недоступна в рамках среды выполнения Databricks, а разностные динамические таблицы не позволяют использовать сторонние библиотеки JVM.

Как разностные динамические таблицы могут подключаться к Центры событий Azure?

Центры событий Azure предоставляет конечную точку, совместимую с Apache Kafka, которую можно использовать с помощью Структурированный соединитель Stream Kafka, доступный в Databricks Runtime, для обработки сообщений из Центры событий Azure. Дополнительные сведения о совместимости Центры событий Azure и Apache Kafka см. в статье "Использование Центры событий Azure из приложений Apache Kafka".

Ниже описано, как подключить конвейер Delta Live Tables к существующему экземпляру Центров событий и использовать события из раздела. Чтобы выполнить следующие действия, вам потребуется следующее значение подключения Центров событий:

  • Имя пространства имен Центров событий.
  • Имя экземпляра Концентратора событий в пространстве имен Центров событий.
  • Имя политики общего доступа и ключ политики для Центров событий. По умолчанию RootManageSharedAccessKey политика создается для каждого пространства имен Центров событий. Эта политика имеет разрешения managesend и listen разрешения. Если конвейер считывается только из Центров событий, Databricks рекомендует создать новую политику только с разрешением прослушивания.

Дополнительные сведения о центрах событий строка подключения см. в статье "Получение центров событий" строка подключения.

Примечание.

  • Центры событий Azure предоставляет параметры подписи OAuth 2.0 и подписанного URL-адреса (SAS) для авторизации доступа к защищенным ресурсам. Эти инструкции используют проверку подлинности на основе SAS.
  • Если вы получите центры событий строка подключения из портал Azure, оно может не содержать EntityPath значения. Это EntityPath значение необходимо только при использовании соединителя концентратора событий структурированной потоковой передачи. Для использования структурированной потоковой передачи Kafka Подключение or требуется только имя раздела.

Хранение ключа политики в секрете Azure Databricks

Так как ключ политики является конфиденциальной информацией, Databricks рекомендует не жестко кодирования значения в коде конвейера. Вместо этого используйте секреты Azure Databricks для хранения и управления доступом к ключу.

В следующем примере интерфейс командной строки Databricks используется для создания секрета область и хранения ключа в этом секрете область. В коде конвейера используйте dbutils.secrets.get() функцию с scope-name значением ключа.shared-policy-name

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Дополнительные сведения о секретах Azure Databricks см. в разделе "Управление секретами".

Создание записной книжки и добавление кода конвейера для использования событий

В следующем примере считываются события Интернета вещей из раздела, но вы можете адаптировать пример для требований приложения. Рекомендуется использовать параметры конвейера Delta Live Tables для настройки переменных приложения в Databricks. Затем код конвейера использует функцию для получения значений spark.conf.get() . Дополнительные сведения об использовании параметров конвейера для параметризации конвейера см. в разделе Параметризация конвейеров.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Создание конвейера

Создайте конвейер со следующими параметрами, заменив значения заполнителей соответствующими значениями для вашей среды.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Replace

  • <container-name> с именем контейнера учетной записи хранения Azure.
  • <storage-account-name> с именем учетной записи хранения ADLS 2-го поколения.
  • <eh-namespace> замените именем пространства имен для Центров событий;
  • <eh-policy-name>с ключом секрета область для ключа политики Центров событий.
  • <eventhub> с именем экземпляра Центров событий.
  • <secret-scope-name>с именем секрета Azure Databricks область, содержащего ключ политики Центров событий.

В качестве рекомендации этот конвейер не использует путь к хранилищу DBFS по умолчанию, а вместо этого использует учетную запись хранения Azure Data Lake Storage 2-го поколения (ADLS 2-го поколения). Дополнительные сведения о настройке проверки подлинности для учетной записи хранения ADLS 2-го поколения см. в статье "Безопасный доступ к учетным данным хранилища" с секретами в конвейере.