Скрипт потока данных (DFS)

Область применения:Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

Потоки данных доступны в конвейерах как Фабрики данных Azure, так и Azure Synapse. Эта статья относится к потокам данных для сопоставления. Если вы не знакомы с преобразованиями, см. вводную статью Преобразование данных с помощью потока данных для сопоставления.

Скрипт потока данных (DFS) представляет собой базовые метаданные, аналогичные языку программирования. Он используется для выполнения преобразований, включенных в поток данных для сопоставления. Каждое преобразование представлено рядом свойств, которые предоставляют необходимые сведения для правильного выполнения задания. Чтобы увидеть скрипт в Фабрике данных Azure и получить к нему доступ для редактирования, нажмите кнопку "Скрипт" на верхней ленте пользовательского интерфейса браузера.

Script button

Например, allowSchemaDrift: true, в преобразовании источника указывает службе включать все столбцы из исходного набора данных в поток данных, даже если они не включены в проекцию схемы.

Случаи использования

DFS автоматически создается интерфейсом пользователя. Кнопка “Скрипт” позволяет просмотреть и настроить скрипт. Кроме того, можно создавать скрипты за пределами пользовательского интерфейса ADF, а затем передавать их в командлет PowerShell. При отладке сложных потоков данных может оказаться проще проверять код программной части скрипта, а не представление графа пользовательского интерфейса для потоков.

Ниже приведено несколько примеров использования.

  • Программное создание множества похожих потоков данных (“штамповка” потоков данных).
  • Сложные выражения, которыми трудно управлять в пользовательском интерфейсе или которые могут приводить к проблемам проверки.
  • Отладка и более эффективное понимание различных ошибок, возвращаемых во время выполнения.

При создании скрипта потока данных для использования с PowerShell или API необходимо свернуть форматированный текст в одну строку. Символы табуляции и новой строки можно сохранить в виде escape-символов. Но текст должен быть отформатирован в соответствии со свойством JSON. В нижней части пользовательского интерфейса редактора скриптов есть кнопка, которая будет форматировать скрипт как одну строку.

Copy button

Добавление преобразований

Чтобы добавить преобразования, нужно выполнить три основных шага: добавить основные данные преобразования, перенаправить входной поток и перенаправить выходной поток. Проще всего это увидеть на примере. Начнем с простого источника для потока данных приемника, как в следующем примере:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Если мы решим добавить производное преобразование, сначала необходимо создать основной текст преобразования с простым выражением, чтобы добавить новый столбец в верхнем регистре под названием upperCaseTitle:

derive(upperCaseTitle = upper(title)) ~> deriveTransformationName

Затем мы берем существующий DFS и добавляем преобразование:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Теперь перенаправляем входящий поток, определив преобразование, за которым должно следовать новое преобразование (в данном случае source1), и скопировав имя потока в новое преобразование:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Наконец, мы определяем преобразование, которое должно следовать за новым преобразованием, и заменяем его входной поток (в данном случае sink1 ) именем выходного потока нашего нового преобразования:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Основы DFS

DFS состоит из ряда связанных преобразований, включая источники, приемники и различные другие элементы, которые могут добавлять новые столбцы, фильтровать и объединять данные и многое другое. Обычно скрипт начинается с одного или нескольких источников, за которым следует много преобразований, и заканчивается одним или несколькими приемниками.

Все источники имеют одинаковую базовую конструкцию:

source(
  source properties
) ~> source_name

Например, простой источник с тремя столбцами (movieId, title, genres) будет выглядеть следующим образом:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1

Все преобразования, отличные от источников, имеют одну и ту же базовую конструкцию:

name_of_incoming_stream transformation_type(
  properties
) ~> new_stream_name

Например, простое производное преобразование, которое принимает столбец (title) и перезаписывает его версией в верхнем регистре, будет выглядеть следующим образом:

source1 derive(
  title = upper(title)
) ~> derive1

