Data Factory パイプラインでのアクティビティの分岐と連鎖

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。

このチュートリアルでは、いくつかの制御フロー機能を紹介する Data Factory パイプラインを作成します。 このパイプラインでは、Azure Blob Storage 内のコンテナーから同じストレージ アカウント内の別のコンテナーへのコピーを行います。 コピー アクティビティが成功した場合は、成功したコピー操作の詳細がパイプラインによってメールで送信されます。 この情報には、書き込まれたデータの量が含まれる場合があります。 コピー アクティビティが失敗した場合は、エラー メッセージなどのコピー失敗の詳細がメールで送信されます。 チュートリアル全体を通じて、パラメーターを渡す方法が示されます。

次の図は、シナリオの概要を示しています。

Diagram shows Azure Blob Storage, which is the target of a copy, which, on success, sends an email with details or, on failure, sends an email with error details.

このチュートリアルでは、以下のタスクの実行方法を説明します。

  • Data Factory の作成
  • Azure Storage のリンクされたサービスを作成する
  • Azure BLOB データセットを作成します。
  • コピー アクティビティと Web アクティビティを含むパイプラインを作成します。
  • アクティビティの出力を後続のアクティビティに送信します。
  • パラメーターの引き渡しとシステム変数を使用する
  • パイプラインの実行を開始します。
  • パイプラインとアクティビティの実行を監視します。

このチュートリアルでは .NET SDK を使用します。 Azure Data Factory の操作にはその他のメカニズムを使用することもできます。 Data Factory のクイックスタートについては、5 分間のクイックスタートを参照してください。

Azure サブスクリプションをお持ちでない場合は、開始する前に 無料アカウント を作成してください。

前提条件

  • Azure ストレージ アカウント。 BLOB ストレージをソース データ ストアとして使用します。 Azure ストレージ アカウントがない場合は、「ストレージ アカウントの作成」を参照してください。
  • Azure Storage Explorer. このツールをインストールするには、「Azure Storage Explorer」を参照してください。
  • Azure SQL Database。 データベースをシンク データ ストアとして使用します。 Azure SQL Database のデータベースがない場合は、Azure SQL Database のデータベースの作成に関するページを参照してください。
  • 見ることができます。 この記事では、Visual Studio 2019 を使用します。
  • Azure .NET SDK。 Azure .NET SDK をダウンロードしてインストールします。

現在 Data Factory が利用可能な Azure リージョンの一覧については、「リージョン別の利用可能な製品」を参照してください。 データ ストアとコンピューティングは、別のリージョンに配置できます。 ストアには、Azure Storage と Azure SQL Database が含まれます。 コンピューティングには、Data Factory で使用される HDInsight が含まれます。

「Microsoft Entra アプリケーションの作成」の説明に 従ってアプリケーションを作成します。 同じ記事の手順に従って、このアプリケーションを共同作成者ロールに割り当てます。 このチュートリアルの後の方で、アプリケーション (クライアント) IDディレクトリ (テナント) ID など、いくつかの値が必要になります。

BLOB テーブルを作成する

  1. テキスト エディターを開きます。 次のテキストをコピーし、input.txt としてローカルに保存します。

    Ethel|Berg
    Tamika|Walsh
    
  2. Azure ストレージ エクスプローラーを開きます。 お使いのストレージ アカウントを展開します。 [BLOB コンテナー] を右クリックし、 [BLOB コンテナーの作成] を選択します。

  3. 新しいコンテナーに adfv2branch という名前を付け、 [アップロード] を選択して、そのコンテナーに input.txt ファイルを追加します。

Visual Studio プロジェクトを作成する

C# .NET コンソール アプリケーションを作成します。

  1. Visual Studio を起動し、 [新しいプロジェクトの作成] を選択します。
  2. [新しいプロジェクトの作成] で、C# の [コンソール アプリ (.NET Framework)] を選択してから、 [次へ] を選択します。
  3. プロジェクトに ADFv2BranchTutorial という名前を付けます。
  4. .NET バージョン 4.5.2 以降を選択し、 [作成] を選択します。

NuGet パッケージのインストール

  1. [ツール]>[NuGet パッケージ マネージャー]>[パッケージ マネージャー コンソール] の順に選択します。

  2. パッケージ マネージャー コンソールで、次のコマンドを実行してパッケージをインストールします。 詳細については、Microsoft.Azure.Management.DataFactory nuget パッケージを参照してください。

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

データ ファクトリ クライアントを作成する

  1. Program.cs を開き、次のステートメントを追加します。

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. Program クラスに次の静的変数を追加します。 プレースホルダーは実際の値に置き換えてください。

    // Set variables
    static string tenantID = "<tenant ID>";
    static string applicationId = "<application ID>";
    static string authenticationKey = "<Authentication key for your application>";
    static string subscriptionId = "<Azure subscription ID>";
    static string resourceGroup = "<Azure resource group name>";
    
    static string region = "East US";
    static string dataFactoryName = "<Data factory name>";
    
    // Specify the source Azure Blob information
    static string storageAccount = "<Azure Storage account name>";
    static string storageKey = "<Azure Storage account key>";
    // confirm that you have the input.txt file placed in th input folder of the adfv2branch container.
    static string inputBlobPath = "adfv2branch/input";
    static string inputBlobName = "input.txt";
    static string outputBlobPath = "adfv2branch/output";
    static string emailReceiver = "<specify email address of the receiver>";
    
    static string storageLinkedServiceName = "AzureStorageLinkedService";
    static string blobSourceDatasetName = "SourceStorageDataset";
    static string blobSinkDatasetName = "SinkStorageDataset";
    static string pipelineName = "Adfv2TutorialBranchCopy";
    
    static string copyBlobActivity = "CopyBlobtoBlob";
    static string sendFailEmailActivity = "SendFailEmailActivity";
    static string sendSuccessEmailActivity = "SendSuccessEmailActivity";
    
  3. Main メソッドに次のコードを追加します。 このコードによって、DataFactoryManagementClient クラスのインスタンスが作成されます。 その後、このオブジェクトを使用して、データ ファクトリ、リンクされたサービス、データセット、パイプラインを作成します。 また、このオブジェクトを使用して、パイプラインの実行の詳細を監視することもできます。

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

Data Factory の作成

  1. Program.cs ファイルに CreateOrUpdateDataFactory メソッドを追加します。

    static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating data factory " + dataFactoryName + "...");
        Factory resource = new Factory
        {
            Location = region
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
    
        Factory response;
        {
            response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource);
        }
    
        while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
        {
            System.Threading.Thread.Sleep(1000);
        }
        return response;
    }
    
  2. データ ファクトリを作成する次の行を Main メソッドに追加します。

    Factory df = CreateOrUpdateDataFactory(client);
    

Azure Storage のリンクされたサービスを作成する

  1. Program.cs ファイルに StorageLinkedServiceDefinition メソッドを追加します。

    static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client)
    {
       Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
       AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService
       {
           ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey)
       };
       Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
       LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName);
       return linkedService;
    }
    
  2. Azure Storage のリンクされたサービスを作成する次の行を Main メソッドに追加します。

    client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
    

サポートされているプロパティと詳細については、「リンクされたサービスのプロパティ」を参照してください。

データセットを作成する

このセクションでは、ソース用とシンク用に 1 つずつ、2 つのデータセットを作成します。

ソース Azure BLOB 用のデータセットの作成

Azure BLOB データセットを作成するメソッドを追加します。 サポートされているプロパティと詳細については、Azure BLOB データセットのプロパティに関するページを参照してください。

Program.cs ファイルに SourceBlobDatasetDefinition メソッドを追加します。

static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
    Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
    AzureBlobDataset blobDataset = new AzureBlobDataset
    {
        FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
        FileName = inputBlobName,
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        }
    };
    Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
    DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
    return dataset;
}

Azure BLOB 内のソース データを表すデータセットを定義します。 この BLOB データセットは、前の手順でサポートされていた Azure Storage のリンクされたサービスを参照します。 BLOB データセットには、コピー元の BLOB の場所が記述されます。FolderPathFileName が使用されます。

FolderPath ではパラメーターが使用されていることに注意してください。 sourceBlobContainer はパラメーターの名前であり、この式はパイプラインの実行で渡された値に置き換えられます。 パラメーターを定義する構文は @pipeline().parameters.<parameterName> です。

シンク Azure BLOB 用のデータセットの作成

  1. Program.cs ファイルに SourceBlobDatasetDefinition メソッドを追加します。

    static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating dataset " + blobSinkDatasetName + "...");
        AzureBlobDataset blobDataset = new AzureBlobDataset
        {
            FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" },
            LinkedServiceName = new LinkedServiceReference
            {
                ReferenceName = storageLinkedServiceName
            }
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
        DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName);
        return dataset;
    }
    
  2. Azure BLOB のソース データセットとシンク データセット両方を作成する次のコードを Main メソッドに追加します。

    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
    
    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
    

C# クラスを作成する:EmailRequest

