Справочник по языку SQL для разностных динамических таблиц
В этой статье содержатся сведения о интерфейсе программирования SQL Delta Live Tables.
- Дополнительные сведения об API Python см. в Справочнике по языку Python для разностных динамических таблиц.
- Дополнительные сведения о командах SQL см . в справочнике по языку SQL.
В запросах SQL можно использовать определяемые пользователем функции Python, но перед их вызовом в исходных файлах SQL необходимо определить эти определяемые пользователем функции Python. См . раздел "Определяемые пользователем скалярные функции " Python".
Ограничения
Предложение PIVOT
не поддерживается. Операция pivot
в Spark требует активной загрузки входных данных для вычисления схемы выходных данных. Эта возможность не поддерживается в разностных динамических таблицах.
Создание материализованного представления или потоковой таблицы Разностных динамических таблиц
При объявлении таблицы потоковой передачи или материализованного представления (также называемого материализованным LIVE TABLE
представлением) используется тот же базовый синтаксис SQL.
Таблицы потоковой передачи можно объявлять только с помощью запросов, которые считываются в источнике потоковой передачи. Databricks рекомендует использовать автозагрузчик для приема файлов из облачного хранилища объектов. См . синтаксис SQL автозагрузчика.
Необходимо включить STREAM()
функцию вокруг имени набора данных при указании других таблиц или представлений в конвейере в качестве источника потоковой передачи.
Ниже описан синтаксис для объявления материализованных представлений и потоковой передачи таблиц с помощью SQL:
CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
AS select_statement
Создание представления разностных динамических таблиц
Ниже описан синтаксис для объявления представлений с помощью SQL:
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
Синтаксис автоматического загрузчика SQL
Ниже описан синтаксис для работы с Автозагрузчиком в SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
С Автозагрузчиком можно использовать поддерживаемые параметры формата. Используя функцию map()
, вы можете передать методу cloud_files()
любое количество параметров. Параметры — это пары "ключ-значение", где ключи и значения являются строками. Дополнительные сведения о форматах и параметрах поддержки см. в параметрах формата файлов.
Пример. Определение таблиц
Набор данных можно создать путем считывания из внешнего источника данных или наборов данных, определенных в конвейере. Для чтения из внутреннего набора данных, добавьте ключевое слово LIVE
в начало имени набора данных: В следующем примере определяется два различных набора данных: таблица с именем taxi_raw
, которая принимает JSON-файл в качестве источника входных данных, и таблица с именем filtered_data
, принимающая таблицу taxi_raw
в качестве входных данных:
CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`
CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
...
FROM LIVE.taxi_raw
Пример. Чтение из источника потоковой передачи
Чтобы считывать данные из источника потоковой передачи, например автозагрузчик или внутренний набор данных, определите таблицу STREAMING
:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)
Дополнительные сведения о потоковой передаче данных см. в разделе "Преобразование данных с помощью разностных динамических таблиц".
Управление материализацией таблиц
Таблицы также обеспечивают дополнительный контроль над их материализацией:
- Укажите, как секционируются таблицы с помощью
PARTITIONED BY
. Секционирование можно использовать для ускорения запросов. - Свойства таблицы можно задать с помощью
TBLPROPERTIES
. См . свойства таблицы Delta Live Table. - Задайте место хранения с помощью параметра
LOCATION
. По умолчанию данные таблицы хранятся в расположении хранилища конвейера, еслиLOCATION
не задано. - Созданные столбцы можно использовать в определении схемы. См . пример. Указание столбцов схемы и секционирования.
Примечание.
Для таблиц меньше 1 ТБ в размере Databricks рекомендует разрешить delta Live Tables управлять данными организации данных. Если таблица не будет расти за пределами терабайта, обычно не следует указывать столбцы секций.
Пример. Указание столбцов схемы и секционирования
При выборе таблицы можно указать схему. В следующем примере указывается схема целевой таблицы, включая использование созданных столбцов Delta Lake и определение столбцов секционирования для таблицы:
CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
По умолчанию разностные динамические таблицы выводят схему из определения table
, если схема не указана.
Пример. Определение ограничений таблицы
Примечание.
Поддержка разностных динамических таблиц для ограничений таблиц доступна в общедоступной предварительной версии. Чтобы определить ограничения таблицы, конвейер должен быть конвейером с поддержкой каталога Unity и настроен для использования preview
канала.
При указании схемы можно определить первичные и внешние ключи. Ограничения являются информационными и не применяются. См. предложение CONSTRAINT в справочнике по языку SQL.
В следующем примере определяется таблица с ограничением первичного и внешнего ключа:
CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
Параметризация значений, используемых при объявлении таблиц или представлений с помощью SQL
Используется SET
для указания значения конфигурации в запросе, объявляющего таблицу или представление, включая конфигурации Spark. Любая таблица или представление, определенные в записной книжке после того, как инструкция SET
получила доступ к определенному значению. Любые конфигурации Spark, заданные с помощью инструкции SET
, используются при выполнении запроса Spark для любой таблицы или представления после инструкции SET. Чтобы считать значение конфигурации в запросе, используйте синтаксис интерполяции строк ${}
. В следующем примере задается значение конфигурации Spark с именем startDate
, которое используется в запросе:
SET startDate='2020-01-01';
CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Чтобы указать несколько значений конфигурации, используйте отдельную инструкцию SET
для каждого значения.
Свойства SQL
CREATE TABLE или VIEW |
---|
TEMPORARY Создайте таблицу, но не публикуйте метаданные для таблицы. Предложение TEMPORARY указывает Delta Live Table создать таблицу, доступную конвейеру, но не должен быть доступ к ней за пределами конвейера. Чтобы сократить время обработки, временная таблица сохраняется в течение всего времени существования конвейера, создающего его, а не только одного обновления. |
STREAMING Создание таблицы, считывающей входной набор данных в виде потока. Входной набор данных должен быть источником потоковых данных, например автозагрузчиком или таблицей STREAMING . |
PARTITIONED BY Необязательный список из одного или нескольких столбцов, используемых для секционирования таблицы. |
LOCATION Дополнительное место хранения данных таблицы. Если значение не задано, система будет по умолчанию использовать место хранения конвейера. |
COMMENT Необязательное описание таблицы. |
column_constraint Необязательный информационный первичный ключ или ограничение внешнего ключа для столбца. |
table_constraint Необязательный информационный первичный ключ или ограничение внешнего ключа в таблице. |
TBLPROPERTIES Необязательный список свойств таблицы для таблицы. |
select_statement Запрос разностной динамической таблицы, определяющий набор данных для таблицы. |
Предложение CONSTRAINT |
---|
EXPECT expectation_name Определение ограничения качества данных expectation_name . Если ограничение ON VIOLATION не определено, добавьте строки, которые нарушают ограничение, в целевой набор данных. |
ON VIOLATION Необязательное действие для неудачных строк: * FAIL UPDATE : Немедленно останавливает выполнение конвейера.* DROP ROW : Удаление записи и продолжение обработки. |
Изменение записи данных с помощью SQL в разностных динамических таблицах
Используйте инструкцию APPLY CHANGES INTO
для использования функций CDC Delta Live Table, как описано в следующем разделе:
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Вы определяете ограничения качества данных для целевого APPLY CHANGES
объекта, используя то же CONSTRAINT
предложение, что и запросы, отличныеAPPLY CHANGES
от запросов. См. статью Управление качеством данных с помощью Delta Live Tables.
Примечание.
Поведение по умолчанию для событий INSERT
и UPDATE
заключается в применении upsert к событиям CDC из источника — обновление всех записей в целевой таблице, которые совпадают с указанными ключами, или вставка новой записи, если совпадающая запись не существует в целевой таблице. Способ обработки событий DELETE
можно указать с помощью условия APPLY AS DELETE WHEN
.
Внимание
Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы APPLY CHANGES
необходимо также включить столбцы __START_AT
и __END_AT
с таким же типом данных, как в поле sequence_by
.
См . РАЗДЕЛ API APPLY CHANGES: Упрощение записи измененных данных в разностных динамических таблицах.
Обратная связь
https://aka.ms/ContentUserFeedback.
Ожидается в ближайшее время: в течение 2024 года мы постепенно откажемся от GitHub Issues как механизма обратной связи для контента и заменим его новой системой обратной связи. Дополнительные сведения см. в разделеОтправить и просмотреть отзыв по