Sdílet prostřednictvím


Skript toku dat (DFS)

VZTAHUJE SE NA: Azure Data Factory Azure Synapse Analytics

Návod

Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje vše od přesunu dat až po datovou vědu, analýzy v reálném čase, podnikovou inteligenci a reportování. Přečtěte si, jak začít používat novou zkušební verzi zdarma.

Toky dat jsou k dispozici v kanálech Azure Data Factory i v kanálech Azure Synapse Analytics. Tento článek se týká mapování toků dat. Pokud s transformacemi začínáte, přečtěte si úvodní článek Transformace dat pomocí mapování toků dat.

Skript toku dat (DFS) je podkladová metadata, podobně jako kódovací jazyk, který slouží ke spouštění transformací, které jsou součástí mapování toku dat. Každá transformace je reprezentována řadou vlastností, které poskytují potřebné informace ke správnému spuštění úlohy. Skript je viditelný a upravitelný z ADF kliknutím na tlačítko "skript" na horním pásu karet uživatelského rozhraní prohlížeče.

Tlačítko Skript

Například v transformaci zdroje říká službě, allowSchemaDrift: true, aby do toku dat zahrnula všechny sloupce ze zdrojové datové sady, i když nejsou zahrnuté do projekce schématu.

Případy použití

Systém souborů DFS je automaticky vytvořen uživatelským rozhraním. Kliknutím na tlačítko Skript můžete skript zobrazit a přizpůsobit. Můžete také vygenerovat skripty mimo uživatelské rozhraní ADF a pak je předat do rutiny PowerShellu. Při ladění složitých toků dat může být jednodušší prohlédnout si kód skriptu místo prohlížení grafického znázornění toků v uživatelském rozhraní.

Tady je několik příkladů případů použití:

  • Programově generovat mnoho toků dat, které jsou poměrně podobné, tj. "vytváření" toků dat.
  • Složité výrazy, které se obtížně spravují v uživatelském rozhraní nebo mají za následek problémy s ověřováním.
  • Ladění a lepší porozumění různým chybám vrácených během provádění

Když vytvoříte skript toku dat, který se použije s PowerShellem nebo rozhraním API, musíte formátovaný text sbalit do jednoho řádku. Tabulátory a nové řádky můžete používat jako řídicí znaky. Text se ale musí naformátovat tak, aby se vešl do vlastnosti JSON. V uživatelském rozhraní editoru skriptů v dolní části je tlačítko, které skript naformátuje jako jeden řádek.

Tlačítko Kopírovat

Přidání transformací

Přidání transformací vyžaduje tři základní kroky: přidání základních transformačních dat, přesměrování vstupního datového proudu a následné přesměrování výstupního datového proudu. To je vidět nejsnadněji v příkladu. Řekněme, že začneme s jednoduchým tokem dat ze zdroje do úložiště, například takto:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Pokud se rozhodneme přidat transformaci odvození, nejprve musíme vytvořit základní transformační text, který má jednoduchý výraz pro přidání nového velkého sloupce s názvem upperCaseTitle:

derive(upperCaseTitle = upper(title)) ~> deriveTransformationName

Pak vezmeme existující systém souborů DFS a přidáme transformaci:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

A teď přesměrujeme příchozí datový proud tak, source1že určíme, která transformace má přijít po nové transformaci (v tomto případě) a zkopírujeme název datového proudu do nové transformace:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Nakonec identifikujeme transformaci, kterou chceme provést po této nové transformaci, a nahradíme její vstupní datový proud (v tomto případě sink1) názvem výstupního datového proudu této nové transformace:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Základy systému souborů DFS

DFS se skládá z řady propojených transformací, včetně zdrojů, výstupů a různých dalších, které mohou přidávat nové sloupce, filtrovat data, spojovat data a mnoho dalšího. Skript obvykle začíná jedním nebo více zdroji, za kterými následuje mnoho transformací a končí jedním nebo více jímkami.

Všechny zdroje mají stejnou základní konstrukci:

source(
  source properties
) ~> source_name

Například jednoduchý zdroj se třemi sloupci (movieId, title, žánry) by byl:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1

Všechny transformace kromě zdrojů mají stejnou základní konstrukci:

name_of_incoming_stream transformation_type(
  properties
) ~> new_stream_name

Například jednoduchá odvozovací transformace, která vezme sloupec (název) a přepíše ho velkými písmeny, by vypadala následovně:

source1 derive(
  title = upper(title)
) ~> derive1

A jímka bez schématu by byla:

derive1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Fragmenty kódu skriptu

Fragmenty kódu skriptu jsou sdíletelný kód skriptu toku dat, který můžete použít ke sdílení mezi toky dat. V následujícím videu se dozvíte, jak používat fragmenty kódu skriptu a jak pomocí skriptu toku dat kopírovat a vkládat části skriptu za grafy toku dat:

Agregované souhrnné statistiky

Přidejte do toku dat agregační transformaci s názvem SummaryStats a vložte tento kód níže pro agregační funkci ve skriptu a nahraďte stávající souhrnné statistiky. Tím se poskytne obecný vzor pro souhrnné statistiky profilu dat.

aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
		each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
		each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats

Pomocí následujícího vzorku můžete také spočítat počet unikátních a počet odlišných řádků ve vašich datech. Následující příklad lze vložit do toku dat s agregační transformací s názvem ValueDistAgg. V tomto příkladu se používá sloupec s názvem "title". Nezapomeňte nahradit "title" názvem sloupce ve vašich datech, který chcete použít k získání počtů hodnot.

aggregate(groupBy(title),
	countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
		numofdistinct = countDistinct(title)) ~> UniqDist

Zahrnutí všech sloupců do agregace

Toto je obecný agregační vzor, který ukazuje, jak při vytváření agregací zachovat zbývající sloupce ve výstupních metadatech. V tomto případě použijeme first() funkci k výběru první hodnoty ve všech sloupcích, jejichž název není "film". Pokud to chcete použít, vytvořte agregační transformaci s názvem DistinctRows a vložte ji do skriptu nad existující agregační skript DistinctRows.

aggregate(groupBy(movie),
	each(match(name!='movie'), $$ = first($$))) ~> DistinctRows

Vytvořit hašovací otisk řádku

Tento kód použijte ve skriptu toku dat k vytvoření nového odvozeného sloupce DWhash, který generuje sha1 hodnotu hash ze tří sloupců.

derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash

Pomocí následujícího skriptu můžete také vygenerovat hodnotu hash řádku pomocí všech sloupců, které jsou ve streamu, aniž byste museli každý sloupec pojmenovat:

derive(DWhash = sha1(columns())) ~> DWHash

String_agg ekvivalent

Tento kód bude fungovat jako funkce T-SQL string_agg() a bude agregovat řetězcové hodnoty do pole. Pak můžete toto pole přetypovat na řetězec, který použijete pro SQL cíle.

