Hadoop & Exemples Mapreduce: création du premier programme en Java

Table des matières:

Anonim

Dans ce didacticiel, vous apprendrez à utiliser Hadoop avec des exemples MapReduce. Les données d'entrée utilisées sont SalesJan2009.csv. Il contient des informations relatives aux ventes telles que le nom du produit, le prix, le mode de paiement, la ville, le pays du client, etc. L'objectif est de connaître le nombre de produits vendus dans chaque pays.

Dans ce didacticiel, vous apprendrez-

  • Premier programme Hadoop MapReduce
  • Explication de la classe SalesMapper
  • Explication de la classe SalesCountryReducer
  • Explication de la classe SalesCountryDriver

Premier programme Hadoop MapReduce

Maintenant, dans ce tutoriel MapReduce, nous allons créer notre premier programme Java MapReduce:

Données de venteJan2009

Assurez-vous que Hadoop est installé. Avant de commencer le processus réel, changez l'utilisateur en 'hduser' (id utilisé lors de la configuration Hadoop, vous pouvez basculer vers l'ID utilisateur utilisé lors de votre configuration de programmation Hadoop).

su - hduser_

Étape 1)

Créez un nouveau répertoire avec le nom MapReduceTutorial comme shwon dans l'exemple MapReduce ci-dessous

sudo mkdir MapReduceTutorial

Donner des autorisations

sudo chmod -R 777 MapReduceTutorial

SalesMapper.java

package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper  {private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector  output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}

SalesCountryReducer.java

package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer {public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}

SalesCountryDriver.java

package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}

Téléchargez les fichiers ici

Vérifiez les autorisations de fichier de tous ces fichiers

et si les autorisations de `` lecture '' sont manquantes, accordez-en la même chose.

Étape 2)

Exportez le chemin de classe comme indiqué dans l'exemple Hadoop ci-dessous

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Étape 3)

Compilez les fichiers Java (ces fichiers sont présents dans le répertoire Final-MapReduceHandsOn ). Ses fichiers de classe seront placés dans le répertoire du package

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Cet avertissement peut être ignoré en toute sécurité.

Cette compilation créera un répertoire dans un répertoire courant nommé avec le nom du package spécifié dans le fichier source java (c'est-à-dire SalesCountry dans notre cas) et y mettra tous les fichiers de classe compilés.

Étape 4)

Créer un nouveau fichier Manifest.txt

sudo gedit Manifest.txt

ajoutez-y les lignes suivantes,

Main-Class: SalesCountry.SalesCountryDriver

SalesCountry.SalesCountryDriver est le nom de la classe principale. Veuillez noter que vous devez appuyer sur la touche Entrée à la fin de cette ligne.

Étape 5)

Créer un fichier Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Vérifiez que le fichier jar est créé

Étape 6)

Démarrez Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Étape 7)

Copiez le fichier SalesJan2009.csv dans ~ / inputMapReduce

Maintenant, utilisez la commande ci-dessous pour copier ~ / inputMapReduce vers HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Nous pouvons ignorer cet avertissement en toute sécurité.

Vérifiez si un fichier est effectivement copié ou non.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Étape 8)

Exécuter la tâche MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Cela créera un répertoire de sortie nommé mapreduce_output_sales sur HDFS. Le contenu de ce répertoire sera un fichier contenant les ventes de produits par pays.

Étape 9)

Le résultat peut être vu via l'interface de commande comme suit:

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Les résultats peuvent également être consultés via une interface Web comme-

Ouvrez r dans un navigateur Web.

Sélectionnez maintenant 'Parcourir le système de fichiers' et accédez à / mapreduce_output_sales

Pièce ouverte -r-00000

Explication de la classe SalesMapper

Dans cette section, nous allons comprendre l'implémentation de la classe SalesMapper .

1. Nous commençons par spécifier un nom de package pour notre classe. SalesCountry est un nom de notre package. Veuillez noter que la sortie de la compilation, SalesMapper.class ira dans un répertoire nommé par ce nom de package: SalesCountry .

Ensuite, nous importons des packages de bibliothèques.

Ci-dessous, un instantané montre une implémentation de la classe SalesMapper-

Exemple d'explication de code:

1. Définition de la classe SalesMapper-

