MapReduce

publicité
MapReduce
1.  Introduction
2.  Architecture
3.  Stockage de données
4.  Apache Hadoop
5.  Bilan MapReduce
6.  Intégration dans le SI
1.  Qu'est ce que MapReduce?
•  Un framework pour l'analyse de big data
•  Inventé par Google
•  Ecrit en C++
•  Propriétaire, protégé par des brevets “non offensifs”
•  Pour données non structurées, sans schéma, etc.
•  SQL ou Xquery trop lourd
•  Pour de très grands clusters
•  Des milliers de nœuds
•  Partitionnement et parallélisation automatiques
•  De nombreuses variations
•  Hadoop (Apache), Hadoop++, Amazon MapReduce, etc.
VI.2
Modèle de programmation
•  Données sous forme de paires (key, value)
•  Ex. (doc-id, content), (word, count), etc.
•  Le programmeur fournit le code de deux fonctions :
1.  Map (key, value) -> list(ikey, ivalue)
•  Permet de faire le même traitement en parallèle sur des
données partitionnées
2.  Reduce(ikey, list(ivalue)) –> list(ikey, fvalue)
•  Permet d'agréger les données traitées par Map
•  Traitement parallèle des étapes Map et Reduce
• 
• 
• 
• 
Partitionnement des données
Tolérance aux fautes
Ordonnancement des accès disques
Monitoring
VI.3
Exemples d’applications de MapReduce
•  Au départ pour des données web
•  Compter le nombre de certains mots dans un ensemble
de documents
•  Calculer un fichier inverse pour un ensemble de
documents
•  Grep distribué: text pattern matching
•  Compter les fréquences d’accès URL dans un web log
•  Calculer un graphe inverse de liens web
•  Compter les vecteurs de termes (résumant les mots les
plus importants) dans un ensemble de documents
•  Tri distribué
VI.4
Input data set Split 0 Split 1 Split 2 Map Map Map (k1,v) (k2,v) (k2,v) (k2,v) (k1,v) Group by k Group by k … Map Map (k1,(v,v,v)) (k2,(v,v,v,v)) Reduce Reduce Output data set Fonctionnement MapReduce
(k1,v) (k2,v) Shuffle Reduce VI.5
Ex1: comptage de mots dans un texte
Map (key, value):
// key: nom de fichier; value: contenu (d'une partie) du fichier
for each word w in value
EmitIntermediate (w, 1)
Reduce (key, values):
// key: un mot; values: une liste de 1
result = 0
for each value v in values
result += v;
Emit (key, result)
VI.6
Ex1: illustration
Input:
Split1: eat, watch, run
Split2: sleep, run, eat
Map
Reduce
eat 1
watch 1
eat 2
run 1
sleep 1
run 1
eat 1
watch 1
run 2
sleep 1
VI.7
Ex2: taille d'un serveur web
•  Soit un grand fichier contenant des métadonnées
sur la taille d'une collection de pages web
•  Lignes de la forme (Server, URL de la page, taille de la
page, …)
•  Pour chaque serveur, calculer la taille totale des
pages
•  C-à-d. la somme des tailles de pages de tous les URLs
du serveur
VI.8
Ex2: pseudo-code
Map (key, value):
// key: nom de fichier; value: lignes de fichier
for each line L(Server, Page url, Page size, …) in value
EmitIntermediate (Server, page size);
Reduce (key, values):
// key: un nom de serveur; values: une liste de taille de pages
result = 0;
for each size s in values:
result += s;
Emit (key, result);
VI.9
Ex3: liens web inverses
•  Soit un très grand ensemble de pages web
•  Pour chaque page p dans l'ensemble
•  Trouver l'ensemble des pages qui référencent p
•  Ex. si dans les pages p1 et p2, il y a des liens vers
la page q, alors nous avons:
•  Sources(q) : {p1, p2, …}
VI.10
Ex3. pseudo-code
map(key, value):
// key: un URL de page web; value: contenu de la page
// p: the given pattern
for each link to a target t in value
EmitIntermediate (t, {key});
reduce(key, values):
// key: un URL; values: une liste d'URLs référençant key
src_set = {};
for each value v in values
if v ∉ src_set then
src_set = src_set + v;
Emit(key, src_set);
VI.11
Ex4: jointure de deux tables
Map (K: null, V : une ligne d'un split de R ou de S)
join key = extract the join column from V
tagged record = add a tag of either R or S
emit (join key, tagged record)
Reduce (K: clé de jointure, V: lignes de R et S ayant la clé de jointure
K)
create buffers BR and BS for R and S, respectively
for each record t in V do
append t to one of the buffers according to its tag
for each pair of records (r, s) in BR × BS do
emit (null, new record(r, s))
VI.12
Ex.5: group by
EMP (ENAME, TITLE, CITY)
Requête: pour chaque ville, nombre d’employés dont le nom
est “Martin"
SELECT CITY, COUNT(*)
FROM EMP
WHERE ENAME LIKE "\%Martin"
GROUP BY CITY
Map (Input (TID,emp), Output: (CITY,1))
// TID: tuple identifier, emp: une ligne de EMP
if emp.ENAME like "%Martin" return (CITY,1)
Reduce (Input (CITY,list(1)), Output: (CITY,SUM(list(1)))
return (CITY,SUM(1*))
VI.13
2. Architecture de MapReduce
User
Program
Submit
Job
assign
map
Input Data
Split 0
read
Split 1
Split 2
Master
assign
reduce
Worker
Worker
Worker
local
write
Output
write
Worker
Worker
Sort,
Group by keys
VI.14
Flow de données
VI.15
Ordonnancement des tâches
•  Approche dynamique
•  Etat d'une tâche: inactive, active, terminée
•  Les tâches inactives sont activées à mesure que des
nœuds de travail (workers) deviennent disponibles
•  Elles sont affectées aux workers qui sont les plus
proches des données d'entrées
•  Ex. disque local ou même rack, pour réduire les transferts
entre nœuds
•  Quand une tâche termine, elle envoie au master les
adresses et les tailles des données intermédiaires
•  Quand toutes les tâches Map sont terminées, les tâches
Reduce commencent
VI.16
Tolérance aux fautes
•  Grain fin, bien adapté aux très grands jobs
•  Données d’entrée et de sortie stockées dans un
système de fichiers
•  Distribué, répliqué et tolérant aux pannes
•  Toutes les données intermédiaires sont écrites sur
disque
•  Checkpoint des opérations Map
•  Tolérances aux pannes douces (mémoire)
•  Si un noeud Map ou un noeud Reduce tombe en panne
pdt l’exécution (panne dure)
•  Ses tâches deviennent éligibles pour être ordonnancées vers
d’autres noeuds
•  Ceci peut conduire à ré-exécuter entièrement des tâche Map
terminées, car les données d’entrées sont devenues
inaccessibles
VI.17
3. Stockage des données
•  GFS: Google Distributed File System
•  Conçu pour les besoins de stockage de données des
applications Google (projet BigFiles)
•  Fichiers de taille importante (plusieurs Go)
•  Optimisé pour les lectures ou ajouts en fin de fichier
•  Fichiers découpés en blocs de 64 Mo (chunks) dans un
cluster
•  Distribués: parallélisme
•  Répliqués: tolérance aux fautes et haute disponibilité
VI.18
Google File System (GFS)
•  Stockage des données de Google
•  Moteur de recherche, Bigtable, MapReduce, etc.
•  La base pour des logiciels libres
•  Hadoop HDFS (Apache & Yahoo)
•  Optimisé pour des besoins spécifiques
•  Cluster SN avec des milliers de noeuds bon marché => la
panne d'un noeud est normale !
•  Très grands fichiers (des teraoctets), contenant de grands
nombres d'objets, ex. des pages web
•  Principalement en lecture et insertion (mises à jours rares)
•  Lectures massives de grands blocks (ex. 1 megaoctet)
•  Lectures directes de petits blocks (ex. 1 kilooctet)
•  Insertions de grande taille, en mode concurrent
•  Haut débit (pour données massives) plus important que
latence faible
VI.19
Caractéristiques
•  Interface classique de système de fichiers (create,
open, read, write, close, delete file)
•  Deux opérations additionnelles: snapshot, record append
•  Cohérence relâchée, avec record append atomique
•  Pas besoin de contrôle de concurrence distribué
•  Pour une cohérence forte, par ex. pour plusieurs insertions,
le programmeur doit utiliser des techniques spécifiques
(verrous, checkpoints, etc.)
•  Un seul GFS master
•  Maintient les métadonnées sur les fichiers
•  Espace de noms, information de contrôle d'accès,
palcement des données
•  Simple, faible charge, tolérant aux pannes
•  Techniques de reprise rapide et de réplication
VI.20
Architecture de GFS
•  Fichiers partitionnés en chunks, de grande taille, ex.
64 megaoctets, chacun répliqués sur 3 noeuds
Application
Get chunk location
GFS client
Get chunk data
GFS
Master
GFS chunk server
GFS chunk server
Linux file system
Linux file system
VI.21
4. Apache Hadoop
•  Framework en logiciel libre pour le stockage et le
traitement de big data sur grands clusters
•  Ecrit en Java
•  A l'origine créé par Yahoo
•  Au centre de tout un éco-système
•  Modules
•  Hadoop Common: bibliothèque de codes et utilitaires
•  Hadoop YARN: gestion de ressources dans un cluster
•  Hadoop Distributed File System (HDFS): équivalent de Google
File System
•  Hadoop MapReduce
•  Outils complémentaires
•  Apache Pig: interface de type workflow
•  Apache Hive: interface de type SQL
VI.22
Pig
•  Système pour programmer des
jobs MapReduce
•  Plus facile et rapide
•  Origine: Yahoo
•  Langage Pig Latin
•  Ressemble à SQL, mais en plus
procédural
•  Permet d'exprimer des séquences de
jobs MapReduce
•  Support des fonctions utilisateurs
•  Java, Python, Javascript, Ruby
•  Optimisation
•  Combinaison d'opérations
Architecture de Pig
VI.23
Exemple de programme Pig Latin
•  Soit un fichier "F-url" contenant des lignes (url,
category, pagerank)
•  Ex. de requête: pour les catégories ayant plus de
100K URLs, retourner la somme des pageranks
Code Pig Latin:
urls = LOAD `F-url' AS (url, category, pagerank);
groups = GROUP urls BY category;
bigGroups = FILTER groups BY COUNT(urls)>100000;
result = FOREACH bigGroups GENERATE
group, SUM(pagerank);
STORE result INTO `myOutput';
VI.24
Hive
•  Infrastructure de data warehouse sur Hadoop
•  Origine: Facebook
•  Langage de type SQL: HiveQL
•  Extensibilité: types, fonctions, scripts utilisateurs
•  Simplifie l'accès à Hadoop
•  Permet l'analyse des données stockées dans HDFS
•  Avec des outils de BI
•  Optimisation des requêtes HiveSQL avec index
•  Conversion automatique des requêtes HiveQL en jobs
MapReduce
•  1 requête => 1 graphe de jobs MapReduce
VI.25
Architecture de Hive
•  Orienté batch
•  Analyse de grands
ensembles de données
•  Table scans
•  Indexation
Hive
HiveQL
ODBC
JDBC
Hive
Web
Interface
Command
Line
Interface
Compiler, Optimizer, Executor
Metastore
•  Pour data warehouse
•  Ajout de données
•  BI via ODBC et JDBC
Head Node
Name Node
Data Nodes / Task Nodes
Hadoop
VI.26
5. Bilan MapReduce
•  Avantages
•  Simple pour le programmeur
•  Parallélisation, tolérance aux fautes, scalabilité
•  Bien adapté à des données non structurées
•  Une très grande communauté de développeurs
•  La base de tout un éco-système
•  Adoption par les géants du web
•  Google, Facebook, Amazon, etc.
•  Et les grands éditeurs de logiciels
•  Oracle, IBM, Microsoft, etc.
•  Ex. significatif: Abandon de Microsoft de la solution concurrente Dryad
•  Grande marge de progression
•  Ex. en introduisant les index, ex. Hadoop++
•  Applications complexes d’analyse de données
VI.27
MapReduce vs SGBD
•  Sujet très polémique
•  Mais MapReduce n'est pas un SGBD
•  Comparaison MapReduce – SGBD
•  Une fois les données chargées, le SGBD est plus efficace (ex.
2,5 fois plus rapide avec Vertica)
•  Grâce aux index et au stockage sur disque
•  Mais sans le chargement, MapReduce reste supérieur
•  Incompatibilité avec les outils d'analyse de données
(reporting, data mining, BI)
•  Complémentaires?
•  MapReduce comme outil ETL pour les données non
structurées (ex. web)
•  SGBD pour les données "précieuses" et le BI
VI.28
Quelques solutions MapReduce
Editeur
Produit
Remarques
Google
MapReduce
Propriétaire, C++, sur données GFS
API Python et Java dans AppEngine-MapReduce
Apache
Hadoop
MapReduce
Open source, Java, sur données HDFS
Interfaces: C++, streams Unix pour tout langage
Apache
Pig
Interface pour jobs MapReduce
Apache
Hive
Interface SQL / MapReduce
Cloudera
Dist. Hadoop
Services et produits Hadoop
Amazon
Amazon Elastic MapReduce pour le cloud Amazon
MapReduce
IBM
InfoSphere
BigInsights
Plateforme d'analyse bigdata incluant Hadoop
Microsoft
HDInsight
Plateforme MapReduce pour le cloud Azure
Oracle
Bigdata
Appliance
Plateforme d'analyse bigdata incluant Hadoop (de
Cloudera)
VI.29
Les meilleures pratiques
•  Attention: big data ne veut pas dire seulement
Hadoop
•  Quand utiliser MapReduce?
•  Données faiblement structurées, sans schéma précis
•  Structure répétitive, facile à partitionner
•  Traitements et analyses de type batch
•  Besoin de gros traitements à faible coût
•  Forte expertise en développement, pas en BD
•  Quand ne pas utiliser?
• 
• 
• 
• 
Flux de données et traitement continu/incrémental
Analyse temps réel, avec temps de réponse garanti
Accès à des données partagées, avec mises à jour
Données fortement structurées?
VI.30
Les alternatives basées sur SQL
•  Google Bigquery
•  Service de requêtes du cloud Google
•  Origine: projet Dremel de Google
•  Cloudera Impala
•  Base de données Open Source
•  Inspiré de l'article Dremel
VI.31
Google Bigquery
•  Objectif: analyse temps réel de big data dans le cloud
•  Requêtes interactives sur teraoctets en qqs secondes
•  Données stockées dans Google Storage
•  Support d'un sous-ensemble de SQL
•  Utile, et support efficace dan une architecture parallèle
•  Complémentaire de MapReduce
•  Utilisation
•  Première analyse grossière avec des requêtes rapides
Bigquery
•  Puis analyse plus précise avec MapReduce (ou SQL traduit en
MapReduce)
•  Modèle de données : JSON
VI.32
Bigquery - fonctionnalités
•  Création de tables, selon un schéma JSON depuis
des données stockées dans Google Storage
•  Requêtes SQL, avec résultat JSON de taille limitée
(ex. 64 mégaoctets)
•  Agrégations (SUM, MAX, AVG, etc.)
•  Jointures de deux grandes tables
•  Intégration avec Google Apps Script, Google
Spreadsheets, ou n'importe quel language qui
peut fonctionner avec une API REST
•  Contrôle d'accès: possible de partager des tables
avec des individus, groupes ou tout le monde
VI.33
Limitations de Bigquery
•  Si les données ne sont pas déjà dans Google
Storage
•  Difficile de transférer du big data
•  Google a l'habitude de modifier ses APIs
•  BigQuery évolue en permanence
•  Difficile d'intégrer avec le SI
•  Crainte des entreprises de partager leurs données
avec Google
•  Relativement cher (ex. 35$ par Téraoctet)
VI.34
Cloudera Impala
•  Machine base de
données parallèle
développée par
Cloudera
•  Intégrée dans la
pile logicielle
Hadoop, au
même niveau
que MapReduce
Pig
Impala
Hive
Map Reduce
HDFS
VI.35
6. Intégration de MapReduce dans le SI
•  Pourquoi est ce important?
•  Rappel: objectif d'une plateforme de bigdata
d'entreprise
•  Intégration des données du SI
•  Infos clients, transactions, etc.
•  Avec big data
•  Données d'appels ou de capteurs
•  Enregistrements d'appels, weblogs, positions GPS, données de
trading, capteurs intelligents
•  Données sociales
•  Retours clients, microblogs (Twitter), réseaux sociaux
(Facebook)
•  Pour analyser finement les données
•  Et produire informations et connaissances à forte valeur
VI.36
Besoins d'une plateforme big data
1.  Acquisition des données
•  Flux de données très variées, plus rapides
•  Besoin de latence faible pour capturer les données,
dans un environnement distribué
2.  Organisation des données
•  Intégration et traitement initial des données au plus
près de leur site d'origine, pour éviter les transferts de
big data
3.  Analyse des données
•  Dans un environnement distribué, en poussant les
requêtes directement sur les données
•  Combinant données d'entreprise et big data
VI.37
Place de MapReduce
Données
structurées
Données non
structurées
SGBD
transactionnels
Systèmes de
fichiers
distribués
Acquisition
Data
wharehouse
ETL
MapReduce
Organisation
Analyse
VI.38
Plateforme Oracle Big Data
Oracle Big Data
Appliance
Optimized for Hadoop,
and NoSQL Processing
Oracle
Big Data
Connectors
Hadoop
Open Source
R
Oracle NoSQL
Database
Oracle Big
Data
Connectors
Oracle
Exadata
Optimized for DW/OLTP
Oracle
Advanced
Analytics
Data
Warehouse
Oracle Data
Integrator
Oracle
Database
Oracle
Exalytics
Optimized for
Analytics & In-Memory Workloads
Oracle Enterprise
Performance
Management!
!
Oracle Business
Intelligence Applications!
!
Oracle Business
Intelligence Tools!
!
Oracle Endeca
Information Discovery!
!
Applications
Acquisition
Organisation
Analyse
VI.39
IBM InfoSphere BigInsights
•  Basé sur HDFS + MapReduce
•  Avec des fonctionnalités supplémentaires pour SI
•  Organisation des données
•  Stockage, sécurité, gestion de cluster
•  Intégration
•  Connecteurs vers DB2, bases de données JDBC, etc
•  Analyse
•  Analyse de texts
•  Accélérateurs d'applications
•  Facilités d'utilisation
•  Console web
•  Outils de type spreadsheet, par ex. pour analyse MapReduce
•  Applications prédéfinies
VI.40
BigInsights & data warehouse
Traditional
analytic
tools
Big Data
analytic
applications
Data warehouse
BigInsights
Filter
Transform
Aggregate
VI.41
Microsoft HDInsight
•  Apache Hadoop sous forme de service dans le cloud
Windows Azure
•  Framework HDFS/MapReduce + Pig et Hive
•  Optimisation de HDFS pour les traitements de données
•  Stockage efficace des données
•  Azure Storage Vault pour le stockage de blobs
•  Intégration avec les outils BI
•  Intégration avec Excel, PowerPivot, SQL Server Analysis
Services et Reporting Services via
•  Power Query addin
•  Driver ODBC
VI.42
Architecture HDInsight
Stats
processing
(RHadoop)
Scripting
(Pig)
Query
(Hive)
Event Pipeline
(Flume)
Distributed Processing
(MapReduce)
Distributed Storage
(HDFS)
Machine
Learning
(Mahout)
Data Integration
( ODBC / SQOOP/ REST)
Metadata
(HCatalog)
NoSQL Database
(HBase)
Pipeline / Workflow
(Oozie)
Graph
(Pegasus)
VI.43
Conclusion sur MapReduce
•  Technologie emblématique du big data pour
•  Données faiblement structurées, sans chéma
•  Traitements et analyses de type batch
•  Développeurs (Java, C++, Python, etc.)
•  Mais pas pour l'analyse interactive
•  S'intègre bien dans une plateforme big data
•  Comme ETL pour organiser les données pour le BI
•  Pour externaliser des traitements à grande échelle
(scale out), notamment dans le cloud
VI.44
Téléchargement