Gestion et exploration des grandes masses de données scientifiques issues d'observations astronomiques grand champ Amin Mesmoudi 1 Contexte • Défi CNRS-Mastodons 2012-2016 • Projet LSST 2 Les Besoins LSST en stockage et accès aux données (1/2) • Stockage: Table Taille #enregistrements #colonnes (arité) Object 109 TB 38 B 470 Moving Object 5 GB 6M 100 Source 3.6 PB 5T 125 Forced Source 1.1 PB 32 T 7 Difference Image Source 71 TB 200 B 65 CCD Exposure 0.6 TB 17 B 45 3 Les Besoins LSST en stockage et accès aux données (2/2) • Accès • Requêtes déclaratives (SQL) SELECT objectId, taiMidPoint, fluxToAbMag(psfMag) FROM Source JOIN Object USING(objectId) JOIN Filter USING(filterId) WHERE areaSpec_box(:raMin, :declMin, :raMax, = :declMax) AND filterName 'u' AND variability BETWEEN :varMin AND :varMax ORDER BY objectId, taiMidPoint ASC • Possibilité de définir des fonctions ad hoc par l’utilisateur (UDF) • Exemple: areaspec_box, angSep < dist • 500,000 requêtes par jour 4 Objectifs généraux • Proposer une architecture distribuée capable de stocker +100 PB de données • Open Source • Shared-Nothing • Pouvoir évaluer aussi bien des requêtes simples (quelques secondes de calculs) que des requêtes complexes (des jours de calculs) • Possibilité d’accéder à des objets en utilisant des indexes ou en procédant à un parcours (scan) complet des grosses tables (>> 1 PB) 5 SQL-Hadoop SQL Completeness (L, M, H suggests range of support) Packaged Operations Analytic SQL-on-Hadoop SQL DDL/DML functions Technology M M L Hive HadoopDB/Hadapt M M L M L Drill M L Impala M L Presto M M Spark/Shark UDFs/Custom functions H L L L H 6 Hive et HadoopDB HIVE* (modified) HIVE RDBMS Map/reduce Map/reduce HDFS HadoopDB Map/reduce RDBMS Map/red uce Map/red uce Map/re duce HDFS Hive 7 Exécution des Requêtes • • • • HiveQL: un langage de requêtes similaire à SQL (SQL-Like) Génération d’un ensemble de « Jobs » Map/Reduce Le plan d'exécution des Jobs est représenté par un DAG Le HDFS est utilisé pour stocker les données initiales et les résultats intermédiaires Job 1 Store result of whole job Job 4 Job 6 Job 3 Job 2 Job 5 8 Environnement d'expérimentation • Matériel: • 10 machines DELL C6220 • 2 clusters de 25 et 50 machines virtuelles ( 8 Go de RAM, 2 cœurs et 300 Go d’espace disque chacune)Resource disponible • Jeu de données PT1.1: • 2 tables: 90 Go • http://petasky.univ-lyon1.fr/pt12/ RAM 1 TO Espace Disque 52 TO #Processeurs 240 • Jeu de données PT1.2: • 22 tables: 220 Go (seulement 146 Go a été utilisées ) • http://petasky.univ-lyon1.fr/pt12/ • Jeu de données Winter 13: • 18 tables: 3 To (1 To a été utilisé) • PT 1.2: 250 Go, 500 Go, 1 To et 2 To 9 Paramètres à mesurer • Tuning: Temps de chargement de données (indexation, partitionnement, …) • Performance: Temps total d'exécution des requêtes • Tolérance aux fautes: Nombre de fautes gérées par l’outil • Latence: temps nécessaire pour avoir la première réponse • Montée en charge (volume de données): • 250 Go 500 Go 1 To 2 To • Evolution Matérielle: • 25 machines 50 machines 10 jointure Group By Selection Requêtes: Syntaxe et Sémantique (1/2) id Syntaxe SQL Q1 select * from source where sourceid=29785473054213321; Q2 select sourceid, ra,decl from source where objectid=402386896042823; Q3 select sourceid, objectid from source where ra > 359.959 and ra < 359.96 and decl < 2.05 and decl > 2; Q4 select sourceid, ra,decl from source where scienceccdexposureid=454490250461; Q5 select objectid,count(sourceid) from source where ra > 359.959 and ra < 359.96 and decl < 2.05 and decl > 2 group by objectid; Q6 select objectid,count(sourceid) from source group by objectid; Q7 select * from source join object on (source.objectid=object.objectid) where ra > 359.959 and ra < 359.96 and decl < 2.05 and decl > 2; Q8 select * from source join object on (source.objectid=object.objectid) where ra > 359.959 and ra < 359.96; Q9 SELECT s.psfFlux, s.psfFluxSigma, sce.exposureType FROM Source s JOIN RefSrcMatch rsm ON (s.sourceId = rsm.sourceId) JOIN Science_Ccd_Exposure_Metadata sce ON (s.scienceCcdExposureId = sce.scienceCcdExposureId) WHERE s.ra > 359.959 and s.ra < 359.96 and s.decl < 2.05 and s.decl > 2 and s.filterId = 2 and rsm.refObjectId is not NULL; 11 id Syntaxe SQL Order by Q10 select objectid,sourceid from source where ra > 359.959 and ra < 359.96 and decl < 2.05 and decl > 2 order by objectid; Q11 select objectid,sourceid from source where ra > 359.959 and ra < 359.96 order by objectid; UDF Requêtes: Syntaxe et Sémantique (2/2) Q12 select id FROM rundeepforcedsource where areaspec_box(coord_ra,coord_decl,-55,-2,55,2)=1; Q13 select fluxToAbMag(flux_naive) from rundeepforcedsource where objectid=1398583353936135; 12 Enseignements à tirer (1/8) Chargement de données 2000 1500 1000 500 0 250 Go 500 Go 1 To 250 Go 25 machine Hive (HDFS) 500 Go 1 To 50 machines global hash local hash tuning • Hive vs HadoopDB • Partitionnement personnalisé avec HadoopDB • Plus de données => plus de temps pour changer le partitionnement des données (passage a l’échelle) • Plus de matériel (Machines) => moins du temps pour partitionner les données (accélération matérielle) 13 Enseignements à tirer (2/8) Index creation Taille de l’index Time (seconds) 200 150 100 50 8000 7000 6000 5000 4000 3000 2000 1000 0 250GB 500 GB 0 250Go Sourceid 500 Go objectid ra 1To decl 2 To scienceccdexposure 1TB 2 TB 250GB 500 GB 25 machines Sourceid objectid 1TB 2 TB 50 machines ra decl scienceccdexposure • 1ère stratégie (Hive): Indexation globale et distribuée • Hive: Taille de l’index (Go) Vs Temps de création de l’index (seconds) • Plus de valeurs distinctes pour un attribut => plus d’espace pour stocker l’index (ex: Decl) • On peut profiter du matériel pour accélérer la création des indexes (25 machines Vs 50 machines) • Temps de création: 8 mins – 2h pour 25 machines et 5 mins – 1h 30 pour 50 machines 14 Enseignements à tirer (3/8) Création d’index (secondes) 2000 1500 1000 500 0 250 Go 500 Go 1 To 250 Go 25 machine 500 Go 1 To 50 machines Load time indexation • 2ème stratégie (HadoopDB): chaque nœud gère les indexes des données qu’il héberge • Temps de création des indexes Vs le temps total de chargement de données • La tâche de création des indexes est parallélisable ce qui permet de tirer profit des ressources disponibles • On peut profiter du matériel pour accélérer la création des indexes (25 machines Vs 50 machines) • Temps de création: 60 – 75 secondes pour 5 indexes 15 Enseignements à tirer (4/8) Time (seconds) 2000 Avec Index 1500 200 1000 150 100 500 50 0 0 HadoopDB Hive HadoopDB 250 GB Hive 500 GB Q1 Q2 Q3 HadoopDB Hive 1 TB Q4 • Performances pour les tâches de sélection (secondes) • 50 machines (400 Go de RAM) • Données non indexées (max 30 mins) Vs Données indexées (max 3 mins) • Hive Vs HadoopDB • • • Hive HadoopDB Hive 250 go HadoopDB 500 Go Q1 Q2 Q3 (RA) Hive HadoopDB 1 To Q4 Sans index: HadoopDB (+) vs Hive (-) sauf pour le jeu de données avec 250 Go (d’autres traitements intenses ont été lancés en parallèle sur plateforme cloud utilisée) Avec Index: Hive (+) vs HadoopDB (-) sauf pour la requêtes Q3 et le jeu de données avec 1 To ou Hive a refusé d’utiliser l’index du a sa taille qui dépasse la capacité de la RAM (> 8 Go) Passage a l’échelle: • • Sans Index: 250 Go (max 4 mins) Vs 500 Go (max 10 mins ) Vs 1 To (max 30 mins) Avec Index: 250 Go (max 2 mins) Vs 500 Go (max 3 mins ) Vs 1 To (max 3,5 mins) 16 Enseignements à tirer (5/8) Group by 4500 4000 3500 3000 2500 2000 1500 1000 500 0 Hive Hive wth Index HadoopDB HadoopDB wth index 250 go Hive Hive wth index HadoopDb HadoopDb wth index 500 go Q5 Hive wth index HadoopDB HadoopDB wth index 1 To Q6 • 50 machines (400 Go de RAM) • Données non indexées (max 1h 8 mins) Vs Données non indexées (max 3 mins) • Hive Vs HadoopDB • Hive • Sans index: HadoopDB (+) vs Hive (-) pour les requêtes sélectives (ex, Q5) et HadoopDB (-) vs Hive (+) pour les requêtes non sélectives (ex, Q6) • Avec Index: Hive refuse systématiquement l’utilisation de l’index pour les requêtes non sélectives, HadoopDB (+) vs Hive (-) pour les requêtes sélectives et HadoopDB ne profite pas de l’index pour les requêtes non sélectives Passage a l’échelle: • • Sans Index: 250 Go (max 20 mins) Vs 500 Go (max 40 mins ) Vs 1 To (max 1h10 mins) • Avec Index: 250 Go (max 15 mins) Vs 500 Go (max 35 mins ) Vs 1 To (max 1h8 mins) Hive parallélise le traitement des group by => Hive sans index est nettement meilleur que HadoopDB avec Index 17 Enseignements à tirer (6/8) HadoopDB avec un partitionnement personnalisé 4500 4000 3500 3000 2500 2000 1500 1000 500 0 HadoopDB HadoopDB HadoopDB HadoopDB HadoopDb HadoopDb HadoopDB HadoopDB HadoopDB HadoopDB HadoopDB HadoopDB wth index wth index wth index wth index wth index wth index SourceID ObjectID 250 go SourceID ObjectID 500 go Q5 SourceID ObjectID 1 To Q6 • Optimisation avec HadoopDB: Changement de l’attribut de partitionnement • Minimisation du coût réseau (nombre d’enregistrements transférés d’une machine à une autre) • Un gain jusqu’à 500 % du temps d’exécution 18 Enseignements à tirer (7/8) Join tasks Time (seconds) 6000 5000 4000 3000 2000 1000 0 Hive HadoopDB HadoopDB wth index Hive 250 GB Q8 Hive HadoopDB HadoopDB wth index 1 TB Q9 Performances pour les tâches de jointures (secondes) 50 machines (400 Go de RAM) HadoopDB ne support les jointures de plus de 2 tables Hive ne support pas l’utilisation de l’index pour évaluer les requêtes de jointures Données non indexées (>> 2H) Vs Données non indexées (>>> 2 heures) Hive Vs HadoopDB • • • • HadoopDB wth index 500 GB Q7 • • • • • • HadoopDB Hive est nettement meilleur que HadoopDB même quand hadoopDB utilise les indexes Hive parallélise le traitement de la jointures HadoopDB collecte les données et fait la jointures en utilisant une seule machine HadoopDB ne profite pas de l’index pour les requêtes non sélectives 19 Enseignements à tirer (8/8) Order by tasks 2000 1500 1000 500 0 Hive Hive wth HadoopDB HadoopDB index index 250 Go Hive Hive wth HadoopDB HadoopDB index index Hive wth HadoopDB HadoopDB index index 500 Go Q10 • • • • • Hive 1 To Q11 Performances pour les tâches ORDER BY (secondes) 50 machines (400 Go de RAM) Hive ne permet pas l’utilisation de l’index pour évaluer les requêtes ORDER BY Données non indexées (max 30 mins) Vs Données indexées (max 1 min) Hive Vs HadoopDB • HadoopDB est nettement meilleur que Hive sans index et avec index • L’ORDER BY n’est pas une tâche paralilisable avec le modèle Map/Reduce 20 Synthèse • Cost (q)= 𝑛−1 𝑘=1 𝐶𝑜𝑠𝑡 𝑗𝑜𝑏𝑘 + 𝑐𝑜𝑠𝑡(𝑗𝑜𝑏𝑛 ) − 𝑐𝑜𝑠𝑡𝑊𝑟 (𝑗𝑜𝑏𝑛) • Cost(job)=cost(Map)+cost(Network)+cost(Reduce)+cost (écriture des résultats) • Optimisations: • N: jobs • Cost (Map): indexation, compression, lecture des données en utilisant plusieurs « Mappers » • Cost (Network): #<key, values> minimisation, résultats intermédiaires, . . . • Partitionnement • Cost (Reduce): traitement parallèle (# Reducers), minimisation de la taille des résultats intermédiaires 21