classe publique SalesMapper étend MapReduceBase implémente Mapper {

Chaque classe de mappeur doit être étendue à partir de la classe MapReduceBase et doit implémenter l' interface Mapper .

2. Définition de la fonction 'map' -

public void map(LongWritable key,Text value,OutputCollector output,Reporter reporter) throws IOException

La partie principale de la classe Mapper est une méthode 'map ()' qui accepte quatre arguments.

A chaque appel à la méthode 'map ()' , une paire clé-valeur ( 'key' et 'value' dans ce code) est passée.

La méthode 'map ()' commence par fractionner le texte d'entrée qui est reçu comme argument. Il utilise le tokenizer pour diviser ces lignes en mots.

String valueString = value.toString();String[] SingleCountryData = valueString.split(",");

Ici, ',' est utilisé comme délimiteur.

Après cela, une paire est formée en utilisant un enregistrement au 7ème index du tableau 'SingleCountryData' et une valeur '1' .

output.collect (nouveau texte (SingleCountryData [7]), un);

Nous choisissons l'enregistrement au 7ème index parce que nous avons besoin de données de pays et il est situé au 7ème index dans le tableau 'SingleCountryData' .

Veuillez noter que nos données d'entrée sont au format ci-dessous (où le pays est au 7ème index, avec 0 comme indice de départ) -

Transaction_date, Product, Price, Payment_Type, Name, City, State, Country , Account_Created, Last_Login, Latitude, Longitude

Une sortie de mapper est à nouveau une paire clé-valeur qui est sortie en utilisant la méthode 'collect ()' de 'OutputCollector' .

Explication de la classe SalesCountryReducer

Dans cette section, nous allons comprendre l'implémentation de la classe SalesCountryReducer .

1. Nous commençons par spécifier un nom du package pour notre classe. SalesCountry est un nom de package out. Veuillez noter que la sortie de la compilation, SalesCountryReducer.class ira dans un répertoire nommé par ce nom de package: SalesCountry .

Ensuite, nous importons des packages de bibliothèques.

Ci-dessous, l'instantané montre une implémentation de la classe SalesCountryReducer-

Explication du code:

1. Définition de la classe SalesCountryReducer-

Classe publique SalesCountryReducer étend MapReduceBase implémente Reducer {

Ici, les deux premiers types de données, «Texte» et «IntWritable» sont le type de données de la valeur-clé d'entrée du réducteur.

La sortie du mappeur se présente sous la forme , . Cette sortie du mappeur devient l'entrée du réducteur. Ainsi, pour s'aligner sur son type de données, Text et IntWritable sont utilisés comme type de données ici.

Les deux derniers types de données, «Texte» et «IntWritable» sont le type de données de sortie généré par le réducteur sous la forme d'une paire clé-valeur.

Chaque classe de réducteur doit être étendue à partir de la classe MapReduceBase et doit implémenter l' interface Reducer .

2. Définition de la fonction «réduire»

public void reduce( Text t_key,Iterator values,OutputCollector output,Reporter reporter) throws IOException {

Une entrée de la méthode reduction () est une clé avec une liste de plusieurs valeurs.

Par exemple, dans notre cas, ce sera-

, , , , , .

Ceci est donné au réducteur comme

Ainsi, pour accepter des arguments de cette forme, les deux premiers types de données sont utilisés, à savoir, Text et Iterator . Le texte est un type de données de clé et Iterator est un type de données pour la liste de valeurs pour cette clé.

L'argument suivant est de type OutputCollector qui collecte la sortie de la phase du réducteur.

La méthode reduction () commence par copier la valeur de la clé et initialiser le nombre de fréquences à 0.

Touche de texte = touche_t; int frequencyForCountry = 0;

Ensuite, en utilisant la boucle « while » , nous parcourons la liste des valeurs associées à la clé et calculons la fréquence finale en additionnant toutes les valeurs.

 while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}

Maintenant, nous poussons le résultat vers le collecteur de sortie sous la forme de clé et de comptage de fréquence obtenu .

Le code ci-dessous fait cela-

output.collect(key, new IntWritable(frequencyForCountry));

Explication de la classe SalesCountryDriver

Dans cette section, nous allons comprendre l'implémentation de la classe SalesCountryDriver

1. Nous commençons par spécifier un nom de package pour notre classe. SalesCountry est un nom de package out. Veuillez noter que la sortie de la compilation, SalesCountryDriver.class ira dans le répertoire nommé par ce nom de package: SalesCountry .

Voici une ligne spécifiant le nom du package suivi du code pour importer les packages de bibliothèque.

2. Définissez une classe de pilote qui créera un nouveau travail client, un nouvel objet de configuration et annoncera les classes Mapper et Reducer.

La classe de pilote est responsable de la définition de notre tâche MapReduce pour qu'elle s'exécute dans Hadoop. Dans cette classe, nous spécifions le nom du travail, le type de données d'entrée / sortie et les noms des classes de mappeur et de réducteur .

3. Dans l'extrait de code ci-dessous, nous définissons les répertoires d'entrée et de sortie qui sont utilisés pour consommer le jeu de données d'entrée et produire la sortie, respectivement.

arg [0] et arg [1] sont les arguments de ligne de commande passés avec une commande donnée dans MapReduce pratique, c'est-à-dire,

$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales

4. Déclenchez notre travail

Sous le code, lancez l'exécution de la tâche MapReduce-

try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}