Declarative queries on large astronomy databases : Experiments with Hive and HadoopDB Projet soutenu par le CNRS dans le cadre du défi: Grandes masses de données scientifiques - MASTODONS http://com.isima.fr/Petasky Amin Mesmoudi (LIRIS) Mohand-Saïd Hacid (LIRIS) Farouk Toumani (LIMOS) Le flot de données LSST • Caméra : 198 CCD (16 Mpix) en parallèle → 3,2 G pixels ! ~ 6 Gbyte / 17 secondes → 15 TB / nuit Pendant 10 ans ! http://www.lsst.org/lsst/science/science_goals 2 Le flot de données LSST • Caméra : 198 CCD (16 Mpix) en parallèle → 3,2 G pixels ! ~ 6 Gbyte / 17 secondes → 15 TB / nuit Pendant 10 ans ! http://www.lsst.org/lsst/science/science_goals 2 Les données LSST sont-elles ’’big’’? Table Taille #enregistrements #colonnes (arité) Object 109 TB 38 B 470 Moving Object 5 GB 6M 100 Source 3.6 PB 5T 125 Forced Source 1.1 PB 32 T 7 Difference Image Source 71 TB 200 B 65 CCD Exposure 0.6 TB 17 B 45 3 Accès aux données • Accès • Requêtes déclaratives (SQL) SELECT objectId, taiMidPoint, fluxToAbMag(psfMag) FROM Source JOIN Object USING(objectId) JOIN Filter USING(filterId) WHERE areaSpec_box(:raMin, :declMin,:raMax, = :declMax) AND filterName 'u' AND variability BETWEEN :varMin AND :varMax ORDER BY objectId, taiMidPoint ASC • Possibilité de définir des fonctions ad hoc par l’utilisateur (UDF) • Exemple: areaspec_box, angSep < dist Liste complète des requêtes: http://dev.lsstcorp.org/trac/wiki/dbQueries Défis LSST : • ½ million de requêtes par jour • ~50 requêtes simples et ~20 requêtes complexes à n’importe quel moment 4 Accès aux données • Accès • Requêtes déclaratives (SQL) SELECT objectId, taiMidPoint, fluxToAbMag(psfMag) FROM Source JOIN Object USING(objectId) JOIN Filter USING(filterId) WHERE areaSpec_box(:raMin, :declMin,:raMax, = :declMax) AND filterName 'u' AND variability BETWEEN :varMin AND :varMax ORDER BY objectId, taiMidPoint ASC • Possibilité de définir des fonctions ad hoc par l’utilisateur (UDF) • Exemple: areaspec_box, angSep < dist Liste complète des requêtes: http://dev.lsstcorp.org/trac/wiki/dbQueries Défis LSST : • ½ million de requêtes par jour • ~50 requêtes simples et ~20 requêtes complexes à n’importe quel moment 4 Objectifs généraux • Proposer une architecture distribuée capable de stocker +100 PB de données • Open Source • Shared-Nothing • Pouvoir évaluer aussi bien des requêtes simples (quelques secondes de calcul) que des requêtes complexes (des jours de calcul) • Possibilité d’accéder à des objets en utilisant des indexes ou en procédant à un parcours (scan) complet des grosses tables (>> 1 PB) 5 Objectifs généraux • Proposer une architecture distribuée capable de stocker +100 PB de données • Open Source • Shared-Nothing • Pouvoir évaluer aussi bien des requêtes simples (quelques secondes de calcul) que des requêtes complexes (des jours de calcul) • Possibilité d’accéder à des objets en utilisant des indexes ou en procédant à un parcours (scan) complet des grosses tables (>> 1 PB) • Étudier la capacité des systèmes existants à répondre aux besoins du projet LSST 5 Plan • Contexte et Objectifs • Benchmark • Résultats d'expérimentations • Synthèse • Conclusion Benchmark Resources utilisées • Jeux de données de référence issus de LSST • Données réelles SDSS (Sloan Digital Sky Survey - http://www.sdss.org/) : 3 TB • Données simulées : ~250 GB, possibilité de reproduction à volonté (http://petasky.univ-lyon1.fr/scale) • En pratique, expérimentations jusqu'à 2 TB • Jeu de requêtes de référence (~13) • Basé sur des requêtes métiers : ~70 requêtes formalisées • Complexité croissante • Plateforme de tests • Jusqu'à 50 machines virtuelles • 10 noeuds physiques • DELL C6220 • 1 To de RAM, 52 TO d’espace disque et 240 processeurs 8 Protocole d’expérimentations • Evaluation des machines • 25 machines, 50 machines • Evaluation des données • 250 Go, 500 Go, 1 To et 2 To • Evaluation de la sélectivité des requêtes • 1 jusqu’à 16 millions • Changement d’attribut de partitionnement • SourceID et ObjectID • Données indexées vs. Données Non indexées • 5 index pour Source et 2 index pour Object 9 In-Memory SQL-MapReduce SQL Completeness (L, M, H suggests range of support) Packaged Operations Analytic SQL-on-Hadoop SQL DDL/DML functions Technology M M L Hive HadoopDB/Hadapt M M L M L Drill M L Impala M L Presto M M Spark/Shark UDFs/Custom functions H L L L H 10 Hive et HadoopDB HIVE Map/reduce Map/reduce Map/reduce HDFS Hive 11 Hive et HadoopDB HIVE* HIVE (modified) RDBMS Map/reduce Map/reduce Map/reduce RDBMS HDFS HadoopDB 11 Résultats d’expérimentations Chargement de données (1/2) Minutes 2000 1500 1000 500 0 250 GB 500 GB 1 TB 250 GB 25 machine Hive (HDFS) loading time for 1 TB- 50 machines 33% 39% 13% global hash local hash 1 TB 50 machines global hash local hash tuning • Différence (Hive >> HadoopDB): 300 % pour 25 machines et 200 % pour 50 machines • Hive: • 2X de temps pour 2X de données • 25 >> 50 machines: même temps 15% HDFS 500 GB tuning • HadoopDB: • 2X de données: 90 %-120 % de plus • 25 >>50 machines: 25 % de gain 13 Chargement de données (2/2) 6% 30% 70% 94% Load time indexation Load time Indexation for 1 TB - 50 machines Hive Indexation for 1 TB - 50 machines HadoopDB • Hive: 200 Index size (GB) indexation • 25 >> 50 machines: 15% 150 100 50 0 250GB Sourceid objectid 500 GB ra 1TB decl 2 TB scienceccdexposure 14 Exécution de requêtes Hive vs. HadoopDB (1/6) id Syntaxe SQL |Q| Q1 select * from source where sourceid=29785473054213321; 1 Q2 select sourceid, ra,decl from source where objectid=402386896042823; 43 Q3 select sourceid, objectid from source where ra > 359.959 and ra < 359.96 and decl < 2.05 and decl > 2; 21-172 Q4 select sourceid, ra,decl from source where scienceccdexposureid=454490250461; 3,2*10329,2*103 2000 • ~250 % temps pour 2X de données Seconds 1500 • 25 >> 50 machines: un gain de 50 % du temps 1000 • HadoopDB >> Hive: sauf pour le jeu de données avec 250 Gb 500 0 HadoopDB Hive HadoopDB 250 GB Hive 500 GB Q1 Q2 Q3 Q4 HadoopDB Hive 1 TB 15 Exécution de requêtes Hive vs. HadoopDB (2/6) 200 2000 180 160 Seconds 1500 140 120 100 1000 80 60 500 40 20 0 0 HadoopDB Hive HadoopDB 250 GB Hive 500 GB Q1 Q2 Q3 Q4 HadoopDB 1 TB Hive Hive HadoopDB Hive 250 Gb HadoopDB 500 GB Q1 Q2 Q3 (RA) Hive HadoopDB 1 TB Q4 16 Exécution de requêtes Hive vs. HadoopDB (3/6) id Syntaxe SQL Q5 select objectid,count(sourceid) from source where ra > 359.959 and ra < 359.96 and decl < 2.05 and decl > 2 group by objectid; Q6 select objectid,count(sourceid) from source group by objectid; 4500 4000 3500 3000 2500 2000 1500 1000 500 0 Hive Hive wth HadoopDB HadoopDB Index wth index Hive Hive wth HadoopDb HadoopDb index wth index 250 GB 500 GB Q5 Q6 Hive Hive wth HadoopDB HadoopDB index wth index 1 TB • Sans index: HadoopDB (+) vs Hive (-) pour les requêtes sélectives (ex, Q5) et HadoopDB (-) vs Hive (+) pour les requêtes non sélectives (ex, Q6) • Avec Index: Hive refuse systématiquement l’utilisation de l’index pour les requêtes non sélectives, HadoopDB (+) vs Hive (-) pour les requêtes sélectives et HadoopDB ne profite pas de l’index pour les requêtes non sélectives • Passage a l’échelle: • Hive parallélise le traitement des group by => Hive sans index est nettement meilleur que HadoopDB avec Index 17 Exécution de requêtes Hive vs. HadoopDB (4/6) Optimization within HadoopDB 4500 4000 3500 3000 2500 2000 1500 1000 500 SourceID ObjectID 250 GB SourceID ObjectID 500 go Q5 SourceID HadoopDB wth index HadoopDB HadoopDB wth index HadoopDB HadoopDB wth index HadoopDB HadoopDb wth index HadoopDb HadoopDB wth index HadoopDB HadoopDB wth index HadoopDB 0 ObjectID • Optimisation avec HadoopDB: Changement de l’attribut de partitionnement • Minimisation du coût réseau (nombre d’enregistrements transférés d’une machine à une autre) • Un gain jusqu’à 500 % du temps d’exécution 1 GB Q6 18 Exécution de requêtes Hive vs. HadoopDB (5/6) id Syntaxe SQL Q7 select * from source join object on (source.objectid=object.objectid) where ra > 359.959 and ra < 359.96 and decl < 2.05 and decl > 2; Q8 select * from source join object on (source.objectid=object.objectid) where ra > 359.959 and ra < 359.96; Q9 SELECT s.psfFlux, s.psfFluxSigma, sce.exposureType FROM Source s JOIN RefSrcMatch rsm ON (s.sourceId = rsm.sourceId) JOIN Science_Ccd_Exposure_Metadata sce ON (s.scienceCcdExposureId = sce.scienceCcdExposureId) WHERE s.ra > 359.959 and s.ra < 359.96 and s.decl < 2.05 and s.decl > 2 and s.filterId = 2 and rsm.refObjectId is not NULL; • Hive ne support pas l’utilisation de l’index pour évaluer les requêtes de jointures • Hive Vs HadoopDB 6000 Time (seconds) 5000 4000 3000 • 2000 • • 1000 0 Hive HadoopDB HadoopDB wth index Hive 250 GB HadoopDB HadoopDB wth index 500 GB Q7 Q8 Q9 Hive HadoopDB HadoopDB wth index 1 TB Hive est nettement meilleur que HadoopDB même quand hadoopDB utilise les indexes Hive parallélise le traitement de la jointures HadoopDB collecte les données et fait la jointures en utilisant une seule machine • HadoopDB ne profite pas de l’index pour les requêtes non sélectives 19 Exécution de requêtes Hive vs. HadoopDB (6/6) id Syntaxe SQL Q10 select objectid,sourceid from source where ra > 359.959 and ra < 359.96 and decl < 2.05 and decl > 2 order by objectid; Q11 select objectid,sourceid from source where ra > 359.959 and ra < 359.96 order by objectid; 2000 1800 1600 1400 1200 1000 800 600 400 200 0 Hive Hive wth HadoopDB HadoopDB index index Hive Hive wth HadoopDB HadoopDB index index 250 GB 500 GB Q10 Q11 Hive Hive wth HadoopDB HadoopDB index index 1 TB • Hive ne support pas l’utilisation de l’index pour évaluer les requêtes d’order By • HadoopDB est nettement meilleur que Hive sans index et avec index • L’order by n’est pas une tâche paralilisable avec le modèle Map/Reduce 20 Job 1 Map Reduce Job 2 Map Reduce Cost_ReadHDFS Cost_Map Cost_Network Cost_Reduce Cost_WriteHDFS Cost_ReadHDFS Cost_Query Query Cost_job Synthèse Job n Map Reduce 21 Conclusion • Ces outils ne sont pas adaptés pour gérer toutes les requêtes LSST • Les techniques d'optimisation classiques (indexation, partitionnement, cache...) doivent être revisitées • Perspectives Hive/HadoopDB : • Étude d'un système de recommandation d'index, de partitionnement et de paramètres de configuration d'Hadoop • Développement de fonctions ad hoc pour l'astrophysique (en exploitant le paradigme Map/Reduce) • Développement d'un modèle concurrent basé sur le BSP (Bulk Synchronous Parallel) • Paralléliser ce qui ne l'est pas en MapReduce (e.g., jointures, ORDER BY) • Garantir la tolérance aux pannes, le passage à l'échelle et le chargement personnalisé des données 22 Merci 23