Memuat dan memproses data secara bertahap dengan alur Tabel Langsung Delta
Artikel ini menjelaskan alur apa dan bagaimana Anda dapat menggunakan alur di alur Tabel Langsung Delta untuk memproses data secara bertahap dari sumber ke tabel streaming target. Di Tabel Langsung Delta, alur didefinisikan dengan dua cara:
- Alur ditentukan secara otomatis saat Anda membuat kueri yang memperbarui tabel streaming.
- Tabel Langsung Delta juga menyediakan fungsionalitas untuk secara eksplisit menentukan alur untuk pemrosesan yang lebih kompleks seperti menambahkan ke tabel streaming dari beberapa sumber streaming.
Artikel ini membahas alur implisit yang dibuat saat Anda menentukan kueri untuk memperbarui tabel streaming, lalu menyediakan detail tentang sintaks untuk menentukan alur yang lebih kompleks.
Apa itu alur?
Di Tabel Langsung Delta, alur adalah kueri streaming yang memproses data sumber secara bertahap untuk memperbarui tabel streaming target. Sebagian besar himpunan data Tabel Langsung Delta yang Anda buat dalam alur menentukan alur sebagai bagian dari kueri dan tidak memerlukan pendefinisian alur secara eksplisit. Misalnya, Anda membuat tabel streaming di Tabel Langsung Delta dalam satu perintah DDL alih-alih menggunakan tabel terpisah dan pernyataan alur untuk membuat tabel streaming:
Catatan
Contoh ini CREATE FLOW
disediakan hanya untuk tujuan ilustrasi dan menyertakan kata kunci yang tidak valid sintaks Tabel Langsung Delta.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Selain alur default yang ditentukan oleh kueri, antarmuka Delta Live Tables Python dan SQL menyediakan fungsionalitas alur penambahan. Alur tambahan mendukung pemrosesan yang memerlukan pembacaan data dari beberapa sumber streaming untuk memperbarui satu tabel streaming. Misalnya, Anda dapat menggunakan fungsionalitas alur penambahan saat Anda memiliki tabel dan alur streaming yang ada dan ingin menambahkan sumber streaming baru yang menulis ke tabel streaming yang ada ini.
Menggunakan tambahkan alur untuk menulis ke tabel streaming dari beberapa aliran sumber
Catatan
Untuk menggunakan pemrosesan alur tambahan, alur Anda harus dikonfigurasi untuk menggunakan saluran pratinjau.
@append_flow
Gunakan dekorator di antarmuka Python atau CREATE FLOW
klausa di antarmuka SQL untuk menulis ke tabel streaming dari beberapa sumber streaming. Gunakan alur tambahan untuk memproses tugas seperti berikut ini:
- Tambahkan sumber streaming yang menambahkan data ke tabel streaming yang ada tanpa memerlukan refresh penuh. Misalnya, Anda mungkin memiliki tabel yang menggabungkan data regional dari setiap wilayah tempat Anda beroperasi. Saat wilayah baru diluncurkan, Anda dapat menambahkan data wilayah baru ke tabel tanpa melakukan refresh penuh. Lihat Contoh: Menulis ke tabel streaming dari beberapa topik Kafka.
- Perbarui tabel streaming dengan menambahkan data historis yang hilang (backfilling). Misalnya, Anda memiliki tabel streaming yang sudah ada yang ditulis oleh topik Apache Kafka. Anda juga memiliki data historis yang disimpan dalam tabel yang perlu Anda sisipkan tepat sekali ke dalam tabel streaming, dan Anda tidak dapat mengalirkan data karena pemrosesan Anda mencakup melakukan agregasi kompleks sebelum menyisipkan data. Lihat Contoh: Menjalankan isi ulang data satu kali.
- Gabungkan data dari beberapa sumber dan tulis ke satu tabel streaming alih-alih menggunakan
UNION
klausa dalam kueri. Menggunakan pemrosesan alur tambahan alih-alihUNION
memungkinkan Anda memperbarui tabel target secara bertahap tanpa menjalankan pembaruan refresh penuh. Lihat Contoh: Menggunakan pemrosesan alur tambahan alih-alih UNION.
Target untuk output rekaman oleh pemrosesan alur tambahan dapat berupa tabel yang sudah ada atau tabel baru. Untuk kueri Python, gunakan fungsi create_streaming_table() untuk membuat tabel target.
Penting
- Jika Anda perlu menentukan batasan kualitas data dengan harapan, tentukan harapan pada tabel target sebagai bagian
create_streaming_table()
dari fungsi atau pada definisi tabel yang ada. Anda tidak dapat menentukan ekspektasi dalam@append_flow
definisi. - Alur diidentifikasi dengan nama alur, dan nama ini digunakan untuk mengidentifikasi titik pemeriksaan streaming. Penggunaan nama alur untuk mengidentifikasi titik pemeriksaan berarti sebagai berikut:
- Jika alur yang ada dalam alur diganti namanya, titik pemeriksaan tidak dilakukan, dan alur yang diganti namanya secara efektif merupakan alur yang sama sekali baru.
- Anda tidak dapat menggunakan kembali nama alur dalam alur, karena titik pemeriksaan yang ada tidak akan cocok dengan definisi alur baru.
Berikut ini adalah sintaks untuk @append_flow
:
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
target_table BY NAME
SELECT * FROM
source;
Contoh: Menulis ke tabel streaming dari beberapa topik Kafka
Contoh berikut membuat tabel streaming bernama kafka_target
dan menulis ke tabel streaming tersebut dari dua topik Kafka:
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Untuk mempelajari selengkapnya tentang fungsi bernilai tabel yang read_kafka()
digunakan dalam kueri SQL, lihat read_kafka dalam referensi bahasa SQL.
Contoh: Menjalankan isi ulang data satu kali
Contoh berikut menjalankan kueri untuk menambahkan data historis ke tabel streaming:
Catatan
Untuk memastikan isi ulang satu kali yang benar saat kueri isi ulang adalah bagian dari alur yang berjalan secara terjadwal atau terus menerus, hapus kueri setelah menjalankan alur sekali. Untuk menambahkan data baru jika tiba di direktori isi ulang, biarkan kueri di tempat.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
cloud_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
cloud_files(
"path/to/backfill/data/dir",
"csv"
);
Contoh: Gunakan pemrosesan alur tambahan alih-alih UNION
Alih-alih menggunakan kueri dengan UNION
klausul, Anda bisa menggunakan kueri alur tambahan untuk menggabungkan beberapa sumber dan menulis ke satu tabel streaming. Menggunakan kueri alur tambahan alih-alih UNION
memungkinkan Anda menambahkan ke tabel streaming dari beberapa sumber tanpa menjalankan refresh penuh.
Contoh Python berikut menyertakan kueri yang menggabungkan beberapa sumber data dengan UNION
klausa:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
Contoh berikut mengganti UNION
kueri dengan kueri alur tambahan:
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
cloud_files(
"/path/to/orders/us",
"csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
cloud_files(
"/path/to/orders/eu",
"csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
cloud_files(
"/path/to/orders/apac",
"csv"
);
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk