適用対象:
Databricks SQL
ストリーミングまたは増分データ処理を追加でサポートする Delta テーブルであるストリーミング テーブルを作成します。
ストリーミング テーブルは、Lakeflow Spark 宣言パイプラインと Databricks SQL と Unity カタログでのみサポートされます。 サポートされている Databricks Runtime コンピューティングでこのコマンドを実行すると、構文のみが解析されます。 SQL を使用した Lakeflow Spark 宣言パイプライン コードの開発に関するページを参照してください。
構文
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
COMMENT table_comment |
DEFAULT COLLATION UTF8_BINARY |
TBLPROPERTIES clause |
schedule |
WITH { ROW FILTER clause } } [...]
schedule
{ SCHEDULE [ REFRESH ] schedule_clause |
TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ]}
パラメーター
REFRESH
指定した場合、クエリで定義されているソースから利用できる最新のデータでテーブルが更新されます。 クエリが開始される前に到着した新しいデータのみが処理されます。 コマンドの実行中にソースに追加される新しいデータは次の更新まで無視されます。 CREATE OR REFRESH からの更新操作は完全に宣言型です。 更新コマンドで元のテーブル作成ステートメントのすべてのメタデータが指定されていない場合、指定されていないメタデータは削除されます。
存在しない場合
ストリーミング テーブルが存在しない場合は作成します。 この名前のテーブルが既に存在する場合、
CREATE STREAMING TABLEステートメントは無視されます。IF NOT EXISTSかOR REFRESHのいずれか 1 つだけを指定できます。-
作成されるテーブルの名前。 この名前には、テンポラル指定やオプション指定を含めないでください。 名前が修飾されていない場合、テーブルは現在のスキーマに作成されます。
テーブル仕様
この省略可能な句で、列、その型、プロパティ、説明、および列制約の一覧を定義します。
テーブル スキーマで列を定義しない場合、
AS queryを指定する必要があります。-
列に設定する一意の名前。
-
列のデータ型を指定します。
NOT NULL
指定した場合、列は
NULL値を受け取りません。コメント column_comment
列について説明する文字列リテラル。
-
重要
この機能はパブリック プレビュー段階にあります。
ストリーミング テーブル内の列に主キーまたは外部キー制約を追加します。 制約は、
hive_metastoreカタログ内のテーブルではサポートされていません。 -
列マスク関数を追加して、機密データを匿名化します。 その列の後続のすべてのクエリは、列の元の値の代わりに、その列に対してその関数を評価した結果を受け取ります。 これは、値を編集するかどうかを決定するために呼び出したユーザーの ID やグループ メンバーシップを関数で検査できる、きめ細かいアクセス制御に役立ちます。
CONSTRAINT expectation_name EXPECT (expectation_expr) [ 違反時 { FAIL UPDATE | ROWを削除 } ]
テーブルにデータ品質の期待値を追加します。 これらのデータ品質の期待値は、時間の経過と同時に追跡し、ストリーミング テーブルの イベント ログを介してアクセスできます。 テーブルの作成時とテーブル更新時の両方で、
FAIL UPDATE期待値により処理が失敗します。DROP ROW期待値が満たされない場合、行全体が削除されます。expectation_exprは、以下のものを除く、リテラル、テーブル内の列識別子、および決定論的な組み込みの SQL 関数または演算子で構成される場合があります。-
集計関数
- 分析ウィンドウ関数
- 順位付けウィンドウ関数
- テーブル値ジェネレーター関数
また
exprには、サブクエリを含めることはできません。-
集計関数
-
重要
この機能はパブリック プレビュー段階にあります。
情報主キーまたは情報外部キーの制約をストリーミング テーブルに追加します。 主な制約は、
hive_metastoreカタログ内のテーブルに対してはサポートされません。
-
-
テーブル条項
必要に応じて、パーティション分割、コメント、ユーザー定義プロパティ、新しいテーブルの更新スケジュールを指定します。 各サブ句は、1 回だけ指定できます。
-
テーブルをパーティション化するための、任意のテーブル列の一覧。
メモ
液体クラスタリングは、クラスタリング用の柔軟で最適化されたソリューションを提供します。 ストリーミング テーブルに
CLUSTER BYする代わりに、PARTITIONED BYを使用することを検討してください。 -
列のサブセットによってクラスター化する省略可能な句。
CLUSTER BY AUTOで自動液体クラスタリングを使用し、Databricks はクエリのパフォーマンスを最適化するためにクラスタリング キーをインテリジェントに選択します。 表に液体クラスタリングを使用するを参照してください。液体クラスタリングを
PARTITIONED BYと組み合わせることはできません。 コメント table_comment
テーブルを説明するための
STRINGリテラル。既定の照合順序UTF8_BINARY
適用対象:
Databricks SQL
Databricks Runtime 17.1 以降" とマークされているチェックストリーミング テーブルの既定の照合順序を強制的に
UTF8_BINARYします。 テーブルが作成されるスキーマにUTF8_BINARY以外の既定の照合順序がある場合、この句は必須です。 ストリーミング テーブルの既定の照合順序は、query内および列型の既定の照合順序として使用されます。-
必要に応じて、1 つ以上のユーザー定義プロパティを設定します。
この設定を使用して、このステートメントの実行に使用する Lakeflow Spark 宣言パイプライン ランタイム チャネルを指定します。
pipelines.channelプロパティの値を"PREVIEW"または"CURRENT"に設定します。 既定値は"CURRENT"です。 Lakeflow Spark 宣言パイプライン チャネルの詳細については、「 Lakeflow Spark 宣言パイプライン ランタイム チャネル」を参照してください。 schedule
スケジュールには、
SCHEDULEステートメントまたはTRIGGERステートメントのいずれかを指定できます。スケジュール [ REFRESH ] スケジュール文
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }定期的に発生する更新をスケジュールするには、
EVERY構文を使用します。EVERY構文が指定されている場合、ストリーミング テーブルまたは具体化されたビューは、指定された値 (HOUR、HOURS、DAY、DAYS、WEEK、WEEKSなど) に基づいて、指定した間隔で定期的に更新されます。 次の表に、numberに使用できる整数値を示します。時間単位 整数値 HOUR or HOURS1 <= H <= 72 DAY or DAYS1<= D<= 31 WEEK or WEEKS1 <= W <= 8 メモ
含まれる時間単位の単数形と複数形は、意味的に同等です。
CRON cron_string [ AT TIME ZONE timezone_id ]quartz cron 値を使用して更新をスケジュールする場合。 有効な time_zone_values が受け入れられます。
AT TIME ZONE LOCALはサポートされません。AT TIME ZONEが存在しない場合は、セッション タイム ゾーンが使用されます。AT TIME ZONEが存在せず、セッション タイム ゾーンも設定されていない場合は、エラーが発生します。SCHEDULEは意味的にSCHEDULE REFRESHと同等です。
スケジュールは
CREATEコマンドの一部として指定できます。 ALTER STREAMING TABLE を使用するか、CREATE OR REFRESHコマンドをSCHEDULE句と共に実行して、作成後にストリーミング テーブルのスケジュールを変更します。トリガーオン UPDATE [ 最大ですべてのtrigger_interval ]
重要
TRIGGER ON UPDATE機能はベータ版です。必要に応じて、アップストリーム データ ソースが更新されたときに更新するようにテーブルを設定します (最大 1 分ごとに)。
AT MOST EVERYの値を設定して、更新の間に少なくとも最小限の時間を必要とします。アップストリーム データ ソースは、外部またはマネージド Delta テーブル (具体化されたビューやストリーミング テーブルを含む) か、依存関係がサポートされているテーブルの種類に制限されているマネージド ビューである必要があります。
ファイル イベントを有効にすると、トリガーのパフォーマンスが向上し、トリガーの更新の制限の一部が増える可能性があります。
trigger_intervalは、少なくとも 1 分の INTERVAL ステートメントです。TRIGGER ON UPDATEには次の制限があります- TRIGGER ON UPDATEを使用する場合、ストリーミング テーブルあたり 10 個以下のアップストリーム データ ソース。
- TRIGGER ON UPDATEでは、最大 1,000 個のストリーミング テーブルまたは具体化されたビューを指定できます。
-
AT MOST EVERY句の既定値は 1 分で、1 分未満にすることはできません。
-
WITH ROW FILTER 句
行フィルター関数をテーブルに追加します。 そのテーブルからのそれ以降のすべてのクエリでは、関数がブール値 TRUE に評価される行のサブセットを受け取ります。 これは、特定の行をフィルター処理するかどうかを決定するために呼び出したユーザーの ID やグループ メンバーシップを関数で検査できる、きめ細かいアクセス制御に役立ちます。
AS クエリ
この句により、
queryからデータがテーブルに入力されます。 このクエリはストリーミング クエリにする必要があります。 そのためには増分的に処理するリレーションにSTREAMキーワードを追加します。queryとtable_specificationを一緒に指定するとき、table_specificationに指定されているテーブル スキーマに、queryから返される列をすべて含める必要があります。含まれていない場合、エラーが出ます。table_specificationで指定されているが、queryから返されない列はクエリ時にnull値を返します。
ストリーミング テーブルと他のテーブルの違い
ストリーミング テーブルはステートフル テーブルであり、増加するデータセットを処理するときに各行を 1 回だけ処理するように設計されています。 ほとんどのデータセットは時間が経過するにつれて増大し続けるため、ストリーミング テーブルは、大半のインジェスト ワークロードに適しています。 ストリーミング テーブルは、データの鮮度と待ち時間の短さが要求されるパイプラインに最適です。 また、非常に大規模な変換を行う用途にも適しています。これは、新しいデータが入ってくるのに応じて増分方式で結果を計算し続けて最新の状態に保つことができ、更新のたびにソース データ全体を再計算する必要がないためです。 ストリーミング テーブルは追加専用のデータ ソースを想定して設計されています。
ストリーミング テーブルは、REFRESH などの追加コマンドを受け取ります。このコマンドは、クエリで提供されるソースで利用できる最新のデータを処理します。 指定されたクエリに対する変更は、以前に処理されたデータではなく、REFRESH を呼び出すことによって新しいデータにのみ反映されます。 既存のデータにも変更を適用するには、REFRESH TABLE <table_name> FULL を実行するために FULL REFRESH を実行する必要があります。 完全更新では、最新の定義を使用して、ソースで使用可能なすべてのデータが再処理されます。 完全更新では既存のデータが切り捨てられるため、データの履歴全体を保持しないソースや、Kafka などの短い保持期間を持つソースに対して完全な更新を呼び出すことはできません。 ソースでデータが使用できなくなった場合、古いデータを回復できないことがあります。
行フィルターと列マスク
行フィルターを使用すると、テーブル スキャンで行がフェッチされるたびにフィルターとして適用される関数を指定できます。 これらのフィルターにより、後続のクエリでフィルター述語が true と評価される行のみが返されるようになります。
列マスクを使用すると、テーブル スキャンが行をフェッチするたびに列の値をマスクできます。 その列に関連するすべての今後のクエリは、列の元の値を置き換えて、列に対して関数を評価した結果を受け取ります。
行フィルターと列マスクの使用方法の詳細については、「 行フィルターと列マスク」を参照してください。
行フィルターと列マスクの管理
ストリーミング テーブルの行フィルターと列マスクは、CREATE OR REFRESH ステートメントを通じて追加、更新、または削除する必要があります。
動作
-
定義者として更新:
CREATE OR REFRESHまたはREFRESHステートメントがストリーミング テーブルを更新すると、行フィルター関数は定義者の権限 (テーブル所有者) で実行されます。 つまり、テーブルの更新では、ストリーミング テーブルを作成したユーザーのセキュリティ コンテキストが使用されます。 -
クエリ: ほとんどのフィルターは定義者の権限で実行されますが、ユーザー コンテキスト (
CURRENT_USERやIS_MEMBERなど) をチェックする関数は例外です。 これらの関数は呼び出し元として実行されます。 この方法では、現在のユーザーのコンテキストに基づいて、ユーザー固有のデータ セキュリティとアクセス制御が適用されます。
可観測性
DESCRIBE EXTENDED、INFORMATION_SCHEMA、またはカタログ エクスプローラーを使用して、特定のストリーミング テーブルに適用される既存の行フィルターと列マスクを調べます。 この機能により、ユーザーはストリーミング テーブルのデータ アクセスと保護対策を監査および確認できます。
制限事項
- テーブル所有者だけがストリーミング テーブルを更新して最新のデータを取得できます。
-
ALTER TABLEコマンドはストリーミング テーブルでは許可されません。 テーブルの定義とプロパティは、CREATE OR REFRESHまたは ALTER STREAMING TABLE ステートメントを使用して変更する必要があります。 -
INSERT INTOやMERGEなどの DML コマンドを利用してテーブル スキーマを導き出すことはできません。 - 次のコマンドは、ストリーミング テーブルではサポートされていません。
CREATE TABLE ... CLONE <streaming_table>COPY INTOANALYZE TABLERESTORETRUNCATEGENERATE MANIFEST[CREATE OR] REPLACE TABLE
- Delta Sharing はサポートされていません。
- テーブルの名前変更や所有者の変更はサポートされていません。
-
PRIMARY KEYやFOREIGN KEYなどのテーブル制約は、hive_metastoreカタログ内のストリーミング テーブルではサポートされていません。 - 生成された列、ID 列、既定の列はサポートされていません。
例
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
AS SELECT *
FROM STREAM source_stream_data;
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE EVERY 1 HOUR
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM STREAM sales;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')