使用 HDInsight .NET SDK 執行 MapReduce 作業
了解如何使用 HDInsight .NET SDK 提交 MapReduce 作業。 HDInsight 叢集隨附內含一些 MapReduce 範例的 jar 檔案。 jar 檔案為 /example/jars/hadoop-mapreduce-examples.jar
。 其中一個範例是 wordcount。 您可開發 C# 主控台應用程式以提交 wordcount 作業。 此作業會讀取 /example/data/gutenberg/davinci.txt
檔案,並將結果輸出至 /example/data/davinciwordcount
。 如果您想要重新執行應用程式,您必須清除輸出資料夾。
此文章中的步驟必須從 Windows 用戶端執行。 如需搭配 Linux、OS X 或 Unix 用戶端使用 Hive 的資訊,請使用本文頂端顯示的索引標籤選取器。
HDInsight 上的 Apache Hadoop 叢集。 請參閱使用 Azure 入口網站建立 Apache Hadoop 叢集。
使用 HDInsight .NET SDK 提交 MapReduce 工作
HDInsight .NET SDK 提供 .NET 用戶端程式庫,讓您可輕鬆從 .NET 使用 HDInsight 叢集。
啟動 Visual Studio,建立一個 C# 主控台應用程式。
瀏覽至 [工具]>[NuGet 套件管理員]>[套件管理員主控台],然後輸入下列命令:
Install-Package Microsoft.Azure.Management.HDInsight.Job
將下列程式碼複製到 Program.cs 中。 然後編輯程式碼,設定
的值。using System.Collections.Generic; using System.IO; using System.Text; using System.Threading; using Microsoft.Azure.Management.HDInsight.Job; using Microsoft.Azure.Management.HDInsight.Job.Models; using Hyak.Common; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Blob; namespace SubmitHDInsightJobDotNet { class Program { private static HDInsightJobManagementClient _hdiJobManagementClient; private const string existingClusterName = "<Your HDInsight Cluster Name>"; private const string existingClusterPassword = "<Cluster User Password>"; private const string defaultStorageAccountName = "<Default Storage Account Name>"; private const string defaultStorageAccountKey = "<Default Storage Account Key>"; private const string defaultStorageContainerName = "<Default Blob Container Name>"; private const string existingClusterUsername = "admin"; private const string existingClusterUri = existingClusterName + ".azurehdinsight.net"; private const string sourceFile = "/example/data/gutenberg/davinci.txt"; private const string outputFolder = "/example/data/davinciwordcount"; static void Main(string[] args) { System.Console.WriteLine("The application is running ..."); var clusterCredentials = new BasicAuthenticationCloudCredentials { Username = existingClusterUsername, Password = existingClusterPassword }; _hdiJobManagementClient = new HDInsightJobManagementClient(existingClusterUri, clusterCredentials); SubmitMRJob(); System.Console.WriteLine("Press ENTER to continue ..."); System.Console.ReadLine(); } private static void SubmitMRJob() { List<string> args = new List<string> { { "/example/data/gutenberg/davinci.txt" }, { "/example/data/davinciwordcount" } }; var paras = new MapReduceJobSubmissionParameters { JarFile = @"/example/jars/hadoop-mapreduce-examples.jar", JarClass = "wordcount", Arguments = args }; System.Console.WriteLine("Submitting the MR job to the cluster..."); var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceJob(paras); var jobId = jobResponse.JobSubmissionJsonResponse.Id; System.Console.WriteLine("Response status code is " + jobResponse.StatusCode); System.Console.WriteLine("JobId is " + jobId); System.Console.WriteLine("Waiting for the job completion ..."); // Wait for job completion var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail; while (!jobDetail.Status.JobComplete) { Thread.Sleep(1000); jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail; } // Get job output System.Console.WriteLine("Job output is: "); var storageAccess = new AzureStorageAccess(defaultStorageAccountName, defaultStorageAccountKey, defaultStorageContainerName); if (jobDetail.ExitValue == 0) { // Create the storage account object CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=" + defaultStorageAccountName + ";AccountKey=" + defaultStorageAccountKey); // Create the blob client. CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient(); // Retrieve reference to a previously created container. CloudBlobContainer container = blobClient.GetContainerReference(defaultStorageContainerName); CloudBlockBlob blockBlob = container.GetBlockBlobReference(outputFolder.Substring(1) + "/part-r-00000"); using (var stream = blockBlob.OpenRead()) { using (StreamReader reader = new StreamReader(stream)) { while (!reader.EndOfStream) { System.Console.WriteLine(reader.ReadLine()); } } } } else { // fetch stderr output in case of failure var output = _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); using (var reader = new StreamReader(output, Encoding.UTF8)) { string value = reader.ReadToEnd(); System.Console.WriteLine(value); } } } } }
按 F5 執行應用程式。
若要再次執行作業,您必須變更範例中的作業輸出資料夾名稱,在此範例中為 /example/data/davinciwordcount
作業順利完成時,應用程式會顯示輸出檔案的內容 part-r-00000
