قم بتشغيل مهامّ MapReduce باستخدام HDInsight .NET SDK

تعرف على كيفية إرسال مهام MapReduce باستخدام HDInsight .NET SDK. تأتي مجموعات HDInsight مع ملف jar مع بعض عينات MapReduce. ملف jar هو /example/jars/hadoop-mapreduce-examples.jar. أحد النماذج هو wordcount. تقوم بتطوير تطبيق وحدة تحكم #C لتقديم وظيفة عدد الكلمات. تقرأ الوظيفة ملف /example/data/gutenberg/davinci.txt، وتخرج النتائج إلى /example/data/davinciwordcount. إذا كنت تريد إعادة تشغيل التطبيق، فيجب عليك تنظيف مجلد الإخراج.

إشعار

يجب تنفيذ الخطوات الواردة في هذه المقالة من عميل Windows. للحصول على معلومات حول استخدام عميل Linux أو OS X أو Unix للعمل بـ Apache Hive، قم باستخدام محدد علامات التبويب الموضح في الجزء العلوي من المقالة.

المتطلبات الأساسية

إرسال مهام MapReduce باستخدام HDInsight .NET SDK

يوفر HDInsight .NET SDK مكتبات عملاء NET.، ما يسهل العمل مع مجموعات HDInsight من .NET.

  1. ابدأ تشغيل Visual Studio وأنشئ تطبيق وحدة تحكم #C.

  2. انتقل إلى Tools>NuGet Package Manager>Package Manager Console وأدخل الأمر التالي:

    Install-Package Microsoft.Azure.Management.HDInsight.Job
    
  3. انسخ التعليمة البرمجية أدناه إلى Program.cs. ثم قم بتحرير التعليمة البرمجية عن طريق تعيين القيم لكل من: existingClusterName وexistingClusterPassword وdefaultStorageAccountName وdefaultStorageAccountKey وdefaultStorageContainerName.

    using System.Collections.Generic;
    using System.IO;
    using System.Text;
    using System.Threading;
    using Microsoft.Azure.Management.HDInsight.Job;
    using Microsoft.Azure.Management.HDInsight.Job.Models;
    using Hyak.Common;
    using Microsoft.WindowsAzure.Storage;
    using Microsoft.WindowsAzure.Storage.Blob;
    
    namespace SubmitHDInsightJobDotNet
    {
        class Program
        {
            private static HDInsightJobManagementClient _hdiJobManagementClient;
    
            private const string existingClusterName = "<Your HDInsight Cluster Name>";
            private const string existingClusterPassword = "<Cluster User Password>";
            private const string defaultStorageAccountName = "<Default Storage Account Name>"; 
            private const string defaultStorageAccountKey = "<Default Storage Account Key>";
            private const string defaultStorageContainerName = "<Default Blob Container Name>";
    
            private const string existingClusterUsername = "admin";
            private const string existingClusterUri = existingClusterName + ".azurehdinsight.net";
            private const string sourceFile = "/example/data/gutenberg/davinci.txt";
            private const string outputFolder = "/example/data/davinciwordcount";
    
            static void Main(string[] args)
            {
                System.Console.WriteLine("The application is running ...");
    
                var clusterCredentials = new BasicAuthenticationCloudCredentials { Username = existingClusterUsername, Password = existingClusterPassword };
                _hdiJobManagementClient = new HDInsightJobManagementClient(existingClusterUri, clusterCredentials);
    
                SubmitMRJob();
    
                System.Console.WriteLine("Press ENTER to continue ...");
                System.Console.ReadLine();
            }
    
            private static void SubmitMRJob()
            {
                List<string> args = new List<string> { { "/example/data/gutenberg/davinci.txt" }, { "/example/data/davinciwordcount" } };
    
                var paras = new MapReduceJobSubmissionParameters
                {
                    JarFile = @"/example/jars/hadoop-mapreduce-examples.jar",
                    JarClass = "wordcount",
                    Arguments = args
                };
    
                System.Console.WriteLine("Submitting the MR job to the cluster...");
                var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceJob(paras);
                var jobId = jobResponse.JobSubmissionJsonResponse.Id;
                System.Console.WriteLine("Response status code is " + jobResponse.StatusCode);
                System.Console.WriteLine("JobId is " + jobId);
    
                System.Console.WriteLine("Waiting for the job completion ...");
    
                // Wait for job completion
                var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
                while (!jobDetail.Status.JobComplete)
                {
                    Thread.Sleep(1000);
                    jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
                }
    
                // Get job output
                System.Console.WriteLine("Job output is: ");
                var storageAccess = new AzureStorageAccess(defaultStorageAccountName, defaultStorageAccountKey,
                    defaultStorageContainerName);
    
                if (jobDetail.ExitValue == 0)
                {
                    // Create the storage account object
                    CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=" +
                        defaultStorageAccountName +
                        ";AccountKey=" + defaultStorageAccountKey);
    
                    // Create the blob client.
                    CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
    
                    // Retrieve reference to a previously created container.
                    CloudBlobContainer container = blobClient.GetContainerReference(defaultStorageContainerName);
    
                    CloudBlockBlob blockBlob = container.GetBlockBlobReference(outputFolder.Substring(1) + "/part-r-00000");
    
                    using (var stream = blockBlob.OpenRead())
                    {
                        using (StreamReader reader = new StreamReader(stream))
                        {
                            while (!reader.EndOfStream)
                            {
                                System.Console.WriteLine(reader.ReadLine());
                            }
                        }
                    }
                }
                else
                {
                    // fetch stderr output in case of failure
                    var output = _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess);
    
                    using (var reader = new StreamReader(output, Encoding.UTF8))
                    {
                        string value = reader.ReadToEnd();
                        System.Console.WriteLine(value);
                    }
    
                }
            }
        }
    }
    
    
  4. انقر F5 لتشغيل التطبيق.

لتشغيل الوظيفة مرة أخرى، يجب عليك تغيير اسم مجلد إخراج المهمة، في العينة هو /example/data/davinciwordcount.

عند اكتمال المهمة بنجاح، يقوم التطبيق بطباعة محتوى ملف الإخراج part-r-00000.

الخطوات التالية

في هذه المقالة، تعلمت عدة طرق لإنشاء مجموعة HDInsight. لمعرفة المزيد، راجع المقالات التالية: