CREATE STREAMING TABLE

適用於:勾選「是」 Databricks SQL

建立串流表,即支援串流或增量數據處理的 Delta 表格。

串流資料表僅支援 Lakeflow Spark 宣告式管線和具有 Unity 目錄的 Databricks SQL。 在受支援的 Databricks Runtime 計算上執行此命令只會剖析語法。 請參考 使用 SQL 開發 Lakeflow Spark 宣告式管線程式碼

語法

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ {flow_clause | AS query} ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    COMMENT table_comment |
    DEFAULT COLLATION UTF8_BINARY |
    TBLPROPERTIES clause |
    schedule |
    WITH { ROW FILTER clause } } [...]

flow_clause
  FLOW { { INSERT BY NAME query } |
  { AUTO CDC auto_cdc_flow_spec } }

schedule
  { SCHEDULE [ REFRESH ] schedule_clause |
    TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ]}

參數

  • REFRESH

    若有指定,請用查詢中所定義來源的最新可用資料重新整理表格。 只會處理在查詢開始之前到達的新資料。 在命令執行期間,會忽略新增至來源的新數據,直到下一次重新整理為止。 CREATE OR REFRESH 的重新整理作業是完全宣告式的。 如果重新整理命令未指定原始數據表建立語句中的所有元數據,則會刪除未指定的元數據。

  • 若不存在

    若串流數據表不存在,則建立它。 如果這個名稱的數據表已經存在,則會忽略 CREATE STREAMING TABLE 語句。

    您最多可以指定 IF NOT EXISTSOR REFRESH 中的一個。

  • table_name

    要建立之數據表的名稱。 名稱不得包含 時態規格或選項規格。 如果名稱未經完整限定,將在當前的模式中建立表。

  • 表格規格

    這個選擇性子句會定義欄位清單、其類型、屬性、描述和欄位條件約束。

    如果您未在資料表架構中定義資料列,則必須指定 AS query

    • column_identifier

      欄位的一個唯一名稱。

      • 欄位類型

        指定資料行 的資料類型

      • 非空

        如果已指定,則欄不接受 NULL 值。

      • 註解 column_comment

        描述欄的字串常值。

      • column_constraint

        將主鍵或外鍵約束加入串流表中的欄位。 hive_metastore 目錄中的數據表不支持條件約束。

      • MASK 子句

        新增數據行遮罩函式來匿名敏感數據。 該列的所有後續查詢都會接收將該函式應用於該列所得到的結果,取代該列的原始值。 這對於精細訪問控制用途很有用,其中函式可以檢查叫用使用者的身分識別或群組成員資格,以決定是否要編輯或刪除值。

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ON VIOLATION {FAIL UPDATE | DROP ROW}]

        將資料品質期望加入表格。 這些數據質量預期可以隨著時間追蹤,並透過串流數據表 的事件記錄檔來存取。 FAIL UPDATE 的預期行為會導致在建立或重新整理數據表時,處理過程失敗。 如果不符合預期,整個資料列會因這個DROP ROW預期而被移除。

        expectation_expr 可能包含常值、數據表中的數據行標識符,以及決定性的內建 SQL 函式或運算符,但除外:

        expr 也不得包含任何子查詢

      • 資料表限制

        將資訊型主鍵或資訊型外鍵約束加入串流資料表。 hive_metastore 目錄中的資料表不支援主鍵約束。

  • 表格條款

    選擇性地指定新數據表的數據分割、批注、使用者定義屬性,以及重新整理排程。 每個次子句只能指定一次。

    • 由...分區

      用來對表格進行分區的可選欄位列表。

      注意

      Liquid clustering 提供彈性且優化的叢集解決方案。 請考慮使用 CLUSTER BY 作為串流表替代 PARTITIONED BY

    • CLUSTER BY

      以欄位的子集進行叢集的選擇性子句。 搭配使用 CLUSTER BY AUTO自動液體叢集,Databricks 會智慧地選擇叢集索引鍵,以優化查詢效能。 請參閱 針對數據表使用液體叢集

      液體聚類不能與 PARTITIONED BY結合使用。

    • 註解 table_comment

      描述資料表的 STRING 字面值。

    • 預設排序規則 UTF8_BINARY

      適用於:核取標示為是 Databricks SQL 核取標示為是 Databricks Runtime 17.1 和更新版本

      強制串流表格的預設排序規則為 UTF8_BINARY。 如果建立表格的資料庫架構具有預設排序規則不同於 UTF8_BINARY,則此條款是必需的。 串流表格的預定排序會在 query 中以及對資料行類型用作預設排序。

    • TBLPROPERTIES

      選擇性地設定一個或多個使用者定義的屬性。

      使用此設定來指定 Lakeflow Spark 宣告式管線的執行階段通道,用來執行此陳述式。 將 pipelines.channel 屬性的值設定為 "PREVIEW""CURRENT"。 預設值是 "CURRENT"。 如需 Lakeflow Spark 宣告式管線通道的詳細資訊,請參閱 Lakeflow Spark 宣告式管線執行階段通道

    • 附表

      排程可以是 SCHEDULE 陳述式或 TRIGGER 陳述式。

      • 時間表 [ REFRESH ] 時間表條款

        • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

          若要排定定期執行的更新,請使用 EVERY 語法。 如果指定了 EVERY 語法格式,串流資料表或具象化檢視表將根據所提供的值在指定的間隔定期更新,例如 HOURHOURSDAYDAYSWEEKWEEKS。 下表列出 number接受的整數值。

          時間單位 整數值
          HOUR or HOURS 1 <= 高 <= 72
          DAY or DAYS 1 <= D <= 31
          WEEK or WEEKS 1 <= 寬 <= 8

          注意

          內含時間單位的單數和複數形式在語意上相等。

        • CRON cron_string [ AT TIME ZONE timezone_id ]

          使用 quartz cron 值來排程重新整理。 接受的 time_zone_values 必須有效。 不支援 AT TIME ZONE LOCAL

          cron 表達式使用六個空間分離場,順序為: seconds minutes hours day-of-month month day-of-week。 使用?任意day-of-monthday-of-week一種,保持不具體說明。

          例如,每天 SCHEDULE CRON '0 0 0 * * ?' AT TIME ZONE 'UTC' 在 UTC 時間午夜刷新。

          如果 AT TIME ZONE 不存在,則會使用工作階段時區。 如果 AT TIME ZONE 不存在且未設定會話時區,則會擲回錯誤。 SCHEDULE 在語意上相當於 SCHEDULE REFRESH

        排程可作為 CREATE 命令的一部分提供。 使用 ALTER STREAMING TABLE 或搭配 CREATE OR REFRESH 子句執行 SCHEDULE 命令,以在建立之後改變串流數據表的排程。

      • UPDATE 觸發 [ 最多每trigger_interval ]

        選擇性地將表格設定為在更新上游資料來源時重新整理,最多每分鐘一次。 設定值 , AT MOST EVERY 以要求重新整理之間至少有最短時間。

        上游資料來源必須是外部或受控 Delta 資料表 (包括具體化檢視或串流資料表),或相依性僅限於支援資料表類型的受控檢視。

        啟用 檔案事件 可以讓觸發程式更有效率,並增加觸發程式更新的一些限制。

        trigger_interval 至少 1 分鐘的 INTERVAL 陳述式。

        TRIGGER ON UPDATE 有以下限制

        • 使用 TRIGGER ON UPDATE時,每個串流資料表的上游資料來源不超過 10 個。
        • 最多可以使用 TRIGGER ON UPDATE指定 1000 個串流資料表或具體化檢視。
        • AT MOST EVERY 句預設為 1 分鐘,且不能小於 1 分鐘。
  • 的ROW FILTER 子句

    將數據列篩選函式加入至數據表。 該數據表中所有後續的查詢都會接收到函式評估為布林值 TRUE 的行子集。 這對於精細訪問控制用途很有用,其中函式可以檢查叫用使用者的身分識別或群組成員資格,以決定是否要篩選特定數據列。

  • 流程

    這很重要

    這項功能位於 測試版 (Beta) 中。 需要 Databricks 執行環境 17.3 及以上版本。

    可選擇性地定義與資料表建立相關的 流程 。 流程是一種有狀態的查詢,用來刷新資料表的內容。 如果 FLOW 沒有特別說明,你可以用 AS query 替代。 另外一個 REFRESH STREAMING TABLE 語句讓你執行流程。 您可以指定以下其中一種流程類型:

    • INSERT 依名稱

      依欄位名稱將資料插入資料表。 查詢必須是串流查詢。 使用STREAM關鍵詞,來使用串流語意從來源讀取。 如果讀取遇到現有記錄的變更或刪除,則會拋出錯誤。 閱讀靜態或只能添加的來源是最安全的。

      注意

      FLOW INSERT BY NAME 等價於使用 AS query。 以下兩個陳述具有相同的行為:

      CREATE OR REFRESH STREAMING TABLE raw_data
      AS SELECT * FROM STREAM read_files('abfss://my_path');
      
      CREATE OR REFRESH STREAMING TABLE raw_data
      FLOW INSERT BY NAME SELECT * FROM STREAM read_files('abfss://my_path');
      
    • 自動疾病控制中心

      定義一個 AUTO CDC 流程,將變更資料擷取(CDC)記錄從來源處理到資料表。 當來源資料包含 CDC 語意時使用 AUTO CDC 。 看......CREATE STREAMING TABLE自動流控中心

  • AS 查詢

    這個子句會使用 來自 query的數據填入數據表。 此查詢必須是串流查詢。 可以將 STREAM 關鍵字新增至您想要累加處理的任何關聯,以達成此目的。 當您一起指定 querytable_specification 時,table_specification 中指定的數據表架構必須包含 query傳回的所有數據行,否則您會收到錯誤。 table_specification 中指定的任何欄,如果在 query 中未被返回,則在查詢時會返回 null 值。

