Поделиться через


Справочник по языку SQL для разностных динамических таблиц

В этой статье содержатся сведения о интерфейсе программирования SQL Delta Live Tables.

В запросах SQL можно использовать определяемые пользователем функции Python, но перед их вызовом в исходных файлах SQL необходимо определить эти определяемые пользователем функции Python. См . раздел "Определяемые пользователем скалярные функции " Python".

Ограничения

Предложение PIVOT не поддерживается. Операция pivot в Spark требует активной загрузки входных данных для вычисления схемы выходных данных. Эта возможность не поддерживается в разностных динамических таблицах.

Создание материализованного представления или потоковой таблицы Разностных динамических таблиц

При объявлении таблицы потоковой передачи или материализованного представления (также называемого материализованным LIVE TABLEпредставлением) используется тот же базовый синтаксис SQL.

Таблицы потоковой передачи можно объявлять только с помощью запросов, которые считываются в источнике потоковой передачи. Databricks рекомендует использовать автозагрузчик для приема файлов из облачного хранилища объектов. См . синтаксис SQL автозагрузчика.

Необходимо включить STREAM() функцию вокруг имени набора данных при указании других таблиц или представлений в конвейере в качестве источника потоковой передачи.

Ниже описан синтаксис для объявления материализованных представлений и потоковой передачи таблиц с помощью SQL:

CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement

Создание представления разностных динамических таблиц

Ниже описан синтаксис для объявления представлений с помощью SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

Синтаксис автоматического загрузчика SQL

Ниже описан синтаксис для работы с Автозагрузчиком в SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

С Автозагрузчиком можно использовать поддерживаемые параметры формата. Используя функцию map(), вы можете передать методу cloud_files() любое количество параметров. Параметры — это пары "ключ-значение", где ключи и значения являются строками. Дополнительные сведения о форматах и параметрах поддержки см. в параметрах формата файлов.

Пример. Определение таблиц

Набор данных можно создать путем считывания из внешнего источника данных или наборов данных, определенных в конвейере. Для чтения из внутреннего набора данных, добавьте ключевое слово LIVE в начало имени набора данных: В следующем примере определяется два различных набора данных: таблица с именем taxi_raw, которая принимает JSON-файл в качестве источника входных данных, и таблица с именем filtered_data, принимающая таблицу taxi_raw в качестве входных данных:

CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Пример. Чтение из источника потоковой передачи

Чтобы считывать данные из источника потоковой передачи, например автозагрузчик или внутренний набор данных, определите таблицу STREAMING :

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Дополнительные сведения о потоковой передаче данных см. в разделе "Преобразование данных с помощью разностных динамических таблиц".

Управление материализацией таблиц

Таблицы также обеспечивают дополнительный контроль над их материализацией:

  • Укажите, как секционируются таблицы с помощью PARTITIONED BY. Секционирование можно использовать для ускорения запросов.
  • Свойства таблицы можно задать с помощью TBLPROPERTIES. См . свойства таблицы Delta Live Table.
  • Задайте место хранения с помощью параметра LOCATION. По умолчанию данные таблицы хранятся в расположении хранилища конвейера, если LOCATION не задано.
  • Созданные столбцы можно использовать в определении схемы. См . пример. Указание столбцов схемы и секционирования.

Примечание.

Для таблиц меньше 1 ТБ в размере Databricks рекомендует разрешить delta Live Tables управлять данными организации данных. Если таблица не будет расти за пределами терабайта, обычно не следует указывать столбцы секций.

Пример. Указание столбцов схемы и секционирования

При выборе таблицы можно указать схему. В следующем примере указывается схема целевой таблицы, включая использование созданных столбцов Delta Lake и определение столбцов секционирования для таблицы:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

По умолчанию разностные динамические таблицы выводят схему из определения table, если схема не указана.

Пример. Определение ограничений таблицы

Примечание.

Поддержка разностных динамических таблиц для ограничений таблиц доступна в общедоступной предварительной версии. Чтобы определить ограничения таблицы, конвейер должен быть конвейером с поддержкой каталога Unity и настроен для использования preview канала.

При указании схемы можно определить первичные и внешние ключи. Ограничения являются информационными и не применяются. См. предложение CONSTRAINT в справочнике по языку SQL.

В следующем примере определяется таблица с ограничением первичного и внешнего ключа:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Параметризация значений, используемых при объявлении таблиц или представлений с помощью SQL

Используется SET для указания значения конфигурации в запросе, объявляющего таблицу или представление, включая конфигурации Spark. Любая таблица или представление, определенные в записной книжке после того, как инструкция SET получила доступ к определенному значению. Любые конфигурации Spark, заданные с помощью инструкции SET, используются при выполнении запроса Spark для любой таблицы или представления после инструкции SET. Чтобы считать значение конфигурации в запросе, используйте синтаксис интерполяции строк ${}. В следующем примере задается значение конфигурации Spark с именем startDate, которое используется в запросе:

SET startDate='2020-01-01';

CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Чтобы указать несколько значений конфигурации, используйте отдельную инструкцию SET для каждого значения.

Свойства SQL

CREATE TABLE или VIEW
TEMPORARY

Создайте таблицу, но не публикуйте метаданные для таблицы. Предложение TEMPORARY указывает Delta Live Table создать таблицу, доступную конвейеру, но не должен быть доступ к ней за пределами конвейера. Чтобы сократить время обработки, временная таблица сохраняется в течение всего времени существования конвейера, создающего его, а не только одного обновления.
STREAMING

