CREATE STREAMING TABLE (saluran)

Tabel streaming adalah tabel dengan dukungan untuk pemrosesan data streaming atau inkremental. Tabel streaming didukung oleh pipa. Setiap kali tabel streaming di-refresh, data yang ditambahkan ke tabel sumber ditambahkan ke tabel streaming. Anda dapat me-refresh tabel streaming secara manual atau sesuai jadwal.

Untuk mempelajari selengkapnya tentang cara melakukan atau menjadwalkan refresh, lihat Menjalankan pembaruan alur.

Syntax

CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ {flow_clause | AS query} ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ column_constraint ] [, ...]
    [ , table_constraint ] [...] )

   column_properties
      { NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]

table_clauses
  { USING DELTA
    PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    LOCATION path |
    COMMENT view_comment |
    TBLPROPERTIES clause |
    WITH { ROW FILTER clause } } [ ... ]
   } [ ... ]

flow_clause
  FLOW { { INSERT [ONCE] BY NAME query } |
  { AUTO CDC auto_cdc_flow_spec } }

Parameter-parameternya

  • REFRESH

    Jika ditentukan, akan membuat tabel, atau memperbarui tabel yang sudah ada dan kontennya.

  • SWASTA

    Membuat tabel streaming privat.

    • Mereka tidak ditambahkan ke katalog dan hanya dapat diakses dalam alur yang menentukan
    • Mereka dapat memiliki nama yang sama dengan objek yang ada di katalog. Dalam alur, jika tabel streaming privat dan objek dalam katalog memiliki nama yang sama, referensi ke nama tersebut akan diarahkan ke tabel streaming privat.
    • Tabel streaming privat hanya dipertahankan selama masa operasional alur, tidak hanya satu pembaruan.

    Tabel streaming privat sebelumnya dibuat dengan TEMPORARY parameter .

  • table_name

    Nama tabel yang baru dibuat. Nama tabel yang sepenuhnya memenuhi syarat harus unik.

  • spesifikasi_tabel

    Klausa opsional ini menentukan daftar kolom, jenis, properti, deskripsi, dan batasan kolomnya.

  • pembatasan_tabel

    Saat menentukan skema, Anda dapat menentukan kunci primer dan asing. Batasan bersifat informasi dan tidak diberlakukan. Lihat klausa CONSTRAINT dalam referensi bahasa SQL.

    Nota

    Untuk menentukan batasan tabel, alur Anda harus berupa alur yang mendukung Katalog Unity.

  • table_clauses

    Tentukan pemartisian, komentar, dan properti yang ditentukan pengguna secara opsional untuk tabel. Setiap sub klausul hanya dapat ditentukan satu kali.

    • MENGGUNAKAN DELTA

      Menetapkan format data. Satu-satunya opsi adalah DELTA.

      Klausa ini bersifat opsional, dan secara default menjadi DELTA.

    • DISEGMENKAN OLEH

      Daftar opsional dari satu atau beberapa kolom yang akan digunakan untuk pemartisian dalam tabel. Bersifat saling eksklusif dengan CLUSTER BY.

      Pengklusteran cairan memberikan solusi yang fleksibel dan dioptimalkan untuk pengklusteran. Pertimbangkan untuk menggunakan CLUSTER BY alih-alih PARTITIONED BY untuk alur.

    • CLUSTER BY

      Aktifkan pengklusteran cair pada tabel dan tentukan kolom yang akan digunakan sebagai kunci pengklusteran. Gunakan pengklusteran cairan otomatis dengan CLUSTER BY AUTO, dan Databricks dengan cerdas memilih kunci pengklusteran untuk mengoptimalkan performa kueri. Bersifat saling eksklusif dengan PARTITIONED BY.

      Lihat Menggunakan pengklusteran cair untuk tabel.

    • LOKASI

      Lokasi penyimpanan opsional untuk data tabel. Jika tidak diatur, sistem akan menggunakan lokasi penyimpanan pipeline secara default.

    • KOMENTAR

      Literal STRING yang opsional untuk menjelaskan tabel.

    • TBLPROPERTIES

      Daftar properti tabel opsional untuk tabel.

    • DENGAN ROW FILTER

    Menambahkan fungsi filter baris ke tabel. Kueri di masa mendatang untuk tabel tersebut akan mendapatkan bagian dari baris-baris yang fungsinya bernilai BENAR. Ini berguna untuk kontrol akses terperintah, karena memungkinkan fungsi untuk memeriksa identitas dan keanggotaan grup pengguna yang memanggil untuk memutuskan apakah akan memfilter baris tertentu.

    Lihat klausa ROW FILTER.

    • ALIRAN

      Secara opsional menentukan alur sebaris dengan pembuatan tabel. Alur adalah kueri stateful yang me-refresh konten tabel. Jika FLOW tidak ditentukan, Anda dapat menggunakan AS query sebagai gantinya, atau menentukan alur secara terpisah dengan CREATE FLOW. Anda dapat menentukan salah satu jenis alur berikut:

      • INSERT MENURUT NAMA

        Menyisipkan data ke dalam tabel menurut nama kolom. ONCE Jika opsi tidak disediakan, kueri harus berupa kueri streaming. Gunakan kata kunci STREAM untuk memanfaatkan semantik streaming dalam membaca dari sumber. Jika pembacaan mengalami perubahan atau penghapusan pada rekaman yang ada, akan menghasilkan kesalahan. Paling aman untuk membaca dari sumber statis atau yang hanya bisa ditambahkan.

        Nota

        FLOW INSERT BY NAME setara dengan menggunakan AS query. Dua pernyataan berikut memiliki perilaku yang identik:

        CREATE OR REFRESH STREAMING TABLE raw_data
        AS SELECT * FROM STREAM read_files('abfss://my_path');
        
        CREATE OR REFRESH STREAMING TABLE raw_data
        FLOW INSERT BY NAME SELECT * FROM STREAM read_files('abfss://my_path');
        
      • SEKALI

        Secara opsional mendefinisikan alur sebagai alur satu kali, seperti isi ulang. Ketika ONCE disediakan, kueri bukan kueri streaming, dan alur berjalan satu kali secara default. Jika tabel di-refresh dengan refresh penuh, ONCE alur berjalan lagi untuk membuat ulang data. ONCE hanya berlaku untuk INSERT BY NAME alur.

      • AUTO CDC

        Penting

        Tersedia di Databricks Runtime 17.3 ke atas dan PREVIEW saluran Alur.

        AUTO CDC Menentukan alur yang memproses mengubah rekaman pengambilan data (CDC) dari sumber ke dalam tabel. Gunakan AUTO CDC saat data sumber menyertakan semantik CDC. Lihat API CDC Otomatis: Menyederhanakan penangkapan perubahan data menggunakan pipeline.

  • Kueri AS

    Klausa ini mengisi tabel menggunakan data dari query. Kueri ini harus berupa kueri streaming . Gunakan kata kunci STREAM untuk menggunakan semantik streaming untuk membaca dari sumbernya. Jika pembacaan mengalami perubahan atau penghapusan pada rekaman yang ada, akan menghasilkan kesalahan. Paling aman untuk membaca dari sumber statis atau yang hanya bisa ditambahkan. Untuk menyerap data yang memiliki penerapan perubahan, Anda dapat menambahkan SkipChangeCommits opsi baca untuk menangani kesalahan.

    Saat Anda menentukan query dan table_specification bersama-sama, skema tabel yang ditentukan dalam table_specification harus berisi semua kolom yang dikembalikan oleh query, jika tidak, Anda akan mendapatkan kesalahan. Kolom apa pun yang ditentukan dalam table_specification tetapi tidak dikembalikan oleh query mengembalikan nilai null saat dikueri.

    Untuk informasi selengkapnya tentang data streaming, lihat Mengubah data dengan alur.

    • Opsi Baca

      Anda dapat menentukan opsi baca dalam kueri untuk mengonfigurasi cara data dibaca dari sumber. Misalnya, Anda dapat menentukan skipChangeCommits untuk melewati penerapan perubahan apa pun dalam data sumber. Opsi baca ditentukan sebagai peta dalam WITH klausa kueri. Contohnya:

      SELECT * FROM STREAM source_table WITH (SKIPCHANGECOMMITS=TRUE, STARTINGVERSION=X)
      

      =TRUE bersifat opsional, sehingga Anda juga dapat menentukan opsi boolean seperti ini:

      SELECT * FROM STREAM source_table WITH (SKIPCHANGECOMMITS)
      

      Nota

      Opsi baca hanya didukung untuk Databricks Runtime 17.3 ke atas.

      Opsi baca di bawah ini didukung untuk Delta, untuk detail tentang setiap opsi, lihat Baca dan tulis streaming tabel Delta.

      • maxFilesPerTrigger
      • maxBytesPerTrigger
      • startingVersion
      • startingTimestamp
      • readChangeFeed
      • withEventTimeOrder
      • skipChangeCommits

