Wednesday, May 14, 2014

HDFS (Hadoop Distributed File System)


3: HDFS

- When a data set outgrows the storage capacity of a single physical machine, it becomes necessary to
partition it across a number of separate machines.
- Filesystems that manage the storage across a number of machines are called distributed filesystems.
- One of the biggest challenges is making the file system tolerate node failure without suffering data loss.

Design of HDFS:

- HDFS is a filesystem designed for storing very large files with streaming data access patterns, running on clusters of commodity hardware.
- Very large file: files that are hundreds of megabytes, gigabytes, or terabytes in size. There are Hadoop clusters running today that store petabytes of data.
- Streaming data access:
HDFS is built around the idea that the most efficient data processing pattern is a write-once, read-many-times pattern.
A data set is generated or copied from a source, then various analyses are performed on that data set over time.
Each analysis will involve a large proportion of the data set,
so the time to read the whole data set is more important than the latency in reading the first record.
- Commodity hardware:
Hadoop does not require expensive, highly reliable hardware to run on.
It is designed to run on clusters of commodity hardware.
The chance of node failure across the cluster is high, at least for large clusters.
HDFS is designed to carry on working without noticeable interruption to the user in the face of such failure.

There are applications where Hadoop does not work so well:
- Low latency data access:
Applications requiring low-latency access to data, in the tens-of-milliseconds range, will not work well with HDFS.
HBase is a better choice for low-latency access.
- Lots of small files:
Since the namenode holds filesystem metadata in memory, the limit to the number of files in the filesystem is governed by the amount of memory on the namenode.
As a rule of thumb, each file, directory, and block takes about 150 bytes.
So one million files, each taking one block, amounts to two million objects (a file and a block each), which needs at least 300 MB of memory.
While storing millions of files is feasible, billions is beyond the capability of current hardware.
- Multiple writers, arbitrary file modifications
Files in HDFS may be written to by a single writer.
Writes are always made at the end of the file.
There is no support for multiple writers, or for modifications at arbitrary offsets in the file.

HDFS concepts:

Blocks:

- A disk has a block size, which is the minimum amount of data that it can read or write.
- Filesystem blocks are typically a few kilobytes in size, while disk blocks are normally 512 bytes.
- HDFS also has the concept of a block, but it is a much larger unit - 64 MB by default.
- As in a filesystem for a single disk, files in HDFS are broken into block-sized chunks,
which are stored as independent units.

Why is block in HDFS so large?

- HDFS blocks are large compared to disk blocks, and the reason is to
minimize the cost of seeks.
- By making a block large enough, the time to transfer the data from the disk can be made
significantly longer than the time to seek to the start of the block.
- Thus the time to transfer a large file made of multiple blocks operates at the disk
transfer rate.
- Map tasks in MapReduce normally operate on one block at a time, so if you
have too few tasks, your jobs will run slower than they could otherwise.
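
As a rough worked example (assuming a seek time of around 10 ms and a disk transfer rate of around 100 MB/s, illustrative figures for commodity disks):
to keep seek overhead to about 1% of transfer time, the transfer should take about 100 x 10 ms = 1 s,
and at 100 MB/s that corresponds to a block of roughly 100 MB,
which is the same order of magnitude as the HDFS default.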

Having a block abstraction for a distributed filesystem brings several benefits.
- A file can be larger than any single disk in the network.
There's nothing that requires the blocks from a file to be stored on the same disk,
so they can take advantage of any of the disks in the cluster.
It would even be possible to store a single file on an HDFS cluster whose blocks filled
all the disks in the cluster.
- Making the unit of abstraction a block rather than a file simplifies the storage
subsystem.
The storage subsystem deals with blocks, simplifying storage management and
eliminating metadata concerns.
- Blocks fit well with replication for providing fault tolerance and availability.
To insure against corrupted blocks and disk and machine failure, each block is
replicated to a small number of physically separate machines.
If a block becomes unavailable, a copy can be read from another location in a way
that is transparent to the client.
A block that is no longer available due to corruption or machine failure can be replicated
from its alternative locations to other live machines to bring the replication factor
back to the normal level.

% hadoop fsck / -files -blocks

will list the blocks that make up each file in the filesystem.

Namenodes and Datanodes:

- HDFS has two types of node operating in a master-worker pattern:
- namenode (master)
- datanodes (workers)
- The namenode manages the filesystem namespace. It maintains the filesystem tree
and the metadata for all the files and directories in the tree. This information is stored
persistently on the local disk in the form of two files:
- the namespace image and the edit log.
The namenode also knows the datanodes on which all the blocks for a given file are
located, although it does not store block locations persistently, since this
information is reconstructed from the datanodes when the system starts.
- A client accesses the filesystem on behalf of the user by communicating with the
namenode and the datanodes.
- Datanodes are the workhorses of the filesystem.
They store and retrieve blocks when they are told to, and they report
back to the namenode periodically with lists of blocks that they are storing.
- Without the namenode, the filesystem cannot be used.
- It is important to make the namenode resilient to failure, and Hadoop
provides two mechanisms for this.
1) Back up the files that make up the persistent state of the filesystem metadata.
Hadoop can be configured so that the namenode writes its persistent state to
multiple filesystems. These writes are synchronous and atomic. The usual configuration
is to write to the local disk as well as a remote NFS mount (see the configuration sketch at the end of this section).
2) It is also possible to run a secondary namenode, which, despite its name, does
not act as a namenode. Its main role is to periodically merge the namespace
image with the edit log to prevent the edit log from becoming too large. The secondary
namenode usually runs on a separate physical machine, as it requires plenty of CPU and
as much memory as the namenode to perform the merge. It keeps a copy of the merged
namespace image, which can be used in the event of the namenode failing. However, the state
of the secondary namenode lags that of the primary, so in the event of total failure of the primary,
data loss is almost certain.
The usual course of action in this case is to copy the namenode's metadata files
that are on NFS to the secondary and run it as the new primary.
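
A minimal sketch of such a configuration in hdfs-site.xml (the paths are illustrative; the property is dfs.name.dir in this release series, renamed dfs.namenode.name.dir in later ones):

<property>
  <name>dfs.name.dir</name>
  <!-- comma-separated list of directories: a local disk plus a remote NFS mount (illustrative paths) -->
  <value>/disk1/hdfs/name,/remote/hdfs/name</value>
</property>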

HDFS federation:

- HDFS federation, introduced in the 0.23 release series, allows a cluster to scale by
adding namenodes, each of which manages a portion of the filesystem namespace.
- One namenode might manage all the files rooted under /user, for example.
- A second namenode might handle files under /share.
- Under federation, each namenode manages a namespace volume, which is
made up of the metadata for the namespace and a block pool containing all
the blocks for the files in the namespace.
- Namespace volumes are independent of each other, which means namenodes
do not communicate with one another, and the failure of one namenode does not
affect the availability of the namespaces managed by other namenodes.
- Block pool storage is not partitioned, however, so datanodes register with each
namenode in the cluster and store blocks from multiple block pools.

HDFS High Availability:

- To recover from a failed namenode, an administrator starts a new primary namenode with one
of the filesystem metadata replicas and configures datanodes and clients to use this
new namenode.
- The new namenode cannot serve requests until:
1) it has loaded its namespace image into memory,
2) replayed its edit log, and
3) received enough block reports from the datanodes to leave safe mode.
On a large cluster with many files and blocks, the time it takes for a namenode to start
from cold can be 30 minutes or more.
The long recovery time is a problem for routine maintenance too.
- The 0.23 release series of Hadoop remedies this situation by adding support
for HDFS high availability (HA).
In this implementation, there is a pair of namenodes in an active-standby configuration.
In the event of the failure of the active namenode, the standby takes over its duties to continue
servicing client requests without a significant interruption.
A few architectural changes are needed to allow this to happen:
- The namenodes must use highly available shared storage to share the edit log. The initial implementation
of HA will require an NFS filer, but in future releases more options will be provided, such as a
BookKeeper-based system built on ZooKeeper.
When the standby namenode comes up, it reads up to the end of the shared
edit log to synchronize its state with the active namenode.
- Datanodes must send block reports to both namenodes, since the block mappings
are stored in a namenode's memory and not on disk.
- Clients must be configured to handle namenode failover, using a
mechanism that is transparent to users.
If the active namenode fails, the standby can take over very quickly since it has
the latest state available in memory: both the latest edit log entries and an up-to-date
block mapping.
- In the unlikely event of the standby being down when the active fails, the
administrator can still start the standby from cold.

Failover and fencing:

- The transition from the active namenode to the standby is managed by a new entity in the system
called the failover controller.
- Failover controllers are pluggable, but the first implementation uses
ZooKeeper to ensure that only one namenode is active.
- Each namenode runs a lightweight failover controller process whose job
is to monitor its namenode for failures and trigger a failover should the namenode fail.
- Failover can also be initiated by an administrator, for example in the case of routine maintenance.
- This is known as graceful failover, since the failover controller arranges an orderly transition
for both namenodes to switch roles.

- In the case of an ungraceful failover, however, it is impossible to be sure that the failed
namenode has stopped running. For example, a slow network or a network partition can trigger
a failover transition, even though the previously active namenode is still running and thinks
it is still the active namenode.
The HA implementation goes to great lengths to ensure that the previously active namenode is prevented
from doing any damage and causing corruption - a method known as fencing.
The system employs a range of fencing mechanisms, including killing the namenode's process, revoking its access
to the shared storage directory, and disabling its network port via a remote management command.
- As a last resort, the previously active namenode can be fenced with a technique rather graphically known as STONITH, or
"shoot the other node in the head," which uses a specialized power distribution unit to forcibly power down
the host machine.

Command Line Interface:

- We are going to have a look at HDFS by interacting with it from the command line.
There are many other interfaces to HDFS, but the command line is one of the simplest and,
to many developers, the most familiar.
- We are going to run HDFS on one machine, so first follow the instructions
for setting up Hadoop in pseudo-distributed mode.
- Later we will see how to run a cluster of machines to give us
scalability and fault tolerance.
- There are two properties that we set in the pseudo-distributed configuration that deserve further explanation.
- The first is fs.default.name, set to hdfs://localhost/, which is used to set a default filesystem for Hadoop.
- Filesystems are specified by a URI, and here we have used an hdfs URI to configure Hadoop to use HDFS
by default.
- The HDFS daemons will use this property to determine the host and port of the HDFS namenode.
- We will be running it on localhost, on the default HDFS port, 8020.
- We set the second property, dfs.replication, to 1 so that HDFS does not replicate
filesystem blocks by the default factor of three.
- When running with a single datanode, HDFS can't replicate blocks to
three datanodes, so it would perpetually warn about blocks being
under-replicated. This setting solves that problem.
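
For reference, a minimal sketch of how these two properties might look in the pseudo-distributed configuration files (values taken from the description above):

In core-site.xml:
<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost/</value>
</property>

In hdfs-site.xml:
<property>
  <name>dfs.replication</name>
  <value>1</value>
</property>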

Basic Filesystem Operations:

- The filesystem is ready to be used, and we can do all of the usual
filesystem operations, such as reading files, creating directories, moving files,
deleting data, and listing directories.
- You can use hadoop fs -help to get detailed help on every command.

Start by copying file from local file system to HDFS:

- % hadoop fs -copyFromLocal input/docs/quangle.txt hdfs://localhost/user/tom/quangle.txt

The above command invokes Hadoop's filesystem shell command fs, which supports a number
of subcommands - in this case, we are running -copyFromLocal.
- We can omit the scheme and host of the URI and pick up the default,
hdfs://localhost, as specified in core-site.xml:

- % hadoop fs -copyFromLocal input/docs/quangle.txt /user/tom/quangle.txt

We could also have used a relative path, which is resolved against the user's home directory in HDFS (here, /user/tom):

- % hadoop fs -copyFromLocal input/docs/quangle.txt quangle.txt

Let’s copy the file back to the local filesystem and check whether it’s the same:
- % hadoop fs -copyToLocal quangle.txt quangle.copy.txt
- % md5 input/docs/quangle.txt quangle.copy.txt
MD5 (input/docs/quangle.txt) = a16f231da6b05e2ba7a339320e7dacd9
MD5 (quangle.copy.txt) = a16f231da6b05e2ba7a339320e7dacd9

MD5 digests are the same, showing that the file survived its trip to HDFS
and is back intact.

Let's look at an HDFS file listing.
- We create a directory first just to see how it is displayed in the listing:
% hadoop fs -mkdir books
% hadoop fs -ls .
Found 2 items
drwxr-xr-x   - tom supergroup          0 2009-04-02 22:41 /user/tom/books
-rw-r--r--   1 tom supergroup        118 2009-04-02 22:29 /user/tom/quangle.txt

- First column: the file mode (permissions).
- 2nd column: the replication factor of the file. Remember we set the default replication factor in the site-wide configuration to 1, which is why we see the same value here. The entry in this column is empty for directories, since the concept of replication does not apply to them - directories are treated as metadata and stored by the namenode, not datanodes.
- 3rd column: file owner
- 4th column: group
- 5th column: size of file in bytes or zero for directories.
- 6th column: last modified date 
- 7th column: last modified time
- 8th column: absolute name of the file or directory.

File permissions in HDFS:

- There are three types of permission:
r: read permission
w: write permission
x: execute permission (ignored for files, since you cannot execute a file on HDFS; for a directory it is required in order to access its children)
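
For example, permissions and ownership can be changed from the command line with the -chmod and -chown subcommands of hadoop fs (the path is illustrative):

% hadoop fs -chmod 644 /user/tom/quangle.txt
% hadoop fs -chown tom:supergroup /user/tom/quangle.txt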

Hadoop Filesystems:

- Hadoop has an abstract notion of filesystem, of which HDFS is just one implementation.
- The Java abstract class org.apache.hadoop.fs.FileSystem represents a filesystem in Hadoop, and there are several concrete implementations of it.


Hadoop provides many interfaces to its filesystems, and it generally uses the URI scheme to pick the correct filesystem instance to communicate with.

- To list the files in the root directory of the local filesystem, type:

% hadoop fs -ls file:///

Interfaces:

- Hadoop is written in Java, and all Hadoop filesystem interactions are mediated through the Java API.
- The filesystem shell, for example, is a Java application that uses the Java FileSystem class to provide filesystem operations.
- Other filesystem interfaces are discussed briefly in this section.
- These interfaces are most commonly used with HDFS, since the other filesystems in Hadoop typically have existing tools to access the underlying filesystem (FTP clients for FTP, S3 tools for S3, and so on), but many of them will work with any Hadoop filesystem.

HTTP:

- There are two ways of accessing HDFS over HTTP:
- directly, where the HDFS daemons serve HTTP requests to clients,
- and via a proxy (or proxies), which accesses HDFS on the client's behalf using the
usual DistributedFileSystem API.

- In the first case, directory listings are served by the namenode's embedded web server (which runs on port 50070), formatted in XML or JSON, while file data is streamed from datanodes by their web servers (running on port 50075).
- The original direct HTTP interface (HFTP and HSFTP) was read-only, while the new WebHDFS implementation supports all filesystem operations, including Kerberos authentication. WebHDFS must be enabled by setting dfs.webhdfs.enabled to true for you to be able to use webhdfs URIs.

- The second way of accessing HDFS over HTTP relies on one or more standalone proxy servers. All traffic to the cluster passes through the proxy, which allows for stricter firewall and bandwidth-limiting policies to be put in place.
It is common to use a proxy for transfers between Hadoop clusters located in different data centers.
- The original HDFS proxy was read-only and could be accessed by clients using the HSFTP FileSystem implementation. From release 0.23, there is a new proxy called HttpFS that has read and write capabilities and which exposes the same HTTP interface as WebHDFS, so clients can access both using webhdfs URIs.

C:

- Hadoop provides a C library called libhdfs that mirrors the Java FileSystem interface. It works by using the Java Native Interface (JNI) to call the Java filesystem client.
- The C API typically lags behind the Java API, so newer features may not be supported.

FUSE:

- Filesystem in Userspace (FUSE) allows filesystems that are implemented in user space to be integrated as Unix filesystems.
- Hadoop's Fuse-DFS contrib module allows any Hadoop filesystem to be mounted as a standard filesystem.


Java Interface:

Reading data from Hadoop URL:

- One of the simplest ways to read a file from a Hadoop filesystem is by using a java.net.URL object to open a stream to read the data from. A little extra work is needed to make Java recognize Hadoop's hdfs URL scheme: the setURLStreamHandlerFactory method on URL is called with an instance of FsUrlStreamHandlerFactory, as the static block in Example 3-1 does.

InputStream in = null;
try {
  in = new URL("hdfs://host/path").openStream();
  // process in
} finally {
  IOUtils.closeStream(in);
}
Example 3-1. Displaying files from a Hadoop filesystem on standard output using a
URLStreamHandler

import java.io.InputStream;
import java.net.URL;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

public class URLCat {
  // Make Java recognize the hdfs URL scheme; setURLStreamHandlerFactory can be
  // called at most once per JVM, hence it is done in a static initializer block.
  static {
    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
  }

  public static void main(String[] args) throws Exception {
    InputStream in = null;
    try {
      in = new URL(args[0]).openStream();
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}

Here’s a sample run:

% hadoop URLCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

Reading data using FileSystem API:

- FileSystem is a general filesystem API, so the first step is to retrieve an instance for the filesystem we want to use - HDFS in this case.
- There are several static factory methods for getting a FileSystem instance:

public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf, String user) throws IOException

In some cases, you may want to retrieve a local filesystem instance, in which case you
can use the convenience method, getLocal():
public static LocalFileSystem getLocal(Configuration conf) throws IOException

Example 3-2. Displaying files from a Hadoop filesystem on standard output by using the FileSystem
directly
public class FileSystemCat {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    InputStream in = null;
    try {
      in = fs.open(new Path(uri));
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}
The program runs as follows:
% hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

FSDataInputStream:

- This class is a specialization of java.io.DataInputStream with support for random access, so you can read from any part of the stream.
- package org.apache.hadoop.fs;
public class FSDataInputStream extends DataInputStream
    implements Seekable, PositionedReadable {
  // implementation elided
}

- The Seekable interface permits seeking to a position in the file and provides a query method for the current offset from the start of the file:

public interface Seekable {
  void seek(long pos) throws IOException;
  long getPos() throws IOException;
}

- Example 3-3. Displaying files from a Hadoop filesystem on standard output twice, by using seek
public class FileSystemDoubleCat {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    FSDataInputStream in = null;
    try {
      in = fs.open(new Path(uri));
      IOUtils.copyBytes(in, System.out, 4096, false);
      in.seek(0); // go back to the start of the file
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}
Here’s the result of running it on a small file:
% hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

FSDataInputStream also implements the PositionedReadable interface, for reading parts of a file at a given offset:
public interface PositionedReadable {
  public int read(long position, byte[] buffer, int offset, int length)
    throws IOException;

  public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException;

  public void readFully(long position, byte[] buffer) throws IOException;
}
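
A minimal usage sketch (the path and offset are illustrative). The positioned read methods preserve the stream's current offset, so they can be mixed with normal sequential reads:

FSDataInputStream in = fs.open(new Path("/user/tom/quangle.txt")); // fs as in the earlier examples
byte[] buf = new byte[16];
in.readFully(8, buf); // read buf.length bytes starting at byte 8, without moving the current position
in.close();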

Writing Data:

- The FileSystem class has a number of methods for creating a file.
- The simplest is the method that takes a Path object for the file to be created and returns an output stream to write to:
public FSDataOutputStream create(Path f) throws IOException
- The create() method creates any parent directories of the file to be written that don't already exist.
There is also an overloaded method for passing a callback interface, Progressable, so your application can be notified of the progress of the data being written to the datanodes:

- package org.apache.hadoop.util;
public interface Progressable {
  public void progress();
}
- As an alternative to creating a new file, you can append to an existing file using the append() method:
public FSDataOutputStream append(Path f) throws IOException

The example below shows how to copy a local file to a Hadoop filesystem, printing a dot each time Hadoop calls the progress() method.

Example: Copying a local file to a Hadoop filesystem
public class FileCopyWithProgress {
  public static void main(String[] args) throws Exception {
    String localSrc = args[0];
    String dst = args[1];
   
    InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
   
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(dst), conf);
    OutputStream out = fs.create(new Path(dst), new Progressable() {
      public void progress() {
        System.out.print(".");
      }
    });
   
    IOUtils.copyBytes(in, out, 4096, true);
  }
}

% hadoop FileCopyWithProgress input/docs/1400-8.txt hdfs://localhost/user/tom/
      1400-8.txt
...............

FSDataOutputStream:

- The create() method on FileSystem returns an FSDataOutputStream, which, like FSDataInputStream, has a method for querying the current position in the file:

package org.apache.hadoop.fs;

public class FSDataOutputStream extends DataOutputStream implements Syncable {
  public long getPos() throws IOException {
    // implementation elided
  }
  // implementation elided
}
- FSDataOutputStream does not permit seeking.
- This is because HDFS allows only sequential writes to an open file or appends to an already written file.
- There is no support for writing anywhere other than the end of the file, so there is no value in being able to seek while writing.

Directories:

- FileSystem provides a method to create a directory:
- public boolean mkdirs(Path f) throws IOException
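
A one-line usage sketch (the path is illustrative); mkdirs() creates any necessary parent directories and returns true if the directory (and any parents) was created successfully:

boolean created = fs.mkdirs(new Path("/user/tom/books")); // fs as in the earlier examples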

Querying the FileSystem:

File metadata: FileStatus

- The FileStatus class encapsulates filesystem metadata for files and directories, including file length, block size, replication, modification time, ownership, and permission information. The getFileStatus() method on FileSystem provides a way of getting a FileStatus object for a single file or directory, as the following test (which uses an in-process mini cluster) shows:

public class ShowFileStatusTest {
  private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing
  private FileSystem fs;
  @Before
  public void setUp() throws IOException {
    Configuration conf = new Configuration();
    if (System.getProperty("test.build.data") == null) {
      System.setProperty("test.build.data", "/tmp");
    }
    cluster = new MiniDFSCluster(conf, 1, true, null);
    fs = cluster.getFileSystem();
    OutputStream out = fs.create(new Path("/dir/file"));
    out.write("content".getBytes("UTF-8"));
    out.close();
  }
  @After
  public void tearDown() throws IOException {
    if (fs != null) { fs.close(); }
    if (cluster != null) { cluster.shutdown(); }
  }
  @Test(expected = FileNotFoundException.class)
  public void throwsFileNotFoundForNonExistentFile() throws IOException {
    fs.getFileStatus(new Path("no-such-file"));
  }

 @Test
  public void fileStatusForFile() throws IOException {
    Path file = new Path("/dir/file");
    FileStatus stat = fs.getFileStatus(file);
    assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
    assertThat(stat.isDir(), is(false));
    assertThat(stat.getLen(), is(7L));
    assertThat(stat.getModificationTime(),
        is(lessThanOrEqualTo(System.currentTimeMillis())));
    assertThat(stat.getReplication(), is((short) 1));
    assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L));
    assertThat(stat.getOwner(), is("tom"));
    assertThat(stat.getGroup(), is("supergroup"));
    assertThat(stat.getPermission().toString(), is("rw-r--r--"));
  }
  @Test
  public void fileStatusForDirectory() throws IOException {
    Path dir = new Path("/dir");
    FileStatus stat = fs.getFileStatus(dir);
    assertThat(stat.getPath().toUri().getPath(), is("/dir"));
    assertThat(stat.isDir(), is(true));
    assertThat(stat.getLen(), is(0L));
    assertThat(stat.getModificationTime(),
        is(lessThanOrEqualTo(System.currentTimeMillis())));
    assertThat(stat.getReplication(), is((short) 0));
    assertThat(stat.getBlockSize(), is(0L));
    assertThat(stat.getOwner(), is("tom"));
    assertThat(stat.getGroup(), is("supergroup"));
    assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
  }
}

- If no file or directory exists at the given path, a FileNotFoundException is thrown.
- If you are interested only in the existence of a file or directory, the exists() method on FileSystem is more convenient:

public boolean exists(Path f) throws IOException


Listing files:

- Finding information on a single file or directory is useful, but you also need to be able to list the contents of a directory.
- That's what FileSystem's listStatus() methods are for:

public FileStatus[] listStatus(Path f) throws IOException
public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException
public FileStatus[] listStatus(Path[] files) throws IOException
public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException

Example: Showing the file statuses for a collection of paths in a Hadoop filesystem
public class ListStatus {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
   
    Path[] paths = new Path[args.length];
    for (int i = 0; i < paths.length; i++) {
      paths[i] = new Path(args[i]);
    }
   
    FileStatus[] status = fs.listStatus(paths);
    Path[] listedPaths = FileUtil.stat2Paths(status);
    for (Path p : listedPaths) {
      System.out.println(p);
    }
  }
}

We can use this program to find the union of directory listings for a collection of paths:
% hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom
hdfs://localhost/user
hdfs://localhost/user/tom/books
hdfs://localhost/user/tom/quangle.txt

File Patterns:

- It is a common requirement to process sets of files in a single operation.
- It is convenient to use wildcard characters to match multiple files with a single
expression, an operation that is known as globbing.
- Hadoop provides two FileSystem methods for processing globs:
public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException

- The globStatus() methods return an array of FileStatus objects whose paths match the supplied pattern, sorted by path.
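
A short usage sketch (the glob is illustrative), printing the paths that match a pattern:

FileStatus[] matches = fs.globStatus(new Path("/2007/*/*")); // fs as in the earlier examples
for (FileStatus status : matches) {
  System.out.println(status.getPath());
}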

PathFilter:

- Glob patterns are not always powerful enough to describe the set of files you want to access.
- For example, it is not generally possible to exclude a particular file using a glob pattern.
- The listStatus() and globStatus() methods of FileSystem take an optional PathFilter, which allows programmatic control over matching:

package org.apache.hadoop.fs;
public interface PathFilter {
  boolean accept(Path path);
}

- PathFilter is the equivalent of java.io.FileFilter for Path objects rather than File objects.

Example: A PathFilter for excluding paths that match a regular expression
public class RegexExcludePathFilter implements PathFilter {
  private final String regex;
  public RegexExcludePathFilter(String regex) {
    this.regex = regex;
  }
  public boolean accept(Path path) {
    return !path.toString().matches(regex);
  }
}

- The filter passes only those files that don't match the regular expression.
- We use the filter in conjunction with a glob that picks out an initial set of files to include; the filter is used to refine the results. For example:
fs.globStatus(new Path("/2007/*/*"), new RegexExcludePathFilter("^.*/2007/12/31$"))
will expand to /2007/12/30.

- Filters can act only on a file's name, as represented by a Path.
- They can't use a file's properties, such as creation time, as the basis of the filter.
- Nevertheless, if you store files in a directory structure that is laid out by date, you can write a PathFilter to pick out files that fall in a given date range.

Deleting Data:

- Use the delete() method on FileSystem to permanently remove files or directories:
public boolean delete(Path f, boolean recursive) throws IOException

- If f is a file or an empty directory, the value of recursive is ignored.
- A nonempty directory is deleted, along with its contents, only if recursive is true (otherwise an IOException is thrown).
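
For example (the path is illustrative), deleting a directory and everything beneath it:

fs.delete(new Path("/user/tom/books"), true); // recursive delete; required for a nonempty directory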

Data flow:

Anatomy of a File Read:

- The client opens the file it wishes to read by calling open() on the FileSystem object, which for HDFS is an instance of DistributedFileSystem.
- DistributedFileSystem calls the namenode using RPC to determine the locations of the first few blocks in the file.
For each block, the namenode returns the addresses of the datanodes that have a copy of that block. Furthermore, the datanodes are sorted according to their proximity to the client. If the client is itself a datanode, it will read from the local datanode if it hosts a copy of the block.
DistributedFileSystem returns an FSDataInputStream to the client for it to read data from. FSDataInputStream in turn wraps a DFSInputStream, which manages the datanode and namenode I/O.
- The client then calls read() on the stream. DFSInputStream, which has stored the datanode addresses for the first few blocks in the file, connects to the first datanode for the first block in the file.
- Data is streamed from the datanode back to the client, which calls read() repeatedly on the stream.
- When the end of the block is reached, DFSInputStream will close the connection to the datanode, then find the best datanode for the next block. This happens transparently to the client, which from its point of view is just reading a continuous stream. Blocks are read in order, with DFSInputStream opening new connections to datanodes as the client reads through the stream. It will also call the namenode to retrieve the datanode locations for the next batch of blocks as needed.
- When the client has finished reading, it calls close() on the FSDataInputStream.

During reading, if the DFSInputStream encounters an error while communicating with a datanode, it will try the next closest one for that block.
It will also remember datanodes that have failed so that it does not needlessly retry them for later blocks.
The DFSInputStream also verifies checksums for the data transferred to it from the datanode.
If a corrupted block is found, it is reported to the namenode before the DFSInputStream attempts to read a replica of the block from another datanode.

An important aspect of this design is that the client contacts datanodes directly to retrieve data and is guided by the namenode to the best datanode for each block.
- This allows HDFS to scale to a large number of concurrent clients, since the data traffic is spread across all the datanodes in the cluster. The namenode, meanwhile, merely has to service block location requests and does not serve data, which would quickly become a bottleneck as the number of clients grew.

What does it mean for two nodes to be close to each other?
- The idea is to use the bandwidth between two nodes as a measure of distance.
- Rather than measuring bandwidth between nodes, which can be difficult to do in practice, Hadoop takes a simple approach in which the network is represented as a tree and the distance between two nodes is the sum of their distances to their closest common ancestor.
- Levels in the tree are not predefined, but it is common to have levels that correspond to the data center, the rack, and the node that a process is running on.
- The idea is that the bandwidth available for each of the following scenarios becomes progressively less:
Processes on the same node
Different nodes on the same rack
Nodes on different racks in the same data center
Nodes in different data centers

For example, imagine a node n1 on rack r1 in data center d1.
This can be represented as /d1/r1/n1. Using this notation, here are the distances for the four scenarios:

- distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)
- distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)
- distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)
- distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)

Anatomy of file write:


- The client creates a file by calling create() on DistributedFileSystem.
- DistributedFileSystem makes an RPC call to the namenode to create a new file in the filesystem namespace, with no blocks associated with it.
The namenode performs various checks to make sure the file doesn't already exist and that the client has the right permissions to create the file.
If these checks pass, the namenode makes a record of the new file; otherwise, file creation fails and the client is thrown an IOException.
DistributedFileSystem returns an FSDataOutputStream for the client to start writing data to.
Just as in the read case, FSDataOutputStream wraps a DFSOutputStream, which handles communication with the datanodes and namenode.
- As the client writes data, DFSOutputStream splits it into packets, which it writes to an internal queue, called the data queue. The data queue is consumed by the DataStreamer, whose responsibility it is to ask the namenode to allocate new blocks by picking a list of suitable datanodes to store the replicas.
The list of datanodes forms a pipeline - we'll assume the replication level is three, so there are three nodes in the pipeline.
The DataStreamer streams the packets to the first datanode in the pipeline, which stores each packet and forwards it to the second datanode in the pipeline.
- Similarly, the second datanode stores the packet and forwards it to the third (and last) datanode in the pipeline.
DFSOutputStream also maintains an internal queue of packets that are waiting to be acknowledged by datanodes, called the ack queue.
- A packet is removed from the ack queue only when it has been acknowledged by all the datanodes in the pipeline.

What happens if a datanode fails while data is being written to it?
- The pipeline is closed, and any packets in the ack queue are added to the front of the data queue so that datanodes that are downstream from the failed node will not miss any packets.
- The current block on the good datanodes is given a new identity, which is communicated to the namenode, so that the partial block on the failed datanode will be deleted if the failed datanode recovers later on.
- The failed datanode is removed from the pipeline, and the remainder of the block's data is written to the two good datanodes in the pipeline.
- The namenode notices that the block is under-replicated, and it arranges for a further replica to be created on another node. Subsequent blocks are then treated as normal.

It is possible, but unlikely, that multiple datanodes fail while a block is being written.
As long as dfs.replication.min replicas (default one) are written, the write will succeed, and the block will be asynchronously replicated across the cluster until its target replication factor is reached (dfs.replication, which defaults to three).

- When the client has finished writing data, it calls close() on the stream.
- This action flushes all the remaining packets to the datanode pipeline and waits for acknowledgments before contacting the namenode to signal that the file is complete.
- The namenode already knows which blocks the file is made up of (via the DataStreamer asking for block allocations), so it only has to wait for the blocks to be minimally replicated before returning successfully.

Replica placement:

How does the namenode choose which datanodes to store replicas on?

- Hadoop's default strategy is to place the first replica on the same node as the client (for clients running outside the cluster, a node is chosen at random, although the system tries not to pick nodes that are too full or too busy).
- The second replica is placed on a different rack from the first (off-rack), chosen at random.
- The third replica is placed on the same rack as the second, but on a different node, chosen at random.
- Further replicas are placed on random nodes in the cluster, although the system tries to avoid placing too many replicas on the same rack.
- Once the replica locations are chosen, a pipeline is built, taking network topology into account.

Coherency Model:

- A coherency model for a filesystem describes the data visibility of reads and writes for
a file. 
- After creating a file, it is visible in the filesystem namespace, as expected:
Path p = new Path("p");
fs.create(p);
assertThat(fs.exists(p), is(true));
- However, any content written to the file is not guaranteed to be visible, even if the
stream is flushed. So the file appears to have a length of zero:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(), is(0L));
- Once more than a block’s worth of data has been written, the first block will be visible
to new readers. This is true of subsequent blocks, too: it is always the current block
being written that is not visible to other readers.
- HDFS provides a method for forcing all buffers to be synchronized to the datanodes
via the sync() method on FSDataOutputStream. 
- After a successful return from sync(),
HDFS guarantees that the data written up to that point in the file is persisted and visible
to all new readers:

Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
out.sync();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
- This behavior is similar to the fsync system call in POSIX that commits buffered data
for a file descriptor. For example, using the standard Java API to write a local file, we
are guaranteed to see the content after flushing the stream and synchronizing:
FileOutputStream out = new FileOutputStream(localFile);
out.write("content".getBytes("UTF-8"));
out.flush(); // flush to operating system
out.getFD().sync(); // sync to disk
assertThat(localFile.length(), is(((long) "content".length())));
- Closing a file in HDFS performs an implicit sync(), too:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.close();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

Consequences of application design:

- With no calls to sync(), you should be prepared to lose up to a block of data in the event of client or system failure.
- For many applications, this is unacceptable, so you should call sync() at suitable points, such as after writing a certain number of records or number of bytes.
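
A minimal sketch of that pattern (the path, the source of records, and the interval are all illustrative), syncing after every 1,000 records written:

FSDataOutputStream out = fs.create(new Path("/user/tom/records.txt")); // fs as in the earlier examples
int count = 0;
for (String record : records) { // 'records' stands in for whatever produces your data
  out.write((record + "\n").getBytes("UTF-8"));
  if (++count % 1000 == 0) {
    out.sync(); // data written so far is persisted and visible to new readers
  }
}
out.close(); // close() performs an implicit sync()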

Parallel Copying with distcp

- The HDFS access patterns we have seen so far focus on single-threaded access.
- It is possible to act on a collection of files by specifying file globs.
- Hadoop comes with a useful program called distcp for copying large amounts of data to and from Hadoop filesystems in parallel.
- The canonical use case for distcp is for transferring data between two HDFS clusters.
- If the clusters are running identical versions of Hadoop, the hdfs scheme is appropriate:

% hadoop distcp hdfs://namenode1/foo hdfs://namenode2/bar

- This will copy the /foo directory and its contents from the first cluster to the /bar directory on the second cluster,
so the second cluster ends up with the directory structure /bar/foo. If /bar does not exist, it will be created first.
- By default, distcp will skip files that already exist in the destination, but they can be overwritten by supplying the -overwrite option.
- You can also update only the files that have changed using the -update option.
- If we changed a file in the /foo subtree on the first cluster from the previous example, then we could synchronize the change with the second cluster by running:

% hadoop distcp -update hdfs://namenode1/foo hdfs://namenode2/bar/foo

- distcp is implemented as a MapReduce job where the work of copying is done by maps that run in parallel across the cluster. There are no reducers.
- Each map copies at least 256 MB. For example, 1 GB of files will be given four map tasks. When the data size is very large, it becomes necessary to limit the number of maps in order to limit bandwidth and cluster utilization.
- By default, the maximum number of maps is 20 per (tasktracker) cluster node.
- For example, copying 1,000 GB of files to a 100-node cluster will allocate 2,000 maps (20 per node), so each will copy 512 MB on average.
- This can be reduced by specifying the -m argument to distcp.
- For example, -m 1000 would allocate 1,000 maps, each copying 1 GB on average.

- If you use distcp between two HDFS clusters that are running different versions, the copy will fail if you use the hdfs protocol, since the RPC systems are incompatible.
- To remedy this, you can use the read-only HTTP-based HFTP filesystem to read from the source.
- The job must run on the destination cluster so that the HDFS RPC versions are compatible.

% hadoop distcp hftp://namenode1:50070/foo  hdfs://namenode2/bar

- Using the newer webhdfs protocol, it is possible to use HTTP for both the source and destination clusters without hitting any wire incompatibility problems:

% hadoop distcp webhdfs://namenode1:50070/foo webhdfs://namenode2:50070/bar

Keeping an HDFS Cluster Balanced:

- It is not always possible to prevent a cluster from becoming unbalanced when copying data into it.
- Perhaps you want to limit the number of maps so that some of the nodes can be used by other jobs.
- In this case, you can use the balancer tool to subsequently even out the block distribution across the cluster.
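
A sketch of its invocation (the balancer moves blocks between datanodes until the cluster is deemed balanced, and can be interrupted at any time):

% hadoop balancer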

Hadoop Archives:

- HDFS stores small files inefficiently, since each file is stored in a block, and block metadata is held in memory by the namenode.
- Thus, a large number of small files can eat up a lot of memory on the namenode.
- Hadoop Archives, or HAR files, are a file archiving facility that packs files into HDFS blocks more efficiently, thereby reducing namenode memory usage while still allowing transparent access to the files.
- In particular, Hadoop Archives can be used as input to MapReduce.

Using Hadoop Archives:

- A Hadoop Archive is created from a collection of files using the archive tool.
- The tool runs a MapReduce job to process the input files in parallel, so you need a MapReduce cluster running to use it.
- Here are some files in HDFS that we would like to archive:

% hadoop fs -lsr /my/files
-rw-r--r--   1 tom supergroup          1 2009-04-09 19:13 /my/files/a
drwxr-xr-x   - tom supergroup          0 2009-04-09 19:13 /my/files/dir
-rw-r--r--   1 tom supergroup          1 2009-04-09 19:13 /my/files/dir/b

Now we can run the archive command:

% hadoop archive -archiveName files.har /my/files /my

Let's see what the archive has created:

% hadoop fs -ls /my
Found 2 items
drwxr-xr-x   - tom supergroup          0 2009-04-09 19:13 /my/files
drwxr-xr-x   - tom supergroup          0 2009-04-09 19:13 /my/files.har
% hadoop fs -ls /my/files.har
Found 3 items
-rw-r--r--  10 tom supergroup        165 2009-04-09 19:13 /my/files.har/_index
-rw-r--r--  10 tom supergroup         23 2009-04-09 19:13 /my/files.har/_masterindex
-rw-r--r--   1 tom supergroup          2 2009-04-09 19:13 /my/files.har/part-0

- The directory listing shows what a HAR file is made of:
- two index files and a collection of part files - just one in this example.
- The part files contain the contents of a number of the original files concatenated together.
- The indexes make it possible to look up the part file that an archived file is contained in, along with its offset and length.
- All these details are hidden from the application, however, which uses the har URI scheme to interact with HAR files, via a HAR filesystem that is layered on top of the underlying filesystem.
- The following command recursively lists the files in the archive:

% hadoop fs -lsr har:///my/files.har
drw-r--r--   - tom supergroup        0 2009-04-09 19:13 /my/files.har/my
drw-r--r--   - tom supergroup        0 2009-04-09 19:13 /my/files.har/my/files
-rw-r--r--  10 tom supergroup        1 2009-04-09 19:13 /my/files.har/my/files/a
drw-r--r--   - tom supergroup        0 2009-04-09 19:13 /my/files.har/my/files/dir
-rw-r--r--  10 tom supergroup        1 2009-04-09 19:13 /my/files.har/my/files/dir/b

On the other hand, if you want to refer to a HAR file on a different filesystem,
then you need to use a different form of the path URI from normal. These two commands
have the same effect, for example:
% hadoop fs -lsr har:///my/files.har/my/files/dir
% hadoop fs -lsr har://hdfs-localhost:8020/my/files.har/my/files/dir

To delete a HAR file, you need to use the recursive form of delete, since from the underlying filesystem's point of view the HAR file is a directory:

% hadoop fs -rmr /my/files.har

Limitations:

- Creating an archive creates a copy of the original files, so you need as much disk space as the files you are archiving to create the archive.
- There is currently no support for archive compression, although the files that go into the archive can be compressed.
- Archives are immutable once they have been created.
- To add or remove files, you must recreate the archive.

Hadoop Operations: Chapter 2

- The first half of Apache Hadoop is a filesystem called the Hadoop Distributed Filesystem, or simply HDFS.
- HDFS is built to support high throughput, streaming reads and writes of extremely large files.

Goals of HDFS:

- Store millions of large files, each greater than tens of gigabytes, with filesystem sizes reaching tens of petabytes.
- Use a scale-out model based on inexpensive commodity servers with internal JBOD (just a bunch of disks) rather than RAID to achieve large-scale storage. Accomplish availability and high throughput through application-level replication of data.
- Optimize for large, streaming reads and writes rather than low-latency access to many small files.
Batch performance is more important than interactive response times.
- Gracefully deal with component failures of machines and disks.
- Support the functionality and scale requirements of MapReduce processing.

While it is true that HDFS can be used independently of MapReduce to store large datasets, it truly shines when the two are used together.
MapReduce takes advantage of how the data in HDFS is split, upon ingestion, into blocks, and pushes computation to the machines where blocks can be read locally.

Design:

- In many ways, HDFS follows traditional filesystem design.
- Files are stored as opaque blocks, and metadata exists that keeps track of the filename-to-block mapping, the directory tree structure, permissions, and so forth.
- HDFS is what's called a userspace filesystem: the filesystem code runs outside the kernel as an ordinary process.
- In addition to being a userspace filesystem, HDFS is a distributed filesystem.
- Distributed filesystems are used to overcome the limits of what an individual disk or machine is capable of supporting.
- Each machine in the cluster stores a subset of the data that makes up the complete filesystem, the idea being that, as we need to store more block data, we simply add more machines, each with multiple disks.
- Filesystem metadata is stored on a centralized server, acting as a directory of the block data and providing a global picture of the filesystem's state.
