Share via


Führen Sie Ihrem ersten strukturierten Streaming-Workload aus

Dieser Artikel enthält Codebeispiele und Erläuterungen grundlegender Konzepte, die zum Ausführen Ihrer ersten strukturierten Streaming-Abfragen auf Azure Databricks erforderlich sind. Sie können strukturiertes Streaming für nahezu Echtzeit- und inkrementelle Verarbeitungsworkloads verwenden.

Strukturiertes Streaming ist eine von mehreren Technologien, die Streamingtabellen in Delta Live-Tabellen nutzen. Databricks empfiehlt die Verwendung von Delta Live Tables für alle neuen ETL-, Aufnahme- und strukturierten Streaming-Workloads. Weitere Informationen finden Sie unter Was sind Delta Live-Tabellen?.

Hinweis

Während Delta Live Tables eine leicht geänderte Syntax zum Deklarieren von Streamingtabellen bereitstellt, gilt die allgemeine Syntax zum Konfigurieren von Streaminglesevorgängen und Transformationen für alle Streaming-Anwendungsfälle in Azure Databricks. Delta Live Tables vereinfacht auch das Streaming, indem Zustandsinformationen, Metadaten und zahlreiche Konfigurationen verwaltet werden.

Aus einem Datenstrom lesen

Sie können strukturiertes Streaming verwenden, um Daten aus unterstützten Datenquellen inkrementell aufzunehmen. Einige der am häufigsten verwendeten Datenquellen in Azure Databricks Structured Streaming-Workloads umfassen Folgendes:

  • Datendateien im Cloudobjektspeicher
  • Nachrichtenbusse und Warteschlangen
  • Deltasee

Databricks empfiehlt die Verwendung des Autoloaders für die Streamingerfassung aus dem Cloudobjektspeicher. Auto Loader unterstützt die meisten Dateiformate, die von strukturiertem Streaming unterstützt werden. Weitere Informationen finden Sie unter Automatisches Laden.

Jede Datenquelle bietet eine Reihe von Optionen, um anzugeben, wie Batches von Daten geladen werden. Während der Lesekonfiguration müssen die wichtigsten Optionen, die Sie möglicherweise festlegen, in die folgenden Kategorien unterteilt werden:

  • Optionen, welche die Datenquelle oder das Format angeben (z. B. Dateityp, Trennzeichen und Schema).
  • Optionen, die den Zugriff auf Quellsysteme konfigurieren (z. B. Porteinstellungen und Anmeldeinformationen).
  • Optionen, die angeben, wo in einem Datenstrom begonnen werden soll (z. B. Kafka-Offsets oder alle vorhandenen Dateien lesen).
  • Optionen, die steuern, wie viele Daten in jedem Batch verarbeitet werden (z. B. max. Offsets, Dateien oder Bytes pro Batch).

Verwenden des automatischen Ladens zum Lesen von Streamingdaten aus dem Objektspeicher

Im folgenden Beispiel wird das Laden von JSON-Daten mit Auto Loader veranschaulicht, das zum Kennzeichnen von Format und Optionen cloudFiles verwendet. Die schemaLocation-Option ermöglicht Schemarückschluss und -evolution. Fügen Sie den folgenden Code in eine Zelle des Databricks-Notebooks ein und führen Sie die Zelle aus, um einen Streaming-DataFrame mit dem Namen raw_df zu erstellen:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Wie bei anderen Lesevorgängen in Azure Databricks werden beim Konfigurieren von Streaminglesedaten keine Daten geladen. Sie müssen eine Aktion für die Daten auslösen, bevor der Datenstrom beginnt.

Hinweis

Das Aufrufen von display() für einen Streaming-DataFrame startet einen Streamingauftrag. Bei den meisten Anwendungsfällen für strukturiertes Streaming sollte die Aktion, die einen Datenstrom auslöst, die Daten in eine Senke schreiben. Weitere Informationen finden Sie unter Vorbereiten des strukturierten Streamingcodes für die Produktion.

Durchführen einer Streamingtransformation

Strukturiertes Streaming unterstützt die meisten Transformationen, die in Azure Databricks und Spark SQL verfügbar sind. Sie können sogar MLflow-Modelle als UDFs laden und Streamingvorhersagen als Transformation erstellen.

Im folgenden Codebeispiel wird eine einfache Transformation abgeschlossen, um die aufgenommenen JSON-Daten mit zusätzlichen Informationen mithilfe von Spark SQL-Funktionen zu erweitern:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

Das resultierende transformed_df enthält Abfrageanweisungen zum Laden und Transformieren jedes Datensatzes, sobald er in der Datenquelle eingeht.

Hinweis

Strukturiertes Streaming behandelt Datenquellen als ungebundene oder unendliche Datasets. Daher werden einige Transformationen in strukturierten Streaming-Workloads nicht unterstützt, da sie eine unendliche Anzahl von Elementen sortieren müssen.

Die meisten Aggregationen und viele Verknüpfungen erfordern die Verwaltung von Zustandsinformationen mit Wasserzeichen, Fenstern und Ausgabemodus. Siehe Anwenden von Wasserzeichen zum Steuern von Schwellenwerten für die Datenverarbeitung.

Schreiben in eine Datensenke

Eine Datensenke ist das Ziel eines Streaming-Schreibvorgangs. Zu den gängigen Senken, die in Azure Databricks-Streamingworkloads verwendet werden, gehören die folgenden:

  • Deltasee
  • Nachrichtenbusse und Warteschlangen
  • Schlüsselwertdatenbanken