Создание таблицы, считывающей входной набор данных в виде потока. Входной набор данных должен быть источником потоковых данных, например автозагрузчиком или таблицей STREAMING .
PARTITIONED BY

Необязательный список из одного или нескольких столбцов, используемых для секционирования таблицы.
LOCATION

Дополнительное место хранения данных таблицы. Если значение не задано, система будет по умолчанию использовать место хранения конвейера.
COMMENT

Необязательное описание таблицы.
column_constraint

Необязательный информационный первичный ключ или ограничение внешнего ключа для столбца.
table_constraint

Необязательный информационный первичный ключ или ограничение внешнего ключа в таблице.
TBLPROPERTIES

Необязательный список свойств таблицы для таблицы.
select_statement

Запрос разностной динамической таблицы, определяющий набор данных для таблицы.
Предложение CONSTRAINT
EXPECT expectation_name

Определение ограничения качества данных expectation_name. Если ограничение ON VIOLATION не определено, добавьте строки, которые нарушают ограничение, в целевой набор данных.
ON VIOLATION

Необязательное действие для неудачных строк:

* FAIL UPDATE: Немедленно останавливает выполнение конвейера.
* DROP ROW: Удаление записи и продолжение обработки.

Изменение записи данных с помощью SQL в разностных динамических таблицах

Используйте инструкцию APPLY CHANGES INTO для использования функций CDC Delta Live Table, как описано в следующем разделе:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Вы определяете ограничения качества данных для целевого APPLY CHANGES объекта, используя то же CONSTRAINT предложение, что и запросы, отличныеAPPLY CHANGES от запросов. См. статью Управление качеством данных с помощью Delta Live Tables.

Примечание.

Поведение по умолчанию для событий INSERT и UPDATE заключается в применении upsert к событиям CDC из источника — обновление всех записей в целевой таблице, которые совпадают с указанными ключами, или вставка новой записи, если совпадающая запись не существует в целевой таблице. Способ обработки событий DELETE можно указать с помощью условия APPLY AS DELETE WHEN.

Внимание

Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы APPLY CHANGES необходимо также включить столбцы __START_AT и __END_AT с таким же типом данных, как в поле sequence_by.

См . РАЗДЕЛ API APPLY CHANGES: Упрощение записи измененных данных в разностных динамических таблицах.

Предложения
KEYS

Столбец или сочетание столбцов, которые однозначно идентифицируют запись в исходных данных. Используется для определения того, какие события CDC применяются к конкретным записям в целевой таблице.

Это предложение обязательно.
IGNORE NULL UPDATES

Разрешает прием обновлений с подмножеством целевых столбцов. Если событие CDC совпадает с существующей записью и используется IGNORE NULL UPDATES, столбцы с null сохранят существующие значения в целевом объекте. Это также относится к вложенным столбцам со значением null.

Предложение не является обязательным.

По умолчанию существующие столбцы перезаписываются значениями null.
APPLY AS DELETE WHEN

Указывает, в каких случаях событие CDC необходимо обрабатывать как DELETE, а не как upsert. Для обработки неупорядоченных данных удаленная запись временно сохраняется в виде отметки полного удаления в базовой разностной таблице, а в хранилище метаданных создается представление, которое отфильтровывает такие отметки. Интервал хранения можно настроить с помощью свойства таблицы
pipelines.cdc.tombstoneGCThresholdInSeconds.

Предложение не является обязательным.
APPLY AS TRUNCATE WHEN

Указывает, в каких случаях событие CDC необходимо обрабатывать как полную таблицу TRUNCATE. Так как это предложение активирует полное усечение целевой таблицы, его следует использовать только для конкретных вариантов использования, требующих этой функции.

Предложение APPLY AS TRUNCATE WHEN поддерживается только для SCD типа 1. SCD типа 2 не поддерживает усечение.

Предложение не является обязательным.
SEQUENCE BY

Имя столбца, указывающего логический порядок событий CDC в исходных данных. Разностные динамические таблицы используют эту последовательность для обработки событий изменения, которые поступают неупорядоченно.

Это предложение обязательно.
COLUMNS

Указывает подмножество столбцов для включения в целевую таблицу. Вы можете сделать одно из двух:

* Указать полный список включаемых столбцов: COLUMNS (userId, name, city).
* Указать список исключаемых столбцов: COLUMNS * EXCEPT (operation, sequenceNum).

Предложение не является обязательным.

По умолчанию включаются все столбцы в целевой таблице, если не указано предложение COLUMNS.
STORED AS

Определяет, следует ли хранить записи в виде SCD типа 1 или SCD типа 2.

Предложение не является обязательным.

Значение по умолчанию — SCD типа 1.
TRACK HISTORY ON

Задает подмножество выходных столбцов для создания записей журнала при наличии изменений в указанных столбцах. Вы можете сделать одно из двух:

* Укажите полный список столбцов для отслеживания: COLUMNS (userId, name, city)
* Укажите список столбцов, которые следует исключить из отслеживания: COLUMNS * EXCEPT (operation, sequenceNum)

Предложение не является обязательным. По умолчанию используется журнал отслеживания для всех выходных столбцов при наличии изменений, эквивалентных TRACK HISTORY ON *.