Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Lakeflow Spark Declarative Pipelines (SDP) memperkenalkan beberapa kata kunci dan fungsi SQL baru untuk menentukan tampilan materialisasi dan tabel streaming dalam alur. Dukungan SQL untuk mengembangkan alur dibangun berdasarkan dasar-dasar Spark SQL dan menambahkan dukungan untuk fungsionalitas Streaming Terstruktur.
Pengguna yang terbiasa dengan PySpark DataFrames mungkin lebih suka mengembangkan kode alur dengan Python. Python mendukung pengujian dan operasi yang lebih luas yang menantang untuk diterapkan dengan SQL, seperti operasi metaprogram. Lihat Mengembangkan kode alur dengan Python.
Untuk referensi lengkap sintaks alur SQL, lihat Referensi bahasa Alur SQL.
Dasar-dasar SQL untuk pengembangan alur
Kode SQL yang membuat himpunan data alur menggunakan CREATE OR REFRESH sintaks untuk menentukan tampilan materialisasi dan tabel streaming terhadap hasil kueri.
Kata kunci STREAM menunjukkan apakah sumber data yang dirujuk dalam klausa SELECT harus dibaca dengan semantik streaming.
Aksi membaca dan menulis secara default dilakukan pada katalog dan skema yang ditentukan selama konfigurasi alur. Lihat Atur katalog target dan skema.
Kode sumber alur sangat berbeda dari skrip SQL: SDP mengevaluasi semua definisi himpunan data di semua file kode sumber yang dikonfigurasi dalam alur dan membangun grafik aliran data sebelum kueri apa pun dijalankan. Urutan kueri yang muncul dalam file sumber menentukan urutan evaluasi kode, tetapi bukan urutan eksekusi kueri.
Membuat tampilan materialisasi dengan SQL
Contoh kode berikut menunjukkan sintaks dasar untuk membuat tampilan materialisasi dengan SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Membuat tabel streaming dengan SQL
Contoh kode berikut menunjukkan sintaks dasar untuk membuat tabel streaming dengan SQL. Saat membaca sumber untuk tabel streaming, STREAM kata kunci menunjukkan untuk menggunakan semantik streaming untuk sumbernya. Jangan gunakan STREAM kata kunci saat membuat tampilan materialisasi:
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Nota
Gunakan kata kunci STREAM untuk menggunakan semantik streaming untuk membaca dari sumbernya. Jika pembacaan mengalami perubahan atau penghapusan pada rekaman yang ada, akan menghasilkan kesalahan. Paling aman untuk membaca dari sumber statis atau yang hanya bisa ditambahkan. Untuk memasukkan data yang memiliki komit perubahan, Anda dapat menggunakan Python dan opsi SkipChangeCommits untuk menangani kesalahan.
Memuat data dari penyimpanan objek
Alur mendukung pemuatan data dari semua format yang didukung oleh Azure Databricks. Lihat opsi format data .
Nota
Contoh-contoh ini menggunakan data yang tersedia di bawah /databricks-datasets yang secara otomatis dipasang ke ruang kerja Anda. Databricks merekomendasikan penggunaan jalur volume atau URI cloud untuk mereferensikan data yang disimpan dalam penyimpanan objek cloud. Lihat Apa itu Unity Catalog volumes?.
Databricks merekomendasikan penggunaan Auto Loader dan tabel streaming saat mengonfigurasi beban kerja penyerapan inkremental terhadap data yang disimpan di penyimpanan objek cloud. Lihat Apa itu Auto Loader?.
SQL menggunakan fungsi read_files untuk memanggil fungsionalitas Auto Loader. Anda juga harus menggunakan kata kunci STREAM untuk mengonfigurasi pembacaan streaming dengan read_files.
Berikut ini menguraikan sintaks untuk read_files dalam SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
Opsi untuk Auto Loader merupakan pasangan kunci-nilai. Untuk detail tentang format dan opsi yang didukung, lihat opsi .
Contoh berikut membuat tabel streaming dari file JSON menggunakan Auto Loader:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Fungsi read_files juga mendukung semantik batch untuk membuat tampilan materialisasi. Contoh berikut menggunakan semantik batch untuk membaca direktori JSON dan membuat tampilan materialisasi:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Memvalidasi data sesuai dengan ekspetasi
Anda dapat menggunakan ekspektasi untuk mengatur dan menerapkan batasan kualitas data. Lihat Mengelola kualitas data dengan ekspektasi alur kerja.
Kode berikut mendefinisikan ekspektasi bernama valid_data yang menghilangkan rekaman yang null selama penyerapan data:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Kueri tampilan materialisasi dan tabel streaming yang ditentukan dalam alur Anda
Contoh berikut mendefinisikan empat himpunan data:
- Tabel streaming bernama
ordersyang memuat data JSON. - Tampilan materialisasi bernama
customersyang memuat data CSV. - Tampilan materialisasi bernama
customer_ordersyang menggabungkan rekaman dari himpunan dataordersdancustomers, mengonversi tanda waktu pesanan menjadi tanggal, dan memilih bidangcustomer_id,order_number,state, danorder_date. - Tampilan materialisasi bernama
daily_orders_by_stateyang menggabungkan jumlah pesanan harian untuk setiap negara bagian.
Nota
Saat mengkueri tampilan atau tabel di alur, Anda dapat menentukan katalog dan skema secara langsung, atau Anda bisa menggunakan default yang dikonfigurasi di alur Anda. Dalam contoh ini, tabel orders, customers, dan customer_orders ditulis dan dibaca dari katalog default dan skema yang dikonfigurasi untuk alur Anda.
Mode penerbitan lama menggunakan skema LIVE untuk melakukan kueri tampilan materialisasi lain dan tabel streaming yang ditentukan dalam alur Anda. Dalam pipeline baru, sintaks skema LIVE diabaikan tanpa disadari. Lihat skema LIVE (versi lama).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
Menentukan tabel privat
Anda dapat menggunakan PRIVATE klausa saat membuat tampilan materialisasi atau tabel streaming. Saat Anda membuat tabel privat, Anda membuat tabel, tetapi jangan membuat metadata untuk tabel. Klausa PRIVATE menginstruksikan SDP untuk membuat tabel yang tersedia untuk alur tetapi tidak boleh diakses di luar alur. Untuk mengurangi waktu pemrosesan, tabel privat berlaku selama masa pakai pipeline yang membuatnya, dan bukan hanya satu pembaruan.
Tabel privat dapat memiliki nama yang sama dengan tabel dalam katalog. Jika Anda menentukan nama yang tidak memenuhi syarat untuk tabel dalam alur, jika ada tabel privat dan tabel katalog dengan nama tersebut, tabel privat akan digunakan.
Tabel privat sebelumnya disebut sebagai tabel sementara.
Menghapus rekaman secara permanen dari tampilan terwujud atau tabel streaming
Untuk menghapus rekaman secara permanen dari tabel streaming dengan vektor penghapusan diaktifkan, seperti untuk kepatuhan GDPR, operasi tambahan harus dilakukan pada tabel Delta dasar dari objek. Untuk memastikan penghapusan rekaman dari tabel streaming, lihat Menghapus rekaman secara permanen dari tabel streaming.
Tampilan materialisasi selalu mencerminkan data dalam tabel yang mendasar ketika diperbarui. Untuk menghapus data dalam tampilan Materialisasi, Anda harus menghapus data dari sumber dan me-refresh tampilan materialisasi.
Membuat parameter nilai yang digunakan saat mendeklarasikan tabel atau tampilan dengan SQL
Gunakan SET untuk menentukan nilai konfigurasi dalam kueri yang mendeklarasikan tabel atau tampilan, termasuk konfigurasi Spark. Tabel atau tampilan apa pun yang Anda tentukan dalam file sumber setelah SET pernyataan memiliki akses ke nilai yang ditentukan. Konfigurasi Spark apa pun yang ditentukan menggunakan pernyataan SET digunakan saat menjalankan kueri Spark untuk tabel atau tampilan apa pun mengikuti pernyataan SET. Untuk membaca nilai konfigurasi dalam kueri, gunakan sintaks interpolasi string ${}. Contoh berikut menetapkan nilai konfigurasi Spark yang diberi nama startDate dan menggunakan nilai tersebut dalam kueri:
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Untuk menentukan beberapa nilai konfigurasi, gunakan pernyataan terpisah SET untuk setiap nilai.
Keterbatasan
klausul PIVOT tidak didukung. Operasi pivot di Spark memerlukan pemrosesan awal data input untuk menghitung skema output. Kapabilitas ini tidak didukung dalam pipeline.
Nota
Sintaks CREATE OR REFRESH LIVE TABLE untuk membuat tampilan materialisasi tidak digunakan lagi. Sebagai gantinya, gunakan CREATE OR REFRESH MATERIALIZED VIEW.