Skript toku dat (DFS)

PLATÍ PRO: Azure Data Factory Azure Synapse Analytics

Tip

Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje všechno od přesunu dat až po datové vědy, analýzy v reálném čase, business intelligence a vytváření sestav. Přečtěte si, jak začít používat novou zkušební verzi zdarma.

Toky dat jsou k dispozici ve službě Azure Data Factory i v kanálech Azure Synapse. Tento článek se týká mapování toků dat. Pokud s transformacemi začínáte, přečtěte si úvodní článek Transformace dat pomocí mapování toku dat.

Skript toku dat (DFS) je podkladová metadata, podobně jako kódovací jazyk, který slouží ke spouštění transformací, které jsou součástí mapování toku dat. Každá transformace je reprezentována řadou vlastností, které poskytují potřebné informace ke správnému spuštění úlohy. Skript je viditelný a upravitelný z ADF kliknutím na tlačítko "skript" na horním pásu karet uživatelského rozhraní prohlížeče.

Script button

Například v transformaci zdroje říká službě, allowSchemaDrift: true, aby do toku dat zahrnula všechny sloupce ze zdrojové datové sady, i když nejsou zahrnuté do projekce schématu.

Případy použití

Systém souborů DFS je automaticky vytvořen uživatelským rozhraním. Kliknutím na tlačítko Skript můžete skript zobrazit a přizpůsobit. Můžete také vygenerovat skripty mimo uživatelské rozhraní ADF a pak je předat do rutiny PowerShellu. Při ladění složitých toků dat může být jednodušší zkontrolovat kód skriptu namísto skenování znázornění grafů uživatelského rozhraní vašich toků.

Tady je několik příkladů případů použití:

  • Programově vytváří mnoho toků dat, které jsou poměrně podobné, tj. "kolkování" toků dat.
  • Složité výrazy, které se obtížně spravují v uživatelském rozhraní nebo mají za následek problémy s ověřováním.
  • Ladění a lepší porozumění různým chybám vrácených během provádění

Když vytvoříte skript toku dat, který se použije s PowerShellem nebo rozhraním API, musíte formátovaný text sbalit do jednoho řádku. Tabulátory a nové znaky můžete udržovat jako řídicí znaky. Text se ale musí naformátovat tak, aby se vešl do vlastnosti JSON. V uživatelském rozhraní editoru skriptů v dolní části je tlačítko, které skript naformátuje jako jeden řádek.

Copy button

Přidání transformací

Přidání transformací vyžaduje tři základní kroky: přidání základních transformačních dat, přesměrování vstupního datového proudu a následné přesměrování výstupního datového proudu. To je vidět nejsnadněji v příkladu. Řekněme, že začneme jednoduchým zdrojem pro tok dat jímky, například takto:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Pokud se rozhodneme přidat transformaci odvození, nejprve musíme vytvořit základní transformační text, který má jednoduchý výraz pro přidání nového velkého sloupce s názvem upperCaseTitle:

derive(upperCaseTitle = upper(title)) ~> deriveTransformationName

Pak vezmeme existující systém souborů DFS a přidáme transformaci:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

A teď přesměrujeme příchozí datový proud tak, source1že určíme, která transformace má přijít po nové transformaci (v tomto případě) a zkopírujeme název datového proudu do nové transformace:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Nakonec identifikujeme transformaci, kterou chceme po této nové transformaci přijít, a nahradíme její vstupní datový proud (v tomto případě sink1) názvem výstupního datového proudu naší nové transformace:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Základy systému souborů DFS

Systém SOUBORŮ DFS se skládá z řady propojených transformací, včetně zdrojů, jímek a různých dalších, které můžou přidávat nové sloupce, filtrovat data, spojovat data a mnoho dalšího. Skript obvykle začíná jedním nebo více zdroji, za kterými následuje mnoho transformací a končí jedním nebo více jímkami.

Všechny zdroje mají stejnou základní konstrukci:

source(
  source properties
) ~> source_name

Například jednoduchý zdroj se třemi sloupci (movieId, title, žánry) by byl:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1

