Bagikan melalui


CREATE STREAMING TABLE

Berlaku untuk:centang ditandai ya Databricks SQL

Membuat tabel streaming , tabel Delta dengan dukungan tambahan untuk streaming atau pemrosesan data tambahan.

Tabel streaming hanya didukung di Lakeflow Spark Declarative Pipelines dan di Databricks SQL dengan Unity Catalog. Menjalankan perintah ini pada komputasi Databricks Runtime yang didukung hanya mengurai sintaks. Lihat Mengembangkan kode Alur Deklaratif Lakeflow Spark dengan SQL.

Sintaks

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    COMMENT table_comment |
    DEFAULT COLLATION UTF8_BINARY |
    TBLPROPERTIES clause |
    schedule |
    WITH { ROW FILTER clause } } [...]

schedule
  { SCHEDULE [ REFRESH ] schedule_clause |
    TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ]}

Parameter

  • REFRESH

    Jika ditentukan, refresh tabel dengan data terbaru yang tersedia dari sumber yang ditentukan dalam kueri. Hanya data baru yang tiba sebelum kueri dimulai diproses. Data baru yang ditambahkan ke sumber selama eksekusi perintah diabaikan hingga refresh berikutnya. Operasi refresh dari CREATE OR REFRESH sepenuhnya deklaratif. Jika perintah refresh tidak menentukan semua metadata dari pernyataan pembuatan tabel asli, metadata yang tidak ditentukan akan dihapus.

  • JIKA TIDAK ADA

    Membuat tabel streaming jika tabel itu tidak ada. Jika tabel dengan nama ini sudah ada, pernyataan CREATE STREAMING TABLE diabaikan.

    Anda dapat menentukan paling banyak salah satu IF NOT EXISTS atau OR REFRESH.

  • table_name

    Nama tabel yang akan dibuat. Nama tidak boleh menyertakan spesifikasi temporal atau spesifikasi opsi. Jika nama tidak memenuhi syarat, tabel dibuat dalam skema saat ini.

  • spesifikasi_tabel

    Klausa opsional ini menentukan daftar kolom, jenis, properti, deskripsi, dan batasan kolomnya.

    Jika Anda tidak menentukan kolom dalam skema tabel, Anda harus menentukan AS query.

    • column_identifier

      Nama unik untuk kolom.

      • column_type

        Menentukan tipe data dari kolom.

      • BUKAN NULL

        Jika ditentukan, kolom tidak menerima nilai NULL.

      • KOMENTAR column_comment

        String harfiah untuk menjelaskan kolom.

      • column_constraint

        Penting

        Fitur ini ada di Pratinjau Publik.

        Menambahkan kunci primer atau batasan kunci asing ke kolom dalam tabel streaming. Batasan tidak didukung untuk tabel di katalog hive_metastore.

      • Klausa MASK

        Menambahkan fungsi masker kolom untuk menganonimkan data sensitif. Semua kueri berikutnya dari kolom tersebut menerima hasil evaluasi fungsi tersebut di atas kolom sebagai pengganti nilai asli kolom. Ini dapat berguna untuk tujuan kontrol akses yang lebih mendetail di mana fungsi dapat memeriksa identitas atau keanggotaan grup pengguna yang memanggil untuk memutuskan apakah akan menyunting nilai.

      • CONSTRAINT nama_ekspetasi EXPECT (ekspresi_ekspetasi) [ PADA PELANGGARAN { GAGAL UPDATE | HAPUS BARIS } ]

        Menambahkan ekspektasi kualitas data ke tabel. Harapan kualitas data ini dapat dilacak dari waktu ke waktu dan diakses melalui log peristiwa tabel streaming. Ekspektasi FAIL UPDATE menyebabkan pemrosesan gagal saat membuat tabel serta me-refresh tabel. DROP ROW Ekspektasi menyebabkan seluruh baris dihapus jika ekspektasi tidak terpenuhi.

        expectation_expr dapat terdiri dari literal, pengidentifikasi kolom dalam tabel, dan fungsi atau operator SQL bawaan deterministik kecuali:

        Juga expr tidak boleh berisi subkueri apa pun.

      • pembatasan_tabel

        Penting

        Fitur ini ada di Pratinjau Publik.

        Menambahkan kunci primer informasi atau batasan kunci asing informasional ke tabel streaming. Batasan kunci tidak didukung untuk tabel di katalog hive_metastore.

  • table_clauses

    Secara opsional tentukan partisi, komentar, properti yang ditentukan pengguna, dan jadwal refresh untuk tabel baru. Setiap sub klausul hanya dapat ditentukan satu kali.

    • DIPARTISI DENGAN

      Daftar kolom tabel opsional untuk mempartisi tabel.

      Catatan

      Pengklusteran cairan memberikan solusi yang fleksibel dan dioptimalkan untuk pengklusteran. Pertimbangkan untuk menggunakan CLUSTER BY alih-alih PARTITIONED BY untuk tabel streaming.

    • CLUSTER BY

      Klausa opsional untuk kluster menurut subset kolom. Gunakan pengklusteran cairan otomatis dengan CLUSTER BY AUTO, dan Databricks dengan cerdas memilih kunci pengklusteran untuk mengoptimalkan performa kueri. Lihat Menggunakan pengklusteran cair untuk tabel.

      Pengklusteran cairan tidak dapat dikombinasikan dengan PARTITIONED BY.

    • KOMENTAR table_comment

      STRING harfiah untuk menggambarkan tabel.

    • UTF8_BINARY KOLABASI DEFAULT

      Berlaku untuk:check ditandai ya pemeriksaan Databricks SQL ditandai ya Databricks Runtime 17.1 ke atas

      Memaksa pengurutan default tabel streaming menjadi UTF8_BINARY. Klausa ini wajib jika skema tempat tabel dibuat memiliki kolaborasi default selain UTF8_BINARY. Pengurutan default dari tabel streaming digunakan sebagai pengurutan default di query dan untuk jenis kolom.

    • TBLPROPERTIES

      Secara opsional, atur satu atau beberapa properti yang ditentukan pengguna.

      Gunakan pengaturan ini untuk menentukan saluran runtime Alur Deklaratif Lakeflow Spark yang digunakan untuk menjalankan pernyataan berikut. Atur nilai properti pipelines.channel ke "PREVIEW" atau "CURRENT". Nilai defaultnya adalah "CURRENT". Untuk informasi selengkapnya tentang saluran Alur Deklaratif Lakeflow Spark, lihat saluran waktu proses Alur Deklaratif Lakeflow Spark.

    • jadwal

      Jadwal dapat berupa SCHEDULE pernyataan atau TRIGGER pernyataan.

      • JADWAL [ REFRESH ] pasal_jadwal

        • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

          Untuk menjadwalkan refresh yang terjadi secara berkala, gunakan sintaks EVERY. Jika sintaks EVERY ditentukan, tabel streaming atau tampilan materialisasi di-refresh secara berkala pada interval yang ditentukan berdasarkan nilai yang disediakan, seperti HOUR, HOURS, DAY, DAYS, WEEK, atau WEEKS. Tabel berikut ini mencantumkan nilai bilangan bulat yang diterima untuk number.

          Unit waktu Nilai bilangan bulat
          HOUR or HOURS 1 <= H <= 72
          DAY or DAYS 1 <= D <= 31
          WEEK or WEEKS 1 <= W <= 8

          Catatan

          Bentuk tunggal dan jamak dari unit waktu yang disertakan setara secara semantik.

        • CRON cron_string [ AT TIME ZONE timezone_id ]

          Untuk menjadwalkan refresh menggunakan nilai quartz cron. time_zone_values yang valid diterima. AT TIME ZONE LOCAL tidak didukung.

          Jika AT TIME ZONE tidak ada, zona waktu sesi digunakan. Jika AT TIME ZONE tidak ada dan zona waktu sesi tidak diatur, kesalahan akan muncul. SCHEDULE secara semantik setara SCHEDULE REFRESHdengan .

        Jadwal dapat disediakan sebagai bagian CREATE dari perintah. Gunakan perintah ALTER STREAMING TABLE atau jalankan CREATE OR REFRESH dengan klausa SCHEDULE untuk mengubah jadwal tabel streaming setelah pembuatan.

      • PEMICU PADA UPDATE [ PALING BANYAK SETIAP trigger_interval ]

        Penting

        Fitur ini TRIGGER ON UPDATE ada di Beta.

        Secara opsional atur tabel untuk di-refresh saat sumber data upstream diperbarui, paling banyak sekali setiap menit. Tetapkan nilai untuk AT MOST EVERY memerlukan setidaknya waktu minimum antara refresh.

        Sumber data upstream harus berupa tabel Delta eksternal atau terkelola (termasuk tampilan materialisasi atau tabel streaming), atau tampilan terkelola yang dependensinya terbatas pada jenis tabel yang didukung.

        Mengaktifkan peristiwa file dapat membuat pemicu lebih berkinerja, dan meningkatkan beberapa batasan pada pembaruan pemicu.

        trigger_interval adalah pernyataan INTERVAL yang setidaknya 1 menit.

        TRIGGER ON UPDATE memiliki batasan berikut

        • Tidak lebih dari 10 sumber data upstream per tabel streaming saat menggunakan TRIGGER ON UPDATE.
        • Maksimum 1000 tabel streaming atau tampilan materialisasi dapat ditentukan dengan TRIGGER ON UPDATE.
        • Klausa AT MOST EVERY default ke 1 menit, dan tidak boleh kurang dari 1 menit.
  • DENGAN klausa

    Menambahkan fungsi filter baris ke tabel. Semua kueri berikutnya dari tabel tersebut menerima subset baris di mana fungsi dievaluasi sebagai boolean TRUE. Hal ini dapat berguna untuk tujuan kontrol akses menurun di mana fungsi dapat memeriksa identitas atau keanggotaan grup pengguna yang memanggil untuk memutuskan apakah akan memfilter baris tertentu.

  • AS kueri

    Klausa ini mengisi tabel menggunakan data dari query. Kueri ini harus berupa kueri streaming . Ini dapat dicapai dengan menambahkan STREAM kata kunci ke hubungan apa pun yang ingin Anda proses secara bertahap. Saat Anda menentukan query dan table_specification bersama-sama, skema tabel yang ditentukan dalam table_specification harus berisi semua kolom yang dikembalikan oleh query, jika tidak, Anda akan mendapatkan kesalahan. Kolom apa pun yang ditentukan dalam table_specification tetapi tidak dikembalikan oleh query mengembalikan nilai null saat dikueri.

