Vous avez dit Hadoop ? – 1ère partie


image

Les Big Data constituent à l’évidence un sujet de prédilection ces derniers temps que ce soit sur les différents blogs de la toile ou dans le cadre des conférences du monde de l’IT. Et lorsque l’on parle de Big Data, un Framework revient en particulier inévitablement sur le devant de la scène : Hadoop (et ce, même si ce dernier ne couvre pas forcément tous les domaines caractérisés par le principe des 3V (Cf. Gartner Says Solving ‘Big Data’ Challenge Involves More Than Just Managing Volumes of Data), comme la Vitesse notamment)

Pourquoi ce « buzz » ? On peut se dire que les Big Data existent déjà depuis longtemps, sous d’autres appellations, et que ce n’est autre qu’une dénomination marketing. Néanmoins, si on analyse la situation d’un peu plus près, on se rend vite compte que l’apparition d’un Framework a changé la donne.

Dans ce billet, nous allons nous intéresser à l’écosystème d’Hadoop et ce qu’il propose comme outils. L’objectif n’est pas de faire à nouveau un survol des possibilités de la solution mais plutôt de rentrer dans les « entrailles de la bête ». Malheureusement, il existera toujours des aspects manquants, mais nous allons essayer de traiter les principaux.

Les billets suivants s’intéresseront quant à eux à l’implémentation de solutions avec HDInsight : Windows Azure HDInsight et HDInsight Server.

En guise d’introduction

Hadoop est un Framework pour le développement d’applications massivement parallèles écrit en Java. Concrètement ? Cela veut dire une abstraction des mécanismes complexes liés à la programmation parallèle.

Ils sont :

  • Souvent difficiles à appréhender (ou bien à adapter).
  • Fortement dépendants du système sur lequel l’application s’exécute (ex : architecture à mémoire partagée ou à mémoire distribuée).

Pour parler de programmation parallèle, nous allons d’abord parler de paradigme.

Qu’est-ce qu’un paradigme ?

D’après notre « ami » Wikipédia, c’est une manière de voir les choses ou plus spécifiquement une conception d’un problème en génie logiciel. On peut prendre comme exemples connus : le paradigme procédural, la programmation orientée objet (POO), le paradigme fonctionnel, etc.

Bref, dans la pratique, il existe deux paradigmes lorsqu’on parle de programmation sur systèmes massivement parallèles et ceux-ci découlent de l’architecture sur laquelle on exécute notre solution :

  1. Le paradigme à mémoire partagée : Celui-ci découle de l’architecture des ordinateurs récents composés de processeurs multi-cœurs. Ces cœurs accèdent à une seule et même mémoire (la RAM) par l’intermédiaire d’un bus. On dit que la mémoire est « partagée ». On fait souvent le raccourci en parlant de programmation SMP (pour symmetric shared memory multiprocessor). Ce paradigme ne nous intéressera pas dans ce billet dans la mesure où il est utilisé pour de faire la programmation parallèle sur simple ordinateur. Il est présenté ici pour comprendre l’évolution de la programmation parallèle.
  2. Le paradigme à mémoire distribuée : Quant à lui, son utilisation est pertinente lorsque l’on programme sur des « clusters ». Un cluster est tout simplement un réseau de serveurs/ordinateurs (on appelle ça des nœuds) sur lesquels s’exécute une application. Cela permet de procéder à de gros traitements en parallèle sans posséder de super ordinateur ; une approche certainement moins onéreuse 😉 Chaque nœud possède sa propre mémoire, il est donc nécessaire de s’envoyer par le réseau les bouts de mémoire nécessaires à un autre traitement. On procède par envoi de message (« Message Passing »).

A la simple description précédente, vous l’aurez compris, ce dernier paradigme pourrait nous intéresser pour les Big Data. Cependant, comme vous le verrez tout de suite après, Hadoop se fonde dans la pratique sur un autre paradigme, appelé Map/Reduce découlant des limitations de ces deux premiers paradigmes, d’où l’intérêt de les introduire très brièvement.

Un rapide historique

Ces architectures présentées précédemment existent déjà depuis fort longtemps (début les années 60 !), alors pourquoi les architectures Big Data sont apparues aussi tard ?

Et bien, tout d’abord parce que, jusqu’à l’avènement d’Internet et surtout du Web 2.0 (encore un « buzzword »…), il n’y avait pas tant de données à traiter que ça. Donc le « Message Passing » fonctionnait. Mais à partir du moment où chaque utilisateur a commencé à produire du contenu (on appelle ça le Web 2.0), la courbe d’évolution du nombre de données s’est transformée en courbe exponentielle. En ce 21ième siècle, à l’heure du tout numérique, désormais toute information a vocation à être « donnée » et toute donnée représente potentiellement un facteur clé pour la réussite d’une organisation, qu’il s’agisse d’un organisme de recherche, d’un acteur du Secteur Public ou d’une entreprise. Je vous renvoie à notre billet d’introduction pour ce blog.

Le problème pour le traitement du déluge de données aussi omniprésentes est, qu’avec les envois de messages, le réseau se trouve très vite saturé. On perd très vite l’avantage de traiter les données en parallèle.

Jeffrey Dean et Sanjay Ghemahat, deux ingénieurs de chez Google, se sont dès lors investis à trouver une solution au problème. Ils sont partis pour cela d’un principe simple et ingénieux : Plutôt que de déplacer les données par le réseau, déplaçons le code. Entre passer par le réseau 500 Go de données ou 40 Mo de code compilé, un choix semble s’imposer de lui-même !

Leur publication Map Reduce : Simplified Data Processing on Large Cluster, résultat de ces travaux, décrit les principes d’un nouveau paradigme appelé Map/Reduce en référence aux fonctions map et reduce, parties intégrantes des langages fonctionnels comme Lisp. Son objectif consiste également à proposer une interface simplifiée à la création d’applications massivement parallèles.

C’est à ce moment-là qu’arrive Hadoop. Doug Cutting et Michael J. Cafarella, les deux créateurs, travaillaient précédemment sur un moteur de recherche Open Source du nom de Nutch. Un sous projet de Nutch visant à en faire un moteur d’indexation distribué les a amené à développer pour le fonctionnement interne de leur moteur un système de fichier distribué appelé NDFS (Nutch Distribued File System).

Avec la publication de Map/Reduce facilitant le traitement de larges ensembles de données, ils créeront Hadoop, un Framework composé d’un système de fichier distribué issu de NDFS sur lequel on exécute des traitements parallèles appliquant Map/Reduce.

Voilà l’historique et le cheminement associé. Maintenant concrètement Hadoop qu’est-ce que c’est ?

C’est un « runtime » (un moteur d’exécution) qui s’installe sur un cluster. A la base, il est composé d’une bibliothèque de classes Java et tout un paquet d’outils (Hadoop Common), d’un système de fichier HDFS (Hadoop Distributed File System), d’une implémentation Map/Reduce pour la définition de programmes et d’un ordonnanceur comme Oozie par exemple.

Pour faire simple, nous programmons notre application en Java en utilisant les classes de la bibliothèque contenue dans Hadoop Common. Ensuite il suffit de compiler ce code et de le lancer dans le moteur d’exécution. Pour le reste, Hadoop « gère » notre code et s’occupe de tout !

Pour appliquer le principe de base énoncé plus haut, les deux ingénieurs ont planché sur un système de stockage distribué. Les données sont réparties sur les différents nœuds du cluster et le code s’exécute sur chacun des nœuds.

Première brique : HDFS (ou la couche de stockage des données)

HDFS est le nom donné au système de stockage distribué. C’est l’élément sur lequel repose tout le Framework Hadoop. On notera qu’Hadoop prévoit aussi qu’on puisse remplacer le système de fichiers distribué par une autre implémentation qu’HDFS. C’est ce que fait la distribution MapR par exemple. Dans le cas d’HDInsight sur Windows Azure, on peut utiliser soit HDFS, soit le stockage Windows Azure (ASV, pour Azure Storage Vault).

