Udostępnij przez


Przykłady przepływów w deklaratywnych potokach Lakeflow Spark

Przykład: zapisywanie w tabeli przesyłania strumieniowego z wielu tematów platformy Kafka

Poniższe przykłady tworzą tabelę strumieniowego przesyłania o nazwie kafka_target i zapisują do niej dane z dwóch tematów na platformie Kafka.

Python

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dp.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Aby dowiedzieć się więcej na temat read_kafka() funkcji wartości tabeli używanej w zapytaniach SQL, zobacz read_kafka w dokumentacji języka SQL.

W języku Python można programowo utworzyć wiele przepływów przeznaczonych dla jednej tabeli. Poniższy przykład przedstawia ten wzorzec dla listy tematów platformy Kafka.

Uwaga / Notatka

Ten wzorzec ma takie same wymagania jak używanie for pętli do tworzenia tabel. Musisz jawnie przekazać wartość języka Python do funkcji definiującej przepływ. Zobacz Tworzenie tabel w for pętli.

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Przykład: uruchamianie jednorazowego wypełniania danych

Jeśli chcesz uruchomić zapytanie w celu dołączenia danych do istniejącej tabeli przesyłania strumieniowego, użyj polecenia append_flow.

Po dołączeniu zestawu istniejących danych masz wiele opcji:

  • Jeśli chcesz, aby zapytanie dołączało nowe dane, gdy pojawią się w katalogu backfill, pozostaw zapytanie bez zmian.
  • Jeśli chcesz, aby było to jednorazowe wypełnianie i nigdy nie uruchamiać ponownie, usuń zapytanie po uruchomieniu potoku raz.
  • Jeśli chcesz, aby zapytanie było uruchamiane raz i ponownie tylko w przypadkach, gdy dane są w pełni odświeżane, ustaw parametry once na True w przepływie dołączania. W programie SQL użyj polecenia INSERT INTO ONCE.

W poniższych przykładach uruchomiono zapytanie, aby dołączyć dane historyczne do tabeli przesyłania strumieniowego:

Python

from pyspark import pipelines as dp

@dp.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dp.append_flow(
  target = "csv_target",
  once = True)
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO ONCE
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Aby uzyskać bardziej szczegółowy przykład, zobacz Backfilling historical data with pipelines (Wypełnianie danych historycznych potokami).

Przykład: Użyj przetwarzania przepływu dodawania zamiast UNION

Zamiast używać zapytania z klauzulą UNION, możesz użyć zapytań łączących w przepływie danych, aby połączyć wiele źródeł i zapisać do jednej tabeli strumieniowej. Użytkowanie zapytań dodawania zamiast UNION umożliwia dołączanie danych do tabeli strumieniowej z wielu źródeł, bez potrzeby uruchamiania pełnego odświeżenia.

Poniższy przykład języka Python zawiera zapytanie, które łączy wiele źródeł danych z klauzulą UNION :

@dp.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )

  raw_orders_eu = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )

  return raw_orders_us.union(raw_orders_eu)

W poniższych przykładach zapytanie UNION jest zastępowane zapytaniami przepływu dołączania.

Python

dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );