Отслеживание конвейеров динамических таблиц Delta
В этой статье описывается, как использовать встроенные функции мониторинга и наблюдаемости для конвейеров Delta Live Tables, включая происхождение данных, журнал обновлений и отчеты о качестве данных.
Большинство данных мониторинга можно просматривать вручную с помощью пользовательского интерфейса сведений о конвейере. Некоторые задачи проще выполнить, запрашивая метаданные журнала событий. См. раздел " Что такое журнал событий Delta Live Tables?".
Какие сведения о конвейере доступны в пользовательском интерфейсе?
График конвейера отображается сразу после успешного запуска обновления конвейера. Стрелки представляют зависимости между наборами данных в конвейере. По умолчанию на странице сведений о конвейере отображается последнее обновление таблицы, но в раскрывающемся меню можно выбрать старые обновления.
Сведения, отображаемые, включают идентификатор конвейера, исходные библиотеки, затраты на вычисления, выпуск продукта и канал, настроенный для конвейера.
Чтобы просмотреть табличное представление наборов данных, щелкните вкладку "Список ". Представление списка позволяет просматривать все наборы данных в конвейере, представленные в виде строки в таблице, и полезно, если daG конвейера слишком велик, чтобы визуализироваться в представлении Графа . Вы можете управлять наборами данных, отображаемыми в таблице, с помощью нескольких фильтров, таких как имя набора данных, тип и состояние. Чтобы вернуться к визуализации DAG, щелкните Graph.
Пользователь Запуск от имени является владельцем конвейера, и обновления конвейера выполняются с разрешениями этого пользователя. Чтобы изменить пользователя run as
, щелкните Разрешения и измените владельца конвейера.
Как просмотреть сведения о наборе данных?
Щелкнув набор данных в графе конвейера или списке наборов данных, отображаются сведения о наборе данных. Сведения включают схему набора данных, метрики качества данных и ссылку на исходный код, определяющий набор данных.
Просмотр журнала обновлений
Чтобы просмотреть журнал и состояние обновлений конвейера, щелкните раскрывающееся меню журнала обновлений в верхней строке.
Чтобы просмотреть граф, сведения и события для обновления, выберите обновление в раскрывающемся меню. Чтобы вернуться к последнему обновлению, щелкните Показать последнее обновление.
Получение уведомлений о событиях конвейера
Чтобы получать уведомления в режиме реального времени о событиях конвейера, например об успешном завершении или сбое обновления конвейера, добавьте параметр Добавить уведомления по электронной почте для событий конвейера при создании или изменении конвейера.
Что такое журнал событий Delta Live Tables?
Журнал событий разностных динамических таблиц содержит все сведения, связанные с конвейером, включая журналы аудита, проверки качества данных, ход конвейера и происхождение данных. С помощью журнала событий можно отслеживать, анализировать и контролировать состояние конвейеров данных.
Записи журнала событий можно просматривать в пользовательском интерфейсе разностных динамических таблиц, API разностных динамических таблиц или путем прямого запроса к журналу событий. В этом разделе основное внимание уделяется запросу журнала событий напрямую.
Можно также определить пользовательские действия, выполняемые при регистрации событий, например отправку оповещений с перехватчиками событий.
Схема журнала событий
Схема журнала событий описана в таблице ниже. Некоторые из этих полей содержат данные JSON, требующие синтаксического анализа для выполнения некоторых запросов, таких как details
поле. Azure Databricks поддерживает :
оператор для анализа полей JSON. См . оператор : (знак двоеточия).
Поле | Description |
---|---|
id |
Уникальный идентификатор записи журнала событий. |
sequence |
Документ JSON, содержащий метаданные для обнаружения и упорядочения событий. |
origin |
Документ JSON, содержащий метаданные для источника события, например поставщика облачных служб, региона поставщика облачных служб, user_id или для отображения места создания конвейера либо WORKSPACE DBSQL .pipeline_type pipeline_id |
timestamp |
Время записи события. |
message |
Удобное для чтения сообщение, описывающее событие. |
level |
Тип события, например , INFO , WARN ERROR или METRICS . |
error |
В случае возникновения ошибки сведения, описывающие ошибку. |
details |
Документ JSON, содержащий структурированные сведения о событии. Это основное поле, используемое для анализа событий. |
event_type |
Тип события. |
maturity_level |
Стабильность схемы событий. Возможны следующие значения: - STABLE : схема стабильна и не изменится.- NULL : схема стабильна и не изменится. Значение может быть NULL , если запись была создана до maturity_level добавления поля (выпуск 2022.37).- EVOLVING : схема не стабильна и может измениться.- DEPRECATED : схема устарела, и среда выполнения Delta Live Tables может перестать создавать это событие в любое время. |
Запрос журнала событий
Расположение журнала событий и интерфейса для запроса журнала событий зависит от того, настроен ли конвейер на использование хранилища метаданных Hive или каталога Unity.
Хранилище метаданных Hive
Если конвейер публикует таблицы в хранилище метаданных Hive, журнал событий хранится в /system/events
расположении storage
. Например, если параметр конвейера storage
настроен как /Users/username/data
, то журнал событий хранится в /Users/username/data/system/events
пути в DBFS.
Если параметр storage
не настроен, расположением журнала событий по умолчанию является /pipelines/<pipeline-id>/system/events
в DBFS. Например, если идентификатор конвейера — 91de5e48-35ed-11ec-8d3d-0242ac130003
, то место хранения — /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events
.
Можно создать представление для упрощения запросов к журналу событий. В следующем примере создается временное представление event_log_raw
. Это представление используется в примерах запросов журнала событий, включенных в эту статью:
CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;
Замените <event-log-path>
расположением журнала событий.
Каждый экземпляр выполнения конвейера называется обновлением. Часто требуется извлечь сведения для последнего обновления. Выполните следующий запрос, чтобы найти идентификатор последнего обновления и сохранить его во временном latest_update_id
представлении. Это представление используется в примерах запросов журнала событий, включенных в эту статью:
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
Журнал событий можно запросить в записной книжке Azure Databricks или редакторе SQL. Используйте записную книжку или редактор SQL для выполнения примеров запросов журнала событий.
Каталог Unity
Если конвейер публикует таблицы в каталоге Unity, необходимо использовать event_log
табличную функцию (TVF), чтобы получить журнал событий для конвейера. Журнал событий для конвейера извлекается путем передачи идентификатора конвейера или имени таблицы в TVF. Например, чтобы получить записи журнала событий для конвейера с идентификатором 04c78631-3dd7-4856-b2a6-7d84e9b2638b
:
SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")
Чтобы получить записи журнала событий для конвейера, который создал или владеет таблицей my_catalog.my_schema.table1
:
SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))
Чтобы вызвать TVF, необходимо использовать общий кластер или хранилище SQL. Например, можно использовать записную книжку, подключенную к общему кластеру, или использовать редактор SQL, подключенный к хранилищу SQL.
Чтобы упростить запросы событий для конвейера, владелец конвейера может создать представление по event_log
TVF. В следующем примере создается представление журнала событий для конвейера. Это представление используется в примерах запросов журнала событий, включенных в эту статью.
Примечание.
event_log
TVF может вызываться только владельцем конвейера, а представление, созданное через event_log
TVF, может запрашиваться только владельцем конвейера. Представление не может быть предоставлено другим пользователям.
CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");
Замените <pipeline-ID>
уникальным идентификатором конвейера Delta Live Tables. Идентификатор можно найти на панели сведений о конвейере в пользовательском интерфейсе разностных динамических таблиц.
Каждый экземпляр выполнения конвейера называется обновлением. Часто требуется извлечь сведения для последнего обновления. Выполните следующий запрос, чтобы найти идентификатор последнего обновления и сохранить его во временном latest_update_id
представлении. Это представление используется в примерах запросов журнала событий, включенных в эту статью:
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
Запрос сведений о происхождении данных из журнала событий
События, содержащие сведения о происхождении данных, имеют тип события flow_definition
. Объект details:flow_definition
содержит output_dataset
и input_datasets
определяет каждую связь в графе.
Для извлечения входных и выходных наборов данных можно использовать следующий запрос, чтобы просмотреть сведения о происхождении:
SELECT
details:flow_definition.output_dataset as output_dataset,
details:flow_definition.input_datasets as input_dataset
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_definition'
AND
origin.update_id = latest_update.id
output_dataset |
input_datasets |
---|---|
customers |
null |
sales_orders_raw |
null |
sales_orders_cleaned |
["customers", "sales_orders_raw"] |
sales_order_in_la |
["sales_orders_cleaned"] |
Запрос качества данных из журнала событий
Если вы определяете ожидания для наборов данных в конвейере, метрики качества данных хранятся в объекте details:flow_progress.data_quality.expectations
. События, содержащие сведения о качестве данных, имеют тип события flow_progress
. В следующем примере запрашиваются метрики качества данных для последнего обновления конвейера:
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details :flow_progress :data_quality :expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name
dataset |
expectation |
passing_records |
failing_records |
---|---|---|---|
sales_orders_cleaned |
valid_order_number |
4083 | 0 |
Мониторинг невыполненной работы данных путем запроса к журналу событий
Delta Live Tables отслеживает количество данных, присутствующих в невыполненной работы в объекте details:flow_progress.metrics.backlog_bytes
. События, содержащие метрики невыполненной работы, имеют тип flow_progress
события. В следующем примере запрашиваются метрики невыполненной работы для последнего обновления конвейера:
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id
Примечание.
Метрики невыполненной работы могут быть недоступны в зависимости от типа источника данных конвейера и версии Databricks Runtime.
Мониторинг событий расширенного автомасштабирования из журнала событий
Журнал событий фиксирует изменения размера кластера при включении расширенного автомасштабирования в конвейерах. События, содержащие сведения о расширенном автомасштабировании, имеют тип события autoscale
. Сведения об изменении размера кластера хранятся в объекте details:autoscale
. В следующем примере приводятся запросы на изменение размера кластера расширенного автомасштабирования для последнего обновления конвейера:
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
Мониторинг использования вычислительных ресурсов
cluster_resources
события предоставляют метрики по количеству слотов задач в кластере, количеству используемых слотов задач и количеству задач, ожидающих планирования.
Если включен расширенный автомасштабирование, cluster_resources
события также содержат метрики для алгоритма автомасштабирования, включая latest_requested_num_executors
и optimal_num_executors
. События также показывают состояние алгоритма в виде различных состояний, таких как CLUSTER_AT_DESIRED_SIZE
, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS
и BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION
.
Эти сведения можно просмотреть в сочетании с событиями автомасштабирования, чтобы обеспечить общую картину расширенного автомасштабирования.
В следующем примере выполняется запрос журнала размера очереди задач для последнего обновления конвейера:
SELECT
timestamp,
Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
В следующем примере выполняется запрос журнала использования для последнего обновления конвейера:
SELECT
timestamp,
Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
В следующем примере выполняется запрос журнала счетчиков исполнителя, сопровождаемых метриками, доступными только для конвейеров расширенного автомасштабирования, включая количество исполнителей, запрашиваемых алгоритмом в последнем запросе, оптимальное количество исполнителей, рекомендуемых алгоритмом на основе последних метрик, и состояния алгоритма автомасштабирования:
SELECT
timestamp,
Double(details :cluster_resources.num_executors) as current_executors,
Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
Аудит конвейеров динамических таблиц Delta
Вы можете использовать записи журнала событий разностных динамических таблиц и другие журналы аудита Azure Databricks, чтобы получить полное представление о том, как данные обновляются в разностных динамических таблицах.
Разностные динамические таблицы используют учетные данные владельца конвейера для выполнения обновлений. Вы можете изменить используемые учетные данные, обновив владельца конвейера. Разностные динамические таблицы записывают пользователя, выполняющего действия в конвейере, включая создание конвейера, изменение конфигурации и активацию обновлений.
Сведения о событиях аудита каталога Unity см. в справочнике по событиям аудита каталога Unity.
Запрос действий пользователя в журнале событий
Можно использовать журнал событий для аудита событий, например действий пользователя. События, содержащие сведения о действиях пользователя, имеют тип события user_action
.
Сведения о действии хранятся в объекте user_action
в поле details
. Используйте следующий запрос, чтобы создать журнал аудита событий пользователя. Сведения о создании представления, используемого event_log_raw
в этом запросе, см. в разделе "Запрос журнала событий".
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp |
action |
user_name |
---|---|---|
2021-05-20T19:36:03.517+0000 | START |
user@company.com |
2021-05-20T19:35:59.913+0000 | CREATE |
user@company.com |
2021-05-27T00:35:51.971+0000 | START |
user@company.com |
Сведения о среде выполнения
Вы можете просмотреть сведения о среде выполнения для обновления конвейера, например версию Databricks Runtime для обновления:
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version |
---|
11,0 |