Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Voorbeeld: Schrijven naar een streamingtabel vanuit meerdere Kafka topics
De volgende voorbeelden maken een streamingtabel met de naam kafka_target en schrijven naar die streamingtabel vanaf twee Kafka-onderwerpen.
Python
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Zie read_kafka() in de sql-taalverwijzing voor meer informatie over de tabelwaardefunctie die wordt gebruikt in de SQL-query's.
In Python kunt u programmatisch meerdere stromen maken die gericht zijn op één tabel. In het volgende voorbeeld ziet u dit patroon voor een lijst met Kafka-onderwerpen.
Opmerking
Dit patroon heeft dezelfde vereisten als het gebruik van een for lus om tabellen te maken. U moet expliciet een Python-waarde doorgeven aan de functie die de stroom definieert. Zie Tabellen maken in een for lus.
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Voorbeeld: Een eenmalige gegevensbackfill uitvoeren
Als u een query wilt uitvoeren om gegevens toe te voegen aan een bestaande streamingtabel, gebruikt u append_flow.
Nadat u een set bestaande gegevens hebt toegevoegd, hebt u meerdere opties:
- Als u wilt dat de query nieuwe gegevens toevoegt als deze binnenkomt in de backfillmap, laat de query staan.
- Als u wilt dat dit een eenmalige backfill is en nooit meer wordt uitgevoerd, verwijdert u de query nadat u de pijplijn eenmaal hebt uitgevoerd.
- Als u wilt dat de query eenmaal wordt uitgevoerd en alleen opnieuw wordt uitgevoerd in gevallen waarin de gegevens volledig worden vernieuwd, stelt u de
onceparameterTruein op de toevoegstroom. In SQL gebruikt uINSERT INTO ONCE.
In de volgende voorbeelden wordt een query uitgevoerd om historische gegevens toe te voegen aan een streamingtabel:
Python
from pyspark import pipelines as dp
@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO ONCE
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Zie Backfilling historische gegevens met pijplijnen voor een uitgebreider voorbeeld.
Voorbeeld: Gebruik append-stroomverwerking in plaats van UNION
In plaats van een query met een UNION component te gebruiken, kunt u toevoegstroomquery's gebruiken om meerdere bronnen te combineren en naar één streamingtabel te schrijven. Het gebruik van toevoegstroomquery's in plaats van UNION stelt u in staat om vanuit meerdere bronnen aan een streamingtabel toe te voegen zonder dat u een volledige vernieuwing hoeft uit te voeren.
Het volgende Python-voorbeeld bevat een query die meerdere gegevensbronnen combineert met een UNION component:
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)
raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)
return raw_orders_us.union(raw_orders_eu)
De volgende voorbeelden vervangen de UNION query door toevoegstroomqueries.
Python
dp.create_streaming_table("raw_orders")
@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);