次の方法で共有


Azure Event Hubs を Lakeflow 宣言パイプライン データ ソースとして使用する

この記事では、Lakeflow 宣言パイプラインを使用して Azure Event Hubs からのメッセージを処理する方法について説明します。 このライブラリは Databricks ランタイムの一部として使用できず、Lakeflow 宣言パイプラインではサードパーティの JVM ライブラリを使用できないため構造化ストリーミング Event Hubs コネクタを使用することはできません。

Lakeflow 宣言パイプラインを Azure Event Hubs に接続するにはどうすればよいですか?

Azure Event Hubs には Apache Kafka と互換性のあるエンドポイントが用意されています。このエンドポイントは、Databricks Runtime で使用できる Structured Streaming Kafka コネクタを使用して、Azure Event Hubs からのメッセージを処理できます。 Azure Event Hubs と Apache Kafka の互換性の詳細については、「 Apache Kafka アプリケーションからの Azure Event Hubs の使用」を参照してください。

次の手順では、Lakeflow 宣言パイプラインを既存の Event Hubs インスタンスに接続し、トピックからイベントを使用する方法について説明します。 これらの手順を完了するには、次の Event Hubs 接続値が必要です。

  • Event Hubs 名前空間の名前。
  • Event Hubs 名前空間内の Event Hub インスタンスの名前。
  • Event Hubs の共有アクセス ポリシー名とポリシー キー。 既定では、 RootManageSharedAccessKey ポリシーは Event Hubs 名前空間ごとに作成されます。 このポリシーには、 managesend 、および listen のアクセス許可があります。 パイプラインが Event Hubs からのみ読み取る場合、Databricks はリッスンアクセス許可のみを持つ新しいポリシーを作成することをお勧めします。

Event Hubs 接続文字列の詳細については、「 Event Hubs 接続文字列の取得」を参照してください

  • Azure Event Hubs には、セキュリティで保護されたリソースへのアクセスを承認するための OAuth 2.0 と Shared Access Signature (SAS) の両方のオプションが用意されています。 これらの手順では、SAS ベースの認証を使用します。
  • Azure portal から Event Hubs 接続文字列を取得した場合、 EntityPath 値が含まれていない可能性があります。 EntityPath値は、Structured Streaming Event Hubs コネクタを使用する場合にのみ必要です。 構造化ストリーミング Kafka コネクタを使用するには、トピック名のみを指定する必要があります。

Azure Databricks シークレットにポリシー キーを格納する

ポリシー キーは機密情報であるため、Databricks ではパイプライン コード内の値をハードコーディングしないことをお勧めします。 代わりに、Azure Databricks シークレットを使用してキーへのアクセスを格納および管理します。

次の例では、Databricks CLI を使用してシークレット スコープを作成し、そのシークレット スコープにキーを格納します。 パイプライン コードで、dbutils.secrets.get()scope-nameshared-policy-name関数を使用してキー値を取得します。

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Azure Databricks シークレットの詳細については、「 シークレット管理」を参照してください。

ノートブックを作成し、イベントを使用するパイプライン コードを追加する

次の例では、トピックから IoT イベントを読み取りますが、アプリケーションの要件に合わせて例を調整できます。 ベスト プラクティスとして、Databricks では、Lakeflow 宣言型パイプラインの設定を使用してアプリケーション変数を構成することをお勧めします。 パイプライン コードでは、 spark.conf.get() 関数を使用して値を取得します。 パイプライン設定を使用してパイプラインをパラメーター化する方法の詳細については、「 Lakeflow 宣言型パイプラインでパラメーターを使用する」を参照してください。

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

パイプラインを作成する

プレースホルダーの値を環境に適した値に置き換えて、次の設定で新しいパイプラインを作成します。

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

取り替える

  • <container-name> Azure ストレージ アカウント コンテナーの名前を指定します。
  • <storage-account-name> ADLS ストレージ アカウントの名前を指定します。
  • <eh-namespace> をご利用の Event Hubs 名前空間の名前に置換します。
  • <eh-policy-name> を Event Hubs ポリシー キーのシークレット スコープ キーと共に使用します。
  • <eventhub> を Event Hubs インスタンスの名前で指定します。
  • <secret-scope-name> Event Hubs ポリシー キーを含む Azure Databricks シークレット スコープの名前を指定します。

ベスト プラクティスとして、このパイプラインは既定の DBFS ストレージ パスを使用せず、代わりに Azure Data Lake Storage (ADLS) ストレージ アカウントを使用します。 ADLS ストレージ アカウントの認証の構成の詳細については、「 パイプライン内のシークレットを使用してストレージ資格情報に安全にアクセスする」を参照してください。