Perbedaan antara tabel streaming dan tabel lainnya

Tabel streaming adalah tabel stateful, yang dirancang untuk menangani setiap baris hanya sekali saat Anda memproses himpunan data yang berkembang. Karena kebanyakan himpunan data tumbuh terus menerus dari waktu ke waktu, tabel streaming cocok untuk sebagian besar beban kerja penyerapan. Tabel streaming optimal untuk alur yang memerlukan kesegaran data dan latensi rendah. Tabel streaming juga dapat berguna untuk transformasi skala besar-besaran, karena hasilnya dapat dihitung secara bertahap saat data baru tiba, menjaga hasil tetap terbaru tanpa perlu sepenuhnya mengolah ulang semua data sumber dengan setiap pembaruan. Tabel streaming dirancang untuk sumber data yang hanya bisa ditambahkan.

Tabel streaming menerima perintah tambahan seperti REFRESH, yang memproses data terbaru yang tersedia di sumber yang disediakan dalam kueri. Perubahan pada kueri yang disediakan hanya tercermin pada data baru dengan memanggil REFRESH, bukan data yang diproses sebelumnya. Untuk juga menerapkan perubahan pada data yang ada, Anda perlu menjalankan REFRESH TABLE <table_name> FULL untuk melakukan FULL REFRESH. Refresh penuh memproses ulang semua data yang tersedia di sumber dengan definisi terbaru. Tidak disarankan untuk memanggil refresh penuh pada sumber yang tidak menyimpan seluruh riwayat data atau memiliki periode retensi singkat, seperti Kafka, karena refresh penuh memotong data yang ada. Anda mungkin tidak dapat memulihkan data lama jika data tidak lagi tersedia di sumbernya.

