Condividi tramite


Script del flusso di dati (DFS)

SI APPLICA A: Azure Data Factory Azure Synapse Analytics

Suggerimento

Provare Data Factory in Microsoft Fabric, una soluzione di analisi completa per le aziende. Microsoft Fabric copre tutti gli elementi, dallo spostamento dei dati all'analisi scientifica dei dati, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Scopri come avviare gratuitamente una nuova versione di valutazione .

I flussi di dati sono disponibili sia in Azure Data Factory che in Azure Synapse Pipelines. Questo articolo si applica ai flussi di dati di mapping. Se non si ha esperienza con le trasformazioni, vedere l'articolo introduttivo Trasformare i dati usando un flusso di dati di mapping.

Lo script del flusso di dati corrisponde ai metadati sottostanti, analogamente a un linguaggio di codifica, usati per eseguire le trasformazioni incluse in un flusso di dati per mapping. Ogni trasformazione è rappresentata da una serie di proprietà che forniscono le informazioni necessarie per eseguire correttamente il processo. Lo script è visibile e modificabile da Azure Data Factory facendo clic sul pulsante "Script" sulla barra multifunzione superiore dell'interfaccia utente del browser.

Script button

Ad esempio, allowSchemaDrift: true, in una trasformazione di origine indica al servizio di includere tutte le colonne del set di dati di origine nel flusso di dati anche se non sono incluse nella proiezione dello schema.

Utilizzare casi

Il file DFS viene generato automaticamente dall'interfaccia utente. È possibile fare clic sul pulsante Script per visualizzare e personalizzare lo script. È anche possibile generare script all'esterno dell'interfaccia utente di Azure Data Factory e quindi passarli al cmdlet di PowerShell. Quando si esegue il debug di flussi di dati complessi, è possibile che sia più semplice analizzare il code-behind dello script anziché analizzare la rappresentazione del grafico dell'interfaccia utente dei flussi.

Ecco alcuni casi d'uso di esempio:

  • Produrre a livello di codice molti flussi di dati abbastanza simili, ad esempio flussi di dati di tipo "stamping-out".
  • Espressioni complesse difficili da gestire nell'interfaccia utente o che causano problemi di convalida.
  • Debug e migliore comprensione dei vari errori restituiti durante l'esecuzione.

Quando si compila uno script del flusso di dati da usare con PowerShell o un'API, è necessario comprimere il testo formattato in una singola riga. È possibile mantenere tabulazioni e nuove righe come caratteri di escape. Ma il testo deve essere formattato per adattarsi all'interno di una proprietà JSON. C'è un pulsante nell'interfaccia utente dell'editor di script nella parte inferiore che formatta lo script come una singola riga.

Copy button

Come aggiungere trasformazioni

L'aggiunta di trasformazioni richiede tre passaggi di base: aggiungere i dati di trasformazione principali, reindirizzare il flusso di input e quindi reindirizzare il flusso di output. Questo può essere visto più semplice in un esempio. Si supponga di iniziare con un'origine semplice per il flusso di dati sink, come illustrato di seguito:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Se si decide di aggiungere una trasformazione deriva, è prima necessario creare il testo della trasformazione principale, che include un'espressione semplice per aggiungere una nuova colonna maiuscola denominata upperCaseTitle:

derive(upperCaseTitle = upper(title)) ~> deriveTransformationName

Si prende quindi il file DFS esistente e si aggiunge la trasformazione:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Ora si reindirizza il flusso in ingresso identificando la trasformazione che si vuole che la nuova trasformazione venga eseguita dopo (in questo caso , source1) e copiando il nome del flusso nella nuova trasformazione:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Infine, si identifica la trasformazione che si vuole ottenere dopo questa nuova trasformazione e si sostituisce il flusso di input (in questo caso , sink1) con il nome del flusso di output della nuova trasformazione:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Nozioni fondamentali su DFS

DFS è costituito da una serie di trasformazioni connesse, tra cui origini, sink e varie altre che possono aggiungere nuove colonne, filtrare i dati, unire dati e molto altro ancora. In genere, lo script inizierà con una o più origini seguite da molte trasformazioni e terminando con uno o più sink.

Le origini hanno tutte la stessa costruzione di base:

