Friday, May 16, 2014

How MapReduce Works

6: Anatomy of a MapReduce Job Run

- You can run a MapReduce job with a single method call: submit() on a Job object. (Note that you can also call waitForCompletion(), which submits the job if it has not been submitted already and then waits for it to finish; see the driver sketch after this list.)
- In releases of Hadoop up to and including the 0.20 release series, the mapred.job.tracker property determines the means of execution.
- If this configuration property is set to local (the default), the local job runner is used.
- The new implementation is built on a system called YARN. Just note that the framework used for execution is set by the mapreduce.framework.name property, which takes the values local (for the local job runner), classic (for the classic MapReduce framework), or yarn (for the new YARN-based framework).
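
Here is a minimal driver sketch showing both calls. It uses the new-API Job class; the class name MinimalDriver and the choice of "local" for the framework are illustrative assumptions, not anything mandated by Hadoop.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinimalDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Select the execution framework: "local", "classic", or "yarn".
    conf.set("mapreduce.framework.name", "local");

    Job job = Job.getInstance(conf, "minimal job");
    job.setJarByClass(MinimalDriver.class);
    // No mapper or reducer is set, so the identity Mapper and Reducer run
    // and the job simply copies its input key-value pairs to the output.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // submit() would return immediately; waitForCompletion(true) submits
    // the job if necessary, then polls once a second, printing progress.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}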

Classic MapReduce (MapReduce 1):

- At the highest level, there are four independent entities:
- The client, which submits the MapReduce job.
- The jobtracker, which coordinates the job run. The jobtracker is a Java application whose main class is JobTracker.
- The tasktrackers, which run the tasks that the job has been split into. Tasktrackers are Java applications whose main class is TaskTracker.
- The distributed filesystem (normally HDFS), which is used for sharing job files between the other entities.

Job Submission:

- The submit() method on Job creates an internal JobSubmitter instance and calls submitJobInternal() on it.
- Having submitted the job, waitForCompletion() polls the job's progress once a second and reports the progress to the console if it has changed since the last report.
- The job submission process implemented by JobSubmitter does the following:
- Asks the jobtracker for a new job ID (by calling getNewJobId() on JobTracker).
- Checks the output specification of the job. For example, if the output directory has not been specified or it already exists, the job is not submitted and an error is thrown to the MapReduce program.
- Copies the resources needed to run the job, including the job JAR file, the configuration file, and the computed input splits, to the jobtracker's filesystem in a directory named after the job ID. The job JAR is copied with a high replication factor (controlled by the mapred.submit.replication property, which defaults to 10) so that there are plenty of copies across the cluster for the tasktrackers to access when they run tasks for the job.
- Tells the jobtracker that the job is ready for execution (by calling submitJob() on JobTracker).

Job Initialization:

- When the JobTracker receives a call to its submitJob() method, it puts the job into an internal queue from where the job scheduler will pick it up and initialize it.
- Initialization involves creating an object to represent the job being run, which encapsulates its tasks, and bookkeeping information to keep track of the tasks' status and progress.
- To create the list of tasks to run, the job scheduler first retrieves the input splits computed by the client from the shared filesystem.
- It then creates one map task for each split. The number of reduce tasks to create is determined by the mapred.reduce.tasks property in the Job, which is set by the setNumReduceTasks() method, and the scheduler simply creates this number of reduce tasks to be run. Tasks are given IDs at this point.
- In addition to the map and reduce tasks, two further tasks are created: a job setup task and a job cleanup task.
- These are run by tasktrackers and are used to run code to set up the job before any map tasks run, and to clean up after all the reduce tasks are complete. The OutputCommitter configured for the job determines the code to be run, and by default this is a FileOutputCommitter.
- For the job setup task, it creates the final output directory for the job and a temporary working space for the task output; for the job cleanup task, it deletes the temporary working space for the task output.

Task Assignment:

- Tasktrackers run a simple loop that periodically sends heartbeat method calls to the jobtracker.
- Heartbeats tell the jobtracker that a tasktracker is alive, but they also double as a channel for messages.
- As part of the heartbeat, a tasktracker will indicate whether it is ready to run a new task, and if it is, the jobtracker will allocate it a task, which it communicates to the tasktracker using the heartbeat return value.

- Before it can choose a task for the tasktracker, the jobtracker must choose a job to select the task from.
- Having chosen a job, the jobtracker now chooses a task for the job.

- Tasktrackers have a fixed number of slots for map tasks and for reduce tasks: for example, a tasktracker may be able to run two map tasks and two reduce tasks simultaneously.
- The default scheduler fills empty map task slots before reduce task slots, so if the tasktracker has at least one empty map task slot, the jobtracker will select a map task; otherwise, it will select a reduce task.
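
A rough sketch of fixing the slot counts from the example above. These are tasktracker-side settings that would normally live in each node's mapred-site.xml; the Configuration API is used here only to illustrate the property names:

import org.apache.hadoop.conf.Configuration;

public class SlotConfiguration {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Two map slots and two reduce slots per tasktracker.
    conf.setInt("mapred.tasktracker.map.tasks.maximum", 2);
    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 2);
  }
}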

Task Execution:

- Now that the tasktracker has been assigned a task, the next step is for it to run the task.
- First, it localizes the job JAR by copying it from the shared filesystem to the tasktracker's filesystem.
- It also copies any files needed by the application from the distributed cache to the local disk.
- Second, it creates a local working directory for the task and un-jars the contents of the JAR into this directory. Third, it creates an instance of TaskRunner to run the task.
- TaskRunner launches a new JVM to run each task in, so that any bugs in the user-defined map and reduce functions don't affect the tasktracker.

- The child process communicates with its parent through the umbilical interface. This way it informs the parent of the task's progress every few seconds until the task is complete.

- While a task is running, it keeps track of its progress, that is, the proportion of the task completed.
- For map tasks, this is the proportion of the input that has been processed.
- For reduce tasks, it's a little more complex, but the system can still estimate the proportion of the reduce input processed.
- It does this by dividing the total progress into three parts, corresponding to the three phases of the shuffle.
- For example, if the task has run the reducer on half its input, the task's progress is 5/6, since it has completed the copy and sort phases (1/3 each) and is halfway through the reduce phase (half of 1/3, or 1/6).

What constitutes Progress in MapReduce?

- Progress is not always measurable, but it tells Hadoop that a task is doing something.
- For example, a task writing output records is making progress even though it cannot be expressed as a percentage of the total number that will be written.

Progress reporting is important, as it means Hadoop will not fail a task that is making progress. All of the following operations constitute progress (see the mapper sketch after this list):

- Reading an input record (in the mapper or reducer).
- Writing an output record (in the mapper or reducer).
- Setting a status description on the reporter (using Reporter's setStatus() method).
- Incrementing a counter (using Reporter's incrCounter() method).
- Calling Reporter's progress() method.
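
A sketch of an old-API mapper that performs each of these operations explicitly. The class name, counter enum, and output types are illustrative assumptions:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class ReportingMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, LongWritable> {

  enum Records { PROCESSED }  // a hypothetical counter

  @Override
  public void map(LongWritable key, Text value,
      OutputCollector<Text, LongWritable> output, Reporter reporter)
      throws IOException {
    // Reading the input record and writing an output record both
    // count as progress.
    output.collect(value, new LongWritable(1));
    // So do incrementing a counter and setting a status description.
    reporter.incrCounter(Records.PROCESSED, 1);
    reporter.setStatus("processing record " + key);
    // For a long computation that emits nothing, call progress() directly
    // so the task is not considered hung.
    reporter.progress();
  }
}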

- Tasks also have a set of counters that count various events as the task runs.
- If a task reports progress, it sets a flag to indicate that the status change should be sent to the tasktracker.
- The flag is checked in a separate thread every three seconds, and if set, it notifies the tasktracker of the current task status.
- The tasktracker sends a heartbeat to the jobtracker every five seconds, and the status of all the tasks being run by the tasktracker is sent in the call.
- The jobtracker combines these updates to produce a global view of the status of all the jobs being run and their constituent tasks. Finally, the Job receives the latest status by polling the jobtracker every second.
- Clients can also use Job's getStatus() method to obtain a JobStatus instance, which contains all of the status information for the job.

Job Completion:

- When the jobtracker receives notification that the last task for a job is complete, it changes the status of the job to successful.
- Then, when the Job polls for status, it learns that the job has completed successfully, so it prints a message to tell the user and returns from the waitForCompletion() method.
- The jobtracker also sends an HTTP job notification if it is configured to do so.
- This can be configured by clients wishing to receive callbacks, via the job.end.notification.url property (see the sketch after this list).
- Last, the jobtracker cleans up its working state for the job and instructs the tasktrackers to do the same.
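
A sketch of configuring the notification callback. The URL is a hypothetical endpoint; $jobId and $jobStatus are placeholders that Hadoop substitutes with the real values:

import org.apache.hadoop.conf.Configuration;

public class NotificationConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The jobtracker issues an HTTP GET to this URL when the job ends.
    conf.set("job.end.notification.url",
        "http://example.com/jobdone?id=$jobId&status=$jobStatus");
  }
}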

YARN (MapReduce 2):

- For very large clusters in the region of 4,000 nodes and higher, classic MapReduce begins to hit scalability bottlenecks, so in 2010 a group at Yahoo! began to design the next generation of MapReduce.
- The result was YARN, short for Yet Another Resource Negotiator.
- YARN remedies the scalability shortcomings of classic MapReduce by splitting the responsibilities of the jobtracker into separate entities.
- The jobtracker takes care of both job scheduling (matching tasks with tasktrackers) and task progress monitoring (keeping track of tasks, restarting failed or slow tasks, and doing task bookkeeping such as maintaining counter totals).
- YARN separates these two roles into two independent daemons: a resource manager to manage the use of resources across the cluster, and an application master to manage the lifecycle of applications running on the cluster.
- The application master negotiates with the resource manager for cluster resources (described in terms of a number of containers, each with a certain memory limit) and then runs application-specific processes in those containers.
- Containers are overseen by node managers running on cluster nodes, which ensure that the application does not use more resources than it has been allocated.

- The beauty of YARN's design is that different YARN applications can coexist on the same cluster.

MapReduce on YARN involves more entities than classic MapReduce. They are:

- The client, which submits the MapReduce job.
- The YARN resource manager, which coordinates the allocation of compute resources on the cluster.
- The YARN node managers, which launch and monitor the compute containers on machines in the cluster.
- The MapReduce application master, which coordinates the tasks running the MapReduce job. The application master and the MapReduce tasks run in containers that are scheduled by the resource manager and managed by the node managers.
- The distributed filesystem (normally HDFS), which is used for sharing job files between the other entities.

Job Submission:

- Jobs are submitted in MapReduce 2 using the same user API as MapReduce 1.
- MapReduce 2 has an implementation of ClientProtocol that is activated when mapreduce.framework.name is set to yarn.
- The submission process is very similar to the classic implementation.
- The new job ID is retrieved from the resource manager (rather than the jobtracker), although in the nomenclature of YARN it is an application ID.
- The job client checks the output specification of the job, computes input splits, and copies job resources (including the job JAR, configuration, and split information) to HDFS.
- Finally, the job is submitted by calling submitApplication() on the resource manager.
- When the resource manager receives a call to its submitApplication() method, it hands off the request to the scheduler.
- The scheduler allocates a container, and the resource manager then launches the application master's process there, under the node manager's management.
- The application master for MapReduce jobs is a Java application whose main class is MRAppMaster.
- It initializes the job by creating a number of bookkeeping objects to keep track of the job's progress, as it will receive progress and completion reports from the tasks.
- Next, it retrieves the input splits computed in the client from the shared filesystem.
- It then creates a map task object for each split, and a number of reduce task objects determined by the mapreduce.job.reduces property.

- The next thing the application master does is decide how to run the tasks that make up the MapReduce job.
- If the job is small, the application master may choose to run the tasks in the same JVM as itself. Such a job is said to be uberized, or run as an uber task.

What qualifies as a small job?

- By default, one that has fewer than 10 mappers, only one reducer, and an input size that is less than the size of one HDFS block (see the configuration sketch below).
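
These thresholds are configurable. A sketch, assuming the MapReduce 2 uber-task property names (mapreduce.job.ubertask.*):

import org.apache.hadoop.conf.Configuration;

public class UberTaskConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Allow small jobs to run inside the application master's JVM.
    conf.setBoolean("mapreduce.job.ubertask.enable", true);
    // Mirror the "small job" thresholds described above: fewer than
    // 10 mappers and at most one reducer.
    conf.setInt("mapreduce.job.ubertask.maxmaps", 9);
    conf.setInt("mapreduce.job.ubertask.maxreduces", 1);
    // mapreduce.job.ubertask.maxbytes defaults to one HDFS block size.
  }
}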
- Before any task can be run, the job setup method is called to create the job's output directory.
- In the YARN implementation, the method is called directly by the application master.

Task Assignment:

- If the job does not qualify for running as an uber task, the application master requests containers for all the map and reduce tasks in the job from the resource manager.
- The requests also specify memory requirements for the tasks.
- By default, both map and reduce tasks are allocated 1024MB of memory, but this is configurable by setting mapreduce.map.memory.mb and mapreduce.reduce.memory.mb.

- In YARN, resources are more fine-grained.
- Applications may request a memory capability that is anywhere between the minimum allocation and the maximum allocation, and that must be a multiple of the minimum allocation.
- Default memory allocations are scheduler-specific; for the capacity scheduler, the default minimum is 1024MB and the default maximum is 10240MB. Tasks can therefore request any memory allocation between 1 and 10GB inclusive, in multiples of 1GB, by setting mapreduce.map.memory.mb and mapreduce.reduce.memory.mb appropriately, as in the sketch below.
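
For example, requesting larger containers for memory-hungry tasks might look like this (the 2GB/4GB figures are arbitrary illustrations; both are multiples of the 1024MB minimum allocation):

import org.apache.hadoop.conf.Configuration;

public class MemoryRequest {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt("mapreduce.map.memory.mb", 2048);     // 2GB map containers
    conf.setInt("mapreduce.reduce.memory.mb", 4096);  // 4GB reduce containers
  }
}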

Task Execution:

- Once a task has been assigned a container by the resource manager's scheduler, the application master starts the container by contacting the node manager.
- The task is executed by a Java application whose main class is YarnChild. Before it can run the task, it localizes the resources that the task needs, including the job configuration and JAR file, and any files from the distributed cache.
- Finally, it runs the map or reduce task.
- Unlike MapReduce 1, YARN does not support JVM reuse, so each task runs in a fresh JVM.

- Streaming and Pipes programs work in the same way as in MapReduce 1.

Progress and Status Updates:

- When running under YARN, the task reports its progress and status (including counters) back to its application master every three seconds.
- The client polls the application master every second to receive progress updates, which are usually displayed to the user.

Job Completion:

- As well as polling the application master for progress, every five seconds the client checks whether the job has completed when using the waitForCompletion() method on Job.
- The polling interval can be set via the mapreduce.client.completion.pollinterval configuration property.
- In MapReduce 2, the application master initiates the HTTP job notification callback (configured as in MapReduce 1).
- On job completion, the application master and the task containers clean up their working state, and the OutputCommitter's job cleanup method is called.

Failures:

- In the real world, user code is buggy, processes crash, and machines fail.
- One of the major benefits of Hadoop is its ability to handle such failures and still allow your job to complete.

Failures in Classic MapReduce:

- In the MapReduce 1 runtime, there are three failure modes to consider:
1) Failure of the running task
2) Failure of the tasktracker
3) Failure of the jobtracker

Task failure:

The tasktracker marks a task as failed in the following situations:

- User code in the map or reduce task throws a runtime exception.
- A Streaming process exits with a nonzero exit code.
- The child JVM exits suddenly.
- The tasktracker notices that it has not received a progress update for a while and proceeds to mark the task as failed.

- When the jobtracker is notified of a task attempt that has failed, it will reschedule execution of the task.
- If the task fails four times or more, it will not be retried further.

- A user may also kill or fail task attempts using the web UI or the command line.

Tasktracker Failure:

- Failure of a tasktracker is another failure mode.
- If a tasktracker fails by crashing or by running very slowly, it will stop sending heartbeats to the jobtracker.
- The jobtracker will notice a tasktracker that has stopped sending heartbeats and remove it from its pool of tasktrackers to schedule tasks on.
- A tasktracker can also be blacklisted by the jobtracker, even if the tasktracker has not failed.
- If more than four tasks from the same job fail on a particular tasktracker, the jobtracker records this as a fault.
- Blacklisted tasktrackers are not assigned tasks, but they continue to communicate with the jobtracker.
- Faults expire over time (at the rate of one per day), so tasktrackers get the chance to run jobs again simply by being left running.

Jobtracker failure:

- Failure of the jobtracker is the most serious failure mode.
- Hadoop has no mechanism for dealing with the failure of the jobtracker (it is a single point of failure), so in this case the job fails.
- This failure mode has a low chance of occurring, since the chance of a particular machine failing is low.
- The situation is improved in YARN, since one of its design goals is to eliminate single points of failure in MapReduce.
- After restarting a jobtracker, any jobs that were running at the time it was stopped will need to be resubmitted.

Failures in YARN:

- We need to consider the failure of any of the following entities:
- task
- application master
- node manager
- resource manager

Task failure:

- Failure of a running task is similar to the classic case.

Application Master failure:

- The application master sends periodic heartbeats to the resource manager, and in the event of application master failure, the resource manager will detect the failure and start a new instance of the master running in a new container.
- In the case of the MapReduce application master, it can recover the state of the tasks that had already been run by the failed application so they don't have to be rerun.
- By default, recovery is not enabled, so a failed application master will rerun all its tasks, but you can turn recovery on by setting yarn.app.mapreduce.am.job.recovery.enable to true.
- The client polls the application master for progress reports, so if its application master fails, the client needs to locate the new instance.
- During job initialization, the client asks the resource manager for the application master's address and then caches it, so that it does not overload the resource manager with a request every time it needs to poll the application master.
- If the application master fails, the client will experience a timeout when it issues a status update, at which point the client will go back to the resource manager to ask for the new application master's address.

Node Manager failure:

- If a node manager fails, it will stop sending heartbeats to the resource manager, and the node manager will be removed from the resource manager's pool of available nodes.
- The property yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms, which defaults to 600000 (10 minutes), determines the time the resource manager waits before considering a node manager that has sent no heartbeat in that interval to have failed.

- Node managers may be blacklisted if the number of failures for the application is high.
- Blacklisting is done by the application master; for MapReduce, the application master will try to reschedule tasks on different nodes if more than three tasks fail on a node manager.
- The threshold can be set with mapreduce.job.maxtaskfailures.per.tracker.

Resource Manager failure:

- Failure of the resource manager is serious, since without it, neither jobs nor task containers can be launched.
- After a crash, a new resource manager instance is brought up and it recovers from saved state.
- The state consists of the node managers in the system as well as the running applications.
- The storage used by the resource manager is configurable via the yarn.resourcemanager.store.class property.
- The default is org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore, which keeps the store in memory and is therefore not highly available.

Job Scheduling:

- The default in MapReduce 1 is the original FIFO queue-based scheduler; there are also multiuser schedulers called the Fair Scheduler and the Capacity Scheduler.
- MapReduce 2 comes with the Capacity Scheduler (the default) and the FIFO scheduler.

Fair Scheduler:

- Gives every user a fair share of the cluster capacity over time.
- If a single job is running, it gets all of the cluster. As more jobs are submitted, free task slots are given to the jobs in such a way as to give each user a fair share of the cluster.
- A short job belonging to one user will complete in a reasonable time even while another user's long job is running, and the long job will still make progress.
- Jobs are placed in pools, and by default each user gets their own pool.
- The Fair Scheduler supports preemption: if a pool has not received its fair share for a certain period of time, the scheduler will kill tasks in pools running over capacity in order to give more slots to the pool running under capacity.
- To enable it, place its JAR file on Hadoop's classpath by copying it from Hadoop's contrib/fairscheduler directory to the lib directory, then set the mapred.jobtracker.taskScheduler property to:
org.apache.hadoop.mapred.FairScheduler

Capacity Scheduler:

- The cluster is made up of a number of queues (like the Fair Scheduler's pools), which may be hierarchical (so a queue may be the child of another queue), and each queue has an allocated capacity.
- This is like the Fair Scheduler, except that within each queue, jobs are scheduled using FIFO scheduling (with priorities).
- The Capacity Scheduler allows users or organizations to simulate a separate MapReduce cluster with FIFO scheduling for each user or organization.

Shuffle and Sort:

- MapReduce makes the guarantee that the input to every reducer is sorted by key.
- The process by which the system performs the sort (and transfers the map outputs to the reducers as inputs) is known as the shuffle.
- The shuffle is an area of the codebase where improvements and refinements are continually being made.
- In many ways, the shuffle is the heart of MapReduce and is where the magic happens.

Map Side:

- When the map function starts producing output, it is not simply written to disk.
- Each map task has a circular memory buffer that it writes the output to.
- The buffer is 100MB by default, a size that can be tuned by changing the io.sort.mb property.
- When the contents of the buffer reach a certain threshold size (80% by default), a background thread will start to spill the contents to disk.
- Map outputs will continue to be written to the buffer while the spill takes place, but if the buffer fills up during this time, the map will block until the spill is complete.
- Spills are written in round-robin fashion to the directories specified by the mapred.local.dir property, in a job-specific subdirectory.

- Each time the memory buffer reaches the spill threshold, a new spill file is created, so after the map task has written its last output record there could be several spill files.
- Before the task is finished, the spill files are merged into a single partitioned and sorted output file.
- The configuration property io.sort.factor controls the maximum number of streams to merge at once; the default is 10.
- It is often a good idea to compress the map output as it is written to disk, since doing so makes it faster to write to disk, saves disk space, and reduces the amount of data to transfer to the reducers.
- By default the output is not compressed, but it is easy to enable by setting mapred.compress.map.output to true (see the sketch after this list).
- The compression library to use is specified by mapred.map.output.compression.codec.
- The output file's partitions are made available to the reducers over HTTP.
- The maximum number of worker threads used to serve the file partitions is controlled by the tasktracker.http.threads property; the default of 40 may need to be increased for large clusters running large jobs.
- In MapReduce 2, this property is not applicable, since the maximum number of threads used is set automatically based on the number of processors on the machine.
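
Enabling map output compression might look like the following sketch (GzipCodec is just one choice of codec):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;

public class MapOutputCompression {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Compress map output as it is spilled to disk and sent to reducers.
    conf.setBoolean("mapred.compress.map.output", true);
    conf.setClass("mapred.map.output.compression.codec",
        GzipCodec.class, CompressionCodec.class);
  }
}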

Reduce side:

- Map tasks may finish at different times, so the reduce task starts copying their outputs as soon as each completes. This is known as the copy phase of the reduce task.
- The reduce task has a small number of copier threads so that it can fetch map outputs in parallel.
- The default is five threads, but this number can be changed by setting the mapred.reduce.parallel.copies property.
- Map outputs are copied to the reduce task JVM's memory if they are small enough; otherwise, they are copied to disk.
- When the in-memory buffer reaches a threshold size or number of map outputs, it is merged and spilled to disk. If a combiner is specified, it will be run during the merge to reduce the amount of data written to disk.
- Any map outputs that were compressed by the map task have to be decompressed in memory in order to perform a merge on them.
- When all the map outputs have been copied, the reduce task moves into the sort phase, which merges the map outputs, maintaining their sort ordering. This is done in rounds.
- For example, if there are 50 map outputs and the merge factor is 10, there will be 5 rounds.
- The final merge can come from a mixture of in-memory and on-disk segments.
- During the reduce phase, the reduce function is invoked for each key in the sorted output.
- The output of this phase is written directly to the output filesystem, typically HDFS.
- In the case of HDFS, since the tasktracker node is also running a datanode, the first block replica will be written to the local disk.

Configuration tuning:

- The general principle is to give the shuffle as much memory as possible.
- You also need to make sure that your map and reduce functions get enough memory to operate.
- It is best to write your map and reduce functions to use as little memory as possible.
- The amount of memory given to the JVMs in which the map and reduce tasks run is set by the mapred.child.java.opts property.
- Try to make this as large as possible for the amount of memory on your task nodes.
- On the map side, the best performance can be obtained by avoiding multiple spills to disk; one is optimal.
- If you can estimate the size of your map outputs, you can set the io.sort.* properties to minimize the number of spills.
- In particular, increase io.sort.mb if you can.
- There is a MapReduce counter that counts the total number of records spilled to disk over the course of a job, which can be useful for tuning.
- On the reduce side, the best performance is obtained when the intermediate data can reside entirely in memory.
- By default, all memory is reserved for the reduce function.
- If your reduce function has light memory requirements, setting mapred.inmem.merge.threshold to 0 and mapred.job.reduce.input.buffer.percent to 1.0 may bring a performance boost.
- Hadoop uses a buffer size of 4KB by default, which is low, so you should increase this across the cluster by setting io.file.buffer.size (a combined sketch follows).
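
A combined sketch of the tuning suggestions above (all values are illustrative starting points, not recommendations for every cluster):

import org.apache.hadoop.conf.Configuration;

public class ShuffleTuning {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Map side: a larger sort buffer helps avoid multiple spills.
    conf.setInt("io.sort.mb", 200);
    conf.setInt("io.sort.factor", 20);  // merge more streams per round
    // Reduce side: keep map outputs in memory through the reduce phase
    // (only sensible if the reduce function needs little memory itself).
    conf.setInt("mapred.inmem.merge.threshold", 0);
    conf.setFloat("mapred.job.reduce.input.buffer.percent", 1.0f);
    // Cluster-wide: raise the 4KB default I/O buffer to 64KB.
    conf.setInt("io.file.buffer.size", 65536);
  }
}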

Task Execution:

Task Execution Environment:

- Hadoop provides information to a map or reduce task about the environment in which it is running.

Speculative Execution:

- The MapReduce model is to break jobs into tasks and run the tasks in parallel to make the overall job execution time smaller than it would otherwise be if the tasks ran sequentially.
- Hadoop does not try to diagnose and fix slow-running tasks; instead, it tries to detect when a task is running slower than expected and launches another, equivalent task as a backup. This is termed speculative execution of tasks.
- A speculative task is launched only after all the tasks for a job have been launched, and then only for tasks that have been running for some time (at least a minute) and have failed to make as much progress, on average, as the other tasks from the job.
- When a task completes successfully, any duplicate tasks that are running are killed, since they are no longer needed.
- So, if the original task completes before the speculative task, the speculative task is killed; on the other hand, if the speculative task finishes first, the original is killed.
- Speculative execution is an optimization, not a feature to make jobs run more reliably.
- If there are bugs that sometimes cause a task to hang or slow down, relying on speculative execution to avoid these problems is unwise and won't work reliably, since the same bugs are likely to affect the speculative task.
- Speculative execution is turned on by default.
- It can be enabled or disabled independently for map tasks and reduce tasks, on a cluster-wide basis or on a per-job basis.
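
For example, keeping map-task speculation on while disabling it for reduce tasks (see the discussion below) might look like this sketch, using the MapReduce 1 property names:

import org.apache.hadoop.conf.Configuration;

public class SpeculationConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean("mapred.map.tasks.speculative.execution", true);
    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
  }
}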


Why would you ever want to turn off speculative execution?
- The goal of speculative execution is to reduce job execution time, but this comes at the cost of cluster efficiency.
- On a busy cluster, speculative execution can reduce overall throughput, since redundant tasks are being executed in an attempt to bring down the execution time for a single job.
- There is a good case for turning off speculative execution for reduce tasks, since any duplicate reduce task has to fetch the same map outputs as the original task, which can significantly increase network traffic on the cluster.

Output Committers:

- Hadoop MapReduce uses a commit protocol to ensure that jobs and tasks either succeed or fail cleanly.
- The behavior is implemented by the OutputCommitter in use for the job; this is set in the old MapReduce API by calling setOutputCommitter() on JobConf or by setting mapred.output.committer.class in the configuration.
- In the new MapReduce API, the OutputCommitter is determined by the OutputFormat, via its getOutputCommitter() method. The default is FileOutputCommitter, which is appropriate for file-based MapReduce.
- The OutputCommitter API is as follows (in both the old and new MapReduce APIs):

public abstract class OutputCommitter {
  public abstract void setupJob(JobContext jobContext) throws IOException;
  public void commitJob(JobContext jobContext) throws IOException { }
  public void abortJob(JobContext jobContext, JobStatus.State state)
    throws IOException { }
  public abstract void setupTask(TaskAttemptContext taskContext)
    throws IOException;
  public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
    throws IOException;
  public abstract void commitTask(TaskAttemptContext taskContext)
    throws IOException;
  public abstract void abortTask(TaskAttemptContext taskContext)
    throws IOException;
}
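
For illustration, here is a minimal do-nothing committer against the new API; it might suit a job whose tasks write nothing that needs promoting (for example, output that goes straight to an external system). The class name is hypothetical.

import java.io.IOException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class NoOpOutputCommitter extends OutputCommitter {
  @Override
  public void setupJob(JobContext jobContext) throws IOException { }
  @Override
  public void setupTask(TaskAttemptContext taskContext) throws IOException { }
  @Override
  public boolean needsTaskCommit(TaskAttemptContext taskContext)
      throws IOException {
    return false;  // nothing to commit, so commitTask() is never called
  }
  @Override
  public void commitTask(TaskAttemptContext taskContext) throws IOException { }
  @Override
  public void abortTask(TaskAttemptContext taskContext) throws IOException { }
}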

- The framework ensures that in the event of multiple task attempts for a particular task, only one will be committed; the others will be aborted.
- This situation can arise when the first attempt fails for some reason (in which case it is aborted) and a later attempt succeeds and is committed.
- Another case is when two task attempts run concurrently as speculative duplicates; the one that finishes first is committed, and the other is aborted.

Task side effect files:

- The usual way of writing output from map and reduce tasks is by using the OutputCollector to collect key-value pairs.
- Some applications need more flexibility than the single key-value pair model, so they write output files directly from the map or reduce task to a distributed filesystem, such as HDFS.
- Care needs to be taken to ensure that multiple instances of the same task don't try to write to the same file.
- To take a simple example, imagine a program for converting image files from one format to another.
- One way to do this is to have a map-only job, where each map is given a set of images to convert.
- If a map task writes its converted images into its temporary working directory, they are promoted to the output directory when the task finishes successfully (a sketch follows).
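
A sketch of such a mapper using the old API. FileOutputFormat.getWorkOutputPath() returns the task's temporary working directory; convertImage() is a hypothetical placeholder for the real conversion logic:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class ImageConversionMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, NullWritable, NullWritable> {

  private JobConf conf;

  @Override
  public void configure(JobConf conf) { this.conf = conf; }

  @Override
  public void map(LongWritable key, Text imagePath,
      OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
      throws IOException {
    // Write into the task's temporary working directory; the committer
    // promotes its contents to the real output directory only if the task
    // succeeds, so duplicate (e.g. speculative) attempts never clash.
    Path workDir = FileOutputFormat.getWorkOutputPath(conf);
    Path src = new Path(imagePath.toString());
    Path dst = new Path(workDir, src.getName() + ".png");
    convertImage(src, dst, dst.getFileSystem(conf));
  }

  private void convertImage(Path src, Path dst, FileSystem fs)
      throws IOException {
    fs.create(dst).close();  // placeholder for real conversion logic
  }
}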

Task JVM Reuse:

- Hadoop runs tasks in their own JVM to isolate them from other running tasks.
- However, jobs that have a large number of very short-lived tasks, or that have lengthy initialization, can see performance gains when the JVM is reused for subsequent tasks.
- Note that, with task JVM reuse enabled, tasks are not run concurrently in a single JVM; rather, the JVM runs tasks sequentially. Tasktrackers can, however, run more than one task at a time, but this is always done in separate JVMs.
- The property for controlling task JVM reuse is mapred.job.reuse.jvm.num.tasks: it specifies the maximum number of tasks to run for a given job for each JVM launched; the default is 1.
- Tasks from different jobs are always run in separate JVMs.
- The method setNumTasksToExecutePerJvm() on JobConf can also be used to configure this property, as in the sketch below.
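
A sketch of turning on JVM reuse for a job:

import org.apache.hadoop.mapred.JobConf;

public class JvmReuseConfig {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // -1 means no limit: each JVM is reused for as many of the job's
    // tasks as it is given. Equivalent to setting
    // mapred.job.reuse.jvm.num.tasks to -1.
    conf.setNumTasksToExecutePerJvm(-1);
  }
}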

Skipping bad records:

- Large datasets are messy. They often have corrupt records.
- They often have records in different formats. They often have missing fields.
- Depending on the analysis being performed, if only a small percentage of records are affected, skipping them may not significantly affect the result.
- If a task trips up when it encounters a bad record by throwing a runtime exception, the task fails.
- Failing tasks are retried, but if a task fails four times, the whole job is marked as failed.
- If it is the data that is causing the task to throw an exception, rerunning the task won't help, since it will fail in exactly the same way each time.
- The best way to handle corrupt records is in your mapper or reducer code.
- You can detect a bad record and ignore it, or you can abort the job by throwing an exception.
- You can also count the total number of bad records in the job using counters to see how widespread the problem is.
- In rare cases, you can't handle the problem because there is a bug in a third-party library that you can't work around in your mapper or reducer. In these cases, you can use Hadoop's optional skipping mode for automatically skipping bad records.
- When skipping mode is enabled and a task fails, the tasktracker retries the task, skipping the records that caused the failure. Because of the extra network traffic and bookkeeping needed to maintain the failed record ranges, skipping mode is turned on for a task only after it has failed twice.
- For a task consistently failing on a bad record, the tasktracker runs the following task attempts with these outcomes:

1) Task fails.
2) Task fails.
3) Skipping mode is enabled. The task fails, but the failed record is stored by the tasktracker.
4) Skipping mode is still enabled. The task succeeds by skipping the bad record that failed in the previous attempt.
- Skipping mode is off by default; you enable it independently for map and reduce tasks using the SkipBadRecords class (see the sketch after this list).
- It is important to note that skipping mode can detect only one bad record per task attempt, so this mechanism is appropriate only for detecting occasional bad records. You may need to increase the maximum number of task attempts (via mapred.map.max.attempts and mapred.reduce.max.attempts) to give skipping mode enough attempts to detect and skip all the bad records in an input split.
- Bad records that have been detected by Hadoop are saved as sequence files in the job's output directory under the _logs/skip directory. These can be inspected for diagnostic purposes after the job has completed (using hadoop fs -text, for example).
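
A sketch of enabling skipping mode via SkipBadRecords (the skip limits and attempt count are illustrative values):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SkipBadRecords;

public class SkippingConfig {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // A nonzero value enables skipping mode: up to this many records may
    // be skipped around each bad record (0, the default, disables it).
    SkipBadRecords.setMapperMaxSkipRecords(conf, 100);
    SkipBadRecords.setReducerMaxSkipGroups(conf, 100);
    // Give tasks extra attempts so skipping mode can home in on every
    // bad record in the split.
    conf.setMaxMapAttempts(10);
  }
}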
