Partilhar via


Usar os Hubs de Eventos do Azure como uma fonte de dados de pipeline

Este artigo explica como processar mensagens dos Hubs de Eventos do Azure em um pipeline. Não é possível usar o conector Hubs de Eventos de Streaming Estruturado porque essa biblioteca não está disponível como parte do Databricks Runtime e o Lakeflow Spark Declarative Pipelines não permite que você use bibliotecas JVM de terceiros.

Como um pipeline pode se conectar aos Hubs de Eventos do Azure?

Os Azure Event Hubs fornecem um ponto de extremidade compatível com o Apache Kafka que pode usar com o conector Kafka de Streaming Estruturado, disponível no Databricks Runtime, para processar mensagens dos Azure Event Hubs. Para obter mais informações sobre Hubs de Eventos do Azure e compatibilidade com Apache Kafka, consulte Utilizar os Hubs de Eventos do Azure a partir de aplicações do Apache Kafka.

As etapas a seguir descrevem a conexão de um pipeline a uma instância existente de Event Hubs e o consumo de eventos a partir de um tópico. Para concluir essas etapas, você precisa dos seguintes valores de conexão de Hubs de Eventos:

  • O nome do namespace dos Hubs de Eventos.
  • O nome da instância do Hub de Eventos no namespace Hubs de Eventos.
  • Um nome de política de acesso compartilhado e uma chave de política para Hubs de Eventos. Por padrão, uma política de RootManageSharedAccessKey é criada para cada namespace de Hubs de Eventos. Esta política tem permissões manage, send e listen. Se o pipeline ler apenas de Hubs de Eventos, o Databricks recomenda a criação de uma nova política apenas com permissão de escuta.

Para obter mais informações sobre a cadeia de ligação do Event Hubs, consulte Obter uma cadeia de ligação do Event Hubs.

Observação

  • Os Hubs de Eventos do Azure fornecem opções OAuth 2.0 e assinatura de acesso compartilhado (SAS) para autorizar o acesso aos seus recursos seguros. Estas instruções usam autenticação baseada em SAS.
  • Se obtiver a cadeia de conexão dos hubs de eventos do portal do Azure, ela pode não conter o valor EntityPath. O valor EntityPath é necessário apenas ao usar o conector de Hubs de Eventos para Streaming Estruturado. Usar o Structured Streaming Kafka Connector requer fornecer apenas o nome do tópico.

Armazenar a chave de política em um segredo do Azure Databricks

Como a chave de política é uma informação confidencial, o Databricks recomenda não codificar o valor no código do pipeline. Em vez disso, use os segredos do Azure Databricks para armazenar e gerenciar o acesso à chave.

O exemplo a seguir usa a CLI do Databricks para criar um escopo secreto e armazenar a chave nesse escopo secreto. No seu código de pipeline, use a função dbutils.secrets.get() com o scope-name e o shared-policy-name para recuperar o valor da chave.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Para obter mais informações sobre o gerenciamento de segredos do Azure Databricks, consulte .

Criar um pipeline e adicionar o código necessário para processar eventos

O exemplo a seguir lê eventos de IoT de um tópico, mas você pode adaptar o exemplo para os requisitos do seu aplicativo. Como prática recomendada, o Databricks recomenda o uso das configurações de pipeline para configurar variáveis de aplicativo. Em seguida, o código do pipeline usa a função spark.conf.get() para recuperar valores.

from pyspark import pipelines as dp
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dp.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dp.expect("valid_topic", "topic IS NOT NULL")
@dp.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Criar a linha de produção

Crie um novo pipeline com um arquivo de origem Python e insira o código acima.

O código faz referência a parâmetros configurados. Utilize a seguinte configuração JSON, substituindo os valores de marcadores de posição por valores apropriados para o seu ambiente (consulte a lista após o JSON). Você pode definir os parâmetros usando a interface do usuário de configurações ou editando as configurações JSON diretamente. Para obter mais informações sobre como usar as definições de pipeline para parametrizar o seu pipeline, consulte Usar parâmetros com pipelines.

Esse arquivo de configurações também define o local de armazenamento para uma conta de armazenamento do Azure Data Lake Storage (ADLS). Como prática recomendada, esse pipeline não usa o caminho de armazenamento DBFS padrão, mas usa uma conta de armazenamento ADLS. Para obter mais informações sobre como configurar a autenticação para uma conta de armazenamento ADLS, consulte Acessar credenciais de acesso ao armazenamento de forma segura utilizando segredos em um pipeline.

{
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  }
}

Substitua os seguintes espaços reservados:

  • <container-name> com o nome de um contêiner de conta de armazenamento do Azure.
  • <storage-account-name> com o nome de uma conta de armazenamento ADLS.
  • <eh-namespace> com o nome do seu namespace de Event Hubs.
  • <eh-policy-name> com a chave de escopo secreta para a política dos Hubs de Eventos.
  • <eventhub> com o nome da sua instância de Hub de Eventos.
  • <secret-scope-name> com o nome do escopo secreto do Azure Databricks que contém a chave de política dos Hubs de Eventos.