分享方式:


資料流程指令碼 (DFS)

適用於:Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用

Azure Data Factory 和 Azure Synapse Pipelines 中均可使用資料流。 本文適用於對應資料流。 若您不熟悉轉換作業,請參閱簡介文章使用對應資料流轉換資料

資料流程指令碼 (DFS) 是與編碼語言類似的基礎中繼資料,可用來執行對應資料流中包含的轉換。 每個轉換都是由一系列屬性所表示,這些屬性提供必要的資訊以正確執行作業。 您可以按一下瀏覽器 UI 頂端功能區上的 [腳本] 按鈕,以查看並從 ADF 編輯指令碼。

Script button

例如,在來源轉換中,allowSchemaDrift: true, 服務會告知服務在資料流程中包含來自來源資料集的所有資料行,即使它們未包含在結構描述投影中也一樣。

使用案例

DFS 會自動由使用者介面產生。 您可以按一下 [指令碼] 按鈕來檢視和自訂指令碼。 您也可以在 ADF UI 外部產生指令碼,然後將其傳遞至 PowerShell Cmdlet。 偵錯複雜的資料流程時,您可能會發現其更容易掃描指令碼程式碼後置,而不是掃描流程的 UI 圖形標記法。

以下是一些範例使用案例:

  • 以程式設計方式產生許多相當類似的資料流程,也就是「戳記輸出」資料流程。
  • 難以在 UI 中管理或導致驗證問題的複雜運算式。
  • 偵錯並進一步了解執行期間傳回的各種錯誤。

當您建置資料流程指令碼以搭配 PowerShell 或 API 使用時,必須將格式化的文字摺疊成單行。 您可以將索引標籤和分行符號保留為逸出字元。 但是必須格式化文字,藉以符合 JSON 屬性。 底部的指令碼編輯器 UI 上有一個按鈕,會將指令碼格式化為單行。

Copy button

如何新增轉換

新增轉換需要三個基本步驟:新增核心轉換資料、重新路由傳送輸入資料流,然後重新路由輸出資料流。 這最容易就在範例中看出來。 假設我們從簡單的來源開始接收資料流程,如下所示:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

如果我們決定新增衍生轉換,首先我們需要建立核心轉換文字,其具有簡單的運算式,藉以新增名為 upperCaseTitle 的新大寫資料行:

derive(upperCaseTitle = upper(title)) ~> deriveTransformationName

然後,我們會採用現有的 DFS 並新增轉換:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

現在,藉由識別我們想要讓新轉換出現在哪個轉換之後來重新路由傳入資料流 (在此案例中為 source1) 並將資料流的名稱複製到新的轉換:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

最後,我們會識別我們想要在這個新轉換之後的轉換,並將其輸入資料流 (在此案例中為 sink1) 取代為新轉換的輸出資料流名稱:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

DFS 基本概念

DFS 是由一系列連線的轉換所組成,包括來源、接收器和各種可新增資料行、篩選資料、聯結資料等的其他項目。 指令碼通常會以一或多個來源開頭,後面接著許多轉換,並以一或多個接收結束。

來源都有相同的基本建構:

source(
  source properties
) ~> source_name

例如,具有三個資料行的簡單來源 (movieId、title、內容類型) 為:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1

來源以外的全部轉換都有相同的基本建構:

name_of_incoming_stream transformation_type(
  properties
) ~> new_stream_name

例如,採用資料行 (標題) 的簡單衍生轉換,並以大寫版本覆寫它,如下所示:

source1 derive(
  title = upper(title)
) ~> derive1

而且沒有結構描述的接收會是:

derive1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

指令碼程式碼片段

指令碼程式碼片段是資料流程指令碼的可共用程式碼,可用來跨資料流程共用。 下列影片將討論如何使用指令碼程式碼片段,以及利用資料流程指令碼來複製和貼上資料流程圖形後方的指令碼部分:

彙總摘要統計資料

將彙總轉換新增至名為 "SummaryStats" 的資料流程,然後針對指令碼中的彙總函式貼上下列程式碼,並取代現有的 SummaryStats。 這會提供資料設定檔摘要統計資料的一般模式。

aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
		each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
		each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats

您也可以使用下列範例來計算資料中唯一數目和相異資料列的數目。 下列範例可以在名為 ValueDistAgg 的彙總轉換所在的資料流程中貼上。 此範例使用名稱為 "title"的資料行。 請務必以您想要用來取得值計數的資料字串資料行取代 "title"。

aggregate(groupBy(title),
	countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
		numofdistinct = countDistinct(title)) ~> UniqDist

在彙總中包含所有資料行

這是一般彙總模式,示範如何在建置彙總時,將剩餘的資料行保留在輸出中繼資料中。 在此情況下,我們會使用 first() 函式來選擇名稱不是「電影」的每個資料行中的第一個值。 若要使用此方式,請建立名為 DistinctRows 的彙總轉換,然後在現有 DistinctRows 彙總指令碼頂端的指令碼貼上此轉換。

aggregate(groupBy(movie),
	each(match(name!='movie'), $$ = first($$))) ~> DistinctRows

建立資料列雜湊指紋