Všechny transformace kromě zdrojů mají stejnou základní konstrukci:

name_of_incoming_stream transformation_type(
  properties
) ~> new_stream_name

Například jednoduchá transformace odvození, která přebírá sloupec (název) a přepíše ji velkými písmeny, by byla následující:

source1 derive(
  title = upper(title)
) ~> derive1

A jímka bez schématu by byla:

derive1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Fragmenty kódu skriptu

Fragmenty kódu skriptu jsou sdíletelný kód Tok dat Script, který můžete použít ke sdílení mezi toky dat. V následujícím videu se dozvíte, jak používat fragmenty kódu skriptu a jak pomocí Tok dat Script kopírovat a vkládat části skriptu za grafy toku dat:

Agregované souhrnné statistiky

Přidejte do toku dat agregační transformaci s názvem SummaryStats a vložte tento kód níže pro agregační funkci ve skriptu a nahraďte stávající souhrnné statistiky. Tím se poskytne obecný vzor pro souhrnné statistiky profilu dat.

aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
		each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
		each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats

Pomocí následujícího vzorku můžete také spočítat počet jedinečných a počet jedinečných řádků v datech. Následující příklad lze vložit do toku dat s agregační transformací s názvem ValueDistAgg. V tomto příkladu se používá sloupec s názvem "title". Nezapomeňte nahradit "title" řetězcem ve vašich datech, který chcete použít k získání počtu hodnot.

aggregate(groupBy(title),
	countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
		numofdistinct = countDistinct(title)) ~> UniqDist

Zahrnutí všech sloupců do agregace

Toto je obecný agregační vzor, který ukazuje, jak při vytváření agregací zachovat zbývající sloupce ve výstupních metadatech. V tomto případě použijeme first() funkci k výběru první hodnoty ve všech sloupcích, jejichž název není "film". Pokud to chcete použít, vytvořte agregační transformaci s názvem DistinctRows a vložte ji do skriptu nad existující agregační skript DistinctRows.

aggregate(groupBy(movie),
	each(match(name!='movie'), $$ = first($$))) ~> DistinctRows

Vytvoření otisku prstu hash řádku

Tento kód ve skriptu toku dat použijte k vytvoření nového odvozeného sloupce, DWhash který vytvoří sha1 hodnotu hash tří sloupců.

derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash

Pomocí následujícího skriptu můžete také vygenerovat hodnotu hash řádku pomocí všech sloupců, které jsou ve streamu, aniž byste museli každý sloupec pojmenovat:

derive(DWhash = sha1(columns())) ~> DWHash

ekvivalent String_agg

Tento kód bude fungovat jako funkce T-SQL string_agg() a bude agregovat řetězcové hodnoty do pole. Toto pole pak můžete přetypovat na řetězec, který se použije s cíli SQL.