Filter baris dan masker kolom

Filter baris memungkinkan Anda menentukan fungsi yang diterapkan sebagai filter setiap kali pemindaian tabel mengambil baris. Filter ini memastikan bahwa kueri berikutnya hanya mengembalikan baris di mana hasil evaluasi predikat filternya adalah benar.

Masker kolom memungkinkan Anda menutupi nilai kolom setiap kali pemindaian tabel mengambil baris. Semua kueri di masa mendatang yang melibatkan kolom tersebut akan menerima hasil evaluasi fungsi di atas kolom, menggantikan nilai asli kolom.

Untuk informasi selengkapnya tentang cara menggunakan filter baris dan masker kolom, lihat Filter baris dan masker kolom.

Mengelola Filter Baris dan Masker Kolom

Filter baris dan masker kolom pada tabel streaming harus ditambahkan, diperbarui, atau dihilangkan melalui pernyataan CREATE OR REFRESH.

Perilaku

  • Refresh sebagai Pendefinisi: Ketika pernyataan CREATE OR REFRESH atau REFRESH menyegarkan tabel streaming, fungsi filter baris berjalan dengan hak pendefinisi (sebagai pemilik tabel). Ini berarti refresh tabel menggunakan konteks keamanan pengguna yang membuat tabel streaming.
  • Kueri: Meskipun sebagian besar filter berjalan dengan hak pendefinisi, fungsi yang memeriksa konteks pengguna (seperti CURRENT_USER dan IS_MEMBER) adalah pengecualian. Fungsi-fungsi ini dijalankan sebagai pemanggil. Pendekatan ini memberlakukan keamanan data dan kontrol akses khusus pengguna berdasarkan konteks pengguna saat ini.

