Разностный формат в Фабрике данных Azure
ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics
Совет
Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !
В этой статье объясняется, как копировать данные в озеро разностных данных и обратно при использовании в Azure Data Lake Store 2-го поколения или Хранилища BLOB-объектов Azure с помощью разностного формата. Этот соединитель доступен как встроенный набор данных в потоке сопоставления данных в качестве источника и приемника.
Свойства потока данных для сопоставления
Этот соединитель доступен как встроенный набор данных в потоке сопоставления данных в качестве источника и приемника.
Свойства источника
В таблице, приведенной ниже, указаны свойства, поддерживаемые источником разностных данных. Изменить эти свойства можно на вкладке Source options (Параметры источника).
Имя | Описание | Обязательное поле | Допустимые значения | Свойство скрипта для потока данных |
---|---|---|---|---|
Формат | Формат должен быть delta |
yes | delta |
format |
Файловая система | Контейнер/файловая система озера разностных данных | yes | Строка | fileSystem |
Folder path | Каталог разностного озера | yes | Строка | folderPath |
Тип сжатия | Тип сжатия таблицы разностных данных | no | bzip2 gzip deflate ZipDeflate snappy lz4 |
compressionType |
Compression level | Выберите приоритет: максимально быстрое сжатие или оптимальное сжатие. | Обязателен, если указан ключ compressedType . |
Optimal или Fastest |
compressionLevel |
Переход по времени | Выберите, следует ли запрашивать старый моментальный снимок таблицы разностных данных | no | Запрос по метке времени: метка времени Запрос по версии: целое число |
метка времениAsOf versionAsOf |
Allow no files found (Разрешить ненайденные файлы) | Если значение true, ошибка не возникает, если файлы не найдены | no | true или false |
ignoreNoFilesFound |
Импорт схемы
Разностные данные доступны только в качестве встроенного набора данных и по умолчанию не имеют связанной схемы. Чтобы получить метаданные столбца, нажмите кнопку "Импорт схемы" на вкладке "Проекция". Это позволяет ссылаться на имена столбцов и типы данных, указанные в корпусе. Чтобы импортировать схему, сеанс отладки потока данных должен быть активным, и для указания необходимо указать существующий файл определения сущности CDM.
Пример сценария источника разностных данных
source(output(movieId as integer,
title as string,
releaseDate as date,
rated as boolean,
screenedOn as timestamp,
ticketPrice as decimal(10,2)
),
store: 'local',
format: 'delta',
versionAsOf: 0,
allowSchemaDrift: false,
folderPath: $tempPath + '/delta'
) ~> movies
Свойства приемника
В таблице, приведенной ниже, указаны свойства, поддерживаемые приемником разностных данных. Эти свойства можно изменить на вкладке Параметры.
Имя | Описание | Обязательное поле | Допустимые значения | Свойство скрипта для потока данных |
---|---|---|---|---|
Формат | Формат должен быть delta |
yes | delta |
format |
Файловая система | Контейнер/файловая система озера разностных данных | yes | Строка | fileSystem |
Folder path | Каталог разностного озера | yes | Строка | folderPath |
Тип сжатия | Тип сжатия таблицы разностных данных | no | bzip2 gzip deflate ZipDeflate snappy lz4 TarGZip tar |
compressionType |
Compression level | Выберите приоритет: максимально быстрое сжатие или оптимальное сжатие. | Обязателен, если указан ключ compressedType . |
Optimal или Fastest |
compressionLevel |
Vacuum | Удаляет файлы старше указанной длительности, которая больше не относится к текущей версии таблицы. Если указано значение 0 или меньше, операция вакуума не выполняется. | yes | Целое | vacuum |
Действие таблицы | Сообщает ADF, что делать с целевой таблицей Delta в приемнике. Вы можете оставить все как есть и добавить новые строки, перезаписать существующее определение таблицы и данные новыми метаданными и данными или сохранить существующую структуру таблицы, но сначала усечь все строки, а затем вставить новые строки. | no | Ничего, Усечь, Перезаписать | deltaTruncate, перезапись |
Метод обновления | При нажатии кнопки "Разрешить вставку" отдельно или при записи в новую разностную таблицу целевой объект получает все входящие строки независимо от набора политик строк. Если данные содержат строки других политик строк, их необходимо исключить с помощью предыдущего преобразования фильтра. При выборе всех методов обновления выполняется слияние, где строки вставляются, удаляются или обновляются в соответствии с набором политик строк, используя предыдущее преобразование Alter Row. |
yes | true или false |
Вставляемый deletable upsertable updateable |
Оптимизированная запись | Повышение пропускной способности для операции записи с помощью оптимизации внутреннего случайного перемешивания в исполнителях Spark. В результате разделов и файлов может быть меньше, но они могут иметь больший размер. | no | true или false |
optimizedWrite: true |
Автоматическое сжатие | После завершения операции записи Spark автоматически выполнит команду OPTIMIZE для реорганизации данных, в результате чего при необходимости появится больше разделов для повышения производительности чтения в будущем. |
no | true или false |
autoCompact: true |
Пример скрипта приемника разностных данных
Связанный скрипта потока данных:
moviesAltered sink(
input(movieId as integer,
title as string
),
mapColumn(
movieId,
title
),
insertable: true,
updateable: true,
deletable: true,
upsertable: false,
keys: ['movieId'],
store: 'local',
format: 'delta',
vacuum: 180,
folderPath: $tempPath + '/delta'
) ~> movieDB
Приемник изменений с удалением секций
С помощью этого варианта в пункте "Метод Update" выше (т. е. update/upsert/delete) можно ограничить количество проверяемых секций. Из целевого хранилища извлекается только секции, удовлетворяющие этому условию. Вы можете указать фиксированный набор значений, которые может принимать столбец секции.
Пример скрипта приемника изменений с удалением секций
Пример скрипта приведен ниже.
DerivedColumn1 sink(
input(movieId as integer,
title as string
),
allowSchemaDrift: true,
validateSchema: false,
format: 'delta',
container: 'deltaContainer',
folderPath: 'deltaPath',
mergeSchema: false,
autoCompact: false,
optimizedWrite: false,
vacuum: 0,
deletable:false,
insertable:true,
updateable:true,
upsertable:false,
keys:['movieId'],
pruneCondition:['part_col' -> ([5, 8])],
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> sink2
Delta будет считывать только 2 секции, где part_col == 5 и 8, из целевого разностного хранилища (вместо всех секций). part_col — это столбец, по которому секционированы целевые разностные данные. Ему не нужно присутствовать в исходных данных.
Варианты оптимизации приемника изменений
На вкладке "Параметры" вы найдете три дополнительных параметра для оптимизации преобразования разностного приемника.
Если включен параметр схемы слияния, он разрешает эволюцию схемы , т. е. любые столбцы, входящие в текущий поток, но не в целевой таблице Delta, автоматически добавляются в ее схему. Этот параметр поддерживается во всех методах обновления.
Если автоматическое сжатие включено, после отдельной операции записи преобразование проверяет возможность дополнительного сжатия файлов и запускает задание быстрой оптимизации (с размером файлов 128 МБ вместо 1 ГБ), чтобы еще больше сжать файлы для секций с наибольшим числом файлов малого размера. Автоматическое сжатие помогает объединить большое количество файлов малого размера в несколько крупных файлов. Автоматическое сжатие выполняется только при наличии не менее 50 файлов. После выполнения операции сжатия она создает новую версию таблицы и записывает новый файл с данными нескольких предыдущих файлов в сжатом виде.
Если включена оптимизация записи, преобразование приемника динамически оптимизирует размеры секций с учетом фактических данных, пытаясь записывать файлы размером 128 МБ для каждой секции таблицы. Это приблизительный размер, который может различаться в зависимости от характеристик набора данных. Оптимизированные операции записи повышают общую эффективность операций записи и последующих операций чтения. Он упорядочивает секции таким образом, чтобы повысить производительность последующих операций чтения.
Совет
Процесс оптимизированной записи замедлит все задание извлечения, преобразования и загрузки, так как приемник выдаст команду Spark Delta Lake Optimize после обработки данных. Рекомендуется не злоупотреблять оптимизированной записью. Например, если у вас есть почасовой конвейер данных, выполняйте поток данных с оптимизированной записью ежедневно.
Известные ограничения
При записи в приемник разностных данных существует известное ограничение, в котором количество строк, записанных, не будет отображаться в выходных данных мониторинга.
Связанный контент
- Создайте преобразование источника в потоке данных для сопоставления.
- Создайте преобразование приемника в потоке данных для сопоставления.
- Создайте преобразование изменения строки, чтобы пометить строки для операций вставки, обновления, upsert или удаления.