Teilen über


Inkrementelles Laden und Verarbeiten von Daten mit Lakeflow Spark Declarative Pipelines-Flüssen

Daten werden in Pipelines durch Abläufe verarbeitet. Jeder Fluss besteht aus einer Abfrage und in der Regel einem Ziel. Der Prozess verarbeitet die Abfrage entweder als Batch oder inkrementell als Datenstrom in das Ziel. Ein Flow befindet sich innerhalb einer Pipeline in den Lakeflow Spark Declarative Pipelines.

In der Regel werden Flüsse automatisch definiert, wenn Sie eine Abfrage in einer Pipeline erstellen, die ein Ziel aktualisiert, aber Sie können auch explizit zusätzliche Flüsse für eine komplexere Verarbeitung definieren, z. B. das Anfügen an ein einzelnes Ziel aus mehreren Quellen.

Aktualisierungen

Ein Fluss wird jedes Mal ausgeführt, wenn die definierende Pipeline aktualisiert wird. Der Fluss erstellt oder aktualisiert Tabellen mit den neuesten verfügbaren Daten. Abhängig vom Typ des Flusses und dem Status der Änderungen an den Daten kann die Aktualisierung eine inkrementelle Aktualisierung durchführen, die nur neue Datensätze verarbeitet oder eine vollständige Aktualisierung durchführt, die alle Datensätze aus der Datenquelle neu verarbeitet.

Erstellen eines Standardflusses

Wenn Sie eine Pipeline erstellen, definieren Sie in der Regel eine Tabelle oder eine Ansicht zusammen mit der Abfrage, die sie unterstützt. In dieser SQL-Abfrage erstellen Sie beispielsweise eine Streamingtabelle namens customers_silver, indem Sie aus der Tabelle customers_bronze lesen.

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Sie können auch die gleiche Streamingtabelle in Python erstellen. In Python verwenden Sie Pipelines, indem Sie eine Abfragefunktion erstellen, die einen Datenrahmen zurückgibt, mit Dekoratoren, um lakeflow Spark Declarative Pipelines-Funktionalität hinzuzufügen:

from pyspark import pipelines as dp

@dp.table()
def customers_silver():
  return spark.readStream.table("customers_bronze")

In diesem Beispiel haben Sie eine Streamingtabelle erstellt. Sie können auch materialisierte Ansichten mit ähnlicher Syntax sowohl in SQL als auch in Python erstellen. Weitere Informationen finden Sie unter Streamingtabellen und materialisierte Ansichten.

In diesem Beispiel wird ein Standardfluss zusammen mit der Streamingtabelle erstellt. Der Standardfluss für eine Streamingtabelle ist ein Anfügefluss , der neue Zeilen mit jedem Trigger hinzufüge. Dies ist die am häufigsten verwendete Methode für die Verwendung von Pipelines: Erstellen eines Flusses und des Ziels in einem einzigen Schritt. Sie können diese Methode verwenden, um Daten zu einspeisen oder zu transformieren.

Append-Flows unterstützen auch Datenverarbeitungsprozesse, die das Auslesen von Daten aus mehreren Streaming-Quellen erfordern, um ein einzelnes Ziel zu aktualisieren. Sie können z. B. Die Anfügeflussfunktion verwenden, wenn Sie über eine vorhandene Streamingtabelle und einen Fluss verfügen und eine neue Streamingquelle hinzufügen möchten, die in diese vorhandene Streamingtabelle schreibt.

Verwenden mehrerer Datenströme zum Schreiben in ein einziges Ziel

Im vorherigen Beispiel haben Sie einen Fluss und eine Streamingtabelle in einem einzigen Schritt erstellt. Sie können auch Flüsse für eine zuvor erstellte Tabelle erstellen. In diesem Beispiel können Sie sehen, wie Sie eine Tabelle und den damit verknüpften Fluss in separaten Schritten erstellen. Dieser Code hat identische Ergebnisse wie das Erstellen eines Standardflusses, einschließlich der Verwendung desselben Namens für die Streamingtabelle und den Fluss.

Python

from pyspark import pipelines as dp

# create streaming table
dp.create_streaming_table("customers_silver")

# add a flow
@dp.append_flow(
  target = "customers_silver")
def customer_silver():
  return spark.readStream.table("customers_bronze")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;

-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);

Das Erstellen eines Flusses unabhängig vom Ziel bedeutet, dass Sie auch mehrere Flüsse erstellen können, die Daten an dasselbe Ziel anfügen.

