Teilen über


Verwenden von Azure Event Hubs als Pipelinedatenquelle

In diesem Artikel wird erläutert, wie Nachrichten aus Azure Event Hubs in einer Pipeline verarbeitet werden. Sie können den Connector "Structured Streaming Event Hubs" nicht verwenden, da diese Bibliothek nicht als Teil der Databricks-Runtime verfügbar ist, und Lakeflow Spark Declarative Pipelines ermöglicht es Ihnen nicht, JVM-Bibliotheken von Drittanbietern zu verwenden.

Wie kann eine Pipeline eine Verbindung mit Azure Event Hubs herstellen?

Azure Event Hubs bietet einen Endpoint, der mit Apache Kafka kompatibel ist und den Sie mit dem Structured Streaming Kafka Connector verwenden können, der in Databricks Runtime verfügbar ist, um Nachrichten von Azure Event Hubs zu verarbeiten. Weitere Informationen zur Kompatibilität von Azure Event Hubs und Apache Kafka finden Sie unter Verwenden von Azure Event Hubs aus Apache Kafka-Anwendungen.

Die folgenden Schritte beschreiben das Verbinden einer Pipeline mit einer vorhandenen Event Hubs-Instanz und das Verwenden von Ereignissen aus einem Thema. Zum Ausführen dieser Schritte benötigen Sie die folgenden Event Hubs-Verbindungswerte:

  • Der Name des Event-Hubs-Namespaces.
  • Der Name der Event Hub-Instanz im Event Hubs-Namespace.
  • Ein freigegebener Zugriffsrichtlinienname und ein Richtlinienschlüssel für Event Hubs. Standardmäßig wird für jeden Event Hubs-Namespace eine RootManageSharedAccessKey Richtlinie erstellt. Diese Richtlinie hat manage, send und listen Berechtigungen. Wenn Ihre Pipeline nur aus Event Hubs liest, empfiehlt Databricks, eine neue Richtlinie mit ausschließlich Zuhörberechtigung zu erstellen.

Weitere Informationen zur Verbindungszeichenfolge für Event Hubs finden Sie unter Abrufen einer Event Hubs-Verbindungszeichenfolge.

Hinweis

  • Azure Event Hubs bietet sowohl OAuth 2.0- als auch SAS-Optionen (Shared Access Signature), um den Zugriff auf Ihre sicheren Ressourcen zu autorisieren. Diese Anweisungen verwenden sasbasierte Authentifizierung.
  • Wenn Sie die Event Hubs-Verbindungszeichenfolge aus dem Azure-Portal abrufen, enthält sie möglicherweise nicht den EntityPath Wert. Der EntityPath Wert ist nur erforderlich, wenn der Connector "Structured Streaming Event Hubs" verwendet wird. Die Verwendung des Structured Streaming Kafka Connector erfordert nur den Namen des Themas.

Speichern Sie den Richtlinienschlüssel in einem Azure Databricks-Geheimnis

Da der Richtlinienschlüssel vertrauliche Informationen ist, empfiehlt Databricks, den Wert in Ihrem Pipelinecode nicht hart zu codieren. Verwenden Sie stattdessen Azure Databricks-Geheimschlüssel, um den Zugriff auf den Schlüssel zu speichern und zu verwalten.

Im folgenden Beispiel wird die Databricks CLI verwendet, um einen geheimen Bereich zu erstellen und den Schlüssel in diesem geheimen Bereich zu speichern. Verwenden Sie in Ihrem Pipelinecode die dbutils.secrets.get() Funktion mit scope-name und shared-policy-name, um den Schlüsselwert abzurufen.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Weitere Informationen zu Azure Databricks-Geheimnissen finden Sie unter "Geheime Verwaltung".

Erstellen Sie eine Pipeline und fügen Sie Code zum Verarbeiten von Ereignissen hinzu

Im folgenden Beispiel werden IoT-Ereignisse aus einem Thema gelesen. Sie können das Beispiel jedoch an die Anforderungen Ihrer Anwendung anpassen. Als bewährte Methode empfiehlt Databricks die Verwendung der Pipelineeinstellungen zum Konfigurieren von Anwendungsvariablen. Der Pipelinecode verwendet dann die spark.conf.get() Funktion zum Abrufen von Werten.

from pyspark import pipelines as dp
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dp.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dp.expect("valid_topic", "topic IS NOT NULL")
@dp.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Erstellen der Pipeline

Erstellen Sie eine neue Pipeline mit einer Python-Quelldatei, und geben Sie den obigen Code ein.

Der Code verweist auf konfigurierte Parameter. Verwenden Sie die folgende JSON-Konfiguration, indem Sie die Platzhalterwerte durch geeignete Werte für Ihre Umgebung ersetzen (siehe Liste, die auf den JSON-Code folgt). Sie können die Parameter mithilfe der Einstellungs-UI oder durch direktes Bearbeiten der Einstellungs-JSON festlegen. Weitere Informationen zur Verwendung von Pipelineeinstellungen zum Parametrisieren der Pipeline finden Sie unter Verwenden von Parametern mit Pipelines.

Diese Einstellungsdatei definiert auch den Speicherort für ein Azure Data Lake Storage (ADLS)-Speicherkonto. Als bewährte Methode verwendet diese Pipeline nicht den standardmäßigen DBFS-Speicherpfad, sondern verwendet stattdessen ein ADLS-Speicherkonto. Weitere Informationen zum Konfigurieren der Authentifizierung für ein ADLS-Speicherkonto finden Sie unter Sicheres Zugreifen auf Speicheranmeldeinformationen mit geheimen Schlüsseln in einer Pipeline.

{
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  }
}

Ersetzen Sie die folgenden Platzhalter:

  • <container-name> mit dem Namen eines Azure-Speicherkonto-Containers.
  • <storage-account-name> mit dem Namen eines ADLS-Speicherkontos.
  • <eh-namespace> durch den Namen Ihres Event Hubs-Namespace.
  • <eh-policy-name> mit dem geheimen Bereichsschlüssel für den Event Hubs-Richtlinienschlüssel.
  • <eventhub> mit dem Namen Ihrer Event Hubs-Instanz.
  • <secret-scope-name> mit dem Namen des geheimen Azure Databricks-Bereichs, der den Event Hubs-Richtlinienschlüssel enthält.