СОЗДАНИЕ ТАБЛИЦЫ ПОТОКОВОЙ ПЕРЕДАЧИ
Область применения: Databricks SQL
Создает таблицу потоковой передачи, таблицу Delta с дополнительной поддержкой потоковой или добавочной обработки данных.
Потоковая передача таблиц поддерживается только в разностных динамических таблицах и в Databricks SQL с каталогом Unity. При выполнении этой команды в поддерживаемой среде выполнения Databricks вычисляется только синтаксический анализ. См. статью "Реализация конвейера разностных динамических таблиц" с помощью SQL.
Синтаксис
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] |
WITH { ROW FILTER clause } } [...]
Параметры
REFRESH
При указании обновляет таблицу с последними данными, доступными из источников, определенных в запросе. Обрабатываются только новые данные, поступающие до запуска запроса. Новые данные, добавляемые в источники во время выполнения команды, игнорируются до следующего обновления. Операция обновления из CREATE OR REFRESH полностью декларативна. Если команда обновления не указывает все метаданные из исходной инструкции создания таблицы, удаляются неопределенные метаданные.
IF NOT EXISTS
Создает таблицу потоковой передачи, если она не существует. Если таблица по этому имени уже существует,
CREATE STREAMING TABLE
инструкция игнорируется.Можно указать не более одного предложения из числа
IF NOT EXISTS
иOR REFRESH
.-
Имя создаваемой таблицы. Имя не должно содержать временную спецификацию. Если имя не указано полностью, таблица создается в текущей схеме.
table_specification
Это необязательное предложение определяет список столбцов, их типы, свойства, описания и ограничения.
Если столбцы в схеме таблицы не определены, необходимо указать
AS query
.-
Уникальное имя столбца.
-
Указывает тип данных столбца.
NOT NULL
Если указанный столбец не принимает
NULL
значения.COMMENT column_comment
Строковый литерал для описания столбца.
-
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Добавляет ограничение первичного ключа или внешнего ключа в столбец в таблице потоковой передачи. Ограничения не поддерживаются для таблиц в каталоге
hive_metastore
. -
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Добавляет функцию маски столбца для анонимизации конфиденциальных данных. Все последующие запросы из этого столбца получают результат оценки этой функции по столбцу вместо исходного значения столбца. Это может быть полезно для точного контроля доступа, где функция может проверить удостоверение или членство в группах вызывающего пользователя, чтобы решить, следует ли изменить значение.
ОГРАНИЧЕНИЕ EXPECTATION_NAME ОЖИДАНИЕ (EXPECTATION_EXPR) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]
Добавляет ожидания качества данных в таблицу. Эти ожидания качества данных можно отслеживать с течением времени и получать доступ через журнал событий потоковой таблицы. Ожидание
FAIL UPDATE
приводит к сбою обработки при создании таблицы, а также обновлении таблицы. ОжиданиеDROP ROW
приводит к тому, что вся строка будет удалена, если ожидание не выполнено.expectation_expr
может состоять из литералов, идентификаторов столбцов в таблице и детерминированных встроенных функций или операторов SQL, кроме:- Агрегатные функции
- Аналитические функции окон
- Ранжирование функций окна
- Функции генератора с табличным значением
Кроме того,
expr
не должен содержать какой-либо вложенный запрос.- Агрегатные функции
-
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Добавляет в таблицу потоковой передачи ограничения информационных первичных ключей или информационных внешних ключей. Ограничения ключей не поддерживаются для таблиц в каталоге
hive_metastore
.
-
-
table_clauses
При необходимости укажите секционирование, комментарии, пользовательские свойства и расписание обновления для новой таблицы. Каждое вложенное предложение может быть указано только один раз.
-
Необязательный список столбцов таблицы для секционирования таблицы по.
COMMENT table_comment
Литерал
STRING
для описания таблицы.-
При необходимости задает одно или несколько свойств, определяемых пользователем.
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]
Если это указано, планирует потоковую таблицу или материализованное представление, чтобы обновить свои данные с заданным расписанием крона . Принимаются только time_zone_values . Функция
AT TIME ZONE LOCAL
не поддерживается. ЕслиAT TIME ZONE
нет, используется часовой пояс сеанса. ЕслиAT TIME ZONE
отсутствует и часовой пояс сеанса не задан, возникает ошибка.SCHEDULE
семантически эквивалентенSCHEDULE REFRESH
.Расписание можно указать как часть
CREATE
команды. Используйте ALTER STREAMING TABLE или выполнитеCREATE OR REFRESH
команду сSCHEDULE
предложением, чтобы изменить расписание потоковой таблицы после создания.Предложение WITH ROW FILTER
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Добавляет функцию фильтра строк в таблицу. Все последующие запросы из этой таблицы получают подмножество строк, в которых функция оценивается как логическое значение TRUE. Это может быть полезно для точного контроля доступа, где функция может проверить удостоверение или членство в группах вызывающего пользователя, чтобы решить, следует ли фильтровать определенные строки.
-
AS query
Это предложение заполняет таблицу с помощью данных из
query
. Этот запрос должен быть потоковым запросом. Это можно сделать, добавив ключевоеSTREAM
слово в любое отношение, которое требуется обработать постепенно. При указанииquery
и вместе схема таблицы, указаннаяtable_specification
вtable_specification
ней, должна содержать все столбцы, возвращаемые параметромquery
, в противном случае возникает ошибка. Все столбцы, указанныеtable_specification
, но не возвращаются возвращаемымиnull
значениямиquery
при запросе.
Различия между таблицами потоковой передачи и другими таблицами
Потоковая передача таблиц — это таблицы с отслеживанием состояния, предназначенные для обработки каждой строки только один раз при обработке растущего набора данных. Так как большинство наборов данных постоянно растут с течением времени, потоковые таблицы хорошо подходит для большинства рабочих нагрузок приема. Таблицы потоковой передачи оптимально подходят для конвейеров, требующих свежести данных и низкой задержки. Потоковые таблицы также могут быть полезны для крупномасштабных преобразований, так как результаты могут быть добавочно вычисляются по мере поступления новых данных, сохраняя результаты до актуальности без необходимости полностью перекомпьютировать все исходные данные с каждым обновлением. Потоковые таблицы предназначены для источников данных, доступных только для добавления.
Потоковые таблицы принимают дополнительные команды, такие как REFRESH
, которые обрабатывают последние данные, доступные в источниках, предоставленных в запросе. Изменения предоставленного запроса отражаются только на новых данных путем вызова REFRESH
не обработанных ранее данных. Чтобы применить изменения к существующим данным, необходимо выполнить REFRESH TABLE <table_name> FULL
его FULL REFRESH
. Полные обновления повторно обрабатывают все данные, доступные в источнике с помощью последнего определения. Не рекомендуется вызывать полные обновления в источниках, которые не хранят всю историю данных или имеют короткие периоды хранения, например Kafka, так как полное обновление усечено существующих данных. Возможно, вы не сможете восстановить старые данные, если данные больше не доступны в источнике.
Фильтры строк и маски столбцов
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Фильтры строк позволяют указать функцию, которая применяется в качестве фильтра всякий раз, когда таблица сканирует строки. Эти фильтры гарантируют, что последующие запросы возвращают только строки, для которых предикат фильтра оценивается как true.
Маски столбцов позволяют маскировать значения столбца всякий раз, когда таблица сканирует строки. Все будущие запросы, связанные с этим столбцом, получат результат оценки функции по столбцу, заменив исходное значение столбца.
Дополнительные сведения о том, как использовать фильтры строк и маски столбцов, см. в разделе "Фильтрация конфиденциальных данных таблицы" с помощью фильтров строк и масок столбцов.
Управление фильтрами строк и масками столбцов
Фильтры строк и маски столбцов в таблицах потоковой передачи должны быть добавлены, обновлены или удалены с помощью инструкции CREATE OR REFRESH
.
Поведение
- Обновление в качестве определителя. При
CREATE OR REFRESH
обновлении таблицы потоковой передачи илиREFRESH
инструкции функции фильтрации строк выполняются с правами определителя (в качестве владельца таблицы). Это означает, что обновление таблицы использует контекст безопасности пользователя, создавшего потоковую таблицу. - Запрос. Хотя большинство фильтров выполняются с правами определителя, функции, которые проверяют контекст пользователя (например
CURRENT_USER
, иIS_MEMBER
) являются исключениями. Эти функции выполняются в качестве вызывающего средства. Этот подход применяет элементы управления безопасностью и доступом для определенных пользователей на основе контекста текущего пользователя.
Наблюдаемость
Используйте DESCRIBE EXTENDED
, INFORMATION_SCHEMA
или обозреватель каталога, чтобы проверить существующие фильтры строк и маски столбцов, которые применяются к заданной таблице потоковой передачи. Эта функция позволяет пользователям проверять и проверять меры доступа к данным и защиты в таблицах потоковой передачи.
Ограничения
Только владельцы таблиц могут обновлять потоковые таблицы, чтобы получить последние данные.
ALTER TABLE
команды запрещены в таблицах потоковой передачи. Определение и свойства таблицы должны быть изменены с помощьюCREATE OR REFRESH
инструкции ALTER STREAMING TABLE .Запросы на поездки по времени не поддерживаются.
Эволюционирование схемы таблицы с помощью таких команд DML, как
INSERT INTO
иMERGE
не поддерживается.Следующие команды не поддерживаются в таблицах потоковой передачи:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
Разностный общий доступ не поддерживается.
Переименование таблицы или изменение владельца не поддерживается.
Ограничения таблиц, такие как
PRIMARY KEY
иFOREIGN KEY
не поддерживаются.Созданные столбцы, столбцы удостоверений и столбцы по умолчанию не поддерживаются.
Примеры
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')