source(
  source properties
) ~> source_name

Ad esempio, un'origine semplice con tre colonne (movieId, titolo, generi) sarà:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1

Tutte le trasformazioni diverse dalle origini hanno la stessa costruzione di base:

name_of_incoming_stream transformation_type(
  properties
) ~> new_stream_name

Ad esempio, una semplice trasformazione deriva che accetta una colonna (titolo) e la sovrascrive con una versione maiuscola sarà la seguente:

source1 derive(
  title = upper(title)
) ~> derive1

E un sink senza schema sarà:

derive1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Frammenti di script

I frammenti di script sono codice condivisibile di Flusso di dati Script che è possibile usare per condividere tra flussi di dati. Questo video seguente illustra come usare frammenti di script e usare Flusso di dati Script per copiare e incollare parti dello script dietro i grafici del flusso di dati:

Statistiche di riepilogo aggregate

Aggiungere una trasformazione Aggregate al flusso di dati denominato "SummaryStats" e quindi incollare questo codice seguente per la funzione di aggregazione nello script, sostituendo summaryStats esistente. Verrà fornito un modello generico per le statistiche di riepilogo del profilo dati.

aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
		each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
		each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats

È anche possibile usare l'esempio seguente per contare il numero di righe univoce e il numero di righe distinte nei dati. L'esempio seguente può essere incollato in un flusso di dati con la trasformazione Aggregate denominata ValueDistAgg. In questo esempio viene utilizzata una colonna denominata "title". Assicurarsi di sostituire "title" con la colonna stringa nei dati da usare per ottenere i conteggi dei valori.

aggregate(groupBy(title),
	countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
		numofdistinct = countDistinct(title)) ~> UniqDist

Includere tutte le colonne in un'aggregazione

Si tratta di un modello di aggregazione generico che illustra come mantenere le colonne rimanenti nei metadati di output durante la compilazione di aggregazioni. In questo caso, usiamo la first() funzione per scegliere il primo valore in ogni colonna il cui nome non è "movie". A tale scopo, creare una trasformazione Aggregate denominata DistinctRows e incollarla nello script sopra lo script di aggregazione DistinctRows esistente.

aggregate(groupBy(movie),
	each(match(name!='movie'), $$ = first($$))) ~> DistinctRows

Creare un'impronta digitale hash di riga

Usare questo codice nello script del flusso di dati per creare una nuova colonna derivata denominata DWhash che produce un sha1 hash di tre colonne.

derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash

È anche possibile usare questo script seguente per generare un hash di riga usando tutte le colonne presenti nel flusso, senza dover assegnare un nome a ogni colonna:

derive(DWhash = sha1(columns())) ~> DWHash

String_agg equivalente

Questo codice funzionerà come la funzione T-SQL string_agg() e aggrega i valori stringa in una matrice. È quindi possibile eseguire il cast di tale matrice in una stringa da usare con le destinazioni SQL.

