Заметка
Доступ к этой странице требует авторизации. Вы можете попробовать войти в систему или изменить каталог.
Доступ к этой странице требует авторизации. Вы можете попробовать сменить директорию.
Декларативные конвейеры Spark Lakeflow (SDP) представляют несколько новых ключевых слов и функций SQL для определения материализованных представлений и таблиц потоковой передачи в конвейерах. Поддержка SQL для разработки конвейеров основана на основах Spark SQL и добавляет поддержку функций структурированной потоковой передачи.
Пользователи, знакомые с PySpark DataFrames, могут предпочесть разработку кода конвейера с помощью Python. Python поддерживает более обширное тестирование и операции, которые сложно реализовать с помощью SQL, такие как операции метапрограммирования. См. раздел Разработка кода конвейера с помощью Python.
Полный справочник по синтаксису конвейера SQL см. в справочнике по языку Конвейера SQL.
Основы SQL для разработки конвейеров
Код SQL, создающий наборы данных конвейера, использует CREATE OR REFRESH синтаксис для определения материализованных представлений и потоковых таблиц в результатах запроса.
Ключевое слово STREAM указывает, должен ли источник данных, на который ссылается предложение SELECT, считываться с семантикой потоковой передачи.
Считывает и записывает данные по умолчанию в каталог и схему, указанную во время настройки конвейера. См. Задайте целевой каталог и схему.
Исходный код конвейера критически отличается от скриптов SQL: SDP оценивает все определения набора данных во всех файлах исходного кода, настроенных в конвейере, и создает граф потока данных перед выполнением любых запросов. Порядок запросов, отображаемых в исходных файлах, определяет порядок оценки кода, но не порядок выполнения запроса.
Создание материализованного представления с помощью SQL
В следующем примере кода показан базовый синтаксис для создания материализованного представления с помощью SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Создание таблицы потоковой передачи с помощью SQL
В следующем примере кода показан базовый синтаксис для создания таблицы потоковой передачи с помощью SQL. При чтении источника для потоковой таблицы ключевое слово STREAM указывает на использование потоковой семантики для этого источника. Не используйте ключевое STREAM слово при создании материализованного представления:
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Замечание
Используйте ключевое слово STREAM для использования семантики потоковой передачи для чтения из источника. Если чтение сталкивается с изменением или удалением существующей записи, возникает ошибка. Самое безопасное — читать из статических или источников только для добавления. Для приема данных с коммитами изменений можно использовать Python и SkipChangeCommits опцию для обработки ошибок.
Загрузка данных из хранилища объектов
Конвейеры поддерживают загрузку данных из всех форматов, поддерживаемых Azure Databricks. См. параметры формата данных .
Замечание
В этих примерах используются данные, доступные на /databricks-datasets, автоматически подключённые к вашей рабочей области. Databricks рекомендует использовать пути тома или облачные URI для ссылки на данные, хранящиеся в облачном хранилище объектов. См. статью Что такое тома каталога Unity?.
Databricks рекомендует использовать автозагрузчик и потоковые таблицы при настройке добавочных рабочих нагрузок приема данных, хранящихся в облачном хранилище объектов. См. раздел "Что такое автозагрузчик?".
SQL использует функцию read_files для вызова функций автозагрузчика. Необходимо также использовать ключевое слово STREAM для настройки потокового чтения с помощью read_files.
Ниже описан синтаксис для read_files в SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
Параметры Автозагрузчика — это пары "ключ-значение". Дополнительные сведения о поддерживаемых форматах и параметрах см. в разделе Параметры.
В следующем примере создается потоковая таблица из JSON-файлов с помощью автозагрузчика:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Функция read_files также поддерживает пакетную семантику для создания материализованных представлений. В следующем примере используется пакетная семантика для чтения каталога JSON и создания материализованного представления:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Проверка данных с учетом ожиданий
Вы можете использовать ожидания для установки и применения ограничений качества данных. См. Управление качеством данных, используя ожидания конвейера.
Следующий код определяет ожидание с именем valid_data, которое удаляет записи, которые являются null во время приема данных:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Выполните запрос к материализованным представлениям и потоковым таблицам, которые определены в вашем конвейере
В следующем примере определяются четыре набора данных:
- Потоковая таблица с именем
orders, которая загружает данные JSON. - Материализованное представление с именем
customers, которое загружает данные CSV. - Материализованное представление с именем
customer_orders, которое объединяет записи из наборов данныхordersиcustomers, преобразует метку времени заказа в дату и выбирает поляcustomer_id,order_number,stateиorder_date. - Материализованное представление под именем
daily_orders_by_state, которое агрегирует ежедневное количество заказов для каждого штата.
Замечание
При запросе представлений или таблиц в конвейере можно указать каталог и схему напрямую или использовать значения по умолчанию, настроенные в конвейере. В этом примере таблицы orders, customersи customer_orders записываются и считываются из каталога по умолчанию и схемы, настроенной для конвейера.
Устаревший режим публикации использует схему LIVE для выполнения запросов к другим материализованным представлениям и потоковым таблицам, определенным в конвейере обработки данных. В новых конвейерах синтаксис схемы LIVE автоматически игнорируется. См. LIVE схему (устаревшую версию).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
Определение частной таблицы
Вы можете использовать PRIVATE при создании материализованного представления или потоковой таблицы. При создании частной таблицы вы создаете таблицу, но не создаете метаданные для таблицы. Предложение PRIVATE предписывает SDP создать таблицу, доступную конвейеру, но не должен быть доступ к ней за пределами конвейера. Чтобы сократить время обработки, частная таблица сохраняется в течение всего времени существования конвейера, создающего его, а не только одного обновления.
Частные таблицы могут иметь то же имя, что и таблицы в каталоге. Если вы укажете неквалифицированное имя таблицы внутри конвейера, и если есть как частная таблица, так и таблица каталога с таким именем, будет использоваться частная таблица.
Частные таблицы ранее назывались временными таблицами.
Окончательное удаление записей из материализованного представления или потоковой таблицы
Чтобы окончательно удалить записи из таблицы потоковой передачи с включенными векторами удаления, например для соответствия GDPR, необходимо выполнить дополнительные операции в базовых таблицах Delta объекта. Чтобы гарантировать удаление записей из потоковой таблицы, см. постоянное удаление записей из потоковой таблицы.
Материализованные представления всегда отражают данные в базовых таблицах при их обновлении. Чтобы удалить данные в материализованном представлении, необходимо удалить данные из источника и обновить материализованное представление.
Параметризация значений, используемых при объявлении таблиц или представлений с помощью SQL
Используйте SET для указания значения конфигурации в запросе, объявляющего таблицу или представление, включая конфигурации Spark. Любая таблица или представление, которое вы определяете в исходном файле после инструкции SET, имеет доступ к определенному значению. Все конфигурации Spark, указанные с помощью инструкции SET, используются при выполнении запроса Spark для любой таблицы или представления после инструкции SET. Чтобы считывать значение конфигурации в запросе, используйте синтаксис интерполяции строк ${}. Следующий пример задает значение конфигурации Spark с именем startDate и использует это значение в запросе:
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Чтобы указать несколько значений конфигурации, используйте отдельную инструкцию SET для каждого значения.
Ограничения
Условие PIVOT не поддерживается. Операция pivot в Spark требует активной загрузки входных данных для вычисления выходной схемы. Эта возможность не поддерживается в конвейерах.
Замечание
Синтаксис CREATE OR REFRESH LIVE TABLE для создания материализованного представления не рекомендуется. Вместо этого используйте CREATE OR REFRESH MATERIALIZED VIEW.