串流數據表與其他數據表之間的差異

串流數據表是具狀態的數據表,其設計目的是在您處理成長的數據集時,只處理每個數據列一次。 由於大部分數據集會隨著時間持續成長,串流數據表最適合大部分的擷取工作負載。 串流數據表最適合需要數據新鮮度和低延遲的管線。 串流數據表也可用於大規模轉換,因為結果可以在新數據送達時以累加方式計算,讓結果保持在最新狀態,而不需要使用每個更新完全重新計算所有源數據。 串流數據表是針對僅附加的數據源所設計。

串流數據表接受其他命令,例如 REFRESH,其會處理查詢中提供的來源中可用的最新數據。 提供的查詢變更只會藉由呼叫 REFRESH來反映新數據,而不是先前處理過的數據。 若也要對現有資料套用變更,需要執行 REFRESH TABLE <table_name> FULL 以執行 FULL REFRESH。 完整重新整理會根據最新的定義重新處理來源中所有可用的資料。 不建議對不保留整個資料歷程記錄或保留期間較短的來源執行完整重整,例如 Kafka,因為完整重整會截斷現有的資料。 如果資料來源中的資料不再可供使用,您可能無法復原舊資料。

行篩選和列遮罩

資料列篩選可讓您指定一個函式,該函式在資料表掃描取出資料列時作為篩選器應用。 這些篩選條件可確保後續查詢只會傳回篩選條件述詞評估為 true 的資料列。

