مشاركة عبر


أنشطة الإصدارات الفرعية والتسلسل في البنية الأساسية لـ Data Factory

ينطبق على: Azure Data Factory Azure Synapse Analytics

تلميح

جرب Data Factory في Microsoft Fabric، وهو حل تحليلي متكامل للمؤسسات. يغطي Microsoft Fabric كل شيء بدءا من حركة البيانات إلى علم البيانات والتحليلات في الوقت الحقيقي والمعلومات المهنية وإعداد التقارير. تعرف على كيفية بدء إصدار تجريبي جديد مجانا!

في هذا البرنامج التعليمي، يمكنك إنشاء مسار Data Factory يعرض بعض ميزات تدفق التحكم. هذا المسار ينسخ من حاوية في Azure Blob Storage إلى حاوية أخرى في حساب التخزين نفسه. إذا نجح نشاط النسخ، فسيرسل المسار تفاصيل عملية النسخ الناجحة في رسالة بريد إلكتروني. ويمكن أن تشمل تلك المعلومات كمية البيانات المكتوبة. إذا فشل نشاط النسخ، فإنه يرسل تفاصيل فشل النسخ، مثل رسالة الخطأ، في رسالة بريد إلكتروني. في البرنامج التعليمي بأكمله، ترى كيفية تمرير المعلمات.

يوضح هذا الرسم التخطيطي نظرة عامة على السيناريو:

يُظهر الرسم التخطيطي Azure Blob Storage، وهو هدف النسخة، والذي، عند النجاح، يرسل بريداً إلكترونياً يحتوي على التفاصيل أو عند الفشل، يرسل بريداً إلكترونياً يحتوي على تفاصيل الخطأ.

هذا البرنامج التعليمي يوضح لك كيفية إجراء المهام التالية:

  • إنشاء مصدرًا للبيانات
  • أنشئ خدمة مرتبطة بالتخزين في Azure
  • إنشاء مجموعة بيانات Azure Blob
  • إنشاء مسار يحتوي على نشاط نسخ ونشاط ويب.
  • إرسال مخرجات الأنشطة إلى أنشطة لاحقة.
  • استخدام تمرير المعلمة ومتغيرات النظام.
  • بدء تشغيل البنية الأساسية
  • مراقبة البنية الأساسية وتشغيل النشاط

يستخدم هذا البرنامج التعليمي .NET SDK. يمكنك استخدام آليات أخرى للتفاعل مع Azure Data Factory. للتشغيل السريع لـ Data Factory، راجع أدلة التشغيل السريع لمدة 5 دقائق.

في حال لم يكن لديك اشتراك Azure، فأنشئ حساباً مجانيّاً قبل البدء.

المتطلبات الأساسية

  • حساب في مساحة تخزين Azure. استخدام Blob Storage كمخزن بيانات مصدر. إذا لم يكن لديك حساب Azure Storage، فيمكنك إنشاء حساب تخزين.
  • مستكشف تخزين Azure. لتثبيت هذه الأداة، راجع Azure Storage Explorer.
  • قاعدة بيانات Azure SQL. يمكنك استخدام قاعدة البيانات كمخزن بيانات متلقٍ. في حالة عدم امتلاك قاعدة بيانات في Azure SQL، يرجى الرجوع إلى إنشاء قاعدة بيانات في قاعدة بيانات Azure SQL.
  • استوديو مرئي. تستخدم هذه المقالة Visual Studio 2019.
  • Azure .NET SDK. بادر بتنزيل Azure .NET SDKوتثبيته.

للاطلاع على قائمة مناطق Azure التي يتوفر فيها Data Factory حالياً، راجع المنتجات المتاحة حسب المنطقة. يمكن أن تكون مخازن البيانات والحوسبة في مناطق أخرى. تتضمن المتاجر Azure Storage وAzure SQL Database. وتشمل الحوسبة HDInsight، الذي يستخدمه Data Factory.