C# プロジェクトで、EmailRequest という名前のクラスを作成します。 このクラスは、パイプラインがメールを送信するときに要求本文で送信するプロパティを定義します。 このチュートリアルでは、パイプラインから電子メールに次の 4 つのプロパティが送信されます。

  • Message. メールの本文。 コピーが成功した場合、このプロパティには、書き込まれたデータの量が含まれます。 コピーが失敗した場合、このプロパティには、エラーの詳細が含まれます。
  • データ ファクトリ名。 データ ファクトリの名前。
  • パイプライン名。 パイプラインの名前。
  • 受信者。 パススルーされるパラメーター。 このプロパティは、電子メールの受信者を指定します。
    class EmailRequest
    {
        [Newtonsoft.Json.JsonProperty(PropertyName = "message")]
        public string message;

        [Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
        public string dataFactoryName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
        public string pipelineName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
        public string receiver;

        public EmailRequest(string input, string df, string pipeline, string receiverName)
        {
            message = input;
            dataFactoryName = df;
            pipelineName = pipeline;
            receiver = receiverName;
        }
    }

電子メール ワークフロー エンドポイントを作成する

メール送信をトリガーするには、Azure Logic Apps を使用してワークフローを定義します。 詳細については、「従量課金ロジック アプリ ワークフローの例を作成する」をご覧ください。

成功電子メールのワークフロー

Azure portal で、CopySuccessEmail という名前のロジック アプリ ワークフローを作成します。 [HTTP 要求の受信時] という名前の要求トリガーを追加します。 要求トリガーで、[要求本文の JSON スキーマ] ボックスに、次の JSON を入力します。

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

ワークフローは次の例のようになります。

Success email workflow

この JSON コンテンツは、前のセクションで作成した EmailRequest クラスに合わせたものです。

[メールの送信] という名前の [Office 365 Outlook] アクションを追加します。 このアクションの場合、[要求本文の JSON スキーマ] で渡されるプロパティを使用して、メールの書式設定をカスタマイズします。 次に例を示します。

Workflow designer with the action named Send an email.

ワークフローを保存した後、トリガーの [HTTP POST の URL] の値をコピーして保存します。

失敗電子メールのワークフロー

CopySuccessEmail ロジック アプリ ワークフローを複製して、CopyFailEmail という名前の新しいワークフローにします。 要求トリガーでは、[要求本文の JSON スキーマ] は同じです。 Subject のように電子メールの書式を変更し、失敗電子メールになるように調整します。 例を次に示します。

Workflow designer and the fail email workflow.

ワークフローを保存した後、トリガーの [HTTP POST の URL] の値をコピーして保存します。

これで、次の例のように、2 つのワークフローの URL を取得できたはずです。

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

パイプラインを作成する

Visual Studio でプロジェクトに戻ります。 ここで、コピー アクティビティと DependsOn プロパティを含むパイプラインを作成する次のコードを追加します。 このチュートリアルでは、パイプラインに 1 つのアクティビティ (コピー アクティビティ) が含まれています。このアクティビティは、ソースとして BLOB データセットを取り込み、シンクとして別の BLOB データセットを取り込みます。 このコピー アクティビティが成功したか失敗したかに応じて、異なるメール タスクが呼び出されます。

このパイプラインでは、次の機能を使用します。

  • パラメーター
  • Web アクティビティ
  • アクティビティの依存関係
  • あるアクティビティからの出力を別のアクティビティへの入力として使用する
  1. このメソッドをプロジェクトに追加します。 以降のセクションで、さらに詳しく説明します。

    static PipelineResource PipelineDefinition(DataFactoryManagementClient client)
            {
                Console.WriteLine("Creating pipeline " + pipelineName + "...");
                PipelineResource resource = new PipelineResource
                {
                    Parameters = new Dictionary<string, ParameterSpecification>
                    {
                        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    
                    },
                    Activities = new List<Activity>
                    {
                        new CopyActivity
                        {
                            Name = copyBlobActivity,
                            Inputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSourceDatasetName
                                }
                            },
                            Outputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSinkDatasetName
                                }
                            },
                            Source = new BlobSource { },
                            Sink = new BlobSink { }
                        },
                        new WebActivity
                        {
                            Name = sendSuccessEmailActivity,
                            Method = WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Succeeded" }
                                }
                            }
                        },
                        new WebActivity
                        {
                            Name = sendFailEmailActivity,
                            Method =WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Failed" }
                                }
                            }
                        }
                    }
                };
                Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
                return resource;
            }
    
  2. パイプラインを作成する次の行を Main メソッドに追加します。

    client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
    

パラメーター

パイプライン コードの最初のセクションでは、パラメーターを定義します。

  • sourceBlobContainer. ソース BLOB データセットは、パイプラインでこのパラメーターを使用します。
  • sinkBlobContainer. シンク BLOB データセットは、パイプラインでこのパラメーターを使用します。
  • receiver. 成功または失敗のメールを受信者に送信する、パイプライン内の 2 つの Web アクティビティがこのパラメーターを使用します。
Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    },

