Потоковая передача разностных таблиц для чтения и записи

Delta Lake надежно интегрирована со Структурированной потоковой передачей Spark с помощью readStream и writeStream. Delta Lake преодолевает многие ограничения, которые обычно связаны с системами потоковой передачи и файлами, включая:

  • Объединение небольших файлов, созданных при низкой задержке приема.
  • Обслуживание обработки "ровно один раз" с несколькими потоками (или параллельными пакетными заданиями).
  • Эффективное обнаружение новых файлов при использовании файлов в качестве источника для потока.

Примечание.

В этой статье описывается использование таблиц Delta Lake в качестве источников потоковой передачи и приемников. Сведения о загрузке данных с помощью потоковых таблиц в Databricks SQL см. в статье "Загрузка данных с помощью потоковых таблиц в Databricks SQL".

Таблица Delta в качестве источника

Структурированная потоковая передача постепенно считывает таблицы Delta. Хотя потоковый запрос активен для таблицы Delta, новые записи обрабатываются идемпотентно, так как новые версии таблиц фиксируются в исходной таблице.

В следующих примерах кода показано, как настроить потоковое чтение с помощью имени таблицы или пути к файлу.

Python

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Scala

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Внимание

Если схема для таблицы Delta изменяется после начала потокового чтения таблицы, запрос завершается сбоем. Для большинства изменений схемы можно перезапустить поток, чтобы устранить несоответствие схемы и продолжить обработку.

В Databricks Runtime 12.2 LTS и более поздней версии нельзя передавать поток из таблицы Delta с включенным сопоставлением столбцов, которые подверглись эволюции недитивной схемы, например переименованию или удалению столбцов. Дополнительные сведения см. в разделе Потоковая передача с сопоставлением столбцов и изменениями схемы.

Ограничить скорость ввода

Для управления микропакетами доступны следующие параметры:

  • maxFilesPerTrigger: Сколько новых файлов будет учитываться в каждом микропакете. Значение по умолчанию — 1000.
  • maxBytesPerTrigger: Сколько данных обрабатывается в каждом микропакете. Этот параметр задает значение "мягкого максимума", то есть пакет обрабатывает приблизительно такой объем данных и может обработать больше, чтобы потоковый запрос продолжился в случаях, когда наименьший входной блок превышает это ограничение. Это не задано по умолчанию.

Если используется maxBytesPerTrigger в сочетании с maxFilesPerTrigger, микропакетная обработка охватывает данные до ограничения объема maxFilesPerTrigger или maxBytesPerTrigger.

Примечание.

В случаях, когда транзакции исходной таблицы очищаются из-за logRetentionDurationконфигурации и потокового запроса пытается обработать эти версии, по умолчанию запрос не может избежать потери данных. Вы можете задать параметр failOnDataLoss , чтобы false игнорировать потерянные данные и продолжать обработку.

Потоковая передача потока отслеживания измененных данных Delta Lake (CDC)

Delta Lake изменяет записи канала данных в таблицу Delta, включая обновления и удаления. При включении можно выполнять потоковую передачу из канала измененных данных и записывать логику для обработки вставок, обновлений и удаления в подчиненные таблицы. Хотя выходные данные канала изменений немного отличаются от описанной таблицы Delta, это обеспечивает решение для распространения добавочных изменений в подчиненных таблицах в архитектуре медальона.

Внимание

В Databricks Runtime 12.2 LTS и ниже вы не можете передавать поток из канала измененных данных для таблицы Delta с включенным сопоставлением столбцов, которые пережили эволюцию недитивной схемы, например переименование или удаление столбцов. См. раздел Потоковая передача с сопоставлением столбцов и изменениями схемы.

Игнорировать обновления и удаления

В структурированной потоковой передаче не обрабатываются входные данные, кроме дополнений, и создается исключение, если в таблице, используемой в качестве источника, происходят какие-либо изменения. Существуют две основные стратегии для работы с изменениями, которые не могут быть автоматически распространены по нисходящей:

  • Вы можете удалить выходные данные и контрольную точку и перезапустить поток с самого начала.
  • Можно задать один из следующих двух вариантов:
    • ignoreDeletes: игнорировать транзакции, удаляющие данные на границах секций.
    • skipChangeCommits: игнорировать транзакции, которые удаляют или изменяют существующие записи. skipChangeCommits включает ignoreDeletes.

Примечание.

В Databricks Runtime 12.2 LTS и более поздних skipChangeCommits версиях не рекомендуется использовать предыдущий параметр ignoreChanges. В Databricks Runtime 11.3 LTS и более низкий ignoreChanges вариант поддерживается.

Семантика параметраignoreChanges существенно отличается от семантики параметра skipChangeCommits. Если параметр ignoreChanges включен, перезаписанные файлы данных в исходной таблице будут созданы повторно после операции изменения данных, такой как UPDATE, MERGE INTO, DELETE (в секциях) или OVERWRITE. Неизменяемые строки часто создаются вместе с новыми строками, поэтому нижестоящие потребители должны иметь возможность обрабатывать дубликаты. Удаления не распространяются по нисходящей. ignoreChanges включает ignoreDeletes.

skipChangeCommits полностью игнорирует операции изменения файлов. Файлы данных, которые перезаписываются в исходной таблице из-за операций изменения данных, таких как UPDATE, MERGE INTO, DELETE и OVERWRITE, полностью игнорируются. Чтобы отразить изменения в вышестоящих исходных таблицах, необходимо реализовать отдельную логику для распространения этих изменений.

Рабочие нагрузки, настроенные с продолжением работы с ignoreChanges известной семантикой, но Databricks рекомендует использовать skipChangeCommits для всех новых рабочих нагрузок. Перенос рабочих нагрузок, использующихся ignoreChanges для skipChangeCommits выполнения рефакторинга, требует логики рефакторинга.

Пример

Например, предположим, что имеется таблица user_events со столбцами date, user_email и action, которая секционирована по date. Происходит потоковая передача из таблицы user_events и необходимо удалить из нее данные в соответствии с GDPR.

При удалении на границах секций (то есть WHERE в столбце секции) файлы уже разбиты по значению, поэтому удаление просто удаляет эти файлы из метаданных. При удалении всей секции данных можно использовать следующее:

spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/tmp/delta/user_events")

При удалении данных в нескольких разделах (в этом примере фильтрация по user_email) используйте следующий синтаксис:

spark.readStream.format("delta")
  .option("skipChangeCommits", "true")
  .load("/tmp/delta/user_events")

При обновлении user_email с помощью оператора UPDATE файл с указанным user_email перезаписывается. Используйте skipChangeCommits для пропуска измененных файлов данных.

Указать начальное расположение

Можно использовать следующие параметры, чтобы указать начальную точку источника потоковой передачи Delta Lake, не обрабатывая всю таблицу.

  • startingVersion: Версия Delta Lake, с которой начинается запуск. Databricks рекомендует не использовать этот параметр для большинства рабочих нагрузок. Если он не задан, поток начинается с последней доступной версии, включая полный моментальный снимок таблицы в данный момент.

    Если задано, поток считывает все изменения в таблицу Delta, начиная с указанной версии (включительно). Если указанная версия больше недоступна, поток не запускается. Можно получить версии фиксации из столбца version выходных данных команды DESCRIBE HISTORY.

    Чтобы вернуть только последние изменения, укажите latest.

  • startingTimestamp: Метка времени для запуска. Все изменения таблицы, зафиксированные в метке времени (включительно), считываются средством чтения потоковой передачи. Если указанная метка времени предшествует всем фиксациям таблицы, потоковая передача начинается с самой ранней доступной метки времени. Одно из двух значений:

    • Строка метки времени. Например, "2019-01-01T00:00:00.000Z".
    • Строка даты. Например, "2019-01-01".

Одновременно нельзя задать оба параметра. Они вступают в силу только при запуске нового потокового запроса. Если потоковый запрос запущен и в его контрольной точке записан ход выполнения, эти параметры игнорируются.

Внимание

Источник потоковой передачи можно запустить из указанной версии или метки времени, но схема источника потоковой передачи всегда является последней схемой таблицы Delta. Необходимо убедиться в отсутствии несовместимых изменений схемы в таблице Delta после указанной версии или метки времени. В противном случае источник потоковой передачи может выдать неверные результаты при чтении данных с неверной схемой.

Пример

Предположим, имеется таблица user_events. Если вы хотите считать изменения начиная с версии 5, используйте:

spark.readStream.format("delta")
  .option("startingVersion", "5")
  .load("/tmp/delta/user_events")

Если вы хотите считать изменения начиная с 18 октября 2018 г., используйте:

spark.readStream.format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/tmp/delta/user_events")

Обработка исходного моментального снимка без удаления данных

Примечание.

Эта функция доступна в Databricks Runtime 11.3 LTS и выше. Эта функция предоставляется в режиме общедоступной предварительной версии.

При использовании разностной таблицы в качестве источника потока запрос сначала обрабатывает все данные, имеющиеся в таблице. В этой версии разностная таблица называется начальным моментальным снимком. По умолчанию файлы данных разностной таблицы обрабатываются в зависимости от того, какой файл был изменен в последний раз. Однако время последнего изменения необязательно соответствует временной последовательности событий.

