Teilen über


Beispiele für Workflows in Lakeflow Spark Declarative Pipelines

Beispiel: Schreiben in eine Streamingtabelle aus mehreren Kafka-Topics

Im folgenden Beispiel wird eine Streamingtabelle mit dem Namen kafka_target erstellt, und es wird aus zwei Kafka-Topics in diese Streamingtabelle geschrieben.

Python

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dp.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Weitere Informationen zur read_kafka() in den SQL-Abfragen verwendeten Tabellenwertfunktion finden Sie unter read_kafka in der SQL-Sprachreferenz.

In Python können Sie programmgesteuert mehrere Flüsse erstellen, die auf eine einzelne Tabelle abzielen. Das folgende Beispiel zeigt dieses Muster für eine Liste der Kafka-Themen.

Hinweis

Dieses Muster hat die gleichen Anforderungen wie die Verwendung einer for Schleife zum Erstellen von Tabellen. Sie müssen einen Python-Wert explizit an die Funktion übergeben, die den Fluss definiert. Siehe Erstellen von Tabellen in einer for Schleife.

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Beispiel: Ausführen eines einmaligen Datenrückfüllens

Wenn Sie eine Abfrage ausführen möchten, um Daten an eine vorhandene Streamingtabelle anzufügen, verwenden Sie append_flow.

Nach dem Anfügen einer Reihe vorhandener Daten haben Sie mehrere Optionen:

  • Wenn die Abfrage neue Daten anhängen soll, falls diese im Backfill-Verzeichnis eingehen, lassen Sie die Abfrage unverändert.
  • Wenn dies ein einmaliger Backfill sein soll und nie wieder ausgeführt werden soll, entfernen Sie die Abfrage, nachdem die Pipeline einmal ausgeführt wurde.
  • Wenn Sie möchten, dass die Abfrage einmal ausgeführt wird und nur dann erneut ausgeführt wird, wenn die Daten vollständig aktualisiert werden, legen Sie den once-Parameter auf True im Anfügefluss fest. Verwenden Sie INSERT INTO ONCEin SQL .

In den folgenden Beispielen wird eine Abfrage ausgeführt, um verlaufsgeschichtliche Daten an eine Streamingtabelle anzufügen:

Python

from pyspark import pipelines as dp

@dp.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dp.append_flow(
  target = "csv_target",
  once = True)
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO ONCE
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Ein ausführlicheres Beispiel finden Sie unter "Zurückfüllen von historischen Daten mit Pipelines".

Beispiel: Verwenden Sie die Anfügeflussverarbeitung anstelle von UNION

Anstatt eine Abfrage mit einer UNION Klausel zu verwenden, können Sie Anfügeflussabfragen verwenden, um mehrere Quellen zu kombinieren und in eine einzelne Streamingtabelle zu schreiben. Verwenden Sie Append-Flow-Abfragen anstelle von UNION, können Sie Daten aus mehreren Quellen an eine Streamingtabelle hinzufügen, ohne eine vollständige Aktualisierung durchzuführen.

Das folgende Python-Beispiel enthält eine Abfrage, die mehrere Datenquellen mit einer UNION Klausel kombiniert:

@dp.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )

  raw_orders_eu = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )

  return raw_orders_us.union(raw_orders_eu)

Die folgenden Beispiele ersetzen die UNION Abfrage durch Anfügeflussabfragen:

Python

dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );