Поделиться через


API APPLY CHANGES: упрощение отслеживания изменений с помощью разностных динамических таблиц

Разностные динамические таблицы упрощают запись измененных данных (CDC) с APPLY CHANGES помощью API и APPLY CHANGES FROM SNAPSHOT API. Используемый интерфейс зависит от источника измененных данных:

  • Используется APPLY CHANGES для обработки изменений из веб-канала измененных данных (CDF).
  • Используйте APPLY CHANGES FROM SNAPSHOT (общедоступную предварительную версию) для обработки изменений моментальных снимков базы данных.

Ранее инструкция MERGE INTO часто использовалась для обработки записей CDC в Azure Databricks. MERGE INTO Однако может привести к неправильным результатам из-за неупорядоченных записей или требуется сложная логика для повторного упорядочивания записей.

APPLY CHANGES API поддерживается в интерфейсах SQL и Python Для разностных динамических таблиц. APPLY CHANGES FROM SNAPSHOT API поддерживается в интерфейсе Python Delta Live Tables.

APPLY CHANGES FROM SNAPSHOT Обе APPLY CHANGES таблицы поддерживают обновление таблиц с помощью SCD типа 1 и типа 2:

  • Используйте SCD типа 1 для обновления записей напрямую. Журнал не сохраняется для обновленных записей.
  • Используйте SCD типа 2 для хранения журнала записей во всех обновлениях или обновлениях указанного набора столбцов.

Сведения о синтаксисе и других ссылках см. в следующей статье:

Примечание.

В этой статье описано, как обновлять таблицы в конвейере Delta Live Tables на основе изменений в исходных данных. Дополнительные сведения о том, как записывать и запрашивать сведения об изменениях на уровне строк для таблиц Delta, см. в статье Использование веб-канала изменений данных Delta Lake в Azure Databricks.

Требования

Чтобы использовать API CDC, конвейер должен быть настроен для использования бессерверных конвейеров DLT или разностных динамических таблиц Pro или Advanced выпусков.

Как CDC реализуется с помощью APPLY CHANGES API?

Автоматически обрабатывая записи вне последовательности, APPLY CHANGES API в Delta Live Tables обеспечивает правильную обработку записей CDC и удаляет необходимость разработки сложной логики для обработки записей вне последовательности. Необходимо указать столбец в исходных данных, для которых выполняется последовательность записей, которые разностные динамические таблицы интерпретируются как монотонное увеличение правильного порядка исходных данных. Разностные динамические таблицы автоматически обрабатывают данные, поступающие из порядка. Для изменений типа SCD 2 разностные динамические таблицы распространяют соответствующие значения последовательности в целевые таблицы __START_AT и __END_AT столбцы. При каждом значении последовательности должно быть одно отдельное обновление, а значения последовательности NULL не поддерживаются.

Чтобы выполнить обработку APPLY CHANGESCDC, сначала создайте таблицу потоковой передачи, а затем используйте APPLY CHANGES INTO инструкцию в SQL или apply_changes() функции в Python, чтобы указать источник, ключи и последовательности для канала изменений. Чтобы создать целевую потоковую таблицу, используйте CREATE OR REFRESH STREAMING TABLE инструкцию в SQL или create_streaming_table() функции в Python. См. примеры обработки SCD типа 1 и типа 2.

Дополнительные сведения о синтаксисе см. в справочнике по SQL Delta Live Tables или в справочнике по Python.

Как CDC реализуется с помощью APPLY CHANGES FROM SNAPSHOT API?

Внимание

APPLY CHANGES FROM SNAPSHOT API находится в общедоступной предварительной версии.

APPLY CHANGES FROM SNAPSHOT — это декларативный API, который эффективно определяет изменения исходных данных путем сравнения ряда моментальных снимков в порядке, а затем выполняет обработку, необходимую для обработки записей CDC в моментальных снимках. APPLY CHANGES FROM SNAPSHOT поддерживается только интерфейсом Python Delta Live Tables.