В запросе потоковой передачи с отслеживанием состояния с определенным водяным знаком обработка файлов по времени изменения может привести к обработке записей в неправильном порядке. Это может привести к удалению записей как более поздних событий на основе водяного знака.

Чтобы избежать проблемы с удалением данных, включите следующий параметр:

  • withEventTimeOrder: следует ли обрабатывать исходный моментальный снимок в порядке времени события.

Если включен временной порядок события, диапазон времени исходного моментального снимка делится на периоды времени. Каждый микропакет обрабатывает период путем фильтрации данных в пределах диапазона времени. Параметры конфигурации maxFilesPerTrigger и maxBytesPerTrigger по-прежнему можно использовать для управления размером микропакета, но только приблизительно, что связано с принципом обработки.

На рисунке ниже показан этот процесс:

Исходный моментальный снимок

Важная информация об этой функции:

  • Проблема с удалением данных возникает только в том случае, если исходный разностный моментальный снимок запроса потоковой передачи с отслеживанием состояния обрабатывается в порядке по умолчанию.
  • После запуска запроса потока во время обработки исходного моментального снимка изменить withEventTimeOrder невозможно. Чтобы запустить повторно с измененным withEventTimeOrder, необходимо удалить контрольную точку.
  • Если выполняется потоковый запрос с включенным параметром EventTimeOrder, вы не сможете перейти на более раннюю версию DBR, которая не поддерживает эту функцию до завершения обработки исходного моментального снимка. Если необходимо перейти на более раннюю версию, можно дождаться завершения обработки исходного моментального снимка или удалить контрольную точку и перезапустить запрос.
  • Эта функция не поддерживается в следующих редких сценариях:
    • Столбец времени события — это генерируемый столбец, и между разностным источником и водяным знаком выполняются незапланированные преобразования.
    • В запросе потока имеется водяной знак с несколькими разностными источниками.
  • Если включен временной порядок событий, производительность обработки исходных разностных моментальных снимков может быть ниже.
  • Каждый микропакет сканирует исходный моментальный снимок, чтобы отфильтровать данные в пределах соответствующего диапазона времени события. Чтобы ускорить фильтрацию, рекомендуется использовать столбец источника Delta в качестве времени события, чтобы можно было применить пропуск данных (проверка пропускать данные для Delta Lake, если это применимо). Кроме того, дополнительного ускорения обработки можно добиться с помощью разделения таблиц по столбцу времени события. С помощью пользовательского интерфейса Spark можно узнать, сколько сканируется разностных файлов для определенного микропакета.

Пример

Предположим, имеется таблица user_events со столбцом event_time. Запрос потоковой передачи — это агрегатный запрос. Если вы хотите убедиться, что данные не будут удаляться во время обработки исходных моментальных снимков, можно использовать следующее:

spark.readStream.format("delta")
  .option("withEventTimeOrder", "true")
  .load("/tmp/delta/user_events")
  .withWatermark("event_time", "10 seconds")

Примечание.

Вы также можете включить эту конфигурацию Spark в кластере, которая будет применяться ко всем потоковым запросам: spark.databricks.delta.withEventTimeOrder.enabled true

Таблица Delta в качестве приемника

Кроме того, запись данных в таблицу Delta можно выполнять с помощью структурированного потока. Журнал транзакций позволяет Delta Lake гарантировать только одну обработку, даже если в таблице имеются другие потоки или пакетные запросы, выполняющиеся параллельно.

Примечание.

Функция Delta Lake VACUUM удаляет все файлы, не управляемые Delta Lake, но пропускает все каталоги, имена которых начинаются с _. Вы можете безопасно хранить контрольные точки вместе с другими данными и метаданными для дельта-таблицы, используя такую структуру каталогов, как <table-name>/_checkpoints.

Метрики

Количество байтов и файлов, которым еще предстоит пройти обработку, можно узнать в процессе потоковой обработки, с помощью метрик numBytesOutstanding и numFilesOutstanding. К дополнительным метрикам относятся:

  • numNewListedFiles: количество файлов Delta Lake, перечисленных для вычисления невыполненной работы для этого пакета.
    • backlogEndOffset: версия таблицы, используемая для вычисления невыполненной работы.

Если вы выполняете поток в записной книжке, эти метрики можно просмотреть на вкладке "Необработанные данные " на панели мониторинга хода выполнения потокового запроса:

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

Режим добавления

По умолчанию потоки выполняются в режиме добавления, когда добавляются новые записи в таблицу.

Можно использовать метод пути:

Python

(events.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/_checkpoints/")
   .start("/delta/events")
)

Scala

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .start("/tmp/delta/events")

toTable или метод, как показано ниже.

Python

