Usar os Hubs de Eventos do Azure como uma fonte de dados do Delta Live Tables
Este artigo explica como utilizar o Delta Live Tables para processar mensagens dos Hubs de Eventos do Azure. Não é possível usar o conector Hubs de Eventos de Streaming Estruturado porque essa biblioteca não está disponível como parte do Databricks Runtime e o Delta Live Tables não permite que você use bibliotecas JVM de terceiros.
Como o Delta Live Tables pode se conectar aos Hubs de Eventos do Azure?
Os Hubs de Eventos do Azure fornecem um ponto de extremidade compatível com o Apache Kafka que você pode usar com o conector Kafka de Streaming Estruturado, disponível no Databricks Runtime, para processar mensagens dos Hubs de Eventos do Azure. Para obter mais informações sobre Hubs de Eventos do Azure e compatibilidade com Apache Kafka, consulte Usar Hubs de Eventos do Azure de aplicativos Apache Kafka.
As etapas a seguir descrevem a conexão de um pipeline Delta Live Tables a uma instância existente de Hubs de Eventos e o consumo de eventos de um tópico. Para concluir essas etapas, você precisa dos seguintes valores de conexão de Hubs de Eventos:
- O nome do namespace Hubs de Eventos.
- O nome da instância do Hub de Eventos no namespace Hubs de Eventos.
- Um nome de política de acesso compartilhado e uma chave de política para Hubs de Eventos. Por padrão, uma
RootManageSharedAccessKey
política é criada para cada namespace de Hubs de Eventos. Esta política temmanage
esend
listen
permissões. Se o pipeline ler apenas de Hubs de Eventos, o Databricks recomenda a criação de uma nova política apenas com permissão de escuta.
Para obter mais informações sobre a cadeia de conexão de Hubs de Eventos, consulte Obter uma cadeia de conexão de Hubs de Eventos.
Nota
- Os Hubs de Eventos do Azure fornecem opções OAuth 2.0 e assinatura de acesso compartilhado (SAS) para autorizar o acesso aos seus recursos seguros. Estas instruções usam autenticação baseada em SAS.
- Se você obtiver a cadeia de conexão Hubs de Eventos do portal do Azure, ela pode não conter o
EntityPath
valor. OEntityPath
valor é necessário somente ao usar o conector Hubs de Eventos de Streaming Estruturado. Usar o Structured Streaming Kafka Connector requer fornecer apenas o nome do tópico.
Armazenar a chave de política em um segredo do Azure Databricks
Como a chave de política é uma informação confidencial, o Databricks recomenda não codificar o valor no código do pipeline. Em vez disso, use os segredos do Azure Databricks para armazenar e gerenciar o acesso à chave.
O exemplo a seguir usa a CLI do Databricks para criar um escopo secreto e armazenar a chave nesse escopo secreto. No código de pipeline, use a dbutils.secrets.get()
função com o scope-name
e shared-policy-name
para recuperar o valor da chave.
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
Para obter mais informações sobre segredos do Azure Databricks, consulte Gerenciamento de segredos.
Criar um bloco de anotações e adicionar o código de pipeline para consumir eventos
O exemplo a seguir lê eventos de IoT de um tópico, mas você pode adaptar o exemplo para os requisitos do seu aplicativo. Como prática recomendada, o Databricks recomenda o uso das configurações do pipeline Delta Live Tables para configurar variáveis de aplicativo. Em seguida, o código do pipeline usa a spark.conf.get()
função para recuperar valores. Para obter mais informações sobre como usar as configurações de pipeline para parametrizar seu pipeline, consulte Usar parâmetros com pipelines Delta Live Tables.
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
Criar o pipeline
Crie um novo pipeline com as seguintes configurações, substituindo os valores de espaço reservado por valores apropriados para seu ambiente.
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}
Replace
<container-name>
com o nome de um contêiner de conta de armazenamento do Azure.<storage-account-name>
com o nome de uma conta de armazenamento ADLS Gen2.<eh-namespace>
pelo nome do espaço de nomes dos Hubs de Eventos.<eh-policy-name>
com a chave de escopo secreta para a chave de política dos Hubs de Eventos.<eventhub>
com o nome da instância dos Hubs de Eventos.<secret-scope-name>
com o nome do escopo secreto do Azure Databricks que contém a chave de política dos Hubs de Eventos.
Como prática recomendada, esse pipeline não usa o caminho de armazenamento DBFS padrão, mas usa uma conta de armazenamento do Azure Data Lake Storage Gen2 (ADLS Gen2). Para obter mais informações sobre como configurar a autenticação para uma conta de armazenamento ADLS Gen2, consulte Acessar credenciais de armazenamento com segurança com segredos em um pipeline.