Qu’est-ce qu’un système de fichier ?

Un système de fichiers (File system) est une abstraction pour stocker, récupérer ou mettre à jour des données. Dans le cas d’HDFS, la mise à jour n’est pas autorisée.

On peut prendre comme exemples les systèmes de fichiers ReFS, NTFS et FAT32 (pour les stockages Windows) ou bien ext3, ext4 (pour les stockages Linux).

HDFS lui est distribué, c’est-à-dire qu’il va diviser les données par paquet ou « split » et ensuite les stocker. La taille d’un paquet est fixée par Hadoop.

Le fait qu’il soit distribué ne se perçoit pas durant l’utilisation car on accède aux fichiers HDFS de manière classique, autrement dit au travers d’une arborescence classique sous la forme dossier/sous_dossier/fichier.extension. Cependant, la donnée est bel est bien répartie sur les différents nœuds avec un mécanisme de réplication (pour le cas où il y aurait perte).

Il est tout à fait possible d’utiliser HDFS sans passer par la partie Map/Reduce d’Hadoop ; c’est ce que fait par exemple Apache HBase, une base de données non-relationnelle.

Comment HDFS retrouve l’information ?

HDFS définit deux types de nœuds :

  • Le nœud principal ou NameNode : il s’agit d’un nœud qui a pour fonction de répertorier où sont stockées les données, sur quels nœuds précisément. Si un nœud est hors d’usage, ce dernier a répertorié aussi l’adresse de la réplique de données.
  • Le nœud de données ou DataNode : Ce nœud est réservé pour le stockage de la donnée. Elle est stockée telle quelle (donnée brute). Cela permet à HDFS de rester simple et d’éviter la non prise en charge de certains formats (pour les convertir).

Le schéma suivant illustre la répartition des fichiers dans HDFS :

image

Pour plus d’informations, vous pouvez regarder ici.

Deuxième brique : Map/Reduce (ou la couche de traitement des données)

On présente souvent Map/Reduce en parlant d’Hadoop comme un Framework, mais le terme est inexact. Map/Reduce est une stratégie de parallélisation et lorsqu’on parle de Map/Reduce dans Hadoop, il s’agit de son implémentation.

Greg Andrews définit cinq familles de stratégies (dans son ouvrage Concurrent Programming: Principles and Practice), dont une qui s’intitule « le parallélisme récursif », qui n’est autre que l’application du concept de « diviser pour régner ».

Il s’agit de décomposer le problème initial en sous-problèmes plus simples, et ainsi de suite jusqu’à ce que le sous-problème soit trivial (ex : oui ou non).

Pour faire une analogie, cela revient à défaire un sac de nœuds en défaisant d’abord les plus petits nœuds. Cette stratégie est facilement parallélisable car chaque sous-problème est indépendant.

Le principe de base

Map/Reduce est une application de cette stratégie dite récursive avec une particularité toutefois : on stocke les résultats dans une Map (au sens programmation du terme).

Prenons un exemple : Nous voulons compter le nombre de vues pour chaque billet de ce blog. Pour cela, nous avons les fichiers journaux (logs) de la plateforme sur laquelle est hébergé ce blog.

Solution :

Dans notre programme, nous voulons un affichage avec la page (son url) et le nombre de vues.

Ex : exploitez-le-171-d-233-luge-des-donn-233-es-187.aspx : 20834 vues

(Le nombre est volontairement exagéré, c’est pour l’égo pur ;))

Explication :

La phase de « mapping » va générer, pour chaque ligne de nos fichiers journaux, le nom de la page (qui est notre clé) puis la valeur 1 (pour signaler qu’il y a une vue) dans un objet de type Map. Une méthode map traitera donc chacune des lignes, recevant comme input son adresse relative au document (« offset ») et le contenu correspondant.

La phase de « reduce » va récupérer cet objet Map, puis faire la somme des valeurs trouvées pour une page donnée.

En quelque sorte, Map spécifie ce que nous voulons comme type de résultats (ce qui permet de diviser implicitement le travail) et Reduce rassemble tous ces résultats intermédiaires pour composer le résultat final.

