Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
VZTAHUJE SE NA:
Azure Data Factory
Azure Synapse Analytics
Návod
Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje vše od přesunu dat až po datovou vědu, analýzy v reálném čase, podnikovou inteligenci a reportování. Přečtěte si, jak začít používat novou zkušební verzi zdarma.
Toky dat jsou k dispozici v kanálech Azure Data Factory i v kanálech Azure Synapse Analytics. Tento článek se týká mapování toků dat. Pokud s transformacemi začínáte, přečtěte si úvodní článek Transformace dat pomocí mapování toků dat.
Skript toku dat (DFS) je podkladová metadata, podobně jako kódovací jazyk, který slouží ke spouštění transformací, které jsou součástí mapování toku dat. Každá transformace je reprezentována řadou vlastností, které poskytují potřebné informace ke správnému spuštění úlohy. Skript je viditelný a upravitelný z ADF kliknutím na tlačítko "skript" na horním pásu karet uživatelského rozhraní prohlížeče.
Například v transformaci zdroje říká službě, allowSchemaDrift: true, aby do toku dat zahrnula všechny sloupce ze zdrojové datové sady, i když nejsou zahrnuté do projekce schématu.
Případy použití
Systém souborů DFS je automaticky vytvořen uživatelským rozhraním. Kliknutím na tlačítko Skript můžete skript zobrazit a přizpůsobit. Můžete také vygenerovat skripty mimo uživatelské rozhraní ADF a pak je předat do rutiny PowerShellu. Při ladění složitých toků dat může být jednodušší prohlédnout si kód skriptu místo prohlížení grafického znázornění toků v uživatelském rozhraní.
Tady je několik příkladů případů použití:
- Programově generovat mnoho toků dat, které jsou poměrně podobné, tj. "vytváření" toků dat.
- Složité výrazy, které se obtížně spravují v uživatelském rozhraní nebo mají za následek problémy s ověřováním.
- Ladění a lepší porozumění různým chybám vrácených během provádění
Když vytvoříte skript toku dat, který se použije s PowerShellem nebo rozhraním API, musíte formátovaný text sbalit do jednoho řádku. Tabulátory a nové řádky můžete používat jako řídicí znaky. Text se ale musí naformátovat tak, aby se vešl do vlastnosti JSON. V uživatelském rozhraní editoru skriptů v dolní části je tlačítko, které skript naformátuje jako jeden řádek.
Přidání transformací
Přidání transformací vyžaduje tři základní kroky: přidání základních transformačních dat, přesměrování vstupního datového proudu a následné přesměrování výstupního datového proudu. To je vidět nejsnadněji v příkladu. Řekněme, že začneme s jednoduchým tokem dat ze zdroje do úložiště, například takto:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Pokud se rozhodneme přidat transformaci odvození, nejprve musíme vytvořit základní transformační text, který má jednoduchý výraz pro přidání nového velkého sloupce s názvem upperCaseTitle:
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
Pak vezmeme existující systém souborů DFS a přidáme transformaci:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
A teď přesměrujeme příchozí datový proud tak, source1že určíme, která transformace má přijít po nové transformaci (v tomto případě) a zkopírujeme název datového proudu do nové transformace:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Nakonec identifikujeme transformaci, kterou chceme provést po této nové transformaci, a nahradíme její vstupní datový proud (v tomto případě sink1) názvem výstupního datového proudu této nové transformace:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Základy systému souborů DFS
DFS se skládá z řady propojených transformací, včetně zdrojů, výstupů a různých dalších, které mohou přidávat nové sloupce, filtrovat data, spojovat data a mnoho dalšího. Skript obvykle začíná jedním nebo více zdroji, za kterými následuje mnoho transformací a končí jedním nebo více jímkami.
Všechny zdroje mají stejnou základní konstrukci:
source(
source properties
) ~> source_name
Například jednoduchý zdroj se třemi sloupci (movieId, title, žánry) by byl:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
Všechny transformace kromě zdrojů mají stejnou základní konstrukci:
name_of_incoming_stream transformation_type(
properties
) ~> new_stream_name
Například jednoduchá odvozovací transformace, která vezme sloupec (název) a přepíše ho velkými písmeny, by vypadala následovně:
source1 derive(
title = upper(title)
) ~> derive1
A jímka bez schématu by byla:
derive1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Fragmenty kódu skriptu
Fragmenty kódu skriptu jsou sdíletelný kód skriptu toku dat, který můžete použít ke sdílení mezi toky dat. V následujícím videu se dozvíte, jak používat fragmenty kódu skriptu a jak pomocí skriptu toku dat kopírovat a vkládat části skriptu za grafy toku dat:
Agregované souhrnné statistiky
Přidejte do toku dat agregační transformaci s názvem SummaryStats a vložte tento kód níže pro agregační funkci ve skriptu a nahraďte stávající souhrnné statistiky. Tím se poskytne obecný vzor pro souhrnné statistiky profilu dat.
aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats
Pomocí následujícího vzorku můžete také spočítat počet unikátních a počet odlišných řádků ve vašich datech. Následující příklad lze vložit do toku dat s agregační transformací s názvem ValueDistAgg. V tomto příkladu se používá sloupec s názvem "title". Nezapomeňte nahradit "title" názvem sloupce ve vašich datech, který chcete použít k získání počtů hodnot.
aggregate(groupBy(title),
countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
numofdistinct = countDistinct(title)) ~> UniqDist
Zahrnutí všech sloupců do agregace
Toto je obecný agregační vzor, který ukazuje, jak při vytváření agregací zachovat zbývající sloupce ve výstupních metadatech. V tomto případě použijeme first() funkci k výběru první hodnoty ve všech sloupcích, jejichž název není "film". Pokud to chcete použít, vytvořte agregační transformaci s názvem DistinctRows a vložte ji do skriptu nad existující agregační skript DistinctRows.
aggregate(groupBy(movie),
each(match(name!='movie'), $$ = first($$))) ~> DistinctRows
Vytvořit hašovací otisk řádku
Tento kód použijte ve skriptu toku dat k vytvoření nového odvozeného sloupce DWhash, který generuje sha1 hodnotu hash ze tří sloupců.
derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash
Pomocí následujícího skriptu můžete také vygenerovat hodnotu hash řádku pomocí všech sloupců, které jsou ve streamu, aniž byste museli každý sloupec pojmenovat:
derive(DWhash = sha1(columns())) ~> DWHash
String_agg ekvivalent
Tento kód bude fungovat jako funkce T-SQL string_agg() a bude agregovat řetězcové hodnoty do pole. Pak můžete toto pole přetypovat na řetězec, který použijete pro SQL cíle.
source1 aggregate(groupBy(year),
string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg
Počítat počet aktualizací, upsertů, vložení, odstranění
Při použití transformace alter row můžete chtít spočítat počet aktualizací, upsertů, vložení, odstranění výsledků ze zásad Alter Row. Za řádek alter přidejte agregační transformaci a vložte tento skript toku dat do agregační definice těchto počtů.
aggregate(updates = countIf(isUpdate(), 1),
inserts = countIf(isInsert(), 1),
upserts = countIf(isUpsert(), 1),
deletes = countIf(isDelete(),1)) ~> RowCount
Jedinečný řádek využívající všechny sloupce
Tento fragment kódu přidá do toku dat novou agregační transformaci, která vezme všechny příchozí sloupce, vygeneruje hodnotu hash, která se použije k seskupení, aby se eliminovaly duplicity, a pak jako výstup poskytne první výskyt každého duplikátu. Sloupce nemusíte explicitně pojmenovat, automaticky se vygenerují z příchozího datového proudu.
aggregate(groupBy(mycols = sha2(256,columns())),
each(match(true()), $$ = first($$))) ~> DistinctRows
Kontrola nul ve všech sloupcích
Toto je kódový fragment, který můžete vložit do toku dat, abyste obecně zkontrolovali všechny sloupce na hodnoty NULL. Tato technika využívá posun schématu k procházení všech sloupců ve všech řádcích a použití podmíněného rozdělení k oddělení řádků s nulami od řádků bez NUL.
split(contains(array(toString(columns())),isNull(#item)),
disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)
Posun schématu Automatické mapování pomocí výběru
Pokud potřebujete načíst existující schéma databáze z neznámé nebo dynamické sady příchozích sloupců, musíte v transformaci jímky namapovat sloupce na pravé straně. To je potřeba jenom v případě, že načítáte existující tabulku. Přidejte tento fragment kódu před jímku, abyste vytvořili možnost Vybrat, která automaticky mapuje sloupce. Mapování jímky nechte namapované automaticky.
select(mapColumn(
each(match(true()))
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> automap
Zachování datových typů sloupců
Přidejte tento skript do definice odvozeného sloupce pro ukládání názvů sloupců a datových typů z toku dat do trvalého úložiště pomocí jímky.
derive(each(match(type=='string'), $$ = 'string'),
each(match(type=='integer'), $$ = 'integer'),
each(match(type=='short'), $$ = 'short'),
each(match(type=='complex'), $$ = 'complex'),
each(match(type=='array'), $$ = 'array'),
each(match(type=='float'), $$ = 'float'),
each(match(type=='date'), $$ = 'date'),
each(match(type=='timestamp'), $$ = 'timestamp'),
each(match(type=='boolean'), $$ = 'boolean'),
each(match(type=='long'), $$ = 'long'),
each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1
Vyplnit dolů
Tady je postup, jak implementovat běžný problém „Fill Down“ s datovými sadami, když chcete nahradit hodnoty NULL hodnotou z předchozí nenulové hodnoty v sekvenci. Tato operace může mít negativní dopad na výkon, protože musíte vytvořit syntetické okno v celé sadě dat s fiktivní hodnotou kategorie. Kromě toho je nutné řadit podle hodnoty, aby se vytvořila správná posloupnost dat, aby se našla předchozí hodnota, která není null. Následující úryvek kódu vytvoří syntetickou kategorii 'dummy' a seřadí podle náhradního klíče. Náhradní klíč můžete odebrat a použít vlastní klíč řazení specifický pro data. Tento fragment kódu předpokládá, že jste už přidali zdrojovou transformaci s názvem source1
source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
asc(sk, true),
Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1
Klouzavý průměr
Klouzavý průměr je možné do toků dat implementovat velmi snadno pomocí transformace Windows. Tento příklad níže vytvoří 15denní klouzavý průměr cen akcií pro Microsoft.
window(over(stocksymbol),
asc(Date, true),
startRowOffset: -7L,
endRowOffset: 7L,
FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1
Počet unikátních hodnot ve všech sloupcích
Tento skript můžete použít k identifikaci klíčových sloupců a zobrazení kardinality všech sloupců ve streamu pomocí jednoho fragmentu skriptu. Přidejte tento skript jako agregovanou transformaci toku dat a automaticky poskytne jedinečné počty všech sloupců.
aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern
Porovnejte hodnoty předchozích nebo následujících řádků
Tento ukázkový fragment kódu ukazuje, jak lze transformaci okna použít k porovnání hodnot sloupců z kontextu aktuálního řádku s hodnotami sloupců z řádků před a za aktuálním řádkem. V tomto příkladu se používá odvozený sloupec k vytvoření fiktivní hodnoty, která umožňuje rozdělení datového okna v celé sadě dat. Transformace náhradního klíče slouží k přiřazení jedinečné hodnoty klíče pro každý řádek. Pokud tento vzor použijete u transformací dat, můžete odebrat náhradní klíč, pokud máte sloupec, podle kterého chcete uspořádat, a pokud máte sloupce, kterými chcete data rozdělit, můžete odebrat odvozený sloupec.
source1 keyGenerate(output(sk as long),
startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
asc(sk, true),
prevAndCurr = lag(title,1)+'-'+last(title),
nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag
Kolik sloupců jsou v mých datech?
size(array(columns()))
Související obsah
Prozkoumejte toky dat tím, že začnete s článkem o přehledu toků dat