Web アクティビティ

Web アクティビティを使用すると、任意の REST エンドポイントを呼び出すことができます。 このアクティビティの詳細については、「Azure Data Factory の Web アクティビティ」を参照してください。 このパイプラインでは、Web アクティビティを使用して Logic Apps のメール ワークフローを呼び出します。 CopySuccessEmail ワークフローを呼び出す Web アクティビティと CopyFailWorkFlow を呼び出す Web アクティビティの 2 つ作成します。

        new WebActivity
        {
            Name = sendCopyEmailActivity,
            Method = WebActivityMethod.POST,
            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
            DependsOn = new List<ActivityDependency>
            {
                new ActivityDependency
                {
                    Activity = copyBlobActivity,
                    DependencyConditions = new List<String> { "Succeeded" }
                }
            }
        }

Url プロパティには、Logic Apps ワークフローの HTTP POST URL エンドポイントを貼り付けます。 Body プロパティで、EmailRequest クラスのインスタンスを渡します。 この電子メール要求には次のプロパティが含まれます。

  • Message. @{activity('CopyBlobtoBlob').output.dataWritten の値を渡します。 前のコピー アクティビティのプロパティにアクセスし、dataWritten の値を渡します。 失敗の場合、@{activity('CopyBlobtoBlob').error.message の代わりにエラー出力を渡します。
  • データ ファクトリ名。 @{pipeline().DataFactory} の値を渡します。このシステム変数を使用すると、対応するデータ ファクトリ名にアクセスできます。 システム変数の一覧については、システム変数に関するページを参照してください。
  • パイプライン名。 @{pipeline().Pipeline} の値を渡します。 このシステム変数を使用すると、対応するパイプライン名にアクセスできます。
  • 受信者。 "@pipeline().parameters.receiver" の値を渡します。 パイプライン パラメーターにアクセスします。

このコードにより、前のコピー アクティビティに依存する、アクティビティの依存関係が新しく作成されます。

パイプラインの実行を作成する

パイプラインの実行をトリガーする次のコードを Main メソッドに追加します。

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Main クラス

最終的な Main メソッドは次のようになります。

// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };

Factory df = CreateOrUpdateDataFactory(client);

client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));

Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

プログラムをビルドして実行し、パイプラインの実行をトリガーしてください。

パイプラインの実行を監視する

  1. 次のコードを Main メソッドに追加します。

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    

    このコードは、データのコピーが完了するまで、実行の状態を継続的に確認します。

  2. コピー アクティビティの実行の詳細 (たとえば、読み書きされたデータのサイズ) を取得する次のコードを Main メソッドに追加します。

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun(
    resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList();
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(activityRuns.First().Output);
        //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons);
    }
    else
        Console.WriteLine(activityRuns.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

コードの実行

アプリケーションをビルドして起動し、パイプラインの実行を確認します。

このアプリケーションでは、データ ファクトリ、リンクされたサービス、データセット、パイプライン、およびパイプラインの実行の作成の進捗状況が表示されます。 その後、パイプラインの実行状態が確認されます。 コピー アクティビティの実行の詳細と、データの読み取り/書き込みのサイズが表示されるまで待ちます。 次に、Azure Storage Explorer などのツールを使用して、変数で指定したように BLOB が inputBlobPath から outputBlobPath にコピーされたことを確認します。

出力は次のサンプルのようになります。

Creating data factory DFTutorialTest...
{
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "type": "AzureStorage",
  "typeProperties": {
    "connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
  }
}
Creating dataset SourceStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sourceBlobContainer"
    },
    "fileName": "input.txt"
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating dataset SinkStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sinkBlobContainer"
    }
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating pipeline Adfv2TutorialBranchCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SourceStorageDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SinkStorageDataset"
          }
        ],
        "name": "CopyBlobtoBlob"
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendSuccessEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ]
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').error.message}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendFailEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Failed"
            ]
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      },
      "receiver": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 20,
  "dataWritten": 20,
  "copyDuration": 4,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}

Press any key to exit...

このチュートリアルでは、次のタスクを実行しました。

  • Data Factory の作成
  • Azure Storage のリンクされたサービスを作成する
  • Azure BLOB データセットを作成します。
  • コピー アクティビティと Web アクティビティを含むパイプラインを作成します。
  • アクティビティの出力を後続のアクティビティに送信します。
  • パラメーターの引き渡しとシステム変数を使用する
  • パイプラインの実行を開始します。
  • パイプラインとアクティビティの実行を監視します。

これで、概念セクションに進むと、Azure Data Factory の詳細を確認できるようになりました。