Share via


Azure Event Hubs を Delta Live Tables データ ソースとして使用する

この記事では、Delta Live Tables を使用してAzure Event Hubs からのメッセージを処理する方法について説明します。 このライブラリは Databricks Runtime の一部として使用できず、Delta Live Tables ではサードパーティの JVM ライブラリを使用できないため、構造化ストリーミング Event Hubs コネクタを使用することはできません。

Delta Live Tables を Azure Event Hubs に接続する方法

Azure Event Hubs は、Apache Kafka と互換性のあるエンドポイントを提供します。これを Databricks Runtime で使用できる構造化ストリーミング Kafka コネクタとともに使用して、Azure Event Hubs からのメッセージを処理することができます。 Azure Event Hubs と Apache Kafka の互換性の詳細については、「Apache Kafka アプリケーションから Azure Event Hubs を使用する」を参照してください。

次の手順では、Delta Live Tables パイプラインを既存の Event Hubs インスタンスに接続し、トピックからイベントを使用する方法について説明します。 これらの手順を完了するには、次の Event Hubs 接続の値が必要です。

  • Event Hubs 名前空間の名前。
  • Event Hubs 名前空間内のイベント ハブ インスタンスの名前。
  • Event Hubs の共有アクセス ポリシー名とポリシー キー。 既定では、Event Hubs 名前空間ごとに RootManageSharedAccessKey ポリシーが作成されます。 このポリシーには、managesend、および listen のアクセス許可があります。 パイプラインが Event Hubs からのみ読み取る場合、Databricks ではリッスン アクセス許可のみを使用して新しいポリシーを作成することをお勧めします。

Event Hubs 接続文字列の詳細については、「Event Hubs の接続文字列の取得」を参照してください。

注意

  • Azure Event Hubsには、セキュリティで保護されたリソースへのアクセスを承認するための OAuth 2.0 と Shared Access Signature (SAS) の両方のオプションが用意されています。 これらの手順では、SAS ベースの認証を使用します。
  • Azure portal から Event Hubs 接続文字列を取得した場合、EntityPath 値が含まれていない可能性があります。 EntityPath 値は、構造化ストリーミング イベント ハブ コネクタを使用する場合にのみ必要です。 構造化ストリーミング Kafka コネクタを使用するには、トピック名のみを指定する必要があります。

ポリシー キーを Azure Databricks シークレットに格納する

ポリシー キーは機密情報であるため、Databricks ではパイプライン コード内の値をハードコーディングしないことをお勧めします。 代わりに、Azure Databricks シークレットを使用して、キーへのアクセスを格納および管理します。

次の例では、Databricks CLI を使用してシークレット スコープを作成し、そのシークレット スコープにキーを格納します。 パイプライン コードで、dbutils.secrets.get() 関数を scope-name および shared-policy-name と共に使用して、キー値を取得します。

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Azure Databricks シークレットの詳細については、「シークレットの管理」を参照してください。

ノートブックを作成し、イベントを使用するためのパイプライン コードを追加する

次の例では、トピックから IoT イベントを読み取りますが、アプリケーションの要件に合わせてこの例を調整できます。 ベスト プラクティスとして、Databricks では、Delta Live Tables パイプライン設定を使用してアプリケーション変数を構成することをお勧めします。 その後、パイプライン コードは、spark.conf.get() 関数を使用して値を取得します。 パイプライン設定を使用してパイプラインをパラメーター化する方法の詳細については、「パイプラインをパラメーター化する」を参照してください。

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

パイプラインを作成する

次の設定を使用して新しいパイプラインを作成し、プレースホルダーの値を環境に適した値に置き換えます。

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

  • <container-name> を Azure ストレージ アカウント コンテナーの名前に置き換えます。
  • <storage-account-name> を ADLS Gen2 ストレージ アカウントの名前に置き換えます。
  • <eh-namespace> をご利用の Event Hubs 名前空間の名前に置換します。
  • <eh-policy-name> を Event Hubs ポリシー キーのシークレット スコープ キーに置き換えます。
  • <eventhub> を Event Hubs インスタンスの名前に置き換えます。
  • <secret-scope-name> を、Event Hubs ポリシー キーが格納された Azure Databricks シークレット スコープの名前に置き換えます。

ベスト プラクティスとして、このパイプラインでは既定の DBFS ストレージ パスを使用せず、代わりに Azure Data Lake Storage Gen2 (ADLS Gen2) ストレージ アカウントを使用します。 ADLS Gen2 ストレージ アカウントの認証の構成の詳細については、「パイプラインでシークレットを使用してストレージの資格情報に安全にアクセスする」を参照してください。