Wie bei Datenquellen bieten die meisten Datensenken eine Reihe von Optionen, um zu steuern, wie Daten in das Zielsystem geschrieben werden. Während der Writer-Konfiguration müssen die wichtigsten Optionen, die Sie möglicherweise festlegen, in die folgenden Kategorien unterteilt werden:

  • Ausgabemodus (standardmäßig anfügen).
  • Ein Prüfpunktspeicherort (erforderlich für jeden Writer).
  • Triggerintervalle - siehe Konfigurieren von Triggerintervallen für strukturiertes Streaming.
  • Optionen, welche die Datensenke oder das Format angeben (z. B. Dateityp, Trennzeichen und Schema).
  • Optionen, die den Zugriff auf Zielsysteme konfigurieren (z. B. Porteinstellungen und Anmeldeinformationen).

Durchführen eines inkrementellen Batchschreibvorgangs in Delta Lake

Im folgenden Beispiel wird mithilfe eines angegebenen Dateipfads und Prüfpunkts in Delta Lake geschrieben.

Wichtig

Stellen Sie immer sicher, dass Sie für jeden von Ihnen konfigurierten Streaming-Writer einen eindeutigen Prüfpunktspeicherort angeben. Der Prüfpunkt stellt die eindeutige Identität für Ihren Datenstrom bereit, wobei alle verarbeiteten Datensätze und Statusinformationen nachverfolgt werden, die Ihrer Streamingabfrage zugeordnet sind.

Die availableNow-Einstellung für den Trigger weist strukturiertes Streaming an, alle zuvor unverarbeiteten Datensätze aus dem Quelldatensatz zu verarbeiten und dann herunterzufahren, sodass Sie den folgenden Code sicher ausführen können, ohne sich Gedanken über das Verlassen eines Datenstroms machen zu müssen:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

In diesem Beispiel werden keine neuen Datensätze in unserer Datenquelle eintreffen, sodass die Ausführung dieses Codes keine neuen Datensätze erfasst.

Warnung

Die Ausführung von strukturiertem Streaming kann die automatische Beendigung vom Herunterfahren von Computeressourcen verhindern. Um unerwartete Kosten zu vermeiden, müssen Sie Streamingabfragen beenden.

Vorbereiten des strukturierten Streamingcodes für die Produktion

Databricks empfiehlt die Verwendung von Delta Live Tables für die meisten strukturierten Streaming-Workloads. Die folgenden Empfehlungen bieten einen Ausgangspunkt für die Vorbereitung von strukturierten Streaming-Workloads für die Produktion:

  • Entfernen Sie unnötigen Code aus Notebooks, der Ergebnisse zurückgeben würde, z. B. display und count.
  • Führen Sie keine strukturierten Streaming-Workloads auf interaktiven Clustern aus. Planen Sie Datenströme immer als Aufträge.
  • Damit Streamingaufträge automatisch wiederhergestellt werden können, konfigurieren Sie Aufträge mit unendlichen Wiederholungen.
  • Verwenden Sie keine automatische Skalierung für Workloads mit strukturiertem Streaming.

Weitere Empfehlungen finden Sie unter Überlegungen zur Produktion für strukturiertes Streaming.

Lesen von Daten aus Delta Lake, Transformation und Schreiben in Delta Lake

Delta Lake verfügt über umfangreiche Unterstützung für die Arbeit mit strukturiertem Streaming sowohl als Quelle als auch als Senke. Weitere Informationen finden Sie unter Delta-Tabelle: Streaming für Lese- und Schreibvorgänge.

Das folgende Beispiel zeigt eine Beispielsyntax, um alle neuen Datensätze aus einer Delta-Tabelle inkrementell zu laden, sie mit einer Momentaufnahme einer anderen Delta-Tabelle zu verbinden und in eine Delta-Tabelle zu schreiben:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Sie müssen über die erforderlichen Berechtigungen verfügen, um Quelltabellen zu lesen und in Zieltabellen und den angegebenen Prüfpunktspeicherort zu schreiben. Füllen Sie alle Parameter aus, die mit gewinkelten Klammern (<>) gekennzeichnet sind, indem Sie die relevanten Werte für Ihre Datenquellen und Senken verwenden.

Hinweis

Delta Live Tables bietet eine vollständig deklarative Syntax zum Erstellen von Delta Lake-Pipelines, und verwaltet Eigenschaften wie Trigger und Prüfpunkte automatisch. Weitere Informationen finden Sie unter Was sind Delta Live-Tabellen?.

Lesen von Daten aus Kafka, Transformieren und Schreiben in Kafka

Apache Kafka und andere Nachrichtenbusse bieten mit die niedrigste Latenz, die für große Datasets verfügbar ist. Sie können Azure Databricks verwenden, um Transformationen auf Daten anzuwenden, die aus Kafka aufgenommen wurden, und dann Daten zurück in Kafka schreiben.

Hinweis

Das Schreiben von Daten in den Cloudobjektspeicher erhöht zusätzlichen Latenzaufwand. Wenn Sie Daten aus einem Nachrichtenbus in Delta Lake speichern möchten, aber die niedrigste Latenz für Streaming-Workloads erfordern, empfiehlt Databricks, separate Streamingaufträge für das Aufnehmen von Daten in das Lakehouse zu konfigurieren und nahezu Echtzeittransformationen für Downstream-Nachrichtenbussenken anzuwenden.

Das folgende Codebeispiel veranschaulicht ein einfaches Muster zum Anreichern von Daten aus Kafka durch Verknüpfen mit Daten in einer Delta-Tabelle und anschließendes Zurückschreiben in Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Sie müssen über die erforderlichen Berechtigungen für den Zugriff auf Ihren Kafka-Dienst verfügen. Füllen Sie alle Parameter aus, die mit gewinkelten Klammern (<>) gekennzeichnet sind, indem Sie die relevanten Werte für Ihre Datenquellen und Senken verwenden. Siehe Stream-Verarbeitung mit Apache Kafka und Azure Databricks.