Big Data:
- Information that cannot be processed or analyzed using traditional processes or tools.
Data analysis:
- Huge amounts of data arrive in different formats and may contain errors.
- Data conditioning and cleaning (about 80% of the total effort): getting data into a usable state, since values are frequently missing and some records are difficult to parse.
- Statistics is the grammar of data science; tools include the open-source R language and CRAN (the Comprehensive R Archive Network).
- Use visualization as the first step of data analysis, including to see how bad the raw data is.
Drivetrain approach:
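As a minimal illustration of data conditioning, the sketch below parses hypothetical "name,value" lines, drops records that have missing fields or unparseable values, and counts the rejects. The record format and the function name `clean_records` are assumptions made for illustration only.

```python
from typing import Iterable


def clean_records(lines: Iterable[str]) -> tuple[list[tuple[str, float]], int]:
    """Parse 'name,value' lines; skip rows with missing or unparseable fields.

    Returns the clean (name, value) pairs and the number of rejected rows.
    """
    clean: list[tuple[str, float]] = []
    rejected = 0
    for line in lines:
        parts = [p.strip() for p in line.split(",")]
        # A usable record has exactly two non-empty fields.
        if len(parts) != 2 or not all(parts):
            rejected += 1
            continue
        name, raw_value = parts
        try:
            value = float(raw_value)
        except ValueError:
            # The value field is present but is not a number.
            rejected += 1
            continue
        clean.append((name, value))
    return clean, rejected


raw = ["a,1.5", "b,", "c,n/a", "d,4", "malformed line"]
rows, bad = clean_records(raw)
print(rows, bad)
```

Counting rejects, rather than silently dropping them, gives a first measure of how bad the raw data is before any visualization.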
- define a clear objective
- predictive modeling or optimal decision
- simulation
- optimization
Data scraping:
- The "uber problem": the challenges of data scraping are scale, metadata, data format, and source-domain complexity
- tools: ScraperWiki, Junar, Mashery, Apigee, 3scale
Hadoop:
- Developed largely at Yahoo; Hadoop (http://hadoop.apache.org) is a top-level Apache open-source project written in Java.
- Based on the designs of the Google File System (GFS) and the MapReduce programming paradigm.
- MapReduce follows the functional programming paradigm, which enables parallelism and locality of data access.
- Work is broken down into mapper and reducer tasks that process data stored across a cluster of servers, for massive parallelism.
- Easy to use: to run a MapReduce job, users only write the map and reduce functions, while Hadoop handles the rest (e.g., decomposing the submitted job into map/reduce tasks, scheduling the tasks, monitoring them to ensure successful completion, and restarting tasks that fail).
- The map function takes a single aggregate (record) as input and outputs a set of key-value pairs.
- Sort and shuffle phase: key-value pairs are sorted by key, and all pairs with the same key are shuffled to the same reducer.
- The reduce function receives all map outputs with the same key and combines their values; when there is no more reduce work, all final key-value pairs are emitted as output.
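The map, sort/shuffle, and reduce phases can be simulated on a single machine. The sketch below is a plain-Python word count, not Hadoop code: there is no cluster, no distribution, and the function names (`map_fn`, `shuffle_sort`, `reduce_fn`, `run_job`) are hypothetical.

```python
from collections import defaultdict
from typing import Iterable, Iterator


def map_fn(line: str) -> Iterator[tuple[str, int]]:
    # Map: one input record (a line) becomes many (word, 1) pairs.
    for word in line.lower().split():
        yield word, 1


def shuffle_sort(pairs: Iterable[tuple[str, int]]) -> list[tuple[str, list[int]]]:
    # Shuffle/sort: gather all values for the same key, then order by key.
    groups: dict[str, list[int]] = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())


def reduce_fn(key: str, values: list[int]) -> tuple[str, int]:
    # Reduce: combine all values that share a key.
    return key, sum(values)


def run_job(lines: list[str]) -> list[tuple[str, int]]:
    mapped = (pair for line in lines for pair in map_fn(line))
    return [reduce_fn(k, vs) for k, vs in shuffle_sort(mapped)]


print(run_job(["the cat sat", "the cat ran"]))
```

In Hadoop only `map_fn` and `reduce_fn` would be user code; the grouping done by `shuffle_sort` here is the framework's job.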
- efficiency:
partitioning/shuffling: increases parallelism and reduces data transfer by partitioning the mappers' output into groups (buckets or regions), one per reducer.
combiner: much of the data transferred between mappers and reducers is repetitive (many key-value pairs with the same key); a combiner (in essence, a reduce function run on each mapper's output) combines all values for the same key into a single value before the transfer.
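Hadoop's default HashPartitioner assigns a pair to reducer `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`. The sketch below imitates that rule with `zlib.crc32` as a stand-in hash (Python's built-in `hash` for strings is randomized per process) and shows a combiner shrinking one mapper's output before it is routed. `NUM_REDUCERS` and the sample words are hypothetical.

```python
import zlib
from collections import Counter

NUM_REDUCERS = 2  # hypothetical number of reduce tasks


def partition(key: str, num_reducers: int = NUM_REDUCERS) -> int:
    # Stand-in for HashPartitioner: hash(key) mod number of reducers.
    # crc32 is non-negative, so no sign masking is needed here.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def combine(pairs: list[tuple[str, int]]) -> list[tuple[str, int]]:
    # Combiner: a local reduce over one mapper's output, before any transfer.
    totals: Counter = Counter()
    for key, value in pairs:
        totals[key] += value
    return list(totals.items())


def route(pairs: list[tuple[str, int]]) -> dict[int, list[tuple[str, int]]]:
    # One bucket per reducer; every pair with a given key lands in the same bucket.
    buckets: dict[int, list[tuple[str, int]]] = {r: [] for r in range(NUM_REDUCERS)}
    for key, value in pairs:
        buckets[partition(key)].append((key, value))
    return buckets


mapper_output = [(w, 1) for w in "a b a a c b a".split()]
combined = combine(mapper_output)
print(len(mapper_output), "pairs before combining,", len(combined), "after")
print(route(combined))
```

The combiner is only safe when the reduce operation is associative and commutative, as summing counts is.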
Hadoop is designed to:
- scan through large, complex, or structured data sets and produce results through a highly scalable, distributed batch-processing system.
- follow a function-to-data model (move the computation to the data), supporting discovery and making the once nearly impossible possible, with reliability through self-healing.
Hadoop components (Hadoop core):
- Hadoop Distributed File System (HDFS): data redundancy
- Hadoop MapReduce model: programming redundancy
- Hadoop Common: set of libraries
- Hadoop YARN (Yet Another Resource Negotiator): cluster resource management
Related projects:
- Most are top-level Apache projects or Apache Incubator projects; the Apache Software Foundation mirrors their source on GitHub.
- Cassandra, HBase, MongoDB, and Hypertable: NoSQL databases for huge data sets, storing key-value pairs for fast retrieval (Cassandra, HBase, and Hypertable are column oriented; MongoDB is document oriented)
- Chukwa: monitoring system for large distributed systems
- Mahout: machine learning library for analytical computation, collaborative filtering, user recommendations, clustering, regression, classification, and pattern mining
- Hive: SQL-like queries for data aggregation and summarization over data in Hadoop
- Pig: parallel dataflow programming language for Hadoop, extensible with User Defined Functions (UDFs)
- Cloudera Hue (Hadoop User Experience): web console
- ZooKeeper: naming, configuration, and coordination services for distributed applications
- Oozie: managing job workflows, scheduling, and dependencies
- Ambari: web-based tool for provisioning, configuration, monitoring, administration, upgrade services, and deployment
- Whirr: running services on cloud platforms
- Sqoop (SQL-to-Hadoop): bulk data transfer between relational databases and Hadoop
- Flume: data ingestion
- Cascading: Java API that wraps MapReduce for building data-processing applications
- Cascalog: Clojure-based query language built on Cascading
MapReduce programming languages:
- Java
- Pig: dataflow scripting language
- Hive and Hive Query Language (HQL): run from the Hive shell, or through Java Database Connectivity (JDBC) or Open Database Connectivity (ODBC)
- Jaql: functional, record-oriented, declarative query language for JavaScript Object Notation (JSON).
- Data locality: MapReduce assigns workloads to the servers where the data to be processed is stored, without using a SAN (storage area network) or NAS (network-attached storage).
- Data redundancy:
For high availability, reliability, scalability and data locality, HDFS replicates each block onto three servers by default.
The NameNode holds the file system metadata (optionally with a BackupNode to prevent a single point of failure).
Related distributed file systems:
IBM General Parallel File System (GPFS), based on SAN
IBM GPFS extended to shared-nothing clusters (GPFS-SNC)
Google File System (GFS)
- Transparent to the application: Hadoop HDFS will contact the NameNode, find the servers that hold the data for the application task, and then send your application to run locally on those nodes.
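The locality-aware placement can be pictured as a toy scheduler: given the replica locations the NameNode would report (replication factor 3) and the free task slots per node, run each map task on a free node that already stores the block, falling back to a remote node only when no replica holder is free. The block IDs, node names, and greedy first-fit rule are hypothetical; real Hadoop schedulers also weigh rack locality and other factors.

```python
# Hypothetical block locations, as a NameNode might report them (replication factor 3).
BLOCK_LOCATIONS: dict[str, list[str]] = {
    "blk_1": ["node1", "node2", "node3"],
    "blk_2": ["node2", "node3", "node4"],
    "blk_3": ["node1", "node3", "node4"],
}


def schedule(blocks: dict[str, list[str]], slots: dict[str, int]) -> dict[str, tuple[str, bool]]:
    """Assign one map task per block, preferring a node that stores a replica.

    `slots` maps node -> free task slots and is decremented in place.
    Returns block -> (node, is_local).
    """
    plan: dict[str, tuple[str, bool]] = {}
    for block, replicas in blocks.items():
        local = [n for n in replicas if slots.get(n, 0) > 0]
        if local:
            node, is_local = local[0], True
        else:
            # No replica holder is free: run elsewhere and read the block over the network.
            free = [n for n, s in slots.items() if s > 0]
            if not free:
                raise RuntimeError("no free task slots")
            node, is_local = free[0], False
        slots[node] -= 1
        plan[block] = (node, is_local)
    return plan


print(schedule(BLOCK_LOCATIONS, {"node1": 1, "node2": 1, "node3": 0, "node4": 0, "node5": 1}))
```

Replication helps locality as well as reliability: with three copies there are three candidate nodes on which a task can run without a network read.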
- MapReduce model: mappers run in parallel, converting input into tuples (keys and values); reducers combine the results from the mappers after the shuffle/sort phase.
- The master node runs three daemons: NameNode (optionally a standby NameNode to prevent a single point of failure), Secondary NameNode (which does housekeeping such as checkpointing for the NameNode), and JobTracker; each slave (worker) node runs two daemons: DataNode and TaskTracker.
- A cluster with more nodes will generally perform better than one with fewer, faster nodes; in general, more disks are better, since Hadoop nodes are typically disk- and network-bound.
- An application (running on a client machine) submits a job to the node running the JobTracker daemon in the Hadoop cluster; the JobTracker communicates with the NameNode, breaks the job into map and reduce tasks, and schedules the tasks on TaskTrackers on the nodes where the data resides.
- A set of continually running TaskTracker daemons (agents) monitor the status of each task and report to the JobTracker.
- HDFS is not a POSIX-compliant file system, so you cannot interact with it using ordinary Linux or Unix file system tools; instead use the bin/hdfs dfs <args> file system shell command interface, or FsShell (invoked by hadoop fs).
- HDFS shell commands: put, get, cat, chmod, chown, copyFromLocal, copyToLocal, cp, expunge, ls, mkdir, mv, rm
- Java applications import classes from the org.apache.hadoop.fs package (e.g., FileSystem and Path).
JVM recommendations:
- Use the official Oracle JDK (http://java.com/) 1.6.0u24 or 1.6.0u26 (don't use the latest version).
Hadoop Versions
- http://hadoop.apache.org/
- Cloudera's Distribution for Hadoop (CDH), free and enterprise editions: http://www.cloudera.com/
- Some NoSQL databases contain MapReduce functionality:
CouchDB: semi-structured, document-based storage, with MapReduce in JavaScript
MongoDB: better performance, with MapReduce in JavaScript
Riak: high availability, with MapReduce in JavaScript or Erlang
Integration with SQL databases:
Sqoop uses the Java JDBC database API to import data from relational databases into Hadoop.
Hive:
- Hive (http://hive.apache.org/) was originally developed at Facebook, and is now a top-level Apache Software Foundation project.
- It provides an SQL-like language and generates MapReduce jobs to run on a Hadoop cluster.
- Provides an SQL dialect for querying data stored in HDFS, Amazon S3, and databases (HBase, Cassandra).
- Makes it easier for developers to port SQL based data warehouse applications to Hadoop
Hive Query Language (HiveQL, or HQL):
- Hive translates queries to MapReduce jobs.
- Hive is not a full database and does not fully conform to the SQL standard.
- It does not provide record-level update, insert, or delete, or transactions.
- You can generate new tables from queries or output query results to files.
Hive data model:
- Layers table definitions on top of data in HDFS directories/files: tables (typed columns, including JSON-like complex data such as arrays, structs, and maps), partitions, and buckets (hashed, for sampling and join optimization).
- Primitive types: INT, TINYINT, SMALLINT, BIGINT, FLOAT, DOUBLE, BOOLEAN, STRING; CDH4 added BINARY and TIMESTAMP.
- Type constructors:
ARRAY<data-type>: prices = [1000, 950, 920]
MAP<primitive-type, data-type>: bonus = {'alice': 200, 'bob': 150}
STRUCT<col-name: data-type, ...>: address STRUCT<zip: STRING, street: STRING, city: STRING>
- Hive Metastore: Hive's Metastore is the database containing table definitions and other metadata.
Hive data physical layout:
- Hive tables are stored as subdirectories in HDFS.
- The actual data is stored in flat files: either text files (by default delimited with control characters, e.g., Ctrl-A between fields), SequenceFiles, or arbitrary formats handled by a custom serializer/deserializer (SerDe).
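Hive's default delimited text format puts Ctrl-A (\001) between fields, Ctrl-B (\002) between collection items (array elements, map entries, struct members), and Ctrl-C (\003) between a map key and its value. The sketch below hand-parses one such line for a hypothetical table combining the ARRAY, MAP, and STRUCT constructors; it is an illustration of the layout, not Hive's SerDe, and handles only this one schema at one nesting level.

```python
FIELD, ITEM, KEY = "\x01", "\x02", "\x03"  # Hive's default Ctrl-A, Ctrl-B, Ctrl-C delimiters


def parse_row(line: str) -> dict:
    # Hypothetical schema: name STRING, prices ARRAY<INT>, bonus MAP<STRING,INT>,
    # address STRUCT<zip:STRING, street:STRING, city:STRING>
    name, prices, bonus, address = line.rstrip("\n").split(FIELD)
    zip_code, street, city = address.split(ITEM)
    return {
        "name": name,
        "prices": [int(p) for p in prices.split(ITEM)],
        "bonus": {k: int(v) for k, v in (entry.split(KEY) for entry in bonus.split(ITEM))},
        "address": {"zip": zip_code, "street": street, "city": city},
    }


# Build a sample line the way the default text format would lay it out.
row = FIELD.join([
    "acme",
    ITEM.join(["1000", "950", "920"]),
    ITEM.join(["alice" + KEY + "200", "bob" + KEY + "150"]),
    ITEM.join(["94105", "1 Main St", "San Francisco"]),
]) + "\n"
print(parse_row(row))
```

Deeper nesting (for example an array of maps) moves on to further control characters, which this sketch does not attempt.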
Hive Shell
$ hive
hive> SHOW TABLES;
To create a Hive table:
hive> CREATE TABLE shakespeare(freq INT, word STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;
hive> DESCRIBE shakespeare;
To create a Hive external table whose data lives in another directory (movie):
hive> CREATE EXTERNAL TABLE movie(id INT, name STRING, year INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '/user/training/movie';
To load data:
hive> LOAD DATA INPATH "shakespeare_freq" INTO TABLE shakespeare;
Sqoop
- The --hive-import option can import data from a database into Hive tables, generating the Hive CREATE TABLE statement from the table definition in the RDBMS.
To query Hive tables:
hive> SELECT * FROM shakespeare LIMIT 10;
hive> SELECT * FROM shakespeare
WHERE freq > 100
ORDER BY freq ASC LIMIT 10;
hive> SELECT s.word, s.freq, k.freq
FROM Shakespeare s JOIN kjv k
ON (s.word = k.word)
WHERE s.freq >= 5;
hive> INSERT OVERWRITE TABLE new_table
SELECT s.word, s.freq, k.freq
FROM Shakespeare s JOIN kjv k
ON (s.word = k.word)
WHERE s.freq >= 5;
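Since Hive translates queries into MapReduce jobs, the JOIN above can be pictured as a reduce-side join: the map step emits each row under its join key with a tag saying which table it came from, the shuffle brings matching keys together, and the reduce step pairs every shakespeare value with every kjv value. The single-process sketch below assumes Hive's common (reduce-side) join strategy, applies the WHERE filter during the map step, and uses made-up toy rows in the (freq, word) column order of the shakespeare table.

```python
from collections import defaultdict


def reduce_side_join(left, right, min_left_freq=5):
    """Emulate: SELECT s.word, s.freq, k.freq FROM shakespeare s JOIN kjv k
    ON (s.word = k.word) WHERE s.freq >= 5

    `left` and `right` are lists of (freq, word) rows.
    """
    # Map + shuffle: group tagged values by join key ("s" or "k" marks the source table).
    groups = defaultdict(lambda: {"s": [], "k": []})
    for freq, word in left:
        if freq >= min_left_freq:  # WHERE s.freq >= 5, applied before the shuffle
            groups[word]["s"].append(freq)
    for freq, word in right:
        groups[word]["k"].append(freq)
    # Reduce: for each key, pair every left value with every right value.
    out = []
    for word in sorted(groups):
        for s_freq in groups[word]["s"]:
            for k_freq in groups[word]["k"]:
                out.append((word, s_freq, k_freq))
    return out


shakespeare = [(10, "love"), (3, "thee"), (7, "king")]  # toy rows
kjv = [(8, "love"), (4, "thee"), (2, "king"), (9, "lord")]  # toy rows
print(reduce_side_join(shakespeare, kjv))
```

Words present in only one table ("lord") or filtered out ("thee") produce no output, matching inner-join semantics.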
Hive supports manipulation of data via user-defined functions (UDFs) written in Java, or via scripts in any language using the TRANSFORM operator:
hive> ADD FILE weekday_mapper.py;
hive> INSERT OVERWRITE TABLE u_data_new SELECT TRANSFORM (user_id, movie_id, rating, unixtime) USING 'python weekday_mapper.py'
AS (user_id, movie_id, rating, weekday)
FROM u_data;
hive> QUIT;
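The TRANSFORM query calls a script, weekday_mapper.py, whose contents these notes do not show. Below is a sketch of such a mapper under stated assumptions: Hive streams the TRANSFORM columns to the script as tab-separated lines on stdin, unixtime is seconds since the epoch, and the weekday is computed in UTC as an ISO weekday (1 = Monday ... 7 = Sunday). The UTC choice is an assumption; converting in local time can shift days near midnight.

```python
import io
import sys
from datetime import datetime, timezone
from typing import Iterable, TextIO


def map_line(line: str) -> str:
    # Input columns arrive tab-separated, in the order listed in TRANSFORM(...).
    user_id, movie_id, rating, unixtime = line.strip().split("\t")
    # Assumption: interpret the timestamp in UTC; isoweekday() gives 1 (Mon) .. 7 (Sun).
    weekday = datetime.fromtimestamp(float(unixtime), tz=timezone.utc).isoweekday()
    # Output columns must match AS (user_id, movie_id, rating, weekday).
    return "\t".join([user_id, movie_id, rating, str(weekday)])


def run(lines: Iterable[str] = sys.stdin, out: TextIO = sys.stdout) -> None:
    # Hive writes input rows to the script's stdin and reads result rows from its stdout.
    for line in lines:
        if line.strip():
            out.write(map_line(line) + "\n")


# Demo: feed one row through the mapper and capture what it would emit.
demo_out = io.StringIO()
run(["1\t2\t5\t0\n"], demo_out)
print(demo_out.getvalue(), end="")
```

Saved as weekday_mapper.py for the query, the file would drop the demo lines and end with `if __name__ == "__main__": run()`, so that Hive's streamed rows are read from stdin and written to stdout.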
Hive limitations:
- not full standard SQL, e.g., no correlated subqueries
- no support for UPDATE and DELETE
- no support for inserting single rows
Impala
- open-source project created by Cloudera to facilitate interactive, real-time queries of data in HDFS
- Impala does not use MapReduce; it uses a language very similar to HiveQL and produces results 5-40x faster than Hive.
Pig:
- Pig (http://pig.apache.org/) was originally developed at Yahoo, and is now a top-level Apache Software Foundation project.
- Pig translates a high-level Pig Latin script, run on a client machine, into MapReduce jobs and coordinates their execution.
Pig features:
- joining datasets
- grouping data
- referring to elements by position instead of names
- loading non-delimited data using custom load functions
Pig Latin Features:
urls = LOAD 'dataset' AS (url, category, pagerank);
groups = GROUP urls BY category;
bigGroups = FILTER groups BY COUNT(urls) > 1000000;
result = FOREACH bigGroups GENERATE group, top10(urls);
STORE result INTO 'myOutput';
- Step-by-step dataflow language where computation steps are chained together by use of variables
- Use of high-level transformations (e.g., GROUP, FILTER)
- Ability to specify schemas as part of issuing a program
- Use of user-defined functions (e.g., top10 above)
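To make the chained dataflow steps concrete, here is the same LOAD/GROUP/FILTER/FOREACH pipeline written as plain Python over an in-memory list. The threshold is a parameter so that tiny inputs can pass the filter, and `top10` is a hypothetical stand-in for the UDF, which the notes do not define; here it keeps the ten highest-pagerank URLs of a group.

```python
from collections import defaultdict
from typing import NamedTuple


class Url(NamedTuple):
    url: str
    category: str
    pagerank: float


def top10(bag: list[Url]) -> list[Url]:
    # Hypothetical UDF: the ten highest-pagerank URLs in a group.
    return sorted(bag, key=lambda u: u.pagerank, reverse=True)[:10]


def pipeline(urls: list[Url], min_count: int) -> list[tuple[str, list[Url]]]:
    # groups = GROUP urls BY category;
    groups: dict[str, list[Url]] = defaultdict(list)
    for u in urls:
        groups[u.category].append(u)
    # bigGroups = FILTER groups BY COUNT(urls) > min_count;
    big_groups = {cat: bag for cat, bag in groups.items() if len(bag) > min_count}
    # result = FOREACH bigGroups GENERATE group, top10(urls);
    return [(cat, top10(bag)) for cat, bag in big_groups.items()]


urls = [Url("a.com", "news", 0.9), Url("b.com", "news", 0.5),
        Url("c.com", "news", 0.7), Url("d.com", "sports", 0.8)]
print(pipeline(urls, min_count=2))
```

Each Python variable mirrors one Pig alias, which is the "steps chained together by variables" style the features list describes; Pig would compile the same chain into MapReduce jobs instead of running it in memory.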
Modes of user interaction:
Interactive mode, through the interactive shell Grunt:
$ pig [-help] [-h] [-version] [-i] [-execute] [-e]
grunt> emps = LOAD 'people' AS (id, name, salary);
grunt> rich = FILTER emps BY salary > 100000;
grunt> srtd = ORDER rich BY salary DESC;
grunt> STORE srtd INTO 'rich_people';
To display a relation on screen for debugging (not for use on large data in a cluster):
grunt> DUMP srtd;
grunt> DESCRIBE bag_name;
grunt> data1 = LOAD 'data1' AS (col1, col2, col3, col4);
grunt> data2 = LOAD 'data2' AS (colA, colB, colC);
grunt> jnd = JOIN data1 BY col3, data2 BY colA;
grunt> STORE jnd INTO 'outfile';
grunt> grpd = GROUP bag1 BY element;
grunt> justnames = FOREACH emps GENERATE name;
grunt> summedUP = FOREACH grpd GENERATE group, COUNT(bag1) AS elementCount;
grunt> QUIT;
Choosing Pig and Hive:
- Those with an SQL background tend to choose Hive; those without tend to choose Pig.
- Pig deals better with less-structured data.
- Hive is used to query structured data, so some organizations choose to use both.
Integrate Hadoop into the Enterprise Workflow using Sqoop and Flume
- Sqoop can import, using a JDBC interface:
- one table
- all tables in a database
- or just portions of a table (via a WHERE clause)
- or use free custom connectors (which use a system's native protocols to access data, for faster performance).
Sqoop syntax and examples:
$ sqoop [import|import-all-tables|list-tables] [--connect|--username|--password]
$ sqoop export [options] # send HDFS data to an RDBMS
$ sqoop help [command]
$ sqoop import --username fred --password derf \
--connect jdbc:mysql://database.example.com/personnel \
--table employee --where "id > 1000"
$ sqoop list-databases --connect jdbc:mysql://localhost --username training --password training
$ sqoop list-tables --connect jdbc:mysql://localhost/movielens --username training --password training
$ sqoop import --connect jdbc:mysql://localhost/movielens --table movie \
--fields-terminated-by '\t' \
--username training \
--password training
$ hadoop fs -ls movie
$ hadoop fs -tail movie/part-m-00000
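Conceptually, a Sqoop import like the movie example selects rows (optionally with a WHERE clause) through JDBC and writes each row as one delimited text line into output files such as movie/part-m-00000. The sketch below emulates only that row-to-text step, with Python's built-in sqlite3 module standing in for MySQL and toy rows standing in for the movielens data; it is not how Sqoop is implemented (Sqoop runs the import as parallel map tasks), and `import_rows` is a hypothetical name.

```python
import sqlite3
from typing import Optional


def import_rows(conn: sqlite3.Connection, table: str,
                where: Optional[str] = None, sep: str = "\t") -> list[str]:
    """Return one delimited text line per selected row, like a Sqoop output file.

    `table` and `where` are interpolated into SQL, so they must come from trusted input.
    """
    query = f"SELECT * FROM {table}"
    if where:
        query += f" WHERE {where}"
    return [sep.join(str(col) for col in row) for row in conn.execute(query)]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE movie (id INTEGER, name TEXT, year INTEGER)")
conn.executemany("INSERT INTO movie VALUES (?, ?, ?)",
                 [(1, "Toy Story", 1995), (2, "Heat", 1995), (1001, "Up", 2009)])
for line in import_rows(conn, "movie", where="id > 1000"):
    print(line)
```

The tab separator mirrors `--fields-terminated-by '\t'`, which is what lets the Hive external table over /user/training/movie read these lines with `FIELDS TERMINATED BY '\t'`.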