APPLY CHANGES FROM SNAPSHOT поддерживает прием моментальных снимков из нескольких типов источников:

  • Используйте периодическое прием моментальных снимков для приема моментальных снимков из существующей таблицы или представления. APPLY CHANGES FROM SNAPSHOT имеет простой, упрощенный интерфейс для поддержки периодически приема моментальных снимков из существующего объекта базы данных. Новый моментальный снимок выполняется при каждом обновлении конвейера, а время приема используется в качестве версии моментального снимка. При выполнении конвейера в непрерывном режиме несколько моментальных снимков получаются при каждом обновлении конвейера за период, определенный параметром интервала триггера для потока, содержащего обработку APPLY CHANGES FROM SNAPSHOT.
  • Используйте прием исторических моментальных снимков для обработки файлов, содержащих моментальные снимки базы данных, например моментальные снимки, созданные из базы данных Oracle или MySQL или хранилища данных.

Чтобы выполнить обработку CDC из любого исходного типа APPLY CHANGES FROM SNAPSHOT, сначала создайте таблицу потоковой передачи, а затем используйте apply_changes_from_snapshot() функцию в Python, чтобы указать моментальный снимок, ключи и другие аргументы, необходимые для реализации обработки. Ознакомьтесь с примерами приема периодических моментальных снимков и историческими примерами приема моментальных снимков.

Моментальные снимки, передаваемые API, должны находиться в порядке возрастания по версии. Если разностные динамические таблицы обнаруживают моментальный снимок вне порядка, возникает ошибка.

Дополнительные сведения о синтаксисе см. в справочнике по Python для разностных динамических таблиц.

Ограничения

Столбец, используемый для последовательности, должен быть сортируемым типом данных.

Пример: обработка SCD типа 1 и SCD типа 2 с исходными данными CDF

В следующих разделах приведены примеры запросов SCD типа 1 и типа 2 delta Live Tables, которые обновляют целевые таблицы на основе исходных событий из канала измененных данных:

  1. Создает новые записи пользователей.
  2. Удаляет запись пользователя.
  3. Обновляет записи пользователей. В примере SCD типа 1 последние UPDATE операции приходят поздно и удаляются из целевой таблицы, демонстрируя обработку событий вне порядка.

В следующих примерах предполагается знакомство с настройкой и обновлением конвейеров Delta Live Tables. См . руководство. Запуск первого конвейера live tables Delta Live Tables.

Для выполнения этих примеров необходимо начать с создания примера набора данных. См. статью "Создание тестовых данных".

Ниже приведены входные записи для этих примеров.

userId name city Операция sequenceNum
124 Raul Оахака ВСТАВИТЬ 1
123 Isabel Monterrey ВСТАВИТЬ 1
125 Mercedes Тихуана ВСТАВИТЬ 2
126 Lily Cancun ВСТАВИТЬ 2
123 null null DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Чиуауа UPDATE 5

Если вы раскомментируете окончательную строку в примерах данных, вставьте следующую запись, указывающую, где должны быть усечены записи:

userId name city Операция sequenceNum
null null null TRUNCATE 3

Примечание.

Все приведенные ниже примеры включают параметры для указания обоих DELETE и TRUNCATE операций, но каждый из них является необязательным.

Обработка обновлений SCD типа 1

В следующем примере демонстрируется обработка обновлений SCD типа 1:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

После выполнения примера с SCD типа 1 целевая таблица будет содержать следующие записи:

userId name city
124 Raul Оахака
125 Mercedes Guadalajara
126 Lily Cancun

После запуска примера SCD типа 1 с дополнительной записью TRUNCATE записи 124 и 126 усекаются из-за операции TRUNCATE в sequenceNum=3, а целевая таблица содержит следующую запись:

userId name city
125 Mercedes Guadalajara

Обработка обновлений SCD типа 2

В следующем примере демонстрируется обработка обновлений SCD типа 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

После выполнения примера с SCD типа 2 целевая таблица будет содержать следующие записи:

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Чиуауа 5 6
124 Raul Оахака 1 null
125 Mercedes Тихуана 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lily Cancun 2 null

Запрос типа 2 SCD также может указать подмножество выходных столбцов для отслеживания журнала в целевой таблице. Изменения других столбцов обновляются вместо создания новых записей журнала. В следующем примере показано, как исключить столбец из отслеживания city :

В следующем примере показано использование журнала отслеживания с типом 2 SCD:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

После выполнения этого примера без дополнительной TRUNCATE записи целевая таблица содержит следующие записи:

userId name city __START_AT __END_AT
123 Isabel Чиуауа 1 6
124 Raul Оахака 1 null
125 Mercedes Guadalajara 2 null
126 Lily Cancun 2 null

