Script de fluxo de dados (DFS)

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Gorjeta

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!

Os fluxos de dados estão disponíveis no Azure Data Factory e no Azure Synapse Pipelines. Este artigo aplica-se ao mapeamento de fluxos de dados. Se você é novo em transformações, consulte o artigo introdutório Transformar dados usando um fluxo de dados de mapeamento.

Script de fluxo de dados (DFS) são os metadados subjacentes, semelhantes a uma linguagem de codificação, que são usados para executar as transformações incluídas em um fluxo de dados de mapeamento. Cada transformação é representada por uma série de propriedades que fornecem as informações necessárias para executar o trabalho corretamente. O script é visível e editável a partir do ADF clicando no botão "script" na faixa de opções superior da interface do usuário do navegador.

Script button

Por exemplo, em uma transformação de origem diz ao serviço para incluir todas as colunas do conjunto de dados de origem no fluxo de dados, allowSchemaDrift: true, mesmo que elas não estejam incluídas na projeção de esquema.

Casos de utilização

O DFS é produzido automaticamente pela interface do utilizador. Você pode clicar no botão Script para exibir e personalizar o script. Você também pode gerar scripts fora da interface do usuário do ADF e, em seguida, passá-los para o cmdlet do PowerShell. Ao depurar fluxos de dados complexos, você pode achar mais fácil verificar o code-behind do script em vez de verificar a representação gráfica da interface do usuário de seus fluxos.

Aqui estão alguns exemplos de casos de uso:

  • Produzindo programaticamente muitos fluxos de dados que são bastante semelhantes, ou seja, fluxos de dados "stamping-out".
  • Expressões complexas que são difíceis de gerenciar na interface do usuário ou estão resultando em problemas de validação.
  • Depuração e melhor compreensão de vários erros retornados durante a execução.

Ao criar um script de fluxo de dados para usar com o PowerShell ou uma API, você deve recolher o texto formatado em uma única linha. Você pode manter guias e novas linhas como caracteres de escape. Mas o texto deve ser formatado para caber dentro de uma propriedade JSON. Há um botão na interface do usuário do editor de scripts na parte inferior que formatará o script como uma única linha para você.

Copy button

Como adicionar transformações

Adicionar transformações requer três etapas básicas: adicionar os dados de transformação principais, redirecionar o fluxo de entrada e, em seguida, redirecionar o fluxo de saída. Isso pode ser visto mais facilmente em um exemplo. Digamos que começamos com uma fonte simples para coletar o fluxo de dados como o seguinte:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Se decidirmos adicionar uma transformação derivada, primeiro precisamos criar o texto da transformação principal, que tem uma expressão simples para adicionar uma nova coluna maiúscula chamada upperCaseTitle:

derive(upperCaseTitle = upper(title)) ~> deriveTransformationName

Em seguida, pegamos o DFS existente e adicionamos a transformação:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

E agora redirecionamos o fluxo de entrada identificando qual transformação queremos que a nova transformação venha depois (neste caso, source1) e copiando o nome do fluxo para a nova transformação:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Finalmente, identificamos a transformação que queremos que venha após essa nova transformação e substituímos seu fluxo de entrada (neste caso, sink1) pelo nome do fluxo de saída de nossa nova transformação:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Fundamentos do DFS

O DFS é composto por uma série de transformações conectadas, incluindo fontes, coletores e vários outros que podem adicionar novas colunas, filtrar dados, unir dados e muito mais. Normalmente, o script começará com uma ou mais fontes, seguido por muitas transformações e terminará com um ou mais coletores.

Todas as fontes têm a mesma construção básica:

source(
  source properties
) ~> source_name

Por exemplo, uma fonte simples com três colunas (movieId, título, gêneros) seria:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1

Todas as transformações, exceto fontes, têm a mesma construção básica:

name_of_incoming_stream transformation_type(
  properties
) ~> new_stream_name

Por exemplo, uma transformação derivada simples que pega uma coluna (título) e a substitui por uma versão maiúscula seria a seguinte:

source1 derive(
  title = upper(title)
) ~> derive1

E uma pia sem esquema seria:

derive1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Trechos de script

Trechos de script são códigos compartilháveis do Script de Fluxo de Dados que você pode usar para compartilhar entre fluxos de dados. Este vídeo abaixo fala sobre como usar trechos de script e utilizar o Data Flow Script para copiar e colar partes do script por trás de seus gráficos de fluxo de dados:

Estatísticas resumidas agregadas

Adicione uma transformação Aggregate ao seu fluxo de dados chamada "SummaryStats" e cole neste código abaixo para a função de agregação em seu script, substituindo SummaryStats existente. Isso fornecerá um padrão genérico para estatísticas de resumo de perfil de dados.

aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
		each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
		each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats

Você também pode usar o exemplo abaixo para contar o número de linhas exclusivas e distintas em seus dados. O exemplo abaixo pode ser colado em um fluxo de dados com transformação agregada chamada ValueDistAgg. Este exemplo usa uma coluna chamada "title". Certifique-se de substituir "title" pela coluna string em seus dados que você deseja usar para obter contagens de valor.

aggregate(groupBy(title),
	countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
		numofdistinct = countDistinct(title)) ~> UniqDist

Incluir todas as colunas em uma agregação

Este é um padrão de agregação genérico que demonstra como você pode manter as colunas restantes em seus metadados de saída quando estiver criando agregações. Neste caso, usamos a first() função para escolher o primeiro valor em cada coluna cujo nome não é "filme". Para usar isso, crie uma transformação Aggregate chamada DistinctRows e cole-a em seu script sobre o script de agregação DistinctRows existente.

aggregate(groupBy(movie),
	each(match(name!='movie'), $$ = first($$))) ~> DistinctRows

Criar uma impressão digital do hash de linha

Utilize este código no script do fluxo de dados para criar uma nova coluna derivada chamada DWhash que produz um hash sha1 de três colunas.

derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash

Também pode utilizar o script abaixo para gerar um hash de linha com todas as colunas que estão presentes no fluxo, sem necessidade de dar um nome a cada coluna:

derive(DWhash = sha1(columns())) ~> DWHash

String_agg equivalente

Esse código agirá como a função T-SQL string_agg() e agregará valores de cadeia de caracteres em uma matriz. Em seguida, você pode converter essa matriz em uma cadeia de caracteres para usar com destinos SQL.

