Partager via


Exemples de flux dans les pipelines déclaratifs Lakeflow Spark

Exemple : Écrire dans une table de diffusion en continu à partir de plusieurs rubriques Kafka

Les exemples suivants créent une table de diffusion en continu nommée kafka_target et écrit dans cette table de diffusion en continu à partir de deux rubriques Kafka :

Python

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dp.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Pour en savoir plus sur la read_kafka() fonction à valeur de table utilisée dans les requêtes SQL, consultez read_kafka dans la documentation SQL.

En Python, vous pouvez créer par programmation plusieurs flux qui ciblent une seule table. L’exemple suivant montre ce modèle pour une liste de rubriques Kafka.

Note

Ce modèle a les mêmes exigences que l’utilisation d’une for boucle pour créer des tables. Vous devez transmettre explicitement une valeur Python à la fonction définissant le flux. Consultez Créer des tables dans une for boucle.

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Exemple : Exécuter un remplissage de données unique

Si vous souhaitez exécuter une requête pour ajouter des données à une table de diffusion en continu existante, utilisez append_flow.

Après avoir ajouté un ensemble de données existantes, vous avez plusieurs options :

  • Si vous souhaitez que la requête ajoute de nouvelles données lorsqu'elles arrivent dans le répertoire de rétro-remplissage, laissez la requête telle quelle.
  • Si vous souhaitez qu’il s’agit d’un remplissage unique et que vous ne réexécutez plus jamais, supprimez la requête après avoir exécuté le pipeline une seule fois.
  • Si vous souhaitez que la requête s'exécute une seule fois, et ne la réexécuter que dans les cas où les données sont entièrement actualisées, paramétrez le paramètre once à True dans le flux d'ajout. Dans SQL, utilisez INSERT INTO ONCE.

Les exemples suivants exécutent une requête pour ajouter des données historiques à une table de diffusion en continu :

Python

from pyspark import pipelines as dp

@dp.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dp.append_flow(
  target = "csv_target",
  once = True)
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO ONCE
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Pour obtenir un exemple plus approfondi, consultez Le remplissage des données historiques avec des pipelines.

Exemple : Utiliser le traitement de flux d’ajout au lieu de UNION

Au lieu d’utiliser une requête avec une UNION clause, vous pouvez utiliser des requêtes de flux d’ajout pour combiner plusieurs sources et écrire dans une seule table de diffusion en continu. L’utilisation de requêtes de flux d’ajout au lieu de UNION vous permet d'ajouter à une table de diffusion en continu de plusieurs sources sans exécuter une actualisation complète.

L’exemple Python suivant inclut une requête qui combine plusieurs sources de données avec une UNION clause :

@dp.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )

  raw_orders_eu = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )

  return raw_orders_us.union(raw_orders_eu)

Les exemples suivants remplacent la UNION requête par des requêtes de flux d’ajout :

Python

dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );