Azure Data Factory をプログラムで監視する

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。

この記事では、さまざまなソフトウェア開発キット (SDK) を使用して、データ ファクトリのパイプラインを監視する方法について説明します。

注意

Azure を操作するには、Azure Az PowerShell モジュールを使用することをお勧めします。 作業を開始するには、Azure PowerShell のインストールに関する記事を参照してください。 Az PowerShell モジュールに移行する方法については、「AzureRM から Az への Azure PowerShell の移行」を参照してください。

データの範囲

Data Factory では、パイプラインの実行データを 45 日間だけ格納します。 Data Factory パイプラインの実行に関するデータに対し、プログラムによってクエリを実行する場合 (たとえば、PowerShell コマンド Get-AzDataFactoryV2PipelineRun を使用して)、省略可能な LastUpdatedAfter パラメーターおよび LastUpdatedBefore パラメーターには日付の制限がありません。 ただし、たとえば、過去 1 年間のデータに対してクエリを実行した場合、エラーは発生しませんが、過去 45 日間のパイプライン実行データのみとなります。

過去 45 日より前のパイプライン実行データを保持する場合は、Azure Monitor を使用して独自の診断ログを設定する必要があります。

パイプライン実行情報

パイプライン実行プロパティについては、PipelineRun API リファレンスを参照してください。 パイプライン実行は、そのライフサイクル中にさまざまな状態になります。考えられる実行状態の値を次に示します。

  • キューに登録済み
  • InProgress
  • 成功
  • 失敗
  • Canceling
  • キャンセル

.NET

.NET SDK を使用して、パイプラインを作成し監視する完全なチュートリアルについては、.NET を使用したデータ ファクトリとパイプラインの作成に関する記事をご覧ください。

  1. データのコピーが完了するまでパイプラインの実行の状態を継続的にチェックする次のコードを追加します。

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress" || pipelineRun.Status == "Queued")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    
  2. コピー アクティビティの実行の詳細 (たとえば、読み書きされたデータのサイズ) を取得する次のコードを追加します。

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    RunFilterParameters filterParams = new RunFilterParameters(
        DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10));
    ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun(
        resourceGroup, dataFactoryName, runResponse.RunId, filterParams);
    if (pipelineRun.Status == "Succeeded")
        Console.WriteLine(queryResponse.Value.First().Output);
    else
        Console.WriteLine(queryResponse.Value.First().Error);
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

.NET SDK の詳細については、Data Factory .NET SDK リファレンスに関するページをご覧ください。

Python

Python SDK を使用して、パイプラインを作成し監視する完全なチュートリアルについては、「Python を使用してデータ ファクトリとパイプラインを作成する」をご覧ください。

パイプラインの実行を監視するには、次のコードを追加します。

# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
    rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
    last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
    rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])

Python SDK の詳細については、データ ファクトリの Python SDK リファレンスに関するページをご覧ください。

REST API

REST API を使用して、パイプラインを作成し監視する完全なチュートリアルについては、REST API を使用したデータ ファクトリとパイプラインの作成に関するページをご覧ください。

  1. 次のスクリプトを実行し、データのコピーが完了するまで、パイプラインの実行の状態を継続的にチェックします。

    $request = "https://management.azure.com/subscriptions/${subsId}/resourceGroups/${resourceGroup}/providers/Microsoft.DataFactory/factories/${dataFactoryName}/pipelineruns/${runId}?api-version=${apiVersion}"
    while ($True) {
        $response = Invoke-RestMethod -Method GET -Uri $request -Header $authHeader
        Write-Host  "Pipeline run status: " $response.Status -foregroundcolor "Yellow"
    
        if ( ($response.Status -eq "InProgress") -or ($response.Status -eq "Queued") ) {
            Start-Sleep -Seconds 15
        }
        else {
            $response | ConvertTo-Json
            break
        }
    }
    
  2. 次のスクリプトを実行し、コピー アクティビティの実行の詳細 (たとえば、読み書きされたデータのサイズ) を取得します。

    $request = "https://management.azure.com/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/pipelineruns/${runId}/queryActivityruns?api-version=${apiVersion}&startTime="+(Get-Date).ToString('yyyy-MM-dd')+"&endTime="+(Get-Date).AddDays(1).ToString('yyyy-MM-dd')+"&pipelineName=Adfv2QuickStartPipeline"
    $response = Invoke-RestMethod -Method POST -Uri $request -Header $authHeader
    $response | ConvertTo-Json
    

REST API の詳細については、データ ファクトリの REST API リファレンスに関するページをご覧ください。

PowerShell

PowerShell を使用して、パイプラインを作成し監視する完全なチュートリアルについては、「PowerShell を使用してデータ ファクトリとパイプラインを作成する」をご覧ください。

  1. 次のスクリプトを実行し、データのコピーが完了するまで、パイプラインの実行の状態を継続的にチェックします。

    while ($True) {
        $run = Get-AzDataFactoryV2PipelineRun -ResourceGroupName $resourceGroupName -DataFactoryName $DataFactoryName -PipelineRunId $runId
    
        if ($run) {
            if ( ($run.Status -ne "InProgress") -and ($run.Status -ne "Queued") ) {
                Write-Output ("Pipeline run finished. The status is: " +  $run.Status)
                $run
                break
            }
            Write-Output ("Pipeline is running...status: " + $run.Status)
        }
    
        Start-Sleep -Seconds 30
    }
    
  2. 次のスクリプトを実行し、コピー アクティビティの実行の詳細 (たとえば、読み書きされたデータのサイズ) を取得します。

    Write-Host "Activity run details:" -foregroundcolor "Yellow"
    $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    $result
    
    Write-Host "Activity 'Output' section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "\nActivity 'Error' section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n"
    

PowerShell コマンドレットの詳細については、データ ファクトリの PowerShell コマンドレット リファレンスに関するページをご覧ください。

Azure Monitor を使って Data Factory のパイプラインを監視する方法については、Azure Monitor を使ったパイプラインの監視に関する記事をご覧ください。