MEMBUAT TABEL STREAMING

Berlaku untuk:centang ditandai ya Databricks SQL centang ditandai ya Databricks Runtime 13.3 LTS ke atas

Penting

Fitur ini ada di Pratinjau Publik. Untuk mendaftar akses, isi formulir ini.

Membuat tabel streaming, tabel Delta dengan dukungan tambahan untuk pemrosesan data streaming atau inkremental.

Tabel streaming hanya didukung di Tabel Langsung Delta dan di Databricks SQL dengan Unity Catalog. Menjalankan perintah ini pada komputasi Databricks Runtime yang didukung hanya mengurai sintaks. Lihat Menerapkan alur Delta Live Tables dengan SQL.

Sintaks

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( [ column_identifier column_type [ NOT NULL ]
      [ COMMENT column_comment ] [ column_constraint ]
    ] [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] } [...]

Parameter

  • REFRESH

    Jika ditentukan, refresh tabel dengan data terbaru yang tersedia dari sumber yang ditentukan dalam kueri. Hanya data baru yang tiba sebelum kueri dimulai diproses. Data baru yang ditambahkan ke sumber selama eksekusi perintah diabaikan hingga refresh berikutnya.

  • IF NOT EXISTS

    Jika ditentukan dan tabel dengan nama yang sama sudah ada, pernyataan diabaikan.

    IF NOT EXISTS tidak dapat digunakan bersama dengan REFRESH, yang berarti CREATE OR REFRESH TABLE IF NOT EXISTS tidak diizinkan.

  • table_name

    Nama tabel yang akan dibuat. Nama tidak boleh menyertakan spesifikasi temporal. Jika nama tidak memenuhi syarat, tabel dibuat dalam skema saat ini.

  • table_specification

    Klausa opsional ini menentukan daftar kolom, jenis, properti, deskripsi, dan batasan kolom.

    Jika Anda tidak menentukan kolom dalam skema tabel, Anda harus menentukan AS query.

    • column_identifier

      Nama unik untuk kolom.

      • column_type

        Menentukan tipe data kolom.

      • BUKAN NULL

        Jika ditentukan, kolom tidak menerima NULL nilai.

      • KOMENTAR column_comment

        String literal untuk menggambarkan kolom.

      • column_constraint

        Penting

        Fitur ini ada di Pratinjau Publik.

        Menambahkan kunci primer atau batasan kunci asing ke kolom dalam tabel streaming. Batasan tidak didukung untuk tabel-tabel dalam katalog hive_metastore.

      • BATASAN expectation_name EXPECT (expectation_expr) [ PADA PELANGGARAN { FAIL UPDATE | JATUHKAN BARIS } ]

        Menambahkan ekspektasi kualitas data ke tabel. Harapan kualitas data ini dapat dilacak dari waktu ke waktu dan diakses melalui log peristiwa tabel streaming. FAIL UPDATE Harapan menyebabkan pemrosesan gagal saat membuat tabel serta me-refresh tabel. DROP ROW Harapan menyebabkan seluruh baris dihilangkan jika harapan tidak terpenuhi.

        expectation_expr dapat terdiri atas literal, pengidentifikasi kolom dalam tabel, dan fungsi atau operator SQL bawaan deterministik kecuali:

        Juga expr tidak boleh berisi subkueri apa pun.

      • table_constraint

        Penting

        Fitur ini ada di Pratinjau Publik.

        Menambahkan kunci primer informasi atau batasan kunci asing informasional ke tabel streaming. Batasan kunci tidak didukung untuk tabel dalam hive_metastore katalog.

  • table_clauses

    Secara opsional tentukan partisi, komentar, properti yang ditentukan pengguna, dan jadwal refresh untuk tabel baru. Setiap sub klausul hanya dapat ditentukan satu kali.

    • DIPARTISI OLEH

      Daftar kolom tabel opsional untuk mempartisi tabel.

    • KOMENTAR table_comment

      Harfiah STRING untuk menggambarkan tabel.

    • TBLPROPERTIES

      Secara opsional, atur satu atau beberapa properti yang ditentukan pengguna.

    • SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]

      Jika disediakan, menjadwalkan tabel streaming atau tampilan materialisasi untuk me-refresh datanya dengan jadwal cron kuarsa yang diberikan. Hanya time_zone_values yang diterima. AT TIME ZONE LOCAL tidak didukung. Jika AT TIME ZONE tidak ada, zona waktu sesi digunakan. Jika AT TIME ZONE tidak ada dan zona waktu sesi tidak diatur, kesalahan akan muncul. SCHEDULE secara semantik setara SCHEDULE REFRESHdengan .

      Anda tidak dapat menggunakan SCHEDULE sintaks dalam definisi alur Delta Live Tables.

      Klausa SCHEDULE tidak diperbolehkan dalam CREATE OR REFRESH perintah. Jadwal dapat disediakan sebagai bagian CREATE dari perintah. Gunakan ALTER STREAMING TABLE untuk mengubah jadwal tabel streaming setelah pembuatan.

  • AS kueri

    Klausa ini mengisi tabel menggunakan data dari query. Kueri ini harus berupa kueri streaming . Ini dapat dicapai dengan menambahkan STREAM kata kunci ke hubungan apa pun yang ingin Anda proses secara bertahap. Saat Anda menentukan query dan table_specification bersama-sama, skema tabel yang ditentukan di table_specification harus berisi semua kolom yang dikembalikan oleh query, jika tidak, Anda mendapatkan kesalahan. Kolom apa pun yang ditentukan dalam table_specification tetapi tidak dikembalikan oleh query nilai yang dikembalikan null saat dikueri.

    Klausa ini diperlukan untuk tabel streaming yang dibuat di Databricks SQL, tetapi tidak diperlukan dalam Tabel Langsung Delta. Jika klausa ini tidak disediakan dalam Tabel Langsung Delta, Anda harus mereferensikan tabel ini dalam APPLY CHANGES perintah di alur DLT Anda. Lihat Mengubah pengambilan data dengan SQL di Tabel Langsung Delta.

