Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
UYGULANANLAR:
Azure Data Factory
Azure Synapse Analytics
İpucu
Kuruluşlar için hepsi bir arada analiz çözümü olan Microsoft Fabric'te Data Factory'yi deneyin. Microsoft Fabric , veri taşımadan veri bilimine, gerçek zamanlı analize, iş zekasına ve raporlamaya kadar her şeyi kapsar. Yeni bir deneme sürümünü ücretsiz olarak başlatmayı öğrenin!
Bu öğreticide, bazı denetim akışı özelliklerini gösteren bir Data Factory işlem hattı oluşturacaksınız. Bu işlem hattı, Azure Blob Depolama içindeki bir kapsayıcıdan aynı depolama hesabındaki başka bir kapsayıcıya kopyalanır. Kopyalama etkinliği başarılı olursa işlem hattı başarılı kopyalama işleminin ayrıntılarını e-postayla gönderir. Bu bilgiler, yazılan veri miktarını içerebilir. Kopyalama etkinliği başarısız olursa, kopyalama hatasının ayrıntılarını (hata iletisi gibi) bir e-postayla gönderir. Öğretici boyunca parametreleri nasıl geçireceğinizi göreceksiniz.
Bu grafik, senaryoya genel bir bakış sağlar:
Bu öğreticide aşağıdaki görevlerin nasıl gerçekleştirdiğiniz gösterilmektedir:
- Veri fabrikası oluşturma
- Azure Depolama bağlı hizmeti oluşturma
- Azure blob veri kümesi oluşturma
- Kopyalama etkinliği ve bir web etkinliği içeren işlem hattı oluşturma
- Etkinliklerin çıktılarını sonraki etkinliklere gönderme
- Parametre geçirme ve sistem değişkenlerini kullanma
- Bir işlem hattı çalıştırması başlatma
- İşlem hattı ve etkinlik çalıştırmalarını izleme
Bu öğreticide .NET SDK kullanılır. Azure Data Factory ile etkileşime geçmek için diğer mekanizmaları kullanabilirsiniz. Data Factory hızlı başlangıçları için bkz . 5 Dakikalık Hızlı Başlangıçlar.
Azure aboneliğiniz yoksa başlamadan önce ücretsiz bir hesap oluşturun.
Önkoşullar
- Azure Depolama hesabı. Blob depolamayı kaynak veri deposu olarak kullanırsınız. Azure depolama hesabınız yoksa bkz . Depolama hesabı oluşturma.
- Azure Depolama Gezgini. Bu aracı yüklemek için bkz. Azure Depolama Gezgini.
- Azure SQL Veritabanı. Veritabanını havuz veri deposu olarak kullanabilirsiniz. Azure SQL Veritabanı'da veritabanınız yoksa bkz. Azure SQL Veritabanı'da veritabanı oluşturma.
- Visual Studio. Bu makalede Visual Studio 2019 kullanılır.
- Azure .NET SDK. Azure .NET SDK'sını indirip yükleyin.
Data Factory'nin şu anda kullanılabilir olduğu Azure bölgelerinin listesi için bkz . Bölgeye göre kullanılabilir ürünler. Veri depoları ve işlemleri başka bölgelerde olabilir. Depolar Azure Depolama ve Azure SQL Veritabanı içerir. İşlemler, Data Factory tarafından kullanılan HDInsight'ı içerir.
Microsoft Entra uygulaması oluşturma bölümünde açıklandığı gibi bir uygulama oluşturun. Aynı makaledeki yönergeleri izleyerek uygulamayı Katkıda Bulunan rolüne atayın. Bu öğreticinin sonraki bölümlerinde Uygulama (istemci) kimliği ve Dizin (kiracı) kimliği gibi çeşitli değerlere ihtiyacınız olacaktır.
Blob tablosu oluşturma
Bir metin düzenleyicisini açın. Aşağıdaki metni kopyalayın ve yerel olarak input.txt olarak kaydedin.
Ethel|Berg Tamika|WalshAzure Depolama Gezgini'i açın. Depolama hesabınızı genişletin. Blob Kapsayıcıları'na sağ tıklayın ve Blob Kapsayıcısı Oluştur'u seçin.
Yeni kapsayıcıya adfv2branch adını verin ve input.txt dosyanızı kapsayıcıya eklemek için Karşıya Yükle'yi seçin.
Visual Studio projesi oluşturma
C# .NET konsol uygulaması oluşturma:
- Visual Studio'yu başlatın ve Yeni proje oluştur'u seçin.
- Yeni proje oluştur bölümünde C# için Konsol Uygulaması (.NET Framework) seçeneğini belirleyin ve İleri'yi seçin.
- Projeye ADFv2BranchTutorial adını verin.
- .NET sürüm 4.5.2 veya üzerini ve ardından Oluştur'u seçin.
NuGet paketlerini yükleme
Araçlar
paketleri yüklemek için Paket Yöneticisi Konsolu'nda aşağıdaki komutları çalıştırın. Ayrıntılar için Microsoft.Azure.Management.DataFactory nuget paketine bakın.
Install-Package Microsoft.Azure.Management.DataFactory Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
Veri fabrikası istemcisi oluşturma
Program.cs açın ve aşağıdaki deyimleri ekleyin:
using System; using System.Collections.Generic; using System.Linq; using Microsoft.Rest; using Microsoft.Azure.Management.ResourceManager; using Microsoft.Azure.Management.DataFactory; using Microsoft.Azure.Management.DataFactory.Models; using Microsoft.IdentityModel.Clients.ActiveDirectory;Bu statik değişkenleri sınıfına
Programekleyin. Yer tutucuları kendi değerlerinizle değiştirin.// Set variables static string tenantID = "<tenant ID>"; static string applicationId = "<application ID>"; static string authenticationKey = "<Authentication key for your application>"; static string subscriptionId = "<Azure subscription ID>"; static string resourceGroup = "<Azure resource group name>"; static string region = "East US"; static string dataFactoryName = "<Data factory name>"; // Specify the source Azure Blob information static string storageAccount = "<Azure Storage account name>"; static string storageKey = "<Azure Storage account key>"; // confirm that you have the input.txt file placed in th input folder of the adfv2branch container. static string inputBlobPath = "adfv2branch/input"; static string inputBlobName = "input.txt"; static string outputBlobPath = "adfv2branch/output"; static string emailReceiver = "<specify email address of the receiver>"; static string storageLinkedServiceName = "AzureStorageLinkedService"; static string blobSourceDatasetName = "SourceStorageDataset"; static string blobSinkDatasetName = "SinkStorageDataset"; static string pipelineName = "Adfv2TutorialBranchCopy"; static string copyBlobActivity = "CopyBlobtoBlob"; static string sendFailEmailActivity = "SendFailEmailActivity"; static string sendSuccessEmailActivity = "SendSuccessEmailActivity";Aşağıdaki kodu
Mainyöntemine ekleyin. Bu kod, sınıfın bir örneğiniDataFactoryManagementClientoluşturur. Ardından bu nesneyi veri fabrikası, bağlı hizmet, veri kümeleri ve işlem hattı oluşturmak için kullanırsınız. İşlem hattı çalıştırma ayrıntılarını izlemek için de bu nesneyi kullanabilirsiniz.// Authenticate and create a data factory management client var context = new AuthenticationContext("https://login.windows.net/" + tenantID); ClientCredential cc = new ClientCredential(applicationId, authenticationKey); AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result; ServiceClientCredentials cred = new TokenCredentials(result.AccessToken); var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
Veri fabrikası oluşturma
Program.cs
CreateOrUpdateDataFactorybir yöntem ekleyin:static Factory CreateOrUpdateDataFactory(DataFactoryManagementClient client) { Console.WriteLine("Creating data factory " + dataFactoryName + "..."); Factory resource = new Factory { Location = region }; Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings)); Factory response; { response = client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, resource); } while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation") { System.Threading.Thread.Sleep(1000); } return response; }Veri fabrikası oluşturan yönteme
Mainaşağıdaki satırı ekleyin:Factory df = CreateOrUpdateDataFactory(client);
Azure Depolama bağlı hizmeti oluşturma
Program.cs
StorageLinkedServiceDefinitionbir yöntem ekleyin:static LinkedServiceResource StorageLinkedServiceDefinition(DataFactoryManagementClient client) { Console.WriteLine("Creating linked service " + storageLinkedServiceName + "..."); AzureStorageLinkedService storageLinkedService = new AzureStorageLinkedService { ConnectionString = new SecureString("DefaultEndpointsProtocol=https;AccountName=" + storageAccount + ";AccountKey=" + storageKey) }; Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings)); LinkedServiceResource linkedService = new LinkedServiceResource(storageLinkedService, name:storageLinkedServiceName); return linkedService; }Azure Depolama bağlı hizmeti oluşturan yönteme
Mainaşağıdaki satırı ekleyin:client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
Desteklenen özellikler ve ayrıntılar hakkında daha fazla bilgi için bkz . Bağlı hizmet özellikleri.
Veri kümeleri oluşturma
Bu bölümde biri kaynak, diğeri havuz için olan iki veri kümesi oluşturacaksınız.
Kaynak Azure Blobu için veri kümesi oluşturma
Azure blob veri kümesi oluşturan bir yöntem ekleyin. Desteklenen özellikler ve ayrıntılar hakkında daha fazla bilgi için bkz . Azure Blob veri kümesi özellikleri.
Program.csSourceBlobDatasetDefinitionbir yöntem ekleyin:
static DatasetResource SourceBlobDatasetDefinition(DataFactoryManagementClient client)
{
Console.WriteLine("Creating dataset " + blobSourceDatasetName + "...");
AzureBlobDataset blobDataset = new AzureBlobDataset
{
FolderPath = new Expression { Value = "@pipeline().parameters.sourceBlobContainer" },
FileName = inputBlobName,
LinkedServiceName = new LinkedServiceReference
{
ReferenceName = storageLinkedServiceName
}
};
Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
DatasetResource dataset = new DatasetResource(blobDataset, name:blobSourceDatasetName);
return dataset;
}
Azure Blob’da kaynak verilerini temsil eden bir veri kümesi tanımlayın. Bu Blob veri kümesi, önceki adımda desteklenen Azure Depolama bağlı hizmetini ifade eder. Blob veri kümesi, kopyalanacak blobun konumunu açıklar: FolderPath ve FileName.
FolderPath için parametrelerin kullanımına dikkat edin.
sourceBlobContainer parametresinin adıdır ve ifade, işlem hattı çalıştırmasında geçirilen değerlerle değiştirilir. Parametreleri tanımlamaya yönelik söz dizimi @pipeline().parameters.<parameterName>
Havuz Azure Blobu için veri kümesi oluşturma
Program.cs
SourceBlobDatasetDefinitionbir yöntem ekleyin:static DatasetResource SinkBlobDatasetDefinition(DataFactoryManagementClient client) { Console.WriteLine("Creating dataset " + blobSinkDatasetName + "..."); AzureBlobDataset blobDataset = new AzureBlobDataset { FolderPath = new Expression { Value = "@pipeline().parameters.sinkBlobContainer" }, LinkedServiceName = new LinkedServiceReference { ReferenceName = storageLinkedServiceName } }; Console.WriteLine(SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings)); DatasetResource dataset = new DatasetResource(blobDataset, name: blobSinkDatasetName); return dataset; }Hem Azure Blob kaynağı hem de havuz veri kümelerini oluşturan yöntemine aşağıdaki kodu
Mainekleyin.client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client)); client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
C# sınıfı oluşturma: EmailRequest
C# projenizde adlı EmailRequestbir sınıf oluşturun. Bu sınıf, e-posta gönderirken işlem hattının gövde isteğinde hangi özellikleri gönderdiğini tanımlar. Bu öğreticide işlem hattı, işlem hattından e-postaya dört özellik gönderir:
- İleti. E-postanın gövdesi. Başarılı bir kopya için bu özellik yazılan veri miktarını içerir. Başarısız bir kopya için bu özellik hatanın ayrıntılarını içerir.
- Veri fabrikası adı. Veri fabrikasının adı.
- İşlem hattı adı. İşlem hattının adı.
- Ahize. Geçen parametre. Bu özellik, e-postanın alıcısını belirtir.
class EmailRequest
{
[Newtonsoft.Json.JsonProperty(PropertyName = "message")]
public string message;
[Newtonsoft.Json.JsonProperty(PropertyName = "dataFactoryName")]
public string dataFactoryName;
[Newtonsoft.Json.JsonProperty(PropertyName = "pipelineName")]
public string pipelineName;
[Newtonsoft.Json.JsonProperty(PropertyName = "receiver")]
public string receiver;
public EmailRequest(string input, string df, string pipeline, string receiverName)
{
message = input;
dataFactoryName = df;
pipelineName = pipeline;
receiver = receiverName;
}
}
E-posta iş akışı uç noktaları oluşturma
E-posta göndermeyi tetikleme amacıyla iş akışını tanımlamak için Azure Logic Apps'i kullanırsınız. Daha fazla bilgi için bkz . Örnek tüketim mantığı uygulaması iş akışı oluşturma.
Başarı e-postası iş akışı
Azure portalında adlı CopySuccessEmailbir mantıksal uygulama iş akışı oluşturun. HTTP isteği alındığında adlı İstek tetikleyicisini ekleyin. İstek tetikleyicisinde İstek Gövdesi JSON şema kutusunu aşağıdaki JSON ile doldurun:
{
"properties": {
"dataFactoryName": {
"type": "string"
},
"message": {
"type": "string"
},
"pipelineName": {
"type": "string"
},
"receiver": {
"type": "string"
}
},
"type": "object"
}
İş akışınız aşağıdaki örneğe benzer:
Bu JSON içeriği, önceki bölümde oluşturduğunuz sınıfla EmailRequest hizalanır.
E-posta gönder adlı Office 365 Outlook eylemini ekleyin. Bu eylem için, istek Gövdesi JSON şemasında geçirilen özellikleri kullanarak e-postayı nasıl biçimlendirmek istediğinizi özelleştirin. Bir örnek aşağıda verilmiştir:
İş akışını kaydettikten sonra, tetikleyiciden HTTP POST URL değerini kopyalayıp kaydedin.
Hata e-postası iş akışı
CopySuccessEmail Mantıksal uygulama iş akışını adlı CopyFailEmailyeni bir iş akışına kopyalama. İstek tetikleyicisinde İstek Gövdesi JSON şeması aynıdır. Hata e-postasına uyarlamak için e-postanızın biçimini Subject olarak değiştirin. Örnek aşağıda verilmiştir:
İş akışını kaydettikten sonra, tetikleyiciden HTTP POST URL değerini kopyalayıp kaydedin.
Şimdi aşağıdaki örnekler gibi iki iş akışı URL'niz olmalıdır:
//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
İşlem hattı oluşturma
Visual Studio'da projenize geri dönün. Şimdi kopyalama etkinliği ve DependsOn özelliğiyle işlem hattı oluşturan kodu ekleyeceğiz. Bu öğreticide işlem hattı bir etkinlik, Blob veri kümesini kaynak olarak alan bir kopyalama etkinliği ve havuz olarak başka bir Blob veri kümesi içerir. Kopyalama etkinliği başarılı veya başarısız olursa, farklı e-posta görevlerini çağırır.
Bu işlem hattında aşağıdaki özellikleri kullanırsınız:
- Parametreler
- Web etkinliği
- Etkinlik bağımlılığı
- Bir etkinlikten çıkışı başka bir etkinliğe giriş olarak kullanma
Bu yöntemi projenize ekleyin. Aşağıdaki bölümler daha ayrıntılı olarak sağlanır.
static PipelineResource PipelineDefinition(DataFactoryManagementClient client) { Console.WriteLine("Creating pipeline " + pipelineName + "..."); PipelineResource resource = new PipelineResource { Parameters = new Dictionary<string, ParameterSpecification> { { "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } }, { "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } }, { "receiver", new ParameterSpecification { Type = ParameterType.String } } }, Activities = new List<Activity> { new CopyActivity { Name = copyBlobActivity, Inputs = new List<DatasetReference> { new DatasetReference { ReferenceName = blobSourceDatasetName } }, Outputs = new List<DatasetReference> { new DatasetReference { ReferenceName = blobSinkDatasetName } }, Source = new BlobSource { }, Sink = new BlobSink { } }, new WebActivity { Name = sendSuccessEmailActivity, Method = WebActivityMethod.POST, Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/00000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000000", Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"), DependsOn = new List<ActivityDependency> { new ActivityDependency { Activity = copyBlobActivity, DependencyConditions = new List<String> { "Succeeded" } } } }, new WebActivity { Name = sendFailEmailActivity, Method =WebActivityMethod.POST, Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/000000000000000000000000000000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=0000000000000000000000000000000000000000000", Body = new EmailRequest("@{activity('CopyBlobtoBlob').error.message}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"), DependsOn = new List<ActivityDependency> { new ActivityDependency { Activity = copyBlobActivity, DependencyConditions = new List<String> { "Failed" } } } } } }; Console.WriteLine(SafeJsonConvert.SerializeObject(resource, client.SerializationSettings)); return resource; }İşlem hattını oluşturan yönteme
Mainaşağıdaki satırı ekleyin:client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
Parametreler
İşlem hattı kodumuzun ilk bölümü parametreleri tanımlar.
-
sourceBlobContainer. Kaynak blob veri kümesi bu parametreyi işlem hattında kullanır. -
sinkBlobContainer. Havuz blobu veri kümesi bu parametreyi işlem hattında tüketir. -
receiver. İşlem hattındaki alıcıya başarı veya başarısızlık e-postaları gönderen iki Web etkinliği bu parametreyi kullanır.
Parameters = new Dictionary<string, ParameterSpecification>
{
{ "sourceBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
{ "sinkBlobContainer", new ParameterSpecification { Type = ParameterType.String } },
{ "receiver", new ParameterSpecification { Type = ParameterType.String } }
},
Web etkinliği
Web etkinliği herhangi bir REST uç noktasına çağrı yapılmasına izin verir. Etkinlik hakkında daha fazla bilgi için bkz . Azure Data Factory'de web etkinliği. Bu işlem hattı, Logic Apps e-posta iş akışını çağırmak için bir web etkinliği kullanır. İki web etkinliği oluşturursunuz: biri iş akışına CopySuccessEmail , diğeri de öğesini çağıran CopyFailWorkFlow.
new WebActivity
{
Name = sendCopyEmailActivity,
Method = WebActivityMethod.POST,
Url = "https://prodxxx.eastus.logic.azure.com:443/workflows/12345",
Body = new EmailRequest("@{activity('CopyBlobtoBlob').output.dataWritten}", "@{pipeline().DataFactory}", "@{pipeline().Pipeline}", "@pipeline().parameters.receiver"),
DependsOn = new List<ActivityDependency>
{
new ActivityDependency
{
Activity = copyBlobActivity,
DependencyConditions = new List<String> { "Succeeded" }
}
}
}
özelliğine Url Logic Apps iş akışlarınızdan HTTP POST URL uç noktalarını yapıştırın. özelliğinde Body sınıfının bir örneğini EmailRequest geçirin. E-posta isteği aşağıdaki özellikleri içerir:
- İleti. değerini
@{activity('CopyBlobtoBlob').output.dataWrittengeçirir. Önceki kopyalama etkinliğinin bir özelliğine erişir ve değerinidataWrittengeçirir. Hata durumunda@{activity('CopyBlobtoBlob').error.messageyerine hata çıktısını geçirir. - Data Factory Adı. Bu sistem değişkeninin
@{pipeline().DataFactory}değerini geçirir, karşılık gelen veri fabrikası adına erişmenizi sağlar. Sistem değişkenlerinin listesi için bkz . Sistem Değişkenleri. - İşlem Hattı Adı. değerini
@{pipeline().Pipeline}geçirir. Bu sistem değişkeni, ilgili işlem hattı adına erişmenizi sağlar. - Ahize. değerini
"@pipeline().parameters.receiver"geçirir. İşlem hattı parametrelerine erişir.
Bu kod, önceki kopyalama etkinliğine bağlı olan yeni bir Etkinlik Bağımlılığı oluşturur.
İşlem hattı çalıştırması oluşturma
İşlem hattı çalıştırmasını tetikleyen yöntemine Main aşağıdaki kodu ekleyin.
// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
{ "sourceBlobContainer", inputBlobPath },
{ "sinkBlobContainer", outputBlobPath },
{ "receiver", emailReceiver }
};
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
Main sınıfı
Son Main yönteminiz şöyle görünmelidir.
// Authenticate and create a data factory management client
var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
Factory df = CreateOrUpdateDataFactory(client);
client.LinkedServices.CreateOrUpdate(resourceGroup, dataFactoryName, storageLinkedServiceName, StorageLinkedServiceDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSourceDatasetName, SourceBlobDatasetDefinition(client));
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, blobSinkDatasetName, SinkBlobDatasetDefinition(client));
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, PipelineDefinition(client));
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> arguments = new Dictionary<string, object>
{
{ "sourceBlobContainer", inputBlobPath },
{ "sinkBlobContainer", outputBlobPath },
{ "receiver", emailReceiver }
};
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, arguments).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
Bir işlem hattı çalıştırması tetiklemek için programınızı derleyip çalıştırın!
İşlem hattı çalıştırmasını izleme
Mainyöntemine aşağıdaki kodu ekleyin:// Monitor the pipeline run Console.WriteLine("Checking pipeline run status..."); PipelineRun pipelineRun; while (true) { pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId); Console.WriteLine("Status: " + pipelineRun.Status); if (pipelineRun.Status == "InProgress") System.Threading.Thread.Sleep(15000); else break; }Bu kod, verilerin kopyalanması bitene kadar çalıştırmanın durumunu sürekli denetler.
Kopyalama etkinliği çalıştırma ayrıntılarını (örneğin, okunan/yazılan verilerin boyutu) alan yönteme aşağıdaki kodu
Mainekleyin:// Check the copy activity run details Console.WriteLine("Checking copy activity run details..."); List<ActivityRun> activityRuns = client.ActivityRuns.ListByPipelineRun( resourceGroup, dataFactoryName, runResponse.RunId, DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)).ToList(); if (pipelineRun.Status == "Succeeded") { Console.WriteLine(activityRuns.First().Output); //SaveToJson(SafeJsonConvert.SerializeObject(activityRuns.First().Output, client.SerializationSettings), "ActivityRunResult.json", folderForJsons); } else Console.WriteLine(activityRuns.First().Error); Console.WriteLine("\nPress any key to exit..."); Console.ReadKey();
Kodu çalıştırma
Uygulamayı derleyip başlatın, ardından işlem hattı yürütmesini doğrulayın.
Uygulama, veri fabrikası, bağlı hizmet, veri kümeleri, işlem hattı ve işlem hattı çalıştırması oluşturma işleminin ilerleme durumunu görüntüler. Daha sonra işlem hattı çalıştırma durumunu denetler. Okunan/yazılan veri boyutunu içeren kopyalama etkinliği ayrıntılarını görene kadar bekleyin. Ardından, değişkenlerde belirttiğiniz gibi blob'un inputBlobPath'ten outputBlobPath'e kopyalandığını denetlemek için Azure Depolama Gezgini gibi araçları kullanın.
Çıkışınız aşağıdaki örneğe benzemelidir:
Creating data factory DFTutorialTest...
{
"location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=***;AccountKey=***"
}
}
Creating dataset SourceStorageDataset...
{
"type": "AzureBlob",
"typeProperties": {
"folderPath": {
"type": "Expression",
"value": "@pipeline().parameters.sourceBlobContainer"
},
"fileName": "input.txt"
},
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureStorageLinkedService"
}
}
Creating dataset SinkStorageDataset...
{
"type": "AzureBlob",
"typeProperties": {
"folderPath": {
"type": "Expression",
"value": "@pipeline().parameters.sinkBlobContainer"
}
},
"linkedServiceName": {
"type": "LinkedServiceReference",
"referenceName": "AzureStorageLinkedService"
}
}
Creating pipeline Adfv2TutorialBranchCopy...
{
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "BlobSink"
}
},
"inputs": [
{
"type": "DatasetReference",
"referenceName": "SourceStorageDataset"
}
],
"outputs": [
{
"type": "DatasetReference",
"referenceName": "SinkStorageDataset"
}
],
"name": "CopyBlobtoBlob"
},
{
"type": "WebActivity",
"typeProperties": {
"method": "POST",
"url": "https://xxxx.eastus.logic.azure.com:443/workflows/... ",
"body": {
"message": "@{activity('CopyBlobtoBlob').output.dataWritten}",
"dataFactoryName": "@{pipeline().DataFactory}",
"pipelineName": "@{pipeline().Pipeline}",
"receiver": "@pipeline().parameters.receiver"
}
},
"name": "SendSuccessEmailActivity",
"dependsOn": [
{
"activity": "CopyBlobtoBlob",
"dependencyConditions": [
"Succeeded"
]
}
]
},
{
"type": "WebActivity",
"typeProperties": {
"method": "POST",
"url": "https://xxx.eastus.logic.azure.com:443/workflows/... ",
"body": {
"message": "@{activity('CopyBlobtoBlob').error.message}",
"dataFactoryName": "@{pipeline().DataFactory}",
"pipelineName": "@{pipeline().Pipeline}",
"receiver": "@pipeline().parameters.receiver"
}
},
"name": "SendFailEmailActivity",
"dependsOn": [
{
"activity": "CopyBlobtoBlob",
"dependencyConditions": [
"Failed"
]
}
]
}
],
"parameters": {
"sourceBlobContainer": {
"type": "String"
},
"sinkBlobContainer": {
"type": "String"
},
"receiver": {
"type": "String"
}
}
}
}
Creating pipeline run...
Pipeline run ID: 00000000-0000-0000-0000-0000000000000
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
"dataRead": 20,
"dataWritten": 20,
"copyDuration": 4,
"throughput": 0.01,
"errors": [],
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
}
{}
Press any key to exit...
İlgili içerik
Bu öğreticide aşağıdaki görevleri gerçekleştirdiniz:
- Veri fabrikası oluşturma
- Azure Depolama bağlı hizmeti oluşturma
- Azure blob veri kümesi oluşturma
- Kopyalama etkinliği ve bir web etkinliği içeren işlem hattı oluşturma
- Etkinliklerin çıktılarını sonraki etkinliklere gönderme
- Parametre geçirme ve sistem değişkenlerini kullanma
- Bir işlem hattı çalıştırması başlatma
- İşlem hattı ve etkinlik çalıştırmalarını izleme
Artık Azure Data Factory hakkında daha fazla bilgi için Kavramlar bölümüne geçebilirsiniz.