A great strength of the Hadoop platform is its ability to work with data in several different forms.
HDFS can reliably store logs and other data from a plethora of sources.
Sqoop:
- A tool that allows users to extract data from a relational database into Hadoop for further processing.
- This processing can be done with MapReduce programs or with higher-level tools such as Hive.
- When the final results of an analytic pipeline are available, Sqoop can export these results back to the database for consumption by other clients.
Getting Sqoop:
- Sqoop is available in a few places.
- The primary home of the project is http://incubator.apache.org/sqoop/
- This repository contains all the Sqoop source code and documentation.
- The repository also contains instructions for compiling the project.
- If you download a release from Apache, it will be placed in a directory such as /home/yourname/sqoop-x.y.z/. We’ll call this directory $SQOOP_HOME. You can run Sqoop by
running the executable script $SQOOP_HOME/bin/sqoop.
- If you’ve installed a release from Cloudera, the package will have placed Sqoop’s scripts
in standard locations like /usr/bin/sqoop. You can run Sqoop by simply typing sqoop at
the command line.
Running Sqoop with no arguments does not do much of interest:
% sqoop
Try sqoop help for usage.
- Sqoop is organized as a set of tools or commands.
- Without selecting a tool, Sqoop does not know what to do.
% sqoop help
usage: sqoop COMMAND [ARGS]
Available commands:
codegen Generate code to interact with database records
create-hive-table Import a table definition into Hive
eval Evaluate a SQL statement and display the results
export Export an HDFS directory to a database table
help List available commands
import Import a table from a database to HDFS
import-all-tables Import tables from a database to HDFS
job Work with saved jobs
list-databases List available databases on a server
list-tables List available tables in a database
merge Merge results of incremental imports
metastore Run a standalone Sqoop metastore
version Display version information
As it explains, the help tool can also provide specific usage instructions on a particular
tool, by providing that tool’s name as an argument:
% sqoop help import
usage: sqoop import [GENERIC-ARGS] [TOOL-ARGS]
Common arguments:
--connect <jdbc-uri> Specify JDBC connect string
--driver <class-name> Manually specify JDBC driver class to use
--hadoop-home <dir> Override $HADOOP_HOME
--help Print usage instructions
-P Read password from console
--password <password> Set authentication password
--username <username> Set authentication username
--verbose Print more information while working
...
An alternate way of running a Sqoop tool is to use a tool-specific script. This script will
be named sqoop-toolname. For example, sqoop-help, sqoop-import, etc. These commands
are identical to running sqoop help or sqoop import.
Sample Import:
- After you install Sqoop, you can use it to import data to Hadoop.
Sqoop imports from databases. The list of databases that it has been tested with includes
MySQL, PostgreSQL, Oracle, SQL Server and DB2. For the examples in this chapter
we’ll use MySQL, which is easy to use and available for a large number of platforms.
To install and configure MySQL, follow the documentation at http://dev.mysql.com/doc/refman/5.1/en/. Chapter 2 (“Installing and Upgrading MySQL”) in particular
should help. Users of Debian-based Linux systems (e.g., Ubuntu) can type sudo apt-get
install mysql-client mysql-server. RedHat users can type sudo yum install
mysql mysql-server.
Example: Creating a new MySQL database schema
% mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 349
Server version: 5.1.37-1ubuntu5.4 (Ubuntu)
Type 'help;' or '\h' for help. Type '\c' to clear the current input
statement.
mysql> CREATE DATABASE hadoopguide;
Query OK, 1 row affected (0.02 sec)
mysql> GRANT ALL PRIVILEGES ON hadoopguide.* TO '%'@'localhost';
Query OK, 0 rows affected (0.00 sec)
mysql> GRANT ALL PRIVILEGES ON hadoopguide.* TO ''@'localhost';
Query OK, 0 rows affected (0.00 sec)
mysql> quit;
Bye
The password prompt above asks for your root user password. This is likely the same
as the password for the root shell login. If you are running Ubuntu or another variant
of Linux where root cannot directly log in, then enter the password you picked at
MySQL installation time.
In this session, we created a new database schema called hadoopguide, which we’ll use
throughout this chapter. We then allowed any local user to view and modify the
contents of the hadoopguide schema, and closed our session.
Now let’s log back into the database (not as root, but as yourself this time), and create
a table to import into HDFS:
Example: Populating the database
% mysql hadoopguide
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 352
Server version: 5.1.37-1ubuntu5.4 (Ubuntu)
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> CREATE TABLE widgets(id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
-> widget_name VARCHAR(64) NOT NULL,
-> price DECIMAL(10,2),
-> design_date DATE,
-> version INT,
-> design_comment VARCHAR(100));
Query OK, 0 rows affected (0.00 sec)
mysql> INSERT INTO widgets VALUES (NULL, 'sprocket', 0.25, '2010-02-10',
-> 1, 'Connects two gizmos');
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO widgets VALUES (NULL, 'gizmo', 4.00, '2009-11-30', 4,
-> NULL);
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO widgets VALUES (NULL, 'gadget', 99.99, '1983-08-13',
-> 13, 'Our flagship product');
Query OK, 1 row affected (0.00 sec)
mysql> quit;
In the above listing, we created a new table called widgets. We’ll be using this fictional
product database in further examples in this chapter. The widgets table contains several
fields representing a variety of data types.
Now let’s use Sqoop to import this table into HDFS:
% sqoop import --connect jdbc:mysql://localhost/hadoopguide \
> --table widgets -m 1
10/06/23 14:44:18 INFO tool.CodeGenTool: Beginning code generation
...
10/06/23 14:44:20 INFO mapred.JobClient: Running job: job_201006231439_0002
10/06/23 14:44:21 INFO mapred.JobClient: map 0% reduce 0%
10/06/23 14:44:32 INFO mapred.JobClient: map 100% reduce 0%
10/06/23 14:44:34 INFO mapred.JobClient: Job complete:
job_201006231439_0002
...
10/06/23 14:44:34 INFO mapreduce.ImportJobBase: Retrieved 3 records.
Sqoop’s import tool will run a MapReduce job that connects to the MySQL database
and reads the table. By default, this will use four map tasks in parallel to speed up the
import process. Each task will write its imported results to a different file, but all in a
common directory. Since we knew that we had only three rows to import in this example,
we specified that Sqoop should use a single map task (-m 1) so we get a single file in HDFS.
We can inspect this file’s contents like so:
% hadoop fs -cat widgets/part-m-00000
1,sprocket,0.25,2010-02-10,1,Connects two gizmos
2,gizmo,4.00,2009-11-30,4,null
3,gadget,99.99,1983-08-13,13,Our flagship product
Generated code:
In addition to writing the contents of the database table to HDFS, Sqoop has also
provided you with a generated Java source file (widgets.java) written to the current local
directory. (After running the sqoop import command above, you can see this file by
running ls widgets.java.)
The generated class (widgets) is capable of holding a single record retrieved from the
imported table. It can manipulate such a record in MapReduce or store it in a SequenceFile
in HDFS. (SequenceFiles written by Sqoop during the import process will store
each imported row in the “value” element of the SequenceFile’s key-value pair format, using the generated class.)
It is likely that you don’t want to name your generated class widgets since each instance
of the class refers to only a single record. We can use a different Sqoop tool to generate
source code without performing an import; this generated code will still examine the
database table to determine the appropriate data types for each field:
% sqoop codegen --connect jdbc:mysql://localhost/hadoopguide \
> --table widgets --class-name Widget
The codegen tool simply generates code; it does not perform the full import. We specified
that we’d like it to generate a class named Widget; this will be written to Widget.java. We also could have specified --class-name and other code-generation arguments during the import process we performed earlier. This tool can be used to regenerate code if you accidentally remove the source file, or to generate code with different settings than were used during the import.
Database Imports: Deeper look
As mentioned earlier, Sqoop imports a table from a database by running a MapReduce
job that extracts rows from the table, and writes the records to HDFS. How does MapReduce
read the rows? This section explains how Sqoop works under the hood.
At a high level, the figure below demonstrates how Sqoop interacts with both the database
source and Hadoop. Like Hadoop itself, Sqoop is written in Java. Java provides an API
called Java Database Connectivity, or JDBC, that allows applications to access data
stored in an RDBMS as well as inspect the nature of this data. Most database vendors
provide a JDBC driver that implements the JDBC API and contains the necessary code
to connect to their database server.
Before the import can start, Sqoop uses JDBC to examine the table it is to import. It
retrieves a list of all the columns and their SQL data types. These SQL types (VARCHAR,
INTEGER, and so on) can then be mapped to Java data types (String, Integer, etc.), which
will hold the field values in MapReduce applications. Sqoop’s code generator will use
this information to create a table-specific class to hold a record extracted from the table.
The Widget class from earlier, for example, contains the following methods that retrieve
each column from an extracted record:
public Integer get_id();
public String get_widget_name();
public java.math.BigDecimal get_price();
public java.sql.Date get_design_date();
public Integer get_version();
public String get_design_comment();
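The step from column metadata to getter signatures can be sketched roughly as follows. This is a Python illustration only, not Sqoop's actual code generator: the names SQL_TO_JAVA, WIDGET_COLUMNS and getter_signatures are hypothetical, and the mapping covers just the column types that appear in the widgets table.

```python
# Illustrative only: a tiny stand-in for the type-mapping step of a code generator.
# SQL_TO_JAVA, WIDGET_COLUMNS and getter_signatures are hypothetical names, not Sqoop APIs.
SQL_TO_JAVA = {
    "INT": "Integer",
    "INTEGER": "Integer",
    "VARCHAR": "String",
    "DECIMAL": "java.math.BigDecimal",
    "DATE": "java.sql.Date",
}

# Column names and SQL types taken from the CREATE TABLE widgets statement.
WIDGET_COLUMNS = [
    ("id", "INT"),
    ("widget_name", "VARCHAR(64)"),
    ("price", "DECIMAL(10,2)"),
    ("design_date", "DATE"),
    ("version", "INT"),
    ("design_comment", "VARCHAR(100)"),
]


def getter_signatures(columns):
    """Return one Java-style getter signature per (name, sql_type) pair."""
    signatures = []
    for name, sql_type in columns:
        # Strip any length/precision suffix, e.g. VARCHAR(64) -> VARCHAR.
        base_type = sql_type.split("(")[0].strip().upper()
        java_type = SQL_TO_JAVA[base_type]
        signatures.append(f"public {java_type} get_{name}();")
    return signatures


for signature in getter_signatures(WIDGET_COLUMNS):
    print(signature)
```

A real generator would obtain the column list through JDBC metadata rather than a hard-coded list.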
More critical to the import system’s operation, though, are the serialization methods
that form the DBWritable interface, which allow the Widget class to interact with JDBC:
public void readFields(ResultSet __dbResults) throws SQLException;
public void write(PreparedStatement __dbStmt) throws SQLException;
JDBC’s ResultSet interface provides a cursor that retrieves records from a query; the
readFields() method here will populate the fields of the Widget object with the columns
from one row of the ResultSet’s data. The write() method shown above allows Sqoop
to insert new Widget rows into a table, a process called exporting.
The MapReduce job launched by Sqoop uses an InputFormat that can read sections of
a table from a database via JDBC. The DataDrivenDBInputFormat provided with Hadoop
partitions a query’s results over several map tasks.
Reading a table is typically done with a simple query such as:
SELECT col1,col2,col3,... FROM tableName
But often, better import performance can be gained by dividing this query across multiple
nodes. This is done using a splitting column. Using metadata about the table, Sqoop will guess a good column to use for splitting the table (typically the primary key for the table, if one exists). The minimum and maximum values for the primary key column are retrieved, and then these are used in conjunction with a target number of tasks to determine the queries that each map task should issue.
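As a rough sketch of the idea, assuming an integer splitting column, the minimum and maximum values can be turned into one bounded query per map task. This is a Python illustration, not the DataDrivenDBInputFormat implementation; split_ranges and split_queries are hypothetical helper names, and the real class may choose boundaries differently.

```python
# Illustrative sketch of range-based splitting; not DataDrivenDBInputFormat itself.
# split_ranges and split_queries are hypothetical helper names.

def split_ranges(min_val, max_val, num_tasks):
    """Divide the inclusive integer range [min_val, max_val] into contiguous,
    non-overlapping ranges, using at most num_tasks ranges."""
    total = max_val - min_val + 1
    # Never create more tasks than there are distinct key values.
    num_tasks = max(1, min(num_tasks, total))
    base, extra = divmod(total, num_tasks)
    ranges = []
    lo = min_val
    for i in range(num_tasks):
        # The first `extra` ranges get one additional value each.
        size = base + (1 if i < extra else 0)
        hi = lo + size - 1
        ranges.append((lo, hi))
        lo = hi + 1
    return ranges


def split_queries(table, columns, split_col, min_val, max_val, num_tasks):
    """Build one SELECT statement per range of the splitting column."""
    col_list = ", ".join(columns)
    return [
        f"SELECT {col_list} FROM {table} "
        f"WHERE {split_col} >= {lo} AND {split_col} <= {hi}"
        for lo, hi in split_ranges(min_val, max_val, num_tasks)
    ]


# Example: ids 0 through 99,999 divided across four map tasks.
for query in split_queries("widgets", ["id", "widget_name"], "id", 0, 99999, 4):
    print(query)
```

Even splits like these only balance the work when key values are spread fairly uniformly between the minimum and the maximum.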
Controlling the import:
Sqoop does not need to import an entire table at a time. For example, a subset of the
table’s columns can be specified for import. Users can also specify a WHERE clause to
include in queries, which bounds the rows of the table to import. For example, if widgets
0 through 99,999 were imported last month, but this month our vendor catalog
included 1,000 new types of widget, an import could be configured with the clause
WHERE id >= 100000; this will start an import job retrieving all the new rows added to
the source database since the previous import run. User-supplied WHERE clauses are
applied before task splitting is performed, and are pushed down into the queries executed
by each task.
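One way to picture this, as a hedged Python sketch rather than Sqoop's own query builder: the user-supplied condition is applied when the split bounds are looked up, and is then combined with each task's range. The function names bounds_query and task_query, and the exact SQL text they produce, are assumptions made for illustration.

```python
# Illustrative sketch only; bounds_query and task_query are hypothetical helpers
# and the SQL they emit is an assumption, not Sqoop's literal output.

def bounds_query(table, split_col, user_where=None):
    """Query for the min/max of the splitting column, restricted by the user's WHERE clause."""
    sql = f"SELECT MIN({split_col}), MAX({split_col}) FROM {table}"
    if user_where:
        sql += f" WHERE ({user_where})"
    return sql


def task_query(table, columns, split_col, lo, hi, user_where=None):
    """Query issued by one map task: user condition AND this task's key range."""
    conditions = []
    if user_where:
        # Parenthesize so an OR inside the user clause cannot escape the range bounds.
        conditions.append(f"({user_where})")
    conditions.append(f"{split_col} >= {lo}")
    conditions.append(f"{split_col} <= {hi}")
    return f"SELECT {', '.join(columns)} FROM {table} WHERE " + " AND ".join(conditions)


# Example: import only the 1,000 new widgets, a subset of columns, using two tasks.
where = "id >= 100000"
print(bounds_query("widgets", "id", where))
for lo, hi in [(100000, 100499), (100500, 100999)]:
    print(task_query("widgets", ["id", "widget_name", "price"], "id", lo, hi, where))
```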
Imports and Consistency:
When importing data to HDFS, it is important that you ensure access to a consistent
snapshot of the source data. Map tasks reading from a database in parallel are running
in separate processes. Thus, they cannot share a single database transaction. The best
way to do this is to ensure that any processes that update existing rows of a table are
disabled during the import.
Direct-mode imports:
Sqoop’s architecture allows it to choose from multiple available strategies for performing
an import. Most databases will use the DataDrivenDBInputFormat-based approach described above. Some databases offer specific tools designed to extract data quickly. For example, MySQL’s mysqldump application can read from a table with greater throughput than a JDBC channel. The use of these external tools is referred to as direct mode in Sqoop’s documentation. Direct mode must be specifically enabled by the user (via the --direct argument), as it is not as general-purpose as the JDBC approach. (For example, MySQL’s direct mode cannot handle large objects, that is, CLOB or BLOB columns, because Sqoop needs to use a JDBC-specific API to load these columns into HDFS.)
For databases that provide such tools, Sqoop can use these to great effect. A direct-mode import from MySQL is usually much more efficient (in terms of map tasks and time required) than a comparable JDBC-based import. Sqoop will still launch multiple map tasks in parallel. These tasks will then spawn instances of the mysqldump program and read its output. The effect is similar to a distributed implementation of mk-parallel-dump from the Maatkit tool set. Sqoop can also perform direct-mode imports from PostgreSQL.
Working with imported data:
Once data has been imported to HDFS, it is now ready for processing by custom MapReduce
programs.
Text-based imports can be easily used in scripts run with Hadoop Streaming or in MapReduce jobs run with the default TextInputFormat.
To use individual fields of an imported record, though, the field delimiters (and any
escape/enclosing characters) must be parsed and the field values extracted and converted
to the appropriate data types. For example, the id of the “sprocket” widget is
represented as the string "1" in the text file, but should be parsed into an Integer or
int variable in Java. The generated table class provided by Sqoop can automate this
process, allowing you to focus on the actual MapReduce job to run. Each autogenerated
class has several overloaded methods named parse() that operate on the data represented as Text, CharSequence, char[], or other common types.
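To make the parsing step concrete, here is a minimal Python sketch of turning one imported text line into typed fields. It is not the generated Java class: WidgetRecord and parse_widget are hypothetical names, and the sketch assumes comma delimiters, no escape or enclosing characters, and that the literal text null stands for a SQL NULL, as in the sample output shown earlier.

```python
# Illustrative only: a Python analogue of what a generated parse() method has to do.
# WidgetRecord and parse_widget are hypothetical names, not Sqoop-generated code.
from dataclasses import dataclass
from datetime import date
from decimal import Decimal
from typing import Optional


@dataclass
class WidgetRecord:
    id: int
    widget_name: str
    price: Optional[Decimal]
    design_date: Optional[date]
    version: Optional[int]
    design_comment: Optional[str]


def _nullable(text, convert):
    """Assumption: the text 'null' represents a SQL NULL in the imported file."""
    return None if text == "null" else convert(text)


def parse_widget(line, delimiter=","):
    """Split one delimited line and convert each field to its typed value."""
    fields = line.rstrip("\n").split(delimiter)
    if len(fields) != 6:
        raise ValueError(f"expected 6 fields, got {len(fields)}: {line!r}")
    return WidgetRecord(
        id=int(fields[0]),
        widget_name=fields[1],
        price=_nullable(fields[2], Decimal),
        design_date=_nullable(fields[3], date.fromisoformat),
        version=_nullable(fields[4], int),
        design_comment=_nullable(fields[5], str),
    )


record = parse_widget("1,sprocket,0.25,2010-02-10,1,Connects two gizmos")
print(record.id, record.widget_name, record.price, record.design_date)
```

A field that itself contains the delimiter would break this naive split, which is why escape and enclosing characters matter in practice.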
The MapReduce application called MaxWidgetId (available in the example code) will
find the widget with the highest ID.
The class can be compiled into a JAR file along with Widget.java. Both Hadoop (hadoop-core-version.jar) and Sqoop (sqoop-version.jar) will need to be on the classpath for compilation. The class files can then be combined into a JAR file and executed like so:
% jar cvvf widgets.jar *.class
% HADOOP_CLASSPATH=/usr/lib/sqoop/sqoop-version.jar hadoop jar \
> widgets.jar MaxWidgetId -libjars /usr/lib/sqoop/sqoop-version.jar
This command line ensures that Sqoop is on the classpath locally (via $HADOOP_CLASSPATH)
when running the MaxWidgetId.run() method, as well as when map tasks are
running on the cluster (via the -libjars argument).
When run, the maxwidgets path in HDFS will contain a file named part-r-00000 with
the following expected result:
3,gadget,99.99,1983-08-13,13,Our flagship product
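The MaxWidgetId source is not reproduced in these notes. As a plain-Python stand-in with no Hadoop involved, the logic can be pictured as each map task keeping the record with the highest id in its split and a single reduce step choosing among those candidates. The function names map_task and reduce_task are hypothetical, and the assumption that each mapper emits only its local maximum is mine.

```python
# Illustrative simulation only; map_task and reduce_task are hypothetical names
# and this is not the MaxWidgetId program from the example code.

def map_task(lines):
    """Return (id, line) for the highest id in one input split, or None if the split is empty."""
    best = None
    for line in lines:
        line = line.strip()
        if not line:
            continue
        # The id is the first comma-separated field of each imported record.
        widget_id = int(line.split(",", 1)[0])
        if best is None or widget_id > best[0]:
            best = (widget_id, line)
    return best


def reduce_task(candidates):
    """Pick the record with the highest id among the per-split candidates."""
    candidates = [c for c in candidates if c is not None]
    if not candidates:
        return None
    return max(candidates, key=lambda c: c[0])[1]


# Two pretend input splits covering the three imported records.
split_a = [
    "1,sprocket,0.25,2010-02-10,1,Connects two gizmos",
    "2,gizmo,4.00,2009-11-30,4,null",
]
split_b = ["3,gadget,99.99,1983-08-13,13,Our flagship product"]

print(reduce_task([map_task(split_a), map_task(split_b)]))
```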
It is worth noting that in this example MapReduce program, a Widget object was
emitted from the mapper to the reducer; the auto-generated Widget class implements
the Writable interface provided by Hadoop, which allows the object to be sent via
Hadoop’s serialization mechanism, as well as written to and read from SequenceFiles.
The MaxWidgetId example is built on the new MapReduce API. MapReduce applications
that rely on Sqoop-generated code can be built on the new or old APIs, though some
advanced features (such as working with large objects) are more convenient to use in
the new API.
Imported data and Hive:
For many types of analysis, using a system like Hive to handle
relational operations can dramatically ease the development of the analytic pipeline.
Especially for data originally from a relational data source, using Hive makes a lot of
sense. Hive and Sqoop together form a powerful toolchain for performing analysis.
Suppose we had another log of data in our system, coming from a web-based widget
purchasing system. This may return log files containing a widget id, a quantity, a shipping
address, and an order date.
Here is a snippet from an example log of this type:
1,15,120 Any St.,Los Angeles,CA,90210,2010-08-01
3,4,120 Any St.,Los Angeles,CA,90210,2010-08-01
2,5,400 Some Pl.,Cupertino,CA,95014,2010-07-30
2,7,88 Mile Rd.,Manhattan,NY,10005,2010-07-18
By using Hadoop to analyze this purchase log, we can gain insight into our sales operation.
By combining this data with the data extracted from our relational data source (the widgets table), we can do better. In this example session, we will compute which zip code is responsible for the most sales dollars, so we can better focus our sales team’s operations.
Doing this requires data from both the sales log and the widgets table.
The log snippet above should be saved in a local file named sales.log for this to work.
First, let’s load the sales data into Hive:
hive> CREATE TABLE sales(widget_id INT, qty INT,
> street STRING, city STRING, state STRING,
> zip INT, sale_date STRING)
> ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
OK
Time taken: 5.248 seconds
hive> LOAD DATA LOCAL INPATH "sales.log" INTO TABLE sales;
Copying data from file:/home/sales.log
Loading data to table sales
OK
Time taken: 0.188 seconds
Sqoop can generate a Hive table based on a table from an existing relational data source.
Since we’ve already imported the widgets data to HDFS, we can generate the Hive table
definition and then load in the HDFS-resident data:
% sqoop create-hive-table --connect jdbc:mysql://localhost/hadoopguide \
> --table widgets --fields-terminated-by ','
...
10/06/23 18:05:34 INFO hive.HiveImport: OK
10/06/23 18:05:34 INFO hive.HiveImport: Time taken: 3.22 seconds
10/06/23 18:05:35 INFO hive.HiveImport: Hive import complete.
% hive
hive> LOAD DATA INPATH "widgets" INTO TABLE widgets;
Loading data to table widgets
OK
Time taken: 3.265 seconds
When creating a Hive table definition with a specific already-imported dataset in mind,
we need to specify the delimiters used in that dataset. Otherwise, Sqoop will allow Hive
to use its default delimiters (which are different from Sqoop’s default delimiters).
This three-step process of importing data to HDFS, creating the Hive table, and then
loading the HDFS-resident data into Hive can be shortened to one step if you know
that you want to import straight from a database directly into Hive. During an import,
Sqoop can generate the Hive table definition and then load in the data. Had we not
already performed the import, we could have executed this command, which re-creates
the widgets table in Hive, based on the copy in MySQL:
% sqoop import --connect jdbc:mysql://localhost/hadoopguide \
> --table widgets -m 1 --hive-import
Regardless of which data import route we chose, we can now use the widgets data set
and the sales data set together to calculate the most profitable zip code. Let’s do so,
and also save the result of this query in another table for later:
hive> CREATE TABLE zip_profits (sales_vol DOUBLE, zip INT);
OK
hive> INSERT OVERWRITE TABLE zip_profits
> SELECT SUM(w.price * s.qty) AS sales_vol, s.zip FROM SALES s
> JOIN widgets w ON (s.widget_id = w.id) GROUP BY s.zip;
...
3 Rows loaded to zip_profits
OK
hive> SELECT * FROM zip_profits ORDER BY sales_vol DESC;
...
OK
403.71 90210
28.0 10005
20.0 95014
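As a sanity check on these figures, the same join and aggregation can be reproduced in a few lines of plain Python over the sample data shown in these notes. This is only a cross-check of the arithmetic, not how Hive executes the query; the names WIDGET_PRICES, SALES_LOG and zip_profits are introduced here for illustration.

```python
# Cross-check of the Hive result using the sample data from these notes.
# WIDGET_PRICES, SALES_LOG and zip_profits are illustrative names only.
from collections import defaultdict
from decimal import Decimal

# id -> price, from the three rows inserted into the widgets table.
WIDGET_PRICES = {1: Decimal("0.25"), 2: Decimal("4.00"), 3: Decimal("99.99")}

SALES_LOG = """\
1,15,120 Any St.,Los Angeles,CA,90210,2010-08-01
3,4,120 Any St.,Los Angeles,CA,90210,2010-08-01
2,5,400 Some Pl.,Cupertino,CA,95014,2010-07-30
2,7,88 Mile Rd.,Manhattan,NY,10005,2010-07-18
"""


def zip_profits(sales_text, prices):
    """Sum price * quantity per zip code, sorted by sales volume, highest first."""
    totals = defaultdict(Decimal)
    for line in sales_text.splitlines():
        widget_id, qty, _street, _city, _state, zip_code, _sale_date = line.split(",")
        totals[int(zip_code)] += prices[int(widget_id)] * int(qty)
    return sorted(totals.items(), key=lambda item: item[1], reverse=True)


for zip_code, volume in zip_profits(SALES_LOG, WIDGET_PRICES):
    print(volume, zip_code)
```

Decimal is used instead of float so that the totals match the DECIMAL prices exactly.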
Importing Large Objects:
Most databases provide the capability to store large amounts of data in a single field.
Depending on whether this data is textual or binary in nature, it is usually represented
as a CLOB or BLOB column in the table. These “large objects” are often handled specially
by the database itself. In particular, most tables are physically laid out on disk as an array of rows, with all the columns of a row stored together.
When scanning through rows to determine which rows match the criteria
for a particular query, this typically involves reading all columns of each row from disk.
If large objects were stored “inline” in this fashion, they would adversely affect the
performance of such scans. Therefore, large objects are often stored externally from
their rows. Accessing a large object often requires “opening” it
through the reference contained in the row.
The difficulty of working with large objects in a database suggests that a system such
as Hadoop, which is much better suited to storing and processing large, complex data
objects, is an ideal repository for such information. Sqoop can extract large objects
from tables and store them in HDFS for further processing.
As in a database, MapReduce typically materializes every record before passing it along
to the mapper. If individual records are truly large, this can be very inefficient.
As shown earlier, records imported by Sqoop are laid out on disk in a fashion very
similar to a database’s internal structure: an array of records with all fields of a record
concatenated together. When running a MapReduce program over imported records,
each map task must fully materialize all fields of each record in its input split. If the
contents of a large object field are only relevant for a small subset of the total number
of records used as input to a MapReduce program, it would be inefficient to fully
materialize all these records. Furthermore, depending on the size of the large object, full
materialization in memory may be impossible.
To overcome these difficulties, Sqoop will store imported large objects in a separate
file called a LobFile. The LobFile format can store individual records of very large size
(a 64-bit address space is used). Each record in a LobFile holds a single large object.
The LobFile format allows clients to hold a reference to a record without accessing the
record contents. When records are accessed, this is done through a java.io.InputStream
(for binary objects) or java.io.Reader (for character-based objects).
When a record is imported, the “normal” fields will be materialized together in a text
file, along with a reference to the LobFile where a CLOB or BLOB column is stored.
For example, suppose our widgets table contained a BLOB field named schematic
holding the actual schematic diagram for each widget.
An imported record might then look like:
2,gizmo,4.00,2009-11-30,4,null,externalLob(lf,lobfile0,100,5011714)
The externalLob(...) text is a reference to an externally stored large object, stored in
LobFile format (lf) in a file named lobfile0, with the specified byte offset and length
inside that file.
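To illustrate how such a reference breaks down into its parts, here is a small Python sketch that parses the externalLob(...) text. The pattern is inferred only from the single sample record above, so the exact grammar Sqoop accepts is an assumption; LobReference and parse_lob_reference are hypothetical names.

```python
# Illustrative parser for the sample reference text; the accepted syntax is an
# assumption based on one example. LobReference and parse_lob_reference are hypothetical.
import re
from typing import NamedTuple


class LobReference(NamedTuple):
    storage_format: str  # e.g. "lf" for LobFile format
    file_name: str       # e.g. "lobfile0"
    offset: int          # byte offset inside the file
    length: int          # length of the object in bytes


_LOB_PATTERN = re.compile(r"^externalLob\((\w+),([^,()]+),(\d+),(\d+)\)$")


def parse_lob_reference(text):
    """Split an externalLob(...) reference into format, file name, offset and length."""
    match = _LOB_PATTERN.match(text)
    if match is None:
        raise ValueError(f"not an external LOB reference: {text!r}")
    storage_format, file_name, offset, length = match.groups()
    return LobReference(storage_format, file_name, int(offset), int(length))


# The last field of the sample record holds the reference.
line = "2,gizmo,4.00,2009-11-30,4,null,externalLob(lf,lobfile0,100,5011714)"
# Split off the first six fields so the commas inside the reference stay intact.
reference_text = line.split(",", 6)[6]
print(parse_lob_reference(reference_text))
```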
When working with this record, the Widget.get_schematic() method would return an
object of type BlobRef referencing the schematic column, but not actually containing
its contents. The BlobRef.getDataStream() method actually opens the LobFile and returns
an InputStream, allowing you to access the schematic field’s contents.
When running a MapReduce job processing many Widget records, you might need to
access the schematic field of only a handful of records. This system allows you to incur
the I/O costs of accessing only the required large object entries, as individual schematics
may be several megabytes or more of data.
The BlobRef and ClobRef classes cache references to underlying LobFiles within a map
task. If you do access the schematic field of several sequentially ordered records, they
will take advantage of the existing file pointer’s alignment on the next record body.
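The deferred access described in this section can be sketched in Python as a reference object that touches the file only when its contents are requested. This is an analogy, not the BlobRef class: LazyBlobRef and demo are hypothetical names, an ordinary binary file stands in for the LobFile format, and the sketch does not cache open files the way the real classes are described as doing.

```python
# Illustrative analogy only; LazyBlobRef is a hypothetical class and a plain binary
# file stands in for a LobFile. No caching of open files is attempted here.
import os
import tempfile


class LazyBlobRef:
    """Holds a (path, offset, length) reference without reading the data."""

    def __init__(self, path, offset, length):
        self.path = path
        self.offset = offset
        self.length = length
        self.open_count = 0  # how many times the underlying file was opened

    def get_data_stream(self):
        """Open the file and return a stream positioned at the start of the object."""
        stream = open(self.path, "rb")
        stream.seek(self.offset)
        self.open_count += 1
        return stream


def demo():
    """Write a fake file, create a reference, and read the object only on demand."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "lobfile0")
        with open(path, "wb") as f:
            f.write(b"HEADER" + b"schematic-bytes" + b"TRAILER")
        ref = LazyBlobRef(path, offset=6, length=15)
        opened_before = ref.open_count  # still 0: creating the reference costs no I/O
        with ref.get_data_stream() as stream:
            data = stream.read(ref.length)
        return opened_before, ref.open_count, data


print(demo())
```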
Performing an export:
In Sqoop, an import refers to the movement of data from a database system into HDFS.
By contrast, an export uses HDFS as the source of data and a remote database as the
destination. In the previous sections, we imported some data and then performed some
analysis using Hive. We can export the results of this analysis to a database for consumption
by other tools.
Before exporting a table from HDFS to a database, we must prepare the database to
receive the data by creating the target table. While Sqoop can infer which Java types
are appropriate to hold SQL data types, this translation does not work in both directions
(for example, there are several possible SQL column definitions that can hold data in
a Java String; this could be CHAR(64), VARCHAR(200), or something else entirely). Consequently,
you must determine which types are most appropriate.
We are going to export the zip_profits table from Hive. We need to create a table in
MySQL that has target columns in the same order, with the appropriate SQL types:
% mysql hadoopguide
mysql> CREATE TABLE sales_by_zip (volume DECIMAL(8,2), zip INTEGER);
Query OK, 0 rows affected (0.01 sec)
Then we run the export command:
% sqoop export --connect jdbc:mysql://localhost/hadoopguide -m 1 \
> --table sales_by_zip --export-dir /user/hive/warehouse/zip_profits \
> --input-fields-terminated-by '\0001'
...
10/07/02 16:16:50 INFO mapreduce.ExportJobBase: Transferred 41 bytes in 10.8947
seconds (3.7633 bytes/sec)
10/07/02 16:16:50 INFO mapreduce.ExportJobBase: Exported 3 records.
Finally, we can verify that the export worked by checking MySQL:
% mysql hadoopguide -e 'SELECT * FROM sales_by_zip'
+--------+-------+
| volume | zip |
+--------+-------+
| 28.00 | 10005 |
| 403.71 | 90210 |
| 20.00 | 95014 |
+--------+-------+
When we created the zip_profits table in Hive, we did not specify any delimiters. So
Hive used its default delimiters: a Ctrl-A character (Unicode 0x0001) between fields,
and a newline at the end of each record. When we used Hive to access the contents of
this table (in a SELECT statement), Hive converted this to a tab-delimited representation
for display on the console. But when reading the tables directly from files, we need to
tell Sqoop which delimiters to use. Sqoop assumes records are newline-delimited by
default, but needs to be told about the Ctrl-A field delimiters. The --input-fields-terminated-by
argument to sqoop export specified this information. Sqoop supports
several escape sequences (which start with a '\' character) when specifying delimiters.
In the example syntax above, the escape sequence is enclosed in 'single quotes' to
ensure that the shell processes it literally. Without the quotes, the leading backslash
itself may need to be escaped (for example, --input-fields-terminated-by \\0001).
The escape sequences supported by Sqoop are listed in Table 15-1.
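To see why the delimiter has to be stated explicitly, the following Python sketch writes and reads rows using a Ctrl-A field separator and newline record terminator, as described above for Hive's defaults. The helper names to_hive_text and from_hive_text are hypothetical, and the sketch ignores escaping and NULL handling.

```python
# Illustrative only; to_hive_text and from_hive_text are hypothetical helpers.
# Escaping and NULL representation are deliberately ignored.
CTRL_A = "\x01"  # Ctrl-A, the default field delimiter described above


def to_hive_text(rows):
    """Serialize rows as Ctrl-A separated fields with one record per line."""
    return "".join(CTRL_A.join(str(value) for value in row) + "\n" for row in rows)


def from_hive_text(text):
    """Parse newline-delimited records back into lists of string fields."""
    return [line.split(CTRL_A) for line in text.split("\n") if line]


rows = [(403.71, 90210), (28.0, 10005), (20.0, 95014)]
encoded = to_hive_text(rows)
decoded = from_hive_text(encoded)
print(decoded)

# Splitting on commas finds no delimiter, so each record stays as a single field.
first_record = encoded.split("\n")[0]
print(len(first_record.split(",")))
```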
Exports: Deeper look
The architecture of Sqoop’s export capability is very similar in nature to how Sqoop
performs imports. Before performing the export, Sqoop picks a strategy
based on the database connect string. For most systems, Sqoop uses JDBC. Sqoop then generates a Java class based on the target table definition. This generated class has the ability to parse records from text files and insert values of the appropriate types into a table (in addition to the ability to read the columns from a ResultSet).
A MapReduce job is then launched that reads the source data files from HDFS, parses the records
using the generated class, and executes the chosen export strategy.
The JDBC-based export strategy builds up batch INSERT statements that will each add
multiple records to the target table. Inserting many records per statement performs
much better than executing many single-row INSERT statements on most database systems.
Separate threads are used to read from HDFS and communicate with the database, to ensure that I/O operations involving different systems are overlapped as much as possible. For MySQL, Sqoop can employ a direct-mode strategy using mysqlimport. Each map task spawns a mysqlimport process that it communicates with via a named FIFO on the local filesystem. Data is then streamed into mysqlimport via the FIFO channel, and from there into the database.
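The batching idea can be sketched with Python's built-in sqlite3 module standing in for the target database. This is not Sqoop's export code: export_rows and batched are hypothetical names, the batch size is arbitrary, and SQLite is used only because it needs no server.

```python
# Illustrative sketch of multi-row INSERT batching, using in-memory SQLite as a
# stand-in database. export_rows and batched are hypothetical helper names.
import sqlite3


def batched(rows, batch_size):
    """Yield consecutive slices of rows, each with at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


def export_rows(conn, table, columns, rows, batch_size=100):
    """Insert rows using one multi-row INSERT per batch; return the statement count."""
    col_list = ", ".join(columns)
    row_placeholder = "(" + ", ".join("?" for _ in columns) + ")"
    statements = 0
    for batch in batched(rows, batch_size):
        sql = (
            f"INSERT INTO {table} ({col_list}) VALUES "
            + ", ".join([row_placeholder] * len(batch))
        )
        # Flatten the batch so the parameters line up with the placeholders.
        params = [value for row in batch for value in row]
        conn.execute(sql, params)
        statements += 1
    conn.commit()
    return statements


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales_by_zip (volume REAL, zip INTEGER)")
rows = [(403.71, 90210), (28.0, 10005), (20.0, 95014)]
statement_count = export_rows(conn, "sales_by_zip", ["volume", "zip"], rows, batch_size=2)
row_count = conn.execute("SELECT COUNT(*) FROM sales_by_zip").fetchone()[0]
print(statement_count, row_count)
```

With a batch size of two, the three rows travel in two statements instead of three; larger batches reduce the number of round trips further.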
While most MapReduce jobs reading from HDFS pick the degree of parallelism (number
of map tasks) based on the number and size of the files to process, Sqoop’s export system allows users explicit control over the number of tasks. The performance of the export can be affected by the number of parallel writers to the database, so Sqoop uses the CombineFileInputFormat class to group up the input files into a smaller number of map tasks.
Exports and Transactionality:
Due to the parallel nature of the process, an export is often not an atomic operation.
Sqoop will spawn multiple tasks to export slices of the data in parallel. These tasks can
complete at different times, meaning that even though transactions are used inside
tasks, results from one task may be visible before the results of another task. Moreover,
databases often use fixed-size buffers to store transactions. As a result, one transaction
cannot necessarily contain the entire set of operations performed by a task. Sqoop
commits results every few thousand rows, to ensure that it does not run out of memory.
These intermediate results are visible while the export continues. Applications that will
use the results of an export should not be started until the export process is complete,
or they may see partial results.
To solve this problem, Sqoop can export to a temporary staging table, then at the end
of the job—if the export has succeeded—move the staged data into the destination
table in a single transaction. You can specify a staging table with the --staging-table
option. The staging table must already exist and have the same schema as the destination.
It must also be empty, unless the --clear-staging-table
option is also supplied.
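The staging pattern can be illustrated with Python's sqlite3 module as a stand-in database. This is a sketch of the general technique, not of Sqoop's implementation: the table name sales_by_zip_stage, the function export_via_staging, and the commit interval are all assumptions made for the example.

```python
# Illustrative sketch of exporting through a staging table, using in-memory SQLite.
# export_via_staging and the sales_by_zip_stage table name are hypothetical.
import sqlite3


def export_via_staging(conn, rows, commit_every=2):
    """Load rows into the staging table with intermediate commits, then move
    them to the destination table in one transaction."""
    for count, row in enumerate(rows, start=1):
        conn.execute("INSERT INTO sales_by_zip_stage (volume, zip) VALUES (?, ?)", row)
        if count % commit_every == 0:
            # Partial results become visible here, but only in the staging table.
            conn.commit()
    conn.commit()
    # Using the connection as a context manager commits on success and rolls back on error.
    with conn:
        conn.execute(
            "INSERT INTO sales_by_zip (volume, zip) "
            "SELECT volume, zip FROM sales_by_zip_stage"
        )
        conn.execute("DELETE FROM sales_by_zip_stage")


staging_conn = sqlite3.connect(":memory:")
staging_conn.execute("CREATE TABLE sales_by_zip (volume REAL, zip INTEGER)")
# The staging table must already exist, be empty, and match the destination schema.
staging_conn.execute("CREATE TABLE sales_by_zip_stage (volume REAL, zip INTEGER)")

export_via_staging(staging_conn, [(403.71, 90210), (28.0, 10005), (20.0, 95014)])
final_count = staging_conn.execute("SELECT COUNT(*) FROM sales_by_zip").fetchone()[0]
staged_count = staging_conn.execute("SELECT COUNT(*) FROM sales_by_zip_stage").fetchone()[0]
print(final_count, staged_count)
```

Readers of the destination table see either none of the exported rows or all of them, because the move happens inside a single transaction.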
Exports and SequenceFiles:
The example export read source data from a Hive table, which is stored in HDFS as a
delimited text file. Sqoop can also export delimited text files that were not Hive tables.
For example, it can export text files that are the output of a MapReduce job.
Sqoop can also export records stored in SequenceFiles to an output table, although
some restrictions apply. A SequenceFile can contain arbitrary record types. Sqoop’s
export tool will read objects from SequenceFiles and send them directly to the OutputCollector,
which passes the objects to the database export OutputFormat. To work with
Sqoop, the record must be stored in the “value” portion of the SequenceFile’s key-value
pair format and must subclass the org.apache.sqoop.lib.SqoopRecord abstract class (as
is done by all classes generated by Sqoop).
If you use the codegen tool (sqoop-codegen) to generate a SqoopRecord implementation
for a record based on your export target table, you can then write a MapReduce program
that populates instances of this class and writes them to SequenceFiles. sqoop-export can then export these SequenceFiles to the table. Another way that data may end up in SqoopRecord instances in SequenceFiles is if data is imported from a database table to HDFS, modified in some fashion, and the results stored in SequenceFiles holding records of the same data type.
In this case, Sqoop should reuse the existing class definition to read data from SequenceFiles,
rather than generate a new (temporary) record container class to perform the export, as is done when converting text-based records to database rows. You can suppress code generation and instead use an existing record class and jar by providing the --class-name and --jar-file arguments to Sqoop. Sqoop will use the specified class,
loaded from the specified jar, when exporting records.
In the following example, we will re-import the widgets table as SequenceFiles, and
then export it back to the database in a different table:
% sqoop import --connect jdbc:mysql://localhost/hadoopguide \
> --table widgets -m 1 --class-name WidgetHolder --as-sequencefile \
> --target-dir widget_sequence_files --bindir .
...
10/07/05 17:09:13 INFO mapreduce.ImportJobBase: Retrieved 3 records.
% mysql hadoopguide
mysql> CREATE TABLE widgets2(id INT, widget_name VARCHAR(100),
-> price DOUBLE, designed DATE, version INT, notes VARCHAR(200));
Query OK, 0 rows affected (0.03 sec)
mysql> exit;
% sqoop export --connect jdbc:mysql://localhost/hadoopguide \
> --table widgets2 -m 1 --class-name WidgetHolder \
> --jar-file widgets.jar --export-dir widget_sequence_files
...
10/07/05 17:26:44 INFO mapreduce.ExportJobBase: Exported 3 records.
During the import, we specified the SequenceFile format, and that we wanted the jar
file to be placed in the current directory (with --bindir), so we can reuse it. Otherwise,
it would be placed in a temporary directory. We then created a destination table for
the export, which had a slightly different schema, albeit one that is compatible with
the original data. We then ran an export that used the existing generated code to read
the records from the SequenceFile and write them to the database.