Perbedaan antara tabel streaming dan tabel lainnya

Tabel streaming adalah tabel stateful, yang dirancang untuk menangani setiap baris hanya sekali saat Anda memproses himpunan data yang berkembang. Karena sebagian besar himpunan data tumbuh terus menerus dari waktu ke waktu, tabel streaming baik untuk sebagian besar beban kerja penyerapan. Tabel streaming optimal untuk alur yang memerlukan kesegaran data dan latensi rendah. Tabel streaming juga dapat berguna untuk transformasi skala besar-besaran, karena hasilnya dapat dihitung secara bertahap saat data baru tiba, menjaga hasil tetap terbaru tanpa perlu sepenuhnya mengolah ulang semua data sumber dengan setiap pembaruan. Tabel streaming dirancang untuk sumber data yang hanya ditambahkan.

Tabel streaming menerima perintah tambahan seperti REFRESH, yang memproses data terbaru yang tersedia di sumber yang disediakan dalam kueri. Perubahan pada kueri yang disediakan hanya tercermin pada data baru dengan memanggil , bukan data yang REFRESHdiproses sebelumnya. Untuk menerapkan perubahan pada data yang ada juga, Anda perlu menjalankan REFRESH TABLE <table_name> FULL untuk melakukan FULL REFRESH. Refresh penuh memproses ulang semua data yang tersedia di sumber dengan definisi terbaru. Tidak disarankan untuk memanggil refresh penuh pada sumber yang tidak menyimpan seluruh riwayat data atau memiliki periode retensi singkat, seperti Kafka, karena refresh penuh memotong data yang ada. Anda mungkin tidak dapat memulihkan data lama jika data tidak lagi tersedia di sumbernya.

Batasan

  • Hanya pemilik tabel yang dapat menyegarkan tabel streaming untuk mendapatkan data terbaru.

  • ALTER TABLE perintah tidak diizinkan pada tabel streaming. Definisi dan properti tabel harus diubah melalui ALTER STREAMING TABLE pernyataan.

  • Kueri perjalanan waktu tidak didukung.

  • Mengembangkan skema tabel melalui perintah DML seperti INSERT INTO, dan MERGE tidak didukung.

  • Perintah berikut ini tidak didukung pada tabel streaming:

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Berbagi Delta tidak didukung.

  • Mengganti nama tabel atau mengubah pemilik tidak didukung.

  • Batasan tabel seperti PRIMARY KEY dan FOREIGN KEY tidak didukung.

  • Kolom yang dihasilkan, kolom identitas, dan kolom default tidak didukung.

Contoh

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');