Verwenden Sie den @dp.append_flow Dekorator in der Python-Schnittstelle oder die CREATE FLOW...INSERT INTO Klausel in der SQL-Schnittstelle, um einen neuen Datenfluss zu erstellen. Dies kann beispielsweise genutzt werden, um gezielt eine Streaming-Tabelle aus mehreren Streaming-Quellen anzusprechen. Verwenden Sie den Anfügefluss für Verarbeitungsaufgaben wie die folgenden:

  • Fügen Sie Streamingquellen hinzu, die Daten an eine vorhandene Streamingtabelle anfügen, ohne dass eine vollständige Aktualisierung erforderlich ist. Sie können beispielsweise eine Tabelle haben, die regionale Daten aus jeder Region kombiniert, in der Sie arbeiten. Wenn neue Regionen eingeführt werden, können Sie der Tabelle die neuen Regionsdaten hinzufügen, ohne eine vollständige Aktualisierung durchzuführen. Ein Beispiel zum Hinzufügen von Streamquellen zu einer bestehenden Streamingtabelle finden Sie unter Beispiel: Aus mehreren Kafka-Themen in eine Streamingtabelle schreiben.
  • Aktualisieren Sie eine Streamingtabelle, indem Sie fehlende historische Daten (Backfilling) anfügen. Sie können die INSERT INTO ONCE-Syntax verwenden, um ein historisches Backfill-Append zu erstellen, das einmalig ausgeführt wird. Sie haben beispielsweise eine vorhandene Streamingtabelle, in die ein Apache Kafka-Topic geschrieben wird. Sie haben auch historische Daten in einer Tabelle gespeichert, die Sie genau einmal in die Streamingtabelle eingefügt haben, und Sie können die Daten nicht streamen, da Ihre Verarbeitung eine komplexe Aggregation vor dem Einfügen der Daten umfasst. Ein Beispiel für ein Rückfüllen finden Sie unter "Backfilling historical data with pipelines".
  • Kombinieren Sie Daten aus mehreren Quellen, und schreiben Sie in eine einzelne Streamingtabelle, anstatt die UNION Klausel in einer Abfrage zu verwenden. Die Verwendung der Anfügeflussverarbeitung anstelle der UNION Möglichkeit, die Zieltabelle inkrementell zu aktualisieren, ohne ein vollständiges Aktualisierungsupdate auszuführen. Ein Beispiel für eine Vereinigung, die so durchgeführt wurde, finden Sie unter Beispiel: Verwenden der Anfügeflussverarbeitung anstelle von UNION.

Das Ziel für die Datensätze, die von der Anfügeflussverarbeitung ausgegeben werden, kann eine vorhandene Tabelle oder eine neue Tabelle sein. Verwenden Sie für Python-Abfragen die Funktion create_streaming_table() zum Erstellen einer Zieltabelle.

Im folgenden Beispiel werden zwei Flüsse für dasselbe Ziel hinzugefügt, wodurch eine Vereinigung der beiden Quelltabellen erstellt wird:

Python

from pyspark import pipelines as dp

# create a streaming table
dp.create_streaming_table("customers_us")

# add the first append flow
@dp.append_flow(target = "customers_us")
def append1():
  return spark.readStream.table("customers_us_west")

# add the second append flow
@dp.append_flow(target = "customers_us")
def append2():
  return spark.readStream.table("customers_us_east")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;

-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);

-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);

Von Bedeutung

  • Wenn Sie Datenqualitätseinschränkungen mit Erwartungen definieren müssen, definieren Sie die Erwartungen an die Zieltabelle als Teil der create_streaming_table() Funktion oder einer vorhandenen Tabellendefinition. Sie können die Erwartungen in der @append_flow Definition nicht definieren.
  • Flüsse werden durch einen Flussnamen identifiziert, und dieser Name wird verwendet, um Streamingprüfpunkte zu identifizieren. Die Verwendung des Flussnamens zur Identifizierung des Prüfpunkts bedeutet Folgendes:
    • Wenn ein vorhandener Workflow in einer Pipeline umbenannt wird, wird der Kontrollpunkt nicht übernommen, und der umbenannte Workflow gilt als ein völlig neuer Prozess.
    • Sie können einen Flussnamen in einer Pipeline nicht wiederverwenden, da der vorhandene Prüfpunkt nicht mit der neuen Flussdefinition übereinstimmt.

Typen von Flüssen

Die Standardflüsse für Streamingtabellen und materialisierte Ansichten sind Anfügeflüsse. Sie können auch Flows erstellen, um aus Change Data Capture-Datenquellen zu lesen. In der folgenden Tabelle werden die verschiedenen Arten von Flüssen beschrieben.

Flusstyp Description
Append Append-Flüsse sind die häufigste Art von Datenfluss, bei denen neue Datensätze aus der Quelle bei jeder Aktualisierung an das Ziel geschrieben werden. Sie entsprechen dem Anfügemodus im strukturierten Streaming. Sie können das ONCE Flag hinzufügen, das eine Batchabfrage angibt, deren Daten nur einmal in das Ziel eingefügt werden sollen, es sei denn, das Ziel wird vollständig aktualisiert. Eine beliebige Anzahl von Append-Flows kann in ein bestimmtes Ziel schreiben.
Standardflüsse (erstellt mit der Zielstreamingtabelle oder der materialisierten Sicht) haben denselben Namen wie das Ziel. Andere Ziele weisen keine Standardflüsse auf.
Automatisches CDC (zuvor Änderungen übernehmen) Ein Auto-CDC-Fluss erfasst eine Abfrage, die Daten zur Änderungsdatenerfassung (Change Data Capture, CDC) enthält. Automatisierte CDC-Flows können nur Streamingtabellen als Ziel verwenden, und die Quelle muss eine Streamingquelle sein (auch bei ONCE Flüssen). Mehrere automatische CDC-Ströme können auf eine einzelne Streamingtabelle ausgerichtet werden. Eine Streamingtabelle, die als Ziel für einen automatischen CDC-Fluss fungiert, kann nur von anderen automatischen CDC-Flüssen bestimmt werden.
Weitere Informationen zu CDC-Daten finden Sie unter Auto CDC-APIs: Vereinfachen der Änderungsdatenerfassung mit Pipelines.

Weitere Informationen

Weitere Informationen zu Flüssen und deren Verwendung finden Sie in den folgenden Themen: