Bagikan melalui


Aktivitas pencabangan dan penghubungan dalam pipeline di Data Factory

BERLAKU UNTUK: Azure Data Factory Azure Synapse Analytics

Tip

Cobalah Data Factory di Microsoft Fabric, solusi analitik all-in-one untuk perusahaan. Microsoft Fabric mencakup semuanya mulai dari pergerakan data hingga ilmu data, analitik real time, kecerdasan bisnis, dan pelaporan. Pelajari cara memulai uji coba baru secara gratis!

Dalam tutorial ini, Anda dapat membuat alur Data Factory yang menampilkan beberapa fitur aliran kontrol. Alur ini menyalin dari kontainer di Azure Blob Storage ke kontainer lain di akun penyimpanan yang sama. Jika aktivitas penyalinan berhasil, alur akan mengirimkan detail operasi penyalinan yang berhasil dalam email. Informasi itu dapat mencakup jumlah data yang ditulis. Jika aktivitas penyalinan gagal, aktivitas tersebut akan mengirimkan detail kegagalan penyalinan, seperti pesan kesalahan, dalam email. Sepanjang tutorial, Anda akan melihat bagaimana cara memberikan parameter.

Grafik ini memberikan gambaran umum skenario:

Diagram memperlihatkan Azure Blob Storage, yang merupakan target penyalinan, yang, jika berhasil, akan mengirim email dengan detail atau, jika gagal, akan mengirim email dengan detail kesalahan.

Tutorial ini menunjukkan kepada Anda cara melakukan tugas-tugas berikut:

  • Membuat pabrik data
  • Membuat layanan yang terhubung ke Azure Storage
  • Buat himpunan data Azure Blob
  • Buat alur yang berisi aktivitas salin dan aktivitas web
  • Mengirim output aktivitas ke aktivitas berikutnya
  • Menggunakan parameter yang meneruskan dan variabel sistem
  • Mulai eksekusi alur
  • Pantau eksekusi alur dan aktivitas

Tutorial ini menggunakan .NET SDK. Anda dapat menggunakan mekanisme lain untuk berinteraksi dengan Azure Data Factory. Untuk panduan cepat Data Factory, lihat Panduan Cepat 5 Menit.

Jika Anda tidak memiliki langganan Azure, buat akun gratis sebelum Anda memulai.

Prasyarat

  • Akun Azure Storage. Anda menggunakan penyimpanan blob sebagai penyimpanan data sumber. Jika Anda tidak memiliki akun penyimpanan Azure, lihat Membuat akun penyimpanan.
  • Penjelajah Penyimpanan Azure. Untuk menginstal alat ini, lihat Azure Storage Explorer.
  • Microsoft Azure SQL database. Anda menggunakan database sebagai penyimpanan data sink. Jika Anda tidak memiliki database di Azure SQL Database, lihat Membuat database di Azure SQL Database.
  • Studio Visual. Artikel ini menggunakan Visual Studio 2019.
  • Azure .NET SDK. Mengunduh dan menginstal Azure .NET SDK.

Untuk daftar wilayah Azure tempat Data Factory saat ini tersedia, lihat Produk yang tersedia menurut wilayah. Penyimpanan dan komputasi data dapat berada di wilayah lain. Toko-toko tersebut termasuk Azure Storage dan Azure SQL Database. Komputasi termasuk HDInsight, yang digunakan oleh Data Factory.

Buat aplikasi seperti yang dijelaskan dalam Membuat aplikasi Microsoft Entra. Tetapkan aplikasi ke peran Kontributor dengan mengikuti instruksi di artikel yang sama. Anda akan memerlukan beberapa nilai untuk bagian selanjutnya dari tutorial ini, seperti ID Aplikasi (klien) dan ID Direktori (penyewa).

Membuat tabel blob

  1. Buka editor teks. Salin teks berikut dan simpan secara lokal sebagai input.txt.

    Ethel|Berg
    Tamika|Walsh
    
  2. Buka Azure Storage Explorer. Perluas akun penyimpanan Anda. Klik kanan Kontainer Blob dan pilih Buat Kontainer Blob.

  3. Beri nama kontainer baru adfv2branch dan pilih Unggah untuk menambahkan file input.txt Anda ke kontainer.

Membuat proyek Visual Studio

Membuat aplikasi konsol C# .NET:

  1. Mulai Visual Studio dan pilih Buat proyek baru.
  2. Di Buat proyek baru, pilih Aplikasi Konsol (.NET Framework) untuk C#, lalu pilih Berikutnya.
  3. Beri nama proyek ADFv2BranchTutorial.
  4. Pilih .NET versi 4.5.2 atau lebih tinggi lalu pilih Buat.

Instal paket NuGet

  1. Pilih Alat>NuGet Package Manager>Packet Manager Console.

  2. Di Konsol Manajer Paket, jalankan perintah berikut untuk menginstal paket. Lihat paket nuget Microsoft.Azure.Management.DataFactory untuk detailnya.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

Membuat klien pabrik data

  1. Buka Program.cs dan tambahkan pernyataan berikut:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. Tambahkan variabel statis ini ke kelas Program. Ganti placeholder dengan nilai Anda sendiri.

    // Set variables
    static string tenantID = "<tenant ID>";
    static string applicationId = "<application ID>";
    static string authenticationKey = "<Authentication key for your application>";
    static string subscriptionId = "<Azure subscription ID>";
    static string resourceGroup = "<Azure resource group name>";
    
    static string region = "East US";
    static string dataFactoryName = "<Data factory name>";
    
    // Specify the source Azure Blob information
    static string storageAccount = "<Azure Storage account name>";
    static string storageKey = "<Azure Storage account key>";
    // confirm that you have the input.txt file placed in th input folder of the adfv2branch container.
    static string inputBlobPath = "adfv2branch/input";
    static string inputBlobName = "input.txt";
    static string outputBlobPath = "adfv2branch/output";
    static string emailReceiver = "<specify email address of the receiver>";
    
    static string storageLinkedServiceName = "AzureStorageLinkedService";
    static string blobSourceDatasetName = "SourceStorageDataset";
    static string blobSinkDatasetName = "SinkStorageDataset";
    static string pipelineName = "Adfv2TutorialBranchCopy";
    
    static string copyBlobActivity = "CopyBlobtoBlob";
    static string sendFailEmailActivity = "SendFailEmailActivity";
    static string sendSuccessEmailActivity = "SendSuccessEmailActivity";
    
  3. Tambahkan kode berikut ke metode Main. Kode ini membuat instans kelas DataFactoryManagementClient. Anda kemudian menggunakan objek ini untuk membuat pabrik data, layanan tertaut, himpunan data, dan alur. Anda juga dapat menggunakan objek ini untuk memantau detail penjaluran.

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

Membuat pabrik data

  1. Tambahkan CreateOrUpdateDataFactorymetode ke file Program.cs Anda:

    static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating data factory " + dataFactoryName + "...");
        Factory resource = new Factory
        {
            Location = region
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
    
        Factory response;
        {
            response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource);
        }
    
        while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
        {
            System.Threading.Thread.Sleep(1000);
        }
        return response;
    }
    
  2. Tambahkan baris berikut ke metode Main yang membuat pabrik data:

    Factory df = CreateOrUpdateDataFactory(client);
    

Membuat layanan yang terhubung ke Azure Storage

  1. Tambahkan StorageLinkedServiceDefinitionmetode ke file Program.cs Anda:

    static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client)
    {
       Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
       AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService
       {
           ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey)
       };
       Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
       LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName);
       return linkedService;
    }
    
  2. Tambahkan baris berikut ke metode Main yang membuat layanan tertaut Azure Storage:

    client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
    

Untuk informasi selengkapnya tentang properti dan detail yang didukung, lihat Properti layanan tertaut.

Membuat himpunan data

Di bagian ini, Anda akan membuat dua himpunan data, satu untuk sumber dan yang lain untuk sink.

Membuat dataset untuk sumber Azure Blob

Tambahkan metode yang membuat himpunan data blob Azure. Untuk informasi selengkapnya tentang properti dan detail yang didukung, lihat Properti himpunan data Azure Blob.

Tambahkan SourceBlobDatasetDefinitionmetode ke file Program.cs Anda:

static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
    Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
    AzureBlobDataset blobDataset = new AzureBlobDataset
    {
        FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
        FileName = inputBlobName,
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        }
    };
    Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
    DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
    return dataset;
}

Anda menentukan himpunan data yang mewakili data sumber di Azure Blob. Himpunan data Blob ini mengacu pada layanan tertaut Azure Storage yang Anda dukung di langkah sebelumnya. Himpunan data Blob menjelaskan lokasi blob untuk disalin dari: FolderPath dan FileName.

Perhatikan penggunaan parameter untuk FolderPath. sourceBlobContainer adalah nama parameter dan ekspresi diganti dengan nilai yang diteruskan dalam eksekusi alur. Sintaks untuk menentukan parameter adalah @pipeline().parameters.<parameterName>

Membuat himpunan data untuk Azure Blob sink

  1. Tambahkan SourceBlobDatasetDefinitionmetode ke file Program.cs Anda:

    static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating dataset " + blobSinkDatasetName + "...");
        AzureBlobDataset blobDataset = new AzureBlobDataset
        {
            FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" },
            LinkedServiceName = new LinkedServiceReference
            {
                ReferenceName = storageLinkedServiceName
            }
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
        DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName);
        return dataset;
    }
    
  2. Tambahkan kode berikut ke metode Main yang membuat sumber Azure Blob dan himpunan data sink.

    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
    
    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
    

Buat kelas C#: EmailRequest

