Aracılığıyla paylaş


Data Factory işlem hattında dallanma ve zincirleme etkinlikleri

UYGULANANLAR: Azure Data Factory Azure Synapse Analytics

İpucu

Kuruluşlar için hepsi bir arada analiz çözümü olan Microsoft Fabric'te Data Factory'yi deneyin. Microsoft Fabric , veri taşımadan veri bilimine, gerçek zamanlı analize, iş zekasına ve raporlamaya kadar her şeyi kapsar. Yeni bir deneme sürümünü ücretsiz olarak başlatmayı öğrenin!

Bu öğreticide, bazı denetim akışı özelliklerini gösteren bir Data Factory işlem hattı oluşturacaksınız. Bu işlem hattı, Azure Blob Depolama içindeki bir kapsayıcıdan aynı depolama hesabındaki başka bir kapsayıcıya kopyalanır. Kopyalama etkinliği başarılı olursa işlem hattı başarılı kopyalama işleminin ayrıntılarını e-postayla gönderir. Bu bilgiler, yazılan veri miktarını içerebilir. Kopyalama etkinliği başarısız olursa, kopyalama hatasının ayrıntılarını (hata iletisi gibi) bir e-postayla gönderir. Öğretici boyunca parametreleri nasıl geçireceğinizi göreceksiniz.

Bu grafik, senaryoya genel bir bakış sağlar:

Diyagramda, başarı durumunda ayrıntıları içeren bir e-posta gönderen veya hata ayrıntılarını içeren bir e-posta gönderen bir kopyanın hedefi olan Azure Blob Depolama gösterilmektedir.

Bu öğreticide aşağıdaki görevlerin nasıl gerçekleştirdiğiniz gösterilmektedir:

  • Veri fabrikası oluşturma
  • Azure Depolama bağlı hizmeti oluşturma
  • Azure blob veri kümesi oluşturma
  • Kopyalama etkinliği ve bir web etkinliği içeren işlem hattı oluşturma
  • Etkinliklerin çıktılarını sonraki etkinliklere gönderme
  • Parametre geçirme ve sistem değişkenlerini kullanma
  • Bir işlem hattı çalıştırması başlatma
  • İşlem hattı ve etkinlik çalıştırmalarını izleme

Bu öğreticide .NET SDK kullanılır. Azure Data Factory ile etkileşime geçmek için diğer mekanizmaları kullanabilirsiniz. Data Factory hızlı başlangıçları için bkz . 5 Dakikalık Hızlı Başlangıçlar.

Azure aboneliğiniz yoksa başlamadan önce ücretsiz bir hesap oluşturun.

Önkoşullar

  • Azure Depolama hesabı. Blob depolamayı kaynak veri deposu olarak kullanırsınız. Azure depolama hesabınız yoksa bkz . Depolama hesabı oluşturma.
  • Azure Depolama Gezgini. Bu aracı yüklemek için bkz. Azure Depolama Gezgini.
  • Azure SQL Veritabanı. Veritabanını havuz veri deposu olarak kullanabilirsiniz. Azure SQL Veritabanı'da veritabanınız yoksa bkz. Azure SQL Veritabanı'da veritabanı oluşturma.
  • Visual Studio. Bu makalede Visual Studio 2019 kullanılır.
  • Azure .NET SDK. Azure .NET SDK'sını indirip yükleyin.

Data Factory'nin şu anda kullanılabilir olduğu Azure bölgelerinin listesi için bkz . Bölgeye göre kullanılabilir ürünler. Veri depoları ve işlemleri başka bölgelerde olabilir. Depolar Azure Depolama ve Azure SQL Veritabanı içerir. İşlemler, Data Factory tarafından kullanılan HDInsight'ı içerir.

Microsoft Entra uygulaması oluşturma bölümünde açıklandığı gibi bir uygulama oluşturun. Aynı makaledeki yönergeleri izleyerek uygulamayı Katkıda Bulunan rolüne atayın. Bu öğreticinin sonraki bölümlerinde Uygulama (istemci) kimliği ve Dizin (kiracı) kimliği gibi çeşitli değerlere ihtiyacınız olacaktır.

Blob tablosu oluşturma

  1. Bir metin düzenleyicisini açın. Aşağıdaki metni kopyalayın ve yerel olarak input.txt olarak kaydedin.

    Ethel|Berg
    Tamika|Walsh
    
  2. Azure Depolama Gezgini'i açın. Depolama hesabınızı genişletin. Blob Kapsayıcıları'na sağ tıklayın ve Blob Kapsayıcısı Oluştur'u seçin.

  3. Yeni kapsayıcıya adfv2branch adını verin ve input.txt dosyanızı kapsayıcıya eklemek için Karşıya Yükle'yi seçin.

Visual Studio projesi oluşturma

C# .NET konsol uygulaması oluşturma:

  1. Visual Studio'yu başlatın ve Yeni proje oluştur'u seçin.
  2. Yeni proje oluştur bölümünde C# için Konsol Uygulaması (.NET Framework) seçeneğini belirleyin ve İleri'yi seçin.
  3. Projeye ADFv2BranchTutorial adını verin.
  4. .NET sürüm 4.5.2 veya üzerini ve ardından Oluştur'u seçin.

NuGet paketlerini yükleme

  1. Araçlar

  2. paketleri yüklemek için Paket Yöneticisi Konsolu'nda aşağıdaki komutları çalıştırın. Ayrıntılar için Microsoft.Azure.Management.DataFactory nuget paketine bakın.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

Veri fabrikası istemcisi oluşturma

  1. Program.cs açın ve aşağıdaki deyimleri ekleyin:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. Bu statik değişkenleri sınıfına Program ekleyin. Yer tutucuları kendi değerlerinizle değiştirin.

    // Set variables
    static string tenantID = "<tenant ID>";
    static string applicationId = "<application ID>";
    static string authenticationKey = "<Authentication key for your application>";
    static string subscriptionId = "<Azure subscription ID>";
    static string resourceGroup = "<Azure resource group name>";
    
    static string region = "East US";
    static string dataFactoryName = "<Data factory name>";
    
    // Specify the source Azure Blob information
    static string storageAccount = "<Azure Storage account name>";
    static string storageKey = "<Azure Storage account key>";
    // confirm that you have the input.txt file placed in th input folder of the adfv2branch container.
    static string inputBlobPath = "adfv2branch/input";
    static string inputBlobName = "input.txt";
    static string outputBlobPath = "adfv2branch/output";
    static string emailReceiver = "<specify email address of the receiver>";
    
    static string storageLinkedServiceName = "AzureStorageLinkedService";
    static string blobSourceDatasetName = "SourceStorageDataset";
    static string blobSinkDatasetName = "SinkStorageDataset";
    static string pipelineName = "Adfv2TutorialBranchCopy";
    
    static string copyBlobActivity = "CopyBlobtoBlob";
    static string sendFailEmailActivity = "SendFailEmailActivity";
    static string sendSuccessEmailActivity = "SendSuccessEmailActivity";
    
  3. Aşağıdaki kodu Main yöntemine ekleyin. Bu kod, sınıfın bir örneğini DataFactoryManagementClient oluşturur. Ardından bu nesneyi veri fabrikası, bağlı hizmet, veri kümeleri ve işlem hattı oluşturmak için kullanırsınız. İşlem hattı çalıştırma ayrıntılarını izlemek için de bu nesneyi kullanabilirsiniz.

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

Veri fabrikası oluşturma

  1. Program.csCreateOrUpdateDataFactorybir yöntem ekleyin:

    static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating data factory " + dataFactoryName + "...");
        Factory resource = new Factory
        {
            Location = region
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
    
        Factory response;
        {
            response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource);
        }
    
        while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
        {
            System.Threading.Thread.Sleep(1000);
        }
        return response;
    }
    
  2. Veri fabrikası oluşturan yönteme Main aşağıdaki satırı ekleyin:

    Factory df = CreateOrUpdateDataFactory(client);
    

Azure Depolama bağlı hizmeti oluşturma

  1. Program.csStorageLinkedServiceDefinitionbir yöntem ekleyin:

    static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client)
    {
       Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
       AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService
       {
           ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey)
       };
       Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
       LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName);
       return linkedService;
    }
    
  2. Azure Depolama bağlı hizmeti oluşturan yönteme Main aşağıdaki satırı ekleyin:

    client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
    

Desteklenen özellikler ve ayrıntılar hakkında daha fazla bilgi için bkz . Bağlı hizmet özellikleri.