source1 aggregate(groupBy(year),
	string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg

Contar o número de atualizações, upserts, inserções, exclusões

Ao usar uma transformação Alter Row, convém contar o número de atualizações, upserts, inserções, exclusões resultantes de suas políticas Alter Row. Adicione uma transformação Agregada após a linha de alteração e cole este Script de Fluxo de Dados na definição de agregação para essas contagens.

aggregate(updates = countIf(isUpdate(), 1),
		inserts = countIf(isInsert(), 1),
		upserts = countIf(isUpsert(), 1),
		deletes = countIf(isDelete(),1)) ~> RowCount

Linha distinta usando todas as colunas

Esse trecho adicionará uma nova transformação Agregada ao seu fluxo de dados, que levará todas as colunas de entrada, gerará um hash que é usado para agrupamento para eliminar duplicatas e, em seguida, fornecerá a primeira ocorrência de cada duplicata como saída. Você não precisa nomear explicitamente as colunas, elas serão geradas automaticamente a partir do seu fluxo de dados de entrada.

aggregate(groupBy(mycols = sha2(256,columns())),
    each(match(true()), $$ = first($$))) ~> DistinctRows

Verifique se há NULLs em todas as colunas

Este é um trecho que você pode colar em seu fluxo de dados para verificar genericamente todas as suas colunas para valores NULL. Essa técnica aproveita o desvio de esquema para examinar todas as colunas em todas as linhas e usa uma Divisão Condicional para separar as linhas com NULLs das linhas sem NULLs.

split(contains(array(toString(columns())),isNull(#item)),
	disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)

Desvio do esquema do AutoMap com uma seleção

Quando precisar carregar um esquema de banco de dados existente a partir de um conjunto desconhecido ou dinâmico de colunas de entrada, você deverá mapear as colunas do lado direito na transformação Coletor. Isso só é necessário quando você está carregando uma tabela existente. Adicione este trecho antes do coletor para criar um Selecione que mapeie automaticamente suas colunas. Deixe o mapeamento do coletor para o mapeamento automático.

select(mapColumn(
		each(match(true()))
	),
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> automap

Persistir tipos de dados de coluna

Adicione esse script dentro de uma definição de Coluna Derivada para armazenar os nomes de coluna e os tipos de dados do seu fluxo de dados para um repositório persistente usando um coletor.

derive(each(match(type=='string'), $$ = 'string'),
	each(match(type=='integer'), $$ = 'integer'),
	each(match(type=='short'), $$ = 'short'),
	each(match(type=='complex'), $$ = 'complex'),
	each(match(type=='array'), $$ = 'array'),
	each(match(type=='float'), $$ = 'float'),
	each(match(type=='date'), $$ = 'date'),
	each(match(type=='timestamp'), $$ = 'timestamp'),
	each(match(type=='boolean'), $$ = 'boolean'),
	each(match(type=='long'), $$ = 'long'),
	each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1

Preencher

Veja como implementar o problema comum "Fill Down" com conjuntos de dados quando você deseja substituir valores NULL pelo valor do valor não NULL anterior na sequência. Observe que essa operação pode ter implicações negativas de desempenho porque você deve criar uma janela sintética em todo o conjunto de dados com um valor de categoria "fictício". Além disso, você deve classificar por um valor para criar a sequência de dados adequada para localizar o valor não NULL anterior. Este trecho abaixo cria a categoria sintética como "manequim" e classifica por uma chave substituta. Você pode remover a chave substituta e usar sua própria chave de classificação específica de dados. Este trecho de código pressupõe que você já adicionou uma transformação de origem chamada source1

source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
	asc(sk, true),
	Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1

Média móvel

A média móvel pode ser implementada muito facilmente em fluxos de dados usando uma transformação do Windows. Este exemplo abaixo cria uma média móvel de 15 dias dos preços das ações da Microsoft.

window(over(stocksymbol),
	asc(Date, true),
	startRowOffset: -7L,
	endRowOffset: 7L,
	FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1

Contagem distinta de todos os valores de coluna

Você pode usar esse script para identificar colunas-chave e exibir a cardinalidade de todas as colunas em seu fluxo com um único trecho de script. Adicione esse script como uma transformação agregada ao seu fluxo de dados e ele fornecerá automaticamente contagens distintas de todas as colunas.

aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern

Comparar valores de linha anteriores ou seguintes

Este trecho de exemplo demonstra como a transformação Window pode ser usada para comparar valores de coluna do contexto de linha atual com valores de coluna de linhas antes e depois da linha atual. Neste exemplo, uma Coluna Derivada é usada para gerar um valor fictício para habilitar uma partição de janela em todo o conjunto de dados. Uma transformação de chave substituta é usada para atribuir um valor de chave exclusivo para cada linha. Ao aplicar esse padrão às transformações de dados, você pode remover a chave substituta se for uma coluna pela qual deseja ordenar e pode remover a coluna derivada se tiver colunas para usar para particionar seus dados.

source1 keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
	asc(sk, true),
	prevAndCurr = lag(title,1)+'-'+last(title),
		nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag

Quantas colunas existem nos meus dados?

size(array(columns()))

Explore os fluxos de dados começando com o artigo de visão geral dos fluxos de dados