Создание тестовых данных

Приведенный ниже код содержится для создания примера набора данных для использования в примерах запросов, представленных в этом руководстве. Если у вас есть необходимые учетные данные для создания новой схемы и создания новой таблицы, эти инструкции можно выполнить с помощью записной книжки или Databricks SQL. Следующий код не предназначен для запуска в составе конвейера Delta Live Tables:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Пример. Периодическое обработка моментальных снимков

В следующем примере показана обработка SCD типа 2, которая обрабатывает моментальные снимки таблицы, хранимой в mycatalog.myschema.mytableней. Результаты обработки записываются в таблицу с именем target.

mycatalog.myschema.mytable записи в метке времени 2024-01-01 00:00:00

Ключ Значение
1 А1
2 А2

mycatalog.myschema.mytable записи в метке времени 2024-01-01 12:00:00

Ключ Значение
2 Б2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

После обработки моментальных снимков целевая таблица содержит следующие записи:

Ключ Значение __START_AT __END_AT
1 А1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 А2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 Б2 2024-01-01 12:00:00 null
3 a3 2024-01-01 12:00:00 null

Пример: обработка исторических моментальных снимков

В следующем примере показана обработка SCD типа 2, которая обновляет целевую таблицу на основе исходных событий из двух моментальных снимков, хранящихся в облачной системе хранения:

Моментальный снимок в timestamp, хранящийся в /<PATH>/filename1.csv

Ключ TrackingColumn NonTrackingColumn
1 А1 b1
2 А2 Б2
4 a4 b4

Моментальный снимок в timestamp + 5, хранящийся в /<PATH>/filename2.csv

Ключ TrackingColumn NonTrackingColumn
2 a2_new Б2
3 a3 b3
4 a4 b4_new

В следующем примере кода демонстрируется обработка обновлений SCD типа 2 с этими моментальными снимками:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

После обработки моментальных снимков целевая таблица содержит следующие записи:

Ключ TrackingColumn NonTrackingColumn __START_AT __END_AT
1 А1 b1 1 2
2 А2 Б2 1 2
2 a2_new Б2 2 null
3 a3 b3 2 null
4 a4 b4_new 1 null

Добавление, изменение или удаление данных в целевой потоковой таблице

Если конвейер публикует таблицы в каталоге Unity, можно использовать инструкции языка обработки данных (DML), включая инструкции insert, update, delete и merge, для изменения целевых таблиц потоковой передачи, созданных APPLY CHANGES INTO операторами.

Примечание.

  • Инструкции DML, изменяющие схему таблицы потоковой передачи, не поддерживаются. Убедитесь, что операторы DML не пытаются развивать схему таблицы.
  • Инструкции DML, обновляющие потоковую таблицу, могут выполняться только в общем кластере каталога Unity или хранилище SQL с помощью Databricks Runtime 13.3 LTS и более поздних версий.
  • Так как для потоковой передачи требуются источники данных только для добавления, если обработка требует потоковой передачи из исходной таблицы потоковой передачи с изменениями (например, операторами DML), задайте флаг skipChangeCommits при чтении исходной таблицы потоковой передачи. При skipChangeCommits установке транзакции, которые удаляют или изменяют записи в исходной таблице, игнорируются. Если для обработки не требуется потоковая таблица, можно использовать материализованное представление (которое не имеет ограничения только для добавления) в качестве целевой таблицы.

Так как разностные динамические таблицы используют указанный SEQUENCE BY столбец и распространяют соответствующие значения последовательности в __START_AT __END_AT целевые таблицы (для SCD типа 2), необходимо убедиться, что инструкции DML используют допустимые значения для этих столбцов для поддержания правильного порядка записей. Узнайте, как CDC реализован с помощью API APPLY CHANGES?.

Дополнительные сведения об использовании инструкций DML с таблицами потоковой передачи см. в разделе "Добавление, изменение или удаление данных" в таблице потоковой передачи.

В следующем примере вставляется активная запись с начальной последовательностью 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Чтение веб-канала изменений из целевой APPLY CHANGES таблицы