source1 aggregate(groupBy(year),
	string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg

Numero di aggiornamenti, upsert, inserimenti, eliminazioni

Quando si usa una trasformazione Alter Row, è possibile contare il numero di aggiornamenti, upsert, inserimenti ed eliminazioni risultanti dai criteri Alter Row. Aggiungere una trasformazione Aggregazione dopo la modifica della riga e incollare questo Flusso di dati Script nella definizione di aggregazione per tali conteggi.

aggregate(updates = countIf(isUpdate(), 1),
		inserts = countIf(isInsert(), 1),
		upserts = countIf(isUpsert(), 1),
		deletes = countIf(isDelete(),1)) ~> RowCount

Riga distinta con tutte le colonne

Questo frammento di codice aggiungerà una nuova trasformazione Aggregazione al flusso di dati, che accetta tutte le colonne in ingresso, genera un hash usato per il raggruppamento per eliminare i duplicati, quindi fornirà la prima occorrenza di ogni duplicato come output. Non è necessario denominare in modo esplicito le colonne, che verranno generate automaticamente dal flusso di dati in ingresso.

aggregate(groupBy(mycols = sha2(256,columns())),
    each(match(true()), $$ = first($$))) ~> DistinctRows

Verificare la presenza di valori NULL in tutte le colonne

Si tratta di un frammento di codice che è possibile incollare nel flusso di dati per controllare in modo generico tutte le colonne per i valori NULL. Questa tecnica sfrutta la deriva dello schema per esaminare tutte le colonne di tutte le righe e usa una suddivisione condizionale per separare le righe con valori NULL dalle righe senza valori NULL.

split(contains(array(toString(columns())),isNull(#item)),
	disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)

Mappa automatica della deriva dello schema con una selezione

Quando è necessario caricare uno schema di database esistente da un set sconosciuto o dinamico di colonne in ingresso, è necessario eseguire il mapping delle colonne sul lato destro nella trasformazione Sink. Questa operazione è necessaria solo quando si carica una tabella esistente. Aggiungere questo frammento di codice prima del sink per creare un controllo Select che esegue il mapping automatico delle colonne. Lasciare il mapping del sink alla mappa automatica.

select(mapColumn(
		each(match(true()))
	),
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> automap

Rendere persistenti i tipi di dati delle colonne

Aggiungere questo script all'interno di una definizione colonna derivata per archiviare i nomi di colonna e i tipi di dati dal flusso di dati a un archivio permanente usando un sink.

derive(each(match(type=='string'), $$ = 'string'),
	each(match(type=='integer'), $$ = 'integer'),
	each(match(type=='short'), $$ = 'short'),
	each(match(type=='complex'), $$ = 'complex'),
	each(match(type=='array'), $$ = 'array'),
	each(match(type=='float'), $$ = 'float'),
	each(match(type=='date'), $$ = 'date'),
	each(match(type=='timestamp'), $$ = 'timestamp'),
	each(match(type=='boolean'), $$ = 'boolean'),
	each(match(type=='long'), $$ = 'long'),
	each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1

Ricopia in basso

Ecco come implementare il problema comune "Fill Down" con i set di dati quando si desidera sostituire i valori NULL con il valore del valore precedente non NULL nella sequenza. Si noti che questa operazione può avere implicazioni negative sulle prestazioni perché è necessario creare una finestra sintetica nell'intero set di dati con un valore di categoria "fittizio". Inoltre, è necessario ordinare in base a un valore per creare la sequenza di dati appropriata per trovare il valore non NULL precedente. Questo frammento di codice seguente crea la categoria sintetica come "fittizia" e ordina in base a una chiave surrogata. È possibile rimuovere la chiave surrogata e usare la propria chiave di ordinamento specifica dei dati. Questo frammento di codice presuppone che sia già stata aggiunta una trasformazione Source denominata source1

source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
	asc(sk, true),
	Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1

Media mobile

La media mobile può essere implementata molto facilmente nei flussi di dati usando una trasformazione Di Windows. Questo esempio seguente crea una media mobile di 15 giorni dei prezzi azionari per Microsoft.

window(over(stocksymbol),
	asc(Date, true),
	startRowOffset: -7L,
	endRowOffset: 7L,
	FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1

Conteggio distinct di tutti i valori di colonna

È possibile usare questo script per identificare le colonne chiave e visualizzare la cardinalità di tutte le colonne del flusso con un singolo frammento di script. Aggiungere questo script come trasformazione di aggregazione al flusso di dati e fornirà automaticamente conteggi distinti di tutte le colonne.

aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern

Confrontare i valori di riga precedenti o successivi

Questo frammento di codice di esempio illustra come usare la trasformazione Finestra per confrontare i valori di colonna dal contesto di riga corrente con i valori di colonna delle righe prima e dopo la riga corrente. In questo esempio viene usata una colonna derivata per generare un valore fittizio per abilitare una partizione di finestra nell'intero set di dati. Una trasformazione Chiave surrogata viene usata per assegnare un valore di chiave univoco per ogni riga. Quando si applica questo modello alle trasformazioni dei dati, è possibile rimuovere la chiave surrogata se si è una colonna da ordinare ed è possibile rimuovere la colonna derivata se si dispone di colonne da usare per partizionare i dati.

source1 keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
	asc(sk, true),
	prevAndCurr = lag(title,1)+'-'+last(title),
		nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag

Quante colonne sono presenti nei dati?

size(array(columns()))

Esplorare le Flusso di dati a partire dall'articolo panoramica dei flussi di dati