7 Développement d`une application de MapReduce

publicité
7
Développement d’une application de MapReduce
Ecrire un programme d’Hadoop demande un processus : écrire une fonction map, une fonction reduce et
tester localement. Ecrire ensuite un programme pour lancer un job sur une bases de données de petite taille,
fixer les erreurs s’il y en a. Une fois que le programme marche parfaitement sur les données ”petites”, on
peut le mettre sur un cluster. Debugger un programme raté est un chalenge. Ensuite, quand le programme
marche sur les données complètes, on fait des tunings.
7.1
7.1.1
API de configuration
Configuration d’une ressource
Les composantes d’Hadoop sont configurées dans son propre API de configuration. Une instance de la
classe Configuration représente une collection des propriétés de configuration et leurs valeurs. Chaque
propriété est nommée par un String, et le type peut être boolean, int, long, float, String, Class,
java.io.File ou d’autres collections de String.
Voici un example
<?xml version="1.0"?>
<configuration>
<property>
<name>color</name>
<value>yellow</value>
<description>Color</description>
</property>
<property>
<name>size</name>
<value>10</value>
<description>Size</description>
</property>
<property>
<name>weight</name>
<value>heavy</value>
<final>true</final>
<description>Weight</description>
</property>
<property>
<name>size-weight</name>
<value>${size},${weight}</value>
<description>Size and weight</description>
</property>
</configuration>
Il est possible d’intervenir dans le fichier configuration-1.xml ci-dessus par une pièce de code Java.
Configuration conf = new Configuration();
conf.addResource("configuration-1.xml");
assertThat(conf.get("color"), is("yellow"));
assertThat(conf.getInt("size", 0), is(10));
assertThat(conf.get("breadth", "wide"), is("wide"));
Remarquons que le type d’une propriété n’est pas stocké dans le fichier xml.
21
7.1.2
Combinaison de ressources
Si l’on a un deuxième fichier de configuration configuration-2.xml, on pourra combiner en codant :
Configuration conf = new Configuration();
conf.addResource("configuration-1.xml");
conf.addResource("configuration-2.xml");
avec
<?xml version="1.0"?>
<configuration>
<property>
<name>size</name>
<value>12</value>
</property>
<property>
<name>weight</name>
<value>light</value>
</property>
</configuration>
Règle : Les propriétés sont définies comme dans la ressource qui sont ajoutée en dernier, sauf celles qui
sont marquées final.
7.1.3
Expansion de variables
Les propriétés peuvent être définies en terme des propriétés de système. Par exemple, la propriété
size-weight dans le premier fichier de configuration est définie comme
$s{size}, $s{weight}
puis ces propriétés sont épandu en utilisant la valeur dans la configuration 2.
Les propriétés de système prend la priorité sur celles définies dans les fichiers de ressources.
System.setProperty("size", "14");
assertThat(conf.get("size-weight"), is("14,heavy"));
Notons que même si les propriétés de configuration peuvent être définies par la propriété de système, ils
sont inaccessibles à la di↵érence des propriétés redéfinies en utilisant celles de configuration.
7.2
Configuration de l’environnement de développement
Dans ce cours, on utilise Eclipse comme l’IDE de développement. On crée un nouveau projet de Java
avec Eclipse et ajoute tous les fichiers JAR du niveau top de la distribution et du dossier lib au classpath.
On pourra ensuite compiler Java Hadoop et le fonctionner en mode locale.
7.2.1
Gestion de Configuration
Il est nécessaire au cours de développement de basculer entre le mode local et en cluster. Une manière
d’accommoder ces variations est d’avoir des fichiers de configuration d’Hadoop. On peut utiliser trois fichiers
correspondant aux modes divers. Les noms de fichiers ne sont pas spéciaux, on peut les remplacer par les
autres noms à notre choix.
— Le fichier hadoop-local.xml : Configuration par défaut, utilisant le mode local.
22
<?xml version="1.0"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>file:///</value>
</property>
<property>
<name>mapred.job.tracker</name>
<value>local</value>
</property>
</configuration>
— Le fichier hadoop-localhost.xml : Les configurations pointent à un namenode et un jobtracker
exécutant en localhost.
<?xml version="1.0"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost/</value>
</property>
<property>
<name>mapred.job.tracker</name>
<value>localhost:8021</value>
</property>
</configuration>
— Le fichier hadoop-cluster.xml contient les détails sur les namenode de clusters et les adresses de
jobtracker.
<?xml version="1.0"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://namenode/</value>
</property>
<property>
<name>mapred.job.tracker</name>
<value>jobtracker:8021</value>
</property>
</configuration>
On pourra lancer en choisissant un mode, par exemple :
% hadoop fs -conf hadoop-localhost.xml -ls
L’écran affiche quelque chose de similaire à :
Found 12 items
-rw-r--r-1
-rw-r--r-1
-rw-r--r-1
drwxr-xr-x
-rw-r--r-1
-rw-r--r-1
drwxr-xr-x
drwxr-xr-x
-
riduan91
riduan91
riduan91
riduan91
riduan91
riduan91
riduan91
riduan91
staff
staff
staff
staff
staff
staff
staff
staff
6148
958
366
340
255
272
136
136
2015-01-21
2015-01-16
2015-01-16
2015-01-21
2015-01-21
2015-01-21
2015-01-21
2015-01-19
23
14:44
10:37
07:23
22:14
18:14
18:15
14:58
10:59
.DS_Store
.classpath
.project
bin
hadoop-local.xml
hadoop-localhost.xml
output
output1
drwxr-xr-x
-rw-r--r-drwxr-xr-x
-rw-r--r-7.2.2
1
1
riduan91
riduan91
riduan91
riduan91
staff
staff
staff
staff
238
95
204
39
2015-01-21
2015-01-16
2015-01-21
2015-01-21
22:14
08:33
14:44
15:05
src
weather.txt
weather_result
weather_result.txt
GenericOptionsParser, Tool et ToolRunner
GenericOptionsParser est une classe qui interprète les options command-line commune d’Hadoop, puis
les installe sur un objet de type Configuration. Il est convénient d’implémenter l’interface Tool et exécuter
l’application avec ToolRunner. On remarque que l’interface Tool est de structure suivante :
public interface Tool extends Configurable {
int run(String [] args) throws Exception;
}
Voici un exemple qui affiche les propriétés d’une Configuration.
import java.util.Map.Entry;
import
import
import
import
org.apache.hadoop.conf.Configuration;
org.apache.hadoop.conf.Configured;
org.apache.hadoop.util.Tool;
org.apache.hadoop.util.ToolRunner;
public class ConfigurationPrinter extends Configured implements Tool {
static {
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
for (Entry<String, String> entry: conf) {
System.out.printf("%s=%s\n", entry.getKey(), entry.getValue());
}
return 0;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new ConfigurationPrinter(), args);
System.exit(exitCode);
}
}
Le bloc statique garantit que les configurations de HFDS et MapReduce sont ajoutés dans Configuration
en outre que le tronc (core).
Il est possible de trouver la valeur d’une propriété en appelant :
% export HADOOP_CLASSPATH=bin
% hadoop ConfigurationPrinter -conf hadoop-localhost.xml | grep mapred.job.tracker=
24
qui affichera
mapred.job.tracker=localhost:8021
7.3
7.3.1
Ecrire des tests local par MRUnit
Préparation
Il est essentiel de suivre les étapes suivants pour lancer MRUnit en Eclipse si l’on utilise Hadoop 2.6.0.
1. Télécharger la version recommandée de MRUnit d’Apache. Pour Hadoop 2.6.0, on utilise
mrunit-1.1.0-hadoop2.jar et le met dans Buildpath d’Eclipse. (https://repository.apache.
org/content/repositories/releases/org/apache/mrunit/mrunit/)
2. Chercher la version correspondante pour Mosquito, i.e, 1.9.5 pour Hadoop 2.6.0 (https://code.
google.com/p/mockito/downloads/list ), le met dans Buildpath d’Eclipse.
3. Créer un projet Maven dans Eclipse et modifier le fichier pom.xml en ajoutant :
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<version>1.1.0</version>
<classifier>hadoop2</classifier>
</dependency>
4. Vérifier que la partie ”Dependencies” a forme :
Figure 6 – Dependences de MRUnit pour Hadoop2
5. Mettre les fichiers jar nécessaires en Buildpath (Figure 7). On peut les trouver dans
hadoop-2.6.0/source/hadoop/.
7.3.2
Map Test
La classe MaxTemperatureMapper
import java.io.IOException;
25
Figure 7 – Buildpath pour le projet test
import
import
import
import
org.apache.hadoop.io.IntWritable;
org.apache.hadoop.io.LongWritable;
org.apache.hadoop.io.Text;
org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(0, 4);
String temp = "";
if (line.charAt(4)==’+’)
temp = line.substring(5, 9);
else
temp = line.substring(4, 9);
if (!missing(temp)) {
int airTemperature = Integer.parseInt(temp);
context.write(new Text(year), new IntWritable(airTemperature));
}
}
private boolean missing(String temp) {
return temp.equals("+9999");
}
}
La classe test MaxTemperatureMapperTest
import
import
import
import
import
import
java.io.IOException;
org.apache.hadoop.io.Text;
org.apache.hadoop.io.IntWritable;
org.apache.hadoop.io.LongWritable;
org.apache.hadoop.mrunit.mapreduce.MapDriver;
org.junit.*;
26
public class MaxTemperatureMapperTest {
@Test
public void processesValidRecord() throws IOException, InterruptedException {
Text value = new Text("1951+12349_");
new MapDriver<LongWritable, Text, Text, IntWritable>()
.withMapper(new MaxTemperatureMapper())
.withInput(new LongWritable(),value)
.withOutput(new Text("1951"), new IntWritable(1234))
.runTest();
}
}
7.3.3
Reduce Test
La classe MaxTemperatureReducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
La classe test MaxTemperatureReducerTest
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
public class MaxTemperatureReducerTest {
public void returnsMaximumIntegerInValues() throws IOException,
InterruptedException {
new ReduceDriver<Text, IntWritable, Text, IntWritable>()
.withReducer(new MaxTemperatureReducer())
27
.withInput(new Text("1950"), Arrays.asList(new IntWritable(10), new IntWritable(5)))
.withOutput(new Text("1950"), new IntWritable(10))
.runTest();
}
}
7.4
Exécution locale sur les données test
7.4.1
Installation de Maven
1. Télécharger un fichier tar.gz sur le site http://maven.apache.org/download.cgi, l’extraire dans
un dossier, par exemple /user/apache-maven-3.2.5
2. Ecrire dans le fichier .bash profile :
export M2_HOME=/user/apache-maven-3.2.5
export M2=$M2_HOME/bin
export PATH=$M2:$PATH
3. Vérifier avec la commande :
% mvn --version
L’écran devrait afficher quelque chose de similaire à :
Apache Maven 3.2.5 (12a6b3acb947671f09b81f49094c53f426d8cea1; 2014-12-14T18:29:23+01:00)
Maven home: /user/apache-maven-3.2.5
Java version: 1.6.0_65, vendor: Apple Inc.
Java home: /System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home
Default locale: en_US, platform encoding: MacRoman
OS name: "mac os x", version: "10.9.5", arch: "x86_64", family: "mac"
7.4.2
Lancer un Job dans Local Job Runner
L’application pour MaxTemperature
import
import
import
import
import
import
import
import
import
org.apache.hadoop.conf.Configured;
org.apache.hadoop.fs.Path;
org.apache.hadoop.io.IntWritable;
org.apache.hadoop.io.Text;
org.apache.hadoop.mapreduce.Job;
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
org.apache.hadoop.util.Tool;
org.apache.hadoop.util.ToolRunner;
public class MaxTemperatureDriver extends Configured implements Tool {
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.printf("Usage: %s [generic options] <input> <output>\n",
getClass().getSimpleName());
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
Job job = Job.getInstance();
28
job.setJarByClass(getClass());
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args); System.exit(exitCode);
}
}
Pour lancer le programme, on peut utliser MVN :
% mvn compile
% export HADOOP_CLASSPATH=target/classes/
% hadoop MaxTemperatureDriver -conf conf/hadoop-local.xml weather.txt output
ou bien en Hadoop
% hadoop v2.MaxTemperatureDriver -fs file:/// -jt local input/ncdc/micro output
29
Téléchargement