Use C# com mapReduce streaming em Apache Hadoop em HDInsight
Saiba como usar o C# para criar uma solução MapReduce no HDInsight.
O streaming apache Hadoop permite-lhe executar trabalhos MapReduce usando um script ou executável. Aqui, .NET é usado para implementar o mapper e redutor para uma solução de contagem de palavras.
.NET em HDInsight
Os clusters HDInsight utilizam Mono (https://mono-project.com) para executar aplicações .NET. A versão mono 4.2.1 está incluída na versão HDInsight 3.6. Para obter mais informações sobre a versão de Mono incluída com HDInsight, consulte os componentes Apache Hadoop disponíveis com as versões HDInsight.
Para obter mais informações sobre a compatibilidade da Mono com .NET Framework versões, consulte Mono compatibilidade.
Como funciona o streaming hadoop
O processo básico utilizado para o streaming neste documento é o seguinte:
- Hadoop transmite dados para o mapper (mapper.exe neste exemplo) no STDIN.
- O mapper processa os dados e emite par de chaves/valor delimitados por separadores para STDOUT.
- A saída é lida por Hadoop, e depois passada para o redutor (reducer.exe neste exemplo) em STDIN.
- O redutor lê os pares de teclas/valor delimitados por separadores, processa os dados e, em seguida, emite o resultado como par de chave/valor delimitado por separadores em STDOUT.
- A saída é lida por Hadoop e escrita para o diretório de saída.
Para mais informações sobre o streaming, consulte o Hadoop Streaming.
Pré-requisitos
Visual Studio.
Uma familiaridade com a escrita e construção do código C# que visa .NET Framework 4,5.
Uma forma de enviar ficheiros .exe para o cluster. Os passos deste documento utilizam as Ferramentas do Lago de Dados para o Estúdio Visual para fazer o upload dos ficheiros para o armazenamento primário do cluster.
Se utilizar o PowerShell, vai precisar do Módulo Az.
Um aglomerado apache Hadoop em HDInsight. Consulte Get Start com HDInsight no Linux.
O esquema URI para o armazenamento primário dos seus clusters. Este esquema seria
wasb://
para o Azure Storage,abfs://
para Azure Data Lake Storage Gen2 ouadl://
para Azure Data Lake Storage Gen1. Se a transferência segura estiver ativada para o Azure Storage ou Data Lake Storage Gen2, o URI seráwasbs://
ouabfss://
, respectivamente.
Criar o mapper
No Visual Studio, crie uma nova aplicação .NET Framework de consola chamada mapper. Utilize o seguinte código para a aplicação:
using System;
using System.Text.RegularExpressions;
namespace mapper
{
class Program
{
static void Main(string[] args)
{
string line;
//Hadoop passes data to the mapper on STDIN
while((line = Console.ReadLine()) != null)
{
// We only want words, so strip out punctuation, numbers, etc.
var onlyText = Regex.Replace(line, @"\.|;|:|,|[0-9]|'", "");
// Split at whitespace.
var words = Regex.Matches(onlyText, @"[\w]+");
// Loop over the words
foreach(var word in words)
{
//Emit tab-delimited key/value pairs.
//In this case, a word and a count of 1.
Console.WriteLine("{0}\t1",word);
}
}
}
}
}
Depois de criar a aplicação, construa-a para produzir o ficheiro /bin/Debug/mapper.exe no diretório do projeto.
Criar o redutor
No Visual Studio, crie uma nova aplicação de consola .NET Framework chamada redutor. Utilize o seguinte código para a aplicação:
using System;
using System.Collections.Generic;
namespace reducer
{
class Program
{
static void Main(string[] args)
{
//Dictionary for holding a count of words
Dictionary<string, int> words = new Dictionary<string, int>();
string line;
//Read from STDIN
while ((line = Console.ReadLine()) != null)
{
// Data from Hadoop is tab-delimited key/value pairs
var sArr = line.Split('\t');
// Get the word
string word = sArr[0];
// Get the count
int count = Convert.ToInt32(sArr[1]);
//Do we already have a count for the word?
if(words.ContainsKey(word))
{
//If so, increment the count
words[word] += count;
} else
{
//Add the key to the collection
words.Add(word, count);
}
}
//Finally, emit each word and count
foreach (var word in words)
{
//Emit tab-delimited key/value pairs.
//In this case, a word and a count of 1.
Console.WriteLine("{0}\t{1}", word.Key, word.Value);
}
}
}
}
Depois de criar a aplicação, construa-a para produzir o ficheiro /bin/Debug/reducer.exe no diretório do projeto.
Carregar para o armazenamento
Em seguida, você precisa carregar as aplicações mapper e redutor para o armazenamento HDInsight.
No Estúdio Visual, selecione Ver>Explorador de Servidor.
Clique com o botão direito Azure, selecione Connect to Microsoft Azure Subscription..., e complete o processo de inscrição.
Expandir o cluster HDInsight para o que deseja implementar esta aplicação. Uma entrada com o texto (Conta de Armazenamento Padrão) é listada.
Se a entrada (Conta de Armazenamento Predefinido) puder ser expandida, está a utilizar uma Conta de Armazenamento Azure como armazenamento predefinido para o cluster. Para visualizar os ficheiros no armazenamento predefinido para o cluster, expanda a entrada e, em seguida, clique duas vezes (Recipiente Padrão).
Se a entrada (Conta de Armazenamento Predefinido) não puder ser expandida, está a utilizar Azure Data Lake Storage como o armazenamento predefinido para o cluster. Para visualizar os ficheiros no armazenamento predefinido para o cluster, clique duas vezes na entrada (Conta de Armazenamento Predefinido).
Para fazer o upload dos ficheiros .exe, utilize um dos seguintes métodos:
Se estiver a utilizar uma conta de armazenamento Azure, selecione o ícone Upload Blob .
Na caixa de diálogo de ficheiro novo de upload , em nome de ficheiro, selecione Procurar. Na caixa de diálogo Upload Blob , vá à pasta bin\debug para o projeto mapper e, em seguida, escolha o ficheiro mapper.exe . Por fim, selecione Open e, em seguida, OK para completar o upload.
Para Azure Data Lake Storage, clique com o direito numa área vazia na listagem de ficheiros e, em seguida, selecione Upload. Por fim, selecione o ficheiro mapper.exe e, em seguida, selecione Abrir.
Uma vez terminada a mapper.exe upload, repita o processo de upload para o ficheiro reducer.exe .
Executar um trabalho: Usando uma sessão de SSH
O procedimento a seguir descreve como executar um trabalho mapReduce utilizando uma sessão de SSH:
Utilize o comando ssh para ligar ao seu cluster. Edite o comando abaixo substituindo o CLUSTERNAME pelo nome do seu cluster e, em seguida, insira o comando:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Utilize um dos seguintes comandos para iniciar o trabalho MapReduce:
Se o armazenamento predefinido for Azure Storage:
yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \ -files wasbs:///mapper.exe,wasbs:///reducer.exe \ -mapper mapper.exe \ -reducer reducer.exe \ -input /example/data/gutenberg/davinci.txt \ -output /example/wordcountout
Se o armazenamento predefinido for Data Lake Storage Gen1:
yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \ -files adl:///mapper.exe,adl:///reducer.exe \ -mapper mapper.exe \ -reducer reducer.exe \ -input /example/data/gutenberg/davinci.txt \ -output /example/wordcountout
Se o armazenamento predefinido for Data Lake Storage Gen2:
yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \ -files abfs:///mapper.exe,abfs:///reducer.exe \ -mapper mapper.exe \ -reducer reducer.exe \ -input /example/data/gutenberg/davinci.txt \ -output /example/wordcountout
A lista que se segue descreve o que cada parâmetro e opção representa:
Parâmetro Descrição hadoop-streaming.jar Especifica o ficheiro do frasco que contém a funcionalidade de streaming MapReduce. -ficheiros Especifica os ficheirosmapper.exe e reducer.exe para este trabalho. A wasbs:///
declaração ,adl:///
ouabfs:///
protocolo antes de cada ficheiro é o caminho para a raiz do armazenamento padrão para o cluster.-mapper Especifica o ficheiro que implementa o mapper. -redutor Especifica o ficheiro que implementa o redutor. -entrada Especifica os dados de entrada. -saída Especifica o diretório de saída. Uma vez concluído o trabalho mapReduce, utilize o seguinte comando para visualizar os resultados:
hdfs dfs -text /example/wordcountout/part-00000
O texto a seguir é um exemplo dos dados devolvidos por este comando:
you 1128 young 38 younger 1 youngest 1 your 338 yours 4 yourself 34 yourselves 3 youth 17
Executar um trabalho: Usar o PowerShell
Use o seguinte script PowerShell para executar um trabalho MapReduce e descarregue os resultados.
# Login to your Azure subscription
$context = Get-AzContext
if ($context -eq $null)
{
Connect-AzAccount
}
$context
# Get HDInsight info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -Message "Enter the login for the cluster"
# Path for job output
$outputPath="/example/wordcountoutput"
# Progress indicator
$activity="C# MapReduce example"
Write-Progress -Activity $activity -Status "Getting cluster information..."
#Get HDInsight info so we can get the resource group, storage, etc.
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageActArr=$clusterInfo.DefaultStorageAccount.split('.')
$storageAccountName=$storageActArr[0]
$storageType=$storageActArr[1]
# Progress indicator
#Define the MapReduce job
# Note: using "/mapper.exe" and "/reducer.exe" looks in the root
# of default storage.
$jobDef=New-AzHDInsightStreamingMapReduceJobDefinition `
-Files "/mapper.exe","/reducer.exe" `
-Mapper "mapper.exe" `
-Reducer "reducer.exe" `
-InputPath "/example/data/gutenberg/davinci.txt" `
-OutputPath $outputPath
# Start the job
Write-Progress -Activity $activity -Status "Starting MapReduce job..."
$job=Start-AzHDInsightJob `
-ClusterName $clusterName `
-JobDefinition $jobDef `
-HttpCredential $creds
#Wait for the job to complete
Write-Progress -Activity $activity -Status "Waiting for the job to complete..."
Wait-AzHDInsightJob `
-ClusterName $clusterName `
-JobId $job.JobId `
-HttpCredential $creds
Write-Progress -Activity $activity -Completed
# Download the output
if($storageType -eq 'azuredatalakestore') {
# Azure Data Lake Store
# Fie path is the root of the HDInsight storage + $outputPath
$filePath=$clusterInfo.DefaultStorageRootPath + $outputPath + "/part-00000"
Export-AzDataLakeStoreItem `
-Account $storageAccountName `
-Path $filePath `
-Destination output.txt
} else {
# Az.Storage account
# Get the container
$container=$clusterInfo.DefaultStorageContainer
#NOTE: This assumes that the storage account is in the same resource
# group as HDInsight. If it is not, change the
# --ResourceGroupName parameter to the group that contains storage.
$storageAccountKey=(Get-AzStorageAccountKey `
-Name $storageAccountName `
-ResourceGroupName $resourceGroup)[0].Value
#Create a storage context
$context = New-AzStorageContext `
-StorageAccountName $storageAccountName `
-StorageAccountKey $storageAccountKey
# Download the file
Get-AzStorageBlobContent `
-Blob 'example/wordcountoutput/part-00000' `
-Container $container `
-Destination output.txt `
-Context $context
}
Este script solicita-lhe o nome e palavra-passe da conta de login do cluster, juntamente com o nome do cluster HDInsight. Uma vez concluído o trabalho, a saída é transferida para um ficheiro chamado output.txt. O texto a seguir é um exemplo dos dados do output.txt
ficheiro:
you 1128
young 38
younger 1
youngest 1
your 338
yours 4
yourself 34
yourselves 3
youth 17