Dela via


Använda Azure Event Hubs som en datakälla för deklarativa pipelines i Lakeflow

Den här artikeln förklarar hur du använder Lakeflow Deklarativa Pipelines för att bearbeta meddelanden från Azure Event Hubs. Du kan inte använda Structured Streaming Event Hubs-kopplingen eftersom det här biblioteket inte är tillgängligt som en del av Databricks Runtime, och Lakeflow Deklarativa Pipelines tillåter inte användning av JVM-bibliotek från tredje part.

Hur kan Lakeflow Declarative Pipelines ansluta till Azure Event Hubs?

Azure Event Hubs tillhandahåller en slutpunkt som är kompatibel med Apache Kafka som du kan använda med Structured Streaming Kafka Connector, tillgänglig i Databricks Runtime, för att bearbeta meddelanden från Azure Event Hubs. Mer information om Azure Event Hubs och Apache Kafka-kompatibilitet finns i Använda Azure Event Hubs från Apache Kafka-program.

Följande steg beskriver hur du ansluter deklarativa pipelines för Lakeflow till en befintlig Event Hubs-instans och konsumerar händelser från en topic. För att slutföra de här stegen behöver du följande event hubs-anslutningsvärden:

  • Namnet på Event Hubs-namnområdet.
  • Namnet på Event Hub-instansen i Event Hubs-namnområdet.
  • Ett namn på en princip för delad åtkomst och en principnyckel för Event Hubs. Som standard skapas en RootManageSharedAccessKey princip för varje Event Hubs-namnområde. Den här principen har behörigheterna manage, send och listen. Om din pipeline bara läser från Event Hubs rekommenderar Databricks att du skapar en ny princip med endast lyssningsbehörighet.

Mer information om anslutningssträngen för Event Hubs finns i Hämta en anslutningssträng för Event Hubs.

Notera

  • Azure Event Hubs innehåller alternativ för både OAuth 2.0 och signatur för delad åtkomst (SAS) för att auktorisera åtkomst till dina säkra resurser. Dessa instruktioner använder SAS-baserad autentisering.
  • Om du får anslutningssträngen Event Hubs från Azure-portalen kanske den inte innehåller värdet EntityPath. Värdet EntityPath krävs endast när du använder Structured Streaming Event Hubs-anslutningen. Om du använder kafka-anslutningsappen för strukturerad direktuppspelning måste du endast ange ämnesnamnet.

Lagra principnyckeln i en Azure Databricks-hemlighet

Eftersom principnyckeln är känslig information rekommenderar Databricks att du inte hårdkodar värdet i pipelinekoden. Använd i stället Azure Databricks-hemligheter för att lagra och hantera åtkomst till nyckeln.

I följande exempel används Databricks CLI för att skapa ett hemligt omfång och lagra nyckeln i det hemliga omfånget. I pipelinekoden använder du funktionen dbutils.secrets.get() med scope-name och shared-policy-name för att hämta nyckelvärdet.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Mer information om Azure Databricks-hemligheter finns i Hemlighetshantering.

Skapa en notebook och lägg till pipelinekoden för att konsumera händelser

I följande exempel läss IoT-händelser från ett ämne, men du kan anpassa exemplet efter kraven för ditt program. Som en bästa praxis rekommenderar Databricks att du använder inställningarna för Lakeflow Declarative Pipelines för att konfigurera programvariabler. Sedan använder pipelinekoden funktionen spark.conf.get() för att hämta värden. Mer information om hur du använder pipelineinställningar för att parametrisera din pipeline finns i använda parametrar med Lakeflow deklarativa pipelines.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Skapa pipelinen

Skapa en ny pipeline med följande inställningar och ersätt platshållarvärdena med lämpliga värden för din miljö.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Ersätta

  • <container-name> med namnet på en Azure Storage-kontocontainer.
  • <storage-account-name> med namnet på ett ADLS-lagringskonto.
  • <eh-namespace> med namnet på ditt Event Hubs-namnområde.
  • <eh-policy-name> med den hemliga åtkomstnyckeln för policy för Event Hubs.
  • <eventhub> med namnet på din Event Hubs-instans.
  • <secret-scope-name> med namnet på Azure Databricks-hemlighetsomfånget som innehåller principnyckeln för Event Hubs.

Som bästa praxis använder den här pipelinen inte standardsökvägen för DBFS-lagring, utan använder i stället ett Azure Data Lake Storage-lagringskonto (ADLS). Mer information om hur du konfigurerar autentisering för ett ADLS-lagringskonto finns i Säker åtkomst till autentiseringsuppgifter för lagring med hemligheter i en pipeline.