А приемник без схемы будет выглядеть так:

derive1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Фрагменты скриптов

Фрагменты скриптов — это доступный для общего использования код скрипта потока данных, который можно предоставлять в потоках данных. В приведенном ниже видео рассказывается о том, как использовать фрагменты скриптов и как копировать и вставлять части скрипта, определяющие графы потока данных, с помощью скрипта потока данных.

Агрегатная сводная статистика

Добавьте преобразование “Статистическая обработка” в поток данных с именем SummaryStats, а затем вставьте в скрипт приведенный ниже код для агрегатной функции, заменив существующий код SummaryStats. Это обеспечит общий шаблон для сводной статистики профиля данных.

aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
		each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
		each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats

Приведенный ниже пример также можно использовать для подсчета числа уникальных и различающихся строк в данных. Приведенный ниже пример можно вставить в поток данных с преобразованием “Статистическая обработка” с именем ValueDistAgg. В этом примере используется столбец с именем title. Обязательно замените title строковым столбцом в данных, которые вы хотите использовать для получения счетчиков значений.

aggregate(groupBy(title),
	countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
		numofdistinct = countDistinct(title)) ~> UniqDist

Включение всех столбцов в статистическое выражение

Этот универсальный шаблон статистического выражения показывает, как сохранить оставшиеся столбцы в выходных метаданных при построении статистических выражений. В этом случае мы используем функцию first(), чтобы выбрать первое значение в каждом столбце с именем, отличным от movie. Чтобы использовать эту функцию, создайте преобразование “Статистическая обработка” с именем DistinctRows и вставьте его в скрипт поверх существующего сценария статистического выражения DistinctRows.

aggregate(groupBy(movie),
	each(match(name!='movie'), $$ = first($$))) ~> DistinctRows

Создание отпечатка хэша строки

Используйте этот код в скрипте потока данных, чтобы создать новый производный столбец с именем DWhash, который создает хэш sha1 трех столбцов.

derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash

Этот скрипт также позволяет создать хэш строки, используя все столбцы, имеющиеся в потоке, при этом не требуется присваивать имя каждому столбцу.

derive(DWhash = sha1(columns())) ~> DWHash

Эквивалент String_agg

Этот код будет действовать как функция string_agg() T-SQL и будет объединять строковые значения в массив. Затем этот массив можно привести к строке для использования с назначениями SQL.

