MEMBUAT TABEL STREAMING
Berlaku untuk: Databricks SQL Databricks Runtime 13.3 LTS ke atas
Penting
Fitur ini ada di Pratinjau Publik. Untuk mendaftar akses, isi formulir ini.
Membuat tabel streaming, tabel Delta dengan dukungan tambahan untuk pemrosesan data streaming atau inkremental.
Tabel streaming hanya didukung di Tabel Langsung Delta dan di Databricks SQL dengan Unity Catalog. Menjalankan perintah ini pada komputasi Databricks Runtime yang didukung hanya mengurai sintaks. Lihat Menerapkan alur Delta Live Tables dengan SQL.
Sintaks
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( [ column_identifier column_type [ NOT NULL ]
[ COMMENT column_comment ] [ column_constraint ]
] [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] } [...]
Parameter
REFRESH
Jika ditentukan, refresh tabel dengan data terbaru yang tersedia dari sumber yang ditentukan dalam kueri. Hanya data baru yang tiba sebelum kueri dimulai diproses. Data baru yang ditambahkan ke sumber selama eksekusi perintah diabaikan hingga refresh berikutnya.
IF NOT EXISTS
Jika ditentukan dan tabel dengan nama yang sama sudah ada, pernyataan diabaikan.
IF NOT EXISTS
tidak dapat digunakan bersama denganREFRESH
, yang berartiCREATE OR REFRESH TABLE IF NOT EXISTS
tidak diizinkan.-
Nama tabel yang akan dibuat. Nama tidak boleh menyertakan spesifikasi temporal. Jika nama tidak memenuhi syarat, tabel dibuat dalam skema saat ini.
table_specification
Klausa opsional ini menentukan daftar kolom, jenis, properti, deskripsi, dan batasan kolom.
Jika Anda tidak menentukan kolom dalam skema tabel, Anda harus menentukan
AS query
.-
Nama unik untuk kolom.
-
Menentukan tipe data kolom.
BUKAN NULL
Jika ditentukan, kolom tidak menerima
NULL
nilai.KOMENTAR column_comment
String literal untuk menggambarkan kolom.
-
Penting
Fitur ini ada di Pratinjau Publik.
Menambahkan kunci primer atau batasan kunci asing ke kolom dalam tabel streaming. Batasan tidak didukung untuk tabel-tabel dalam katalog
hive_metastore
. BATASAN expectation_name EXPECT (expectation_expr) [ PADA PELANGGARAN { FAIL UPDATE | JATUHKAN BARIS } ]
Menambahkan ekspektasi kualitas data ke tabel. Harapan kualitas data ini dapat dilacak dari waktu ke waktu dan diakses melalui log peristiwa tabel streaming.
FAIL UPDATE
Harapan menyebabkan pemrosesan gagal saat membuat tabel serta me-refresh tabel.DROP ROW
Harapan menyebabkan seluruh baris dihilangkan jika harapan tidak terpenuhi.expectation_expr
dapat terdiri atas literal, pengidentifikasi kolom dalam tabel, dan fungsi atau operator SQL bawaan deterministik kecuali:- Fungsi agregat
- Fungsi jendela analitik
- Fungsi jendela peringkat
- Fungsi generator bernilai tabel
Juga
expr
tidak boleh berisi subkueri apa pun.- Fungsi agregat
-
Penting
Fitur ini ada di Pratinjau Publik.
Menambahkan kunci primer informasi atau batasan kunci asing informasional ke tabel streaming. Batasan kunci tidak didukung untuk tabel dalam
hive_metastore
katalog.
-
-
table_clauses
Secara opsional tentukan partisi, komentar, properti yang ditentukan pengguna, dan jadwal refresh untuk tabel baru. Setiap sub klausul hanya dapat ditentukan satu kali.
-
Daftar kolom tabel opsional untuk mempartisi tabel.
KOMENTAR table_comment
Harfiah
STRING
untuk menggambarkan tabel.-
Secara opsional, atur satu atau beberapa properti yang ditentukan pengguna.
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]
Jika disediakan, menjadwalkan tabel streaming atau tampilan materialisasi untuk me-refresh datanya dengan jadwal cron kuarsa yang diberikan. Hanya time_zone_values yang diterima.
AT TIME ZONE LOCAL
tidak didukung. JikaAT TIME ZONE
tidak ada, zona waktu sesi digunakan. JikaAT TIME ZONE
tidak ada dan zona waktu sesi tidak diatur, kesalahan akan muncul.SCHEDULE
secara semantik setaraSCHEDULE REFRESH
dengan .Anda tidak dapat menggunakan
SCHEDULE
sintaks dalam definisi alur Delta Live Tables.Klausa
SCHEDULE
tidak diperbolehkan dalamCREATE OR REFRESH
perintah. Jadwal dapat disediakan sebagai bagianCREATE
dari perintah. Gunakan ALTER STREAMING TABLE untuk mengubah jadwal tabel streaming setelah pembuatan.
-
AS kueri
Klausa ini mengisi tabel menggunakan data dari
query
. Kueri ini harus berupa kueri streaming . Ini dapat dicapai dengan menambahkanSTREAM
kata kunci ke hubungan apa pun yang ingin Anda proses secara bertahap. Saat Anda menentukanquery
dantable_specification
bersama-sama, skema tabel yang ditentukan ditable_specification
harus berisi semua kolom yang dikembalikan olehquery
, jika tidak, Anda mendapatkan kesalahan. Kolom apa pun yang ditentukan dalamtable_specification
tetapi tidak dikembalikan olehquery
nilai yang dikembalikannull
saat dikueri.Klausa ini diperlukan untuk tabel streaming yang dibuat di Databricks SQL, tetapi tidak diperlukan dalam Tabel Langsung Delta. Jika klausa ini tidak disediakan dalam Tabel Langsung Delta, Anda harus mereferensikan tabel ini dalam
APPLY CHANGES
perintah di alur DLT Anda. Lihat Mengubah pengambilan data dengan SQL di Tabel Langsung Delta.
Perbedaan antara tabel streaming dan tabel lainnya
Tabel streaming adalah tabel stateful, yang dirancang untuk menangani setiap baris hanya sekali saat Anda memproses himpunan data yang berkembang. Karena sebagian besar himpunan data tumbuh terus menerus dari waktu ke waktu, tabel streaming baik untuk sebagian besar beban kerja penyerapan. Tabel streaming optimal untuk alur yang memerlukan kesegaran data dan latensi rendah. Tabel streaming juga dapat berguna untuk transformasi skala besar-besaran, karena hasilnya dapat dihitung secara bertahap saat data baru tiba, menjaga hasil tetap terbaru tanpa perlu sepenuhnya mengolah ulang semua data sumber dengan setiap pembaruan. Tabel streaming dirancang untuk sumber data yang hanya ditambahkan.
Tabel streaming menerima perintah tambahan seperti REFRESH
, yang memproses data terbaru yang tersedia di sumber yang disediakan dalam kueri. Perubahan pada kueri yang disediakan hanya tercermin pada data baru dengan memanggil , bukan data yang REFRESH
diproses sebelumnya. Untuk menerapkan perubahan pada data yang ada juga, Anda perlu menjalankan REFRESH TABLE <table_name> FULL
untuk melakukan FULL REFRESH
. Refresh penuh memproses ulang semua data yang tersedia di sumber dengan definisi terbaru. Tidak disarankan untuk memanggil refresh penuh pada sumber yang tidak menyimpan seluruh riwayat data atau memiliki periode retensi singkat, seperti Kafka, karena refresh penuh memotong data yang ada. Anda mungkin tidak dapat memulihkan data lama jika data tidak lagi tersedia di sumbernya.
Batasan
Hanya pemilik tabel yang dapat menyegarkan tabel streaming untuk mendapatkan data terbaru.
ALTER TABLE
perintah tidak diizinkan pada tabel streaming. Definisi dan properti tabel harus diubah melaluiALTER STREAMING TABLE
pernyataan.Kueri perjalanan waktu tidak didukung.
Mengembangkan skema tabel melalui perintah DML seperti
INSERT INTO
, danMERGE
tidak didukung.Perintah berikut ini tidak didukung pada tabel streaming:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
Berbagi Delta tidak didukung.
Mengganti nama tabel atau mengubah pemilik tidak didukung.
Batasan tabel seperti
PRIMARY KEY
danFOREIGN KEY
tidak didukung.Kolom yang dihasilkan, kolom identitas, dan kolom default tidak didukung.
Contoh
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');