ストリーミング テーブルは、ストリーミングまたは増分データ処理をサポートするテーブルです。 ストリーミング テーブルはパイプラインによってサポートされます。 ストリーミング テーブルが更新されるたびに、ソース テーブルに追加されたデータがストリーミング テーブルに追加されます。 ストリーミング テーブルは、手動で、またはスケジュールに従って更新できます。
更新を実行またはスケジュールする方法の詳細については、「 パイプライン更新の実行」を参照してください。
構文
CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ column_constraint ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]
table_clauses
{ USING DELTA
PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
LOCATION path |
COMMENT view_comment |
TBLPROPERTIES clause |
WITH { ROW FILTER clause } } [ ... ]
パラメーター
REFRESH
指定されている場合、テーブルを作成するか、既存のテーブルとその内容を更新します。
プライベート
プライベート ストリーミング テーブルを作成します。
- これらはカタログに追加されず、定義したパイプライン内でのみアクセスできます。
- カタログ内の既存のオブジェクトと同じ名前を持つことができます。 パイプライン内で、プライベート ストリーミング テーブルとカタログ内のオブジェクトの名前が同じである場合、この名前への参照はプライベート ストリーミング テーブルに解決されます。
- プライベート ストリーミング テーブルは、1 回の更新中ではなく、パイプラインの有効期間全体でのみ保持されます。
プライベート ストリーミング テーブルは、以前は
TEMPORARYパラメーターを使用して作成されました。table_name
新しく作成されたテーブルの名前。 完全修飾のテーブル名は一意にする必要があります。
テーブル仕様
この省略可能な句で、列、その型、プロパティ、説明、および列制約の一覧を定義します。
-
列名は一意である必要があり、かつクエリの出力列にマップされている必要があります。
-
列のデータ型を指定します。 Azure Databricks でサポートされているすべてのデータ型が、ストリーミング テーブルでサポートされているわけではありません。
column_comment
列を記述する任意の
STRINGリテラル。 このオプションは、column_typeと共に指定する必要があります。 列の種類が指定されていない場合、列コメントはスキップされます。-
テーブルに流入するデータを検証する制約を追加します。 パイプラインの期待を使用してデータ品質を管理する方法については、を参照してください。
-
Important
この機能は パブリック プレビュー段階です。
列マスク関数を追加して、機密データを匿名化します。
「行フィルターと列マスク」を参照してください。
-
テーブル制約
Important
この機能は パブリック プレビュー段階です。
スキーマを指定するときに、主キーと外部キーを定義できます。 制約は情報提供のみを目的としており、強制されるものではありません。 SQL 言語リファレンスの CONSTRAINT 句 を参照してください。
注
テーブル制約を定義するには、パイプラインが Unity Catalog 対応のパイプラインである必要があります。
テーブル条項
必要に応じて、テーブルのパーティション分割、コメント、ユーザー定義プロパティを指定します。 各サブ句は、1 回だけ指定できます。
DELTA の使用
データ形式を指定します。 オプションは DELTA のみです。
この句は省略可能で、既定値は DELTA です。
でパーティション分割
テーブル内のパーティション分割に使用する 1 つ以上の列のリスト (省略可能)。
CLUSTER BYと相互に排他的です。液体クラスタリングは、クラスタリング用の柔軟で最適化されたソリューションを提供します。 パイプラインに
CLUSTER BYするのではなく、PARTITIONED BYを使用することを検討してください。CLUSTER BY
リキッド クラスタリングをテーブルに対して有効化し、クラスタリング キーとして使用する列を定義します。
CLUSTER BY AUTOで自動液体クラスタリングを使用し、Databricks はクエリのパフォーマンスを最適化するためにクラスタリング キーをインテリジェントに選択します。PARTITIONED BYと相互に排他的です。表に液体クラスタリングを使用するを参照してください。
場所
テーブル データの保存場所 (省略可能)。 設定されていない場合のシステムの既定値はパイプラインの保存場所となります。
コメント
テーブルについて説明するオプションの
STRINGリテラル。TBLPROPERTIES
テーブルのテーブル プロパティのリスト (省略可能)。
で ROW FILTER
Important
この機能は パブリック プレビュー段階です。
行フィルター関数をテーブルに追加します。 それ以降のそのテーブルに対するクエリでは、行のうち、この関数による評価の結果が TRUE であるものだけが返されます。 これは、細粒度のアクセス制御に役立ちます。呼び出し元ユーザーの ID とグループ メンバーシップをその関数で検査した結果として、特定の行をフィルター処理するかどうかを決定できるからです。
ROW FILTERの条項を参照してください。-
この句により、
queryからデータがテーブルに入力されます。 このクエリはストリーミング クエリにする必要があります。 STREAM キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードの変更または削除が発生した場合は、エラーがスローされます。 静的ソースまたは追加専用ソースから読み取るのが最も安全です。 変更コミットがあるデータを取り込むには、Python とSkipChangeCommitsオプションを使用してエラーを処理できます。queryとtable_specificationを一緒に指定するとき、table_specificationに指定されているテーブル スキーマに、queryから返される列をすべて含める必要があります。含まれていない場合、エラーが出ます。table_specificationで指定されているが、queryから返されない列はクエリ時にnull値を返します。ストリーミング データの詳細については、「パイプラインを使用してデータを変換する」を参照してください。
必要なアクセス許可
パイプラインの実行時のユーザーには、次のアクセス許可が必要です。
- ストリーミング テーブルによって参照されるベース テーブルに対する
SELECT権限。 - 親カタログに対する
USE CATALOG特権と、親スキーマに対するUSE SCHEMA特権。 - ストリーミング テーブルのスキーマに対する
CREATE MATERIALIZED VIEW権限。
ストリーミング テーブルが定義されているパイプラインをユーザーが更新できるようにするには、次が必要です。
- 親カタログに対する
USE CATALOG特権と、親スキーマに対するUSE SCHEMA特権。 - ストリーミング テーブルの所有権またはストリーミング テーブルに対する
REFRESH権限。 - ストリーミング テーブルの所有者は、ストリーミング テーブルによって参照されるベース テーブルに対する
SELECT権限を持っている必要があります。
ユーザーが結果のストリーミング テーブルに対してクエリを実行できるようにするには、次が必要です。
- 親カタログに対する
USE CATALOG特権と、親スキーマに対するUSE SCHEMA特権。 - ストリーミング テーブルに対する
SELECT権限。
制限事項
- テーブル所有者だけがストリーミング テーブルを更新して最新のデータを取得できます。
-
ALTER TABLEコマンドはストリーミング テーブルでは許可されません。 テーブルの定義とプロパティは、CREATE OR REFRESHまたは ALTER STREAMING TABLE ステートメントを使用して変更する必要があります。 -
INSERT INTOやMERGEなどの DML コマンドを利用してテーブル スキーマを導き出すことはできません。 - 次のコマンドは、ストリーミング テーブルではサポートされていません。
CREATE TABLE ... CLONE <streaming_table>COPY INTOANALYZE TABLERESTORETRUNCATEGENERATE MANIFEST[CREATE OR] REPLACE TABLE
- テーブルの名前変更や所有者の変更はサポートされていません。
- 生成された列、ID 列、既定の列はサポートされていません。
例示
-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")
-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)
-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;