Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Przykład: zapisywanie w tabeli przesyłania strumieniowego z wielu tematów platformy Kafka
Poniższe przykłady tworzą tabelę strumieniowego przesyłania o nazwie kafka_target i zapisują do niej dane z dwóch tematów na platformie Kafka.
Python
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Aby dowiedzieć się więcej na temat read_kafka() funkcji wartości tabeli używanej w zapytaniach SQL, zobacz read_kafka w dokumentacji języka SQL.
W języku Python można programowo utworzyć wiele przepływów przeznaczonych dla jednej tabeli. Poniższy przykład przedstawia ten wzorzec dla listy tematów platformy Kafka.
Uwaga / Notatka
Ten wzorzec ma takie same wymagania jak używanie for pętli do tworzenia tabel. Musisz jawnie przekazać wartość języka Python do funkcji definiującej przepływ. Zobacz Tworzenie tabel w for pętli.
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Przykład: uruchamianie jednorazowego wypełniania danych
Jeśli chcesz uruchomić zapytanie w celu dołączenia danych do istniejącej tabeli przesyłania strumieniowego, użyj polecenia append_flow.
Po dołączeniu zestawu istniejących danych masz wiele opcji:
- Jeśli chcesz, aby zapytanie dołączało nowe dane, gdy pojawią się w katalogu backfill, pozostaw zapytanie bez zmian.
- Jeśli chcesz, aby było to jednorazowe wypełnianie i nigdy nie uruchamiać ponownie, usuń zapytanie po uruchomieniu potoku raz.
- Jeśli chcesz, aby zapytanie było uruchamiane raz i ponownie tylko w przypadkach, gdy dane są w pełni odświeżane, ustaw parametry
oncenaTruew przepływie dołączania. W programie SQL użyj poleceniaINSERT INTO ONCE.
W poniższych przykładach uruchomiono zapytanie, aby dołączyć dane historyczne do tabeli przesyłania strumieniowego:
Python
from pyspark import pipelines as dp
@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO ONCE
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Aby uzyskać bardziej szczegółowy przykład, zobacz Backfilling historical data with pipelines (Wypełnianie danych historycznych potokami).
Przykład: Użyj przetwarzania przepływu dodawania zamiast UNION
Zamiast używać zapytania z klauzulą UNION, możesz użyć zapytań łączących w przepływie danych, aby połączyć wiele źródeł i zapisać do jednej tabeli strumieniowej. Użytkowanie zapytań dodawania zamiast UNION umożliwia dołączanie danych do tabeli strumieniowej z wielu źródeł, bez potrzeby uruchamiania pełnego odświeżenia.
Poniższy przykład języka Python zawiera zapytanie, które łączy wiele źródeł danych z klauzulą UNION :
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)
raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)
return raw_orders_us.union(raw_orders_eu)
W poniższych przykładach zapytanie UNION jest zastępowane zapytaniami przepływu dołączania.
Python
dp.create_streaming_table("raw_orders")
@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);