Not
Åtkomst till denna sida kräver auktorisation. Du kan prova att logga in eller byta katalog.
Åtkomst till denna sida kräver auktorisation. Du kan prova att byta katalog.
Exempel: Skriva till en strömmande tabell från flera Kafka-ämnen
I följande exempel skapas en strömmande tabell med namnet kafka_target och skriver till den strömmande tabellen från två Kafka-ämnen.
python
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Mer information om den read_kafka() tabellvärdesfunktion som används i SQL-frågorna finns i read_kafka i SQL-språkreferensen.
I Python kan du programmatiskt skapa flera flöden som riktar sig mot en enda tabell. I följande exempel visas det här mönstret för en lista över Kafka-ämnen.
Anmärkning
Det här mönstret har samma krav som att använda en for loop för att skapa tabeller. Du måste uttryckligen skicka ett Python-värde till funktionen som definierar flödet. Se Skapa tabeller i en for loop.
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Exempel: Kör en engångsdatapåfyllning
Om du vill köra en fråga för att lägga till data i en befintlig strömmande tabell använder du append_flow.
När du har lagt till en uppsättning befintliga data har du flera alternativ:
- Om du vill att frågan ska lägga till ny data om den kommer till katalogen för återfyllnad, förblir frågan aktiv.
- Om du vill att detta ska vara en engångsefterfyllnad och aldrig köras igen tar du bort frågan när du har kört pipelinen en gång.
- Om du vill att frågan ska köras en gång och bara köras igen i de fall där data uppdateras fullständigt anger du parametern
oncetillTruei tilläggsflödet. I SQL använder duINSERT INTO ONCE.
I följande exempel körs en fråga för att lägga till historiska data i en strömmande tabell:
python
from pyspark import pipelines as dp
@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO ONCE
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
För ett mer detaljerat exempel, se Att fylla i historiska data med hjälp av pipelines.
Exempel: Använd tilläggsflödesbearbetning i stället för UNION
I stället för att använda en fråga med en UNION sats kan du använda tilläggsflödesfrågor för att kombinera flera källor och skriva till en enda strömmande tabell. Använda tilläggsflödesfrågor i stället för UNION tillåter att lägga till poster i en strömmande tabell från flera källor utan behov av att köra en fullständig uppdatering.
Följande Python-exempel innehåller en fråga som kombinerar flera datakällor med en UNION sats:
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)
raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)
return raw_orders_us.union(raw_orders_eu)
Följande exempel ersätter UNION frågan med tilläggsflödesfrågor:
python
dp.create_streaming_table("raw_orders")
@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);