В Databricks Runtime 15.2 и более поздних версиях можно считывать веб-канал данных изменений из таблицы потоковой передачи, которая является целью APPLY CHANGES или APPLY CHANGES FROM SNAPSHOT запросами таким же образом, как и чтение веб-канала изменений из других таблиц Delta. Для чтения веб-канала изменений из целевой таблицы потоковой передачи необходимо следующее:

  • Целевая таблица потоковой передачи должна быть опубликована в каталоге Unity. См. раздел "Использование каталога Unity" с конвейерами Delta Live Tables.
  • Чтобы прочитать веб-канал изменений из целевой потоковой таблицы, необходимо использовать Databricks Runtime 15.2 или более поздней версии. Чтобы прочитать веб-канал изменений в другом конвейере разностных динамических таблиц, конвейер должен быть настроен для использования Databricks Runtime 15.2 или более поздней версии.

Вы считываете веб-канал изменений из целевой таблицы потоковой передачи, созданной в конвейере Разностных динамических таблиц, так же, как чтение веб-канала изменений из других таблиц Delta. Дополнительные сведения об использовании функции веб-канала изменений Delta, включая примеры в Python и SQL, см. в статье Использование веб-канала изменений Delta Lake в Azure Databricks.

Примечание.

Запись канала изменений включает метаданные , определяющие тип события изменения. При обновлении записи в таблице метаданные связанных записей изменений обычно включают _change_type значения, заданные update_preimage и update_postimage события.

Однако значения отличаются, _change_type если обновления вносятся в целевую потоковую таблицу, включающую изменение значений первичного ключа. Если изменения включают обновления первичных ключей, _change_type поля метаданных задаются insert и delete события. Изменения первичных ключей могут возникать при внесении обновлений вручную в одно из ключевых полей с UPDATE инструкцией или MERGE для таблиц SCD типа 2, когда __start_at поле изменяется на более раннее начальное значение последовательности.

Запрос APPLY CHANGES определяет значения первичного ключа, которые отличаются для обработки SCD типа 1 и SCD типа 2:

  • Для обработки SCD типа 1 и интерфейса Python Delta Live Tables первичный ключ является значением keys параметра в apply_changes() функции. Для интерфейса SQL Delta Live Tables первичный ключ — это столбцы, определенные предложением KEYS в инструкции APPLY CHANGES INTO .
  • Для SCD типа 2 первичный ключ является keys параметром или KEYS предложением, а также возвращаемым значением операции coalesce(__START_AT, __END_AT) , где __START_AT и __END_AT являются соответствующими столбцами из целевой потоковой таблицы.

Получение данных о записях, обработанных запросом CDC Delta Live Tables

Примечание.

Следующие метрики фиксируются только APPLY CHANGES запросами, а не APPLY CHANGES FROM SNAPSHOT запросами.

Следующие метрики фиксируются запросами APPLY CHANGES :

  • num_upserted_rows: количество выходных строк, которые добавляются в набор данных во время обновления.
  • num_deleted_rows: количество существующих выходных строк, удаленных из набора данных во время обновления.

Метрика, выходные num_output_rows данные для потоков, отличных от CDC, не фиксируются для apply changes запросов.

Какие объекты данных используются для обработки CDC разностных динамических таблиц?

Примечание. Следующие структуры данных применяются только к APPLY CHANGES обработке, а не APPLY CHANGES FROM SNAPSHOT к обработке.

При объявлении целевой таблицы в хранилище метаданных Hive создаются две структуры данных:

  • Представление с именем, назначенным целевой таблице.
  • Внутренняя резервная таблица, используемая Delta Live Table для управления обработкой CDC. Эта таблица называется путем подготовки __apply_changes_storage_ к имени целевой таблицы.

Например, если объявить целевую таблицу с именем dlt_cdc_target, вы увидите представление с именем и таблицей с именем dlt_cdc_target __apply_changes_storage_dlt_cdc_target в хранилище метаданных. Создание представления позволяет разностным динамическим таблицам отфильтровать дополнительные сведения (например, на могилах и версиях), необходимые для обработки данных вне порядка. Чтобы просмотреть обработанные данные, выполните запрос к целевому представлению. Так как схема таблицы может измениться для поддержки __apply_changes_storage_ будущих функций или улучшений, не следует запрашивать таблицу для использования в рабочей среде. При добавлении данных вручную в таблицу предполагается, что записи будут поступать до других изменений, так как столбцы версии отсутствуют.

Если конвейер публикуется в каталоге Unity, внутренние резервные таблицы недоступны для пользователей.