L’implémentation de Map/Reduce dans Hadoop (Hadoop MapReduce API)

Bonne nouvelle : l’implémentation existe déjà dans l’API fournie par Hadoop ; il n’est donc nul besoin d’apprendre un nouveau paradigme. Il suffit d’utiliser les classes fournies par la bibliothèque Hadoop Common. Nous allons en expliquer le mécanisme.

Une application Hadoop est un fichier JAR (Java ARchive) composée d’une classe point d’entrée Main et de deux implémentations : la première de la classe Mapper, l’autre de la classe Reducer.

Dans le Main, on configure ce qu’on appelle un « job ». Il correspond à un traitement sur le cluster. C’est notre problème initial.

Le job prend comme paramètres :

  • Le répertoire d’entrée,
  • Le fichier de sortie qui contiendra le résultat,
  • Le type de données en entrée,
  • Et celui en sortie.

Il est ensuite exécuté. Voici à quoi ressemble le Main :

public static class Main

{

   // Ne pas oublier de faire les imports!

   public static void main(String[] args) throws Exception{

               

      // Classe de configuration fournie par Hadoop Common

      Configuration config = new Configuration();

 

      // Instance du job représentant notre problème. Il a un objet de config et un nom.

      Job job = new Job(config, "le_nom_du_programme");

               

      // Configuration du type de contenu en entrée

      job.setInputFormatClass(TextInputFormat.class);

      // Configuration du type de contenu du résultat final

      job.setOutputFormatClass(TextOutputFormat.class);

               

      // On spécifie quelle est notre implémentation du Mapper

      job.setMapperClass(MonMapperImpl.class);

      // On spécifie quelle est notre implémentation du Reducer

      job.setReducerClass(MonReducerImpl.class);

               

      //Configuration du type de la Map. Ici elle sera de type Map<Text, IntWritable>

      job.setOutputKeyClass(Text.class);

      job.setOutputValueClass(IntWritable.class);

               

      // On donne le chemin d’entrée où sont stockées nos données

      FileInputFormat.addInputPath(job, new Path("dossierHDFS/sous_dossier/mon_dossier_data");

      // On donne le chemin de sortie où sera stocké le fichier contenant le résultat

      FileOutputFormat.setOutputPath(job, new Path("dossierHDFS/mon_dossier_sortie"));

               

      // On lance le job et on spécifie qu’il faut attendre la fin de celui-ci avant de

      // terminer le main (true).

      job.waitForCompletion(true);

   }

}

Il convient de noter qu’il est tout à fait possible de définir plusieurs Jobs dans le Main.

Explication de code :

  1. Notre méthode main() s’occupe ici d’instancier un objet Job qui nous est fourni par le package MapReduce de Hadoop (org.apache.hadoop.mapreduce). On le paramètre d’abord par le type de contenu en entrée que nous allons émettre dans la Map. Ce type, TextInputFormat, se trouve également dans l’API. Le deuxième paramétrage est le type de contenu en sortie (Idem que le précédent). Ces deux premiers paramètres vont indiquer au moteur d’exécution qu’en entrée on aura du texte et en sortie également du texte. Simple non ?
  2. Ensuite, c’est au tour des paramètres indiquant au moteur d’exécution quelles sont nos implémentations. Ici nous avons nommé pour l’exemple « MonMapperImpl » l’implémentation de notre classe Mapper, et « MonReducerImpl » l’implémentation de notre classe Reducer.
  3. Les deux paramètres suivants ciblent le type de l’objet Map» interne au Framework. On va dire que notre clé sera de type « Text » et que notre valeur sera de type « IntWritable » (un entier donc). Chaque paire contenue dans la Map sera identifiée par une chaine de caractère et aura pour valeur un entier. (Mais tout cela dépendra de votre application). Ceci correspond à ce que nous voulons : le nom de la page (Text) et un nombre de vues (IntWritable).
  4. Les deux derniers paramètres précisent le chemin d’accès au dossier d’entrée, autrement dit là où nos données que nous voulons analyser sont stockées, et le chemin de sortie du fichier de résultat.
  5. Une fois tous ces paramètres spécifiés, nous lançons le Job grâce à la méthode « waitForCompletion() ». Cette méthode à la particularité de prendre en paramètre un booléen qui précise si oui ou non elle bloquera l’exécution des instructions suivantes dans le main jusqu’à ce que le Job soit fini.

Maintenant, nous passons aux implémentations :

public static class MonMapperImpl extends Mapper<Object, Text, Text, IntWritable> {

