Partager via


append_flow

Le décorateur @dp.append_flow crée des flux d'ajout ou des backfills pour vos tables de pipeline. La fonction doit retourner un DataFrame de streaming Apache Spark. Consultez Charger et traiter des données de manière incrémentielle avec les flux de pipelines déclaratifs Spark Lakeflow.

Les flux d’ajout peuvent cibler des tables ou récepteurs de streaming.

Syntaxe

from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <bool>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #

Paramètres

Paramètre Type Descriptif
function function Obligatoire. Fonction qui retourne un DataFrame de streaming Apache Spark à partir d’une requête définie par l’utilisateur.
target str Obligatoire. Nom de la table ou du récepteur qui est la cible du flux d’ajout.
name str Nom du flux. S’il n’est pas fourni, la valeur par défaut est le nom de la fonction.
once bool Si vous le souhaitez, définissez le flux en tant que flux à usage unique, tel qu’un remblai. L'utilisation de once=True change le flux de deux manières :
  • La valeur de retour. streaming-query. doit être un DataFrame de traitement par lots dans ce cas, plutôt qu'un DataFrame de diffusion en continu.
  • Le flux est exécuté une fois par défaut. Si le pipeline est mis à jour avec une actualisation complète, le ONCE flux s’exécute à nouveau pour recréer les données.
comment str Description du flux.
spark_conf dict Liste des configurations Spark pour l’exécution de cette requête

Examples

from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))