إنشاء تطبيق كما هو موضح في إنشاء تطبيق Microsoft Entra. تعيين التطبيق إلى الدور Contributor باتباع الإرشادات في المقالة نفسها. ستحتاج إلى عدة قيم لأجزاء لاحقة من هذا البرنامج التعليمي، مثل معرف التطبيق (العميل)ومعرف الدليل (المستأجر).

إنشاء جدول لكائن ثنائي كبير الحجم.

  1. افتح محرر النصوص. انسخ النص التالي، واحفظه محلياً باعتباره ملف input.txt.

    Ethel|Berg
    Tamika|Walsh
    
  2. افتح Azure Storage Explorer. وسّع حساب التخزين الخاص بك. انقر بزر الماوس الأيمن فوق Blob Containers وحدد Create Blob Container.

  3. سمّ الحاوية الجديدة adfv2branch، وحدد Upload لإضافة ملف input.txt إلى الحاوية.

إنشاء مشروع Visual Studio

إنشاء تطبيق وحدة التحكم ‎.NET في C#‎:

  1. افتح Visual Studio، وحدد Create a new project.
  2. فيCreate a new project، اختر Console App (.NET Framework) لـ C#‎ وحدد Next.
  3. سمّ المشروع ADFv2BranchTutorial.
  4. حدد .NET version 4.5.2، أو أعلى، ثم حدد Create.

ثبِّت حزم NuGet

  1. حدد Tools>NuGet Package Manager>Package Manager Console.

  2. في الجزء Package Manager Console، شغل الأوامر التالية لتثبيت الحزم. راجع حزمة Microsoft.Azure.Management.DataFactory nuget للاطلاع على التفاصيل.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

كيفية إنشاء مصنع بيانات العميل

  1. افتح Program.cs وأضف العبارات التالية:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. أضف هذه المتغيرات الثابتة إلى الفئة Program. استبدل العناصر النائبة، واستعمل محلها قيمك الخاصة.

    // Set variables
    static string tenantID = "<tenant ID>";
    static string applicationId = "<application ID>";
    static string authenticationKey = "<Authentication key for your application>";
    static string subscriptionId = "<Azure subscription ID>";
    static string resourceGroup = "<Azure resource group name>";
    
    static string region = "East US";
    static string dataFactoryName = "<Data factory name>";
    
    // Specify the source Azure Blob information
    static string storageAccount = "<Azure Storage account name>";
    static string storageKey = "<Azure Storage account key>";
    // confirm that you have the input.txt file placed in th input folder of the adfv2branch container.
    static string inputBlobPath = "adfv2branch/input";
    static string inputBlobName = "input.txt";
    static string outputBlobPath = "adfv2branch/output";
    static string emailReceiver = "<specify email address of the receiver>";
    
    static string storageLinkedServiceName = "AzureStorageLinkedService";
    static string blobSourceDatasetName = "SourceStorageDataset";
    static string blobSinkDatasetName = "SinkStorageDataset";
    static string pipelineName = "Adfv2TutorialBranchCopy";
    
    static string copyBlobActivity = "CopyBlobtoBlob";
    static string sendFailEmailActivity = "SendFailEmailActivity";
    static string sendSuccessEmailActivity = "SendSuccessEmailActivity";
    
  3. أضف التعليمات البرمجية التالية إلى الأسلوب Main. تُنشئ هذه التعليمة البرمجية مثيل الفئة DataFactoryManagementClient. ثم استخدم هذا الكائن لإنشاء Data Factory، وخدمة مرتبطة، ومجموعات بيانات، ومسار. يمكنك أيضًا استخدام هذا الكائن لمراقبة تفاصيل تشغيل المسار.

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

إنشاء مصدرًا للبيانات

  1. أضف الأسلوب CreateOrUpdateDataFactory إلى ملف Program.cs خاصتك:

    static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating data factory " + dataFactoryName + "...");
        Factory resource = new Factory
        {
            Location = region
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
    
        Factory response;
        {
            response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource);
        }
    
        while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
        {
            System.Threading.Thread.Sleep(1000);
        }
        return response;
    }
    
  2. أضِف التعليمة البرمجية التالية إلى الأسلوب Main الذي ينشئ مصنع بيانات:

    Factory df = CreateOrUpdateDataFactory(client);
    

أنشئ خدمة مرتبطة بالتخزين في Azure

  1. أضف الأسلوب StorageLinkedServiceDefinition إلى ملف Program.cs خاصتك:

    static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client)
    {
       Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
       AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService
       {
           ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey)
       };
       Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
       LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName);
       return linkedService;
    }
    
  2. أضِف السطر التالي إلى الأسلوب Main الذي ينشئ خدمة مرتبطة بـ Azure Storage:

    client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
    

للحصول على معلومات عن الخصائص والتفاصيل المدعومة، راجع خصائص الخدمة المرتبطة.

إنشاء datasets

في هذا القسم، يمكنك إنشاء مجموعتي بيانات، واحدة للمصدر وواحدة للمتلقي.

إنشاء مجموعة بيانات لمصدر Azure Blob

أضف أسلوباً ينشئ مجموعة بيانات Azure blob. للحصول على مزيد من المعلومات عن الخصائص والتفاصيل المدعومة، راجع خصائص مجموعة بيانات Azure Blob.

أضف الأسلوب SourceBlobDatasetDefinition إلى ملف Program.cs خاصتك:

static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
    Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
    AzureBlobDataset blobDataset = new AzureBlobDataset
    {
        FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
        FileName = inputBlobName,
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        }
    };
    Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
    DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
    return dataset;
}

يمكنك تعريف مجموعة بيانات تمثل بيانات المصدر في Azure Blob. تشير مجموعة بيانات Blob هذه إلى الخدمة المرتبطة في Azure Storage والمعتمدة في الخطوة السابقة. تصف مجموعة بيانات Blob موقع الكائن الثنائي كبير الحجم للنسخ منه: FolderPath و FileName.

لاحظ استخدام المعلمات لـ FolderPath. sourceBlobContainer هو اسم المعلمة، ويُستبدَل التعبير وتُستخدم محله القيم التي جرى تمريرها في تشغيل البنية الأساسية لبرنامج ربط العمليات التجارية. بناء الجملة اللازم لتعريف المعلمات هو @pipeline().parameters.<parameterName>

إنشاء مجموعة بيانات لمتلقي Azure Blob

  1. أضف الأسلوب SourceBlobDatasetDefinition إلى ملف Program.cs خاصتك:

    static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client)
    {
        Console.WriteLine("Creating dataset " + blobSinkDatasetName + "...");
        AzureBlobDataset blobDataset = new AzureBlobDataset
        {
            FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" },
            LinkedServiceName = new LinkedServiceReference
            {
                ReferenceName = storageLinkedServiceName
            }
        };
        Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
        DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName);
        return dataset;
    }
    
  2. أضِف التعليمة البرمجية التالية إلى الأسلوب Main الذي ينشئ مجموعتي البيانات المصدر والمتلقي في Azure Blob.

    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
    
    client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
    

إنشاء الفئة C#: EmailRequest

