在 HDInsight 中的 Apache Hadoop 上搭配 MapReduce 串流使用 C#
瞭解如何使用 C# 在 HDInsight 上建立 MapReduce 解決方案。
Apache Hadoop 串流可讓您使用腳本或可執行文件執行 MapReduce 作業。 在這裡,.NET 是用來實作字數計數解決方案的對應器和歸納器。
HDInsight 上的 .NET
HDInsight 叢集會使用 Mono (https://mono-project.com) 執行 .NET 應用程式。 4.2.1 版的 Mono 隨附於 3.6 版的 HDInsight。 如需 HDInsight 隨附之 Mono 版本的詳細資訊,請參閱 HDInsight 版本提供的 Apache Hadoop 元件。
如需Mono與.NET Framework 版本相容性的詳細資訊,請參閱 Mono相容性。
Hadoop 串流的運作方式
本檔案中用於串流的基本程式如下:
- Hadoop 會將數據傳遞至 STDIN 上的對應程式(在此範例中為mapper.exe )。
- 對應程式會處理數據,並將索引標籤分隔的索引鍵/值組發出至 STDOUT。
- Hadoop 會讀取輸出,然後傳遞至 STDIN 上的歸納器(在此範例中reducer.exe )。
- 歸納器會讀取索引標籤分隔的索引鍵/值組、處理數據,然後將結果發出為 STDOUT 上以製表符分隔的索引鍵/值組。
- Hadoop 會讀取輸出並寫入輸出目錄。
如需串流的詳細資訊,請參閱 Hadoop 串流。
必要條件
Visual Studio。
熟悉撰寫和建置以 .NET Framework 4.5 為目標的 C# 程序代碼。
將.exe檔案上傳至叢集的方法。 本檔中的步驟會使用 Data Lake Tools for Visual Studio 將檔案上傳至叢集的主要記憶體。
如果使用 PowerShell,您將需要 Az Module。
HDInsight 上的 Apache Hadoop 叢集。 請參閱開始在 Linux 上使用 HDInsight。
您叢集主要儲存體的 URI 配置。 此配置適用於
wasb://
Azure 儲存體、abfs://
Azure Data Lake 儲存體 Gen2 或adl://
Azure Data Lake 儲存體 Gen1。 如果針對 Azure 儲存體或 Data Lake Storage Gen2 已啟用安全傳輸,則 URI 分別會是wasbs://
或abfss://
。
建立對應程式
在 Visual Studio 中,建立名為 mapper 的新 .NET Framework 控制台應用程式。 針對應用程式使用下列程式代碼:
using System;
using System.Text.RegularExpressions;
namespace mapper
{
class Program
{
static void Main(string[] args)
{
string line;
//Hadoop passes data to the mapper on STDIN
while((line = Console.ReadLine()) != null)
{
// We only want words, so strip out punctuation, numbers, etc.
var onlyText = Regex.Replace(line, @"\.|;|:|,|[0-9]|'", "");
// Split at whitespace.
var words = Regex.Matches(onlyText, @"[\w]+");
// Loop over the words
foreach(var word in words)
{
//Emit tab-delimited key/value pairs.
//In this case, a word and a count of 1.
Console.WriteLine("{0}\t1",word);
}
}
}
}
}
建立應用程式之後,請建置它以在專案目錄中產生 /bin/Debug/mapper.exe 檔案。
建立歸納器
在 Visual Studio 中,建立名為 reducer 的新 .NET Framework 控制台應用程式。 針對應用程式使用下列程式代碼:
using System;
using System.Collections.Generic;
namespace reducer
{
class Program
{
static void Main(string[] args)
{
//Dictionary for holding a count of words
Dictionary<string, int> words = new Dictionary<string, int>();
string line;
//Read from STDIN
while ((line = Console.ReadLine()) != null)
{
// Data from Hadoop is tab-delimited key/value pairs
var sArr = line.Split('\t');
// Get the word
string word = sArr[0];
// Get the count
int count = Convert.ToInt32(sArr[1]);
//Do we already have a count for the word?
if(words.ContainsKey(word))
{
//If so, increment the count
words[word] += count;
} else
{
//Add the key to the collection
words.Add(word, count);
}
}
//Finally, emit each word and count
foreach (var word in words)
{
//Emit tab-delimited key/value pairs.
//In this case, a word and a count of 1.
Console.WriteLine("{0}\t{1}", word.Key, word.Value);
}
}
}
}
建立應用程式之後,請建置它以在專案目錄中產生 /bin/Debug/reducer.exe 檔案。
上傳至儲存體
接下來,您必須將對應器和歸納器應用程式上傳至 HDInsight 記憶體。
在 Visual Studio 中,選取 [檢視>伺服器總管]。
以滑鼠右鍵按兩下 [Azure],選取 [連線 至 Microsoft Azure 訂用帳戶...],然後完成登入程式。
展開您想要部署此應用程式的 HDInsight 叢集。 列出文字為 [預設 儲存體 帳戶] 的專案。
如果可以展開 [預設 儲存體 帳戶] 專案,您就會使用 Azure 儲存體 帳戶作為叢集的預設記憶體。 若要檢視叢集預設記憶體上的檔案,請展開專案,然後按兩下 [預設容器]。
如果無法展開 [預設 儲存體 帳戶] 專案,您就會使用 Azure Data Lake 儲存體 作為叢集的預設記憶體。 若要檢視叢集預設記憶體上的檔案,請按兩下 [預設 儲存體 帳戶] 專案。
若要上傳.exe檔案,請使用下列其中一種方法:
如果您使用 Azure 儲存體 帳戶,請選取 [上傳 Blob] 圖示。
在 [上傳新檔案] 對話方塊的 [檔名] 底下,選取 [瀏覽]。 在 [上傳 Blob] 對話框中,移至 mapper 專案的 bin\debug 資料夾,然後選擇mapper.exe檔案。 最後,選取 [開啟],然後選取 [確定] 以完成上傳。
針對 Azure Data Lake 儲存體,以滑鼠右鍵按下檔案清單中的空白區域,然後選取 [上傳]。 最後,選取 mapper.exe 檔案,然後選取 [ 開啟]。
mapper.exe上傳完成後,請針對reducer.exe檔案重複上傳程式。
執行作業:使用 SSH 工作階段
下列程序說明如何使用 SSH 工作階段執行 MapReduce 作業:
使用 ssh 命令來連線到您的叢集。 編輯以下命令並將 CLUSTERNAME 取代為您叢集的名稱,然後輸入命令:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
使用下列其中一個命令來啟動 MapReduce 作業:
如果預設記憶體是 Azure 儲存體:
yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \ -files wasbs:///mapper.exe,wasbs:///reducer.exe \ -mapper mapper.exe \ -reducer reducer.exe \ -input /example/data/gutenberg/davinci.txt \ -output /example/wordcountout
如果預設記憶體為 Data Lake 儲存體 Gen1:
yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \ -files adl:///mapper.exe,adl:///reducer.exe \ -mapper mapper.exe \ -reducer reducer.exe \ -input /example/data/gutenberg/davinci.txt \ -output /example/wordcountout
如果預設記憶體為 Data Lake 儲存體 Gen2:
yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \ -files abfs:///mapper.exe,abfs:///reducer.exe \ -mapper mapper.exe \ -reducer reducer.exe \ -input /example/data/gutenberg/davinci.txt \ -output /example/wordcountout
下列清單描述每個參數和選項所代表的內容:
參數 描述 hadoop-streaming.jar 指定包含串流 MapReduce 功能的 jar 檔案。 -檔 指定 此作業的mapper.exe 和 reducer.exe 檔案。 、 wasbs:///
adl:///
、 或abfs:///
通訊協定宣告之前,每個檔案都是叢集預設記憶體根目錄的路徑。-映射 指定實作對應工具的檔案。 -減速器 指定實作歸納器的檔案。 -輸入 指定輸入數據。 -輸出 指定輸出目錄。 MapReduce 作業完成後,請使用下列命令來檢視結果:
hdfs dfs -text /example/wordcountout/part-00000
下列文字是此指令所傳回資料的範例:
you 1128 young 38 younger 1 youngest 1 your 338 yours 4 yourself 34 yourselves 3 youth 17
執行作業:使用 PowerShell
使用下列 PowerShell 腳本來執行 MapReduce 作業並下載結果。
# Login to your Azure subscription
$context = Get-AzContext
if ($context -eq $null)
{
Connect-AzAccount
}
$context
# Get HDInsight info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -Message "Enter the login for the cluster"
# Path for job output
$outputPath="/example/wordcountoutput"
# Progress indicator
$activity="C# MapReduce example"
Write-Progress -Activity $activity -Status "Getting cluster information..."
#Get HDInsight info so we can get the resource group, storage, etc.
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageActArr=$clusterInfo.DefaultStorageAccount.split('.')
$storageAccountName=$storageActArr[0]
$storageType=$storageActArr[1]
# Progress indicator
#Define the MapReduce job
# Note: using "/mapper.exe" and "/reducer.exe" looks in the root
# of default storage.
$jobDef=New-AzHDInsightStreamingMapReduceJobDefinition `
-Files "/mapper.exe","/reducer.exe" `
-Mapper "mapper.exe" `
-Reducer "reducer.exe" `
-InputPath "/example/data/gutenberg/davinci.txt" `
-OutputPath $outputPath
# Start the job
Write-Progress -Activity $activity -Status "Starting MapReduce job..."
$job=Start-AzHDInsightJob `
-ClusterName $clusterName `
-JobDefinition $jobDef `
-HttpCredential $creds
#Wait for the job to complete
Write-Progress -Activity $activity -Status "Waiting for the job to complete..."
Wait-AzHDInsightJob `
-ClusterName $clusterName `
-JobId $job.JobId `
-HttpCredential $creds
Write-Progress -Activity $activity -Completed
# Download the output
if($storageType -eq 'azuredatalakestore') {
# Azure Data Lake Store
# Fie path is the root of the HDInsight storage + $outputPath
$filePath=$clusterInfo.DefaultStorageRootPath + $outputPath + "/part-00000"
Export-AzDataLakeStoreItem `
-Account $storageAccountName `
-Path $filePath `
-Destination output.txt
} else {
# Az.Storage account
# Get the container
$container=$clusterInfo.DefaultStorageContainer
#NOTE: This assumes that the storage account is in the same resource
# group as HDInsight. If it is not, change the
# --ResourceGroupName parameter to the group that contains storage.
$storageAccountKey=(Get-AzStorageAccountKey `
-Name $storageAccountName `
-ResourceGroupName $resourceGroup)[0].Value
#Create a storage context
$context = New-AzStorageContext `
-StorageAccountName $storageAccountName `
-StorageAccountKey $storageAccountKey
# Download the file
Get-AzStorageBlobContent `
-Blob 'example/wordcountoutput/part-00000' `
-Container $container `
-Destination output.txt `
-Context $context
}
此文稿會提示您輸入叢集登入帳戶名稱和密碼,以及 HDInsight 叢集名稱。 作業完成後,輸出會下載至名為 output.txt 的檔案。 下列文字是檔案中 output.txt
資料的範例:
you 1128
young 38
younger 1
youngest 1
your 338
yours 4
yourself 34
yourselves 3
youth 17
下一步
- 在 HDInsight 上的 Apache Hadoop 中使用 MapReduce。
- 搭配 Apache Hive 和 Apache Pig 使用 C# 使用者定義函式。
- 開發 Java MapReduce 程式
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應