Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Le décorateur @dp.append_flow crée des flux d'ajout ou des backfills pour vos tables de pipeline. La fonction doit retourner un DataFrame de streaming Apache Spark. Consultez Charger et traiter des données de manière incrémentielle avec les flux de pipelines déclaratifs Spark Lakeflow.
Les flux d’ajout peuvent cibler des tables ou récepteurs de streaming.
Syntaxe
from pyspark import pipelines as dp
dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dp.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
once = <bool>, # optional, defaults to false
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming-query>) #
Paramètres
| Paramètre | Type | Descriptif |
|---|---|---|
| function | function |
Obligatoire. Fonction qui retourne un DataFrame de streaming Apache Spark à partir d’une requête définie par l’utilisateur. |
target |
str |
Obligatoire. Nom de la table ou du récepteur qui est la cible du flux d’ajout. |
name |
str |
Nom du flux. S’il n’est pas fourni, la valeur par défaut est le nom de la fonction. |
once |
bool |
Si vous le souhaitez, définissez le flux en tant que flux à usage unique, tel qu’un remblai. L'utilisation de once=True change le flux de deux manières :
|
comment |
str |
Description du flux. |
spark_conf |
dict |
Liste des configurations Spark pour l’exécution de cette requête |
Examples
from pyspark import pipelines as dp
# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
return (
spark.read
.format("json")
.load("/path/to/backfill/")
)
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))