Use C# com mapReduce streaming em Apache Hadoop em HDInsight

Saiba como usar o C# para criar uma solução MapReduce no HDInsight.

O streaming apache Hadoop permite-lhe executar trabalhos MapReduce usando um script ou executável. Aqui, .NET é usado para implementar o mapper e redutor para uma solução de contagem de palavras.

.NET em HDInsight

Os clusters HDInsight utilizam Mono (https://mono-project.com) para executar aplicações .NET. A versão mono 4.2.1 está incluída na versão HDInsight 3.6. Para obter mais informações sobre a versão de Mono incluída com HDInsight, consulte os componentes Apache Hadoop disponíveis com as versões HDInsight.

Para obter mais informações sobre a compatibilidade da Mono com .NET Framework versões, consulte Mono compatibilidade.

Como funciona o streaming hadoop

O processo básico utilizado para o streaming neste documento é o seguinte:

  1. Hadoop transmite dados para o mapper (mapper.exe neste exemplo) no STDIN.
  2. O mapper processa os dados e emite par de chaves/valor delimitados por separadores para STDOUT.
  3. A saída é lida por Hadoop, e depois passada para o redutor (reducer.exe neste exemplo) em STDIN.
  4. O redutor lê os pares de teclas/valor delimitados por separadores, processa os dados e, em seguida, emite o resultado como par de chave/valor delimitado por separadores em STDOUT.
  5. A saída é lida por Hadoop e escrita para o diretório de saída.

Para mais informações sobre o streaming, consulte o Hadoop Streaming.

Pré-requisitos

  • Visual Studio.

  • Uma familiaridade com a escrita e construção do código C# que visa .NET Framework 4,5.

  • Uma forma de enviar ficheiros .exe para o cluster. Os passos deste documento utilizam as Ferramentas do Lago de Dados para o Estúdio Visual para fazer o upload dos ficheiros para o armazenamento primário do cluster.

  • Se utilizar o PowerShell, vai precisar do Módulo Az.

  • Um aglomerado apache Hadoop em HDInsight. Consulte Get Start com HDInsight no Linux.

  • O esquema URI para o armazenamento primário dos seus clusters. Este esquema seria wasb:// para o Azure Storage, abfs:// para Azure Data Lake Storage Gen2 ou adl:// para Azure Data Lake Storage Gen1. Se a transferência segura estiver ativada para o Azure Storage ou Data Lake Storage Gen2, o URI será wasbs:// ouabfss://, respectivamente.

Criar o mapper

No Visual Studio, crie uma nova aplicação .NET Framework de consola chamada mapper. Utilize o seguinte código para a aplicação:

using System;
using System.Text.RegularExpressions;

namespace mapper
{
    class Program
    {
        static void Main(string[] args)
        {
            string line;
            //Hadoop passes data to the mapper on STDIN
            while((line = Console.ReadLine()) != null)
            {
                // We only want words, so strip out punctuation, numbers, etc.
                var onlyText = Regex.Replace(line, @"\.|;|:|,|[0-9]|'", "");
                // Split at whitespace.
                var words = Regex.Matches(onlyText, @"[\w]+");
                // Loop over the words
                foreach(var word in words)
                {
                    //Emit tab-delimited key/value pairs.
                    //In this case, a word and a count of 1.
                    Console.WriteLine("{0}\t1",word);
                }
            }
        }
    }
}

Depois de criar a aplicação, construa-a para produzir o ficheiro /bin/Debug/mapper.exe no diretório do projeto.

Criar o redutor

No Visual Studio, crie uma nova aplicação de consola .NET Framework chamada redutor. Utilize o seguinte código para a aplicação:

using System;
using System.Collections.Generic;

namespace reducer
{
    class Program
    {
        static void Main(string[] args)
        {
            //Dictionary for holding a count of words
            Dictionary<string, int> words = new Dictionary<string, int>();

            string line;
            //Read from STDIN
            while ((line = Console.ReadLine()) != null)
            {
                // Data from Hadoop is tab-delimited key/value pairs
                var sArr = line.Split('\t');
                // Get the word
                string word = sArr[0];
                // Get the count
                int count = Convert.ToInt32(sArr[1]);

                //Do we already have a count for the word?
                if(words.ContainsKey(word))
                {
                    //If so, increment the count
                    words[word] += count;
                } else
                {
                    //Add the key to the collection
                    words.Add(word, count);
                }
            }
            //Finally, emit each word and count
            foreach (var word in words)
            {
                //Emit tab-delimited key/value pairs.
                //In this case, a word and a count of 1.
                Console.WriteLine("{0}\t{1}", word.Key, word.Value);
            }
        }
    }
}

Depois de criar a aplicação, construa-a para produzir o ficheiro /bin/Debug/reducer.exe no diretório do projeto.

Carregar para o armazenamento

Em seguida, você precisa carregar as aplicações mapper e redutor para o armazenamento HDInsight.

  1. No Estúdio Visual, selecione Ver>Explorador de Servidor.

  2. Clique com o botão direito Azure, selecione Connect to Microsoft Azure Subscription..., e complete o processo de inscrição.

  3. Expandir o cluster HDInsight para o que deseja implementar esta aplicação. Uma entrada com o texto (Conta de Armazenamento Padrão) é listada.

    Conta de armazenamento, cluster HDInsight, Server Explorer, Visual Studio

    • Se a entrada (Conta de Armazenamento Predefinido) puder ser expandida, está a utilizar uma Conta de Armazenamento Azure como armazenamento predefinido para o cluster. Para visualizar os ficheiros no armazenamento predefinido para o cluster, expanda a entrada e, em seguida, clique duas vezes (Recipiente Padrão).

    • Se a entrada (Conta de Armazenamento Predefinido) não puder ser expandida, está a utilizar Azure Data Lake Storage como o armazenamento predefinido para o cluster. Para visualizar os ficheiros no armazenamento predefinido para o cluster, clique duas vezes na entrada (Conta de Armazenamento Predefinido).

  4. Para fazer o upload dos ficheiros .exe, utilize um dos seguintes métodos:

    • Se estiver a utilizar uma conta de armazenamento Azure, selecione o ícone Upload Blob .

      HdInsight ícone de upload para mapper, Visual Studio

      Na caixa de diálogo de ficheiro novo de upload , em nome de ficheiro, selecione Procurar. Na caixa de diálogo Upload Blob , vá à pasta bin\debug para o projeto mapper e, em seguida, escolha o ficheiro mapper.exe . Por fim, selecione Open e, em seguida, OK para completar o upload.

    • Para Azure Data Lake Storage, clique com o direito numa área vazia na listagem de ficheiros e, em seguida, selecione Upload. Por fim, selecione o ficheiro mapper.exe e, em seguida, selecione Abrir.

    Uma vez terminada a mapper.exe upload, repita o processo de upload para o ficheiro reducer.exe .

Executar um trabalho: Usando uma sessão de SSH

O procedimento a seguir descreve como executar um trabalho mapReduce utilizando uma sessão de SSH:

  1. Utilize o comando ssh para ligar ao seu cluster. Edite o comando abaixo substituindo o CLUSTERNAME pelo nome do seu cluster e, em seguida, insira o comando:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Utilize um dos seguintes comandos para iniciar o trabalho MapReduce:

    • Se o armazenamento predefinido for Azure Storage:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files wasbs:///mapper.exe,wasbs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      
    • Se o armazenamento predefinido for Data Lake Storage Gen1:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files adl:///mapper.exe,adl:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      
    • Se o armazenamento predefinido for Data Lake Storage Gen2:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files abfs:///mapper.exe,abfs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      

    A lista que se segue descreve o que cada parâmetro e opção representa:

    Parâmetro Descrição
    hadoop-streaming.jar Especifica o ficheiro do frasco que contém a funcionalidade de streaming MapReduce.
    -ficheiros Especifica os ficheirosmapper.exe e reducer.exe para este trabalho. A wasbs:///declaração , adl:///ou abfs:/// protocolo antes de cada ficheiro é o caminho para a raiz do armazenamento padrão para o cluster.
    -mapper Especifica o ficheiro que implementa o mapper.
    -redutor Especifica o ficheiro que implementa o redutor.
    -entrada Especifica os dados de entrada.
    -saída Especifica o diretório de saída.
  3. Uma vez concluído o trabalho mapReduce, utilize o seguinte comando para visualizar os resultados:

    hdfs dfs -text /example/wordcountout/part-00000
    

    O texto a seguir é um exemplo dos dados devolvidos por este comando:

    you     1128
    young   38
    younger 1
    youngest        1
    your    338
    yours   4
    yourself        34
    yourselves      3
    youth   17
    

Executar um trabalho: Usar o PowerShell

Use o seguinte script PowerShell para executar um trabalho MapReduce e descarregue os resultados.

# Login to your Azure subscription
$context = Get-AzContext
if ($context -eq $null) 
{
    Connect-AzAccount
}
$context

# Get HDInsight info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -Message "Enter the login for the cluster"

# Path for job output
$outputPath="/example/wordcountoutput"

# Progress indicator
$activity="C# MapReduce example"
Write-Progress -Activity $activity -Status "Getting cluster information..."
#Get HDInsight info so we can get the resource group, storage, etc.
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageActArr=$clusterInfo.DefaultStorageAccount.split('.')
$storageAccountName=$storageActArr[0]
$storageType=$storageActArr[1]

# Progress indicator
#Define the MapReduce job
# Note: using "/mapper.exe" and "/reducer.exe" looks in the root
#       of default storage.
$jobDef=New-AzHDInsightStreamingMapReduceJobDefinition `
    -Files "/mapper.exe","/reducer.exe" `
    -Mapper "mapper.exe" `
    -Reducer "reducer.exe" `
    -InputPath "/example/data/gutenberg/davinci.txt" `
    -OutputPath $outputPath

# Start the job
Write-Progress -Activity $activity -Status "Starting MapReduce job..."
$job=Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDef `
    -HttpCredential $creds

#Wait for the job to complete
Write-Progress -Activity $activity -Status "Waiting for the job to complete..."
Wait-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Write-Progress -Activity $activity -Completed

# Download the output 
if($storageType -eq 'azuredatalakestore') {
    # Azure Data Lake Store
    # Fie path is the root of the HDInsight storage + $outputPath
    $filePath=$clusterInfo.DefaultStorageRootPath + $outputPath + "/part-00000"
    Export-AzDataLakeStoreItem `
        -Account $storageAccountName `
        -Path $filePath `
        -Destination output.txt
} else {
    # Az.Storage account
    # Get the container
    $container=$clusterInfo.DefaultStorageContainer
    #NOTE: This assumes that the storage account is in the same resource
    #      group as HDInsight. If it is not, change the
    #      --ResourceGroupName parameter to the group that contains storage.
    $storageAccountKey=(Get-AzStorageAccountKey `
        -Name $storageAccountName `
    -ResourceGroupName $resourceGroup)[0].Value

    #Create a storage context
    $context = New-AzStorageContext `
        -StorageAccountName $storageAccountName `
        -StorageAccountKey $storageAccountKey
    # Download the file
    Get-AzStorageBlobContent `
        -Blob 'example/wordcountoutput/part-00000' `
        -Container $container `
        -Destination output.txt `
        -Context $context
}

Este script solicita-lhe o nome e palavra-passe da conta de login do cluster, juntamente com o nome do cluster HDInsight. Uma vez concluído o trabalho, a saída é transferida para um ficheiro chamado output.txt. O texto a seguir é um exemplo dos dados do output.txt ficheiro:

you     1128
young   38
younger 1
youngest        1
your    338
yours   4
yourself        34
yourselves      3
youth   17

Passos seguintes