Condividi tramite


Usare Azure Event Hubs come origine dati per le pipeline dichiarative di Lakeflow

Questo articolo illustra come usare le pipeline dichiarative di Lakeflow per elaborare i messaggi da Hub eventi di Azure. Non è possibile usare il connettore Structured Streaming Event Hubs perché questa libreria non è disponibile come parte di Databricks Runtime e Le pipeline dichiarative di Lakeflow non consentono di usare librerie JVM di terze parti.

In che modo le pipeline dichiarative di Lakeflow possono connettersi a Hub eventi di Azure?

Hub eventi di Azure offre un endpoint compatibile con Apache Kafka che è possibile usare con il connettore Kafka structured streaming , disponibile in Databricks Runtime, per elaborare i messaggi da Hub eventi di Azure. Per altre informazioni sulla compatibilità di Hub eventi di Azure e Apache Kafka, vedere Usare Hub eventi di Azure dalle applicazioni Apache Kafka.

I passaggi seguenti descrivono la connessione delle Lakeflow Declarative Pipelines a un'istanza esistente di Event Hubs e l'utilizzo di eventi da un topic. Per completare questi passaggi, sono necessari i valori di connessione di Hub eventi seguenti:

  • Nome dello spazio dei nomi di Event Hubs.
  • Nome dell'istanza di Event Hub nello spazio dei nomi di Event Hubs.
  • Nome e chiave dei criteri di accesso condiviso per Event Hubs. Per impostazione predefinita, viene creato un criterio di RootManageSharedAccessKey per ogni spazio dei nomi di Event Hub. Questa politica dispone di autorizzazioni manage, send e listen. Se la pipeline legge solo da Event Hubs, Databricks consiglia di creare un nuovo criterio con la sola autorizzazione all'ascolto.

Per ulteriori informazioni sulla stringa di connessione degli Hub Eventi, vedi Consulta una stringa di connessione degli Hub Eventi.

Nota

  • Hub eventi di Azure offre opzioni di firma di accesso condiviso (SAS) e OAuth 2.0 per autorizzare l'accesso alle risorse protette. Queste istruzioni usano l'autenticazione basata su SAS.
  • Se si ottiene la stringa di connessione di Hub eventi dal portale di Azure, potrebbe non contenere il valore EntityPath. Il valore EntityPath è obbligatorio solo quando si usa il connettore di Event Hubs di streaming strutturato. L'uso del connettore Kafka Structured Streaming richiede solo il nome dell'argomento.

Archiviare la chiave dei criteri in un archivio segreto di Azure Databricks

Poiché la chiave della politica è un'informazione sensibile, Databricks consiglia di non hardcodificare il valore nel codice della pipeline. Usare invece i segreti di Azure Databricks per archiviare e gestire l'accesso alla chiave.

Nell'esempio seguente viene usata l'interfaccia della riga di comando di Databricks per creare un ambito segreto e archiviare la chiave nell'ambito del segreto. Nel codice della pipeline usare la funzione dbutils.secrets.get() con il scope-name e shared-policy-name per recuperare il valore della chiave.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Per altre informazioni sui segreti di Azure Databricks, vedere Secret management.

Crea un notebook e aggiungi il codice della pipeline per utilizzare gli eventi

L'esempio seguente legge gli eventi IoT da un argomento, ma è possibile adattare l'esempio per i requisiti dell'applicazione. Come procedura consigliata, Databricks consiglia di usare le impostazioni delle pipeline dichiarative di Lakeflow per configurare le variabili dell'applicazione. Il codice della pipeline usa quindi la funzione spark.conf.get() per recuperare i valori. Per altre informazioni sull'uso delle impostazioni della pipeline per parametrizzare la pipeline, vedere Usare i parametri con le pipeline dichiarative di Lakeflow.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Creare la pipeline

Creare una nuova pipeline con le impostazioni seguenti, sostituendo i valori segnaposto con i valori appropriati per l'ambiente in uso.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Sostituire

  • <container-name> come nome di un contenitore dell'account di archiviazione di Azure.
  • <storage-account-name> con il nome di un account di archiviazione ADLS.
  • <eh-namespace> con il nome dello spazio dei nomi Event Hubs.
  • <eh-policy-name> con la chiave dell'ambito segreto per la chiave della politica di Hub eventi.
  • <eventhub> con il nome dell'istanza di Event Hubs.
  • <secret-scope-name> con il nome dell'ambito segreto di Azure Databricks che contiene la chiave della policy di Event Hubs.

Come procedura consigliata, questa pipeline non usa il percorso di archiviazione DBFS predefinito, ma usa invece un account di archiviazione di Azure Data Lake Storage (ADLS). Per ulteriori informazioni sulla configurazione dell'autenticazione per un account di archiviazione ADLS, consultare Accedere in modo sicuro alle credenziali di archiviazione utilizzando segreti in una pipeline.