Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Exemple : Écrire dans une table de diffusion en continu à partir de plusieurs rubriques Kafka
Les exemples suivants créent une table de diffusion en continu nommée kafka_target et écrit dans cette table de diffusion en continu à partir de deux rubriques Kafka :
Python
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Pour en savoir plus sur la read_kafka() fonction à valeur de table utilisée dans les requêtes SQL, consultez read_kafka dans la documentation SQL.
En Python, vous pouvez créer par programmation plusieurs flux qui ciblent une seule table. L’exemple suivant montre ce modèle pour une liste de rubriques Kafka.
Note
Ce modèle a les mêmes exigences que l’utilisation d’une for boucle pour créer des tables. Vous devez transmettre explicitement une valeur Python à la fonction définissant le flux. Consultez Créer des tables dans une for boucle.
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Exemple : Exécuter un remplissage de données unique
Si vous souhaitez exécuter une requête pour ajouter des données à une table de diffusion en continu existante, utilisez append_flow.
Après avoir ajouté un ensemble de données existantes, vous avez plusieurs options :
- Si vous souhaitez que la requête ajoute de nouvelles données lorsqu'elles arrivent dans le répertoire de rétro-remplissage, laissez la requête telle quelle.
- Si vous souhaitez qu’il s’agit d’un remplissage unique et que vous ne réexécutez plus jamais, supprimez la requête après avoir exécuté le pipeline une seule fois.
- Si vous souhaitez que la requête s'exécute une seule fois, et ne la réexécuter que dans les cas où les données sont entièrement actualisées, paramétrez le paramètre
onceàTruedans le flux d'ajout. Dans SQL, utilisezINSERT INTO ONCE.
Les exemples suivants exécutent une requête pour ajouter des données historiques à une table de diffusion en continu :
Python
from pyspark import pipelines as dp
@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO ONCE
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Pour obtenir un exemple plus approfondi, consultez Le remplissage des données historiques avec des pipelines.
Exemple : Utiliser le traitement de flux d’ajout au lieu de UNION
Au lieu d’utiliser une requête avec une UNION clause, vous pouvez utiliser des requêtes de flux d’ajout pour combiner plusieurs sources et écrire dans une seule table de diffusion en continu. L’utilisation de requêtes de flux d’ajout au lieu de UNION vous permet d'ajouter à une table de diffusion en continu de plusieurs sources sans exécuter une actualisation complète.
L’exemple Python suivant inclut une requête qui combine plusieurs sources de données avec une UNION clause :
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)
raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)
return raw_orders_us.union(raw_orders_eu)
Les exemples suivants remplacent la UNION requête par des requêtes de flux d’ajout :
Python
dp.create_streaming_table("raw_orders")
@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);