Charger et traiter des données de manière incrémentielle avec des flux Delta Live Tables
Cet article explique ce que sont les flux, et comment vous pouvez les utiliser dans des pipelines Delta Live Tables pour traiter de manière incrémentielle les données d’une table de streaming source vers une table de streaming cible. Dans Delta Live Tables, les flux sont définis de deux façons :
- Un flux est défini automatiquement quand vous créez une requête qui met à jour une table de streaming.
- Delta Live Tables fournit également des fonctionnalités permettant de définir explicitement des flux pour des traitements plus complexes, par exemple l’ajout à une table de streaming à partir de plusieurs sources de streaming.
Cet article décrit les flux implicites qui sont créés quand vous définissez une requête pour mettre à jour une table de streaming, puis fournit des détails sur la syntaxe permettant de définir des flux plus complexes.
Qu’est-ce qu’un flux ?
Dans Delta Live Tables, un flux est une requête de streaming qui traite les données sources de manière incrémentielle pour mettre à jour une table de streaming cible. La plupart des jeux de données Delta Live Tables que vous créez dans un pipeline définissent le flux dans le cadre de la requête, et ne nécessitent pas de définition explicite du flux. Par exemple, vous créez une table de streaming dans Delta Live Tables en une seule commande DDL au lieu d’utiliser des instructions de table et de flux distinctes pour créer la table de streaming :
Remarque
Cet exemple CREATE FLOW
est fourni uniquement à des fins d’illustration. Il comprend des mots clés qui ne correspondent pas à une syntaxe Delta Live Tables valide.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
En plus du flux par défaut défini par une requête, les interfaces Python et SQL de Delta Live Tables fournissent une fonctionnalité d’ajout de flux. L’ajout de flux prend en charge tout traitement qui nécessite la lecture de données provenant de plusieurs sources de streaming pour mettre à jour une seule table de streaming. Par exemple, vous pouvez utiliser la fonctionnalité d’ajout de flux quand vous disposez déjà d’une table et d’un flux de streaming, et que vous souhaitez ajouter une nouvelle source de streaming qui écrit dans cette table de streaming existante.
Utiliser l’ajout de flux pour écrire dans une table de streaming à partir de plusieurs flux sources
Remarque
Pour utiliser le traitement de flux d’ajout, votre pipeline doit être configuré pour utiliser le canal d’aperçu.
Utilisez l’élément décoratif @append_flow
dans l’interface Python, ou la clause CREATE FLOW
dans l’interface SQL pour écrire dans une table de streaming à partir de plusieurs sources de streaming. Utilisez l’ajout de flux pour le traitement des tâches telles que les suivantes :
- Ajoutez des sources de streaming qui ajoutent des données à une table de streaming existante sans nécessiter une actualisation complète. Par exemple, vous pouvez disposer d’une table combinant les données régionales de toutes les régions dans lesquelles vous travaillez. À mesure que de nouvelles régions sont déployées, vous pouvez ajouter les données des nouvelles régions au tableau sans effectuer une actualisation complète. Consultez Exemple : Écrire dans une table de streaming à partir de plusieurs rubriques Kafka.
- Mettez à jour une table de streaming en ajoutant les données historiques manquantes (remplissage). Par exemple, vous disposez d’une table de streaming existante dans laquelle est écrite une rubrique Apache Kafka. Vous disposez également de données historiques stockées dans une table que vous devez insérer une seule fois dans la table de streaming, et vous ne pouvez pas envoyer les données en streaming, car votre traitement comprend l’exécution d’une agrégation complexe avant l’insertion des données. Consultez Exemple : Exécuter un renvoi de données unique.
- Combinez des données provenant de plusieurs sources, et écrivez dans une seule table de streaming au lieu d’utiliser la clause
UNION
dans une requête. L’utilisation de l’ajout de flux à la place deUNION
vous permet de mettre à jour la table cible de manière incrémentielle sans exécuter de mise à jour avec actualisation complète. Consultez Exemple : Utiliser l’ajout de flux à la place de UNION.
La cible des enregistrements générés par l’ajout de flux peut être une table existante ou une nouvelle table. Pour les requêtes Python, utilisez la fonction create_streaming_table() afin de créer une table cible.
Important
- Si vous devez définir des contraintes de qualité des données avec des attentes, définissez les attentes sur la table cible dans le cadre de la fonction
create_streaming_table()
, ou sur une définition de table existante. Vous ne pouvez pas définir d’attentes dans la définition@append_flow
. - Les flux sont identifiés par un nom de flux, et ce nom permet d’identifier les points de contrôle de streaming. L’utilisation du nom de flux pour identifier le point de contrôle a les implications suivantes :
- Si un flux existant dans un pipeline est renommé, le point de contrôle n’est pas transféré. Le flux renommé est en fait un tout nouveau flux.
- Vous ne pouvez pas réutiliser un nom de flux dans un pipeline, car le point de contrôle existant ne correspond pas à la nouvelle définition de flux.
Voici la syntaxe de @append_flow
:
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
target_table BY NAME
SELECT * FROM
source;
Exemple : Écrire dans une table de streaming à partir de plusieurs rubriques Kafka
Les exemples suivants créent une table de streaming nommée kafka_target
, et écrivent dans cette table de streaming à partir de deux rubriques Kafka :
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Pour en savoir plus sur la fonction table read_kafka()
utilisée dans les requêtes SQL, consultez read_kafka dans la référence du langage SQL.
Exemple : Exécuter un renvoi de données unique
Les exemples suivants exécutent une requête pour ajouter des données historiques à une table de streaming :
Remarque
Pour garantir un véritable remplissage unique lorsque la requête de remplissage fait partie d'un pipeline qui s'exécute de manière planifiée ou continue, supprimez la requête après avoir exécuté le pipeline une fois. Pour ajouter de nouvelles données si elles arrivent dans le répertoire de remplissage, laissez la requête en place.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Exemple : Utiliser l’ajout de flux à la place de UNION
Au lieu d’utiliser une requête avec une clause UNION
, vous pouvez utiliser des requêtes d’ajout de flux pour combiner plusieurs sources, et écrire dans une seule table de streaming. L’utilisation de requêtes d’ajout de flux à la place de UNION
vous permet d’effectuer des ajouts à une table de streaming à partir de plusieurs sources sans exécuter une actualisation complète.
L’exemple Python suivant comprend une requête qui combine plusieurs sources de données avec une clause UNION
:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
Les exemples suivants remplacent la requête UNION
par des requêtes d’ajout de flux :
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/us",
"csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/eu",
"csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/apac",
"csv"
);