適用於:
Databricks SQL
建立串流表,即支援串流或增量數據處理的 Delta 表格。
串流資料表僅支援 Lakeflow Spark 宣告式管線和具有 Unity 目錄的 Databricks SQL。 在受支援的 Databricks Runtime 計算上執行此命令只會剖析語法。 請參考 使用 SQL 開發 Lakeflow Spark 宣告式管線程式碼。
語法
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
COMMENT table_comment |
DEFAULT COLLATION UTF8_BINARY |
TBLPROPERTIES clause |
schedule |
WITH { ROW FILTER clause } } [...]
schedule
{ SCHEDULE [ REFRESH ] schedule_clause |
TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ]}
參數
REFRESH
若有指定,請用查詢中所定義來源的最新可用資料重新整理表格。 只會處理在查詢開始之前到達的新資料。 在命令執行期間,會忽略新增至來源的新數據,直到下一次重新整理為止。 CREATE OR REFRESH 的重新整理作業是完全宣告式的。 如果重新整理命令未指定原始數據表建立語句中的所有元數據,則會刪除未指定的元數據。
若不存在
若串流數據表不存在,則建立它。 如果這個名稱的數據表已經存在,則會忽略
CREATE STREAMING TABLE語句。您最多可以指定
IF NOT EXISTS或OR REFRESH中的一個。-
要建立之數據表的名稱。 名稱不得包含 時態規格或選項規格。 如果名稱未經完整限定,將在當前的模式中建立表。
表格規格
這個選擇性子句會定義欄位清單、其類型、屬性、描述和欄位條件約束。
如果您未在資料表架構中定義資料列,則必須指定
AS query。-
欄位的一個唯一名稱。
-
指定資料行 的資料類型。
非空
如果已指定,則欄不接受
NULL值。註解 column_comment
描述欄的字串常值。
-
重要
這項功能處於公開預覽狀態。
將主鍵或外鍵約束加入串流表中的欄位。
hive_metastore目錄中的數據表不支持條件約束。 -
新增數據行遮罩函式來匿名敏感數據。 該列的所有後續查詢都會接收將該函式應用於該列所得到的結果,取代該列的原始值。 這對於精細訪問控制用途很有用,其中函式可以檢查叫用使用者的身分識別或群組成員資格,以決定是否要編輯或刪除值。
CONSTRAINT expectation_name EXPECT (expectation_expr) [ON VIOLATION {FAIL UPDATE | DROP ROW}]
將資料品質期望加入表格。 這些數據質量預期可以隨著時間追蹤,並透過串流數據表 的事件記錄檔來存取。
FAIL UPDATE的預期行為會導致在建立或重新整理數據表時,處理過程失敗。 如果不符合預期,整個資料列會因這個DROP ROW預期而被移除。expectation_expr可能包含常值、數據表中的數據行標識符,以及決定性的內建 SQL 函式或運算符,但除外:expr也不得包含任何子查詢。-
重要
這項功能處於公開預覽狀態。
將資訊型主鍵或資訊型外鍵約束加入串流資料表。
hive_metastore目錄中的資料表不支援主鍵約束。
-
-
表格條款
選擇性地指定新數據表的數據分割、批注、使用者定義屬性,以及重新整理排程。 每個次子句只能指定一次。
-
用來對表格進行分區的可選欄位列表。
注意
Liquid clustering 提供彈性且優化的叢集解決方案。 請考慮使用
CLUSTER BY作為串流表替代PARTITIONED BY。 -
以欄位的子集進行叢集的選擇性子句。 搭配使用
CLUSTER BY AUTO自動液體叢集,Databricks 會智慧地選擇叢集索引鍵,以優化查詢效能。 請參閱 針對數據表使用液體叢集。液體聚類不能與
PARTITIONED BY結合使用。 註解 table_comment
描述資料表的
STRING字面值。預設排序規則 UTF8_BINARY
適用於:
Databricks SQL
Databricks Runtime 17.1 和更新版本強制串流表格的預設排序規則為
UTF8_BINARY。 如果建立表格的資料庫架構具有預設排序規則不同於UTF8_BINARY,則此條款是必需的。 串流表格的預定排序會在query中以及對資料行類型用作預設排序。-
選擇性地設定一個或多個使用者定義的屬性。
使用此設定來指定 Lakeflow Spark 宣告式管線的執行階段通道,用來執行此陳述式。 將
pipelines.channel屬性的值設定為"PREVIEW"或"CURRENT"。 預設值是"CURRENT"。 如需 Lakeflow Spark 宣告式管線通道的詳細資訊,請參閱 Lakeflow Spark 宣告式管線執行階段通道。 附表
排程可以是
SCHEDULE陳述式或TRIGGER陳述式。時間表 [ REFRESH ] 時間表條款
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }若要排定定期執行的更新,請使用
EVERY語法。 如果指定了EVERY語法格式,串流資料表或具象化檢視表將根據所提供的值在指定的間隔定期更新,例如HOUR、HOURS、DAY、DAYS、WEEK或WEEKS。 下表列出number接受的整數值。時間單位 整數值 HOUR or HOURS1 <= 高 <= 72 DAY or DAYS1 <= D <= 31 WEEK or WEEKS1 <= 寬 <= 8 注意
內含時間單位的單數和複數形式在語意上相等。
CRON cron_string [ AT TIME ZONE timezone_id ]使用 quartz cron 值來排程重新整理。 接受的 time_zone_values 必須有效。 不支援
AT TIME ZONE LOCAL。如果
AT TIME ZONE不存在,則會使用工作階段時區。 如果AT TIME ZONE不存在且未設定會話時區,則會擲回錯誤。SCHEDULE在語意上相當於SCHEDULE REFRESH。
排程可作為
CREATE命令的一部分提供。 使用 ALTER STREAMING TABLE 或搭配CREATE OR REFRESH子句執行SCHEDULE命令,以在建立之後改變串流數據表的排程。UPDATE 觸發 [ 最多每trigger_interval ]
重要
該
TRIGGER ON UPDATE功能處於 測試階段。選擇性地將表格設定為在更新上游資料來源時重新整理,最多每分鐘一次。 設定值 ,
AT MOST EVERY以要求重新整理之間至少有最短時間。上游資料來源必須是外部或受控 Delta 資料表 (包括具體化檢視或串流資料表),或相依性僅限於支援資料表類型的受控檢視。
啟用 檔案事件 可以讓觸發程式更有效率,並增加觸發程式更新的一些限制。
是
trigger_interval至少 1 分鐘的 INTERVAL 陳述式。TRIGGER ON UPDATE有以下限制- 使用 TRIGGER ON UPDATE時,每個串流資料表的上游資料來源不超過 10 個。
- 最多可以使用 TRIGGER ON UPDATE指定 1000 個串流資料表或具體化檢視。
- 子
AT MOST EVERY句預設為 1 分鐘,且不能小於 1 分鐘。
-
-
將數據列篩選函式加入至數據表。 該數據表中所有後續的查詢都會接收到函式評估為布林值 TRUE 的行子集。 這對於精細訪問控制用途很有用,其中函式可以檢查叫用使用者的身分識別或群組成員資格,以決定是否要篩選特定數據列。
AS 查詢
這個子句會使用 來自
query的數據填入數據表。 此查詢必須是串流查詢。 可以將STREAM關鍵字新增至您想要累加處理的任何關聯,以達成此目的。 當您一起指定query和table_specification時,table_specification中指定的數據表架構必須包含query傳回的所有數據行,否則您會收到錯誤。table_specification中指定的任何欄,如果在query中未被返回,則在查詢時會返回null值。
串流數據表與其他數據表之間的差異
串流數據表是具狀態的數據表,其設計目的是在您處理成長的數據集時,只處理每個數據列一次。 由於大部分數據集會隨著時間持續成長,串流數據表最適合大部分的擷取工作負載。 串流數據表最適合需要數據新鮮度和低延遲的管線。 串流數據表也可用於大規模轉換,因為結果可以在新數據送達時以累加方式計算,讓結果保持在最新狀態,而不需要使用每個更新完全重新計算所有源數據。 串流數據表是針對僅附加的數據源所設計。
串流數據表接受其他命令,例如 REFRESH,其會處理查詢中提供的來源中可用的最新數據。 提供的查詢變更只會藉由呼叫 REFRESH來反映新數據,而不是先前處理過的數據。 若也要對現有資料套用變更,需要執行 REFRESH TABLE <table_name> FULL 以執行 FULL REFRESH。 完整重新整理會根據最新的定義重新處理來源中所有可用的資料。 不建議對不保留整個資料歷程記錄或保留期間較短的來源執行完整重整,例如 Kafka,因為完整重整會截斷現有的資料。 如果資料來源中的資料不再可供使用,您可能無法復原舊資料。
行篩選和列遮罩
資料列篩選可讓您指定一個函式,該函式在資料表掃描取出資料列時作為篩選器應用。 這些篩選條件可確保後續查詢只會傳回篩選條件述詞評估為 true 的資料列。
每當數據表掃描擷取數據列時,數據行遮罩可讓您遮罩數據行的值。 涉及該數據行的所有未來查詢都會收到評估數據行函式的結果,並取代數據行的原始值。
如需如何使用數據列篩選和數據行遮罩的詳細資訊,請參閱 數據列篩選和數據行遮罩。
管理行篩選和列遮罩
串流數據表上的數據列篩選和數據行遮罩應該透過 CREATE OR REFRESH 語句新增、更新或卸除。
行為
-
重新整理為定義者:當
CREATE OR REFRESH或REFRESH語句重新整理串流數據表時,數據列篩選函式會以定義者的許可權執行(作為數據表擁有者)。 這表示資料表重新整理會使用建立串流資料表之使用者的安全性環境。 -
查詢:雖然大部分篩選都會以定義器的許可權執行,但檢查用戶內容 (例如
CURRENT_USER和IS_MEMBER) 的函式是例外狀況。 這些功能將以呼叫者的身份運行。 此方法會根據目前使用者的內容強制執行使用者特定的數據安全性和訪問控制。
可檢視性
使用 DESCRIBE EXTENDED、INFORMATION_SCHEMA或目錄總管來檢查套用至指定串流數據表的現有數據列篩選和數據行遮罩。 此功能可讓使用者稽核及檢閱串流數據表上的數據存取和保護措施。
限制
- 只有數據表擁有者可以重新整理串流數據表,以取得最新的數據。
- 串流數據表上不允許
ALTER TABLE命令。 數據表的定義和屬性應該透過CREATE OR REFRESH或 ALTER STREAMING TABLE 語句來改變。 - 不支援透過像
INSERT INTO和MERGE這樣的 DML 命令來修改資料表結構。 - 串流資料表不支援下列命令:
CREATE TABLE ... CLONE <streaming_table>COPY INTOANALYZE TABLERESTORETRUNCATEGENERATE MANIFEST[CREATE OR] REPLACE TABLE
- 不支援 Delta Sharing。
- 不支援重新命名資料表或變更擁有者。
- 型錄中的串流表格不支援
PRIMARY KEY和FOREIGN KEY等表格限制。 - 不支援產生的數據行、識別數據行和預設數據行。
範例
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
AS SELECT *
FROM STREAM source_stream_data;
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE EVERY 1 HOUR
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM STREAM sales;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')