分享方式:


CREATE STREAMING TABLE

適用於:核取記號為「是」 Databricks SQL

建立 串流數據表,這是支援串流或增量數據處理的 Delta 數據表。

串流表格只支援在 Delta Live Tables 和具有 Unity Catalog 的 Databricks SQL 上使用。 在受支援的 Databricks Runtime 計算上執行此命令只會剖析語法。 請參閱 使用 SQL 開發管線程式代碼。

語法

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ] }

參數

  • REFRESH

    如果有指定,請用查詢中所定義來源的最新可用數據重新整理表格。 只會處理在查詢開始之前到達的新資料。 在命令執行期間,會忽略新增至來源的新數據,直到下一次重新整理為止。 CREATE OR REFRESH 的重新整理作業是完全宣告式的。 如果重新整理命令未指定原始數據表建立語句中的所有元數據,則會刪除未指定的元數據。

  • IF NOT EXISTS

    若串流數據表不存在,則建立它。 如果這個名稱的數據表已經存在,則會忽略 CREATE STREAMING TABLE 語句。

    您最多可以指定 IF NOT EXISTSOR REFRESH 中的一個。

  • table_name

    要建立之數據表的名稱。 名稱不得包含 時態規格或選項規格。 如果名稱未經完整限定,將在當前的模式中建立表。

  • table_specification

    這個選擇性子句會定義欄位清單、其類型、屬性、描述和欄位條件約束。

    如果您未在資料表架構中定義資料列,則必須指定 AS query

    • column_identifier

      欄的唯一名稱。

      • column_type

        指定資料行 的資料類型

      • NOT NULL

        如果已指定,則欄不接受 NULL 值。

      • COMMENT column_comment

        描述欄的字串常值。

      • column_constraint

        重要

        這項功能處於公開預覽狀態

        將主鍵或外鍵條件約束加入串流數據表中的數據行。 hive_metastore 目錄中的數據表不支持條件約束。

      • MASK 子句

        重要

        這項功能處於公開預覽狀態

        新增數據行遮罩函式來匿名敏感數據。 該數據欄的所有後續查詢都會接收到將該函式應用於數據欄後的結果,以取代數據欄的原始值。 這對於精細訪問控制用途很有用,其中函式可以檢查叫用使用者的身分識別或群組成員資格,以決定是否要編輯或刪除值。

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE |DROP ROW } ]

        將數據品質期望加入表格。 這些數據質量預期可以隨著時間追蹤,並透過串流數據表 事件記錄檔來存取。 FAIL UPDATE 預期會導致在建立數據表以及重新整理數據表時,處理失敗。 如果不符合預期,DROP ROW 預期會導致卸除整個資料列。

        expectation_expr 可能包含常值、數據表中的數據行標識符,以及決定性的內建 SQL 函式或運算符,但除外:

        expr 也不得包含任何子查詢

      • table_constraint

        重要

        這項功能處於公開預覽狀態

        將資訊型主鍵或資訊型外鍵約束加入串流資料表。 hive_metastore 目錄中的資料表不支援主鍵約束。

  • table_clauses

    選擇性地指定新數據表的數據分割、批注、使用者定義屬性,以及重新整理排程。 每個次子句只能指定一次。

    • PARTITIONED BY

      用來對表格進行分區的可選欄位列表。

    • COMMENT table_comment

      描述資料表的 STRING 字面值。

    • TBLPROPERTIES

      選擇性地設定一個或多個使用者定義的屬性。

      使用此設定來指定用來執行此語句的 Delta Live Tables 執行時間通道。 將 pipelines.channel 屬性的值設定為 "PREVIEW""CURRENT"。 預設值是 "CURRENT"。 如需 Delta Live Tables 頻道的詳細資訊,請參閱 Delta Live Tables 執行環境頻道

    • 時間表 [ REFRESH ] 時間表條款

    • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

      若要排程定期發生的重新整理,請使用 EVERY 語法。 如果指定了 EVERY 語法格式,串流資料表或具象化檢視表將根據所提供的值在指定的間隔定期更新,例如 HOURHOURSDAYDAYSWEEKWEEKS。 下表列出 number接受的整數值。

      Time unit 整數值
      HOUR or HOURS 1 <= H <= 72
      DAY or DAYS 1 <= D <= 31
      WEEK or WEEKS 1 <= W <= 8

      注意

      內含時間單位的單數和複數形式在語意上相等。

    • CRON cron_string [ AT TIME ZONE timezone_id ]

      使用 quartz cron 值來排程重新整理。 接受有效的 time_zone_values 。 不支援 AT TIME ZONE LOCAL

      如果 AT TIME ZONE 不存在,則會使用工作階段時區。 如果 AT TIME ZONE 不存在且未設定會話時區,則會擲回錯誤。 SCHEDULE 在語意上相當於 SCHEDULE REFRESH

    排程可作為 CREATE 命令的一部分提供。 使用 ALTER STREAMING TABLE 或搭配 CREATE OR REFRESH 子句執行 SCHEDULE 命令,以在建立之後改變串流數據表的排程。

  • 的ROW FILTER 子句

    重要

    這項功能處於公開預覽狀態

    將數據列篩選函式加入至數據表。 該數據表中所有後續的查詢都會接收到函式評估為布林值 TRUE 的行子集。 這對於精細訪問控制用途很有用,其中函式可以檢查叫用使用者的身分識別或群組成員資格,以決定是否要篩選特定數據列。

  • AS 查詢

    這個子句會使用 來自 query的數據填入數據表。 此查詢必須是串流查詢。 可以將 STREAM 關鍵字新增至您想要累加處理的任何關聯,以達成此目的。 當您一起指定 querytable_specification 時,table_specification 中指定的數據表架構必須包含 query傳回的所有數據行,否則您會收到錯誤。 table_specification 中指定的任何欄,如果在 query 中未被返回,則在查詢時會返回 null 值。

