Gérer son environnement Hadoop avec Microsoft SDK .NET for Hadoop

Lectrice ou lecteur de ce blog, vous savez que Microsoft travaille maintenant depuis 2 ans à l’implémentation d’Hadoop sur Windows en partenariat avec le développeur logiciel et distributeur Hadoop Hortonworks.

Nous avons ainsi eu l’occasion de consacrer des billets au service HDInsight dans Windows Azure à mi-chemin qui apporte la possibilité de déployer un cluster Hadoop à la demande. (Pour mémoire, HDInsight s’appuie sur Hortonworks Data Platform (HDP) 1.1.0 pour mettre à disposition les services Hadoop dans le nuage. Vous pouvez consulter l’article What version of Hadoop is in Windows Azure HDInsight? pour plus de détails. HDInsight Server constitue une version autonome installable avec WebPI sur une seule machine à des fins de tests.)

Au côté de ces investissements dans l’écosystème Hadoop, une équipe développe dans l’ombre d’un garage, avec des pizzas et du café – c’est pour le mythe ;) - un Framework venant se superposer à l’environnement .NET. Pourquoi n’y a-t-il pas beaucoup de communication derrière celui-ci ? Tout simplement parce qu’il n’est pas terminé. Il est néanmoins suffisamment avancé pour que nous le mettions en lumière et l’utilisions aujourd’hui dans ce billet.

Le premier avantage de ce Framework est évidemment d’être compatible avec l’environnement .NET. Il propose ainsi l’utilisation des langages C#, F#, VB et C++ avec l’environnement de développement Visual Studio.

Depuis la version 0.7.48 (alpha), le SDK propose une API permettant d’administrer un cluster sur le service HDInsight de Windows Azure. Il est ainsi possible de provisionner un cluster à la volée, puis d’exécuter un traitement Map/Reduce sur des données stockées dans un stockage Blob Windows Azure (ou sur HDFS), et une fois que le résultat est récupéré, de dé-provisionner le cluster. Et c’est exactement ce que nous allons montrer au court de ce billet. Allons-y !

Présentation du Framework

Comme évoqués ci-avant, les équipes Microsoft en charge d’HDInsight travaillent sur la construction d’un portfolio d'outils qui permet aux développeurs de tirer parti de leurs compétences et des investissements en .NET pour utiliser Hadoop. Ces projets sont hébergés sur la forge CodePlex, avec des paquets disponibles à partir de NuGet.

Les outils/bibliothèques suivants sont aujourd’hui notamment disponibles sur la forge :

  • .NET Map/Reduce est un « wrapper » .NET de la bibliothèque Java Hadoop Common pour écrire un programme Map/Reduce. Il se base sur l’utilisation de la partie Streaming de Hadoop. Il est composé principalement de deux exécutables, Map.exe et Reduce.exe qui se chargent d’abstraire l’utilisation du streaming, ainsi que d’une bibliothèque de classe .NET pour écrire un Job Hadoop.
  • LINQ to Hive permet d’utiliser des requêtes LINQ sur des données gérées par Hive installé (par défaut) sur votre cluster Windows Azure HDInsight. Il est ainsi possible par exemple de créer une base de données Hive sur votre cluster Windows Azure HDInsight, puis, développer une application cliente sur votre machine personnelle qui requête cette base Hive avec LINQ.
  • WebClient est une interface .NET (API) venant s’ajouter au-dessus de l’interface REST déployé avec un cluster Hadoop. En effet, Hadoop permet la gestion de son système de fichier HDFS (avec WebHDFS), le déclanchement à distance de Jobs (avec WebHCat) ou encore la gestion d’un workflow de Jobs (avec Oozie) via une multitude d’interfaces REST. Cependant le tout est encapsulé dans un client pour nous faciliter la vie ;) Dernièrement, il a été ajouté la possibilité d’accéder au monitoring d’Ambari.
  • Cluster Management constitue la toute dernière interface ajoutée dans ce SDK qui utilise les APIs de Windows Azure pour créer et déployer des clusters HDInsight à la volée, ainsi que les désallouer.
  • Avro Library est un nouveau format de sérialisation de données dans Hadoop.