Dalam proyek C# Anda, buat kelas bernama EmailRequest. Kelas ini menentukan properti apa yang dikirimkan alur dalam permintaan isi saat mengirim email. Dalam tutorial ini, alur akan mengirim empat properti dari alur ke email:

  • Pesan. Isi email. Untuk salinan yang berhasil, properti ini berisi total data yang ditulis. Untuk salinan yang gagal, properti ini berisi detail kesalahan.
  • Nama pabrik data. Nama pabrik data.
  • Nama alur. Nama jalur.
  • Penerima. Parameter yang melewati. Properti ini menentukan penerima email.
    class EmailRequest
    {
        [Newtonsoft.Json.JsonProperty(PropertyName = "message")]
        public string message;

        [Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
        public string dataFactoryName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
        public string pipelineName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
        public string receiver;

        public EmailRequest(string input, string df, string pipeline, string receiverName)
        {
            message = input;
            dataFactoryName = df;
            pipelineName = pipeline;
            receiver = receiverName;
        }
    }

Buat titik akhir alur kerja email

Untuk memicu pengiriman email, Anda menggunakan Azure Logic Apps untuk menentukan alur kerja. Untuk informasi selengkapnya, lihat Membuat contoh alur kerja aplikasi logika Konsumsi.

Alur Kerja Email Sukses

Di portal Azure, buat alur kerja aplikasi logika bernama CopySuccessEmail. Tambahkan pemicu Permintaan bernama Saat permintaan HTTP diterima. Di pemicu Permintaan, isi kotak skema JSON Badan Permintaan dengan JSON berikut:

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

Alur kerja Anda terlihat seperti contoh berikut:

Alur kerja email berhasil

Konten JSON ini selaras dengan kelas EmailRequest yang Anda buat di bagian sebelumnya.

Tambahkan tindakan Office 365 Outlook bernama Kirim email. Untuk tindakan ini, sesuaikan cara Anda memformat email menggunakan properti yang diteruskan dalam JSON skema Isi permintaan. Berikut contohnya:

Perancang alur kerja dengan tindakan bernama Kirim email.

Setelah Anda menyimpan alur kerja, salin dan simpan nilai URL HTTP POST dari pemicu.

Alur kerja email gagal

Kloning CopySuccessEmail alur kerja aplikasi logika ke alur kerja baru bernama CopyFailEmail. Dalam pemicu Permintaan, skema JSON Badan Permintaan tetap sama. Ubah format email Anda seperti Subject untuk menyesuaikan dengan email yang gagal. Berikut adalah contoh:

Perancang alur kerja dan alur kerja email yang gagal.

Setelah Anda menyimpan alur kerja, salin dan simpan nilai URL HTTP POST dari pemicu.

Sekarang Anda harus memiliki dua URL alur kerja, seperti contoh berikut:

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

Buat alur

Kembali ke proyek Anda di Visual Studio. Sekarang kita akan menambahkan kode yang membuat pipeline dengan aktivitas menyalin dan properti DependsOn. Dalam tutorial ini, saluran berisi satu aktivitas, aktivitas penyalinan, yang menerima himpunan data Blob sebagai sumber dan himpunan data Blob lain sebagai sink. Jika aktivitas penyalinan berhasil atau gagal, aktivitas tersebut akan memanggil tugas email yang berbeda.

Dalam alur ini, Anda menggunakan fitur berikut:

  • Parameter
  • Aktivitas web
  • Dependensi aktivitas
  • Menggunakan output dari aktivitas sebagai input ke aktivitas lain
  1. Tambahkan metode ini ke proyek Anda. Bagian berikut ini tersedia secara lebih terperinci.

    static PipelineResource PipelineDefinition(DataFactoryManagementClient client)
            {
                Console.WriteLine("Creating pipeline " + pipelineName + "...");
                PipelineResource resource = new PipelineResource
                {
                    Parameters = new Dictionary<string, ParameterSpecification>
                    {
                        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    
                    },
                    Activities = new List<Activity>
                    {
                        new CopyActivity
                        {
                            Name = copyBlobActivity,
                            Inputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSourceDatasetName
                                }
                            },
                            Outputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSinkDatasetName
                                }
                            },
                            Source = new BlobSource { },
                            Sink = new BlobSink { }
                        },
                        new WebActivity
                        {
                            Name = sendSuccessEmailActivity,
                            Method = WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Succeeded" }
                                }
                            }
                        },
                        new WebActivity
                        {
                            Name = sendFailEmailActivity,
                            Method =WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Failed" }
                                }
                            }
                        }
                    }
                };
                Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
                return resource;
            }
    
  2. Tambahkan baris berikut ke metode Main yang membuat alur:

    client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
    

Parameter

Bagian pertama dari kode alur kami menentukan parameter.

  • sourceBlobContainer. Himpunan data blob sumber mengonsumsi parameter ini di dalam alur.
  • sinkBlobContainer. Himpunan data blob sink mengonsumsi parameter ini di dalam alur.
  • receiver. Dua aktivitas Web di dalam alur yang mengirim email keberhasilan atau kegagalan ke penerima menggunakan parameter ini.
Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    },

Aktivitas web

Aktivitas Web memungkinkan panggilan ke titik akhir REST mana pun. Untuk informasi selengkapnya tentang aktivitas tersebut, lihat Aktivitas web di Azure Data Factory. Alur ini menggunakan aktivitas web untuk memanggil alur kerja email Aplikasi Logika. Anda membuat dua aktivitas web: aktivitas yang memanggil alur kerja CopySuccessEmail dan yang memanggil CopyFailWorkFlow.

        new WebActivity
        {
            Name = sendCopyEmailActivity,
            Method = WebActivityMethod.POST,
            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
            DependsOn = new List<ActivityDependency>
            {
                new ActivityDependency
                {
                    Activity = copyBlobActivity,
                    DependencyConditions = new List<String> { "Succeeded" }
                }
            }
        }

Di properti Url, tempelkan titik akhir URL HTTP POST dari alur kerja Aplikasi Logika Anda. Di properti Body, teruskan instans kelas EmailRequest. Permintaan email berisi properti berikut:

  • Pesan. Meneruskan nilai @{activity('CopyBlobtoBlob').output.dataWritten. Mengakses properti aktivitas salin sebelumnya dan meneruskan nilai dataWritten. Untuk kasus kegagalan, lewatkan output kesalahan alih-alih @{activity('CopyBlobtoBlob').error.message.
  • Nama Data Factory. Nilai @{pipeline().DataFactory} diteruskan. Variabel sistem ini memungkinkan Anda mengakses nama pabrik pengolahan data yang sesuai. Untuk daftar variabel sistem, lihat Variabel Sistem.
  • Nama Pipeline. Meneruskan nilai @{pipeline().Pipeline}. Variabel sistem ini memungkinkan Anda untuk mengakses nama alur yang sesuai.
  • Penerima. Meneruskan nilai "@pipeline().parameters.receiver". Mengakses parameter alur.

Kode ini membuat Dependensi Aktivitas baru yang bergantung pada aktivitas penyalinan sebelumnya.

Jalankan alur tugas

Tambahkan kode berikut ke metode Main yang memicu pengoperasian pipeline.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Kelas utama

Metode Main terakhir Anda akan terlihat seperti ini.

// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };

Factory df = CreateOrUpdateDataFactory(client);

client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));

Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Bangun dan jalankan program Anda untuk memicu eksekusi alur!

Memantau eksekusi alur

  1. Tambahkan kode berikut ke metode Main:

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    

    Kode ini terus-menerus memeriksa status eksekusi sampai selesai menyalin data.

  2. Tambahkan kode berikut ke metode Main yang mengambil detail eksekusi aktivitas penyalinan, misalnya, ukuran data yang dibaca/ditulis:

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun(
    resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList();
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(activityRuns.First().Output);
        //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons);
    }
    else
        Console.WriteLine(activityRuns.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Menjalankan kode

Buat dan mulai aplikasi, lalu verifikasi eksekusi alur.

Aplikasi menampilkan kemajuan pembuatan pabrik data, layanan tertaut, himpunan data, alur, dan eksekusi alur. Kemudian memeriksa status eksekusi alur. Tunggu hingga Anda melihat detail pelaksanaan aktivitas penyalinan dengan ukuran data yang dibaca/ditulis. Kemudian, gunakan alat seperti Azure Storage Explorer untuk memeriksa blob disalin ke outputBlobPath dari inputBlobPath seperti yang Anda tentukan di variabel.

Output Anda akan menyerupai sampel berikut:

Creating data factory DFTutorialTest...
{
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "type": "AzureStorage",
  "typeProperties": {
    "connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
  }
}
Creating dataset SourceStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sourceBlobContainer"
    },
    "fileName": "input.txt"
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating dataset SinkStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sinkBlobContainer"
    }
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating pipeline Adfv2TutorialBranchCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SourceStorageDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SinkStorageDataset"
          }
        ],
        "name": "CopyBlobtoBlob"
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendSuccessEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ]
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').error.message}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendFailEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Failed"
            ]
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      },
      "receiver": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 20,
  "dataWritten": 20,
  "copyDuration": 4,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}

Press any key to exit...

Anda melakukan tugas-tugas berikut dalam tutorial ini:

  • Membuat pabrik data
  • Membuat layanan yang terhubung ke Azure Storage
  • Buat himpunan data Azure Blob
  • Buat alur yang berisi aktivitas salin dan aktivitas web
  • Mengirim output aktivitas ke aktivitas berikutnya
  • Menggunakan parameter yang meneruskan dan variabel sistem
  • Mulai eksekusi alur
  • Pantau eksekusi alur dan aktivitas

Sekarang Anda dapat melanjutkan ke bagian Konsep untuk informasi selengkapnya tentang Azure Data Factory.