Usare C# con lo streaming di MapReduce su Apache Hadoop in HDInsight
Informazioni su come usare C# per creare una soluzione di MapReduce su HDInsight.
Apache Hadoop Streaming consente di eseguire processi MapReduce tramite uno script o un eseguibile. Qui, .NET è usato per implementare il mapper e il reducer per una soluzione di conteggio parole.
.NET su HDInsight
I cluster HDInsight usano Mono (https://mono-project.com)per eseguire applicazioni .NET. La versione Mono 4.2.1 è inclusa nella versione 3.6 di HDInsight. Per altre informazioni sulla versione Mono compresa in HDInsight, vedere Componenti di Hadoop disponibili nelle versioni di HDInsight.
Per altre informazioni sulla compatibilità Mono con le versioni di .NET Framework, vedere il documento relativo alla compatibilità Mono.
Come funziona lo streaming di Hadoop
Il processo di base usato per il flusso in questo documento è il seguente:
- Hadoop passa i dati al mapper (mapper.exe in questo esempio) su STDIN.
- Il mapper elabora i dati ed emette una coppia chiave/valore delimitata da tabulazione su STDOUT.
- L'output viene letto da Hadoop e quindi passato al riduttore (reducer.exe in questo esempio) su STDIN.
- Il riduttore legge le coppie chiave/valore delimitate da tabulazioni, elabora i dati e quindi genera il risultato come coppie chiave/valore delimitate da tabulazione su STDOUT.
- L'output viene letto da Hadoop e scritto nella directory di output.
Per altre informazioni sullo streaming, vedere Hadoop Streaming.
Prerequisiti
Visual Studio.
Una familiarità nello scrivere e nel compilare il codice C# destinato a .NET Framework 4.5.
Un modo per caricare i file .exe sul cluster. La procedura in questo documento usa gli strumenti Data Lake per Visual Studio per caricare i file nell'archiviazione primaria per il cluster.
Se si usa PowerShell, è necessario il modulo Az.
Un cluster Apache Hadoop in HDInsight. Vedere Guida introduttiva: Introduzione ad Apache Hadoop e Apache Hive in Azure HDInsight usando il modello di Resource Manager.
Lo schema URI per l'archiviazione primaria dei cluster. Questo schema corrisponde a
wasb://
per Archiviazione di Azure, aabfs://
per Azure Data Lake Storage Gen2 e aadl://
per Azure Data Lake Storage Gen1. Se il trasferimento sicuro è abilitato per Archiviazione di Azure o Data Lake Storage Gen2, l'URI sarà rispettivamentewasbs://
oabfss://
.
Creare il mapper
In Visual Studio creare una nuova applicazione console .NET Framework denominata mapper. Usare il codice seguente per l'applicazione:
using System;
using System.Text.RegularExpressions;
namespace mapper
{
class Program
{
static void Main(string[] args)
{
string line;
//Hadoop passes data to the mapper on STDIN
while((line = Console.ReadLine()) != null)
{
// We only want words, so strip out punctuation, numbers, etc.
var onlyText = Regex.Replace(line, @"\.|;|:|,|[0-9]|'", "");
// Split at whitespace.
var words = Regex.Matches(onlyText, @"[\w]+");
// Loop over the words
foreach(var word in words)
{
//Emit tab-delimited key/value pairs.
//In this case, a word and a count of 1.
Console.WriteLine("{0}\t1",word);
}
}
}
}
}
Dopo aver creato l'applicazione, compilarla per produrre il file /bin/Debug/mapper.exe
nella directory del progetto.
Creare il reducer
In Visual Studio, creare una nuova applicazione console .NET Framework denominata reducer. Usare il codice seguente per l'applicazione:
using System;
using System.Collections.Generic;
namespace reducer
{
class Program
{
static void Main(string[] args)
{
//Dictionary for holding a count of words
Dictionary<string, int> words = new Dictionary<string, int>();
string line;
//Read from STDIN
while ((line = Console.ReadLine()) != null)
{
// Data from Hadoop is tab-delimited key/value pairs
var sArr = line.Split('\t');
// Get the word
string word = sArr[0];
// Get the count
int count = Convert.ToInt32(sArr[1]);
//Do we already have a count for the word?
if(words.ContainsKey(word))
{
//If so, increment the count
words[word] += count;
} else
{
//Add the key to the collection
words.Add(word, count);
}
}
//Finally, emit each word and count
foreach (var word in words)
{
//Emit tab-delimited key/value pairs.
//In this case, a word and a count of 1.
Console.WriteLine("{0}\t{1}", word.Key, word.Value);
}
}
}
}
Dopo aver creato l'applicazione, compilarla per produrre il file /bin/Debug/reducer.exe
nella directory del progetto.
Caricare nella risorsa di archiviazione
Successivamente, è necessario caricare le applicazioni mapper e reducer nell'archiviazione di HDInsight.
In Visual Studio, selezionare Visualizza>Esplora server.
Fare clic con il pulsante destro del mouse su Azure, selezionare Connessione alla sottoscrizione di Microsoft Azure e quindi completare il processo di accesso.
Espandere il cluster HDInsight in cui si desidera distribuire l'applicazione. Viene elencata una voce con il testo (Account di archiviazione predefinito).
Se non è possibile espandere la voce (Default Storage Account), si usa un account di archiviazione di Azure come archivio predefinito per il cluster. Per visualizzare i file nel percorso di archiviazione predefinito per il cluster, espandere la voce e quindi fare doppio clic su (Contenitore predefinito).
Se non è possibile espandere la voce (Default Storage Account), si usa Azure Data Lake Storage come archivio predefinito per il cluster. Per visualizzare i file nel percorso di archiviazione predefinito per il cluster, fare doppio clic sulla voce (Account di archiviazione predefinito).
Per caricare i file con estensione .exe, usare uno dei metodi seguenti:
Se si utilizza un account di archiviazione di Azure, selezionare l'icona Carica BLOB.
Nella finestra di dialogo Carica nuovo file, in Nome file, selezionare Sfoglia. Nella finestra di dialogo Carica BLOB, passare alla cartella bin\debug del progetto mapper e quindi scegliere il file mapper.exe. Infine, selezionare Apri e quindi OK per completare il caricamento.
Per Azure Data Lake Storage, fare clic con il pulsante destro del mouse su un'area vuota nell'elenco di file e quindi scegliere Carica. Selezionare infine il file mapper.exe e quindi selezionare Apri.
Una volta terminato il caricamento mapper.exe, ripetere il processo di caricamento per il file reducer.exe.
Eseguire un processo: uso di una sessione SSH
La procedura seguente descrive come eseguire un processo MapReduce usando una sessione SSH:
Usare il comando ssh per connettersi al cluster. Modificare il comando seguente sostituendo CLUSTERNAME con il nome del cluster in uso e quindi immettere il comando:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Usare uno dei comandi seguenti per avviare il processo MapReduce:
Se la risorsa di archiviazione predefinita è Archiviazione di Azure:
yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \ -files wasbs:///mapper.exe,wasbs:///reducer.exe \ -mapper mapper.exe \ -reducer reducer.exe \ -input /example/data/gutenberg/davinci.txt \ -output /example/wordcountout
Se la risorsa di archiviazione predefinita è Data Lake Storage Gen1:
yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \ -files adl:///mapper.exe,adl:///reducer.exe \ -mapper mapper.exe \ -reducer reducer.exe \ -input /example/data/gutenberg/davinci.txt \ -output /example/wordcountout
Se la risorsa di archiviazione predefinita è Data Lake Storage Gen2:
yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \ -files abfs:///mapper.exe,abfs:///reducer.exe \ -mapper mapper.exe \ -reducer reducer.exe \ -input /example/data/gutenberg/davinci.txt \ -output /example/wordcountout
L'elenco seguente descrive il significato di ciascun parametro e opzione:
Parametro Descrizione hadoop-streaming.jar Specifica il file con estensione jar che contiene la funzionalità di streaming MapReduce. -files Specifica i file mapper.exe e reducer.exe per questo processo. La dichiarazione di protocollo wasbs:///
,adl:///
oabfs:///
prima di ogni file rappresenta il percorso della radice di archiviazione predefinita per il cluster.-mapper Specifica il file che implementa il mapper. -reducer Specifica il file che implementa il reducer. -input Specifica i dati di input. -output Specifica la directory di output. Dopo il completamento del processo di MapReduce, usare il comando seguente per visualizzare i risultati:
hdfs dfs -text /example/wordcountout/part-00000
L'elenco seguente è un esempio dei dati restituiti da questo comando:
you 1128 young 38 younger 1 youngest 1 your 338 yours 4 yourself 34 yourselves 3 youth 17
Esecuzione di un processo: Uso di PowerShell
Usare il seguente script di PowerShell per eseguire un processo MapReduce e scaricare i risultati.
# Login to your Azure subscription
$context = Get-AzContext
if ($context -eq $null)
{
Connect-AzAccount
}
$context
# Get HDInsight info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -Message "Enter the login for the cluster"
# Path for job output
$outputPath="/example/wordcountoutput"
# Progress indicator
$activity="C# MapReduce example"
Write-Progress -Activity $activity -Status "Getting cluster information..."
#Get HDInsight info so we can get the resource group, storage, etc.
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageActArr=$clusterInfo.DefaultStorageAccount.split('.')
$storageAccountName=$storageActArr[0]
$storageType=$storageActArr[1]
# Progress indicator
#Define the MapReduce job
# Note: using "/mapper.exe" and "/reducer.exe" looks in the root
# of default storage.
$jobDef=New-AzHDInsightStreamingMapReduceJobDefinition `
-Files "/mapper.exe","/reducer.exe" `
-Mapper "mapper.exe" `
-Reducer "reducer.exe" `
-InputPath "/example/data/gutenberg/davinci.txt" `
-OutputPath $outputPath
# Start the job
Write-Progress -Activity $activity -Status "Starting MapReduce job..."
$job=Start-AzHDInsightJob `
-ClusterName $clusterName `
-JobDefinition $jobDef `
-HttpCredential $creds
#Wait for the job to complete
Write-Progress -Activity $activity -Status "Waiting for the job to complete..."
Wait-AzHDInsightJob `
-ClusterName $clusterName `
-JobId $job.JobId `
-HttpCredential $creds
Write-Progress -Activity $activity -Completed
# Download the output
if($storageType -eq 'azuredatalakestore') {
# Azure Data Lake Store
# Fie path is the root of the HDInsight storage + $outputPath
$filePath=$clusterInfo.DefaultStorageRootPath + $outputPath + "/part-00000"
Export-AzDataLakeStoreItem `
-Account $storageAccountName `
-Path $filePath `
-Destination output.txt
} else {
# Az.Storage account
# Get the container
$container=$clusterInfo.DefaultStorageContainer
#NOTE: This assumes that the storage account is in the same resource
# group as HDInsight. If it is not, change the
# --ResourceGroupName parameter to the group that contains storage.
$storageAccountKey=(Get-AzStorageAccountKey `
-Name $storageAccountName `
-ResourceGroupName $resourceGroup)[0].Value
#Create a storage context
$context = New-AzStorageContext `
-StorageAccountName $storageAccountName `
-StorageAccountKey $storageAccountKey
# Download the file
Get-AzStorageBlobContent `
-Blob 'example/wordcountoutput/part-00000' `
-Container $container `
-Destination output.txt `
-Context $context
}
Questo script richiede l'account di accesso del cluster e la password, insieme al nome del cluster HDInsight. Al termine del processo, l'output viene scaricato in un file denominato output.txt. Il testo seguente è un esempio dei dati nel file output.txt
:
you 1128
young 38
younger 1
youngest 1
your 338
yours 4
yourself 34
yourselves 3
youth 17