source1 aggregate(groupBy(year),
	string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg

Число обновлений, операций upsert, вставок, удалений

При использовании преобразования “Изменение строк” может потребоваться подсчитать количество обновлений, операций upsert, вставок и удалений, выполненных в результате применения политик изменения строк. Добавьте преобразование “Статистическая обработка” после преобразования “Изменение строк” и вставьте этот скрипт потока данных в определение статистического выражения для этих счетчиков.

aggregate(updates = countIf(isUpdate(), 1),
		inserts = countIf(isInsert(), 1),
		upserts = countIf(isUpsert(), 1),
		deletes = countIf(isDelete(),1)) ~> RowCount

Отличающаяся строка с учетом всех столбцов

Этот фрагмент кода добавит в поток данных новое преобразование "Статистическая обработка", которое будет принимать все входящие столбцы, формировать хэш, используемый для группировки, чтобы исключить дубликаты, а затем предоставлять первое вхождение каждого дубликата в качестве выходных данных. Столбцам не нужно явно присваивать имена, они будут автоматически создаваться на основе входящего потока данных.

aggregate(groupBy(mycols = sha2(256,columns())),
    each(match(true()), $$ = first($$))) ~> DistinctRows

Проверка наличия значений NULL во всех столбцах

Этот фрагмент кода можно вставить в поток данных для общей проверки всех столбцов на наличие значений NULL. Этот метод использует смещение схемы, чтобы просмотреть все столбцы во всех строках и с помощью Условного разбиения отделить строки со значениями NULL от остальных строк.

split(contains(array(toString(columns())),isNull(#item)),
	disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)

Смещение схемы автоматического сопоставления с помощью инструкции select

Если необходимо загрузить существующую схему базы данных из неизвестного или динамического набора входящих столбцов, необходимо сопоставить крайние правые столбцы в преобразовании приемника. Это необходимо только при загрузке существующей таблицы. Добавьте этот фрагмент кода перед приемником, чтобы создать инструкцию Select, которая будет автоматически сопоставлять столбцы. Оставьте сопоставление приемника для автоматического сопоставления.

select(mapColumn(
		each(match(true()))
	),
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> automap

Сохранение типов данных столбцов

Добавьте этот скрипт в определение производного столбца, чтобы сохранить имена столбцов и типы данных из потока данных в постоянном хранилище с помощью приемника.

derive(each(match(type=='string'), $$ = 'string'),
	each(match(type=='integer'), $$ = 'integer'),
	each(match(type=='short'), $$ = 'short'),
	each(match(type=='complex'), $$ = 'complex'),
	each(match(type=='array'), $$ = 'array'),
	each(match(type=='float'), $$ = 'float'),
	each(match(type=='date'), $$ = 'date'),
	each(match(type=='timestamp'), $$ = 'timestamp'),
	each(match(type=='boolean'), $$ = 'boolean'),
	each(match(type=='long'), $$ = 'long'),
	each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1

Заполнение по направлению вниз

Здесь показано, как реализовать распространенную задачу "заполнение вниз" с наборами данных, если требуется заменить значения NULL в последовательности предыдущим значением, отличным от NULL. Обратите внимание, что эта операция может отрицательно сказаться на производительности, поскольку необходимо создать искусственное окно во всем наборе данных с фиктивным значением категории. Кроме того, необходимо выполнить сортировку по значению, чтобы создать правильную последовательность данных для поиска предыдущего значения, отличного от NULL. Следующий фрагмент кода создает искусственную категорию как фиктивную и сортирует ее по суррогатному ключу. Вы можете удалить суррогатный ключ и использовать собственный ключ сортировки, относящийся к данным. В этом фрагменте кода предполагается, что вы уже добавили преобразование источника с именем source1

source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
	asc(sk, true),
	Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1

Скользящее среднее

Скользящее среднее можно легко реализовать в потоках данных с помощью преобразования окон. В следующем примере создается 15-дневное скользящее среднее для цен на акции для Майкрософт.

window(over(stocksymbol),
	asc(Date, true),
	startRowOffset: -7L,
	endRowOffset: 7L,
	FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1

Число различных значений столбцов

Этот скрипт можно использовать для определения ключевых столбцов и просмотра количества элементов для всех столбцов в потоке с помощью одного фрагмента сценария. Добавьте этот скрипт в поток данных как преобразование статистической обработки, и он будет автоматически выдавать количество разных значений для всех столбцов.

aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern

Сравнение значений предыдущей или следующей строки

Этот фрагмент кода демонстрирует, как преобразование "Окно" можно использовать для сравнения значений столбцов из текущего контекста строки со значениями столбцов из строк, расположенных до и после текущей строки. В этом примере производный столбец используется для создания фиктивного значения, чтобы охватить секцию окна из всего набора данных. Преобразование "Суррогатный ключ" используется для назначения уникального значения ключа каждой строке. При применении этого шаблона к преобразованиям данных можно удалить суррогатный ключ, если нужно использовать упорядочение по имеющемуся столбцу, а также удалить производный столбец, если у вас есть столбцы, по которым можно выполнить секционирование данных.

source1 keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
	asc(sk, true),
	prevAndCurr = lag(title,1)+'-'+last(title),
		nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag

Сколько столбцов в моих данных?

size(array(columns()))

Проанализируйте потоки данных. Для начала ознакомьтесь со статьей Общие сведения о потоках данных