資料流程腳本 (DFS)

適用于: Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory,這是適用于企業的單一分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告等所有專案。 瞭解如何 免費啟動新的試用版

資料流程可在 Azure Data Factory 和 Azure Synapse Pipelines 中使用。 本文適用于對應資料流程。 如果您不熟悉轉換,請參閱使用對應資料流程 轉換資料的簡介文章

資料流程指令碼 (DFS) 是與編碼語言類似的基礎中繼資料,可用來執行對應資料流中包含的轉換。 每個轉換都是由一系列屬性所表示,這些屬性提供必要的資訊以正確執行作業。 您可以按一下瀏覽器 UI 頂端功能區上的 [腳本] 按鈕,以查看並從 ADF 編輯指令碼。

Script button

例如,在來源轉換中, allowSchemaDrift: true, 會指示服務在資料流程中包含來自源資料集的所有資料行,即使它們未包含在架構投影中也一樣。

使用案例

使用者介面會自動產生 DFS。 您可以按一下 [腳本] 按鈕來檢視和自訂腳本。 您也可以在 ADF UI 外部產生腳本,然後將它傳遞至 PowerShell Cmdlet。 偵錯複雜的資料流程時,您可能會發現更容易掃描腳本程式碼後置,而不是掃描流程的 UI 圖形標記法。

以下是一些範例使用案例:

  • 以程式設計方式產生許多相當類似的資料流程,也就是「戳記」資料流程。
  • 難以在 UI 中管理或導致驗證問題的複雜運算式。
  • 偵錯並進一步瞭解執行期間傳回的各種錯誤。

當您建置資料流程腳本以搭配 PowerShell 或 API 使用時,必須將格式化的文字折迭成單行。 您可以將索引標籤和分行符號保留為逸出字元。 但是必須格式化文字,以符合 JSON 屬性。 底部的腳本編輯器 UI 上有一個按鈕,會為您將腳本格式化為單行。

Copy button

如何新增轉換

新增轉換需要三個基本步驟:新增核心轉換資料、重新路由輸入資料流程,然後重新路由輸出資料流程。 這在範例中可看得最簡單。 假設我們從簡單的來源開始接收資料流程,如下所示:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

如果我們決定新增衍生轉換,首先我們需要建立核心轉換文字,其具有簡單的運算式,以新增名為 upperCaseTitle 的新大寫資料行:

derive(upperCaseTitle = upper(title)) ~> deriveTransformationName

然後,我們會採用現有的 DFS 並新增轉換:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

現在,我們會藉由識別我們想要新轉換之後的轉換來重新路由傳送連入資料流程(在此案例中為 source1 ),並將資料流程的名稱複製到新的轉換:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

最後,我們會識別我們想要在此新轉換之後執行的轉換,並以新轉換的輸出資料流程名稱取代其輸入資料流程(在此案例 sink1 中為 ):

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

DFS 基本概念

DFS 是由一系列連線的轉換所組成,包括來源、接收和各種其他轉換,這些轉換可以新增資料行、篩選資料、聯結資料等等。 腳本通常會以一或多個來源開頭,後面接著許多轉換,並以一或多個接收結尾。

來源都有相同的基本建構:

source(
  source properties
) ~> source_name

例如,具有三個數據行的簡單來源(movieId、title、內容類型)會是:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1

來源以外的所有轉換都有相同的基本建構:

name_of_incoming_stream transformation_type(
  properties
) ~> new_stream_name

例如,採用資料行 (title) 並以大寫版本覆寫的簡單衍生轉換,如下所示:

source1 derive(
  title = upper(title)
) ~> derive1

沒有架構的接收會是:

derive1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

腳本程式碼片段

腳本程式碼片段是資料流程腳本的可共用程式碼,可用來跨資料流程共用。 以下影片將討論如何使用腳本程式碼片段,以及利用資料流程腳本來複製和貼上資料流程圖形後方的腳本部分:

匯總摘要統計資料

將匯總轉換新增至名為 「SummaryStats」 的資料流程,然後將下列程式碼貼到腳本中的彙總函式,並取代現有的 SummaryStats。 這會提供資料設定檔摘要統計資料的一般模式。

aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
		each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
		each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats

您也可以使用下列範例來計算資料中唯一的資料列數目和相異資料列數目。 下列範例可以使用稱為 ValueDistAgg 的匯總轉換,貼到資料流程中。 此範例使用名為 「title」 的資料行。 請務必以您想要用來取得值計數的資料中的字串資料行取代 「title」。

aggregate(groupBy(title),
	countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
		numofdistinct = countDistinct(title)) ~> UniqDist

在匯總中包含所有資料行

這是一般匯總模式,示範如何在建置匯總時,將剩餘的資料行保留在輸出中繼資料中。 在此情況下,我們會使用 函 first() 式來選擇名稱不是 「movie」 之每個資料行中的第一個值。 若要使用此功能,請建立名為 DistinctRows 的匯總轉換,然後將此轉換貼到現有的 DistinctRows 匯總腳本頂端。

aggregate(groupBy(movie),
	each(match(name!='movie'), $$ = first($$))) ~> DistinctRows

建立資料列雜湊指紋

在您的資料流程腳本中使用此程式碼,建立名為 DWhash 的新衍生資料行,以產生三個 sha1 資料行的雜湊。

derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash

您也可以使用下列腳本,使用串流中的所有資料行來產生資料列雜湊,而不需要為每個資料行命名:

derive(DWhash = sha1(columns())) ~> DWHash

String_agg對等專案

此程式碼會像 T-SQL string_agg() 函式一樣運作,並將字串值匯總到陣列中。 然後,您可以將該陣列轉換成字串,以搭配 SQL 目的地使用。

source1 aggregate(groupBy(year),
	string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg

計數更新數目、upserts、inserts、deletes

使用 Alter Row 轉換時,您可能會想要計算來自 Alter Row 原則的更新、更新、插入、插入數目。 在改變數據列之後新增匯總轉換,並將此資料流程腳本貼到這些計數的匯總定義中。

aggregate(updates = countIf(isUpdate(), 1),
		inserts = countIf(isInsert(), 1),
		upserts = countIf(isUpsert(), 1),
		deletes = countIf(isDelete(),1)) ~> RowCount

使用所有資料行的相異資料列

此程式碼片段會將新的匯總轉換新增至資料流程,其會採用所有連入資料行、產生雜湊以用於分組以消除重複專案,然後提供每個重複專案作為輸出的第一個出現次數。 您不需要明確命名資料行,它們會自動從傳入資料流程產生。

aggregate(groupBy(mycols = sha2(256,columns())),
    each(match(true()), $$ = first($$))) ~> DistinctRows

檢查所有資料行中的 Null

這是一個程式碼片段,您可以貼到資料流程中,以一般方式檢查所有資料行是否有 Null 值。 這項技術利用架構漂移來查看所有資料列中的所有資料行,並使用條件式分割來分隔具有 Null 的資料列與不含 NUL 的資料列。

split(contains(array(toString(columns())),isNull(#item)),
	disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)

自動對應架構漂移與選取

當您需要從未知或動態的傳入資料行集合載入現有的資料庫架構時,您必須對應接收轉換中的右側資料行。 只有在載入現有的資料表時,才需要此專案。 在接收之前新增此程式碼片段,以建立 [選取] 來自動對應資料行。 將接收對應保留至自動對應。

select(mapColumn(
		each(match(true()))
	),
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> automap

保存資料行資料類型

在衍生資料行定義內新增此腳本,以使用接收將資料流程中的資料行名稱和資料類型儲存到永續性存放區。

derive(each(match(type=='string'), $$ = 'string'),
	each(match(type=='integer'), $$ = 'integer'),
	each(match(type=='short'), $$ = 'short'),
	each(match(type=='complex'), $$ = 'complex'),
	each(match(type=='array'), $$ = 'array'),
	each(match(type=='float'), $$ = 'float'),
	each(match(type=='date'), $$ = 'date'),
	each(match(type=='timestamp'), $$ = 'timestamp'),
	each(match(type=='boolean'), $$ = 'boolean'),
	each(match(type=='long'), $$ = 'long'),
	each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1

填滿

當您想要以序列中先前非 Null 值的值取代 Null 值時,如何實作資料集的常見「填滿」問題。 請注意,這項作業可能會對效能造成負面影響,因為您必須在整個資料集上建立具有「虛擬」類別值的綜合視窗。 此外,您必須依值排序,才能建立適當的資料順序,以尋找先前的非 Null 值。 下列程式碼片段會將綜合類別建立為「虛擬」,並以 Surrogate 索引鍵排序。 您可以移除 Surrogate 索引鍵,並使用您自己的資料特定排序索引鍵。 此程式碼片段假設您已新增名為 的來源轉換 source1

source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
	asc(sk, true),
	Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1

移動平均

您可以使用 Windows 轉換,輕鬆地在資料流程中實作移動平均。 下列範例會為 Microsoft 建立 15 天移動平均股票價格。

window(over(stocksymbol),
	asc(Date, true),
	startRowOffset: -7L,
	endRowOffset: 7L,
	FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1

所有資料行值的相異計數

您可以使用此腳本來識別索引鍵資料行,並使用單一腳本程式碼片段檢視資料流程中所有資料行的基數。 將此腳本新增為數據流的匯總轉換,且會自動提供所有資料行的相異計數。

aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern

比較上一個或下一個資料列值

此範例程式碼片段示範如何使用 Window 轉換,將目前資料列內容中的資料行值與目前資料列前後的資料行值進行比較。 在此範例中,衍生資料行可用來產生虛擬值,以在整個資料集上啟用視窗分割。 Surrogate Key 轉換可用來為每個資料列指派唯一的索引鍵值。 當您將此模式套用至資料轉換時,如果您是想要排序的資料行,而且如果您有資料行可用來分割資料的資料行,則可以移除衍生資料行。

source1 keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
	asc(sk, true),
	prevAndCurr = lag(title,1)+'-'+last(title),
		nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag

我的資料中有多少個數據行?

size(array(columns()))

從資料流程概觀一文開始探索 資料流程