source1 aggregate(groupBy(year),
	string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg

Počítat počet aktualizací, upsertů, vložení, odstranění

Při použití transformace alter row můžete chtít spočítat počet aktualizací, upsertů, vložení, odstranění výsledků ze zásad Alter Row. Za řádek alter přidejte agregační transformaci a vložte tento skript toku dat do agregační definice těchto počtů.

aggregate(updates = countIf(isUpdate(), 1),
		inserts = countIf(isInsert(), 1),
		upserts = countIf(isUpsert(), 1),
		deletes = countIf(isDelete(),1)) ~> RowCount

Jedinečný řádek využívající všechny sloupce

Tento fragment kódu přidá do toku dat novou agregační transformaci, která vezme všechny příchozí sloupce, vygeneruje hodnotu hash, která se použije k seskupení, aby se eliminovaly duplicity, a pak jako výstup poskytne první výskyt každého duplikátu. Sloupce nemusíte explicitně pojmenovat, automaticky se vygenerují z příchozího datového proudu.

aggregate(groupBy(mycols = sha2(256,columns())),
    each(match(true()), $$ = first($$))) ~> DistinctRows

Kontrola nul ve všech sloupcích

Toto je kódový fragment, který můžete vložit do toku dat, abyste obecně zkontrolovali všechny sloupce na hodnoty NULL. Tato technika využívá posun schématu k procházení všech sloupců ve všech řádcích a použití podmíněného rozdělení k oddělení řádků s nulami od řádků bez NUL.

split(contains(array(toString(columns())),isNull(#item)),
	disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)

Posun schématu Automatické mapování pomocí výběru

Pokud potřebujete načíst existující schéma databáze z neznámé nebo dynamické sady příchozích sloupců, musíte v transformaci jímky namapovat sloupce na pravé straně. To je potřeba jenom v případě, že načítáte existující tabulku. Přidejte tento fragment kódu před jímku, abyste vytvořili možnost Vybrat, která automaticky mapuje sloupce. Mapování jímky nechte namapované automaticky.

select(mapColumn(
		each(match(true()))
	),
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> automap

Zachování datových typů sloupců

Přidejte tento skript do definice odvozeného sloupce pro ukládání názvů sloupců a datových typů z toku dat do trvalého úložiště pomocí jímky.

derive(each(match(type=='string'), $$ = 'string'),
	each(match(type=='integer'), $$ = 'integer'),
	each(match(type=='short'), $$ = 'short'),
	each(match(type=='complex'), $$ = 'complex'),
	each(match(type=='array'), $$ = 'array'),
	each(match(type=='float'), $$ = 'float'),
	each(match(type=='date'), $$ = 'date'),
	each(match(type=='timestamp'), $$ = 'timestamp'),
	each(match(type=='boolean'), $$ = 'boolean'),
	each(match(type=='long'), $$ = 'long'),
	each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1

Vyplnit dolů

Tady je postup, jak implementovat běžný problém „Fill Down“ s datovými sadami, když chcete nahradit hodnoty NULL hodnotou z předchozí nenulové hodnoty v sekvenci. Tato operace může mít negativní dopad na výkon, protože musíte vytvořit syntetické okno v celé sadě dat s fiktivní hodnotou kategorie. Kromě toho je nutné řadit podle hodnoty, aby se vytvořila správná posloupnost dat, aby se našla předchozí hodnota, která není null. Následující úryvek kódu vytvoří syntetickou kategorii 'dummy' a seřadí podle náhradního klíče. Náhradní klíč můžete odebrat a použít vlastní klíč řazení specifický pro data. Tento fragment kódu předpokládá, že jste už přidali zdrojovou transformaci s názvem source1

source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
	asc(sk, true),
	Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1

Klouzavý průměr

Klouzavý průměr je možné do toků dat implementovat velmi snadno pomocí transformace Windows. Tento příklad níže vytvoří 15denní klouzavý průměr cen akcií pro Microsoft.

window(over(stocksymbol),
	asc(Date, true),
	startRowOffset: -7L,
	endRowOffset: 7L,
	FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1

Počet unikátních hodnot ve všech sloupcích

Tento skript můžete použít k identifikaci klíčových sloupců a zobrazení kardinality všech sloupců ve streamu pomocí jednoho fragmentu skriptu. Přidejte tento skript jako agregovanou transformaci toku dat a automaticky poskytne jedinečné počty všech sloupců.

aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern

Porovnejte hodnoty předchozích nebo následujících řádků

Tento ukázkový fragment kódu ukazuje, jak lze transformaci okna použít k porovnání hodnot sloupců z kontextu aktuálního řádku s hodnotami sloupců z řádků před a za aktuálním řádkem. V tomto příkladu se používá odvozený sloupec k vytvoření fiktivní hodnoty, která umožňuje rozdělení datového okna v celé sadě dat. Transformace náhradního klíče slouží k přiřazení jedinečné hodnoty klíče pro každý řádek. Pokud tento vzor použijete u transformací dat, můžete odebrat náhradní klíč, pokud máte sloupec, podle kterého chcete uspořádat, a pokud máte sloupce, kterými chcete data rozdělit, můžete odebrat odvozený sloupec.

source1 keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
	asc(sk, true),
	prevAndCurr = lag(title,1)+'-'+last(title),
		nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag

Kolik sloupců jsou v mých datech?

size(array(columns()))

Prozkoumejte toky dat tím, že začnete s článkem o přehledu toků dat