Разностный формат в Фабрике данных Azure

ОБЛАСТЬ ПРИМЕНЕНИЯ:Фабрика данных Azure Azure Synapse Analytics

Совет

Опробуйте Фабрику данных в Microsoft Fabric, решение для аналитики "все в одном" для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных, аналитики в режиме реального времени, бизнес-аналитики и создания отчетов. Узнайте, как начать новую пробную версию бесплатно!

В этой статье объясняется, как копировать данные в озеро разностных данных и обратно при использовании в Azure Data Lake Store 2-го поколения или Хранилища BLOB-объектов Azure с помощью разностного формата. Этот соединитель доступен как встроенный набор данных в потоке сопоставления данных в качестве источника и приемника.

Свойства потока данных для сопоставления

Этот соединитель доступен как встроенный набор данных в потоке сопоставления данных в качестве источника и приемника.

Свойства источника

В таблице, приведенной ниже, указаны свойства, поддерживаемые источником разностных данных. Эти свойства можно изменить на вкладке Параметры источника.

Имя Описание Обязательно Допустимые значения Свойство сценария для потока данных
Формат Формат должен быть delta да delta format
Файловая система Контейнер/файловая система озера разностных данных да Строка fileSystem
Путь к папке Прямой путь к озеру разностных данных да Строка folderPath
Тип сжатия Тип сжатия таблицы разностных данных Нет bzip2
gzip
deflate
ZipDeflate
snappy
lz4
compressionType
Уровень сжатия Выберите приоритет: максимально быстрое сжатие или оптимальное сжатие. Обязателен, если указан ключ compressedType. Optimal или Fastest compressionLevel
Переход по времени Выберите, следует ли запрашивать старый моментальный снимок таблицы разностных данных Нет Запрос по метке времени: Timestamp
Запрос по версии: целое число
timestampAsOf
versionAsOf
Разрешить ненайденные файлы Если значение равно true, ошибка не возникает, если файлы не найдены Нет true или false ignoreNoFilesFound

Импорт схемы

Разностные данные доступны только в качестве встроенного набора данных и по умолчанию не имеют связанной схемы. Чтобы получить метаданные столбца, нажмите кнопку Импорт схемы на вкладке Проекция . Это позволяет ссылаться на имена столбцов и типы данных, указанные в корпусе . Чтобы импортировать схему, сеанс отладки потока данных должен быть активным, и у вас должен быть существующий файл определения сущности CDM, на который следует указывать.

Пример сценария источника разностных данных

source(output(movieId as integer,
            title as string,
            releaseDate as date,
            rated as boolean,
            screenedOn as timestamp,
            ticketPrice as decimal(10,2)
            ),
    store: 'local',
    format: 'delta',
    versionAsOf: 0,
    allowSchemaDrift: false,
    folderPath: $tempPath + '/delta'
  ) ~> movies

Свойства приемника

В таблице, приведенной ниже, указаны свойства, поддерживаемые приемником разностных данных. Эти свойства можно изменить на вкладке Параметры.

Имя Описание Обязательно Допустимые значения Свойство сценария для потока данных
Формат Формат должен быть delta да delta format
Файловая система Контейнер/файловая система озера разностных данных да Строка fileSystem
Путь к папке Прямой путь к озеру разностных данных да Строка folderPath
Тип сжатия Тип сжатия таблицы разностных данных Нет bzip2
gzip
deflate
ZipDeflate
snappy
lz4
compressionType
Уровень сжатия Выберите приоритет: максимально быстрое сжатие или оптимальное сжатие. Обязателен, если указан ключ compressedType. Optimal или Fastest compressionLevel
Очистка Удаляет файлы старше указанного периода, который больше не относится к текущей версии таблицы. Если указано значение 0 или меньше, операция очистки не выполняется. да Целочисленный тип vacuum
Действие таблицы Сообщает ADF, что делать с целевой таблицей Delta в приемнике. Вы можете оставить все как есть и добавить новые строки, перезаписать существующее определение таблицы и данные новыми метаданными и данными или сохранить существующую структуру таблицы, но сначала усечь все строки, а затем вставить новые строки. Нет Ничего, Усечь, Перезаписать deltaTruncate, overwrite
Update - метод При выборе параметра "Разрешить вставку" или записи в новую разностную таблицу целевой объект получает все входящие строки независимо от набора политик строк. Если данные содержат строки других политик строк, их необходимо исключить с помощью предыдущего преобразования фильтра.

Если выбраны все методы Update, выполняется слияние, где строки вставляются, удаляются, upserted/обновляются в соответствии с политиками строк, установленными с помощью предыдущего преобразования Alter Row.
да true или false insertable
deletable
upsertable
updateable
Оптимизированная запись Повышение пропускной способности для операции записи с помощью оптимизации внутреннего случайного перемешивания в исполнителях Spark. В результате разделов и файлов может быть меньше, но они могут иметь больший размер. Нет true или false optimizedWrite: true
Автоматическое сжатие После завершения операции записи Spark автоматически выполнит команду OPTIMIZE для реорганизации данных, в результате чего при необходимости появится больше разделов для повышения производительности чтения в будущем. Нет true или false autoCompact: true

Пример скрипта приемника разностных данных

Связанный сценарий потока данных:

moviesAltered sink(
          input(movieId as integer,
                title as string
            ),
           mapColumn(
                movieId,
                title
            ),
           insertable: true,
           updateable: true,
           deletable: true,
           upsertable: false,
           keys: ['movieId'],
            store: 'local',
           format: 'delta',
           vacuum: 180,
           folderPath: $tempPath + '/delta'
           ) ~> movieDB

Приемник изменений с удалением секций

С помощью этого варианта в пункте "Метод Update" выше (т. е. update/upsert/delete) можно ограничить количество проверяемых секций. Из целевого хранилища извлекается только секции, удовлетворяющие этому условию. Вы можете указать фиксированный набор значений, которые может принимать столбец секции.

Снимок экрана: параметры удаления секции для ограничения проверки.

Пример скрипта приемника изменений с удалением секций

Пример скрипта приведен ниже.

DerivedColumn1 sink( 
      input(movieId as integer,
            title as string
           ), 
      allowSchemaDrift: true,
      validateSchema: false,
      format: 'delta',
      container: 'deltaContainer',
      folderPath: 'deltaPath',
      mergeSchema: false,
      autoCompact: false,
      optimizedWrite: false,
      vacuum: 0,
      deletable:false,
      insertable:true,
      updateable:true,
      upsertable:false,
      keys:['movieId'],
      pruneCondition:['part_col' -> ([5, 8])],
      skipDuplicateMapInputs: true,
      skipDuplicateMapOutputs: true) ~> sink2
 

Delta будет считывать только 2 секции, где part_col == 5 и 8, из целевого разностного хранилища (вместо всех секций). part_col — это столбец, по которому секционированы целевые разностные данные. Ему не нужно присутствовать в исходных данных.

Варианты оптимизации приемника изменений

На вкладке Параметры вы найдете еще три параметра для оптимизации преобразования приемника изменений.

  • Если параметр схемы Merge включен, он разрешает эволюцию схемы, т. е. все столбцы, которые присутствуют в текущем входящем потоке, но не находятся в целевой таблице Delta, автоматически добавляются в ее схему. Этот параметр поддерживается всеми методами обновления.

  • Если автоматическое сжатие включено, после отдельной операции записи преобразование проверяет возможность дополнительного сжатия файлов и запускает задание быстрой оптимизации (с размером файлов 128 МБ вместо 1 ГБ), чтобы еще больше сжать файлы для секций с наибольшим числом файлов малого размера. Автоматическое сжатие помогает объединить большое количество файлов малого размера в несколько крупных файлов. Автоматическое сжатие выполняется только при наличии не менее 50 файлов. После выполнения операции сжатия она создает новую версию таблицы и записывает новый файл с данными нескольких предыдущих файлов в сжатом виде.

  • Если включена оптимизация записи, преобразование приемника динамически оптимизирует размеры секций с учетом фактических данных, пытаясь записывать файлы размером 128 МБ для каждой секции таблицы. Это приблизительный размер, который может различаться в зависимости от характеристик набора данных. Оптимизированные операции записи повышают общую эффективность операций записи и последующих операций чтения. Он упорядочивает секции таким образом, чтобы повысить производительность последующих операций чтения.

Совет

Процесс оптимизированной записи замедлит все задание извлечения, преобразования и загрузки, так как приемник выдаст команду Spark Delta Lake Optimize после обработки данных. Рекомендуется не злоупотреблять оптимизированной записью. Например, если у вас есть почасовой конвейер данных, выполняйте поток данных с оптимизированной записью ежедневно.

Известные ограничения

При записи в разностный приемник существует известное ограничение, при котором количество записанных строк не отображается в выходных данных мониторинга.

Дальнейшие действия