Поделиться через


Использование C# для потоковой передачи MapReduce в Apache Hadoop в HDInsight

Узнайте, как использовать C# для создания решения MapReduce в HDInsight.

Потоковая передача Apache Hadoop позволяет запускать задания MapReduce с помощью скрипта или исполняемого файла. В данном случае .NET используется для реализации модулей сопоставления и редукции для решения для подсчета слов.

.NET в HDInsight

В кластерах HDInsight для запуска приложений .NET используется Mono (https://mono-project.com). Mono версии 4.2.1 входит в состав HDInsight версии 3.6. Дополнительные сведения о версии Mono, которая входит в состав HDInsight, см. в разделе Компоненты Hadoop, доступные в разных версиях HDInsight.

Дополнительные сведения о совместимости Mono с различными версиями платформы .NET Framework см. в разделе Compatibility (Совместимость).

Как работает потоковая передача Hadoop

Базовый процесс потоковой передачи в данном документе выглядит следующим образом.

  1. Hadoop передает данные в модуль сопоставления (mapper.exe в этом примере) через канал STDIN.
  2. Модуль сопоставления обрабатывает эти данные и передает пары "ключ-значение", разделенные знаками табуляции, в канал STDOUT.
  3. Выходные данные считываются Hadoop и затем передаются в модуль редукции (reducer.exe в этом примере) через канал STDIN.
  4. Модуль редукции считывает пары "ключ-значение", разделенные знаками табуляции, обрабатывает данные и передает результат в виде пар "ключ-значение", разделенные знаками табуляции, в канал STDOUT.
  5. Выходные данные считываются Hadoop и записываются в выходной каталог.

Дополнительные сведения см. в разделе Потоковая передача Hadoop.

Необходимые компоненты

  • Visual Studio.

  • Опыт написания и выполнения сборки кода C#, предназначенного для платформы .NET Framework 4.5.

  • Способ передачи EXE-файлов в кластер. В этом документе для передачи файлов в основное хранилище кластера используются средства Data Lake для Visual Studio.

  • При использовании PowerShell вам потребуется модуль Az.

  • Кластер Apache Hadoop в HDInsight. Ознакомьтесь со статьей Краткое руководство. Использование Apache Hadoop и Apache Hive в Azure HDInsight с шаблоном Resource Manager.

  • Схема универсального кода ресурса (URI) для основного хранилища кластеров. Для службы хранилища Azure этой схемой будет wasb://, для Azure Data Lake Storage 2-го поколения — abfs://, или adl:// для Azure Data Lake Storage 1-го поколения. Если для службы хранилища Azure или Azure Data Lake Storage 2-го поколения включена безопасная передача, URI будет представлять собой wasbs:// или abfss:// соответственно.

Создание модуля сопоставления

Создайте консольное приложение .NET Framework в Visual Studio и назовите его mapper. Используйте для него следующий код.

using System;
using System.Text.RegularExpressions;

namespace mapper
{
    class Program
    {
        static void Main(string[] args)
        {
            string line;
            //Hadoop passes data to the mapper on STDIN
            while((line = Console.ReadLine()) != null)
            {
                // We only want words, so strip out punctuation, numbers, etc.
                var onlyText = Regex.Replace(line, @"\.|;|:|,|[0-9]|'", "");
                // Split at whitespace.
                var words = Regex.Matches(onlyText, @"[\w]+");
                // Loop over the words
                foreach(var word in words)
                {
                    //Emit tab-delimited key/value pairs.
                    //In this case, a word and a count of 1.
                    Console.WriteLine("{0}\t1",word);
                }
            }
        }
    }
}

После создания приложения скомпилируйте его для получения файла /bin/Debug/mapper.exe в каталоге проекта.

Создание модуля редукции

Создайте консольное приложение .NET Framework в Visual Studio и назовите его reducer. Используйте для него следующий код.

using System;
using System.Collections.Generic;

namespace reducer
{
    class Program
    {
        static void Main(string[] args)
        {
            //Dictionary for holding a count of words
            Dictionary<string, int> words = new Dictionary<string, int>();

            string line;
            //Read from STDIN
            while ((line = Console.ReadLine()) != null)
            {
                // Data from Hadoop is tab-delimited key/value pairs
                var sArr = line.Split('\t');
                // Get the word
                string word = sArr[0];
                // Get the count
                int count = Convert.ToInt32(sArr[1]);

                //Do we already have a count for the word?
                if(words.ContainsKey(word))
                {
                    //If so, increment the count
                    words[word] += count;
                } else
                {
                    //Add the key to the collection
                    words.Add(word, count);
                }
            }
            //Finally, emit each word and count
            foreach (var word in words)
            {
                //Emit tab-delimited key/value pairs.
                //In this case, a word and a count of 1.
                Console.WriteLine("{0}\t{1}", word.Key, word.Value);
            }
        }
    }
}

После создания приложения скомпилируйте его для получения файла /bin/Debug/reducer.exe в каталоге проекта.

Отправка в хранилище

Далее необходимо загрузить приложения mapper и reducer в хранилище HDInsight.

  1. В Visual Studio выберите Просмотреть>Обозреватель сервера.

  2. Щелкните правой кнопкой мыши пункт Azure, выберите Подключиться к подписке Microsoft Azure... и выполните процесс входа.

  3. Разверните кластер HDInsight, в который нужно развернуть это приложение. Отобразится запись с текстом (Учетная запись хранения по умолчанию).

    Storage account, HDInsight cluster, Server Explorer, Visual Studio.

    • Если запись (Учетная запись хранения по умолчанию) можно развернуть, то для кластера в качестве хранилища по умолчанию используется учетная запись хранения Azure. Чтобы просмотреть файлы в хранилище по умолчанию кластера, разверните эту запись, а затем дважды щелкните запись (Контейнер по умолчанию).

    • Если запись (Учетная запись хранения по умолчанию) нельзя развернуть, то для кластера в качестве хранилища по умолчанию используется Azure Data Lake Storage. Чтобы просмотреть файлы в хранилище по умолчанию кластера, дважды щелкните запись (Контейнер по умолчанию).

  4. Чтобы передать EXE-файлы, используйте один из следующих методов.

    • Если вы используете учетную запись хранения Azure, щелкните значок Отправить BLOB-объект.

      HDInsight upload icon for mapper, Visual Studio.

      В диалоговом окне Отправка нового файла в поле Имя файла нажмите Обзор. В диалоговом окне Отправка большого двоичного объекта перейдите в папку bin\debug проекта mapper, а затем выберите файл mapper.exe. Наконец, выберите Открыть, а затем нажмите кнопку OK, чтобы завершить отправку.

    • Для Azure Data Lake Storage щелкните правой кнопкой мыши пустое место в списке файлов и выберите Отправить. Выберите файл mapper.exe и нажмите кнопку Открыть.

    После завершения передачи mapper.exe повторите этот процесс для передачи файла reducer.exe.

Выполнение задания с помощью сеанса SSH

Следующая процедура описывает выполнение задания MapReduce с помощью сеанса SSH:

  1. С помощью команды ssh command подключитесь к кластеру. Измените приведенную ниже команду, заменив CLUSTERNAME именем своего кластера, а затем введите команду:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Используйте одну из приведенных команд для запуска задания MapReduce.

    • Если хранилищем по умолчанию является служба хранилища Azure:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files wasbs:///mapper.exe,wasbs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      
    • Если хранилищем по умолчанию является Data Lake Storage 1-го поколения:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files adl:///mapper.exe,adl:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      
    • Если хранилищем по умолчанию является Data Lake Storage 2-го поколения:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files abfs:///mapper.exe,abfs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      

    В следующем списке описывается, что представляет собой каждый параметр и опция:

    Параметр Описание
    hadoop-streaming.jar Указывает JAR-файл, содержащий функции потоковой передачи MapReduce.
    -files Указывает файлы mapper.exe и reducer.exe для этого задания. Объявление протокола wasbs:///, adl:/// или abfs:/// перед именем файла — это путь к корню хранилища по умолчанию для кластера.
    -mapper Указывает файл, который реализует модуль сопоставления.
    -reducer Указывает файл, который реализует редуктор.
    -input Указывает входные данные.
    -output Указывает выходной каталог.
  3. После завершения задания MapReduce просмотрите результаты с помощью следующей команды:

    hdfs dfs -text /example/wordcountout/part-00000
    

    Ниже приведен пример данных, возвращаемых этой командой.

    you     1128
    young   38
    younger 1
    youngest        1
    your    338
    yours   4
    yourself        34
    yourselves      3
    youth   17
    

Выполнение задания с помощью PowerShell

Используйте приведенный ниже сценарий PowerShell для запуска задания MapReduce и скачивания результатов.

# Login to your Azure subscription
$context = Get-AzContext
if ($context -eq $null) 
{
    Connect-AzAccount
}
$context

# Get HDInsight info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -Message "Enter the login for the cluster"

# Path for job output
$outputPath="/example/wordcountoutput"

# Progress indicator
$activity="C# MapReduce example"
Write-Progress -Activity $activity -Status "Getting cluster information..."
#Get HDInsight info so we can get the resource group, storage, etc.
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageActArr=$clusterInfo.DefaultStorageAccount.split('.')
$storageAccountName=$storageActArr[0]
$storageType=$storageActArr[1]

# Progress indicator
#Define the MapReduce job
# Note: using "/mapper.exe" and "/reducer.exe" looks in the root
#       of default storage.
$jobDef=New-AzHDInsightStreamingMapReduceJobDefinition `
    -Files "/mapper.exe","/reducer.exe" `
    -Mapper "mapper.exe" `
    -Reducer "reducer.exe" `
    -InputPath "/example/data/gutenberg/davinci.txt" `
    -OutputPath $outputPath

# Start the job
Write-Progress -Activity $activity -Status "Starting MapReduce job..."
$job=Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDef `
    -HttpCredential $creds

#Wait for the job to complete
Write-Progress -Activity $activity -Status "Waiting for the job to complete..."
Wait-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Write-Progress -Activity $activity -Completed

# Download the output 
if($storageType -eq 'azuredatalakestore') {
    # Azure Data Lake Store
    # Fie path is the root of the HDInsight storage + $outputPath
    $filePath=$clusterInfo.DefaultStorageRootPath + $outputPath + "/part-00000"
    Export-AzDataLakeStoreItem `
        -Account $storageAccountName `
        -Path $filePath `
        -Destination output.txt
} else {
    # Az.Storage account
    # Get the container
    $container=$clusterInfo.DefaultStorageContainer
    #NOTE: This assumes that the storage account is in the same resource
    #      group as HDInsight. If it is not, change the
    #      --ResourceGroupName parameter to the group that contains storage.
    $storageAccountKey=(Get-AzStorageAccountKey `
        -Name $storageAccountName `
    -ResourceGroupName $resourceGroup)[0].Value

    #Create a storage context
    $context = New-AzStorageContext `
        -StorageAccountName $storageAccountName `
        -StorageAccountKey $storageAccountKey
    # Download the file
    Get-AzStorageBlobContent `
        -Blob 'example/wordcountoutput/part-00000' `
        -Container $container `
        -Destination output.txt `
        -Context $context
}

Этот сценарий предлагает ввести имя учетной записи для входа и пароль для кластера, а также имя кластера HDInsight. После завершения задания выходные данные скачиваются в файл output.txt. Ниже приведен пример выходных данных в файле output.txt.

you     1128
young   38
younger 1
youngest        1
your    338
yours   4
yourself        34
yourselves      3
youth   17

Следующие шаги