Développer son application Hadoop

Pour les besoins d’illustration de ce billet, nous allons développer une petite application qui analysera les tweets que nous aurons récupérés au préalable. Ceci étant, l’objectif dans ce tutoriel n’est pas d’illustrer ces derniers sont récupérés. En effet, comme lectrice ou lecteur de ce blog, vous vous souvenez certainement que le processus d’acquisition a déjà été présenté sur ce blog et plus particulièrement avec le billet StreamInsight et le modèle de conception Observateur.

Pour celles et ceux qui préfèrent, un jeu de données est fourni à la fin de ce tutoriel. Il conviendra alors de le télécharger sur Windows Azure au niveau d’un compte de stockage situé à l’Est des Etats Unis ou en Europe du Nord (comme les clusters HDInsight sont en l’état déployés seulement dans les centres de données afférents).

Provisionner son cluster HDInsight

Pour provisionner votre cluster Hadoop avec le service HDInsight, il n’y a rien de bien compliqué comme nous avons déjà pu le traiter dans ce blog et comme nous allons le voir à nouveau aujourd’hui dans un contexte un peu différent avec le SDK .NET pour Hadoop. Nous utiliserons pour cela la bibliothèque « Cluster Management » de gestion de cluster HDInsight sur Windows Azure.

Pour initier ce tutoriel :

  • Créez un nouveau projet de type « Application Console » sous Visual Studio 2012 - peu importe la version -. Choisissez d’enregistrer le projet dans un dossier de telle sorte que le chemin pour y accéder ne comporte pas d’espace, par ex. « C:\projet_hadoop\mon_projet ». Si vous placez ce projet dans le dossier par défaut de Visual Studio, une exception sera levé avec le message « Environment variable not set : HADOOP_HOME » (le bogue est relaté ici).
  • Ajouter ensuite via le gestionnaire de paquets NuGet la référence à Microsoft .NET API For Windows Azure HDInsight Cluster Management ainsi que la référence à Microsoft .NET API For Hadoop WebClient.
  • Munissez-vous de l’ID D’ABONNEMENT d’un certificat de gestion d’un compte Windows Azure ainsi que de son EMPREINTE NUMERIQUE associée via le portail de gestion depuis l’onglet PARAMETRES.

image

  • Une fois récupérées ces deux informations, munissez-vous des informations suivantes relatives à un compte de stockage ayant comme emplacement Europe du nord : NOM DU COMPTE DE STOCKAGE et CLE D’ACCES PRIMAIRE. Ces informations sont disponibles en cliquant sur GERER LES CLES D’ACCES dans l’onglet STOCKAGE du portail de gestion de Windows Azure. (Si besoin, créez un compte de stockage en Europe du nord).

image

  • Ouvrez la classe Program.cs de votre projet, le code qui suit est à ajouter dans la méthode Main().

Il convient en premier lieu de récupérer un certificat qui permettra à votre application de s’authentifier sur Windows Azure :

var store = new X509Store();

store.Open(OpenFlags.ReadOnly);

