MapReduce 1. Introduction 2. Architecture 3. Stockage de données 4. Apache Hadoop 5. Bilan MapReduce 6. Intégration dans le SI 1. Qu'est ce que MapReduce? • Un framework pour l'analyse de big data • Inventé par Google • Ecrit en C++ • Propriétaire, protégé par des brevets “non offensifs” • Pour données non structurées, sans schéma, etc. • SQL ou Xquery trop lourd • Pour de très grands clusters • Des milliers de nœuds • Partitionnement et parallélisation automatiques • De nombreuses variations • Hadoop (Apache), Hadoop++, Amazon MapReduce, etc. VI.2 Modèle de programmation • Données sous forme de paires (key, value) • Ex. (doc-id, content), (word, count), etc. • Le programmeur fournit le code de deux fonctions : 1. Map (key, value) -> list(ikey, ivalue) • Permet de faire le même traitement en parallèle sur des données partitionnées 2. Reduce(ikey, list(ivalue)) –> list(ikey, fvalue) • Permet d'agréger les données traitées par Map • Traitement parallèle des étapes Map et Reduce • • • • Partitionnement des données Tolérance aux fautes Ordonnancement des accès disques Monitoring VI.3 Exemples d’applications de MapReduce • Au départ pour des données web • Compter le nombre de certains mots dans un ensemble de documents • Calculer un fichier inverse pour un ensemble de documents • Grep distribué: text pattern matching • Compter les fréquences d’accès URL dans un web log • Calculer un graphe inverse de liens web • Compter les vecteurs de termes (résumant les mots les plus importants) dans un ensemble de documents • Tri distribué VI.4 Input data set Split 0 Split 1 Split 2 Map Map Map (k1,v) (k2,v) (k2,v) (k2,v) (k1,v) Group by k Group by k … Map Map (k1,(v,v,v)) (k2,(v,v,v,v)) Reduce Reduce Output data set Fonctionnement MapReduce (k1,v) (k2,v) Shuffle Reduce VI.5 Ex1: comptage de mots dans un texte Map (key, value): // key: nom de fichier; value: contenu (d'une partie) du fichier for each word w in value EmitIntermediate (w, 1) Reduce (key, values): // key: un mot; values: une liste de 1 result = 0 for each value v in values result += v; Emit (key, result) VI.6 Ex1: illustration Input: Split1: eat, watch, run Split2: sleep, run, eat Map Reduce eat 1 watch 1 eat 2 run 1 sleep 1 run 1 eat 1 watch 1 run 2 sleep 1 VI.7 Ex2: taille d'un serveur web • Soit un grand fichier contenant des métadonnées sur la taille d'une collection de pages web • Lignes de la forme (Server, URL de la page, taille de la page, …) • Pour chaque serveur, calculer la taille totale des pages • C-à-d. la somme des tailles de pages de tous les URLs du serveur VI.8 Ex2: pseudo-code Map (key, value): // key: nom de fichier; value: lignes de fichier for each line L(Server, Page url, Page size, …) in value EmitIntermediate (Server, page size); Reduce (key, values): // key: un nom de serveur; values: une liste de taille de pages result = 0; for each size s in values: result += s; Emit (key, result); VI.9 Ex3: liens web inverses • Soit un très grand ensemble de pages web • Pour chaque page p dans l'ensemble • Trouver l'ensemble des pages qui référencent p • Ex. si dans les pages p1 et p2, il y a des liens vers la page q, alors nous avons: • Sources(q) : {p1, p2, …} VI.10 Ex3. pseudo-code map(key, value): // key: un URL de page web; value: contenu de la page // p: the given pattern for each link to a target t in value EmitIntermediate (t, {key}); reduce(key, values): // key: un URL; values: une liste d'URLs référençant key src_set = {}; for each value v in values if v ∉ src_set then src_set = src_set + v; Emit(key, src_set); VI.11 Ex4: jointure de deux tables Map (K: null, V : une ligne d'un split de R ou de S) join key = extract the join column from V tagged record = add a tag of either R or S emit (join key, tagged record) Reduce (K: clé de jointure, V: lignes de R et S ayant la clé de jointure K) create buffers BR and BS for R and S, respectively for each record t in V do append t to one of the buffers according to its tag for each pair of records (r, s) in BR × BS do emit (null, new record(r, s)) VI.12 Ex.5: group by EMP (ENAME, TITLE, CITY) Requête: pour chaque ville, nombre d’employés dont le nom est “Martin" SELECT CITY, COUNT(*) FROM EMP WHERE ENAME LIKE "\%Martin" GROUP BY CITY Map (Input (TID,emp), Output: (CITY,1)) // TID: tuple identifier, emp: une ligne de EMP if emp.ENAME like "%Martin" return (CITY,1) Reduce (Input (CITY,list(1)), Output: (CITY,SUM(list(1))) return (CITY,SUM(1*)) VI.13 2. Architecture de MapReduce User Program Submit Job assign map Input Data Split 0 read Split 1 Split 2 Master assign reduce Worker Worker Worker local write Output write Worker Worker Sort, Group by keys VI.14 Flow de données VI.15 Ordonnancement des tâches • Approche dynamique • Etat d'une tâche: inactive, active, terminée • Les tâches inactives sont activées à mesure que des nœuds de travail (workers) deviennent disponibles • Elles sont affectées aux workers qui sont les plus proches des données d'entrées • Ex. disque local ou même rack, pour réduire les transferts entre nœuds • Quand une tâche termine, elle envoie au master les adresses et les tailles des données intermédiaires • Quand toutes les tâches Map sont terminées, les tâches Reduce commencent VI.16 Tolérance aux fautes • Grain fin, bien adapté aux très grands jobs • Données d’entrée et de sortie stockées dans un système de fichiers • Distribué, répliqué et tolérant aux pannes • Toutes les données intermédiaires sont écrites sur disque • Checkpoint des opérations Map • Tolérances aux pannes douces (mémoire) • Si un noeud Map ou un noeud Reduce tombe en panne pdt l’exécution (panne dure) • Ses tâches deviennent éligibles pour être ordonnancées vers d’autres noeuds • Ceci peut conduire à ré-exécuter entièrement des tâche Map terminées, car les données d’entrées sont devenues inaccessibles VI.17 3. Stockage des données • GFS: Google Distributed File System • Conçu pour les besoins de stockage de données des applications Google (projet BigFiles) • Fichiers de taille importante (plusieurs Go) • Optimisé pour les lectures ou ajouts en fin de fichier • Fichiers découpés en blocs de 64 Mo (chunks) dans un cluster • Distribués: parallélisme • Répliqués: tolérance aux fautes et haute disponibilité VI.18 Google File System (GFS) • Stockage des données de Google • Moteur de recherche, Bigtable, MapReduce, etc. • La base pour des logiciels libres • Hadoop HDFS (Apache & Yahoo) • Optimisé pour des besoins spécifiques • Cluster SN avec des milliers de noeuds bon marché => la panne d'un noeud est normale ! • Très grands fichiers (des teraoctets), contenant de grands nombres d'objets, ex. des pages web • Principalement en lecture et insertion (mises à jours rares) • Lectures massives de grands blocks (ex. 1 megaoctet) • Lectures directes de petits blocks (ex. 1 kilooctet) • Insertions de grande taille, en mode concurrent • Haut débit (pour données massives) plus important que latence faible VI.19 Caractéristiques • Interface classique de système de fichiers (create, open, read, write, close, delete file) • Deux opérations additionnelles: snapshot, record append • Cohérence relâchée, avec record append atomique • Pas besoin de contrôle de concurrence distribué • Pour une cohérence forte, par ex. pour plusieurs insertions, le programmeur doit utiliser des techniques spécifiques (verrous, checkpoints, etc.) • Un seul GFS master • Maintient les métadonnées sur les fichiers • Espace de noms, information de contrôle d'accès, palcement des données • Simple, faible charge, tolérant aux pannes • Techniques de reprise rapide et de réplication VI.20 Architecture de GFS • Fichiers partitionnés en chunks, de grande taille, ex. 64 megaoctets, chacun répliqués sur 3 noeuds Application Get chunk location GFS client Get chunk data GFS Master GFS chunk server GFS chunk server Linux file system Linux file system VI.21 4. Apache Hadoop • Framework en logiciel libre pour le stockage et le traitement de big data sur grands clusters • Ecrit en Java • A l'origine créé par Yahoo • Au centre de tout un éco-système • Modules • Hadoop Common: bibliothèque de codes et utilitaires • Hadoop YARN: gestion de ressources dans un cluster • Hadoop Distributed File System (HDFS): équivalent de Google File System • Hadoop MapReduce • Outils complémentaires • Apache Pig: interface de type workflow • Apache Hive: interface de type SQL VI.22 Pig • Système pour programmer des jobs MapReduce • Plus facile et rapide • Origine: Yahoo • Langage Pig Latin • Ressemble à SQL, mais en plus procédural • Permet d'exprimer des séquences de jobs MapReduce • Support des fonctions utilisateurs • Java, Python, Javascript, Ruby • Optimisation • Combinaison d'opérations Architecture de Pig VI.23 Exemple de programme Pig Latin • Soit un fichier "F-url" contenant des lignes (url, category, pagerank) • Ex. de requête: pour les catégories ayant plus de 100K URLs, retourner la somme des pageranks Code Pig Latin: urls = LOAD `F-url' AS (url, category, pagerank); groups = GROUP urls BY category; bigGroups = FILTER groups BY COUNT(urls)>100000; result = FOREACH bigGroups GENERATE group, SUM(pagerank); STORE result INTO `myOutput'; VI.24 Hive • Infrastructure de data warehouse sur Hadoop • Origine: Facebook • Langage de type SQL: HiveQL • Extensibilité: types, fonctions, scripts utilisateurs • Simplifie l'accès à Hadoop • Permet l'analyse des données stockées dans HDFS • Avec des outils de BI • Optimisation des requêtes HiveSQL avec index • Conversion automatique des requêtes HiveQL en jobs MapReduce • 1 requête => 1 graphe de jobs MapReduce VI.25 Architecture de Hive • Orienté batch • Analyse de grands ensembles de données • Table scans • Indexation Hive HiveQL ODBC JDBC Hive Web Interface Command Line Interface Compiler, Optimizer, Executor Metastore • Pour data warehouse • Ajout de données • BI via ODBC et JDBC Head Node Name Node Data Nodes / Task Nodes Hadoop VI.26 5. Bilan MapReduce • Avantages • Simple pour le programmeur • Parallélisation, tolérance aux fautes, scalabilité • Bien adapté à des données non structurées • Une très grande communauté de développeurs • La base de tout un éco-système • Adoption par les géants du web • Google, Facebook, Amazon, etc. • Et les grands éditeurs de logiciels • Oracle, IBM, Microsoft, etc. • Ex. significatif: Abandon de Microsoft de la solution concurrente Dryad • Grande marge de progression • Ex. en introduisant les index, ex. Hadoop++ • Applications complexes d’analyse de données VI.27 MapReduce vs SGBD • Sujet très polémique • Mais MapReduce n'est pas un SGBD • Comparaison MapReduce – SGBD • Une fois les données chargées, le SGBD est plus efficace (ex. 2,5 fois plus rapide avec Vertica) • Grâce aux index et au stockage sur disque • Mais sans le chargement, MapReduce reste supérieur • Incompatibilité avec les outils d'analyse de données (reporting, data mining, BI) • Complémentaires? • MapReduce comme outil ETL pour les données non structurées (ex. web) • SGBD pour les données "précieuses" et le BI VI.28 Quelques solutions MapReduce Editeur Produit Remarques Google MapReduce Propriétaire, C++, sur données GFS API Python et Java dans AppEngine-MapReduce Apache Hadoop MapReduce Open source, Java, sur données HDFS Interfaces: C++, streams Unix pour tout langage Apache Pig Interface pour jobs MapReduce Apache Hive Interface SQL / MapReduce Cloudera Dist. Hadoop Services et produits Hadoop Amazon Amazon Elastic MapReduce pour le cloud Amazon MapReduce IBM InfoSphere BigInsights Plateforme d'analyse bigdata incluant Hadoop Microsoft HDInsight Plateforme MapReduce pour le cloud Azure Oracle Bigdata Appliance Plateforme d'analyse bigdata incluant Hadoop (de Cloudera) VI.29 Les meilleures pratiques • Attention: big data ne veut pas dire seulement Hadoop • Quand utiliser MapReduce? • Données faiblement structurées, sans schéma précis • Structure répétitive, facile à partitionner • Traitements et analyses de type batch • Besoin de gros traitements à faible coût • Forte expertise en développement, pas en BD • Quand ne pas utiliser? • • • • Flux de données et traitement continu/incrémental Analyse temps réel, avec temps de réponse garanti Accès à des données partagées, avec mises à jour Données fortement structurées? VI.30 Les alternatives basées sur SQL • Google Bigquery • Service de requêtes du cloud Google • Origine: projet Dremel de Google • Cloudera Impala • Base de données Open Source • Inspiré de l'article Dremel VI.31 Google Bigquery • Objectif: analyse temps réel de big data dans le cloud • Requêtes interactives sur teraoctets en qqs secondes • Données stockées dans Google Storage • Support d'un sous-ensemble de SQL • Utile, et support efficace dan une architecture parallèle • Complémentaire de MapReduce • Utilisation • Première analyse grossière avec des requêtes rapides Bigquery • Puis analyse plus précise avec MapReduce (ou SQL traduit en MapReduce) • Modèle de données : JSON VI.32 Bigquery - fonctionnalités • Création de tables, selon un schéma JSON depuis des données stockées dans Google Storage • Requêtes SQL, avec résultat JSON de taille limitée (ex. 64 mégaoctets) • Agrégations (SUM, MAX, AVG, etc.) • Jointures de deux grandes tables • Intégration avec Google Apps Script, Google Spreadsheets, ou n'importe quel language qui peut fonctionner avec une API REST • Contrôle d'accès: possible de partager des tables avec des individus, groupes ou tout le monde VI.33 Limitations de Bigquery • Si les données ne sont pas déjà dans Google Storage • Difficile de transférer du big data • Google a l'habitude de modifier ses APIs • BigQuery évolue en permanence • Difficile d'intégrer avec le SI • Crainte des entreprises de partager leurs données avec Google • Relativement cher (ex. 35$ par Téraoctet) VI.34 Cloudera Impala • Machine base de données parallèle développée par Cloudera • Intégrée dans la pile logicielle Hadoop, au même niveau que MapReduce Pig Impala Hive Map Reduce HDFS VI.35 6. Intégration de MapReduce dans le SI • Pourquoi est ce important? • Rappel: objectif d'une plateforme de bigdata d'entreprise • Intégration des données du SI • Infos clients, transactions, etc. • Avec big data • Données d'appels ou de capteurs • Enregistrements d'appels, weblogs, positions GPS, données de trading, capteurs intelligents • Données sociales • Retours clients, microblogs (Twitter), réseaux sociaux (Facebook) • Pour analyser finement les données • Et produire informations et connaissances à forte valeur VI.36 Besoins d'une plateforme big data 1. Acquisition des données • Flux de données très variées, plus rapides • Besoin de latence faible pour capturer les données, dans un environnement distribué 2. Organisation des données • Intégration et traitement initial des données au plus près de leur site d'origine, pour éviter les transferts de big data 3. Analyse des données • Dans un environnement distribué, en poussant les requêtes directement sur les données • Combinant données d'entreprise et big data VI.37 Place de MapReduce Données structurées Données non structurées SGBD transactionnels Systèmes de fichiers distribués Acquisition Data wharehouse ETL MapReduce Organisation Analyse VI.38 Plateforme Oracle Big Data Oracle Big Data Appliance Optimized for Hadoop, and NoSQL Processing Oracle Big Data Connectors Hadoop Open Source R Oracle NoSQL Database Oracle Big Data Connectors Oracle Exadata Optimized for DW/OLTP Oracle Advanced Analytics Data Warehouse Oracle Data Integrator Oracle Database Oracle Exalytics Optimized for Analytics & In-Memory Workloads Oracle Enterprise Performance Management! ! Oracle Business Intelligence Applications! ! Oracle Business Intelligence Tools! ! Oracle Endeca Information Discovery! ! Applications Acquisition Organisation Analyse VI.39 IBM InfoSphere BigInsights • Basé sur HDFS + MapReduce • Avec des fonctionnalités supplémentaires pour SI • Organisation des données • Stockage, sécurité, gestion de cluster • Intégration • Connecteurs vers DB2, bases de données JDBC, etc • Analyse • Analyse de texts • Accélérateurs d'applications • Facilités d'utilisation • Console web • Outils de type spreadsheet, par ex. pour analyse MapReduce • Applications prédéfinies VI.40 BigInsights & data warehouse Traditional analytic tools Big Data analytic applications Data warehouse BigInsights Filter Transform Aggregate VI.41 Microsoft HDInsight • Apache Hadoop sous forme de service dans le cloud Windows Azure • Framework HDFS/MapReduce + Pig et Hive • Optimisation de HDFS pour les traitements de données • Stockage efficace des données • Azure Storage Vault pour le stockage de blobs • Intégration avec les outils BI • Intégration avec Excel, PowerPivot, SQL Server Analysis Services et Reporting Services via • Power Query addin • Driver ODBC VI.42 Architecture HDInsight Stats processing (RHadoop) Scripting (Pig) Query (Hive) Event Pipeline (Flume) Distributed Processing (MapReduce) Distributed Storage (HDFS) Machine Learning (Mahout) Data Integration ( ODBC / SQOOP/ REST) Metadata (HCatalog) NoSQL Database (HBase) Pipeline / Workflow (Oozie) Graph (Pegasus) VI.43 Conclusion sur MapReduce • Technologie emblématique du big data pour • Données faiblement structurées, sans chéma • Traitements et analyses de type batch • Développeurs (Java, C++, Python, etc.) • Mais pas pour l'analyse interactive • S'intègre bien dans une plateforme big data • Comme ETL pour organiser les données pour le BI • Pour externaliser des traitements à grande échelle (scale out), notamment dans le cloud VI.44