Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Příklad: Zápis do streamované tabulky z několika témat v Kafka
Následující příklad vytvoří tabulku streamování s názvem kafka_target a zapíše do této tabulky streamování ze dvou témat Kafka.
Python
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Další informace o read_kafka() tabulkové funkci použité v dotazech SQL najdete v read_kafka v referenční dokumentaci jazyka SQL.
V Pythonu můžete programově vytvořit více toků, které cílí na jednu tabulku. Následující příklad ukazuje tento vzor pro seznam témat Kafka.
Poznámka:
Tento model má stejné požadavky jako použití for smyčky k vytváření tabulek. Funkci definující tok musíte explicitně předat hodnotu typu Python. Viz Vytvoření tabulek ve smyčcefor.
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Příklad: Spuštění jednorázového obnovení dat
Pokud chcete spustit dotaz pro připojení dat k existující streamované tabulce, použijte append_flow.
Po připojení sady existujících dat máte několik možností:
- Pokud chcete, aby dotaz připojil nová data, pokud dorazí do adresáře backfill, ponechte dotaz na místě.
- Pokud chcete, aby to bylo jednorázové obnovení a nikdy znovu nespustíte, odeberte dotaz po spuštění kanálu jednou.
- Pokud chcete, aby byl dotaz spuštěn jednou a znovu spuštěn jen v případech, kdy se data zcela obnovují, nastavte parametr
oncena tok přidáníTrue. V SQL použijteINSERT INTO ONCE.
Následující příklady spustí dotaz, který připojí historická data do streamované tabulky:
Python
from pyspark import pipelines as dp
@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO ONCE
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Podrobnější příklad najdete v tématu Doplnění historických dat pomocí pipeline.
Příklad: Použijte zpracování toku přidání místo UNION
Místo použití dotazu s UNION klauzulí můžete pomocí přidávacích dotazů toku kombinovat více zdrojů a zapisovat do jedné streamovací tabulky. Použití dotazů přidávacího toku místo UNION toho umožňuje připojit se k streamované tabulce z více zdrojů bez spuštění úplné aktualizace.
Následující příklad Pythonu obsahuje dotaz, který kombinuje více zdrojů dat s klauzulí UNION :
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)
raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)
return raw_orders_us.union(raw_orders_eu)
Následující příklady nahrazují UNION dotaz přidávacími dotazy toku:
Python
dp.create_streaming_table("raw_orders")
@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);