source1 aggregate(groupBy(year),
	string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg

Počet aktualizací, upsertů, vložení, odstranění

Při použití transformace alter row můžete chtít spočítat počet aktualizací, upsertů, vložení, odstranění výsledků ze zásad Alter Row. Po změně řádku přidejte agregační transformaci a vložte tento Tok dat Script do agregační definice těchto počtů.

aggregate(updates = countIf(isUpdate(), 1),
		inserts = countIf(isInsert(), 1),
		upserts = countIf(isUpsert(), 1),
		deletes = countIf(isDelete(),1)) ~> RowCount

Distinct row using all columns

Tento fragment kódu přidá do toku dat novou agregační transformaci, která vezme všechny příchozí sloupce, vygeneruje hodnotu hash, která se použije k seskupení, aby se eliminovaly duplicity, a pak jako výstup poskytne první výskyt každého duplikátu. Sloupce nemusíte explicitně pojmenovat, automaticky se vygenerují z příchozího datového proudu.

aggregate(groupBy(mycols = sha2(256,columns())),
    each(match(true()), $$ = first($$))) ~> DistinctRows

Kontrola nul ve všech sloupcích

Jedná se o fragment kódu, který můžete vložit do toku dat a obecně zkontrolovat všechny sloupce hodnot NULL. Tato technika využívá posun schématu k procházení všech sloupců ve všech řádcích a použití podmíněného rozdělení k oddělení řádků s nulami od řádků bez NUL.

split(contains(array(toString(columns())),isNull(#item)),
	disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)

Posun schématu Automatické mapování pomocí výběru

Pokud potřebujete načíst existující schéma databáze z neznámé nebo dynamické sady příchozích sloupců, musíte v transformaci jímky namapovat sloupce na pravé straně. To je potřeba jenom v případě, že načítáte existující tabulku. Přidejte tento fragment kódu před jímku, abyste vytvořili možnost Vybrat, která automaticky mapuje sloupce. Mapování jímky nechte namapované automaticky.

select(mapColumn(
		each(match(true()))
	),
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> automap

Zachování datových typů sloupců

Přidejte tento skript do definice odvozeného sloupce pro ukládání názvů sloupců a datových typů z toku dat do trvalého úložiště pomocí jímky.

derive(each(match(type=='string'), $$ = 'string'),
	each(match(type=='integer'), $$ = 'integer'),
	each(match(type=='short'), $$ = 'short'),
	each(match(type=='complex'), $$ = 'complex'),
	each(match(type=='array'), $$ = 'array'),
	each(match(type=='float'), $$ = 'float'),
	each(match(type=='date'), $$ = 'date'),
	each(match(type=='timestamp'), $$ = 'timestamp'),
	each(match(type=='boolean'), $$ = 'boolean'),
	each(match(type=='long'), $$ = 'long'),
	each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1

Vyplnit dolů

Tady je postup implementace běžného problému Vyplnit dolů s datovými sadami, pokud chcete nahradit hodnoty NULL hodnotou z předchozí hodnoty, která není null v sekvenci. Tato operace může mít negativní dopad na výkon, protože musíte vytvořit syntetické okno v celé sadě dat s fiktivní hodnotou kategorie. Kromě toho je nutné řadit podle hodnoty, aby se vytvořila správná posloupnost dat, aby se našla předchozí hodnota, která není null. Tento fragment kódu níže vytvoří syntetickou kategorii jako fiktivní a seřadí podle náhradního klíče. Náhradní klíč můžete odebrat a použít vlastní klíč řazení specifický pro data. Tento fragment kódu předpokládá, že jste už přidali zdrojovou transformaci s názvem source1

source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
	asc(sk, true),
	Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1

Klouzavý průměr

Klouzavý průměr je možné do toků dat implementovat velmi snadno pomocí transformace Windows. Tento příklad níže vytvoří 15denní klouzavý průměr cen akcií pro Microsoft.

window(over(stocksymbol),
	asc(Date, true),
	startRowOffset: -7L,
	endRowOffset: 7L,
	FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1

Počet jedinečných hodnot všech sloupců

Tento skript můžete použít k identifikaci klíčových sloupců a zobrazení kardinality všech sloupců ve streamu pomocí jednoho fragmentu skriptu. Přidejte tento skript jako agregovanou transformaci toku dat a automaticky poskytne jedinečné počty všech sloupců.

aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern

Porovnání předchozích nebo dalších hodnot řádků

Tento ukázkový fragment kódu ukazuje, jak lze transformaci okna použít k porovnání hodnot sloupců z kontextu aktuálního řádku s hodnotami sloupců z řádků před a za aktuálním řádkem. V tomto příkladu se odvozený sloupec používá k vygenerování fiktivní hodnoty pro povolení oddílu okna v celé sadě dat. Transformace náhradního klíče slouží k přiřazení jedinečné hodnoty klíče pro každý řádek. Pokud tento vzor použijete u transformací dat, můžete náhradní klíč odebrat, pokud jste sloupec, podle kterého chcete data uspořádat, a pokud máte sloupce, podle kterých chcete data rozdělit, můžete odebrat odvozený sloupec.

source1 keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
	asc(sk, true),
	prevAndCurr = lag(title,1)+'-'+last(title),
		nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag

Kolik sloupců jsou v mých datech?

size(array(columns()))

Prozkoumejte Tok dat tím, že začnete článkem s přehledem toků dat.