タンブリング ウィンドウでパイプラインを実行するトリガーの作成
適用対象: Azure Data Factory Azure Synapse Analytics
ヒント
企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新たに試用を開始する方法については、こちらをご覧ください。
この記事では、タンブリング ウィンドウ トリガーを作成、起動、および監視する手順について説明します。 トリガーとサポートされる種類の全般的な情報については、パイプラインの実行とトリガーに関する記事をご覧ください。
タンブリング ウィンドウ トリガーは、状態を維持しながら、指定した開始時刻から定期的に実行される種類のトリガーです。 タンブリング ウィンドウとは、固定サイズで重複しない一連の連続する時間間隔です。 タンブリング ウィンドウ トリガーはパイプラインと 1 対 1 の関係を持ち、単一のパイプラインのみを参照できます。
タンブリング ウィンドウ トリガーは、スケジュール トリガーのより重量級な代替手段です。 複雑なシナリオ (他のタンブリング ウィンドウ トリガーへの依存、失敗したジョブの再実行、パイプラインのユーザー再試行の設定など) に対応する一連の機能が用意されています。 スケジュール トリガーとタンブリング ウィンドウ トリガーの違いについて詳しくは、「トリガー種類の比較」を参照してください。
Azure Data Factory と Azure Synapse のポータル エクスペリエンス
- Azure portal でタンブリング ウィンドウ トリガーを作成するには、[トリガー] タブを選択し、[新規] を選択します。
- トリガー構成ウィンドウが開いたら、[タンブリング ウィンドウ] を選択します。 次に、タンブリング ウィンドウ トリガーのプロパティを定義します。
- 完了したら、[保存] をクリックします。
タンブリング ウィンドウのトリガーの種類のプロパティ
タンブリング ウィンドウには、次のトリガーの種類のプロパティがあります。
{
"name": "MyTriggerName",
"properties": {
"type": "TumblingWindowTrigger",
"runtimeState": "<<Started/Stopped/Disabled - readonly>>",
"typeProperties": {
"frequency": <<Minute/Hour>>,
"interval": <<int>>,
"startTime": "<<datetime>>",
"endTime": <<datetime - optional>>,
"delay": <<timespan - optional>>,
"maxConcurrency": <<int>> (required, max allowed: 50),
"retryPolicy": {
"count": <<int - optional, default: 0>>,
"intervalInSeconds": <<int>>,
},
"dependsOn": [
{
"type": "TumblingWindowTriggerDependencyReference",
"size": <<timespan - optional>>,
"offset": <<timespan - optional>>,
"referenceTrigger": {
"referenceName": "MyTumblingWindowDependency1",
"type": "TriggerReference"
}
},
{
"type": "SelfDependencyTumblingWindowTriggerReference",
"size": <<timespan - optional>>,
"offset": <<timespan>>
}
]
},
"pipeline": {
"pipelineReference": {
"type": "PipelineReference",
"referenceName": "MyPipelineName"
},
"parameters": {
"parameter1": {
"type": "Expression",
"value": "@{concat('output',formatDateTime(trigger().outputs.windowStartTime,'-dd-MM-yyyy-HH-mm-ss-ffff'))}"
},
"parameter2": {
"type": "Expression",
"value": "@{concat('output',formatDateTime(trigger().outputs.windowEndTime,'-dd-MM-yyyy-HH-mm-ss-ffff'))}"
},
"parameter3": "https://mydemo.azurewebsites.net/api/demoapi"
}
}
}
}
次のテーブルに、タンブリング ウィンドウ トリガーの繰り返しとスケジュール設定に関連する主な JSON 要素の概要を示します。
JSON 要素 | 説明 | Type | 使用できる値 | 必須 |
---|---|---|---|---|
type |
トリガーの種類。 type は固定値 TumblingWindowTrigger です。 |
String |
TumblingWindowTrigger |
はい |
runtimeState |
トリガー実行時の現在の状態。 この要素は <readOnly> です。 |
String |
Started 、Stopped 、Disabled |
はい |
frequency |
トリガーが繰り返される頻度の単位 (分、時間、または月) を表す文字列。 startTime の日付値が frequency 値よりも細かい場合、ウィンドウの境界を計算するときに startTime の日付が考慮されます。 たとえば、frequency 値が hourly で、startTime 値が 2017-09-01T10:10:10Z の場合、最初のウィンドウは (2017-09-01T10:10:10Z, 2017-09-01T11:10:10Z) になります。 |
String |
Minute 、Hour 、Month |
はい |
interval |
トリガーの実行頻度を決定する、frequency 値の間隔を示す正の整数。 たとえば、interval が 3 で frequency が hour である場合、トリガーは 3 時間ごとに繰り返されます。 最小ウィンドウ間隔は 5 分です。 |
Integer |
正の整数。 | はい |
startTime |
最初の発生。これは過去の場合があります。 最初のトリガー間隔は (startTime , startTime + interval ) です。 |
DateTime |
DateTime 値。 |
はい |
endTime |
最後の発生。これは過去の場合があります。 | DateTime |
DateTime 値。 |
はい |
delay |
ウィンドウのデータ処理の開始の遅延時間。 パイプライン実行は、予想される実行時間 + delay の時間が経過してから開始されます。 delay は、トリガーが期限を過ぎてから新しい実行をトリガーするまでの待機時間を定義します。 遅延によってウィンドウ startTime が変更されることはありません。 たとえば、delay 値が 00:10:00 の場合、10 分の遅延を意味します。 |
Timespan (hh:mm:ss) |
既定値が 00:00:00 の timespan 値。 |
いいえ |
maxConcurrency |
準備ができているウィンドウに対して発生する同時トリガー実行の数。 たとえば、前日の実行を 1 時間ごとにバックフィルすると、24 ウィンドウになります。 maxConcurrency = 10 の場合、トリガー イベントは最初の 10 ウィンドウ (00:00-01:00 - 09:00-10:00) に対してのみ発生します。 最初の 10 回がトリガーされたパイプライン実行が完了すると、次の 10 ウィンドウ (10:00-11:00 - 19:00-20:00) に対してトリガー実行が発生します。 maxConcurrency = 10 のこの例を続けると、準備ができているウィンドウが 10 個ある場合、パイプライン実行は合計 10 回になります。 1 つのウィンドウのみ準備ができている場合は、1 つのパイプラインのみが実行されます。 |
Integer |
1 ~ 50 の整数。 | はい |
retryPolicy: Count |
パイプライン実行前の再試行回数は Failed とマークされます。 |
Integer |
整数。既定値は 0 (再試行なし) です。 | いいえ |
retryPolicy: intervalInSeconds |
秒単位で指定された再試行の間の遅延。 | Integer |
秒数。既定値は 30 です。 最小値は 30 です。 |
いいえ |
dependsOn: type |
TumblingWindowTriggerReference の型。 依存関係が設定されている場合は必須です。 |
String |
TumblingWindowTriggerDependencyReference 、SelfDependencyTumblingWindowTriggerReference |
いいえ |
dependsOn: size |
依存関係のタンブリング ウィンドウのサイズ。 | Timespan (hh:mm:ss) |
正の timespan 値。既定値は子トリガーのウィンド ウサイズです。 |
いいえ |
dependsOn: offset |
依存関係トリガーのオフセット。 | Timespan (hh:mm:ss) |
自己依存関係内の負の値を指定する必要がある timespan 値。 値が指定されていない場合、ウィンドウはトリガーそのものと同じになります。 |
自己依存関係:はい その他:いいえ |
Note
タンブリング ウィンドウ トリガーが発行されると、interval
と frequency
の値を編集できません。
WindowStart および WindowEnd システム変数
タンブリング ウィンドウ トリガーの WindowStart
および WindowEnd
システム変数は、パイプラインの定義 (つまり、クエリの一部) で使用できます。 システム変数は、トリガー定義でパイプラインにパラメーターとして渡します。 次の例は、これらの変数をパラメーターとして渡す方法を示しています。
{
"name": "MyTriggerName",
"properties": {
"type": "TumblingWindowTrigger",
...
"pipeline": {
"pipelineReference": {
"type": "PipelineReference",
"referenceName": "MyPipelineName"
},
"parameters": {
"MyWindowStart": {
"type": "Expression",
"value": "@{concat('output',formatDateTime(trigger().outputs.windowStartTime,'-dd-MM-yyyy-HH-mm-ss-ffff'))}"
},
"MyWindowEnd": {
"type": "Expression",
"value": "@{concat('output',formatDateTime(trigger().outputs.windowEndTime,'-dd-MM-yyyy-HH-mm-ss-ffff'))}"
}
}
}
}
}
パイプライン定義で WindowStart
と WindowEnd
システム変数の値を使用するには、それに応じて、MyWindowStart
と MyWindowEnd
パラメーターを使用します。
バックフィル シナリオでのウィンドウの実行順序
トリガー startTime
が過去の場合、M=(CurrentTime- TriggerStartTime)/TumblingWindowSize という数式に基づき、トリガーはトリガーのコンカレンシーを考慮して、{M} 回のバックフィル (過去分) の実行を並行して生成してから、今後の実行を行います。 ウィンドウの実行順序は、最も古い間隔から最も新しい間隔までの間で決定論的となります。 現時点では、この動作を変更することはできません。
Note
このシナリオでは、選択した startTime
からのすべての実行は、今後の実行を実行する前に実行されます。 長期間バックフィルする必要がある場合は、履歴の初期読み込みを行うことをお勧めします。
既存の TriggerResource 要素
既存の TriggerResource
要素には、次の点が適用されます:
- トリガーの
frequency
要素 (またはウィンドウ サイズ) とinterval
要素の値は、トリガーの作成後に変更できません。 この制限はtriggerRun
の再実行と依存関係の評価を適切に機能させるために必要です。 - トリガーの
endTime
要素の値が (追加または更新によって) 変更された場合、既に処理されているウィンドウの状態はリセットされません。 トリガーは新しいendTime
値に従います。 新しいendTime
値が既に実行されたウィンドウよりも前の場合、トリガーは停止します。 それ以外の場合は、新しいendTime
値に達すると、トリガーは停止します。
ユーザー割り当てのパイプラインの再試行
パイプライン エラーが発生した場合、タンブリング ウィンドウ トリガーは、ユーザーの介入なしに、同じ入力パラメーターを使用して、参照されたパイプラインの実行を自動的に再試行することができます。 トリガー定義の retryPolicy
プロパティを使用して、このアクションを指定します。
タンブリング ウィンドウ トリガーの依存関係
データ ファクトリ内の別のタンブリング ウィンドウ トリガーの実行が成功した後でのみ、タンブリング ウィンドウ トリガーが実行されることを確認する場合は、タンブリング ウィンドウ トリガーの依存関係を作成します。
タンブリング ウィンドウの実行のキャンセル
タンブリング ウィンドウ トリガーの実行をキャンセルできるのは、特定のウィンドウが待機中、依存関係の待機中、または実行中状態である場合です:
- ウィンドウが実行中状態である場合は、関連付けられているパイプラインの実行をキャンセルすると、その後のトリガーの実行がキャンセル済みとマークされます。
- ウィンドウが待機中または依存関係の待機中状態の場合は、ウィンドウを監視からキャンセルできます。
キャンセルされたウィンドウを再実行することもできます。 再実行では、トリガーの最新公開された定義が取得されます。 指定したウィンドウの依存関係は、再実行時に再評価されます。
Azure PowerShell と Azure CLI 用のサンプル
ここでは、Azure PowerShell を使用してトリガーを作成、起動、および監視する方法について説明します。
Note
Azure を操作するには、Azure Az PowerShell モジュールを使用することをお勧めします。 作業を始めるには、「Azure PowerShell をインストールする」を参照してください。 Az PowerShell モジュールに移行する方法については、「AzureRM から Az への Azure PowerShell の移行」を参照してください。
前提条件
- Azure サブスクリプション:Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。
- Azure PowerShell: 「PowerShellGet を使用した Windows への Azure PowerShell のインストール」に記載されている手順に従います。
- Azure Data Factory: 「PowerShell を使用した Azure Data Factory の作成」に記載されている手順に従って、データ ファクトリとパイプラインを作成します。
サンプル コード
次の内容が含まれた MyTrigger.json という名前の JSON ファイルを C:\ADFv2QuickStartPSH\ フォルダーに作成します。
重要
JSON ファイルを保存する前に、
startTime
要素の値を現在の協定世界時 (UTC) 時間に設定します。endTime
要素の値を現在の UTC 時間の 1 時間後に設定します。{ "name": "PerfTWTrigger", "properties": { "type": "TumblingWindowTrigger", "typeProperties": { "frequency": "Minute", "interval": "15", "startTime": "2017-09-08T05:30:00Z", "endTime" : "2017-09-08T06:30:00Z", "delay": "00:00:01", "retryPolicy": { "count": 2, "intervalInSeconds": 30 }, "maxConcurrency": 50 }, "pipeline": { "pipelineReference": { "type": "PipelineReference", "referenceName": "DynamicsToBlobPerfPipeline" }, "parameters": { "windowStart": "@trigger().outputs.windowStartTime", "windowEnd": "@trigger().outputs.windowEndTime" } }, "runtimeState": "Started" } }
Set-AzDataFactoryV2Trigger コマンドレットを使用してトリガーを作成します。
Set-AzDataFactoryV2Trigger -ResourceGroupName $ResourceGroupName -DataFactoryName $DataFactoryName -Name "MyTrigger" -DefinitionFile "C:\ADFv2QuickStartPSH\MyTrigger.json"
Get-AzDataFactoryV2Trigger コマンドレットを使用して、トリガーの状態が Stopped であることを確認します。
Get-AzDataFactoryV2Trigger -ResourceGroupName $ResourceGroupName -DataFactoryName $DataFactoryName -Name "MyTrigger"
Start-AzDataFactoryV2Trigger コマンドレットを使用してトリガーを起動します。
Start-AzDataFactoryV2Trigger -ResourceGroupName $ResourceGroupName -DataFactoryName $DataFactoryName -Name "MyTrigger"
Get-AzDataFactoryV2Trigger コマンドレットを使用して、トリガーの状態が Started であることを確認します。
Get-AzDataFactoryV2Trigger -ResourceGroupName $ResourceGroupName -DataFactoryName $DataFactoryName -Name "MyTrigger"
Get-AzDataFactoryV2TriggerRun コマンドレットを使用して、Azure PowerShell でトリガー実行を取得します。 トリガー実行に関する情報を取得するには、次のコマンドを定期的に実行します。
TriggerRunStartedAfter
とTriggerRunStartedBefore
の値をトリガー定義の値と一致するように更新します。Get-AzDataFactoryV2TriggerRun -ResourceGroupName $ResourceGroupName -DataFactoryName $DataFactoryName -TriggerName "MyTrigger" -TriggerRunStartedAfter "2017-12-08T00:00:00" -TriggerRunStartedBefore "2017-12-08T01:00:00"
Azure Portal でトリガー実行とパイプライン実行を監視するには、パイプライン実行の監視に関するセクションをご覧ください。