في مشروع C# خاصتك، أنشئ فئة باسم EmailRequest. تُعرف هذه الفئة الخصائص التي يرسلها المسار في طلب النص الأساسي عند إرسال بريد إلكتروني. في هذا البرنامج التعليمي، يرسل المسار أربعة خصائص من المسار إلى البريد الإلكتروني:

  • الرسالة. نص البريد الإلكتروني. عند نجاح النسخة، تحتوي هذه الخاصية على مقدار البيانات المكتوبة. عند فشل النسخة، تحتوي هذه الخاصية على تفاصيل الخطأ.
  • اسم Data Factory. اسم مصنع البيانات.
  • اسم المسار. اسم المسار.
  • المتلقي. المعلمة التي تمر عبره. تحدد هذه الخاصية مستقبل البريد الإلكتروني.
    class EmailRequest
    {
        [Newtonsoft.Json.JsonProperty(PropertyName = "message")]
        public string message;

        [Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
        public string dataFactoryName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
        public string pipelineName;

        [Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
        public string receiver;

        public EmailRequest(string input, string df, string pipeline, string receiverName)
        {
            message = input;
            dataFactoryName = df;
            pipelineName = pipeline;
            receiver = receiverName;
        }
    }

إنشاء نقاط نهاية سير عمل البريد الإلكتروني

لتشغيل إرسال بريد إلكتروني، يمكنك استخدام Azure Logic Apps لتعريف سير العمل. لمزيد من المعلومات، راجع إنشاء مثال سير عمل تطبيق منطق الاستهلاك.

سير عمل البريد الإلكتروني الناجح

في مدخل Microsoft Azure، قم بإنشاء سير عمل تطبيق منطقي باسم CopySuccessEmail. أضف مشغل الطلب المسمى عند تلقي طلب HTTP. في مشغل الطلب، املأ مربع مخطط نص الطلب JSON ب JSON التالي:

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

يبدو سير العمل الخاص بك على النحو التالي:

سير عمل البريد الإلكتروني الناجح

يتوافق محتوى JSON هذا مع الفئة EmailRequest التي أنشأتها في القسم السابق.

أضف إجراء Office 365 Outlook المسمى إرسال بريد إلكتروني. لهذا الإجراء، قم بتخصيص الطريقة التي ترغب بها في تنسيق البريد الإلكتروني، باستخدام الخصائص التي تم تمريرها في مخطط Body JSON للطلب. إليك مثال:

مصمم سير العمل مع الإجراء المسمى إرسال بريد إلكتروني.

بعدما تحفظ سير العمل، انسخ قيمة HTTP POST URL واحفظها من المشغل.

فشل سير عمل البريد الإلكتروني

CopySuccessEmail استنساخ سير عمل التطبيق المنطقي إلى سير عمل جديد يسمى CopyFailEmail. في مشغل الطلب، مخطط نص الطلب JSON هو نفسه. تغيير تنسيق البريد الإلكتروني الخاص بك، مثل Subject لكي يتناسب مع إخفاق البريد الإلكتروني. إليك مثال:

مصمم سير العمل وفشل سير عمل البريد الإلكتروني.

بعدما تحفظ سير العمل، انسخ قيمة HTTP POST URL واحفظها من المشغل.

يجب أن يكون لديك الآن عناوين URL لسير العمل، مثل الأمثلة التالية:

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

إنشاء البنية الأساسية لبرنامج ربط العمليات التجارية

عد إلى مشروعك في Visual Studio. سنضيف الآن التعليمة البرمجية التي تنشئ مساراً مع نشاط النسخة وخاصية DependsOn. في هذا البرنامج التعليمي، يحتوي المسار على نشاط واحد، وهو نشاط النسخ، الذي يستوعب مجموعة بيانات Blob كمصدر، ومجموعة بيانات Blob أخرى كمتلقٍ. إذا نجح نشاط النسخ أو فشل، فإنه يستدعي مهام بريد إلكتروني مختلفة.

في هذا المسار، يمكنك استخدام الميزات التالية:

  • المعلمات
  • نشاط الويب
  • تبعية النشاط.
  • استخدام الإخراج من نشاط معين كإدخال لنشاط آخر.
  1. أضف هذا الأسلوب إلى المشروع الخاص بك. توفر الأقسام التالية المزيد من التفاصيل.

    static PipelineResource PipelineDefinition(DataFactoryManagementClient client)
            {
                Console.WriteLine("Creating pipeline " + pipelineName + "...");
                PipelineResource resource = new PipelineResource
                {
                    Parameters = new Dictionary<string, ParameterSpecification>
                    {
                        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
                        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    
                    },
                    Activities = new List<Activity>
                    {
                        new CopyActivity
                        {
                            Name = copyBlobActivity,
                            Inputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSourceDatasetName
                                }
                            },
                            Outputs = new List<DatasetReference>
                            {
                                new DatasetReference
                                {
                                    ReferenceName = blobSinkDatasetName
                                }
                            },
                            Source = new BlobSource { },
                            Sink = new BlobSink { }
                        },
                        new WebActivity
                        {
                            Name = sendSuccessEmailActivity,
                            Method = WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Succeeded" }
                                }
                            }
                        },
                        new WebActivity
                        {
                            Name = sendFailEmailActivity,
                            Method =WebActivityMethod.POST,
                            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000",
                            Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
                            DependsOn = new List<ActivityDependency>
                            {
                                new ActivityDependency
                                {
                                    Activity = copyBlobActivity,
                                    DependencyConditions = new List<String> { "Failed" }
                                }
                            }
                        }
                    }
                };
                Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings));
                return resource;
            }
    
  2. أضف التعليمة السطر التالي إلى الأسلوب Main الذي ينشئ المسار:

    client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
    

