Wednesday, May 14, 2014

Hadoop I/O and developing a MapReduce application

4: Hadoop I/O

Data Integrity in HDFS:

- HDFS transparently checksums all data written to it and by default verifies checksums when reading data.
- A separate checksum is created for every io.bytes.per.checksum bytes of data.
- The default is 512 bytes, and since a CRC-32 checksum is 4 bytes long, the storage overhead is less than 1%.
- Datanodes are responsible for verifying the data they receive before storing the data and its checksum.
This applies to data that they receive from clients and from other datanodes during replication.
A client writing data sends it to a pipeline of datanodes, and the last datanode in the pipeline verifies the checksum. If it detects an error, the client receives a ChecksumException, a subclass of IOException, which it should handle in an application-specific manner, for example by retrying the operation.
- When clients read data from datanodes, they verify checksums as well, comparing them with the ones stored at the datanode.
- Each datanode keeps a persistent log of checksum verifications.
- When a client successfully verifies a block, it tells the datanode, which updates its log.
- Each datanode runs a DataBlockScanner in a background thread that periodically verifies all the blocks stored on the datanode.
- Since HDFS stores replicas of blocks, it can heal corrupted blocks by copying one of the good replicas to produce a new, uncorrupt replica.
If a client detects an error when reading a block, it reports the bad block and the datanode it was trying to read from to the namenode before throwing a ChecksumException.
- The namenode marks the block replica as corrupt so it does not direct clients to it or try to copy this replica to another datanode. It then schedules a copy of the block to be replicated on another datanode, so its replication factor is back at the expected level. Once this has happened, the corrupt replica is deleted.

Compression:

- File compression brings two major benefits: it reduces the space needed to store files, and it speeds up data transfer across the network or to and from disk.
- There are many different compression formats, tools and algorithms, each with different characteristics.

- The following command creates a compressed file file.gz using the fastest compression method:
gzip -1 file

- Gzip is a general-purpose compressor and sits in the middle of the space/time tradeoff.
- Bzip2 compresses more effectively than gzip but is slower. Its decompression speed is faster than its compression speed, but it is still slower than the other formats.
- LZO and Snappy, on the other hand, both optimize for speed and are around an order of magnitude faster than gzip, but they compress less effectively. Snappy is also significantly faster than LZO for decompression.

Codecs:

- A codec is the implementation of a compression-decompression algorithm.
- In Hadoop, a codec is represented by an implementation of the CompressionCodec interface.
- For example, GzipCodec encapsulates the compression and decompression algorithm for gzip; a usage sketch follows below.
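
A minimal sketch of using a codec directly (this mirrors the classic StreamCompressor example: the codec class name is given on the command line, and data is compressed from standard input to standard output):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class StreamCompressor {
  public static void main(String[] args) throws Exception {
    String codecClassname = args[0];                      // e.g. org.apache.hadoop.io.compress.GzipCodec
    Class<?> codecClass = Class.forName(codecClassname);
    Configuration conf = new Configuration();
    CompressionCodec codec = (CompressionCodec)
      ReflectionUtils.newInstance(codecClass, conf);      // codecs are instantiated reflectively
    CompressionOutputStream out = codec.createOutputStream(System.out);
    IOUtils.copyBytes(System.in, out, 4096, false);       // copy stdin to the compressed stream
    out.finish();                                         // flush compressed data without closing System.out
  }
}

It could be run along these lines, piping the output through gunzip to check it round-trips:

% echo "Text" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec | gunzip -
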
Compression and Input Splits:

- Consider an uncompressed file stored in HDFS whose size is 1GB.
- With an HDFS block size of 64 MB, the file will be stored as 16 blocks, and a MapReduce job using this file as input will create 16 input splits, each processed independently as input to a separate map task.
- Gzip does not support splitting. The problem is that the start of a block is not distinguished in any way that would allow a reader positioned at an arbitrary point in the stream to advance to the beginning of the next block, thereby synchronizing itself with the stream.
- An LZO file would have the same problem, since the underlying compression format does not provide a way for a reader to synchronize itself with the stream.
- A bzip2 file does provide a synchronization marker between blocks, so it does support splitting.


Serialization:

- Serialization is the process of turning structured objects into a byte stream for transmission over a network or for writing to persistent storage.
- Deserialization is the reverse process of turning a byte stream back into a series of structured objects.
- Serialization appears in two quite distinct areas of distributed data processing: interprocess communication and persistent storage.
- In Hadoop, interprocess communication between nodes in the system is implemented using remote procedure calls (RPCs). The RPC protocol uses serialization to render the message into a binary stream to be sent to the remote node, which then deserializes the binary stream into the original message. It is desirable that the RPC serialization format is:

- Compact: A compact format makes the best use of network bandwidth, which is the most scarce resource in a data center.
- Fast: Interprocess communication forms the backbone of a distributed system, so it is essential that there is as little performance overhead as possible for the serialization and deserialization process.
- Extensible: It should be possible to add a new argument to a method call and have new servers accept messages in the old format from old clients.
- Interoperable: It is desirable to be able to support clients that are written in a different language to the server, so the format needs to be designed to make this possible.

Hadoop uses its own serialization format, Writables, which is certainly compact and fast, but not so easy to extend or use from languages other than Java. Writables are central to Hadoop: most MapReduce programs use them for their key and value types.

Writable interface:

- The Writable interface defines two methods: one for writing its state to a DataOutput binary stream, and one for reading its state from a DataInput binary stream.

package org.apache.hadoop.io;
   
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
public interface Writable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}

Let’s look at a particular Writable to see what we can do with it. We will use
IntWritable, a wrapper for a Java int. We can create one and set its value using the
set() method:
    IntWritable writable = new IntWritable();
    writable.set(163);
Equivalently, we can use the constructor that takes the integer value:
    IntWritable writable = new IntWritable(163);
To examine the serialized form of the IntWritable, we write a small helper method that
wraps a java.io.ByteArrayOutputStream in a java.io.DataOutputStream (an implementation
of java.io.DataOutput)
to capture the bytes in the serialized stream:
  public static byte[] serialize(Writable writable) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(out);
    writable.write(dataOut);
    dataOut.close();
    return out.toByteArray();
  }
An integer is written using four bytes (as we see using JUnit 4 assertions):
    byte[] bytes = serialize(writable);
    assertThat(bytes.length, is(4));

Let’s try deserialization. Again, we create a helper method to read a Writable object
from a byte array:
  public static byte[] deserialize(Writable writable, byte[] bytes)
      throws IOException {
    ByteArrayInputStream in = new ByteArrayInputStream(bytes);
    DataInputStream dataIn = new DataInputStream(in);
    writable.readFields(dataIn);
    dataIn.close();
    return bytes;
  }
We construct a new, value-less, IntWritable, then call deserialize() to read from the
output data that we just wrote. Then we check that its value, retrieved using the
get() method, is the original value, 163:
    IntWritable newWritable = new IntWritable();
    deserialize(newWritable, bytes);
    assertThat(newWritable.get(), is(163));

WritableComparable and comparators

IntWritable implements the WritableComparable interface, which is just a subinterface
of the Writable and java.lang.Comparable interfaces:
package org.apache.hadoop.io;
    
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
Comparison of types is crucial for MapReduce, where there is a sorting phase during
which keys are compared with one another. One optimization that Hadoop provides
is the RawComparator extension of Java’s Comparator:
package org.apache.hadoop.io;
    
import java.util.Comparator;
public interface RawComparator<T> extends Comparator<T> {
    
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
    
}

WritableComparator is a general-purpose implementation of RawComparator for
WritableComparable classes. It provides two main functions. First, it provides a default
implementation of the raw compare() method that deserializes the objects to be compared
from the stream and invokes the object compare() method. Second, it acts as a
factory for RawComparator instances (that Writable implementations have registered).
For example, to obtain a comparator for IntWritable, we just use:
    RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);
The comparator can be used to compare two IntWritable objects:
    IntWritable w1 = new IntWritable(163);
    IntWritable w2 = new IntWritable(67);
    assertThat(comparator.compare(w1, w2), greaterThan(0));
or their serialized representations:
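
A sketch of that comparison, reusing the serialize() helper defined above (the assertion style matches the earlier examples):

    byte[] b1 = serialize(w1);
    byte[] b2 = serialize(w2);
    assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length),
        greaterThan(0));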


Text:

- Text is a Writable for UTF-8 sequences. It can be thought of as the Writable equivalent
of java.lang.String. Text is a replacement for the UTF8 class, which was deprecated
because it didn’t support strings whose encoding was over 32,767 bytes, and because
it used Java’s modified UTF-8.
- The Text class uses an int (with a variable-length encoding) to store the number of
bytes in the string encoding, so the maximum value is 2 GB. Furthermore, Text uses
standard UTF-8, which makes it potentially easier to interoperate with other tools that
understand UTF-8.

Indexing:

- Because of its emphasis on using standard UTF-8, there are some differences
between Text and the Java String class. Indexing for the Text class is in terms of position
in the encoded byte sequence, not the Unicode character in the string, or the Java
char code unit (as it is for String). For ASCII strings, these three concepts of index
position coincide. Here is an example to demonstrate the use of the charAt() method:

    Text t = new Text("hadoop");
    assertThat(t.getLength(), is(6));
    assertThat(t.getBytes().length, is(6));
   
    assertThat(t.charAt(2), is((int) 'd'));
    assertThat("Out of bounds", t.charAt(100), is(-1));

Notice that charAt() returns an int representing a Unicode code point, unlike the
String variant that returns a char. Text also has a find() method, which is analogous
to String’s indexOf():
    Text t = new Text("hadoop");
    assertThat("Find a substring", t.find("do"), is(2));
    assertThat("Finds first 'o'", t.find("o"), is(3));
    assertThat("Finds 'o' from position 4 or later", t.find("o", 4), is(4));
    assertThat("No match", t.find("pig"), is(-1));

Example: Tests showing the differences between the String and Text classes
public class StringTextComparisonTest {
  @Test
  public void string() throws UnsupportedEncodingException {
   
    String s = "\u0041\u00DF\u6771\uD801\uDC00";
    assertThat(s.length(), is(5));
    assertThat(s.getBytes("UTF-8").length, is(10));
   
    assertThat(s.indexOf("\u0041"), is(0));
    assertThat(s.indexOf("\u00DF"), is(1));
    assertThat(s.indexOf("\u6771"), is(2));
    assertThat(s.indexOf("\uD801\uDC00"), is(3));
   
    assertThat(s.charAt(0), is('\u0041'));
    assertThat(s.charAt(1), is('\u00DF'));
    assertThat(s.charAt(2), is('\u6771'));
    assertThat(s.charAt(3), is('\uD801'));
    assertThat(s.charAt(4), is('\uDC00'));

    assertThat(s.codePointAt(0), is(0x0041));
    assertThat(s.codePointAt(1), is(0x00DF));
    assertThat(s.codePointAt(2), is(0x6771));
    assertThat(s.codePointAt(3), is(0x10400));
  }
  @Test
  public void text() {
   
    Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00");
    assertThat(t.getLength(), is(10));
   
    assertThat(t.find("\u0041"), is(0));
    assertThat(t.find("\u00DF"), is(1));
    assertThat(t.find("\u6771"), is(3));
    assertThat(t.find("\uD801\uDC00"), is(6));
    assertThat(t.charAt(0), is(0x0041));
    assertThat(t.charAt(1), is(0x00DF));
    assertThat(t.charAt(3), is(0x6771));
    assertThat(t.charAt(6), is(0x10400));
  } 
}

The test confirms that the length of a String is the number of char code units it contains
(5, one from each of the first three characters in the string, and a surrogate pair from
the last), whereas the length of a Text object is the number of bytes in its UTF-8 encoding
(10 = 1+2+3+4). Similarly, the indexOf() method in String returns an index in char
code units, and find() for Text is a byte offset.
The charAt() method in String returns the char code unit for the given index, which
in the case of a surrogate pair will not represent a whole Unicode character. The
codePointAt() method, indexed by char code unit, is needed to retrieve a single Unicode
character represented as an int. In fact, the charAt() method in Text is more like the
codePointAt() method than its namesake in String. The only difference is that it is
indexed by byte offset.

Mutability

Another difference with String is that Text is mutable (like all Writable implementations in Hadoop,
except NullWritable, which is a singleton).
You can reuse a Text instance by calling one of the set() methods on it. For example:
    Text t = new Text("hadoop");
    t.set("pig");
    assertThat(t.getLength(), is(3));
    assertThat(t.getBytes().length, is(3));


NullWritable
NullWritable is a special type of Writable, as it has a zero-length serialization. No bytes
are written to, or read from, the stream. It is used as a placeholder; for example, in
MapReduce, a key or a value can be declared as a NullWritable when you don’t need
to use that position—it effectively stores a constant empty value. NullWritable can also
be useful as a key in SequenceFile when you want to store a list of values, as opposed
to key-value pairs. It is an immutable singleton: the instance can be retrieved by calling
NullWritable.get().
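
As a tiny illustration (reusing the serialize() helper from the Writable section, in the same assertion style), the singleton really does serialize to zero bytes:

    NullWritable nw = NullWritable.get();
    assertThat(serialize(nw).length, is(0));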

Writable collections
There are six Writable collection types in the org.apache.hadoop.io package: ArrayWritable,
ArrayPrimitiveWritable, TwoDArrayWritable, MapWritable, SortedMapWritable, and
EnumSetWritable.
ArrayWritable and TwoDArrayWritable are Writable implementations for arrays and
two-dimensional arrays (array of arrays) of Writable instances. All the elements of an
ArrayWritable or a TwoDArrayWritable must be instances of the same class, which is
specified at construction, as follows:
    ArrayWritable writable = new ArrayWritable(Text.class);

ArrayWritable and TwoDArrayWritable both have get() and set() methods, as well as a
toArray() method, which creates a shallow copy of the array (or 2D array).
ArrayPrimitiveWritable is a wrapper for arrays of Java primitives. The component type
is detected when you call set(), so there is no need to subclass to set the type.
MapWritable and SortedMapWritable are implementations of java.util.Map<Writable,
Writable> and java.util.SortedMap<WritableComparable, Writable>, respectively. The
type of each key and value field is a part of the serialization format for that field. The
type is stored as a single byte that acts as an index into an array of types. The array is
populated with the standard types in the org.apache.hadoop.io package, but custom
Writable types are accommodated, too, by writing a header that encodes the type array
for nonstandard types. As they are implemented, MapWritable and SortedMapWritable
use positive byte values for custom types, so a maximum of 127 distinct nonstandard
Writable classes can be used in any particular MapWritable and SortedMapWritable instance.
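
A small illustrative sketch (the keys and values are arbitrary) of a MapWritable holding heterogeneous Writable keys and values:

    MapWritable src = new MapWritable();
    src.put(new IntWritable(1), new Text("cat"));
    src.put(new Text("two"), new LongWritable(163));
    // each entry records its key and value types, so lookups return the original Writable types
    assertThat((Text) src.get(new IntWritable(1)), is(new Text("cat")));
    assertThat((LongWritable) src.get(new Text("two")), is(new LongWritable(163)));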

Implementing Custom Writable

Example: A Writable implementation that stores a pair of Text objects
import java.io.*;
import org.apache.hadoop.io.*;
public class TextPair implements WritableComparable<TextPair> {
  private Text first;
  private Text second;

  public TextPair() {
    set(new Text(), new Text());
  }

  public TextPair(String first, String second) {
    set(new Text(first), new Text(second));
  }

  public TextPair(Text first, Text second) {
    set(first, second);
  }

  public void set(Text first, Text second) {
    this.first = first;
    this.second = second;
  }

  public Text getFirst() {
    return first;
  }

  public Text getSecond() {
    return second;
  }
  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }
  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }

  @Override
  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (o instanceof TextPair) {
      TextPair tp = (TextPair) o;
      return first.equals(tp.first) && second.equals(tp.second);
    }
    return false;
  }
  @Override
  public String toString() {
    return first + "\t" + second;
  }

  @Override
  public int compareTo(TextPair tp) {
    int cmp = first.compareTo(tp.first);
    if (cmp != 0) {
      return cmp;
    }
    return second.compareTo(tp.second);
  }
}
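
As a quick sanity check (reusing the serialize() and deserialize() helpers from the Writable section above; the strings are arbitrary), a TextPair round-trips through its serialized form:

    TextPair tp = new TextPair("hadoop", "io");
    byte[] bytes = serialize(tp);       // write() emits first, then second
    TextPair copy = new TextPair();
    deserialize(copy, bytes);
    assertThat(copy.getFirst(), is(new Text("hadoop")));
    assertThat(copy.getSecond(), is(new Text("io")));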

File based data structures:

- For MapReduce-based processing, putting each blob of binary data into its own file does not scale, so Hadoop developed a number of container formats for these situations.

SequenceFile:

- Imagine a logfile where each log record is a new line of text.
- If you want to log binary types, plain text is not a suitable format.
- Hadoop's SequenceFile class fits the bill in this situation, providing a persistent data structure for binary key-value pairs. To use it as a logfile format, you would choose a key, such as a timestamp represented by a LongWritable, and the value would be a Writable that represents the quantity being logged.
- SequenceFiles also work well as containers for smaller files.
- HDFS and MapReduce are optimized for large files, so packaging files into a SequenceFile makes storing and processing the smaller files more efficient.

Writing a SequenceFile:

Example: Writing a SequenceFile
import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;

public class SequenceFileWriteDemo {

  private static final String[] DATA = {
    "One, two, buckle my shoe",
    "Three, four, shut the door",
    "Five, six, pick up sticks",
    "Seven, eight, lay them straight",
    "Nine, ten, a big fat hen"
  };

  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path path = new Path(uri);
    IntWritable key = new IntWritable();

    Text value = new Text();
    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(fs, conf, path,
          key.getClass(), value.getClass());
     
      for (int i = 0; i < 100; i++) {
        key.set(100 - i);
        value.set(DATA[i % DATA.length]);
        System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}

We write the position out to the console, along with the key and value pairs. The result of running it is shown here:

% hadoop SequenceFileWriteDemo numbers.seq
[128]   100     One, two, buckle my shoe
[173]   99      Three, four, shut the door
[220]   98      Five, six, pick up sticks
[264]   97      Seven, eight, lay them straight
[314]   96      Nine, ten, a big fat hen
[359]   95      One, two, buckle my shoe
[404]   94      Three, four, shut the door
[451]   93      Five, six, pick up sticks
[495]   92      Seven, eight, lay them straight
[545]   91      Nine, ten, a big fat hen
...
[1976]  60      One, two, buckle my shoe
[2021]  59      Three, four, shut the door
[2088]  58      Five, six, pick up sticks
[2132]  57      Seven, eight, lay them straight
[2182]  56      Nine, ten, a big fat hen
...
[4557]  5       One, two, buckle my shoe
[4602]  4       Three, four, shut the door
[4649]  3       Five, six, pick up sticks
[4693]  2       Seven, eight, lay them straight
[4743]  1       Nine, ten, a big fat hen

Reading a SequenceFile:

Example: Reading a SequenceFile
import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReadDemo {

  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path path = new Path(uri);
    SequenceFile.Reader reader = null;
    try {
      reader = new SequenceFile.Reader(fs, path, conf);
      Writable key = (Writable)
        ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Writable value = (Writable)
        ReflectionUtils.newInstance(reader.getValueClass(), conf);
      long position = reader.getPosition();
      while (reader.next(key, value)) {
        String syncSeen = reader.syncSeen() ? "*" : "";
        System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
        position = reader.getPosition(); // beginning of next record
      }
    } finally {
      IOUtils.closeStream(reader);
    }
  }
}

Running the program shows the sync points in the sequence file as asterisks.

% hadoop SequenceFileReadDemo numbers.seq
[128]   100     One, two, buckle my shoe
[173]   99      Three, four, shut the door
[220]   98      Five, six, pick up sticks
[264]   97      Seven, eight, lay them straight
[314]   96      Nine, ten, a big fat hen
[359]   95      One, two, buckle my shoe
[404]   94      Three, four, shut the door
[451]   93      Five, six, pick up sticks
[495]   92      Seven, eight, lay them straight
[545]   91      Nine, ten, a big fat hen
[590]   90      One, two, buckle my shoe
...
[1976]  60      One, two, buckle my shoe
[2021*] 59      Three, four, shut the door
[2088]  58      Five, six, pick up sticks
[2132]  57      Seven, eight, lay them straight
[2182]  56      Nine, ten, a big fat hen
...
[4557]  5       One, two, buckle my shoe
[4602]  4       Three, four, shut the door
[4649]  3       Five, six, pick up sticks
[4693]  2       Seven, eight, lay them straight
[4743]  1       Nine, ten, a big fat hen
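
The reader from the example above can also be positioned directly. A brief sketch (the offsets are taken from the run above, so treat them as illustrative):

    reader.seek(359);          // must be a record boundary; 359 is the start of the record with key 95
    reader.next(key, value);   // reads key 95, "One, two, buckle my shoe"

    reader.sync(360);          // positions the reader at the next sync point after offset 360
    reader.next(key, value);   // reads the first record following that sync point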

Displaying a SequenceFile with the command-line interface:

The hadoop fs command has a -text option to display sequence files in textual form. Running it on the sequence file we created in the previous section gives the following output:
% hadoop fs -text numbers.seq | head
100     One, two, buckle my shoe
99      Three, four, shut the door
98      Five, six, pick up sticks
97      Seven, eight, lay them straight
96      Nine, ten, a big fat hen
95      One, two, buckle my shoe
94      Three, four, shut the door
93      Five, six, pick up sticks
92      Seven, eight, lay them straight
91      Nine, ten, a big fat hen

Sorting and merging SequenceFiles:

- The most powerful way of sorting (and merging) one or more sequence files is to use MapReduce.
- MapReduce is inherently parallel and lets you specify the number of reducers to use, which determines the number of output partitions.
- For example, by specifying one reducer, you get a single output file.
- We can use the sort example that comes with Hadoop by specifying that the input and output are sequence files and by setting the key and value types:

% hadoop jar $HADOOP_INSTALL/hadoop-*-examples.jar sort -r 1 \
  -inFormat org.apache.hadoop.mapred.SequenceFileInputFormat \
  -outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
  -outKey org.apache.hadoop.io.IntWritable \
  -outValue org.apache.hadoop.io.Text \
  numbers.seq sorted
% hadoop fs -text sorted/part-00000 | head

1       Nine, ten, a big fat hen
2       Seven, eight, lay them straight
3       Five, six, pick up sticks
4       Three, four, shut the door
5       One, two, buckle my shoe
6       Nine, ten, a big fat hen
7       Seven, eight, lay them straight
8       Five, six, pick up sticks
9       Three, four, shut the door
10      One, two, buckle my shoe

- As an alternative to using MapReduce for sort/merge, there is a SequenceFile.Sorter class that has a number of sort() and merge() methods.
- In general, MapReduce is the preferred approach to sort and merge sequence files.

SequenceFile format

- A sequence file consists of a header followed by one or more records.
- The first three bytes of a sequence file are the bytes SEQ, which act as a magic number, followed by a single byte representing the version number.
- The header contains other fields, including the names of the key and value classes, compression details, user-defined metadata, and the sync marker.
- Sync markers appear between records in the sequence file.

MapFile:

- A MapFile is a sorted SequenceFile with an index to permit lookups by key.
- A MapFile can be thought of as a persistent form of java.util.Map, able to grow beyond the size of a Map that is kept in memory.

Writing a MapFile:

- Writing a MapFile is similar to writing a SequenceFile: you create an instance of MapFile.Writer, then call the append() method to add entries in order.
- Attempting to add entries out of order will result in an IOException.
- Keys must be instances of WritableComparable, and values must be Writable - in contrast to SequenceFile, which can use any serialization framework for its entries.

Example: Writing a MapFile
import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;

public class MapFileWriteDemo {

  private static final String[] DATA = {
    "One, two, buckle my shoe",
    "Three, four, shut the door",
    "Five, six, pick up sticks",
    "Seven, eight, lay them straight",
    "Nine, ten, a big fat hen"
  };

  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    IntWritable key = new IntWritable();
    Text value = new Text();
    MapFile.Writer writer = null;
    try {
      writer = new MapFile.Writer(conf, fs, uri,
          key.getClass(), value.getClass());
     
      for (int i = 0; i < 1024; i++) {
        key.set(i + 1);
        value.set(DATA[i % DATA.length]);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}

Let's use this program to build a MapFile:

% hadoop MapFileWriteDemo numbers.map

- If we look at the MapFile, we see it's actually a directory containing two files called data and index:

% ls -l numbers.map
total 104
-rw-r--r--   1 tom  tom  47898 Jul 29 22:06 data
-rw-r--r--   1 tom  tom    251 Jul 29 22:06 index

Both files are SequenceFiles. The data file contains all of the entries, in order:

% hadoop fs -text numbers.map/data | head
1       One, two, buckle my shoe
2       Three, four, shut the door
3       Five, six, pick up sticks
4       Seven, eight, lay them straight
5       Nine, ten, a big fat hen
6       One, two, buckle my shoe
7       Three, four, shut the door
8       Five, six, pick up sticks
9       Seven, eight, lay them straight
10      Nine, ten, a big fat hen

- The index file contains a fraction of the keys and holds a mapping from the key to that key's offset in the data file:
% hadoop fs -text numbers.map/index
1       128
129     6079
257     12054
385     18030
513     24002
641     29976
769     35947
897     41922

- By default only every 128th key is included in the index, although you can change this value either by setting the io.map.index.interval property or by calling the setIndexInterval() method on the MapFile.Writer instance.
- A reason to increase the index interval would be to decrease the amount of memory that the MapFile needs to store the index.
- You might also decrease the interval to improve the time for random selection, at the expense of memory usage.

Reading a MapFile

Iterating through the entries in order in a MapFile is similar to the procedure for a
SequenceFile: you create a MapFile.Reader, then call the next() method until it returns
false, signifying that no entry was read because the end of the file was reached:
public boolean next(WritableComparable key, Writable val) throws IOException
A random access lookup can be performed by calling the get() method:
public Writable get(WritableComparable key, Writable val) throws IOException
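
A minimal sketch of such a lookup against the numbers.map file built above (the key value is arbitrary, and the older fs/dirName constructor form is used to match the writer example):

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create("numbers.map"), conf);
    MapFile.Reader reader = new MapFile.Reader(fs, "numbers.map", conf);
    Text value = new Text();
    Writable entry = reader.get(new IntWritable(496), value);  // fills value; returns null if the key is absent
    reader.close();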

Converting a SequenceFile to MapFile

- Example: Re-creating the index for a MapFile
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;

public class MapFileFixer {
  public static void main(String[] args) throws Exception {
    String mapUri = args[0];
   
    Configuration conf = new Configuration();
   
    FileSystem fs = FileSystem.get(URI.create(mapUri), conf);
    Path map = new Path(mapUri);
    Path mapData = new Path(map, MapFile.DATA_FILE_NAME);
   
    // Get key and value types from data sequence file
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, mapData, conf);
    Class keyClass = reader.getKeyClass();
    Class valueClass = reader.getValueClass();
    reader.close();
    
    // Create the map file index file
    long entries = MapFile.fix(fs, map, keyClass, valueClass, false, conf);
    System.out.printf("Created MapFile %s with %d entries\n", map, entries);
  }
}

1. Sort the sequence file numbers.seq into a new directory called numbers.map that will become the MapFile (if the sequence file is already sorted, you can skip this step. Instead, copy it to a file called numbers.map/data, then go to step 3):
% hadoop jar $HADOOP_INSTALL/hadoop-*-examples.jar sort -r 1 \
  -inFormat org.apache.hadoop.mapred.SequenceFileInputFormat \
  -outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
  -outKey org.apache.hadoop.io.IntWritable \
  -outValue org.apache.hadoop.io.Text \
  numbers.seq numbers.map
2. Rename the MapReduce output to be the data file:
% hadoop fs -mv numbers.map/part-00000 numbers.map/data
3. Create the index file:
% hadoop MapFileFixer numbers.map
Created MapFile numbers.map with 100 entries
The MapFile numbers.map now exists and can be used.

5: Developing a MapReduce Application:

- Writing a program in MapReduce has a certain flow to it.
- You start by writing your map and reduce functions, ideally with unit tests to make sure they do what you expect.
- When the program runs as expected against a small dataset, you are ready to unleash it on a cluster.
- Debugging failing programs on a cluster is a challenge, so we look at some common techniques to make it easier.
- After the program is working, you may wish to do some tuning, first by running through some standard checks for making MapReduce programs faster and then by doing task profiling.
- Before we start writing a MapReduce program, we need to set up and configure the development environment.

The Configuration API

- Components in Hadoop are configured using Hadoop's own configuration API.
- An instance of the Configuration class represents a collection of configuration properties and their values.
- Configurations read their properties from resources - XML files with a simple structure for defining name-value pairs.

Example: A simple configuration file, configuration-1.xml
<?xml version="1.0"?>
<configuration>
  <property>
    <name>color</name>
    <value>yellow</value>
    <description>Color</description>
  </property>
  <property>
    <name>size</name>
    <value>10</value>
    <description>Size</description>
  </property>
  <property>
    <name>weight</name>
    <value>heavy</value>
    <final>true</final>
    <description>Weight</description>
  </property>
  <property>
    <name>size-weight</name>
    <value>${size},${weight}</value>
    <description>Size and weight</description>
  </property>
</configuration>

Assuming this configuration file is in a file called configuration-1.xml, we can access its properties using a piece of code like this:

    Configuration conf = new Configuration();
    conf.addResource("configuration-1.xml");
    assertThat(conf.get("color"), is("yellow"));
    assertThat(conf.getInt("size", 0), is(10));
    assertThat(conf.get("breadth", "wide"), is("wide"));

Combining Resources

- Things get interesting when more than one resource is used to define a configuration.
- This is used in Hadoop to separate out the default properties for the system, defined internally in a file called core-default.xml, from the site-specific overrides in core-site.xml.

Example: A second configuration file, configuration-2.xml
<?xml version="1.0"?>
<configuration>
  <property>
    <name>size</name>
    <value>12</value>
  </property>
  <property>
    <name>weight</name>
    <value>light</value>
  </property>
</configuration>
Resources are added to a Configuration in order:
    Configuration conf = new Configuration();
    conf.addResource("configuration-1.xml");
    conf.addResource("configuration-2.xml");

Configuring the Development Environment

- The first step is to download the version of Hadoop that you plan to use and unpack it on your development machine.
- Then in your favorite IDE, create a new project and add all the JAR files from the top level of the unpacked distribution and from the lib directory to the classpath.
- You will then be able to compile Java Hadoop programs and run them in local mode within the IDE.

Managing Configuration:

- When developing Hadoop applications, it is common to switch between running the application locally and running it on a cluster.
- You may have several clusters you work with, or you may have a local pseudo-distributed cluster that you like to test on.
- We assume the existence of a directory called conf that contains three configuration files: hadoop-local.xml, hadoop-localhost.xml and hadoop-cluster.xml.
- There is nothing special about the names of these files - they are just convenient ways to package up some configuration settings.
- The hadoop-local.xml file contains the default Hadoop configuration for the default filesystem and the jobtracker:

<?xml version="1.0"?>
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>file:///</value>
  </property>
  <property>
    <name>mapred.job.tracker</name>
    <value>local</value>
  </property>
</configuration>

- Settings in hadoop-localhost.xml point to a namenode and a jobtracker both running on localhost:

<?xml version="1.0"?>
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost/</value>
  </property>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:8021</value>
  </property>
</configuration>

- Finally, hadoop-cluster.xml contains details of the cluster's namenode and jobtracker addresses.
- In practice, you would name the file after the name of the cluster, rather than "cluster" as we have here:

<?xml version="1.0"?>
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://namenode/</value>
  </property>
  <property>
    <name>mapred.job.tracker</name>
    <value>jobtracker:8021</value>
  </property>
</configuration>

- The following command shows a directory listing on the HDFS server running in pseudo-distributed mode on localhost:

% hadoop fs -conf conf/hadoop-localhost.xml -ls .
Found 2 items
drwxr-xr-x   - tom supergroup          0 2009-04-08 10:32 /user/tom/input
drwxr-xr-x   - tom supergroup          0 2009-04-08 13:09 /user/tom/output    

GenericOptionsParser, Tool and ToolRunner

A very simple implementation of Tool is ConfigurationPrinter, which prints the keys and
values of all the properties in the Tool's Configuration object.
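
A minimal sketch of ConfigurationPrinter (closely modeled on the usual form of this example; the addDefaultResource() calls that pull in the site files are one common variant and can be treated as optional):

import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ConfigurationPrinter extends Configured implements Tool {

  static {
    // make sure the site configuration files are loaded in addition to the defaults
    Configuration.addDefaultResource("hdfs-site.xml");
    Configuration.addDefaultResource("mapred-site.xml");
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    // a Configuration is Iterable over its name-value pairs
    for (Entry<String, String> entry : conf) {
      System.out.printf("%s=%s\n", entry.getKey(), entry.getValue());
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new ConfigurationPrinter(), args);
    System.exit(exitCode);
  }
}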

GenericOptionsParser also allows you to set individual properties. For example:
% hadoop ConfigurationPrinter -D color=yellow | grep color
color=yellow

Writing Unit Tests:

- The map and reduce functions in MapReduce are easy to test in isolation, which is a consequence of their functional style.
- For known inputs, they produce known outputs.
- However, since outputs are written to a Context (or an OutputCollector in the old API), rather than simply being returned from the method call, the Context needs to be replaced with a mock so that its outputs can be verified.
- There are several Java mock object frameworks that can help build mocks; here we use Mockito, which is known for its clean syntax, although any mock framework should work well.

Example: Unit test for MaxTemperatureMapper
import static org.mockito.Mockito.*;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.junit.*;
public class MaxTemperatureMapperTest {
  @Test
  public void processesValidRecord() throws IOException, InterruptedException {
    MaxTemperatureMapper mapper = new MaxTemperatureMapper();
   
    Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                                  // Year ^^^^
        "99999V0203201N00261220001CN9999999N9-00111+99999999999");
                              // Temperature ^^^^^
    MaxTemperatureMapper.Context context =
      mock(MaxTemperatureMapper.Context.class);
    mapper.map(null, value, context);

   
    verify(context).write(new Text("1950"), new IntWritable(-11));
  }
}

The test is very simple:
- It passes a weather record as input to the mapper, then checks that the output is the year and temperature reading.
- The input key is ignored by the mapper, so we can pass in anything, including null as we do here.
- To create a mock Context, we call Mockito's mock() method, passing the class of the type we want to mock.
- Then we invoke the mapper's map() method, which executes the code being tested.
- Finally, we verify that the mock object was called with the correct method and arguments, using Mockito's verify() method.
- Here we verify that the Context's write() method was called with a Text object representing the year and an IntWritable representing the temperature (-1.1°C).

Example: First version of a Mapper that passes MaxTemperatureMapperTest
public class MaxTemperatureMapper
  extends Mapper<LongWritable, Text, Text, IntWritable> {
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
   
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature = Integer.parseInt(line.substring(87, 92));
    context.write(new Text(year), new IntWritable(airTemperature));
  }
}

This is a very simple implementation, which pulls the year and temperature fields from
the line and writes them to the Context. Let’s add a test for missing values, which in
the raw data are represented by a temperature of +9999:

  @Test
  public void ignoresMissingTemperatureRecord() throws IOException,
      InterruptedException {
    MaxTemperatureMapper mapper = new MaxTemperatureMapper();
   
    Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                                  // Year ^^^^
        "99999V0203201N00261220001CN9999999N9+99991+99999999999");
                              // Temperature ^^^^^
    MaxTemperatureMapper.Context context =
      mock(MaxTemperatureMapper.Context.class);

   
    mapper.map(null, value, context);
   
    verify(context, never()).write(any(Text.class), any(IntWritable.class));
  }


Since records with missing temperatures should be filtered out, this test uses Mockito
to verify that the write() method on the Context is never called for any Text key or
IntWritable value.
The existing test fails with a NumberFormatException, as parseInt() cannot parse integers
with a leading plus sign, so we fix up the implementation (version 2) to handle missing
values:

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
   
    String line = value.toString();
    String year = line.substring(15, 19);
    String temp = line.substring(87, 92);
    if (!missing(temp)) {
        int airTemperature = Integer.parseInt(temp);
        context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
  private boolean missing(String temp) {
    return temp.equals("+9999");
  }

Reducer:

- It has to find the maximum value for a given key.
- Here is a simple test for this feature:

  @Test
  public void returnsMaximumIntegerInValues() throws IOException,
      InterruptedException {
    MaxTemperatureReducer reducer = new MaxTemperatureReducer();
   
    Text key = new Text("1950");
    List<IntWritable> values = Arrays.asList(
        new IntWritable(10), new IntWritable(5));
    MaxTemperatureReducer.Context context =
      mock(MaxTemperatureReducer.Context.class);
   
    reducer.reduce(key, values, context);
   
    verify(context).write(key, new IntWritable(10));
  }

We construct a list of some IntWritable values and then verify that
MaxTemperatureReducer picks the largest. The code in example below is for an implementation
of MaxTemperatureReducer that passes the test. 
Notice that we haven’t tested the case of an empty values iterator, but arguably we don’t need to, since MapReduce would never call the reducer in this case, as every key produced by a mapper has a value.

Example: Reducer for maximum temperature example
public class MaxTemperatureReducer
  extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  public void reduce(Text key, Iterable<IntWritable> values,
      Context context)
      throws IOException, InterruptedException {
   
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}

Running locally on test data:

- The next step is to write a job driver and run it on some test data on a development machine.

Running a job in Local Job Runner:

Using the Tool interface introduced earlier in the chapter, it’s easy to write a driver to
run our MapReduce job for finding the maximum temperature by year (see
MaxTemperatureDriver in Example below).

Example: Application to find the maximum temperature
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxTemperatureDriver extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }
   
    Job job = new Job(getConf(), "Max temperature");
    job.setJarByClass(getClass());

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
   
    job.setMapperClass(MaxTemperatureMapper.class);
    job.setCombinerClass(MaxTemperatureReducer.class);
    job.setReducerClass(MaxTemperatureReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
   
    return job.waitForCompletion(true) ? 0 : 1;
  }
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
    System.exit(exitCode);
  }
}

- MaxTemperatureDriver implements the Tool interface, so we get the benefit of being able to set the options that GenericOptionsParser supports.
- The run() method constructs a Job object based on the tool's configuration, which it uses to launch a job.

- Hadoop comes with a local job runner, a cut-down version of the MapReduce execution engine for running MapReduce jobs in a single JVM. It is designed for testing and is very convenient for use in an IDE, since you can run it in a debugger to step through the code in your mapper and reducer.

- The local job runner is only designed for simple testing of MapReduce programs, so it differs from the full MapReduce implementation.
- The biggest difference is that it can't run more than one reducer.
- This is normally not a problem, as most applications can work with one reducer, although on a cluster you would choose a larger number to take advantage of parallelism.
- Even if you set the number of reducers to a value over one, the local runner will silently ignore the setting and use a single reducer (see the example command below for running the driver locally).
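
For example, the driver can be run in the local job runner from the command line using the generic options -fs and -jt (the input and output paths here are just placeholders):

% hadoop MaxTemperatureDriver -fs file:/// -jt local input/ncdc/micro output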

Fixing the Mapper:

- Example: A class for parsing weather records in NCDC format
public class NcdcRecordParser {
  private static final int MISSING_TEMPERATURE = 9999;
  private String year;
  private int airTemperature;
  private String quality;
  public void parse(String record) {
    year = record.substring(15, 19);
    String airTemperatureString;
    // Remove leading plus sign as parseInt doesn't like them
    if (record.charAt(87) == '+') {
      airTemperatureString = record.substring(88, 92);
    } else {
      airTemperatureString = record.substring(87, 92);
    }
    airTemperature = Integer.parseInt(airTemperatureString);
    quality = record.substring(92, 93);

  }
  public void parse(Text record) {
    parse(record.toString());
  }
  public boolean isValidTemperature() {
    return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]");
  }
  public String getYear() {
    return year;
  }
  public int getAirTemperature() {
    return airTemperature;
  }
}

Example: A Mapper that uses a utility class to parse records
public class MaxTemperatureMapper
  extends Mapper<LongWritable, Text, Text, IntWritable> {
  private NcdcRecordParser parser = new NcdcRecordParser();
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
   
    parser.parse(value);
    if (parser.isValidTemperature()) {
      context.write(new Text(parser.getYear()),
          new IntWritable(parser.getAirTemperature()));
    }
  }
}

Testing the driver:

- There are two approaches to doing this.
- The first is to use the local job runner and run the job against a test file on the local filesystem.

Example: A test for MaxTemperatureDriver that uses a local, in-process job runner
  @Test
  public void test() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "file:///");
    conf.set("mapred.job.tracker", "local");
 
    Path input = new Path("input/ncdc/micro");
    Path output = new Path("output");
 
    FileSystem fs = FileSystem.getLocal(conf);
    fs.delete(output, true); // delete old output
 
    MaxTemperatureDriver driver = new MaxTemperatureDriver();
    driver.setConf(conf);
 
    int exitCode = driver.run(new String[] {
        input.toString(), output.toString() });
    assertThat(exitCode, is(0));

   
    checkOutput(conf, output);
  }

- The test explicitly sets fs.default.name and mapred.job.tracker, so it uses the local filesystem and the local job runner.
- The second way of testing the driver is to run it using a "mini-" cluster.
- Hadoop has a pair of testing classes, called MiniDFSCluster and MiniMRCluster, which provide a programmatic way of creating in-process clusters.
- Unlike the local job runner, these allow testing against the full HDFS and MapReduce machinery.
- Tasktrackers in a mini-cluster launch separate JVMs to run tasks in, which can make debugging more difficult.
- Mini-clusters are used extensively in Hadoop's own automated test suite, but they can be used for testing user code too.
- Hadoop's ClusterMapReduceTestCase abstract class provides a useful base for writing such a test; it handles the details of starting and stopping the in-process HDFS and MapReduce clusters in its setUp() and tearDown() methods, and generates a suitable configuration object that is set up to work with them.

Running on a Cluster:

- We are now ready to try running the job on the full dataset on a Hadoop cluster.

Packaging:

- We don't need to make any modifications to the program to run it on a cluster rather than on a single machine, but we do need to package the program as a JAR file to send to the cluster.
- This is conveniently achieved using Ant.

<jar destfile="hadoop-examples.jar" basedir="${classes.dir}"/>

- If we have a single job per JAR, we can specify the main class to run in the JAR file's manifest.
- If the main class is not in the manifest, it must be specified on the command line.
- Also, any dependent JAR files should be packaged in a lib subdirectory inside the JAR file.

Launching Job

- To launch the job, we need to run the driver, specifying the cluster that we want to run the job on with the -conf option:

% hadoop jar hadoop-examples.jar v3.MaxTemperatureDriver -conf conf/hadoop-cluster.xml \
  input/ncdc/all max-temp

- The waitForCompletion() method on Job launches the job and polls for progress, writing a line summarizing the map and reduce progress whenever either changes.
- Below is the output:

09/04/11 08:15:52 INFO mapred.FileInputFormat: Total input paths to process : 101
09/04/11 08:15:53 INFO mapred.JobClient: Running job: job_200904110811_0002
09/04/11 08:15:54 INFO mapred.JobClient:  map 0% reduce 0%
09/04/11 08:16:06 INFO mapred.JobClient:  map 28% reduce 0%
09/04/11 08:16:07 INFO mapred.JobClient:  map 30% reduce 0%
...
09/04/11 08:21:36 INFO mapred.JobClient:  map 100% reduce 100%
09/04/11 08:21:38 INFO mapred.JobClient: Job complete: job_200904110811_0002
09/04/11 08:21:38 INFO mapred.JobClient: Counters: 19
09/04/11 08:21:38 INFO mapred.JobClient:   Job Counters
09/04/11 08:21:38 INFO mapred.JobClient:     Launched reduce tasks=32
09/04/11 08:21:38 INFO mapred.JobClient:     Rack-local map tasks=82
09/04/11 08:21:38 INFO mapred.JobClient:     Launched map tasks=127
09/04/11 08:21:38 INFO mapred.JobClient:     Data-local map tasks=45
09/04/11 08:21:38 INFO mapred.JobClient:   FileSystemCounters
09/04/11 08:21:38 INFO mapred.JobClient:     FILE_BYTES_READ=12667214
09/04/11 08:21:38 INFO mapred.JobClient:     HDFS_BYTES_READ=33485841275
09/04/11 08:21:38 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=989397
09/04/11 08:21:38 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=904
09/04/11 08:21:38 INFO mapred.JobClient:   Map-Reduce Framework
09/04/11 08:21:38 INFO mapred.JobClient:     Reduce input groups=100
09/04/11 08:21:38 INFO mapred.JobClient:     Combine output records=4489
09/04/11 08:21:38 INFO mapred.JobClient:     Map input records=1209901509
09/04/11 08:21:38 INFO mapred.JobClient:     Reduce shuffle bytes=19140
09/04/11 08:21:38 INFO mapred.JobClient:     Reduce output records=100
09/04/11 08:21:38 INFO mapred.JobClient:     Spilled Records=9481
09/04/11 08:21:38 INFO mapred.JobClient:     Map output bytes=10282306995
09/04/11 08:21:38 INFO mapred.JobClient:     Map input bytes=274600205558
09/04/11 08:21:38 INFO mapred.JobClient:     Combine input records=1142482941
09/04/11 08:21:38 INFO mapred.JobClient:     Map output records=1142478555

Before the job starts, its ID is printed; this is needed whenever you want to refer to the job, in logfiles or when interrogating it via the hadoop job command.
- Once the job completes, its statistics are printed out.
- For this job, we see that around 275 GB of input data was analyzed ("Map input bytes"), read from around 34 GB of compressed files on HDFS ("HDFS_BYTES_READ").
- The input was broken into 101 gzipped files of a reasonable size, so there was no problem with not being able to split them.

 So the job with this ID:
job_200904110811_0002
is the second (0002, job IDs are 1-based) job run by the jobtracker which started at
08:11 on April 11, 2009. The counter is formatted with leading zeros to make job IDs
sort nicely—in directory listings, for example. However, when the counter reaches
10000 it is not reset, resulting in longer job IDs (which don’t sort so well).

Tasks belong to a job, and their IDs are formed by replacing the job prefix of a job ID
with a task prefix, and adding a suffix to identify the task within the job. For example:
task_200904110811_0002_m_000003
is the fourth (000003, task IDs are 0-based) map (m) task of the job with ID
job_200904110811_0002. The task IDs are created for a job when it is initialized, so they
do not necessarily dictate the order that the tasks will be executed in.

Tasks may be executed more than once, due to failure. Task attempts are allocated during the job run as
needed, so their ordering represents the order that they were created for tasktrackers to run.

MapReduce Web UI:

- Hadoop comes with a web UI for viewing information about your jobs.
- It is useful for following a job's progress while it is running, as well as finding job statistics and logs after the job has completed.
- You can find the UI at http://jobtracker-host:50030/.

Jobtracker page:

- The first section of the page gives details of the Hadoop installation, such as the version number and when it was compiled, and the current state of the jobtracker and when it was started.
- Next is a summary of the cluster, which gives a measure of cluster capacity and utilization. This shows the number of map and reduce tasks currently running on the cluster, the total number of job submissions, the number of tasktracker nodes currently available, and the cluster's capacity: in terms of the number of map and reduce slots available across the cluster, and the number of available slots per node, on average.

Job History: 

- Job history refers to the events and configuration for a completed job. It is retained whether the job was successful or not, in an attempt to provide interesting information for the user running a job.
- Job history files are stored on the local filesystem of the jobtracker, in a history subdirectory of the logs directory.
- The jobtracker's history files are kept for 30 days before being deleted by the system.
- A second copy is also stored for the user in the _logs/history subdirectory of the job's output directory.
- The history log includes job, task and attempt events, all of which are stored in a plain-text file.

Job page:

- Clicking on the job ID brings you to the page for the job.
- At the top of the page is a summary of the job, with basic information such as the job owner and name, and how long the job has been running.
- While the job is running, you can monitor its progress on this page, which periodically updates itself.
- The reduce completion graph is divided into the three phases of the reduce task: copy (when the map outputs are being transferred to the reduce's tasktracker), sort (when the reduce inputs are being merged), and reduce (when the reduce function is being run to produce the final output).

Retrieving the results:

- Once the job is finished, there are various ways to retrieve the results.
- Each reducer produces one output file, so there are 30 part files named part-r-00000 to part-r-00029 in the max-temp directory.
- This job produces a very small amount of output, so it is convenient to copy it from HDFS to our development machine.
- The -getmerge option to the hadoop fs command is useful here, as it gets all the files in the directory specified by the source pattern and merges them into a single file on the local filesystem:

% hadoop fs -getmerge max-temp max-temp-local
% sort max-temp-local | tail
1991       607
1992       605
1993       567
1994       568
1995       567
1996       561
1997       565
1998       568
1999       568
2000       558

Debugging a Job:

- However, there are complications to consider: with programs running on tens, hundreds, or thousands of nodes, how do we find and examine the output of the debug statements, which may be scattered across these nodes?
- We can use a debug statement to log to standard error, in conjunction with a message to update the task's status message to prompt us to look in the error log.
- The web UI makes this easy.
- We also create a custom counter to count the total number of records with implausible temperatures in the whole dataset.
- When trying to debug a job, you should always ask yourself whether you can use a counter to get the information you need to find out what's happening.
- If the amount of log data you produce in the course of debugging is large, you have a couple of options.
The first is to write the information to the map's output, rather than to standard error, for analysis and aggregation by the reduce.
Alternatively, you can write a program in MapReduce to analyze the logs produced by your job.

We add our debugging to the mapper, as opposed to the reducer, as we want to find out what the source data causing the anomalous output looks like:

public class MaxTemperatureMapper
  extends Mapper<LongWritable, Text, Text, IntWritable> {
  enum Temperature {
    OVER_100
  }

  private NcdcRecordParser parser = new NcdcRecordParser();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
   
    parser.parse(value);
    if (parser.isValidTemperature()) {
      int airTemperature = parser.getAirTemperature();
      if (airTemperature > 1000) {
        System.err.println("Temperature over 100 degrees for input: " + value);
        context.setStatus("Detected possibly corrupt record: see logs.");
        context.getCounter(Temperature.OVER_100).increment(1);
      }
      context.write(new Text(parser.getYear()), new IntWritable(airTemperature));
    }
  }
}

- When the job has finished, we can look at the value of the counter we defined to see how many records over 100 degrees there are in the whole dataset.
- Counters are accessible via the web UI or the command line:

% hadoop job -counter job_200904110811_0003 'v4.MaxTemperatureMapper$Temperature' \
  OVER_100

Handling malformed data:

Capturing input data that causes a problem is valuable, as we can use it in a test to
check that the mapper does the right thing:

  @Test
  public void parsesMalformedTemperature() throws IOException,
      InterruptedException {
    MaxTemperatureMapper mapper = new MaxTemperatureMapper();
    Text value = new Text("0335999999433181957042302005+37950+139117SAO  +0004" +
                                  // Year ^^^^
        "RJSN V02011359003150070356999999433201957010100005+353");
                              // Temperature ^^^^^
    MaxTemperatureMapper.Context context =
      mock(MaxTemperatureMapper.Context.class);
    Counter counter = mock(Counter.class);
    when(context.getCounter(MaxTemperatureMapper.Temperature.MALFORMED))
      .thenReturn(counter);
   
    mapper.map(null, value, context);
   
    verify(context, never()).write(any(Text.class), any(IntWritable.class));

    verify(counter).increment(1);
  }

Example: Mapper for the maximum temperature example
public class MaxTemperatureMapper
  extends Mapper<LongWritable, Text, Text, IntWritable> {

  enum Temperature {
    MALFORMED
  }
  private NcdcRecordParser parser = new NcdcRecordParser();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
   
    parser.parse(value);
    if (parser.isValidTemperature()) {
      int airTemperature = parser.getAirTemperature();
      context.write(new Text(parser.getYear()), new IntWritable(airTemperature));
    } else if (parser.isMalformedTemperature()) {
      System.err.println("Ignoring possibly corrupt input: " + value);
      context.getCounter(Temperature.MALFORMED).increment(1);
    }
  }
}

Hadoop Logs:

- Hadoop produces logs in various places, for various audiences.

- The default log level is INFO, so DEBUG-level messages do not appear in the syslog task log file.
- There are some controls for managing the retention and size of task logs. By default, logs are deleted after a minimum of 24 hours.
- You can also set a cap on the maximum size of each logfile using the mapred.userlog.limit.kb property, which is 0 by default, meaning there is no cap.
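
A minimal mapred-site.xml sketch showing how these limits might be configured (the retention period is controlled by the mapred.userlog.retain.hours property in older releases; the values here are illustrative):

<property>
  <name>mapred.userlog.retain.hours</name>
  <value>24</value> <!-- keep task logs for at least 24 hours -->
</property>
<property>
  <name>mapred.userlog.limit.kb</name>
  <value>1024</value> <!-- cap each task logfile at 1 MB; 0 means no cap -->
</property>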

Remote Debugging:

- When a task fails and there is not enough information logged to diagnose the error, you may want to resort to running a debugger for that task.
- There are a few options (see the sketch after this list for setting the relevant properties):
1) Reproduce the failure locally:
- Often the failing task fails consistently on a particular input.
- You can try to reproduce the problem locally by downloading the file that the task is failing on and running the job locally, possibly under a debugger such as Java's VisualVM.
2) Use JVM debugging options:
- A common cause of failure is a Java out-of-memory error in the task JVM.
- You can set mapred.child.java.opts to include -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/path/to/dumps to produce a heap dump, which can be examined afterwards with tools such as jhat or the Eclipse Memory Analyzer.
3) Use task profiling:
- Java profilers give a lot of insight into the JVM, and Hadoop provides a mechanism to profile a subset of the tasks in a job.
4) Use IsolationRunner:
- Older versions of Hadoop provided a special task runner called IsolationRunner that could rerun failed tasks in situ on the cluster.
- You can set keep.failed.task.files to true to keep a failed task's files.
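
A minimal sketch of setting the properties mentioned in options 2 and 4 from the driver, using the old-API property names used throughout this chapter (the heap size and dump path are illustrative):

    Configuration conf = getConf();
    // Produce a heap dump if a task JVM runs out of memory
    conf.set("mapred.child.java.opts",
        "-Xmx512m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/task-dumps");
    // Keep the intermediate files of failed tasks so they can be inspected later
    conf.setBoolean("keep.failed.task.files", true);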

Tuning a Job:



Profiling tasks:

- It is possible, and somewhat easier, to profile a job running in the local job runner.
- This can be a valuable way of improving the performance of your mappers and reducers.
- However, the local job runner is a very different environment from a cluster, and the data flow patterns are very different, so optimizations observed locally may not carry over to the cluster.

HPROF profiler:

- There are a number of configuration properties to control profiling, which are also exposed via convenience methods on JobConf.
- HPROF is a profiling tool that comes with the JDK and can give valuable information about a program's CPU and heap usage:

    Configuration conf = getConf();
    conf.setBoolean("mapred.task.profile", true);
    conf.set("mapred.task.profile.params", "-agentlib:hprof=cpu=samples," +
        "heap=sites,depth=6,force=n,thread=y,verbose=n,file=%s");
    conf.set("mapred.task.profile.maps", "0-2");
    conf.set("mapred.task.profile.reduces", ""); // no reduces
    Job job = new Job(conf, "Max temperature");

- The first line enables profiling, which is turned off by default.
- Instead of using mapred.task.profile, you can also use the JobContext.TASK_PROFILE constant in the new API.
- Next we set the profile parameters, which are the extra command-line arguments to pass to the task's JVM.
- The default parameters specify the HPROF profiler; here we set an extra HPROF option, depth=6, to give more stack-trace depth than the HPROF default. (Using JobContext.TASK_PROFILE_PARAMS is equivalent to setting the mapred.task.profile.params property.)
- Finally, we specify which tasks we want to profile.
- We normally only want profile information from a few tasks, so we use the properties mapred.task.profile.maps and mapred.task.profile.reduces to specify the range of (map or reduce) task IDs that we want profile information for.
- We have set the maps property to 0-2 (which is actually the default), meaning map tasks with IDs 0, 1, and 2 are profiled.
- The tasks to profile can also be controlled using the JobContext.NUM_MAP_PROFILES constant for map tasks and JobContext.NUM_REDUCE_PROFILES for reduce tasks.

MapReduce Workflows:

- We have not yet considered how to turn a data processing problem into the MapReduce model.
- The data processing you have seen so far solves a fairly simple problem.
- When the processing becomes more complex, this complexity is generally manifested by having more MapReduce jobs, rather than having more complex map and reduce functions.
- In other words, as a rule of thumb, think about adding more jobs, rather than adding complexity to jobs.

For more complex problems, it is worth considering a higher-level language than MapReduce, such as:
- Pig, Hive, Cascading, Cascalog, or Crunch.
- One immediate benefit is that it frees you from having to do the translation into MapReduce jobs, allowing you to concentrate on the analysis you are performing.

Decomposing a Problem into MapReduce jobs:

- Let's look at an example of a more complex problem that we want to translate into a MapReduce workflow.
- We want to calculate the mean maximum recorded temperature for every day of the year and every weather station.

- How can we compute this using MapReduce?
The computation decomposes most naturally into two stages:

1) Compute the maximum daily temperature for every station-date pair.
- This MapReduce program is a variant of the maximum temperature program, except that the keys in this case are composite station-date pairs, rather than just years.

2) Compute the mean of the maximum daily temperatures for every station-day-month key (see the sketch after the sample output below).
- The mapper takes the output from the previous job (station-date, maximum temperature) records and projects it into (station-day-month, maximum temperature) records by dropping the year component.
The reduce function then takes the mean of the maximum temperatures for each station-day-month key.

The output from the first stage looks like this for the station we are interested in:

029070-99999    19010101    0
029070-99999    19020101    -94
...

- The first two fields form the key, and the final column is the maximum temperature from all the readings for the given station and date. The second stage averages these daily maxima over the years to yield:

029070-99999    0101    -68
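
A minimal sketch of the second-stage job under stated assumptions: MeanMaxTemperatureByDay, ProjectionMapper, and MeanReducer are hypothetical names, and the stage-1 output is assumed to be the default tab-separated text output with temperatures in tenths of a degree:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MeanMaxTemperatureByDay {

  // Projects (station-date, max temp) into (station-day-month, max temp) by dropping the year
  public static class ProjectionMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] fields = value.toString().split("\t"); // station, yyyymmdd, max temp
      String monthDay = fields[1].substring(4);       // drop the yyyy prefix
      context.write(new Text(fields[0] + "\t" + monthDay),
          new IntWritable(Integer.parseInt(fields[2].trim())));
    }
  }

  // Averages the daily maxima for each station-day-month key
  public static class MeanReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      long sum = 0;
      int count = 0;
      for (IntWritable value : values) {
        sum += value.get();
        count++;
      }
      context.write(key, new IntWritable((int) (sum / count)));
    }
  }
}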


- There is a case for splitting these into distinct mappers and chaining them into a single mapper using the ChainMapper library class that comes with Hadoop (see the sketch below).
- Combined with ChainReducer, you can run a chain of mappers, followed by a reducer and another chain of mappers, in a single MapReduce job.
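
A minimal sketch of chaining under stated assumptions: AMapper, BMapper, and CReducer are hypothetical classes, and the new-API chain classes in org.apache.hadoop.mapreduce.lib.chain are assumed to be available (older releases provide equivalents in org.apache.hadoop.mapred.lib that take a JobConf):

    Configuration conf = new Configuration();
    Job job = new Job(conf, "Chained job");

    // Map phase: AMapper runs first, and its output is fed to BMapper
    ChainMapper.addMapper(job, AMapper.class, LongWritable.class, Text.class,
        Text.class, Text.class, new Configuration(false));
    ChainMapper.addMapper(job, BMapper.class, Text.class, Text.class,
        Text.class, IntWritable.class, new Configuration(false));

    // Reduce phase: CReducer; further mappers could follow via ChainReducer.addMapper()
    ChainReducer.setReducer(job, CReducer.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, new Configuration(false));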

JobControl:

- When there is more than one job in a MapReduce workflow, the question arises: how do you manage the jobs so they are executed in order?
- There are several approaches: the main consideration is whether you have a linear chain of jobs, or a more complex directed acyclic graph (DAG) of jobs.
- For a linear chain, the simplest approach is to run each job one after another, waiting
until a job completes successfully before running the next:
JobClient.runJob(conf1);
JobClient.runJob(conf2);

- If a job fails, the runJob() method will throw an IOException, so later jobs in the pipeline don't get executed.
- Depending on your application, you might want to catch the exception and clean up any intermediate data that was produced by previous jobs.
- For anything more complex than a linear chain, there are libraries that can help orchestrate your workflow.
- The simplest is the JobControl class, in the org.apache.hadoop.mapreduce.jobcontrol package.
- An instance of JobControl represents a graph of jobs to be run.
- You add the job configurations, then tell the JobControl instance the dependencies between the jobs.
- You run the JobControl in a thread, and it runs the jobs in dependency order (a minimal sketch follows).
- You can poll for progress, and when the jobs have finished, you can query for all the jobs' statuses and any associated errors.
- If a job fails, JobControl won't run the jobs that depend on it.
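
A minimal sketch, assuming the new-API ControlledJob and JobControl classes (found under org.apache.hadoop.mapreduce.lib.jobcontrol in recent releases) and two hypothetical Job objects, job1 and job2, where job2 consumes job1's output:

    ControlledJob cJob1 = new ControlledJob(job1, null);
    ControlledJob cJob2 = new ControlledJob(job2, null);
    cJob2.addDependingJob(cJob1); // job2 runs only if job1 succeeds

    JobControl jobControl = new JobControl("max-temp-workflow");
    jobControl.addJob(cJob1);
    jobControl.addJob(cJob2);

    // JobControl implements Runnable, so run it in its own thread and poll for completion
    Thread controller = new Thread(jobControl);
    controller.start();
    while (!jobControl.allFinished()) {
      Thread.sleep(1000);
    }
    if (!jobControl.getFailedJobList().isEmpty()) {
      // inspect the failed jobs' statuses and errors here
    }
    jobControl.stop();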

Apache Oozie:

- If you need to run a complex workflow, or one on a tight production schedule, or you have a large number of connected workflows with data dependencies between them, then a more sophisticated approach is required.
- Apache Oozie (http://incubator.apache.org/oozie/) has been designed to manage the execution of thousands of dependent workflows, each composed of possibly thousands of constituent actions at the level of an individual MapReduce job.
- Oozie has two main parts:
1) a workflow engine that stores and runs workflows composed of Hadoop jobs
2) a coordinator engine that runs workflow jobs based on predefined schedules and data availability.
- The latter property is powerful, as it allows a workflow job to wait until its input data has been produced by a dependent workflow; it also makes rerunning failed workflows more tractable, since no time is wasted running the successful parts of a workflow.
- Oozie runs as a service in the cluster, and clients submit workflow definitions for immediate or later execution.
- In Oozie parlance, a workflow is a DAG of action nodes and control-flow nodes.
- An action node performs a workflow task such as moving files in HDFS; running a MapReduce, Streaming, Pig, or Hive job; performing a Sqoop import; or running a shell script or Java program.
- A control-flow node governs workflow execution between actions by allowing such constructs as conditional logic or parallel execution.
- When the workflow completes, Oozie can make an HTTP callback to the client to inform it of the workflow status. It is also possible to receive callbacks every time the workflow enters or exits an action node.

Defining Oozie workflow:

- Workflow definitions are written in XML using the Hadoop Process Definition Language, the specification for which can be found on the Oozie website.

Example 5-14. Oozie workflow definition to run the maximum temperature MapReduce job
<workflow-app xmlns="uri:oozie:workflow:0.1" name="max-temp-workflow">
  <start to="max-temp-mr"/>
  <action name="max-temp-mr">
    <map-reduce>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <prepare>
        <delete path="${nameNode}/user/${wf:user()}/output"/>
      </prepare>
      <configuration>
        <property>
          <name>mapred.mapper.class</name>
          <value>OldMaxTemperature$OldMaxTemperatureMapper</value>
        </property>
        <property>
          <name>mapred.combiner.class</name>
          <value>OldMaxTemperature$OldMaxTemperatureReducer</value>
        </property>
        <property>
          <name>mapred.reducer.class</name>
          <value>OldMaxTemperature$OldMaxTemperatureReducer</value>
        </property>
        <property>
          <name>mapred.output.key.class</name>
          <value>org.apache.hadoop.io.Text</value>
        </property>
        <property>
          <name>mapred.output.value.class</name>
          <value>org.apache.hadoop.io.IntWritable</value>
        </property>
        <property>
          <name>mapred.input.dir</name>
          <value>/user/${wf:user()}/input/ncdc/micro</value>
        </property>
        <property>
          <name>mapred.output.dir</name>
          <value>/user/${wf:user()}/output</value>
        </property>
      </configuration>
    </map-reduce>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <kill name="fail">
    <message>MapReduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>
  <end name="end"/>
</workflow-app>

- This workflow has three control-flow nodes and one action node:
the start control node
the map-reduce action node
the kill control node
the end control node
- All workflows must have one start node and one end node.
- When the workflow job starts, it transitions to the node specified by the start node.
- A workflow job succeeds when it transitions to the end node.
- If the workflow job transitions to a kill node, it is considered to have failed and reports the error message specified by the message element in the workflow definition.
- Oozie provides a set of functions for interacting with the workflow; ${wf:user()}, for example, returns the name of the user who started the current workflow job, and we use it to specify the correct filesystem path.

Packaging and deploying an Oozie workflow application

- A workflow application is made up of the workflow definition plus all the associated resources (MapReduce JAR files, Pig scripts, and so on) needed to run it.
- Applications must adhere to a simple directory structure and are deployed to HDFS so they can be accessed by Oozie.
max-temp-workflow/
├── lib/
│   └── hadoop-examples.jar
└── workflow.xml

- The JAR files are placed in the lib directory.
- workflow.xml is in the top level of this directory.
Once the application has been built, it should be copied to HDFS using the regular Hadoop tools:

% hadoop fs -put hadoop-examples/target/max-temp-workflow max-temp-workflow

Running an Oozie workflow job:

% export OOZIE_URL="http://localhost:11000/oozie"

- Type oozie help to get a list of commands, but here we call the job subcommand with the -run option to run the workflow job:

% oozie job -config ch05/src/main/resources/max-temp-workflow.properties -run
job: 0000009-120119174508294-oozie-tom-W
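
The -config option points at a local Java properties file that supplies the parameters referenced in the workflow definition (such as ${nameNode} and ${jobTracker}) and tells Oozie where the application lives in HDFS via oozie.wf.application.path. A minimal sketch, with illustrative host names and ports:

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
oozie.wf.application.path=${nameNode}/user/${user.name}/max-temp-workflow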

To get the status of the workflow job, we use the -info option with the job ID that was printed by the run command earlier:

% oozie job -info 0000009-120119174508294-oozie-tom-W

- The output shows the status of the workflow job: RUNNING, KILLED, or SUCCEEDED.
- You can find all this information via Oozie's web UI too, available at http://localhost:11000/oozie.

When the job has succeeded, we can inspect the results in the usual way:

% hadoop fs -cat output/part-*
1949    111
1950    22