每當數據表掃描擷取數據列時,數據行遮罩可讓您遮罩數據行的值。 涉及該數據行的所有未來查詢都會收到評估數據行函式的結果,並取代數據行的原始值。

如需如何使用數據列篩選和數據行遮罩的詳細資訊,請參閱 數據列篩選和數據行遮罩

管理行篩選和列遮罩

串流數據表上的數據列篩選和數據行遮罩應該透過 CREATE OR REFRESH 語句新增、更新或卸除。

行為

  • 重新整理為定義者:當 CREATE OR REFRESHREFRESH 語句重新整理串流數據表時,數據列篩選函式會以定義者的許可權執行(作為數據表擁有者)。 這表示資料表重新整理會使用建立串流資料表之使用者的安全性環境。
  • 查詢:雖然大部分篩選都會以定義器的許可權執行,但檢查用戶內容 (例如 CURRENT_USERIS_MEMBER) 的函式是例外狀況。 這些功能將以呼叫者的身份運行。 此方法會根據目前使用者的內容強制執行使用者特定的數據安全性和訪問控制。

可檢視性

使用 DESCRIBE EXTENDEDINFORMATION_SCHEMA或目錄總管來檢查套用至指定串流數據表的現有數據列篩選和數據行遮罩。 此功能可讓使用者稽核及檢閱串流數據表上的數據存取和保護措施。

限制

  • 只有數據表擁有者可以重新整理串流數據表,以取得最新的數據。
  • 串流數據表上不允許 ALTER TABLE 命令。 數據表的定義和屬性應該透過 CREATE OR REFRESHALTER STREAMING TABLE 語句來改變。
  • 不支援透過像 INSERT INTOMERGE 這樣的 DML 命令來修改資料表結構。
  • 串流資料表不支援下列命令:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • 不支援 Delta Sharing。
  • 不支援重新命名資料表或變更擁有者。
  • 型錄中的串流表格不支援PRIMARY KEYFOREIGN KEY等表格限制。
  • 不支援產生的數據行、識別數據行和預設數據行。

範例

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
  TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
  AS SELECT *
  FROM STREAM source_stream_data;

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM STREAM sales;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')

-- Creates a streaming table using a FLOW to append data from files
> CREATE OR REFRESH STREAMING TABLE raw_data
  FLOW INSERT BY NAME SELECT * FROM STREAM read_files('abfss://my_path');

-- Creates a streaming table using an AUTO CDC flow to apply changes from a change feed
> CREATE OR REFRESH STREAMING TABLE target
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  SEQUENCE BY sequenceNum
  STORED AS SCD TYPE 1;