Поделиться через


ПРИМЕНЕНИЕ API ИЗМЕНЕНИЙ: упрощение отслеживания изменений в разностных динамических таблицах

Разностные динамические таблицы упрощают запись измененных данных (CDC) с APPLY CHANGES помощью API. Ранее инструкция MERGE INTO часто использовалась для обработки записей CDC в Azure Databricks. MERGE INTO Однако может привести к неправильным результатам из-за неупорядоченных записей или требовать сложную логику для повторного порядка записей.

Автоматически обрабатывая записи вне последовательности, APPLY CHANGES API в Delta Live Tables обеспечивает правильную обработку записей CDC и удаляет необходимость разработки сложной логики для обработки записей вне последовательности.

API APPLY CHANGES поддерживается в интерфейсах SQL и Python Delta Live Tables, включая поддержку обновления таблиц с scD типа 1 и типа 2:

  • Используйте SCD типа 1 для обновления записей напрямую. Для обновляемых записей журнал не сохраняется.
  • Используйте SCD типа 2 для хранения журнала записей во всех обновлениях или обновлениях указанного набора столбцов.

Сведения о синтаксисе и других ссылках см. в следующей статье:

Примечание.

В этой статье описано, как обновлять таблицы в конвейере Delta Live Tables на основе изменений в исходных данных. Дополнительные сведения о том, как записывать и запрашивать сведения об изменениях на уровне строк для таблиц Delta, см. в статье Использование веб-канала изменений данных Delta Lake в Azure Databricks.

Как CDC реализуется с помощью разностных динамических таблиц?

Необходимо указать столбец в исходных данных, для которых выполняется последовательность записей, которые разностные динамические таблицы интерпретируются как монотонное увеличение правильного порядка исходных данных. Разностные динамические таблицы автоматически обрабатывают данные, поступающие из порядка. Для изменений типа SCD 2 разностные динамические таблицы распространяют соответствующие значения последовательности в __START_AT__END_AT столбцы целевой таблицы. При каждом значении последовательности должно быть одно отдельное обновление, а значения последовательности NULL не поддерживаются.

Чтобы выполнить обработку CDC с помощью Delta Live Table, сначала создайте потоковую таблицу, а затем используйте APPLY CHANGES INTO инструкцию, чтобы указать источник, ключи и последовательности для канала изменений. Чтобы создать целевую потоковую таблицу, используйте CREATE OR REFRESH STREAMING TABLE инструкцию в SQL или create_streaming_table() функции в Python. Чтобы создать инструкцию, определяющую обработку CDC, используйте APPLY CHANGES инструкцию в SQL или apply_changes() функции в Python. Сведения о синтаксисе см. в разделе "Изменение записи данных с помощью SQL в разностных динамических таблицах " или "Изменение данных" с помощью Python в разностных динамических таблицах.

Какие объекты данных используются для обработки CDC разностных динамических таблиц?

При объявлении целевой таблицы в хранилище метаданных Hive создаются две структуры данных:

  • Представление с именем, назначенным целевой таблице.
  • Внутренняя резервная таблица, используемая Delta Live Table для управления обработкой CDC. Эта таблица называется путем подготовки __apply_changes_storage_ к имени целевой таблицы.

Например, если объявить целевую таблицу с именем dlt_cdc_target, вы увидите представление с именем и таблицей с именем dlt_cdc_target__apply_changes_storage_dlt_cdc_target в хранилище метаданных. Создание представления позволяет разностным динамическим таблицам отфильтровать дополнительные сведения (например, на могилах и версиях), необходимые для обработки данных вне порядка. Чтобы просмотреть обработанные данные, выполните запрос к целевому представлению. Так как схема таблицы может измениться для поддержки __apply_changes_storage_ будущих функций или улучшений, не следует запрашивать таблицу для использования в рабочей среде. При добавлении данных вручную в таблицу предполагается, что записи будут поступать до других изменений, так как столбцы версии отсутствуют.

Если конвейер публикуется в каталоге Unity, внутренние резервные таблицы недоступны для пользователей.

Получение данных о записях, обработанных запросом CDC Delta Live Tables

Следующие метрики фиксируются запросами apply changes :

  • num_upserted_rows: количество выходных строк, которые добавляются в набор данных во время обновления.
  • num_deleted_rows: количество существующих выходных строк, удаленных из набора данных во время обновления.

num_output_rows Метрика, которая выводится для потоков, отличных от CDC, не фиксируется для apply changes запросов.

Ограничения

Целевой APPLY CHANGES INTO объект запроса или apply_changes функции нельзя использовать в качестве источника для потоковой таблицы. Таблица, которая считывается из целевого APPLY CHANGES INTO объекта запроса или apply_changes функции, должна быть материализованным представлением.

SCD типа 1 и 2 в Azure Databricks