   private static final int pagePos = 2;

   static final IntWritable nombreUn = new IntWritable(1);

   private Text clefCourante = new Text();

 

   @Override

   public void map(Object key, Text value, Context context)

      throws IOException {                    

     

      String[] parts = value.toString().split("\\s+");

      if (parts.length < 0) //only take records with all values

      {

         return;

      }

 

      clefCourante.set( parts[pagePos] );

      context.write(clefCourante, nombreUn);

    }

}

Notre classe MonMapperImpl implémente l’interface « Mapper » qui est générique. Le premier type générique correspond à la clé en entrée de la méthode map (c’est un objet représentant le numéro de ligne courant) et le second correspond à la valeur en entrée (c’est le contenu de cette ligne). Le troisième et le quatrième sont respectivement indiqués pour la clé de sortie et la valeur de sortie de cette méthode.

Ceux-ci permettent à la méthode map() de savoir ce qui lui arrive comme « record » (ici c’est un numéro de ligne et du texte), puis de configurer un objet Context utilisé pour émettre un résultat intermédiaire. (Un « record » par défaut correspond à une ligne au sens Unix (« \n ») dans des données intermédiaires stockées sur le système de fichiers distribué.)

L’interface Mapper spécifie donc une méthode map() à implémenter. La nôtre est simple :

  1. On découpe dans un tableau de chaines de caractères ce qui nous arrive en entrée (l’élément de séparation est un espace Unix : \s).
  2. On teste si le tableau est vide. Si oui, on passe au « record » suivant (la ligne suivante).
  3. On modifier l’instance clefCourante pour y mettre la chaine contenue dans le tableau à la case numéro 2 car nous savons que nos fichiers journaux sont constitués ainsi : <DateTime> <ip_du_visiteur> <nom_de_la_page.html>. Le fait d’avoir une seule instance de la clef évite d’en instancier une à chaque fois que la méthode map() est appelée (ce qui fait beaucoup). C’est la même chose pour le nombre un.
  4. On émet via l’objet output une nouvelle clef trouvée avec la valeur 1 pour signaler que nous l’avons trouvée une fois dans le record.

Implémentation du Reducer :

public static class MonReducerImpl extends Reducer<Text, IntWritable, Text, IntWritable>

{

   @Override

   public void reduce(Text key, Iterator<IntgWritable> values,Context context)

      throws IOException {

                       

         int compte = 0;

         while( values.hasNext())

         {

            compte += values.next().get();

         }

        

         if (compte> 0)

         {

           context.collect(key, new IntWritable(compte));

         }

      }

}

Notre Reducer ici se déclare comme le Mapper précédent, mis à part le fait qu’il implémente non pas l’interface Mapper, mais bien l’interface Reducer (avec les mêmes types génériques):

  1. On reçoit une clé – le nom de la page- et un itérateur, « values », qui nous permet de parcourir la collection de valeurs remplies par la méthode map(). On boucle donc sur la collection : pour chaque valeur (dans notre cas égale à 1), on incrémente le compteur pour cette clé.
  2. On émet la paire « nom de la page, compte total » dans le fichier ou les fichiers de sortie. (Le fait que les traitements soient répartis fait que plusieurs fichiers peuvent être générés dans un même dossier. Logiquement, cet ensemble de données peut être lu comme la concaténation de tous les fichiers.)

 

L’exécution en détail (Hadoop MapReduce System)

image

Explication du schéma :

Au niveau du nœud principal (head node), le « JobTracker » est responsable de la gestion des ressources, autrement dit du contrôle des nœuds de calcul (worker node). Il gère toute la durée de vie d’un job.

Le « TaskTracker » a des responsabilités plus simples, à savoir lancer les tâches dans l’ordre fourni par le « JobTracker » et périodiquement donner un statut d’avancement de la tâche au « JobTracker ». Ces nœuds de calcul sont les mêmes nœuds que les nœuds de données (data node) évoqués précédemment au niveau de la couche de stockage.

Chaque « Tracker » est une instance de la JVM (Java Virtual Machine).

Il convient de considérer 3 phases successives :

  1. La phase de « mapping »,
  2. La phase de « shuffle »,
  3. La phase de « reduce ».

Voyons ce qu’il en est.

La phase de « mapping »

Lorsque nous lançons un job, ce dernier s’exécute sur le cluster et commence la phase de Map. Le « JobTracker » au niveau du nœud principal (head node) prend en compte le job et le disperse sur les « TaskTrackers ». Les nœuds de calcul (worker node) reçoivent alors le code à exécuter sur leurs sous-ensembles de données respectifs.

Le code se trouve dans notre classe d’implémentation MonMapperImpl : c’est la méthode « map() ». Dans notre cas de test, la méthode émet en clé le nom de la page du blog visité et puis en valeur (l’entier 1) pour incrémenter le compteur total. Une fois la tâche terminée, le « TaskTracker » signale au « JobTracker » qu’il a fini.

La phase de « shuffle »

Elle est gérée entièrement par le Framework. Elle constitue la phase où nous devons regrouper toutes les valeurs produites pour une même clé par les nœuds de calcul. La phase de « reduce » n’aura plus qu’à les agréger.

La phase de « reduce »

La phase de « reduce » commence quand tous les « TaskTracker » ont terminé leur tâche « map ». Il lance alors une nouvelle tâche sur une JVM qui va s’occuper de la combinaison de tous les résultats intermédiaires et ainsi former le résultat final (principe de « diviser pour régner »).

Plus concrètement ? Notre programme récupère les objets Map de chaque nœud de calcul, puis les agrège. Partons du principe suivant :

Le nœud de calcul n°1 possède une Map avec comme clefs :

‘exploitez-le-171-d-233-luge-des-donn-233-es-187.aspx’=>1
‘tirer-parti-d-un-cluster-hadoop-depuis-ssis.aspx’=>1

Le nœud de calcul n°2 possède une Map avec comme clefs :

‘exploitez-le-171-d-233-luge-des-donn-233-es-187.aspx’=>1

Le Reduce va constituer une seule Map avec :

‘exploitez-le-171-d-233-luge-des-donn-233-es-187.aspx’=>2
‘tirer-parti-d-un-cluster-hadoop-depuis-ssis.aspx’=>1

Dans notre cas, nous voulions une somme des 1 pour chaque page correspondante. Mais à vous de voir le Reduce comme il vous plaira. (Il existe une petite optimisation qui fait un « reduce » intermédiaire sur les nœuds de calcul. On l’appelle le « combiner ». Généralement c’est la même classe Reducer qui est utilisée, mais il est possible d’en utiliser une autre.)

Ceci conclut cette première partie du billet. Dans une seconde et dernière partie nous aborderons la notion d’ordonnanceur et les abstractions proposées par l’écosystème Hadoop.

Comments (6)

  1. sonia says:

    Bonjour,

    très bon article mais moi j'aurai une question à vous posez qui pourrait m'expliquer quelle est la différence entre un distributeur hadoop et une distribution hadoop.

    Merci

  2. David says:

    Bonjour,

    Merci tout d'abord pour ces petites explications. J'ai tout de même une petite question : Le map se fait donc uniquement sur des sources de données différentes ?

    merci

    David

  3. youssef says:

    excellente presentation je vous remercie bcp

  4. Ambimm says:

    bonsoir,

    excellente présentation, c'est très claire et prise. merci beaucoup

  5. Josias youmbi says:

    Tres interessant et facil a comprendre

    1. Josias youmbi says:

      Merci encore tres claire