var cert = store.Certificates.Cast<X509Certificate2>().First(item => item.Thumbprint == "{Empreinte de certificat}”);

var subscriptionId = new Guid("{Identifiant d’abonnement}");

Ces deux lignes créent les objets nécessaires à l’instance du client de provisionnement de cluster HDInsight :

var client = new ClusterProvisioningClient(subscriptionId, cert);

Voilà pour ce qui est de la connexion à Windows Azure.

Ensuite, la classe HDInsightClusterCreationDetails  permet de stocker les informations nécessaires à la création d’un cluster en particulier, c’est-à-dire le nom qu’il portera sur la plateforme Windows Azure ainsi que l’emplacement géographique (quel centre de données Windows Azure), le nom d’utilisateur administrateur et son mot de passe, et pour finir, le compte de stockage sur lequel le cluster s’appuie pour stocker ses fichiers internes.

Notez au passage que le compte stockage et l’emplacement du centre de données doivent être les mêmes afin que votre cluster soit constitué de machines hautement disponible. Les régions possibles dans cette version préliminaire du service HDInsight sont l’Est des Etats Unis (« East US ») et le Nord de l’Europe (« North Europe »).

var clusterInfo = new HDInsightClusterCreationDetails();

 

//Le nom du cluster sur Windows Azure

clusterInfo.Name = "{Nom DNS du cluster}";

 

//Le centre de données sur lequel les noeuds seront en fonctionnement

clusterInfo.Location = "North Europe";

 

//Les informations sur le compte de stockage Windows Azure

clusterInfo.DefaultStorageAccountName = “{Nom du compte de stockage}”;

clusterInfo.DefaultStorageAccountKey = ”{Clé d’accès}”;

clusterInfo.DefaultStorageContainer = "{Conteneur du compte de stockage}";

 

//Les informations d'identification de l'utilisateur principal

clusterInfo.UserName = “{Nom d’utilisateur du cluster}”;

clusterInfo.Password = “{Mot de passe}”;

 

//Le nombre de noeuds du cluster (jusqu’à 32 !)

clusterInfo.ClusterSizeInNodes = 4;

Et avec tout ça, on crée notre cluster :

var cluster = client.CreateCluster(clusterInfo);

Cette méthode CreateCluster est aussi proposée sous forme d’appel asynchrone (CreateClusterAsync). Ainsi, il est possible de procédé à d’autres traitements pendant la création du cluster sur Windows Azure, qui peut durer quelques minutes (de l’ordre de 10 à15 minutes).

Exécuter un traitement Map/Reduce en C#

L’écriture d’un job Map/Reduce en C# avec le SDK Microsoft for Hadoop est tout à fait semblable à la version Java.

La bibliothèque nécessaire ici est Microsoft .NET Map Reduce API For Hadoop. Elle contient dans la pratique trois classes abstraites dont il faut hériter pour en implémenter les méthodes Map() et Reduce().

La documentation de cette bibliothèque explique synthétiquement les rôles de ces classes abstraites - si vous souhaitez une explication plus détaillée du fonctionnement Map/Reduce, nous vous invitons à relire un précédent billet dédié à ce sujet - :

  • Classe MapperBase : comme son nom le suggère, elle représente la base à implémenter. Trois méthodes sont définies, mais une seule est à implémenter, en l’occurrence : Map(string inputLine, MapperContext context);
  • Classe ReducerCombinerBase : idem, mais pour la phase de reduce. Le combiner et le reducer peuvent être implémentées par la même classe ou deux différentes au choix. Cela fait gagner du temps. Trois méthodes aussi sont définies, et une seule est à implémenter : Reduce(string key, IEnumerable<string> values, ReducerCombinerContext context);
  • Classe HadoopJob<Mapper, Reducer> : cette dernière classe permet de définir un job qui sera lancé à partir du programme principal. L’unique méthode à implémenter retourne un objet de configuration Hadoop qui spécifie entre autre le chemin où sont stockées les données à analyser (sur le cluster ou sur le stockage Windows Azure) ainsi que le chemin de sortie où sera stocké le résultat : Configure(ExecutorContext context) ;

Dans ce tutoriel, le principe consiste essentiellement à décrire un traitement Map/Reduce simple afin de se concentrer sur les aspects du déploiement et de soumission de jobs par le biais du SDK. C’est pourquoi, une fois de plus, le traitement est un WordCount

Ajoutez en premier lieu le package NuGet au projet en cours. Voici le code d’implémentation des trois classes abstraites présentées plus haut :

Mapper :

public class MyMapper : MapperBase

{

   string un = (1).ToString();

 

   public override void Map(string inputLine, MapperContext context)

   {

      string[] tab = inputLine.Split('\t');

  var langue = tab[3];

      var texte = tab[4];

  

      if (langue == "en")

      {

      string[] mots = texte.Split(new char[] { ' ', ',', '.' });

         foreach (var m in mots)

         {

         if(m.Length >= 5)

            context.EmitKeyValue(m, un);

         }

      }

   }

Reducer :

public class MyReduce : ReducerCombinerBase

{

   public override void Reduce(string key, IEnumerable<string> values,

            ReducerCombinerContext context)

   {

   int count = 0;

      foreach (var item in values)

      {

      count++;

      }

 

      context.EmitKeyValue(key, count.ToString());

   }

}

HadoopJob :

public class MyJobType : HadoopJob<MyMap, MyReduce>

{

   public override HadoopJobConfiguration Configure(ExecutorContext context)

   {

   HadoopJobConfiguration config = new HadoopJobConfiguration();

      config.InputPath = "/input/";

      config.OutputFolder = "/output/";

      return config;

   }

}

 

Soumettre le job au cluster

Retour dans notre programme principal représenté par la classe Program.cs, dans la méthode Main(). Nous allons utiliser le clientWebHCat par le biais de l’espace de noms .NET Map/Reduce.

Le premier client que nous avons utilisé était celui de déploiement HDInsight. Il fait appel directement à l’API REST de Windows Azure alors que les classes de l’API Map/Reduce utilisent, quant à elles l’API REST WebHCat (ou Templeton) d’un cluster donné. Cette interface REST est accessible depuis un port sur le nœud maitre, le 563 en l’occurrence.

Il est important de dissocier les deux, car, dans le cas où votre application s’exécute sur une machine, derrière un proxy qui n’autorise pas ce port (563), le code de provisionnement du cluster fonctionnera (comme il passe par le protocole http classique), alors que celui qui exécute le job Map/Reduce retournera une erreur lors de l’exécution (erreur 401).

var hadoop = Microsoft.Hadoop.MapReduce.Hadoop.Connect(

  new Uri(cluster.ConnectionUrl + “:563”),

        "{Nom d’utilisateur du cluster}",

        "{Nom d’utilisateur Hadoop (souvent ‘Hadoop’) }",

    "{Mot de passe hadoop du cluster}",

        "{Compte de stockage}.blob.core.windows.net",

        "{Clé d’accès au compte de stockage}",

    "{Conteneur principal du compte de stockage}",

    true);

 

var result = hadoop.MapReduceJob.ExecuteJob<MyJobType>();

L’ajout dans l’URL de « :563 » indique que le point d’entrée du cluster est le port 563. La méthode ExecuteJob est bloquante dans cet exemple, la suite du programme s’exécutera une fois le Job Map/Reduce terminé.

Autre élément de à considérer : il est important de compiler le projet en 64 bits absolument ! Si vous ne le faites pas, une exception générique lors de l’exécution sera levée. Cette exception provient de la machine virtuelle Java du cluster, de l’archive hadoop-streaming.jar, de type java.lang.RuntimeException : PipeMapRed.

Elle redirige en réalité une erreur .NET de type BadImageFormatException. Cette erreur apparait alors comme le Framework .NET utilisé par l’équipe développant le Microsoft Hadoop SDK pour compiler les programmes MapDriver.exe et ReduceDriver.exe n’a pas la même version que le Framework .NET utilisé pour compiler notre projet. Etant donné que les machines du cluster sont des Windows Server 64 bits, n’oubliez pas dans les Propriétés du projet de forcer la compilation en 64 bits au niveau de Platform target.

image

Vous pouvez maintenant compiler et lancer le programme si vous le souhaitez, ou bien continuer avec la prochaine étape : la récupération du résultat.

Avant de passer à la suite, la désallocation du cluster sur Windows Azure s’effectue comme suit (pour ne payer le cluster inutilement) :

client.DeleteCluster("{Nom DNS du cluster}");

 

Afficher le résultat

Pour récupérer le résultat de notre traitement Map/Reduce, il convient d'utiliser le deuxième WebClient proposé dans le SDK, à savoir le client WebHDFSClient

Ce client n’utilise pas l’interface WebHDFS pour accéder aux données sur le cluster seulement lorsque celui-ci est sur HDInsight. Il repose alors l’API Azure Vault Storage pour cet usage. Ceci constitue une facette intéressante comme le cluster n’a pas besoin d’être alloué sur Windows Azure pour que nous puissions récupérer les données par le biais du SDK Microsoft .NET pour Hadoop.

Le constructeur du client HDFS prend en paramètre le nom d’utilisateur Hadoop ainsi que un adaptateur du stockage Windows Azure.

// Création du client HDFS

var clientHDFS = new WebHDFSClient("hadoop",

                       new BlobStorageAdapter(

                                "{Nom du compte de stockage}",

                                "{Clé d’accès au compte de stockage}",

                                "{Conteneur du compte de stockage}",

                                false));

Continuons en récupérant le fichier où est stocké tout résultat Hadoop : part-00000

// Récupération du fichier résultat MapReduce

var file = clientHDFS.OpenFile("{Chemin du dossier de sortie}/part-00000");

file.Wait();

Ensuite, nous procédons à une petite manipulation du fichier que l’on récupère sous forme d’un objet .NET Stream et que nous parcourons ensuite ligne par ligne pour insérer les clés et valeurs dans un Dictionnary<string, int>. A l’issue de l’opération, lorsque le dictionnaire est complet, nous effectuons une requête LINQ pour afficher que les 50 mots les plus utilisés.

// Récupération d'un stream à partir du fichier résultat

file.Result.Content.ReadAsStreamAsync().ContinueWith((stream) =>

{

   // Transformation en StreamReader

   using (StreamReader sR = new StreamReader(stream.Result))

   {

      var line = "";

     

      // Liste qui permet de faire la requête LINQ

      Dictionary<string, int> list = new Dictionary<string, int>();

      while ((line = sR.ReadLine()) != null)

      {

         string[] tab = new string[2];

         tab = line.Split('\t');

         list.Add(tab[0], int.Parse(tab[1]));

      }

 

     Console.WriteLine("Nombre de résultats : "+list.Count);

     var top50 = (from item in list

                  where item.Key.Length >= 7

                  orderby item.Value descending

                  select item).Take(50);

 

     foreach (var i in top50)

     {

        Console.WriteLine(i.Key + " : " + i.Value);

    }

   }

}).Wait();

 

Ceci nous amène à la fin de ce tutoriel ! Pensez à mettre en dernière instruction un Console.ReadLine(); pour bloquer l’affichage de la console et éviter qu’elle ne se ferme avant que vous puissiez admirer le résultat ;)

image

En guise de conclusion

Au travers de cette rapide illustration, nous espérons avoir illustré les avancées réalisées au niveau du Microsoft .NET SDK for Hadoop qui constitue aujourd’hui une proposition mature pour développer une application Hadoop de A à Z en utilisant le service HDInsight (aussi bien qu’en utilisant un cluster sur site, à vous).

Aujourd’hui, comme il est possible de disposer d’un environnement applicatif Hadoop sur sa propre machine, l’utilisation du SDK couplé à Visual Studio 2012 simplifie grandement la tâche du développeur Map/Reduce que nous sommes !

Pour les plus observateurs, le SDK propose aussi une classe pour les tests unitaires de votre programme qui appellera sur votre machine vos implémentations du Mapper et du Reducer avec un petit jeu de données d’entrée sans passer par Hadoop.

Bonne découverte de ce SDK !

Profitons également de ce billet pour mentionner sur le blog MSDN Data Otaku une série de billets Hadoop pour les développeurs .NET.

HDInsightCluster.zip