7: MapReduce types and Formats:
- MapReduce has simple model of data processing: inputs and outputs for the map and reduce function are key-value pairs.
MapReduce Types:
- Map and reduce functions in Hadoop MapReduce have following general form:
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
- In general, map input key and value types(K1 and V1) are different from the map output types(K2 and V2). However, reduce input must have same types as map output, although reduce output types may be different again(K3 and V3).
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
// ...
}
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
// ...
}
}
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public class Context extends ReducerContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
// ...
}
protected void reduce(KEYIN key, Iterable<VALUEIN> values,
Context context) throws IOException, InterruptedException {
// ...
}
}
- Context objects are used for emitting key-value pairs, so they are parameterized by output types, so that signature of the write() method is:
public void write(KEYOUT key, VALUEOUT value)
throws IOException, InterruptedException
- Since Mapper and Reducer are separate classes the type parameters have different scopes and the actual type argument of KEYIN(say) in the Mapper may be different to the type of type parameter of same name(KEYIN) in the Reducer.
- In maximum temperature example, KEYIN is replaced by LongWritable for the Mapper, and by Text for the Reducer.
- Even though map output types and reduce input types must match, this is not enforced by the Java compiler.
- If combine function is used, then it is same form as the reduce function, except its output types are intermediate key and value types(K2 and V2) so they can feed the reduce function:
map: (K1, V1) → list(K2, V2)
combine: (K2, list(V2)) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
- Often the combine and reduce functions are the same, in which case, K3 is same as K2 and V3 is same as V2.
- Partition function operates on intermediate key and value types(K2 and V2), and returns the partition index. In practice, partition is determined solely by the key(value is ignored):
partition: (K2, V2) → integer
- MapReduce has simple model of data processing: inputs and outputs for the map and reduce function are key-value pairs.
MapReduce Types:
- Map and reduce functions in Hadoop MapReduce have following general form:
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
- In general, map input key and value types(K1 and V1) are different from the map output types(K2 and V2). However, reduce input must have same types as map output, although reduce output types may be different again(K3 and V3).
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
// ...
}
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
// ...
}
}
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public class Context extends ReducerContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
// ...
}
protected void reduce(KEYIN key, Iterable<VALUEIN> values,
Context context) throws IOException, InterruptedException {
// ...
}
}
- Context objects are used for emitting key-value pairs, so they are parameterized by output types, so that signature of the write() method is:
public void write(KEYOUT key, VALUEOUT value)
throws IOException, InterruptedException
- Since Mapper and Reducer are separate classes the type parameters have different scopes and the actual type argument of KEYIN(say) in the Mapper may be different to the type of type parameter of same name(KEYIN) in the Reducer.
- In maximum temperature example, KEYIN is replaced by LongWritable for the Mapper, and by Text for the Reducer.
- Even though map output types and reduce input types must match, this is not enforced by the Java compiler.
- If combine function is used, then it is same form as the reduce function, except its output types are intermediate key and value types(K2 and V2) so they can feed the reduce function:
map: (K1, V1) → list(K2, V2)
combine: (K2, list(V2)) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
- Often the combine and reduce functions are the same, in which case, K3 is same as K2 and V3 is same as V2.
- Partition function operates on intermediate key and value types(K2 and V2), and returns the partition index. In practice, partition is determined solely by the key(value is ignored):
partition: (K2, V2) → integer
Default MapReduce job:
- What happens when you run MapReduce without setting mapper or reducer?
public class MinimalMapReduce extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.printf("Usage: %s [generic options] <input> <output>\n",
getClass().getSimpleName());
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
Job job = new Job(getConf());
job.setJarByClass(getClass());
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MinimalMapReduce(), args);
System.exit(exitCode);
}
}
- Only configuration we set is an input path and an output path. We run it over subset of weather data with following:
% hadoop MinimalMapReduce "input/ncdc/all/190{1,2}.gz" output
- We do get some output: one file named part-r-00000 in the output directory.
Heres what first few lines look like:
0→0029029070999991901010106004+64333+023450FM-12+000599999V0202701N01591...
0→0035029070999991902010106004+64333+023450FM-12+000599999V0201401N01181...
135→0029029070999991901010113004+64333+023450FM-12+000599999V0202901N00821...
141→0035029070999991902010113004+64333+023450FM-12+000599999V0201401N01181...
270→0029029070999991901010120004+64333+023450FM-12+000599999V0209991C00001...
282→0035029070999991902010120004+64333+023450FM-12+000599999V0201401N01391...
- Each line is an integer followed by tab character, followed by original weather data record.
public class MinimalMapReduceWithDefaults extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null) {
return -1;
}
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(Mapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
job.setReducerClass(Reducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args);
System.exit(exitCode);
}
- We have simplified first few lines of the run() method by extracting logic for printing usage and setting input and output paths into helper method.
- Almost all MapReduce drivers take these two arguments(input and output), so reducing the boilerplate code here is good thing.
- Below are relevant methods in JobBuilder class for reference:
public static Job parseInputAndOutput(Tool tool, Configuration conf,
String[] args) throws IOException {
if (args.length != 2) {
printUsage(tool, "<input> <output>");
return null;
}
Job job = new Job(conf);
job.setJarByClass(tool.getClass());
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job;
}
public static void printUsage(Tool tool, String extraArgsUsage) {
System.err.printf("Usage: %s [genericOptions] %s\n\n",
tool.getClass().getSimpleName(), extraArgsUsage);
GenericOptionsParser.printGenericCommandUsage(System.err);
}
Default mapper is just the Mapper class, which writes input key and value unchanged to output:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
- Mapper is generic type which allows it to work with any key or value types.
- Map input and output key is of type LongWritable and map input and output value is of type Text.
Choosing Number of Reducers:
- Almost all real world jobs should set this to larger number, or, job will be very slow since all intermediate data flows through single reduce task.
- Optimal number of reducers is related to total no of available reducer slots in your cluster.
- Total number of slots is found by multiplying the number of nodes in cluster and number of slots per node.
- One common setting is to have slightly fewer reducers than total slots, which gives one wave of reduce tasks. If your reduce tasks are very big, then it makes sense to have larger number of reducers so tasks are more fine grained, and failure does not affect job execution time significantly.
Default reducer is Reducer, again a generic type, which simply writes all its input to its output:
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
Context context) throws IOException, InterruptedException {
for (VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
}
- All keys for MapReduce program are LongWritable, and all the values are Text, since these are input keys and values, and the map and reduce functions are both identity functions by definition preserve type.
- Records are sorted by MapReduce system before being presented to the reducer.
- In this case, keys are sorted numerically, which has effect of interleaving lines from input files into one combined output file.
- Seperators may be configured independently for maps and reduces.
Input Formats:
- Hadoop can process many different types of data formats, from flat text files to databases.
Input Splits and Records:
- Input split is chunk of input that is processed by single map.
- Each map processes single split.
- Each split is divided into records and map processes each record - key-value pair - in turn.
- In DB context, split might correspond to range or rows from table and record to row in that range.
- InputSplit has length in bytes and set of storage locations, which are just hostname strings.
- Notice that split does not contain input data; it is just reference to the data.
- Storage locations are used by MapReduce system to place map tasks as close to splits data as possible and size is used to order splits so largest get processed first, in attempt to minimize job runtime.
- As MapReduce application writer, you dont need to deal with InputSplits directly, as they are created by an InputFormat.
- InputFormat is responsible for creating input splits and dividing them into records.
public abstract class InputFormat<K, V> {
public abstract List<InputSplit> getSplits(JobContext context)
throws IOException, InterruptedException;
public abstract RecordReader<K, V>
createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException;
}
- RecordReader is little more than iterator over records, and map tasks uses one to generate record key-value pairs, which it passes to map function.
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
- After running setup(), the nextKeyValue() is called repeatedly on the Context, to populate the key and value objects for mapper.
- Key and value are retreived from RecordReader by way of Context, and passed to the map() method for it to do its work.
- When reader gets to end of stream, nextKeyValue() method returns false, and the map tasks runs its cleanup() method, then completes.
FileInputFormat:
- Base class for all implementations of InputFormat that use files as their data source.
- It provides two things:
- Place to define which files are included as input to job and
- implementation for generating splits for input files.
- job of dividing splits into records is performed by subclasses.
FileInputFormat input paths:
- Input to job is specified as collection of paths, which offers great flexibility in constraining input to job.
- FileInputFormat offers four static convenience methods for setting Jobs input paths:
public static void addInputPath(Job job, Path path)
public static void addInputPaths(Job job, String commaSeparatedPaths)
public static void setInputPaths(Job job, Path... inputPaths)
public static void setInputPaths(Job job, String commaSeparatedPaths)
- addInputPath() and addInputPaths() method add path or paths to list of inputs.
- You call these methods repeatedly to build list of paths.
- You call these methods repeatedly to build list of paths.
- Add and set methods allow files to be specified by inclusion only.
- To exclude certain files from the input, you can set filter using the setInputPathFilter() method on FileInputFormat:
public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter)
- To exclude certain files from the input, you can set filter using the setInputPathFilter() method on FileInputFormat:
public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter)
- Setting paths is done with the -input option for both streaming and Pipes interfaces, so setting paths directly is not usually needed.
FileInputFormat input splits:
- Given a set of files, how does FileInputFormat turn them into splits?
- FileInputFormat splits only large files.
- Here large means larger than an HDFS block.
- Split size is normally the size of an HDFS block, which is appropriate for most applications.
The minimum split size is usually 1 byte, although some formats have lower bound on split size
The maximum split size defaults to the maximum value that can be represented by a Java long type.
- The split size is calculated by formula method in FileInputFormat.
max(minimumSize, min(maximumSize, blockSize))
by default:
minimumSize < blockSize < maximumSize
so split size is blockSize.
Small files and CombineFileInputFormat
- Hadoop works better with small number of large files than large number of small files.
- One reason for this is that FileInputFormat generates split in such as way that each split is all or part of a single file.
- If file is very small(smaller than HDFS block) and there are lot of them, then each map task will process very little input and there will be lot of them(one per file), each of which imposes extra bookkeeping overhead.
- Compare 1GB file broken into sixteen 64MB blocks, and 10,000 or so 100KB files.
- 10,000 files use one map each, and the job time can be tens or hundreds of times slower than equivalent one with single input file and 16 map tasks.
- Situation can be alleviated little by CombineFileInputFormat, which was designed to work well with small files.
- Where FileInputFormat creates a split per file, CombineFileInputFormat packs many files into each split so that each mapper has more to process.
- CombineFileInputFormat takes nodes and rack locality into account when deciding which blocks to place in same split.
- Storing large number of small files in HDFS is wasteful of namenodes memory.
- One technique for avoiding many small files is to merge small files into large files by using SequenceFile and values as file contents.
Preventing splitting:
- Some applications dont want files to be split, so that single mapper can process each input file in its entirety.
- Simple way to check if all records in a file are sorted is to go through records in order, checking whether each record is not less than preceding one.
- Implemented as may task, this algorithm will work only if one map processes the whole file.
- There are some ways to ensure that existing file is not split.
- First way is to increase the minimum split size to be larger than the largest file in your system.
- Setting it to its maximum value, Long.MAX_VALUE has this effect.
- Second is to subclass concrete subclass FileInputFormat that you want to use, to override the isSplittable() method to return false.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class NonSplittableTextInputFormat extends TextInputFormat {
@Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
}
File information in the mapper:
- A mapper processing a file input split can find the information about split by calling getInputSplit() method on the Mappers Context object.
- When input format derives from FileInputFormat, InputSplit returned by this method can be cast to a FileSplit to access the file information.
Processing whole file as a record:
Example. An InputFormat for reading a whole file as a record
public class WholeFileInputFormat
extends FileInputFormat<NullWritable, BytesWritable> {
@Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split, context);
return reader;
}
}
Example: The RecordReader used by WholeFileInputFormat for reading a whole file as a record
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!processed) {
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}
@Override
public BytesWritable getCurrentValue() throws IOException,
InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException {
return processed ? 1.0f : 0.0f;
}
@Override
public void close() throws IOException {
// do nothing
}
}
Example: A MapReduce program for packaging a collection of small files as a single SequenceFile
public class SmallFilesToSequenceFileConverter extends Configured
implements Tool {
static class SequenceFileMapper
extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
private Text filenameKey;
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
InputSplit split = context.getInputSplit();
Path path = ((FileSplit) split).getPath();
filenameKey = new Text(path.toString());
}
@Override
protected void map(NullWritable key, BytesWritable value, Context context)
throws IOException, InterruptedException {
context.write(filenameKey, value);
}
}
@Override
public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null) {
return -1;
}
job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
job.setMapperClass(SequenceFileMapper.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);
System.exit(exitCode);
}
}
Here’s a run on a few small files. We’ve chosen to use two reducers, so we get two
output sequence files:
% hadoop jar hadoop-examples.jar SmallFilesToSequenceFileConverter \
-conf conf/hadoop-localhost.xml -D mapred.reduce.tasks=2 input/smallfiles output
Two part files are created, each of which is a sequence file, which we can inspect with
the -text option to the filesystem shell:
% hadoop fs -conf conf/hadoop-localhost.xml -text output/part-r-00000
hdfs://localhost/user/tom/input/smallfiles/a 61 61 61 61 61 61 61 61 61 61
hdfs://localhost/user/tom/input/smallfiles/c 63 63 63 63 63 63 63 63 63 63
hdfs://localhost/user/tom/input/smallfiles/e
% hadoop fs -conf conf/hadoop-localhost.xml -text output/part-r-00001
hdfs://localhost/user/tom/input/smallfiles/b 62 62 62 62 62 62 62 62 62 62
hdfs://localhost/user/tom/input/smallfiles/d 64 64 64 64 64 64 64 64 64 64
hdfs://localhost/user/tom/input/smallfiles/f 66 66 66 66 66 66 66 66 66 66
Text Input:
- Hadoop excels at processing unstructured text.
- Different InputFormats that Hadoop provides to process text.
TextInputFormat:
- Default input format. Each record is line of input.
- Key is byte offset within the file of beginning of the line.
- Value is contents of the line, excluding any line terminators and is packaged as Text object.
- So file containing following text:
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
is divided into one spit of four records.
- The records are interpreted as key-value pairs:
(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)
(57, But his face you could not see,)
(89, On account of his Beaver Hat.)
- Keys are not line numbers.
- Splits are processed independently.
- Line numbers are really a sequential notion: you have to keep count of lines as you consume them, so knowing line number within a split would be possible, but not within the file.
- Offset is usually sufficient for applications that need unique identifier for each line. Combined within the files name, it is unique within the filesystem.
KeyValueTextInputFormat:
- You can specify the separator via the mapreduce.input.keyvaluelinerecordreader.key.value.separator property. It is a tab character by default.
line1→On the top of the Crumpetty Tree
line2→The Quangle Wangle sat,
line3→But his face you could not see,
line4→On account of his Beaver Hat.
- Like in TextInputFormat case, the input is in single split comprising four records although this time the keys are the Text sequences before the tab in each line:
(line1, On the top of the Crumpetty Tree)
(line2, The Quangle Wangle sat,)
(line3, But his face you could not see,)
(line4, On account of his Beaver Hat.)
NLineInputFormat:
- If you want your mappers to receive fixed number of lines of input, then NLineInputFormat is the InputFormat to use.
- Keys are the byte offset in file and values are lines themselves.
- N: number of lines of input that each mapper receives.
- With N set to one(default), each mapper receives exactly one line of input.
- The mapreduce.input.lineinputformat.linespermap property(mapred.line.input.format.linespermap in old API) controls the value of N.
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
If, for example, N is two, then each split contains two lines. One mapper will receive
the first two key-value pairs:
(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)
And another mapper will receive the second two key-value pairs:
(57, But his face you could not see,)
(89, On account of his Beaver Hat.)
- Keys and values are same as TextInputFormat produces. Difference is in the way splits are constructed.
XML:
- Most XML parsers operate on whole XML documents, so if large XML document is made up of multiple input splits, then it is challenge to parse these individually.
- We can process entire XML document in one mapper if it is not too large.
- Large XML documents that are composed of series of records can be broken into these records using simple string or regular expression matching to find start and end tags of records.
- Hadoop comes with a class for this purpose called StreamXmlRecordReader(which is in org.apache.hadoop.streaming package, although it can be used outside of Streaming).
- You can use it by setting your input format to StreamInputFormat and setting the stream.recordreader.class property to org.apache.hadoop.streaming.StreamXmlRecordReader.
- For ex, Wikipedia provides dumps of its content in XML form, which are appropriate for processing in parallel using MapReduce using this approach.
- Data is contained in one large XML wrapper document, which contains series of elements such as page elements that contain pages content and associated metadata.
- Using StreamXmlRecordReader, page elements can be interpreted as records for processing by the mapper.
Binary Input:
- Hadoop MapReduce is not just restricted to processing textual data- it has support for binary formats too.
SequenceFileInputFormat:
- Stores sequences of binary key-value pairs.
- Sequence files are well suited as format for MapReduce data since they are splittable, they support compression as part of the format, and they can store arbitrary types using variety of serialization frameworks.
- To use data from sequence files as input to MapReduce, we use SequenceFileInputFormat.
- Keys and values are determined by sequence file, and you need to make sure that your map input types correspond.
- If your sequence file has IntWritable keys and text values, then map signature would be Mapper<IntWritable, Text, K, V>, where K and V are types of maps output keys and values.
SequenceFileAsTextInputFormat:
- Is variant of SequenceFileInputFormat that converts the sequence files keys and values to Text objects.
- Conversion is performed by calling toString() on the keys and values.
- This format makes sequence files suitable input for streaming
SequenceFileAsBinaryInputFormat:
- Variant of SequenceFileInputFormat that retreives sequence files keys and values as opaque binary objects.
- Combined with process that creates sequence files with SequenceFile.Writers appendRaw() method, this provides a way to use any binary data types with MapReduce.
Multiple Inputs:
- Input to MapReduce job may consist of multiple input files, all of the input is interpreted by single InputFormat and single Mapper.
- Overtime, data format evolves so you have to write your mapper to cope with all of your legacy formats.
- You also have data sources that provide same type of data but in different formats.
- Even if they are in same format, they may have different representations and thus needs to be parsed differently.
- These cases are handled by using MultipleInputs class, which allows you to specify InputFormat and Mapper to use on per path basis.
- If we had weather data from UK met office that we wanted to combine with NCDC data for our maximum temperature analysis, then we might set up the input as follows:
MultipleInputs.addInputPath(job, ncdcInputPath,
TextInputFormat.class, MaxTemperatureMapper.class);
MultipleInputs.addInputPath(job, metOfficeInputPath,
TextInputFormat.class, MetOfficeMaxTemperatureMapper.class);
- Both Met Office and NCDC data is text based, so we use TextInputFormat for each.
- But line format of two data sources is different, so we use two different mappers
- MaxTemperatureMapper reads NCDC input data and extracts the year and temperature fields.
- MetOfficeMaxTemperatureMapper reads Met Office input data and extracts the year and temperature fields.
- Important thing to note is map outputs have same types since the reducers see aggregated map outputs and are not aware of different mappers used to produce them.
- MultipleInputs class has an overloaded version of addInputPath() that does not take a mapper:
public static void addInputPath(Job job, Path path,
Class<? extends InputFormat> inputFormatClass)
- This is useful when you only have one mapper but multiple input formats.
Database Input( and Output):
- DBInputFormat: input format for reading data from relational database, using JDBC.
- Corresponding output format is DBOutputFormat, which is useful for dumping job outputs into database.
- As an alternative way of moving data between relational databases and HDFS, consider using Sqoop
- HBases TableInputFormat is designed to allow MapReduce program to operate on data stored in an HBase table.
- TableOutputFormat is for writing MapReduce outputs into an HBase table.
Output Formats:
Text Output:
- Default output format, TextOutputFormat, writes records as lines of text.
- Its key and values may be of any type, since TextOutputFormat turns them to strings by calling toString() on them.
- Each key-value pair is seperated by tab character, although that may be changed using the mapreduce.output.textoutputformat.separator property.
- We can suppress key or the value from output using the NullWritable type.
Binary Output:
SequenceFileOutputFormat:
- Writes sequence files for its output.
- This is good choice of output if it forms input to further MapReduce job, since it is compact and is readily compressed.
- Compression is controlled via the static methods on SequenceFileOutputFormat.
SequenceFileAsBinaryOutputFormat:
- Counterpart to SequenceFileAsBinaryInputFormat, and it writes keys and values in raw binary format into SequenceFile container.
MapFileOutputFormat
1
- Writes MapFiles as output.
- keys in MapFile must be added in order, so you need to ensure that your reducers emit keys in sorted order.
Multiple Outputs:
- FileOutputFormat and its subclasses generate set of files in output directory.
- There is one file per reducer, and files are named by partition number: part-r-00000, part-r-00001 etc.
- There is sometimes a need to have more control over naming of files or to produce multiple files per reducer.
Ex: Partitioning data:
- Consider problem of partitioning the weather dataset by weather station.
- We would like to run a job whose output is file per station, with each file containing all the records for that station.
- One way to do this is to have reducer for each weather station.
- First, write a partitioner that puts records from same weather station into same partition.
- Set number of reducers on job to be number of weather stations.
public class StationPartitioner extends Partitioner<LongWritable, Text> {
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
public int getPartition(LongWritable key, Text value, int numPartitions) {
parser.parse(value);
return getPartition(parser.getStationId());
}
private int getPartition(String stationId) {
...
}
}
Drawbacks in this approach:
- Since the number of partitions needs to be known before job is run, so does the number of weather stations.
- It is generally bad idea to allow number of partitions to be rigidly fixed by the application, since it can lead to small or uneven sized partitions. It is better to have few reducers doing more amount of work, as overhead in running a task is then reduced.
- There are two special cases when it does make sense to allow application to set number of partitions:
- Zero reducers: there are no partitions, as application needs to run only map tasks.
- One reducer: it can be convenient to run small jobs to combine output of previous jobs into single file. This should not be attempted when amount of data is small enough to be processed comfortably by one reducer.
MultipleOutputs:
- Allows each reducer to create more than single file.
- File names are of the form name-m-nnnnn for map outputs and name-r-nnnnn for reduce outputs where name is the arbitrary name and nnnnn is an integer designating the part number, starting from zero.
- Use MultipleOutput to partition dataset by station:
Example: Partitions whole dataset into files named by the station ID using MultipleOutputs
public class PartitionByStationUsingMultipleOutputs extends Configured
implements Tool {
static class StationMapper
extends Mapper<LongWritable, Text, Text, Text> {
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
parser.parse(value);
context.write(new Text(parser.getStationId()), value);
}
}
static class MultipleOutputsReducer
extends Reducer<Text, Text, NullWritable, Text> {
private MultipleOutputs<NullWritable, Text> multipleOutputs;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
}
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
multipleOutputs.write(NullWritable.get(), value, key.toString());
}
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
multipleOutputs.close();
}
}
@Override
public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null) {
return -1;
}
job.setMapperClass(StationMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setReducerClass(MultipleOutputsReducer.class);
job.setOutputKeyClass(NullWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new PartitionByStationUsingMultipleOutputs(),
args);
System.exit(exitCode);
}
}
Lazy Output:
- Some applications prefer that empty files not be created, which is where LazyOutputFormat helps.
- It is wrapper output format that ensures that output file is created only when first record is emitted for given partition.
- Streaming and Pipes support a -lazyOutput option to enable LazyOutputFormat.
8: MapReduce Features:
Counters:
- If you were counting invalid records and discovered that the proportion of invalid records in whole dataset was very high, you might be prompted to check why so many records were being marked as invalid - perhaps there is a bug in the part of program that detects invalid records?
- Counters are useful channel for gathering statistics about the job: for quality control or for application level statistics.
- They are also useful for problem diagnosis.
Built-in Counters:
- Hadoop maintains some built in counters for every job, which report various metrics for your job.
- For ex there are counters for number of bytes and records processed, which allows you to confirm that expected amount of input was consumed and the expected amount of output was produced.
Counters:
- If you were counting invalid records and discovered that the proportion of invalid records in whole dataset was very high, you might be prompted to check why so many records were being marked as invalid - perhaps there is a bug in the part of program that detects invalid records?
- Counters are useful channel for gathering statistics about the job: for quality control or for application level statistics.
- They are also useful for problem diagnosis.
Built-in Counters:
- Hadoop maintains some built in counters for every job, which report various metrics for your job.
- For ex there are counters for number of bytes and records processed, which allows you to confirm that expected amount of input was consumed and the expected amount of output was produced.
- Each group either contains task counters (which are updated as task progresses) or job counters( updated as job progresses).
Task Counters:
- Gather information about tasks over course of their execution, and the results are aggregated over all tasks in the job.
- MAP_INPUT_RECORDS counter counts input records read by each map task and aggregates over all map tasks in a job.
- Task counters are maintained by each task attempt and periodically sent to task tracker and then to the jobtracker so they can be globally aggregated.
- During job run, counters may go down if task fails.
- Counter values are definitive only once a job has successfully completed.
- For ex, PHYSICAL_MEMORY_BYTES, VIRTUAL_MEMORY_BYTES and COMMITTED_HEAP_BYTES provide an indication of how memory usage various over course of particular task attempt.
Job Counters:
- Maintained by jobtracker, so they dont need to be sent across the network, unlike all other counters, including user defined roles.
- for ex, TOTAL_LAUNCHED_MAPS counts the number of map tasks that were launched over course of job.
User Defined Java Counters:
- MapReduce allows user code to define a set of counters, which are then incremented as desired in mapper or reducer.
- Counters are defined by Java enum, which serves to group related counters.
Example: Application to run the maximum temperature job, including counting missing and
malformed fields and quality codes
public class MaxTemperatureWithCounters extends Configured implements Tool {
enum Temperature {
MISSING,
MALFORMED
}
static class MaxTemperatureMapperWithCounters extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
private NcdcRecordParser parser = new NcdcRecordParser();
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
parser.parse(value);
if (parser.isValidTemperature()) {
int airTemperature = parser.getAirTemperature();
output.collect(new Text(parser.getYear()),
new IntWritable(airTemperature));
} else if (parser.isMalformedTemperature()) {
System.err.println("Ignoring possibly corrupt input: " + value);
reporter.incrCounter(Temperature.MALFORMED, 1);
} else if (parser.isMissingTemperature()) {
reporter.incrCounter(Temperature.MISSING, 1);
}
// dynamic counter
reporter.incrCounter("TemperatureQuality", parser.getQuality(), 1);
}
}
@Override
public int run(String[] args) throws IOException {
JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (conf == null) {
return -1;
}
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MaxTemperatureMapperWithCounters.class);
conf.setCombinerClass(MaxTemperatureReducer.class);
conf.setReducerClass(MaxTemperatureReducer.class);
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MaxTemperatureWithCounters(), args);
System.exit(exitCode);
}
}
The best way to see what this program does is run it over the complete dataset:
% hadoop jar hadoop-examples.jar MaxTemperatureWithCounters input/ncdc/all output-counters
When the job has successfully completed, it prints out the counters at the end (this is
done by JobClient’s runJob() method). Here are the ones we are interested in:
09/04/20 06:33:36 INFO mapred.JobClient: TemperatureQuality
09/04/20 06:33:36 INFO mapred.JobClient: 2=1246032
09/04/20 06:33:36 INFO mapred.JobClient: 1=973422173
09/04/20 06:33:36 INFO mapred.JobClient: 0=1
09/04/20 06:33:36 INFO mapred.JobClient: 6=40066
09/04/20 06:33:36 INFO mapred.JobClient: 5=158291879
09/04/20 06:33:36 INFO mapred.JobClient: 4=10764500
09/04/20 06:33:36 INFO mapred.JobClient: 9=66136858
09/04/20 06:33:36 INFO mapred.JobClient: Air Temperature Records
09/04/20 06:33:36 INFO mapred.JobClient: Malformed=3
09/04/20 06:33:36 INFO mapred.JobClient: Missing=66136856
Example: Application to calculate the proportion of records with missing temperature fields
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class MissingTemperatureFields extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 1) {
JobBuilder.printUsage(this, "<job ID>");
return -1;
}
JobClient jobClient = new JobClient(new JobConf(getConf()));
String jobID = args[0];
RunningJob job = jobClient.getJob(JobID.forName(jobID));
if (job == null) {
System.err.printf("No job with ID %s found.\n", jobID);
return -1;
}
if (!job.isComplete()) {
System.err.printf("Job %s is not complete.\n", jobID);
return -1;
}
Counters counters = job.getCounters();
long missing = counters.getCounter(
MaxTemperatureWithCounters.Temperature.MISSING);
long total = counters.findCounter("org.apache.hadoop.mapred.Task$Counter",
"MAP_INPUT_RECORDS").getCounter();
System.out.printf("Records with missing temperature fields: %.2f%%\n",
100.0 * missing / total);
return 0;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MissingTemperatureFields(), args);
System.exit(exitCode);
}
}
Sorting:
- Ability to sort data is at the heart of MapReduce.
Preparation:
- We are going to sort the weather dataset by temperature.
- MapReduce job below is map only job that also filters input to remove records that dont have valid temperature reading.
- Each map creates a single block compressed sequence file as output. It is invoked with following command:
% hadoop jar hadoop-examples.jar SortDataPreprocessor input/ncdc/all \
input/ncdc/all-seq
Example: A MapReduce program for transforming the weather data into SequenceFile format
public class SortDataPreprocessor extends Configured implements Tool {
static class CleanerMapper
extends Mapper<LongWritable, Text, IntWritable, Text> {
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
parser.parse(value);
if (parser.isValidTemperature()) {
context.write(new IntWritable(parser.getAirTemperature()), value);
}
}
}
@Override
public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null) {
return -1;
}
job.setMapperClass(CleanerMapper.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new SortDataPreprocessor(), args);
System.exit(exitCode);
}
}
Partial Sort:
- By default, MapReduce will sort input records by their keys.
- Ex below is variation for sorting sequence files with IntWritable keys.
Example: A MapReduce program for sorting a SequenceFile with IntWritable keys using the
default HashPartitioner
public class SortByTemperatureUsingHashPartitioner extends Configured
implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null) {
return -1;
}
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new SortByTemperatureUsingHashPartitioner(),
args);
System.exit(exitCode);
}
}
Suppose we run this program using 30 reducers:
% hadoop jar hadoop-examples.jar SortByTemperatureUsingHashPartitioner \
-D mapred.reduce.tasks=30 input/ncdc/all-seq output-hashsort
- This command produces 30 output files, each of which is sorted.
- Having a partially sorted set of files is fine if you want to do lookups.
Total Sort:
- It is inefficient for large files, since one machine has to process all of the output, so you are throwing away benefits of parallel architecture that MapReduce provides.
- For example, if we had four partitions, we could put keys for
temperatures less than –10°C in the first partition, those between –10°C and 0°C in the
second, those between 0°C and 10°C in the third, and those over 10°C in the fourth.
- Sequence file is used by TotalOrderPartitioner to create partitions for sort job.
Ex below puts it all together.
Example: A MapReduce program for sorting a SequenceFile with IntWritable keys using the
TotalOrderPartitioner to globally sort the data
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null) {
return -1;
}
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
job.setPartitionerClass(TotalOrderPartitioner.class);
InputSampler.Sampler<IntWritable, Text> sampler =
new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
Path input = FileInputFormat.getInputPaths(job)[0];
input = input.makeQualified(input.getFileSystem(getConf()));
Path partitionFile = new Path(input, "_partitions");
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
partitionFile);
InputSampler.writePartitionFile(job, sampler);
// Add to DistributedCache
URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
job.addCacheFile(partitionUri);
job.createSymlink();
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(
new SortByTemperatureUsingTotalOrderPartitioner(), args);
System.exit(exitCode);
}
}
Secondary Sort:
- MapReduce framework sorts the records by key before they reach reducers.
- For any particular key, values are not sorted.
- Order that the values appear is not even stable from one run to the next, since they come from different map tasks, which may finish at different times from run to run.
Example: Application to find the maximum temperature by sorting temperatures in the key
public class MaxTemperatureUsingSecondarySort
extends Configured implements Tool {
static class MaxTemperatureMapper
extends Mapper<LongWritable, Text, IntPair, NullWritable> {
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
protected void map(LongWritable key, Text value,
Context context) throws IOException, InterruptedException {
parser.parse(value);
if (parser.isValidTemperature()) {
context.write(new IntPair(parser.getYearInt(),
parser.getAirTemperature()), NullWritable.get());
}
}
}
static class MaxTemperatureReducer
extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {
@Override
protected void reduce(IntPair key, Iterable<NullWritable> values,
Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static class FirstPartitioner
extends Partitioner<IntPair, NullWritable> {
@Override
public int getPartition(IntPair key, NullWritable value, int numPartitions) {
// multiply by 127 to perform some mixing
return Math.abs(key.getFirst() * 127) % numPartitions;
}
}
public static class KeyComparator extends WritableComparator {
protected KeyComparator() {
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());
if (cmp != 0) {
return cmp;
}
return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse
}
}
public static class GroupComparator extends WritableComparator {
protected GroupComparator() {
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
return IntPair.compare(ip1.getFirst(), ip2.getFirst());
}
}
@Override
public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null) {
return -1;
}
job.setMapperClass(MaxTemperatureMapper.class);
job.setPartitionerClass(FirstPartitioner.class);
job.setSortComparatorClass(KeyComparator.class);
job.setGroupingComparatorClass(GroupComparator.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(IntPair.class);
job.setOutputValueClass(NullWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args);
System.exit(exitCode);
}
}
Running this program gives the maximum temperatures for each year:
% hadoop jar hadoop-examples.jar MaxTemperatureUsingSecondarySort input/ncdc/all \
> output-secondarysort
% hadoop fs -cat output-secondarysort/part-* | sort | head
1901 317
1902 244
1903 289
1904 256
1905 283
1906 294
1907 283
1908 289
1909 278
1910 294
Joins:
- MapReduce can perform joins between large datasets, but writing code to do joins from scratch is fairly involved.
- Rather than writing MapReduce programs, you might consider using higher level framework such as Pig, Hive, or Cascading in which join operations are core part of implementation.
- We have two datasets, for ex, weather stations database and weather records - and we want to reconcile the two.
- We want to see each stations history, with stations metadata inlined in each output row.
- Implement join depends on how large datasets are and how they are partitioned.
- If join is performed by the mapper, it is called map-side join, whereas if it is performed by reducer it is called reduce-side join.
Map-Side Joins:
- A map-side join between large inputs work by performing the join before the data reaches the map function.
- All the records for particular key must reside in same partition.
- Each input dataset must be divided into the same number of partitions and it must be sorted by same key(join key) in each source.
- Map side join can be used to join the outputs of several jobs that had the same number of reducers, same keys, and output files that are not splittable.
- The org.apache.hadoop.examples.Join example is a general-purpose command-line
program for running a map-side join, since it allows you to run a MapReduce job for
any specified mapper and reducer over multiple inputs that are joined with a given join
operation.
Reduce-Side Joins:
- More general than map-side join, in that the input datasets dont have to be structured in any particular way, but is less efficient as both datasets have to go through the MapReduce shuffle.
- Basic idea is that the mapper tags each record with its source and uses the join key as map output key, so that the records with same key are brought together in the reducer.
Multiple inputs:
The input sources for the datasets have different formats, in general, so it is very
convenient to use the MultipleInputs class (see “Multiple Inputs” on page 248) to
separate the logic for parsing and tagging each source.
Secondary sort:
As described, the reducer will see the records from both sources that have the same
key, but they are not guaranteed to be in any particular order. However, to perform
the join, it is important to have the data from one source before another. For the
weather data join, the station record must be the first of the values seen for each
key, so the reducer can fill in the weather records with the station name and emit
them straightaway. Of course, it would be possible to receive the records in any
order if we buffered them in memory, but this should be avoided, since the number
of records in any group may be very large and exceed the amount of memory available
to the reducer.
Side Data Distribution:
- It can be defined as extra read only data needed by job to process the main dataset.
- Challenge is to make side data available to all the map or reduce tasks in convenient and efficient fashion
Using the Job Configuration:
- We can set arbitrary key-value pairs in the job configuration using various setter methods on Configuration.
- This is very useful if you need to pass small piece of metadata to your tasks.
- In the task you can retreive the data from configuration returned by Contexts getConfiguration() method.
- You can either handle serialization yourself of you can use Hadoops Stringifier class.
- DefaultStringifier uses Hadoops serialization framework to serialize objects.
- You should not use this mechanism for transferring more than few KB of data as it can put pressure on memory usage in Hadoop deamons, in system running hundreds of jobs.
Distributed Cache:
- Rather than serializing side data in job configuration, it is preferable to distribute datasets using Hadoops distributed cache mechanism.
- To save network bandwidth, files are normally copied in any particular node once per job.
Usage:
- For tools that use GenericOptionsParser, you can specify the files to be distributed as comman-separated list of URLs as argument to -files option.
- Files can be on local filesystem, on HDFS, or on another Hadoop readable filesystem.
- If no schema is supplied then the files are assumed to be local.
- You can also copy archive files to your tasks, using the -archives option: these are unarchived on task node.
- The -libjars option will add JAR files to the classpath of the mapper and reducer tasks.
Let’s see how to use the distributed cache to share a metadata file for station names.
The command we will run is:
% hadoop jar hadoop-examples.jar MaxTemperatureByStationNameUsingDistributedCacheFile \
-files input/ncdc/metadata/stations-fixed-width.txt input/ncdc/all output
- Above command will copy the local file stations-fixed-width.txt to task nodes, so we can use it to look up station names.
Example: Application to find the maximum temperature by station, showing station names from
a lookup table passed as a distributed cache file
public class MaxTemperatureByStationNameUsingDistributedCacheFile
extends Configured implements Tool {
static class StationTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
parser.parse(value);
if (parser.isValidTemperature()) {
context.write(new Text(parser.getStationId()),
new IntWritable(parser.getAirTemperature()));
}
}
}
static class MaxTemperatureReducerWithStationLookup
extends Reducer<Text, IntWritable, Text, IntWritable> {
private NcdcStationMetadata metadata;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
metadata = new NcdcStationMetadata();
metadata.initialize(new File("stations-fixed-width.txt"));
}
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
String stationName = metadata.getStationName(key.toString());
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(new Text(stationName), new IntWritable(maxValue));
}
}
@Override
public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null) {
return -1;
}
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(StationTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducerWithStationLookup.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(
new MaxTemperatureByStationNameUsingDistributedCacheFile(), args);
System.exit(exitCode);
}
}
We use the reducer’s setup() method to retrieve the cache file using its original name,
relative to the working directory of the task.
Here’s a snippet of the output, showing some maximum temperatures for a few weather
stations:
PEATS RIDGE WARATAH 372
STRATHALBYN RACECOU 410
SHEOAKS AWS 399
WANGARATTA AERO 409
MOOGARA 334
MACKAY AERO 331
How it works?
- When you launch the job, Hadoop copies the files specified by the -files, -archives and -libjars options to the jobtrackers filesystem(normally HDFS).
- Then, before the task is run, tasktracker copies the files from jobtrackers filesystem to local disk- the cache- so the task can access the files.
- The files are said to be localized at this point.
- From task point of view, files are just there.
- The tasktracker also maintains a reference count for number of tasks using each file in the cache.
- Before task is run, files reference count is incremented by one, then after the task has run, count is decreased by one.
- Only when count reached zero, is it eligible for deletion, since no tasks are using it.
- Files are deleted to make room for new file when cache exceeds certain size- 10GB by default.
Distributed Cache API:
- Most applications dont need to use distributed cache API as they can use the cache via GenericOptionsParser.
- However some applications may need to use more advanced features of distributed cache, and for this they can use its API directly.
- API is in two parts:
- Methods for putting data into the cache(found in Job) and
- Methods for retreiving data from the cache( found in JobContext).
Putting data in cache methods:
public void addCacheFile(URI uri)
public void addCacheArchive(URI uri)
public void setCacheFiles(URI[] files)
public void setCacheArchives(URI[] archives)
public void addFileToClassPath(Path file)
public void addArchiveToClassPath(Path archive)
public void createSymlink()
- There are two types of object that can be placed in cache: files and archives.
- Files are left intact on task node, while archives are unarchived on task node.
- For each objet there are three methods:
an addCacheXXXX() method to add file or archive to distributed cache
a setCacheXXXX() method to get entire list of files or archives to be added to cache in single call
and an addXXXXToClassPath() to add the file or archive to the MapReduce tasks classpath.
MapReduce Library Classes:
- Hadoop comes with library of mappers and reducers for commonly used functions.
No comments:
Post a Comment