在您的資料流程指令碼中使用此程式碼,建立名為 DWhash 的新衍生資料行,藉以產生三個資料行的 sha1 雜湊。

derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash

您也可以使用以下指令碼,以存在於資料流中的所有資料行來產生資料列雜湊,而不需要為每個資料行命名:

derive(DWhash = sha1(columns())) ~> DWHash

String_agg 對等項目

此程式碼會像 T-SQL string_agg() 函式一樣運作,並將字串值彙總成陣列。 然後,您可以將該陣列轉換成字串,藉以搭配 SQL 目的地使用。

source1 aggregate(groupBy(year),
	string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg

更新、更新插入、插入、刪除的計數

使用變更資料列轉換時,您可能會想要計算來自變更資料列原則的更新、更新插入、插入、刪除數目。 在改變資料列之後新增彙總轉換,並這些計數的彙總定義貼上此資料流程指令碼。

aggregate(updates = countIf(isUpdate(), 1),
		inserts = countIf(isInsert(), 1),
		upserts = countIf(isUpsert(), 1),
		deletes = countIf(isDelete(),1)) ~> RowCount

使用所有資料行的相異資料列

此程式碼片段會將新的彙總轉換新增至您的資料流程,這會取得全部傳入資料行、產生雜湊,藉以用於群組以消除重複項目,然後提供每個重複項目的第一次出現作為輸出。 您不需要明確命名稱資料行,它們會自動從傳入資料流產生。

aggregate(groupBy(mycols = sha2(256,columns())),
    each(match(true()), $$ = first($$))) ~> DistinctRows

檢查所有資料行中的 NULL

這是可以在資料流程中貼上的程式碼片段,以一般方式檢查所有資料行是否有 NULL 值。 這項技術會利用結構描述漂移來查看全部資料列中的所有資料行,並使用條件式分割來分隔含 NULL 的資料列與不含 NULL 的資料列。

split(contains(array(toString(columns())),isNull(#item)),
	disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)

具有選取項目的自動對應結構描述漂移

當您需要從未知或動態的傳入資料行集合載入現有資料庫結構描述時,您必須對應接收轉換中的右側資料行。 只有在載入現有的資料表時,才需要此項目。 在接收之前新增此程式碼片段,藉以建立 [選取] 來自動對應資料行。 將接收對應保留為自動對應。

select(mapColumn(
		each(match(true()))
	),
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> automap

保存資料行資料類型

在衍生資料行定義內新增此指令碼,藉以使用接收將資料流程中的資料行名稱和資料類型儲存到永續性存放區。

derive(each(match(type=='string'), $$ = 'string'),
	each(match(type=='integer'), $$ = 'integer'),
	each(match(type=='short'), $$ = 'short'),
	each(match(type=='complex'), $$ = 'complex'),
	each(match(type=='array'), $$ = 'array'),
	each(match(type=='float'), $$ = 'float'),
	each(match(type=='date'), $$ = 'date'),
	each(match(type=='timestamp'), $$ = 'timestamp'),
	each(match(type=='boolean'), $$ = 'boolean'),
	each(match(type=='long'), $$ = 'long'),
	each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1

向下填滿

當您想要以序列中先前的非 NULL 值來取代 NULL 值時,如何實作資料集的常見「填滿」問題。 請注意,這項作業可能會有負面的效能影響,因為您必須在整個資料集中建立具有「虛擬」類別值的綜合視窗。 此外,您必須依值排序,才能建立適當的資料序列,藉以尋找先前的非 NULL 值。 下列程式碼片段會將綜合類別建立為「虛擬」,並以代理索引鍵排序。 您可以移除代理索引鍵,並使用自己的資料特定排序索引鍵。 此程式碼片段假設您已新增名為 source1 的來源轉換

source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
	asc(sk, true),
	Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1

移動平均

使用 Windows 轉換,可以輕鬆地在資料流程中實作移動平均。 以下範例會建立 Microsoft 股票價格的 15 天移動平均。

window(over(stocksymbol),
	asc(Date, true),
	startRowOffset: -7L,
	endRowOffset: 7L,
	FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1

所有資料行值的相異計數

您可以使用此指令碼來識別索引鍵資料行,並使用單一指令碼程式碼片段來檢視資料流中所有資料行的基數。 將此指令碼新增為資料流的彙總轉換,它會自動提供所有資料行的相異計數。

aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern

比較上一個或下一個資料列值

此範例程式碼片段示範如何使用 Window 轉換來比較目前資料列內容中的資料行值與目前資料列前後的資料行值。 在此範例中,衍生資料行是用來產生虛擬值,藉以在整個資料集上啟用視窗分割區。 代理索引鍵轉換可用來為每個資料列指派唯一索引鍵值。 您將此模式套用至資料轉換時,如果您是想要排序的資料行,則可以移除代理索引鍵,而且如果您有資料行可用來分割資料的資料行,則可以移除衍生資料行。

source1 keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
	asc(sk, true),
	prevAndCurr = lag(title,1)+'-'+last(title),
		nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag

我的資料中有多少個資料行?

size(array(columns()))

從資料流程概觀一文開始探索資料流程