Memerlukan izin

Pengguna 'run-as' untuk pipeline harus memiliki izin berikut:

  • SELECT hak akses atas tabel dasar yang direferensikan oleh tabel streaming.
  • USE CATALOG hak istimewa pada katalog induk dan hak istimewa USE SCHEMA pada skema induk.
  • CREATE MATERIALIZED VIEW hak akses pada skema untuk tabel streaming.

Agar pengguna dapat memperbarui alur tempat tabel streaming ditentukan, mereka memerlukan:

  • USE CATALOG hak istimewa pada katalog induk dan hak istimewa USE SCHEMA pada skema induk.
  • Kepemilikan atau hak istimewa pada tabel streaming REFRESH.
  • Pemilik tabel streaming harus memiliki SELECT hak istimewa atas tabel dasar yang dirujuk oleh tabel streaming.

Agar pengguna dapat mengkueri tabel streaming yang dihasilkan, mereka memerlukan:

  • USE CATALOG hak istimewa pada katalog induk dan hak istimewa USE SCHEMA pada skema induk.
  • SELECT hak akses istimewa atas tabel streaming.

Keterbatasan

  • Hanya pemilik tabel yang dapat memperbarui tabel streaming untuk mendapatkan data terbaru.
  • ALTER TABLE perintah tidak diperbolehkan di tabel streaming. Definisi dan properti tabel harus diubah melalui pernyataan CREATE OR REFRESH atau ALTER STREAMING TABLE.
  • Mengembangkan skema tabel melalui perintah DML seperti INSERT INTO, dan MERGE tidak didukung.
  • Perintah berikut ini tidak didukung pada tabel streaming:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Mengganti nama tabel atau mengubah pemilik tidak didukung.
  • Kolom yang dihasilkan, kolom identitas, dan kolom default tidak didukung.

Examples

-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")

-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;

-- Define a streaming table with an inline append flow:
CREATE OR REFRESH STREAMING TABLE raw_data
FLOW INSERT BY NAME SELECT * FROM STREAM read_files('abfss://my_path');

-- Define a streaming table with an inline AUTO CDC flow:
CREATE OR REFRESH STREAMING TABLE target
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;