Observabilitas

Gunakan DESCRIBE EXTENDED, INFORMATION_SCHEMA, atau Catalog Explorer untuk memeriksa filter baris dan masker kolom yang ada yang berlaku untuk tabel streaming tertentu. Fungsionalitas ini memungkinkan pengguna untuk mengaudit dan meninjau langkah-langkah akses dan perlindungan data pada tabel streaming.

Batasan

  • Hanya pemilik tabel yang dapat memperbarui tabel streaming untuk mendapatkan data terbaru.
  • ALTER TABLE perintah tidak diizinkan pada tabel streaming. Definisi dan properti tabel harus diubah melalui pernyataan CREATE OR REFRESH atau ALTER STREAMING TABLE.
  • Mengembangkan skema tabel melalui perintah DML seperti INSERT INTO, dan MERGE tidak didukung.
  • Perintah berikut ini tidak didukung pada tabel streaming:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Fitur Delta Sharing tidak didukung.
  • Mengganti nama tabel atau mengubah pemilik tidak didukung.
  • Batasan tabel seperti PRIMARY KEY dan FOREIGN KEY tidak didukung untuk tabel streaming dalam hive_metastore katalog.
  • Kolom yang dihasilkan, kolom identitas, dan kolom default tidak didukung.

Contoh

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
  TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
  AS SELECT *
  FROM STREAM source_stream_data;

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM STREAM sales;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')