Sdílet prostřednictvím


Příklady toků v deklarativních datových kanálech Lakeflow Spark

Příklad: Zápis do streamované tabulky z několika témat v Kafka

Následující příklad vytvoří tabulku streamování s názvem kafka_target a zapíše do této tabulky streamování ze dvou témat Kafka.

Python

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dp.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Další informace o read_kafka() tabulkové funkci použité v dotazech SQL najdete v read_kafka v referenční dokumentaci jazyka SQL.

V Pythonu můžete programově vytvořit více toků, které cílí na jednu tabulku. Následující příklad ukazuje tento vzor pro seznam témat Kafka.

Poznámka:

Tento model má stejné požadavky jako použití for smyčky k vytváření tabulek. Funkci definující tok musíte explicitně předat hodnotu typu Python. Viz Vytvoření tabulek ve smyčcefor.

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Příklad: Spuštění jednorázového obnovení dat

Pokud chcete spustit dotaz pro připojení dat k existující streamované tabulce, použijte append_flow.

Po připojení sady existujících dat máte několik možností:

  • Pokud chcete, aby dotaz připojil nová data, pokud dorazí do adresáře backfill, ponechte dotaz na místě.
  • Pokud chcete, aby to bylo jednorázové obnovení a nikdy znovu nespustíte, odeberte dotaz po spuštění kanálu jednou.
  • Pokud chcete, aby byl dotaz spuštěn jednou a znovu spuštěn jen v případech, kdy se data zcela obnovují, nastavte parametr once na tok přidání True. V SQL použijte INSERT INTO ONCE.

Následující příklady spustí dotaz, který připojí historická data do streamované tabulky:

Python

from pyspark import pipelines as dp

@dp.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dp.append_flow(
  target = "csv_target",
  once = True)
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO ONCE
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Podrobnější příklad najdete v tématu Doplnění historických dat pomocí pipeline.

Příklad: Použijte zpracování toku přidání místo UNION

Místo použití dotazu s UNION klauzulí můžete pomocí přidávacích dotazů toku kombinovat více zdrojů a zapisovat do jedné streamovací tabulky. Použití dotazů přidávacího toku místo UNION toho umožňuje připojit se k streamované tabulce z více zdrojů bez spuštění úplné aktualizace.

Následující příklad Pythonu obsahuje dotaz, který kombinuje více zdrojů dat s klauzulí UNION :

@dp.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )

  raw_orders_eu = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )

  return raw_orders_us.union(raw_orders_eu)

Následující příklady nahrazují UNION dotaz přidávacími dotazy toku:

Python

dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );