Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Contoh: Menulis ke tabel streaming dari beberapa topik Kafka
Contoh berikut membuat tabel streaming bernama kafka_target dan menulis ke tabel streaming tersebut dari dua topik Kafka:
Phyton
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Untuk mempelajari selengkapnya tentang fungsi bernilai tabel yang read_kafka() digunakan dalam kueri SQL, lihat read_kafka dalam referensi bahasa SQL.
Di Python, Anda dapat secara terprogram membuat beberapa alur yang menargetkan satu tabel. Contoh berikut menunjukkan pola ini untuk daftar topik Kafka.
Nota
Pola ini memiliki persyaratan yang sama seperti menggunakan perulangan for untuk membuat tabel. Anda harus secara eksplisit meneruskan nilai Python ke fungsi yang menentukan alur. Lihat Buat tabel dalam loopfor.
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Contoh: Menjalankan isi ulang data satu kali
Jika Anda ingin menjalankan kueri untuk menambahkan data ke tabel streaming yang sudah ada, gunakan append_flow.
Setelah menambahkan sekumpulan data yang sudah ada, Anda memiliki beberapa opsi:
- Jika Anda ingin kueri menambahkan data baru jika tiba di direktori isi ulang, biarkan kueri di tempatnya.
- Jika Anda ingin ini menjadi isi ulang satu kali, dan jangan pernah berjalan lagi, hapus kueri setelah menjalankan alur sekali.
- Jika Anda ingin kueri berjalan satu kali, dan hanya berjalan lagi dalam kasus di mana data mengalami penyegaran penuh, atur
onceparameter keTruepada alur penyambungan. Di SQL, gunakanINSERT INTO ONCE.
Contoh berikut menjalankan kueri untuk menambahkan data historis ke tabel streaming:
Phyton
from pyspark import pipelines as dp
@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO ONCE
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Untuk contoh yang lebih mendalam, lihat Mengisi ulang data historis dengan alur.
Contoh: Gunakan pemrosesan alur tambahan alih-alih UNION
Daripada menggunakan kueri dengan UNION klausul, Anda dapat menggunakan kueri alur penambahan untuk menggabungkan beberapa sumber dan memasukkannya ke dalam satu tabel streaming. Menggunakan kueri alur tambahan alih-alih UNION memungkinkan Anda menambahkan ke tabel streaming dari beberapa sumber tanpa menjalankan refresh penuh.
Contoh Python berikut menyertakan kueri yang menggabungkan beberapa sumber data dengan UNION klausa:
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)
raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)
return raw_orders_us.union(raw_orders_eu)
Contoh berikut mengganti kueri pencarian UNION dengan kueri pengaliran tambahan.
Phyton
dp.create_streaming_table("raw_orders")
@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);