Veri kümeleri oluşturma

Bu bölümde biri kaynak, diğeri havuz için olan iki veri kümesi oluşturacaksınız.

Kaynak Azure Blobu için veri kümesi oluşturma

Azure blob veri kümesi oluşturan bir yöntem ekleyin. Desteklenen özellikler ve ayrıntılar hakkında daha fazla bilgi için bkz . Azure Blob veri kümesi özellikleri.

Program.csSourceBlobDatasetDefinitionbir yöntem ekleyin:

static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
    Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
    AzureBlobDataset blobDataset = new AzureBlobDataset
    {
        FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
        FileName = inputBlobName,
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        }
    };
    Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
    DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
    return dataset;
}

Azure Blob’da kaynak verilerini temsil eden bir veri kümesi tanımlayın. Bu Blob veri kümesi, önceki adımda desteklenen Azure Depolama bağlı hizmetini ifade eder. Blob veri kümesi, kopyalanacak blobun konumunu açıklar: FolderPath ve FileName.

FolderPath için parametrelerin kullanımına dikkat edin. sourceBlobContainer parametresinin adıdır ve ifade, işlem hattı çalıştırmasında geçirilen değerlerle değiştirilir. Parametreleri tanımlamaya yönelik söz dizimi @pipeline().parameters.<parameterName>

Havuz Azure Blobu için veri kümesi oluşturma

  1. Program.csSourceBlobDatasetDefinitionbir yöntem ekleyin:

    static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating dataset " + blobSinkDatasetName + "...");
        AzureBlobDataset blobDataset = new AzureBlobDataset
        {
            FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" },
            LinkedServiceName = new LinkedServiceReference
            {
                ReferenceName = storageLinkedServiceName
            }
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
        DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName);
        return dataset;
    }
    
  2. Hem Azure Blob kaynağı hem de havuz veri kümelerini oluşturan yöntemine aşağıdaki kodu Main ekleyin.

    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
    
    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
    

C# sınıfı oluşturma: EmailRequest

C# projenizde adlı EmailRequestbir sınıf oluşturun. Bu sınıf, e-posta gönderirken işlem hattının gövde isteğinde hangi özellikleri gönderdiğini tanımlar. Bu öğreticide işlem hattı, işlem hattından e-postaya dört özellik gönderir:

  • İleti. E-postanın gövdesi. Başarılı bir kopya için bu özellik yazılan veri miktarını içerir. Başarısız bir kopya için bu özellik hatanın ayrıntılarını içerir.
  • Veri fabrikası adı. Veri fabrikasının adı.
  • İşlem hattı adı. İşlem hattının adı.
  • Ahize. Geçen parametre. Bu özellik, e-postanın alıcısını belirtir.
    class EmailRequest
    {
        [Newtonsoft.Json.JsonProperty(PropertyName = "message")]
        public string message;

        [Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
        public string dataFactoryName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
        public string pipelineName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
        public string receiver;

        public EmailRequest(string input, string df, string pipeline, string receiverName)
        {
            message = input;
            dataFactoryName = df;
            pipelineName = pipeline;
            receiver = receiverName;
        }
    }

E-posta iş akışı uç noktaları oluşturma

E-posta göndermeyi tetikleme amacıyla iş akışını tanımlamak için Azure Logic Apps'i kullanırsınız. Daha fazla bilgi için bkz . Örnek tüketim mantığı uygulaması iş akışı oluşturma.

Başarı e-postası iş akışı

Azure portalında adlı CopySuccessEmailbir mantıksal uygulama iş akışı oluşturun. HTTP isteği alındığında adlı İstek tetikleyicisini ekleyin. İstek tetikleyicisinde İstek Gövdesi JSON şema kutusunu aşağıdaki JSON ile doldurun:

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

İş akışınız aşağıdaki örneğe benzer:

Başarı e-postası iş akışı

Bu JSON içeriği, önceki bölümde oluşturduğunuz sınıfla EmailRequest hizalanır.

E-posta gönder adlı Office 365 Outlook eylemini ekleyin. Bu eylem için, istek Gövdesi JSON şemasında geçirilen özellikleri kullanarak e-postayı nasıl biçimlendirmek istediğinizi özelleştirin. Bir örnek aşağıda verilmiştir:

E-posta gönder adlı eyleme sahip iş akışı tasarımcısı.

İş akışını kaydettikten sonra, tetikleyiciden HTTP POST URL değerini kopyalayıp kaydedin.

Hata e-postası iş akışı

CopySuccessEmail Mantıksal uygulama iş akışını adlı CopyFailEmailyeni bir iş akışına kopyalama. İstek tetikleyicisinde İstek Gövdesi JSON şeması aynıdır. Hata e-postasına uyarlamak için e-postanızın biçimini Subject olarak değiştirin. Örnek aşağıda verilmiştir:

İş akışı tasarımcısı ve başarısız e-posta iş akışı.

İş akışını kaydettikten sonra, tetikleyiciden HTTP POST URL değerini kopyalayıp kaydedin.

Şimdi aşağıdaki örnekler gibi iki iş akışı URL'niz olmalıdır:

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

İşlem hattı oluşturma

Visual Studio'da projenize geri dönün. Şimdi kopyalama etkinliği ve DependsOn özelliğiyle işlem hattı oluşturan kodu ekleyeceğiz. Bu öğreticide işlem hattı bir etkinlik, Blob veri kümesini kaynak olarak alan bir kopyalama etkinliği ve havuz olarak başka bir Blob veri kümesi içerir. Kopyalama etkinliği başarılı veya başarısız olursa, farklı e-posta görevlerini çağırır.

Bu işlem hattında aşağıdaki özellikleri kullanırsınız:

  • Parametreler
  • Web etkinliği
  • Etkinlik bağımlılığı
  • Bir etkinlikten çıkışı başka bir etkinliğe giriş olarak kullanma
  1. Bu yöntemi projenize ekleyin. Aşağıdaki bölümler daha ayrıntılı olarak sağlanır.

    static PipelineResource PipelineDefinition(DataFactoryManagementClient client)
            {
                Console.WriteLine("Creating pipeline " + pipelineName + "...");
                PipelineResource resource = new PipelineResource
                {
                    Parameters = new Dictionary<string, ParameterSpecification>
                    {
                        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    
                    },
                    Activities = new List<Activity>
                    {
                        new CopyActivity
                        {
                            Name = copyBlobActivity,
                            Inputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSourceDatasetName
                                }
                            },
                            Outputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSinkDatasetName
                                }
                            },
                            Source = new BlobSource { },
                            Sink = new BlobSink { }
                        },
                        new WebActivity
                        {
                            Name = sendSuccessEmailActivity,
                            Method = WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Succeeded" }
                                }
                            }
                        },
                        new WebActivity
                        {
                            Name = sendFailEmailActivity,
                            Method =WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Failed" }
                                }
                            }
                        }
                    }
                };
                Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
                return resource;
            }
    
  2. İşlem hattını oluşturan yönteme Main aşağıdaki satırı ekleyin:

    client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
    

Parametreler

İşlem hattı kodumuzun ilk bölümü parametreleri tanımlar.

  • sourceBlobContainer. Kaynak blob veri kümesi bu parametreyi işlem hattında kullanır.
  • sinkBlobContainer. Havuz blobu veri kümesi bu parametreyi işlem hattında tüketir.
  • receiver. İşlem hattındaki alıcıya başarı veya başarısızlık e-postaları gönderen iki Web etkinliği bu parametreyi kullanır.
Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    },

Web etkinliği

Web etkinliği herhangi bir REST uç noktasına çağrı yapılmasına izin verir. Etkinlik hakkında daha fazla bilgi için bkz . Azure Data Factory'de web etkinliği. Bu işlem hattı, Logic Apps e-posta iş akışını çağırmak için bir web etkinliği kullanır. İki web etkinliği oluşturursunuz: biri iş akışına CopySuccessEmail , diğeri de öğesini çağıran CopyFailWorkFlow.

        new WebActivity
        {
            Name = sendCopyEmailActivity,
            Method = WebActivityMethod.POST,
            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
            DependsOn = new List<ActivityDependency>
            {
                new ActivityDependency
                {
                    Activity = copyBlobActivity,
                    DependencyConditions = new List<String> { "Succeeded" }
                }
            }
        }

özelliğine Url Logic Apps iş akışlarınızdan HTTP POST URL uç noktalarını yapıştırın. özelliğinde Body sınıfının bir örneğini EmailRequest geçirin. E-posta isteği aşağıdaki özellikleri içerir:

  • İleti. değerini @{activity('CopyBlobtoBlob').output.dataWrittengeçirir. Önceki kopyalama etkinliğinin bir özelliğine erişir ve değerini dataWrittengeçirir. Hata durumunda @{activity('CopyBlobtoBlob').error.message yerine hata çıktısını geçirir.
  • Data Factory Adı. Bu sistem değişkeninin @{pipeline().DataFactory} değerini geçirir, karşılık gelen veri fabrikası adına erişmenizi sağlar. Sistem değişkenlerinin listesi için bkz . Sistem Değişkenleri.
  • İşlem Hattı Adı. değerini @{pipeline().Pipeline}geçirir. Bu sistem değişkeni, ilgili işlem hattı adına erişmenizi sağlar.
  • Ahize. değerini "@pipeline().parameters.receiver"geçirir. İşlem hattı parametrelerine erişir.

Bu kod, önceki kopyalama etkinliğine bağlı olan yeni bir Etkinlik Bağımlılığı oluşturur.

İşlem hattı çalıştırması oluşturma

İşlem hattı çalıştırmasını tetikleyen yöntemine Main aşağıdaki kodu ekleyin.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Main sınıfı

Son Main yönteminiz şöyle görünmelidir.

// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };

Factory df = CreateOrUpdateDataFactory(client);

client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));

Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Bir işlem hattı çalıştırması tetiklemek için programınızı derleyip çalıştırın!

İşlem hattı çalıştırmasını izleme

  1. Main yöntemine aşağıdaki kodu ekleyin:

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    

    Bu kod, verilerin kopyalanması bitene kadar çalıştırmanın durumunu sürekli denetler.

  2. Kopyalama etkinliği çalıştırma ayrıntılarını (örneğin, okunan/yazılan verilerin boyutu) alan yönteme aşağıdaki kodu Main ekleyin:

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun(
    resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList();
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(activityRuns.First().Output);
        //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons);
    }
    else
        Console.WriteLine(activityRuns.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Kodu çalıştırma

Uygulamayı derleyip başlatın, ardından işlem hattı yürütmesini doğrulayın.

Uygulama, veri fabrikası, bağlı hizmet, veri kümeleri, işlem hattı ve işlem hattı çalıştırması oluşturma işleminin ilerleme durumunu görüntüler. Daha sonra işlem hattı çalıştırma durumunu denetler. Okunan/yazılan veri boyutunu içeren kopyalama etkinliği ayrıntılarını görene kadar bekleyin. Ardından, değişkenlerde belirttiğiniz gibi blob'un inputBlobPath'ten outputBlobPath'e kopyalandığını denetlemek için Azure Depolama Gezgini gibi araçları kullanın.

Çıkışınız aşağıdaki örneğe benzemelidir:

Creating data factory DFTutorialTest...
{
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "type": "AzureStorage",
  "typeProperties": {
    "connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
  }
}
Creating dataset SourceStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sourceBlobContainer"
    },
    "fileName": "input.txt"
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating dataset SinkStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sinkBlobContainer"
    }
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating pipeline Adfv2TutorialBranchCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SourceStorageDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SinkStorageDataset"
          }
        ],
        "name": "CopyBlobtoBlob"
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendSuccessEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ]
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').error.message}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendFailEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Failed"
            ]
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      },
      "receiver": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 20,
  "dataWritten": 20,
  "copyDuration": 4,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}

Press any key to exit...

Bu öğreticide aşağıdaki görevleri gerçekleştirdiniz:

  • Veri fabrikası oluşturma
  • Azure Depolama bağlı hizmeti oluşturma
  • Azure blob veri kümesi oluşturma
  • Kopyalama etkinliği ve bir web etkinliği içeren işlem hattı oluşturma
  • Etkinliklerin çıktılarını sonraki etkinliklere gönderme
  • Parametre geçirme ve sistem değişkenlerini kullanma
  • Bir işlem hattı çalıştırması başlatma
  • İşlem hattı ve etkinlik çalıştırmalarını izleme

Artık Azure Data Factory hakkında daha fazla bilgi için Kavramlar bölümüne geçebilirsiniz.