Script del flusso di dati (DFS)
SI APPLICA A: Azure Data Factory Azure Synapse Analytics
Suggerimento
Provare Data Factory in Microsoft Fabric, una soluzione di analisi completa per le aziende. Microsoft Fabric copre tutti gli elementi, dallo spostamento dei dati all'analisi scientifica dei dati, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Scopri come avviare gratuitamente una nuova versione di valutazione .
I flussi di dati sono disponibili sia in Azure Data Factory che in Azure Synapse Pipelines. Questo articolo si applica ai flussi di dati di mapping. Se non si ha esperienza con le trasformazioni, vedere l'articolo introduttivo Trasformare i dati usando un flusso di dati di mapping.
Lo script del flusso di dati corrisponde ai metadati sottostanti, analogamente a un linguaggio di codifica, usati per eseguire le trasformazioni incluse in un flusso di dati per mapping. Ogni trasformazione è rappresentata da una serie di proprietà che forniscono le informazioni necessarie per eseguire correttamente il processo. Lo script è visibile e modificabile da Azure Data Factory facendo clic sul pulsante "Script" sulla barra multifunzione superiore dell'interfaccia utente del browser.
Ad esempio, allowSchemaDrift: true,
in una trasformazione di origine indica al servizio di includere tutte le colonne del set di dati di origine nel flusso di dati anche se non sono incluse nella proiezione dello schema.
Utilizzare casi
Il file DFS viene generato automaticamente dall'interfaccia utente. È possibile fare clic sul pulsante Script per visualizzare e personalizzare lo script. È anche possibile generare script all'esterno dell'interfaccia utente di Azure Data Factory e quindi passarli al cmdlet di PowerShell. Quando si esegue il debug di flussi di dati complessi, è possibile che sia più semplice analizzare il code-behind dello script anziché analizzare la rappresentazione del grafico dell'interfaccia utente dei flussi.
Ecco alcuni casi d'uso di esempio:
- Produrre a livello di codice molti flussi di dati abbastanza simili, ad esempio flussi di dati di tipo "stamping-out".
- Espressioni complesse difficili da gestire nell'interfaccia utente o che causano problemi di convalida.
- Debug e migliore comprensione dei vari errori restituiti durante l'esecuzione.
Quando si compila uno script del flusso di dati da usare con PowerShell o un'API, è necessario comprimere il testo formattato in una singola riga. È possibile mantenere tabulazioni e nuove righe come caratteri di escape. Ma il testo deve essere formattato per adattarsi all'interno di una proprietà JSON. C'è un pulsante nell'interfaccia utente dell'editor di script nella parte inferiore che formatta lo script come una singola riga.
Come aggiungere trasformazioni
L'aggiunta di trasformazioni richiede tre passaggi di base: aggiungere i dati di trasformazione principali, reindirizzare il flusso di input e quindi reindirizzare il flusso di output. Questo può essere visto più semplice in un esempio. Si supponga di iniziare con un'origine semplice per il flusso di dati sink, come illustrato di seguito:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Se si decide di aggiungere una trasformazione deriva, è prima necessario creare il testo della trasformazione principale, che include un'espressione semplice per aggiungere una nuova colonna maiuscola denominata upperCaseTitle
:
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
Si prende quindi il file DFS esistente e si aggiunge la trasformazione:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Ora si reindirizza il flusso in ingresso identificando la trasformazione che si vuole che la nuova trasformazione venga eseguita dopo (in questo caso , source1
) e copiando il nome del flusso nella nuova trasformazione:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Infine, si identifica la trasformazione che si vuole ottenere dopo questa nuova trasformazione e si sostituisce il flusso di input (in questo caso , sink1
) con il nome del flusso di output della nuova trasformazione:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Nozioni fondamentali su DFS
DFS è costituito da una serie di trasformazioni connesse, tra cui origini, sink e varie altre che possono aggiungere nuove colonne, filtrare i dati, unire dati e molto altro ancora. In genere, lo script inizierà con una o più origini seguite da molte trasformazioni e terminando con uno o più sink.
Le origini hanno tutte la stessa costruzione di base:
source(
source properties
) ~> source_name
Ad esempio, un'origine semplice con tre colonne (movieId, titolo, generi) sarà:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
Tutte le trasformazioni diverse dalle origini hanno la stessa costruzione di base:
name_of_incoming_stream transformation_type(
properties
) ~> new_stream_name
Ad esempio, una semplice trasformazione deriva che accetta una colonna (titolo) e la sovrascrive con una versione maiuscola sarà la seguente:
source1 derive(
title = upper(title)
) ~> derive1
E un sink senza schema sarà:
derive1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Frammenti di script
I frammenti di script sono codice condivisibile di Flusso di dati Script che è possibile usare per condividere tra flussi di dati. Questo video seguente illustra come usare frammenti di script e usare Flusso di dati Script per copiare e incollare parti dello script dietro i grafici del flusso di dati:
Statistiche di riepilogo aggregate
Aggiungere una trasformazione Aggregate al flusso di dati denominato "SummaryStats" e quindi incollare questo codice seguente per la funzione di aggregazione nello script, sostituendo summaryStats esistente. Verrà fornito un modello generico per le statistiche di riepilogo del profilo dati.
aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats
È anche possibile usare l'esempio seguente per contare il numero di righe univoce e il numero di righe distinte nei dati. L'esempio seguente può essere incollato in un flusso di dati con la trasformazione Aggregate denominata ValueDistAgg. In questo esempio viene utilizzata una colonna denominata "title". Assicurarsi di sostituire "title" con la colonna stringa nei dati da usare per ottenere i conteggi dei valori.
aggregate(groupBy(title),
countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
numofdistinct = countDistinct(title)) ~> UniqDist
Includere tutte le colonne in un'aggregazione
Si tratta di un modello di aggregazione generico che illustra come mantenere le colonne rimanenti nei metadati di output durante la compilazione di aggregazioni. In questo caso, usiamo la first()
funzione per scegliere il primo valore in ogni colonna il cui nome non è "movie". A tale scopo, creare una trasformazione Aggregate denominata DistinctRows e incollarla nello script sopra lo script di aggregazione DistinctRows esistente.
aggregate(groupBy(movie),
each(match(name!='movie'), $$ = first($$))) ~> DistinctRows
Creare un'impronta digitale hash di riga
Usare questo codice nello script del flusso di dati per creare una nuova colonna derivata denominata DWhash
che produce un sha1
hash di tre colonne.
derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash
È anche possibile usare questo script seguente per generare un hash di riga usando tutte le colonne presenti nel flusso, senza dover assegnare un nome a ogni colonna:
derive(DWhash = sha1(columns())) ~> DWHash
String_agg equivalente
Questo codice funzionerà come la funzione T-SQL string_agg()
e aggrega i valori stringa in una matrice. È quindi possibile eseguire il cast di tale matrice in una stringa da usare con le destinazioni SQL.
source1 aggregate(groupBy(year),
string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg
Numero di aggiornamenti, upsert, inserimenti, eliminazioni
Quando si usa una trasformazione Alter Row, è possibile contare il numero di aggiornamenti, upsert, inserimenti ed eliminazioni risultanti dai criteri Alter Row. Aggiungere una trasformazione Aggregazione dopo la modifica della riga e incollare questo Flusso di dati Script nella definizione di aggregazione per tali conteggi.
aggregate(updates = countIf(isUpdate(), 1),
inserts = countIf(isInsert(), 1),
upserts = countIf(isUpsert(), 1),
deletes = countIf(isDelete(),1)) ~> RowCount
Riga distinta con tutte le colonne
Questo frammento di codice aggiungerà una nuova trasformazione Aggregazione al flusso di dati, che accetta tutte le colonne in ingresso, genera un hash usato per il raggruppamento per eliminare i duplicati, quindi fornirà la prima occorrenza di ogni duplicato come output. Non è necessario denominare in modo esplicito le colonne, che verranno generate automaticamente dal flusso di dati in ingresso.
aggregate(groupBy(mycols = sha2(256,columns())),
each(match(true()), $$ = first($$))) ~> DistinctRows
Verificare la presenza di valori NULL in tutte le colonne
Si tratta di un frammento di codice che è possibile incollare nel flusso di dati per controllare in modo generico tutte le colonne per i valori NULL. Questa tecnica sfrutta la deriva dello schema per esaminare tutte le colonne di tutte le righe e usa una suddivisione condizionale per separare le righe con valori NULL dalle righe senza valori NULL.
split(contains(array(toString(columns())),isNull(#item)),
disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)
Mappa automatica della deriva dello schema con una selezione
Quando è necessario caricare uno schema di database esistente da un set sconosciuto o dinamico di colonne in ingresso, è necessario eseguire il mapping delle colonne sul lato destro nella trasformazione Sink. Questa operazione è necessaria solo quando si carica una tabella esistente. Aggiungere questo frammento di codice prima del sink per creare un controllo Select che esegue il mapping automatico delle colonne. Lasciare il mapping del sink alla mappa automatica.
select(mapColumn(
each(match(true()))
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> automap
Rendere persistenti i tipi di dati delle colonne
Aggiungere questo script all'interno di una definizione colonna derivata per archiviare i nomi di colonna e i tipi di dati dal flusso di dati a un archivio permanente usando un sink.
derive(each(match(type=='string'), $$ = 'string'),
each(match(type=='integer'), $$ = 'integer'),
each(match(type=='short'), $$ = 'short'),
each(match(type=='complex'), $$ = 'complex'),
each(match(type=='array'), $$ = 'array'),
each(match(type=='float'), $$ = 'float'),
each(match(type=='date'), $$ = 'date'),
each(match(type=='timestamp'), $$ = 'timestamp'),
each(match(type=='boolean'), $$ = 'boolean'),
each(match(type=='long'), $$ = 'long'),
each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1
Ricopia in basso
Ecco come implementare il problema comune "Fill Down" con i set di dati quando si desidera sostituire i valori NULL con il valore del valore precedente non NULL nella sequenza. Si noti che questa operazione può avere implicazioni negative sulle prestazioni perché è necessario creare una finestra sintetica nell'intero set di dati con un valore di categoria "fittizio". Inoltre, è necessario ordinare in base a un valore per creare la sequenza di dati appropriata per trovare il valore non NULL precedente. Questo frammento di codice seguente crea la categoria sintetica come "fittizia" e ordina in base a una chiave surrogata. È possibile rimuovere la chiave surrogata e usare la propria chiave di ordinamento specifica dei dati. Questo frammento di codice presuppone che sia già stata aggiunta una trasformazione Source denominata source1
source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
asc(sk, true),
Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1
Media mobile
La media mobile può essere implementata molto facilmente nei flussi di dati usando una trasformazione Di Windows. Questo esempio seguente crea una media mobile di 15 giorni dei prezzi azionari per Microsoft.
window(over(stocksymbol),
asc(Date, true),
startRowOffset: -7L,
endRowOffset: 7L,
FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1
Conteggio distinct di tutti i valori di colonna
È possibile usare questo script per identificare le colonne chiave e visualizzare la cardinalità di tutte le colonne del flusso con un singolo frammento di script. Aggiungere questo script come trasformazione di aggregazione al flusso di dati e fornirà automaticamente conteggi distinti di tutte le colonne.
aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern
Confrontare i valori di riga precedenti o successivi
Questo frammento di codice di esempio illustra come usare la trasformazione Finestra per confrontare i valori di colonna dal contesto di riga corrente con i valori di colonna delle righe prima e dopo la riga corrente. In questo esempio viene usata una colonna derivata per generare un valore fittizio per abilitare una partizione di finestra nell'intero set di dati. Una trasformazione Chiave surrogata viene usata per assegnare un valore di chiave univoco per ogni riga. Quando si applica questo modello alle trasformazioni dei dati, è possibile rimuovere la chiave surrogata se si è una colonna da ordinare ed è possibile rimuovere la colonna derivata se si dispone di colonne da usare per partizionare i dati.
source1 keyGenerate(output(sk as long),
startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
asc(sk, true),
prevAndCurr = lag(title,1)+'-'+last(title),
nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag
Quante colonne sono presenti nei dati?
size(array(columns()))
Contenuto correlato
Esplorare le Flusso di dati a partire dall'articolo panoramica dei flussi di dati