Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Beispiel: Schreiben in eine Streamingtabelle aus mehreren Kafka-Topics
Im folgenden Beispiel wird eine Streamingtabelle mit dem Namen kafka_target erstellt, und es wird aus zwei Kafka-Topics in diese Streamingtabelle geschrieben.
Python
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Weitere Informationen zur read_kafka() in den SQL-Abfragen verwendeten Tabellenwertfunktion finden Sie unter read_kafka in der SQL-Sprachreferenz.
In Python können Sie programmgesteuert mehrere Flüsse erstellen, die auf eine einzelne Tabelle abzielen. Das folgende Beispiel zeigt dieses Muster für eine Liste der Kafka-Themen.
Hinweis
Dieses Muster hat die gleichen Anforderungen wie die Verwendung einer for Schleife zum Erstellen von Tabellen. Sie müssen einen Python-Wert explizit an die Funktion übergeben, die den Fluss definiert. Siehe Erstellen von Tabellen in einer for Schleife.
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Beispiel: Ausführen eines einmaligen Datenrückfüllens
Wenn Sie eine Abfrage ausführen möchten, um Daten an eine vorhandene Streamingtabelle anzufügen, verwenden Sie append_flow.
Nach dem Anfügen einer Reihe vorhandener Daten haben Sie mehrere Optionen:
- Wenn die Abfrage neue Daten anhängen soll, falls diese im Backfill-Verzeichnis eingehen, lassen Sie die Abfrage unverändert.
- Wenn dies ein einmaliger Backfill sein soll und nie wieder ausgeführt werden soll, entfernen Sie die Abfrage, nachdem die Pipeline einmal ausgeführt wurde.
- Wenn Sie möchten, dass die Abfrage einmal ausgeführt wird und nur dann erneut ausgeführt wird, wenn die Daten vollständig aktualisiert werden, legen Sie den
once-Parameter aufTrueim Anfügefluss fest. Verwenden SieINSERT INTO ONCEin SQL .
In den folgenden Beispielen wird eine Abfrage ausgeführt, um verlaufsgeschichtliche Daten an eine Streamingtabelle anzufügen:
Python
from pyspark import pipelines as dp
@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO ONCE
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Ein ausführlicheres Beispiel finden Sie unter "Zurückfüllen von historischen Daten mit Pipelines".
Beispiel: Verwenden Sie die Anfügeflussverarbeitung anstelle von UNION
Anstatt eine Abfrage mit einer UNION Klausel zu verwenden, können Sie Anfügeflussabfragen verwenden, um mehrere Quellen zu kombinieren und in eine einzelne Streamingtabelle zu schreiben. Verwenden Sie Append-Flow-Abfragen anstelle von UNION, können Sie Daten aus mehreren Quellen an eine Streamingtabelle hinzufügen, ohne eine vollständige Aktualisierung durchzuführen.
Das folgende Python-Beispiel enthält eine Abfrage, die mehrere Datenquellen mit einer UNION Klausel kombiniert:
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)
raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)
return raw_orders_us.union(raw_orders_eu)
Die folgenden Beispiele ersetzen die UNION Abfrage durch Anfügeflussabfragen:
Python
dp.create_streaming_table("raw_orders")
@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);