В следующих разделах приведены примеры, демонстрирующие scD типа 1 и типа 2 запросов Delta Live Tables, которые обновляют целевые таблицы на основе исходных событий, которые:

  1. создают новые записи пользователей;
  2. удаляют запись пользователя;
  3. обновляют записи пользователя. В примере SCD типа 1 последние UPDATE операции приходят поздно и удаляются из целевой таблицы, демонстрируя обработку событий вне порядка.

В следующих примерах предполагается знакомство с настройкой и обновлением конвейеров Delta Live Tables. См . руководство. Запуск первого конвейера live tables Delta Live Tables.

Для выполнения этих примеров необходимо начать с создания примера набора данных. См. статью "Создание тестовых данных".

Ниже приведены входные записи для этих примеров.

userId name city Операция sequenceNum
124 Raul Оахака ВСТАВИТЬ 1
123 Isabel Monterrey ВСТАВИТЬ 1
125 Mercedes Тихуана ВСТАВИТЬ 2
126 Lily Cancun ВСТАВИТЬ 2
123 null null DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Чихуахуа UPDATE 5

Если вы раскомментируете окончательную строку в примерах данных, вставьте следующую запись, указывающую, где должны быть усечены записи:

userId name city Операция sequenceNum
null null null TRUNCATE 3

Примечание.

Все приведенные ниже примеры включают параметры для указания обоих DELETE операций и TRUNCATE операций, но все из них являются необязательными.

Обработка обновлений SCD типа 1

В следующем примере кода демонстрируется обработка обновлений SCD типа 1:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

После выполнения примера с SCD типа 1 целевая таблица будет содержать следующие записи:

userId name city
124 Raul Оахака
125 Mercedes Guadalajara
126 Lily Cancun

После запуска примера SCD типа 1 с дополнительной записью TRUNCATE записи 124 и 126 усекаются из-за операции TRUNCATE в sequenceNum=3, а целевая таблица содержит следующую запись:

userId name city
125 Mercedes Guadalajara

Обработка обновлений SCD типа 2

В следующем примере кода демонстрируется обработка обновлений SCD типа 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

После выполнения примера с SCD типа 2 целевая таблица будет содержать следующие записи:

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Чихуахуа 5 6
124 Raul Оахака 1 null
125 Mercedes Тихуана 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lily Cancun 2 null

Запрос типа 2 SCD также может указать подмножество выходных столбцов для отслеживания журнала в целевой таблице. Изменения других столбцов обновляются вместо создания новых записей журнала. В следующем примере показано, как исключить столбец из отслеживания city :

В следующем примере показано использование журнала отслеживания с типом 2 SCD:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

После выполнения этого примера без дополнительной TRUNCATE записи целевая таблица содержит следующие записи:

userId name city __START_AT __END_AT
123 Isabel Чихуахуа 1 6
124 Raul Оахака 1 null
125 Mercedes Guadalajara 2 null
126 Lily Cancun 2 null

Создание тестовых данных

Приведенный ниже код содержится для создания примера набора данных для использования в примерах запросов, представленных в этом руководстве. Если у вас есть необходимые учетные данные для создания новой схемы и создания новой таблицы, вы можете выполнить эти инструкции с помощью записной книжки или Databricks SQL. Следующий код не предназначен для запуска в составе конвейера Delta Live Tables:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Добавление, изменение или удаление данных в целевой потоковой таблице

Если конвейер публикует таблицы в каталоге Unity, можно использовать инструкции языка обработки данных (DML), включая инструкции insert, update, delete и merge, для изменения целевых таблиц потоковой передачи, созданных APPLY CHANGES INTO операторами.

Примечание.

  • Инструкции DML, изменяющие схему таблицы потоковой передачи, не поддерживаются. Убедитесь, что операторы DML не пытаются развивать схему таблицы.
  • Инструкции DML, обновляющие потоковую таблицу, могут выполняться только в общем кластере каталога Unity или хранилище SQL с помощью Databricks Runtime 13.3 LTS и более поздних версий.
  • Так как для потоковой передачи требуются источники данных только для добавления, если обработка требует потоковой передачи из исходной таблицы потоковой передачи с изменениями (например, операторами DML), задайте флаг skipChangeCommits при чтении исходной таблицы потоковой передачи. При skipChangeCommits установке транзакции, которые удаляют или изменяют записи в исходной таблице, игнорируются. Если для обработки не требуется потоковая таблица, можно использовать материализованное представление (которое не имеет ограничения только для добавления) в качестве целевой таблицы.

Так как разностные динамические таблицы используют указанный SEQUENCE BY столбец и распространяют соответствующие значения последовательности в __START_AT__END_AT целевые таблицы (для SCD типа 2), необходимо убедиться, что инструкции DML используют допустимые значения для этих столбцов для поддержания правильного порядка записей. Узнайте, как CDC реализуется с помощью Delta Live Tables?.

Дополнительные сведения об использовании инструкций DML с таблицами потоковой передачи см. в разделе "Добавление, изменение или удаление данных" в таблице потоковой передачи.

В следующем примере вставляется активная запись с начальной последовательностью 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);