(events.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Полный режим

Структурированную потоковую передачу также можно использовать для замены всей таблицы на каждый пакет. Один из примеров использования — вычисление сводки с помощью статистической обработки:

Python

(spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")
)

Scala

spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")

В предыдущем примере непрерывно обновляется таблица, которая содержит совокупное количество событий по клиентам.

Для приложений с менее строгими дополнительными требованиями к задержке можно сэкономить вычислительные ресурсы с одноразовыми триггерами. Используйте их для обновления сводных статистических таблиц по заданному расписанию, обрабатывая только новые данные, поступившие с момента последнего обновления.

Выполнение потоковых статических объединений

Вы можете положиться на транзакционные гарантии и протокол управления версиями Delta Lake для выполнения потоковых статических объединений. Потоковое статическое объединение объединяет последнюю допустимую версию дельта-таблицы (статические данные) с потоком данных с помощью объединения без сохранения состояния.

Когда Azure Databricks обрабатывает микропакет данных в потоковом статическом объединении, последняя действительная версия данных из статической разностной таблицы объединяется с записями, представленными в текущем микропакете. Так как объединение не имеет состояния, вам не нужно настраивать водяные знаки, и вы можете обрабатывать результаты с малой задержкой. Данные в статической дельта-таблице, используемой в объединении, должны изменяться медленно.

streamingDF = spark.readStream.table("orders")
staticDF = spark.read.table("customers")

query = (streamingDF
  .join(staticDF, streamingDF.customer_id==staticDF.id, "inner")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .table("orders_with_customer_info")
)

Upsert из потоковых запросов с помощью foreachBatch

Можно использовать сочетание merge и foreachBatch запись сложных upserts из потокового запроса в разностную таблицу. См. раздел Использование foreachBatch для записи в произвольные приемники данных.

Этот шаблон содержит множество приложений, включая следующее:

  • Запись агрегатов потоковой передачи в режиме обновления — это гораздо более эффективно, чем полный режим.
  • Запись потока изменений базы данных в таблицу Delta — запрос слияния для записи измененных данных можно использовать в foreachBatch для непрерывного применения потока изменений к таблице Delta.
  • Запись потока данных в разностную таблицу с дедупликацией: запрос слияния только для вставки для дедупликации можно использовать для foreachBatch непрерывной записи данных (с дубликатами) в таблицу Delta с автоматической дедупликацией.

Примечание.

  • Убедитесь, что инструкция merge в foreachBatch идемпотентная, так как перезагрузка потокового запроса может применить операцию к тому же пакету данных несколько раз.
  • Если merge используется в foreachBatch, скорость входных данных для запроса потоковой передачи (выдаваемых через StreamingQueryProgress и видимых в диаграмме скорости записной книжки) может быть кратна фактической скорости, с которой данные создаются в источнике. Это связано с тем, что merge считывает входные данные несколько раз, что приводит к умножению метрик входных данных. Если это узкое место, можно кэшировать пакетный DataFrame до merge и кэшировать его после merge.

В следующем примере показано, как можно использовать SQL для foreachBatch выполнения этой задачи:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Вы также можете использовать API Delta Lake для выполнения потоковых upserts, как показано в следующем примере:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Идемпотентная таблица записывает данные в foreachBatch

Примечание.

Databricks рекомендует настроить отдельную потоковую запись для каждого приемника, который вы хотите обновить. Использование foreachBatch для записи в несколько таблиц сериализует записи, что снижает параллелизайтон и увеличивает общую задержку.

Разностные таблицы поддерживают следующие DataFrameWriter параметры для записи в несколько таблиц в foreachBatch идемпотентном режиме:

  • txnAppId: уникальная строка, которую можно передать для каждой записи dataFrame. Например, идентификатор StreamingQuery можно использовать как txnAppId.
  • txnVersion: Монотонно возрастающее количество, действующее как версия транзакции.

В Delta Lake используется сочетание txnAppId и txnVersion для обнаружения повторяющихся записей и их пропуска.

Если пакетная запись прерывается сбоем, повторное выполнение пакета использует одно и то же приложение и идентификатор пакетной службы, чтобы помочь среде выполнения правильно определить повторяющиеся записи и игнорировать их. Идентификатор приложения (txnAppId) может быть любой уникальной строкой, созданной пользователем, и не обязательно должен быть связан с идентификатором потока. См. раздел Использование foreachBatch для записи в произвольные приемники данных.

Предупреждение

Если удалить точку потоковой передачи проверка и перезапустить запрос с новой проверка point, необходимо указать другойtxnAppId. Новые точки проверка начинаются с пакетного идентификатора0. Delta Lake использует идентификатор пакета и txnAppId в качестве уникального ключа и пропускает пакеты с уже видимыми значениями.

В следующем примере кода показан этот шаблон:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}