串流數據表與其他數據表之間的差異

串流數據表是具狀態的數據表,其設計目的是在您處理成長的數據集時,只處理每個數據列一次。 由於大部分數據集會隨著時間持續成長,串流數據表最適合大部分的擷取工作負載。 串流數據表最適合需要數據新鮮度和低延遲的管線。 串流數據表也可用於大規模轉換,因為結果可以在新數據送達時以累加方式計算,讓結果保持在最新狀態,而不需要使用每個更新完全重新計算所有源數據。 串流數據表是針對僅附加的數據源所設計。

串流數據表接受其他命令,例如 REFRESH,其會處理查詢中提供的來源中可用的最新數據。 提供的查詢變更只會藉由呼叫 REFRESH來反映新數據,而不是先前處理過的數據。 若也要對現有資料套用變更,需要執行 REFRESH TABLE <table_name> FULL 以執行 FULL REFRESH。 完全重新整理會以最新的定義重新處理來源中所有可用的資料。 不建議對那些不保留完整數據歷史或保留時間較短的來源(例如 Kafka)進行完整重新整理,因為完整重新整理會截斷現有的數據。 如果資料來源中的資料不再可供使用,您可能無法復原舊資料。

行篩選和列遮罩

重要

這項功能處於公開預覽狀態

資料列篩選可讓您指定一個函式,該函式在資料表掃描取出資料列時作為篩選器應用。 這些篩選條件可確保後續查詢只會傳回篩選條件述詞評估為 true 的資料列。

每當數據表掃描擷取數據列時,數據行遮罩可讓您遮罩數據行的值。 涉及該數據行的所有未來查詢都會收到評估數據行函式的結果,並取代數據行的原始值。

如需如何使用數據列篩選和數據行遮罩的詳細資訊,請參閱 使用數據列篩選和數據行遮罩篩選敏感數據。

管理行篩選和列遮罩

串流數據表上的數據列篩選和數據行遮罩應該透過 CREATE OR REFRESH 語句新增、更新或卸除。

行為

  • Refresh as Definer:當 CREATE OR REFRESHREFRESH 語句重新整理串流數據表時,數據列篩選函式會以定義者的許可權執行(以數據表擁有者身分)。 這表示資料表重新整理會使用建立串流資料表之使用者的安全性環境。
  • 查詢:雖然大部分篩選都會以定義者的權限執行,但檢查使用者內容的函式 (例如 CURRENT_USERIS_MEMBER) 是例外狀況。 會以叫用者的身分執行這些函式。 此方法會根據目前使用者的內容強制執行使用者特定的資料安全性和存取控制。

可檢視性

使用 DESCRIBE EXTENDEDINFORMATION_SCHEMA或目錄總管來檢查套用至指定串流數據表的現有數據列篩選和數據行遮罩。 此功能可讓使用者稽核及檢閱串流數據表上的數據存取和保護措施。

限制

  • 只有數據表擁有者可以重新整理串流數據表,以取得最新的數據。
  • 串流數據表上不允許 ALTER TABLE 命令。 數據表的定義和屬性應該透過 CREATE OR REFRESHALTER STREAMING TABLE 語句來改變。
  • 不支援透過像 INSERT INTOMERGE 這樣的 DML 命令來修改資料表結構。
  • 串流資料表不支援下列命令:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • 不支援 Delta Sharing。
  • 不支援重新命名資料表或變更擁有者。
  • 不支持數據表條件約束,例如 PRIMARY KEYFOREIGN KEY
  • 不支援產生的數據行、識別數據行和預設數據行。

範例

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM RANGE(10)

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')