Разностный формат в Фабрике данных Azure
ОБЛАСТЬ ПРИМЕНЕНИЯ:Фабрика данных Azure
Azure Synapse Analytics
Совет
Опробуйте Фабрику данных в Microsoft Fabric, решение для аналитики "все в одном" для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных, аналитики в режиме реального времени, бизнес-аналитики и создания отчетов. Узнайте, как начать новую пробную версию бесплатно!
В этой статье объясняется, как копировать данные в озеро разностных данных и обратно при использовании в Azure Data Lake Store 2-го поколения или Хранилища BLOB-объектов Azure с помощью разностного формата. Этот соединитель доступен как встроенный набор данных в потоке сопоставления данных в качестве источника и приемника.
Свойства потока данных для сопоставления
Этот соединитель доступен как встроенный набор данных в потоке сопоставления данных в качестве источника и приемника.
Свойства источника
В таблице, приведенной ниже, указаны свойства, поддерживаемые источником разностных данных. Эти свойства можно изменить на вкладке Параметры источника.
Имя | Описание | Обязательно | Допустимые значения | Свойство сценария для потока данных |
---|---|---|---|---|
Формат | Формат должен быть delta |
да | delta |
format |
Файловая система | Контейнер/файловая система озера разностных данных | да | Строка | fileSystem |
Путь к папке | Прямой путь к озеру разностных данных | да | Строка | folderPath |
Тип сжатия | Тип сжатия таблицы разностных данных | Нет | bzip2 gzip deflate ZipDeflate snappy lz4 |
compressionType |
Уровень сжатия | Выберите приоритет: максимально быстрое сжатие или оптимальное сжатие. | Обязателен, если указан ключ compressedType . |
Optimal или Fastest |
compressionLevel |
Переход по времени | Выберите, следует ли запрашивать старый моментальный снимок таблицы разностных данных | Нет | Запрос по метке времени: Timestamp Запрос по версии: целое число |
timestampAsOf versionAsOf |
Разрешить ненайденные файлы | Если значение равно true, ошибка не возникает, если файлы не найдены | Нет | true или false |
ignoreNoFilesFound |
Импорт схемы
Разностные данные доступны только в качестве встроенного набора данных и по умолчанию не имеют связанной схемы. Чтобы получить метаданные столбца, нажмите кнопку Импорт схемы на вкладке Проекция . Это позволяет ссылаться на имена столбцов и типы данных, указанные в корпусе . Чтобы импортировать схему, сеанс отладки потока данных должен быть активным, и у вас должен быть существующий файл определения сущности CDM, на который следует указывать.
Пример сценария источника разностных данных
source(output(movieId as integer,
title as string,
releaseDate as date,
rated as boolean,
screenedOn as timestamp,
ticketPrice as decimal(10,2)
),
store: 'local',
format: 'delta',
versionAsOf: 0,
allowSchemaDrift: false,
folderPath: $tempPath + '/delta'
) ~> movies
Свойства приемника
В таблице, приведенной ниже, указаны свойства, поддерживаемые приемником разностных данных. Эти свойства можно изменить на вкладке Параметры.
Имя | Описание | Обязательно | Допустимые значения | Свойство сценария для потока данных |
---|---|---|---|---|
Формат | Формат должен быть delta |
да | delta |
format |
Файловая система | Контейнер/файловая система озера разностных данных | да | Строка | fileSystem |
Путь к папке | Прямой путь к озеру разностных данных | да | Строка | folderPath |
Тип сжатия | Тип сжатия таблицы разностных данных | Нет | bzip2 gzip deflate ZipDeflate snappy lz4 |
compressionType |
Уровень сжатия | Выберите приоритет: максимально быстрое сжатие или оптимальное сжатие. | Обязателен, если указан ключ compressedType . |
Optimal или Fastest |
compressionLevel |
Очистка | Удаляет файлы старше указанного периода, который больше не относится к текущей версии таблицы. Если указано значение 0 или меньше, операция очистки не выполняется. | да | Целочисленный тип | vacuum |
Действие таблицы | Сообщает ADF, что делать с целевой таблицей Delta в приемнике. Вы можете оставить все как есть и добавить новые строки, перезаписать существующее определение таблицы и данные новыми метаданными и данными или сохранить существующую структуру таблицы, но сначала усечь все строки, а затем вставить новые строки. | Нет | Ничего, Усечь, Перезаписать | deltaTruncate, overwrite |
Update - метод | При выборе параметра "Разрешить вставку" или записи в новую разностную таблицу целевой объект получает все входящие строки независимо от набора политик строк. Если данные содержат строки других политик строк, их необходимо исключить с помощью предыдущего преобразования фильтра. Если выбраны все методы Update, выполняется слияние, где строки вставляются, удаляются, upserted/обновляются в соответствии с политиками строк, установленными с помощью предыдущего преобразования Alter Row. |
да | true или false |
insertable deletable upsertable updateable |
Оптимизированная запись | Повышение пропускной способности для операции записи с помощью оптимизации внутреннего случайного перемешивания в исполнителях Spark. В результате разделов и файлов может быть меньше, но они могут иметь больший размер. | Нет | true или false |
optimizedWrite: true |
Автоматическое сжатие | После завершения операции записи Spark автоматически выполнит команду OPTIMIZE для реорганизации данных, в результате чего при необходимости появится больше разделов для повышения производительности чтения в будущем. |
Нет | true или false |
autoCompact: true |
Пример скрипта приемника разностных данных
Связанный сценарий потока данных:
moviesAltered sink(
input(movieId as integer,
title as string
),
mapColumn(
movieId,
title
),
insertable: true,
updateable: true,
deletable: true,
upsertable: false,
keys: ['movieId'],
store: 'local',
format: 'delta',
vacuum: 180,
folderPath: $tempPath + '/delta'
) ~> movieDB
Приемник изменений с удалением секций
С помощью этого варианта в пункте "Метод Update" выше (т. е. update/upsert/delete) можно ограничить количество проверяемых секций. Из целевого хранилища извлекается только секции, удовлетворяющие этому условию. Вы можете указать фиксированный набор значений, которые может принимать столбец секции.
Пример скрипта приемника изменений с удалением секций
Пример скрипта приведен ниже.
DerivedColumn1 sink(
input(movieId as integer,
title as string
),
allowSchemaDrift: true,
validateSchema: false,
format: 'delta',
container: 'deltaContainer',
folderPath: 'deltaPath',
mergeSchema: false,
autoCompact: false,
optimizedWrite: false,
vacuum: 0,
deletable:false,
insertable:true,
updateable:true,
upsertable:false,
keys:['movieId'],
pruneCondition:['part_col' -> ([5, 8])],
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> sink2
Delta будет считывать только 2 секции, где part_col == 5 и 8, из целевого разностного хранилища (вместо всех секций). part_col — это столбец, по которому секционированы целевые разностные данные. Ему не нужно присутствовать в исходных данных.
Варианты оптимизации приемника изменений
На вкладке Параметры вы найдете еще три параметра для оптимизации преобразования приемника изменений.
Если параметр схемы Merge включен, он разрешает эволюцию схемы, т. е. все столбцы, которые присутствуют в текущем входящем потоке, но не находятся в целевой таблице Delta, автоматически добавляются в ее схему. Этот параметр поддерживается всеми методами обновления.
Если автоматическое сжатие включено, после отдельной операции записи преобразование проверяет возможность дополнительного сжатия файлов и запускает задание быстрой оптимизации (с размером файлов 128 МБ вместо 1 ГБ), чтобы еще больше сжать файлы для секций с наибольшим числом файлов малого размера. Автоматическое сжатие помогает объединить большое количество файлов малого размера в несколько крупных файлов. Автоматическое сжатие выполняется только при наличии не менее 50 файлов. После выполнения операции сжатия она создает новую версию таблицы и записывает новый файл с данными нескольких предыдущих файлов в сжатом виде.
Если включена оптимизация записи, преобразование приемника динамически оптимизирует размеры секций с учетом фактических данных, пытаясь записывать файлы размером 128 МБ для каждой секции таблицы. Это приблизительный размер, который может различаться в зависимости от характеристик набора данных. Оптимизированные операции записи повышают общую эффективность операций записи и последующих операций чтения. Он упорядочивает секции таким образом, чтобы повысить производительность последующих операций чтения.
Совет
Процесс оптимизированной записи замедлит все задание извлечения, преобразования и загрузки, так как приемник выдаст команду Spark Delta Lake Optimize после обработки данных. Рекомендуется не злоупотреблять оптимизированной записью. Например, если у вас есть почасовой конвейер данных, выполняйте поток данных с оптимизированной записью ежедневно.
Известные ограничения
При записи в разностный приемник существует известное ограничение, при котором количество записанных строк не отображается в выходных данных мониторинга.
Дальнейшие действия
- Создайте преобразование источника в потоке данных для сопоставления.
- Создайте преобразование приемника в потоке данных для сопоставления.
- Создайте преобразование изменения строки, чтобы пометить строки для операций вставки, обновления, upsert или удаления.