Applies to: Databricks SQL
Creates a streaming table, a Delta table with extra support for streaming or incremental data processing.
Streaming tables are only supported in DLT and on Databricks SQL with Unity Catalog. Running this command on supported Databricks Runtime compute only parses the syntax. See Develop pipeline code with SQL.
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] schedule_clause |
WITH { ROW FILTER clause } } [...]
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ] }
REFRESH
If specified, refreshes the table with the latest data available from the sources defined in the query. Only new data that arrives before the query starts is processed. New data that gets added to the sources during the execution of the command is ignored until the next refresh. The refresh operation from CREATE OR REFRESH is fully declarative. If a refresh command does not specify all metadata from the original table creation statement, the unspecified metadata is deleted.
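Because the refresh is fully declarative, restate all metadata on every CREATE OR REFRESH. A minimal sketch of this behavior (the table name and source path are hypothetical):
-- The original creation sets a table comment.
> CREATE OR REFRESH STREAMING TABLE events
  COMMENT 'Raw events'
  AS SELECT * FROM STREAM read_files('s3://my-bucket/events');

-- A later refresh that omits the COMMENT clause also removes the comment,
-- because unspecified metadata is deleted.
> CREATE OR REFRESH STREAMING TABLE events
  AS SELECT * FROM STREAM read_files('s3://my-bucket/events');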
IF NOT EXISTS
Creates the streaming table if it does not exist. If a table by this name already exists, the CREATE STREAMING TABLE statement is ignored.
You may specify at most one of IF NOT EXISTS or OR REFRESH.
table_name
The name of the table to be created. The name must not include a temporal specification or options specification. If the name is not qualified, the table is created in the current schema.
table_specification
This optional clause defines the list of columns, their types, properties, descriptions, and column constraints.
If you do not define columns in the table schema, you must specify AS query.
column_identifier
A unique name for the column.
column_type
Specifies the data type of the column.
NOT NULL
If specified, the column does not accept NULL values.
COMMENT column_comment
A string literal to describe the column.
column_constraint
Important
This feature is in Public Preview.
Adds a primary key or foreign key constraint to the column in a streaming table. Constraints are not supported for tables in the hive_metastore catalog.
MASK clause
Important
This feature is in Public Preview.
Adds a column mask function to anonymize sensitive data. All subsequent queries from that column receive the result of evaluating that function over the column in place of the column's original value. This can be useful for fine-grained access control purposes, where the function can inspect the identity or group memberships of the invoking user to decide whether to redact the value.
CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]
Adds data quality expectations to the table. These data quality expectations can be tracked over time and accessed through the streaming table's event log. A FAIL UPDATE expectation causes processing to fail both when creating the table and when refreshing it. A DROP ROW expectation causes the entire row to be dropped if the expectation is not met.
expectation_expr may be composed of literals, column identifiers within the table, and deterministic, built-in SQL functions or operators, except aggregate functions, analytic or ranking window functions, and table-valued generator functions. expectation_expr must also not contain any subquery.
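The later examples show FAIL UPDATE; a DROP ROW expectation that silently discards invalid rows could look like this (a minimal sketch; the table name and source path are hypothetical):
-- Drops any row with a NULL id instead of failing the update.
> CREATE OR REFRESH STREAMING TABLE clean_data (
    CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW
  )
  AS SELECT * FROM STREAM read_files('s3://my-bucket/raw_data');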
table_constraint
Important
This feature is in Public Preview.
Adds an informational primary key or informational foreign key constraint to a streaming table. Key constraints are not supported for tables in the hive_metastore catalog.
table_clauses
Optionally specify partitioning, comments, user-defined properties, and a refresh schedule for the new table. Each sub-clause may only be specified once.
PARTITIONED BY
An optional list of columns of the table to partition the table by.
Note
Liquid clustering provides a flexible, optimized solution for clustering. Consider using CLUSTER BY instead of PARTITIONED BY for streaming tables.
CLUSTER BY
An optional clause to cluster by a subset of columns. For more information about liquid clustering, see Use liquid clustering for Delta tables. Delta Lake liquid clustering cannot be combined with PARTITIONED BY.
COMMENT table_comment
A STRING literal to describe the table.
TBLPROPERTIES
Optionally sets one or more user-defined properties.
Use this setting to specify the DLT runtime channel used to run this statement. Set the value of the pipelines.channel property to "PREVIEW" or "CURRENT". The default value is "CURRENT". For more information about DLT channels, see DLT runtime channels.
SCHEDULE [ REFRESH ] schedule_clause
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }
To schedule a refresh that occurs periodically, use EVERY syntax. If EVERY syntax is specified, the streaming table or materialized view is refreshed periodically at the specified interval based on the provided value, such as HOUR, HOURS, DAY, DAYS, WEEK, or WEEKS.
The following table lists accepted integer values for number.

| Time unit | Integer value |
| --- | --- |
| HOUR or HOURS | 1 <= H <= 72 |
| DAY or DAYS | 1 <= D <= 31 |
| WEEK or WEEKS | 1 <= W <= 8 |
Note
The singular and plural forms of the included time unit are semantically equivalent.
CRON cron_string [ AT TIME ZONE timezone_id ]
Schedules a refresh using a Quartz cron value. Valid timezone_id values are accepted. AT TIME ZONE LOCAL is not supported.
If AT TIME ZONE is absent, the session time zone is used. If AT TIME ZONE is absent and the session time zone is not set, an error is thrown.
SCHEDULE is semantically equivalent to SCHEDULE REFRESH. The schedule can be provided as part of the CREATE command. Use ALTER STREAMING TABLE or run the CREATE OR REFRESH command with a SCHEDULE clause to alter the schedule of a streaming table after creation.
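As an illustration, a CRON schedule that refreshes daily at 06:00 UTC (a minimal sketch; the table name and source path are hypothetical):
-- Quartz cron fields: seconds minutes hours day-of-month month day-of-week.
> CREATE OR REFRESH STREAMING TABLE daily_sales
  SCHEDULE CRON '0 0 6 * * ?' AT TIME ZONE 'UTC'
  AS SELECT * FROM STREAM read_files('s3://my-bucket/sales');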
WITH ROW FILTER clause
Important
This feature is in Public Preview.
Adds a row filter function to the table. All subsequent queries from that table receive a subset of the rows where the function evaluates to boolean TRUE. This can be useful for fine-grained access control purposes where the function can inspect the identity or group memberships of the invoking user to decide whether to filter certain rows.
AS query
This clause populates the table using the data from query. This query must be a streaming query. This can be achieved by adding the STREAM keyword to any relation you want to process incrementally.
When you specify a query and a table_specification together, the table schema specified in table_specification must contain all the columns returned by the query, otherwise you get an error. Any columns specified in table_specification but not returned by query return null values when queried.
Streaming tables are stateful tables, designed to handle each row only once as you process a growing dataset. Because most datasets grow continuously over time, streaming tables are good for most ingestion workloads. Streaming tables are optimal for pipelines that require data freshness and low latency. Streaming tables can also be useful for massive scale transformations, as results can be incrementally calculated as new data arrives, keeping results up to date without needing to fully recompute all source data with each update. Streaming tables are designed for data sources that are append-only.
Streaming tables accept additional commands such as REFRESH, which processes the latest data available in the sources provided in the query. Changes to the provided query are reflected only on new data after calling a REFRESH, not on previously processed data. To apply the changes to existing data as well, you need to execute REFRESH TABLE <table_name> FULL to perform a FULL REFRESH. Full refreshes re-process all data available in the source with the latest definition. It is not recommended to call full refreshes on sources that don't keep the entire history of the data or have short retention periods, such as Kafka, as the full refresh truncates the existing data. You may not be able to recover old data if the data is no longer available in the source.
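Using the forms described above, a refresh could look like this (the table name sales_raw is hypothetical):
-- Incremental refresh: processes only data that arrived since the last refresh.
> REFRESH TABLE sales_raw;

-- Full refresh: re-processes all available source data with the latest definition.
-- Avoid on sources with short retention periods, such as Kafka.
> REFRESH TABLE sales_raw FULL;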
Row filters and column masks
Important
This feature is in Public Preview.
Row filters let you specify a function that applies as a filter whenever a table scan fetches rows. These filters ensure that subsequent queries only return rows for which the filter predicate evaluates to true.
Column masks let you mask a column’s values whenever a table scan fetches rows. All future queries involving that column will receive the result of evaluating the function over the column, replacing the column’s original value.
For more information on how to use row filters and column masks, see Filter sensitive table data using row filters and column masks.
Row filters and column masks on streaming tables should be added, updated, or dropped through the CREATE OR REFRESH statement.
When CREATE OR REFRESH or REFRESH statements refresh a streaming table, row filter functions run with the definer's rights (as the table owner). This means the table refresh uses the security context of the user who created the streaming table. Functions that use the current user's context (such as CURRENT_USER and IS_MEMBER) are exceptions; these run as the invoker. This approach enforces user-specific data security and access controls based on the current user's context.
Use DESCRIBE EXTENDED, INFORMATION_SCHEMA, or the Catalog Explorer to examine the existing row filters and column masks that apply to a given streaming table. This functionality allows users to audit and review data access and protection measures on streaming tables.
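For example, to inspect the row filter and column mask on the masked_csv_data table from the examples below:
-- Table metadata includes any row filter and column masks.
> DESCRIBE EXTENDED masked_csv_data;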
Limitations
ALTER TABLE commands are disallowed on streaming tables. The definition and properties of the table should be altered through the CREATE OR REFRESH or ALTER STREAMING TABLE statement.
Evolving the table schema through DML commands such as INSERT INTO and MERGE is not supported.
The following commands are not supported on streaming tables:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
PRIMARY KEY and FOREIGN KEY are not supported.
Examples
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM RANGE(10);
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE EVERY 1 HOUR
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')