Uso de Azure Event Hubs como origen de datos de Delta Live Tables
En este artículo se explica cómo usar Delta Live Tables para procesar mensajes de Azure Event Hubs. No puede usar el conector de Event Hubs de Structured Streaming porque esta biblioteca no está disponible como parte de Databricks Runtime, y Delta Live Tables no permite usar bibliotecas de JVM de terceros.
¿Cómo se puede conectar Delta Live Tables a Azure Event Hubs?
Azure Event Hubs proporciona un punto de conexión compatible con Apache Kafka que puede usar con el conector de Kafka de Structured Streaming, disponible en Databricks Runtime, para procesar mensajes de Azure Event Hubs. Para más información sobre la compatibilidad de Azure Event Hubs y Apache Kafka, consulte Usar Azure Event Hubs desde aplicaciones de Apache Kafka.
En los pasos siguientes se describe cómo conectar una canalización de Delta Live Tables a una instancia de Event Hubs existente y consumir eventos de un tema. Para completar estos pasos, necesita los siguientes valores de conexión de Event Hubs:
- El nombre del espacio de nombres de Event Hubs.
- El nombre de la instancia de Event Hubs en el espacio de nombres de Event Hubs.
- Un nombre de directiva de acceso compartido y la clave de directiva para Event Hubs. De forma predeterminada, se crea una
RootManageSharedAccessKey
directiva para cada espacio de nombres de Event Hubs. Esta directiva tiene permisosmanage
,send
ylisten
. Si la canalización solo lee de Event Hubs, Databricks recomienda crear una nueva directiva solo con permiso de escucha.
Para más información sobre la cadena de conexión de Event Hubs, consulte Obtener una cadena de conexión de Event Hubs.
Nota:
- Azure Event Hubs proporciona opciones de OAuth 2.0 y de firma de acceso compartido (SAS) para autorizar el acceso a los recursos seguros. Estas instrucciones usan una autenticación basada en SAS.
- Si obtiene la cadena de conexión de Event Hubs del Azure Portal, es posible que no contenga el valor
EntityPath
. El valorEntityPath
solo es necesario cuando se usa el conector de Event Hubs de Structured Streaming. El uso del conector de Kafka de Structured Streaming requiere proporcionar solo el nombre del tema.
Almacenar la clave de directiva en un secreto de Azure Databricks
Dado que la clave de directiva es información confidencial, Databricks recomienda no codificar de forma rígida el valor en el código de canalización. En su lugar, use secretos de Azure Databricks para almacenar y administrar el acceso a la clave.
En el ejemplo siguiente se usa la CLI de Databricks para crear un ámbito secreto y almacenar la clave en ese ámbito secreto. En el código de canalización, use la función dbutils.secrets.get()
con scope-name
y shared-policy-name
para recuperar el valor de clave.
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
Para más información sobre los secretos de Azure Databricks, consulte Administración de secretos.
Crear un cuaderno y añadir el código de canalización para consumir eventos
En el ejemplo siguiente se leen eventos de IoT de un tema, pero puede adaptar el ejemplo para los requisitos de la aplicación. Como procedimiento recomendado, Databricks recomienda usar las opciones de canalización de Delta Live Tables para configurar variables de aplicación. A continuación, el código de canalización usa la función spark.conf.get()
para recuperar valores. Para más información sobre el uso de la configuración de canalización para parametrizar la canalización, consulte Uso de parámetros con canalizaciones de Delta Live Tables.
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
Creación de la canalización
Cree una nueva canalización con la siguiente configuración, reemplazando los valores de marcador de posición por los valores adecuados para su entorno.
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}
Replace
<container-name>
con el nombre de un contenedor de cuenta de almacenamiento de Azure.<storage-account-name>
con el nombre de una cuenta de almacenamiento de ADLS Gen2.<eh-namespace>
con el nombre del espacio de nombres de Event Hubs.<eh-policy-name>
con la clave del ámbito secreto para la clave de directiva de Event Hubs.<eventhub>
con el nombre de la instancia de Event Hubs.<secret-scope-name>
con el nombre del ámbito secreto de Azure Databricks que contiene la clave de directiva de Event Hubs.
Como procedimiento recomendado, esta canalización no usa la ruta de acceso de almacenamiento de DBFS predeterminada, sino que usa una cuenta de almacenamiento de Azure Data Lake Storage Gen2 (ADLS Gen2). Para más información sobre cómo configurar la autenticación para una cuenta de almacenamiento de ADLS Gen2, consulte Acceso seguro a las credenciales de almacenamiento con secretos en una canalización.