المعلمات

القسم الأول من التعليمة البرمجية للمسار الخاص بنا يحدد المعلمات.

  • sourceBlobContainer. تستهلك مجموعة بيانات Blob المصدر هذه المعلمة في المسار.
  • sinkBlobContainer. تستهلك مجموعة بيانات Blob المتلقي هذه المعلمة في المسار.
  • receiver. يستخدم نشاطا الويب في المسار الذي يرسل رسائل البريد الإلكتروني الناجحة أو الفاشلة إلى المتلقي هذه المعلمة.
Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
        { "receiver", new ParameterSpecification { Type = ParameterType.String } }
    },

نشاط الويب

يسمح نشاط الويب بالاتصال بأي نقطة نهاية REST. لمزيد من المعلومات عن النشاط، راجع نشاط الويب في Azure Data Factory. يستخدم هذا المسار نشاط ويب للاتصال بسير عمل البريد الإلكتروني لـLogic Apps. يمكنك إنشاء نشاطي ويب: أحدهما يستدعي إلى سير العمل CopySuccessEmail، والآخر يستدعي CopyFailWorkFlow.

        new WebActivity
        {
            Name = sendCopyEmailActivity,
            Method = WebActivityMethod.POST,
            Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
            Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
            DependsOn = new List<ActivityDependency>
            {
                new ActivityDependency
                {
                    Activity = copyBlobActivity,
                    DependencyConditions = new List<String> { "Succeeded" }
                }
            }
        }

في الخاصية Url، الصق نقاط نهاية HTTP POST URL من مسارات عمل Logic Apps خاصتك. في الخاصية Body، مرر مثيل الفئة EmailRequest. يحتوي طلب البريد الإلكتروني على الخصائص التالية:

  • الرسالة. تمرير القيمة @{activity('CopyBlobtoBlob').output.dataWritten. الوصول إلى خاصية نشاط النسخة السابقة، وتمرير قيمة dataWritten. فيما يتعلق بحالة الإخفاق، مرر إخراج الخطأ بدلاً من @{activity('CopyBlobtoBlob').error.message.
  • اسم Data Factory. تمرير القيمة @{pipeline().DataFactory} يتيح لك متغير النظام هذا الوصول إلى اسم مصنع البيانات المطابق. للحصول على قائمة بمتغيرات النظام، راجع متغيرات النظام.
  • اسم المسار. تمرير القيمة @{pipeline().Pipeline}. يسمح لك متغير النظام هذا بالوصول إلى اسم المسار المطابق.
  • المتلقي. تمرير القيمة "@pipeline().parameters.receiver". الوصول إلى معلمات المسار.

تنشئ التعليمة البرمجية هذه تبعية نشاط جديدة تعتمد على نشاط النسخة السابقة.

إنشاء تشغيل البنية الأساسية

أضِف التعليمة البرمجية التالية إلى الأسلوب Mainالذي يشغل المسار.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

الفئة الرئيسية

يجب أن يبدو الأسلوب النهائي ⁧Main⁩خاصتك كما يلي.

// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };

Factory df = CreateOrUpdateDataFactory(client);

client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));

Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
    { "sourceBlobContainer", inputBlobPath },
    { "sinkBlobContainer", outputBlobPath },
    { "receiver", emailReceiver }
};

CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

بناء وتشغيل البرنامج الخاص بك لتشغيل مسار!

مراقبة تشغيل البنية الأساسية

  1. أضف التعليمات البرمجية Main التالية إلى الأسلوب :

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    

    تتحقق هذه التعليمة البرمجية باستمرار من حالة التشغيل حتى تنتهي من نسخ البيانات.

  2. أضِف التعليمة البرمجية التالية إلى الأسلوب Mainالذي يسترد تفاصيل تشغيل نشاط النسخ، مثل حجم البيانات التي يجري قراءتها/ كتابتها:

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun(
    resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList();
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(activityRuns.First().Output);
        //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons);
    }
    else
        Console.WriteLine(activityRuns.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

تشغيل التعليمات البرمجية

أنشئ وابدأ تشغيل التطبيق، ثم تحقق من تنفيذ المسار.

يعرض التطبيق التقدم المحرز في إنشاء Data Factory والخدمة المرتبطة ومجموعات البيانات والمسارات وتشغيل المسارات. ثم يتحقق من حالة تشغيل المسار. انتظر حتى ترى تفاصيل تشغيل نشاط النسخ مع حجم البيانات المقروءة / المكتوبة. ثم استخدم أدوات مثل Azure Storage Explorer للتحقق من نسخ الكائنات الثنائية كبيرة الحجم إلى outputBlobPath من inputBlobPath كما حددته في المتغيرات.

يجب أن يبدو الإخراج كما يلي:

Creating data factory DFTutorialTest...
{
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "type": "AzureStorage",
  "typeProperties": {
    "connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
  }
}
Creating dataset SourceStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sourceBlobContainer"
    },
    "fileName": "input.txt"
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating dataset SinkStorageDataset...
{
  "type": "AzureBlob",
  "typeProperties": {
    "folderPath": {
      "type": "Expression",
      "value": "@pipeline().parameters.sinkBlobContainer"
    }
  },
  "linkedServiceName": {
    "type": "LinkedServiceReference",
    "referenceName": "AzureStorageLinkedService"
  }
}
Creating pipeline Adfv2TutorialBranchCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SourceStorageDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SinkStorageDataset"
          }
        ],
        "name": "CopyBlobtoBlob"
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendSuccessEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ]
      },
      {
        "type": "WebActivity",
        "typeProperties": {
          "method": "POST",
          "url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
          "body": {
            "message": "@{activity('CopyBlobtoBlob').error.message}",
            "dataFactoryName": "@{pipeline().DataFactory}",
            "pipelineName": "@{pipeline().Pipeline}",
            "receiver": "@pipeline().parameters.receiver"
          }
        },
        "name": "SendFailEmailActivity",
        "dependsOn": [
          {
            "activity": "CopyBlobtoBlob",
            "dependencyConditions": [
              "Failed"
            ]
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      },
      "receiver": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 20,
  "dataWritten": 20,
  "copyDuration": 4,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}

Press any key to exit...

لقد نفذت المهام التالية في هذا البرنامج التعليمي:

  • إنشاء مصدرًا للبيانات
  • أنشئ خدمة مرتبطة بالتخزين في Azure
  • إنشاء مجموعة بيانات Azure Blob
  • إنشاء مسار يحتوي على نشاط نسخ ونشاط ويب.
  • إرسال مخرجات الأنشطة إلى أنشطة لاحقة.
  • استخدام تمرير المعلمة ومتغيرات النظام.
  • بدء تشغيل البنية الأساسية
  • مراقبة البنية الأساسية وتشغيل النشاط

يمكنك الآن متابعة قسم المفاهيم للحصول على مزيد من المعلومات حول Azure Data Factory.