Udostępnij za pośrednictwem


Uruchamianie zadania MapReduce przy użyciu zestawu SDK dla platformy .NET usługi HDInsight

Dowiedz się, jak przesyłać zadania MapReduce przy użyciu zestawu SDK platformy .NET usługi HDInsight. Klastry usługi HDInsight są dostarczane z plikiem jar z przykładami usługi MapReduce. Plik jar to /example/jars/hadoop-mapreduce-examples.jar. Jednym z przykładów jest wordcount. Tworzysz aplikację konsolową w języku C#, aby przesłać zadanie wordcount. Zadanie odczytuje /example/data/gutenberg/davinci.txt plik i zwraca wyniki do /example/data/davinciwordcount. Jeśli chcesz ponownie uruchomić aplikację, musisz wyczyścić folder wyjściowy.

Uwaga

Kroki opisane w tym artykule należy wykonać z klienta systemu Windows. Aby uzyskać informacje na temat korzystania z klienta systemu Linux, OS X lub Unix do pracy z programem Hive, użyj selektora kart pokazanego w górnej części artykułu.

Wymagania wstępne

Przesyłanie zadań MapReduce przy użyciu zestawu SDK platformy .NET usługi HDInsight

Zestaw .NET SDK usługi HDInsight udostępnia biblioteki klienckie platformy .NET, które ułatwiają pracę z klastrami usługi HDInsight z platformy .NET.

  1. Uruchom program Visual Studio i utwórz aplikację konsolową języka C#.

  2. Przejdź do pozycji Narzędzia NuGet>Menedżer pakietów> Menedżer pakietów Konsola i wprowadź następujące polecenie:

    Install-Package Microsoft.Azure.Management.HDInsight.Job
    
  3. Skopiuj poniższy kod do Program.cs. Następnie zmodyfikuj kod, ustawiając wartości dla: existingClusterName, , existingClusterPassworddefaultStorageAccountName, defaultStorageAccountKeyi defaultStorageContainerName.

    using System.Collections.Generic;
    using System.IO;
    using System.Text;
    using System.Threading;
    using Microsoft.Azure.Management.HDInsight.Job;
    using Microsoft.Azure.Management.HDInsight.Job.Models;
    using Hyak.Common;
    using Microsoft.WindowsAzure.Storage;
    using Microsoft.WindowsAzure.Storage.Blob;
    
    namespace SubmitHDInsightJobDotNet
    {
        class Program
        {
            private static HDInsightJobManagementClient _hdiJobManagementClient;
    
            private const string existingClusterName = "<Your HDInsight Cluster Name>";
            private const string existingClusterPassword = "<Cluster User Password>";
            private const string defaultStorageAccountName = "<Default Storage Account Name>"; 
            private const string defaultStorageAccountKey = "<Default Storage Account Key>";
            private const string defaultStorageContainerName = "<Default Blob Container Name>";
    
            private const string existingClusterUsername = "admin";
            private const string existingClusterUri = existingClusterName + ".azurehdinsight.net";
            private const string sourceFile = "/example/data/gutenberg/davinci.txt";
            private const string outputFolder = "/example/data/davinciwordcount";
    
            static void Main(string[] args)
            {
                System.Console.WriteLine("The application is running ...");
    
                var clusterCredentials = new BasicAuthenticationCloudCredentials { Username = existingClusterUsername, Password = existingClusterPassword };
                _hdiJobManagementClient = new HDInsightJobManagementClient(existingClusterUri, clusterCredentials);
    
                SubmitMRJob();
    
                System.Console.WriteLine("Press ENTER to continue ...");
                System.Console.ReadLine();
            }
    
            private static void SubmitMRJob()
            {
                List<string> args = new List<string> { { "/example/data/gutenberg/davinci.txt" }, { "/example/data/davinciwordcount" } };
    
                var paras = new MapReduceJobSubmissionParameters
                {
                    JarFile = @"/example/jars/hadoop-mapreduce-examples.jar",
                    JarClass = "wordcount",
                    Arguments = args
                };
    
                System.Console.WriteLine("Submitting the MR job to the cluster...");
                var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceJob(paras);
                var jobId = jobResponse.JobSubmissionJsonResponse.Id;
                System.Console.WriteLine("Response status code is " + jobResponse.StatusCode);
                System.Console.WriteLine("JobId is " + jobId);
    
                System.Console.WriteLine("Waiting for the job completion ...");
    
                // Wait for job completion
                var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
                while (!jobDetail.Status.JobComplete)
                {
                    Thread.Sleep(1000);
                    jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
                }
    
                // Get job output
                System.Console.WriteLine("Job output is: ");
                var storageAccess = new AzureStorageAccess(defaultStorageAccountName, defaultStorageAccountKey,
                    defaultStorageContainerName);
    
                if (jobDetail.ExitValue == 0)
                {
                    // Create the storage account object
                    CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=" +
                        defaultStorageAccountName +
                        ";AccountKey=" + defaultStorageAccountKey);
    
                    // Create the blob client.
                    CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
    
                    // Retrieve reference to a previously created container.
                    CloudBlobContainer container = blobClient.GetContainerReference(defaultStorageContainerName);
    
                    CloudBlockBlob blockBlob = container.GetBlockBlobReference(outputFolder.Substring(1) + "/part-r-00000");
    
                    using (var stream = blockBlob.OpenRead())
                    {
                        using (StreamReader reader = new StreamReader(stream))
                        {
                            while (!reader.EndOfStream)
                            {
                                System.Console.WriteLine(reader.ReadLine());
                            }
                        }
                    }
                }
                else
                {
                    // fetch stderr output in case of failure
                    var output = _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess);
    
                    using (var reader = new StreamReader(output, Encoding.UTF8))
                    {
                        string value = reader.ReadToEnd();
                        System.Console.WriteLine(value);
                    }
    
                }
            }
        }
    }
    
    
  4. Naciśnij klawisz F5, aby uruchomić aplikację.

Aby ponownie uruchomić zadanie, musisz zmienić nazwę folderu wyjściowego zadania, w przykładzie jest /example/data/davinciwordcountto .

Po pomyślnym zakończeniu zadania aplikacja wyświetla zawartość pliku part-r-00000wyjściowego .

Następne kroki

W tym artykule przedstawiono kilka sposobów tworzenia klastra usługi HDInsight. Więcej informacji można znaleźć w następujących artykułach: