Friday, May 30, 2014

ZooKeeper

ZooKeeper:

- ZooKeeper is Hadoop's distributed coordination service, used for building distributed applications.
- When a message is sent between two nodes and the network fails, the sender does not know whether the receiver got the message.
- It may have gotten through before the network failed, it may not have, or perhaps the receiver's process died.
- The only way the sender can find out what happened is to reconnect to the receiver and ask it.
- This is a partial failure: we don't even know whether the operation failed.

- ZooKeeper gives you a set of tools to build distributed applications that can safely handle partial failures.

ZooKeeper has the following characteristics:

- It is simple: a stripped-down filesystem that exposes a few simple operations and some extra abstractions, such as ordering and notifications.
- It is expressive: its primitives can be used to build a large class of coordination data structures and protocols, e.g. distributed queues, distributed locks, and leader election among a group of peers.
- It is highly available: it runs on a collection of machines and is designed to be highly available, so applications can depend on it. It helps you avoid single points of failure so you can build a reliable application.
- It facilitates loosely coupled interactions: participants do not need to know about one another.
- It is a library: it provides an open source, shared repository of implementations and recipes of common coordination patterns.
- It is high performance: throughput for a ZooKeeper cluster has been benchmarked at over 10,000 operations per second.

Installing and Running ZooKeeper:

- When trying out ZooKeeper for the first time, it’s simplest to run it in standalone mode
with a single ZooKeeper server. You can do this on a development machine, for example.
ZooKeeper requires Java 6 to run, so make sure you have it installed first.
 You don’t need Cygwin to run ZooKeeper on Windows, since there are Windows versions of the ZooKeeper scripts.
(Windows is supported only as a development platform, not as a
production platform.)

Download a stable release of ZooKeeper from the Apache ZooKeeper releases page at
http://zookeeper.apache.org/releases.html, and unpack the tarball in a suitable location:
% tar xzf zookeeper-x.y.z.tar.gz

ZooKeeper provides a few binaries to run and interact with the service, and it’s convenient
to put the directory containing the binaries on your command-line path:
% export ZOOKEEPER_INSTALL=/home/tom/zookeeper-x.y.z
% export PATH=$PATH:$ZOOKEEPER_INSTALL/bin

Before running the ZooKeeper service, we need to set up a configuration file. The configuration
file is conventionally called zoo.cfg and placed in the conf subdirectory (although  you can also place it in /etc/zookeeper, or in the directory defined by the ZOOCFGDIR environment variable, if set). Here’s an example:
tickTime=2000
dataDir=/Users/tom/zookeeper
clientPort=2181

This is a standard Java properties file, and the three properties defined in this example
are the minimum required for running ZooKeeper in standalone mode. Briefly,
tickTime is the basic time unit in ZooKeeper (specified in milliseconds), dataDir is the
local filesystem location where ZooKeeper stores persistent data, and clientPort is the
port that ZooKeeper listens on for client connections (2181 is a common choice). You
should change dataDir to an appropriate setting for your system.
With a suitable configuration defined, we are now ready to start a local ZooKeeper
server:
% zkServer.sh start
To check whether ZooKeeper is running, send the ruok command (“Are you OK?”) to
the client port using nc (telnet works, too):
% echo ruok | nc localhost 2181
imok

Example:

- Imagine a group of servers that provide some service to clients.
- We want clients to be able to locate one of the servers so they can use the service.
- One of the challenges is maintaining the list of servers in the group.

Group Membership in ZooKeeper:

- One way of understanding ZooKeeper is to think of it as providing a highly available filesystem.
- It doesn't have files and directories, but a unified concept of a node, called a znode, which acts both as a container of data (like a file) and a container of other znodes (like a directory).
- Znodes form a hierarchical namespace, and a natural way to build a membership list is to create a parent znode with the name of the group and child znodes with the names of the group members (servers).

Creating a Group:

Example: A program to create a znode representing a group in ZooKeeper
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class CreateGroup implements Watcher {

  private static final int SESSION_TIMEOUT = 5000;

  private ZooKeeper zk;
  private CountDownLatch connectedSignal = new CountDownLatch(1);

  public void connect(String hosts) throws IOException, InterruptedException {
    zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
    connectedSignal.await();
  }

  @Override
  public void process(WatchedEvent event) { // Watcher interface
    if (event.getState() == KeeperState.SyncConnected) {
      connectedSignal.countDown();
    }
  }

  public void create(String groupName) throws KeeperException,
      InterruptedException {
    String path = "/" + groupName;
    String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT);
    System.out.println("Created " + createdPath);
  }

  public void close() throws InterruptedException {
    zk.close();
  }
  public static void main(String[] args) throws Exception {
    CreateGroup createGroup = new CreateGroup();
    createGroup.connect(args[0]);
    createGroup.create(args[1]);
    createGroup.close();
  }
}

When the main() method is run, it creates a CreateGroup instance and then calls its
connect() method. This method instantiates a new ZooKeeper object, the main class of
the client API and the one that maintains the connection between the client and the
ZooKeeper service. The constructor takes three arguments: the first is the host address
(and optional port, which defaults to 2181) of the ZooKeeper service;
 the second is the session timeout in milliseconds (which we set to 5 seconds), explained in more
detail later; and the third is an instance of a Watcher object. The Watcher object receives
callbacks from ZooKeeper to inform it of various events. In this case, CreateGroup is a
Watcher, so we pass this to the ZooKeeper constructor.

When a ZooKeeper instance is created, it starts a thread to connect to the ZooKeeper
service. The call to the constructor returns immediately, so it is important to wait for
the connection to be established before using the ZooKeeper object. We make use of
Java’s CountDownLatch class (in the java.util.concurrent package) to block until the
ZooKeeper instance is ready. This is where the Watcher comes in. The Watcher interface
has a single method:
public void process(WatchedEvent event);
When the client has connected to ZooKeeper, the Watcher receives a call to its
process() method with an event indicating that it has connected. On receiving a connection
event (represented by the Watcher.Event.KeeperState enum, with value
SyncConnected), we decrement the counter in the CountDownLatch, using its countDown()
method. The latch was created with a count of one, representing the number of
events that need to occur before it releases all waiting threads. After calling countDown()
once, the counter reaches zero and the await() method returns.


The connect() method has now returned, and the next method to be invoked on the
CreateGroup is the create() method. In this method, we create a new ZooKeeper znode
using the create() method on the ZooKeeper instance. The arguments it takes are the
path (represented by a string), the contents of the znode (a byte array, null here), an
access control list (or ACL for short, which here is a completely open ACL, allowing
any client to read or write the znode), and the nature of the znode to be created.

Znodes may be ephemeral or persistent. An ephemeral znode will be deleted by the
ZooKeeper service when the client that created it disconnects, either by explicitly disconnecting
or if the client terminates  for whatever reason. A persistent znode, on the other hand, is not deleted when the client disconnects. We want the znode representing a  group to live longer than the lifetime of the program that creates it, so we create a persistent znode.
The return value of the create() method is the path that was created by ZooKeeper.
We use it to print a message that the path was successfully created. We will see how
the path returned by create() may differ from the one passed into the method when
we look at sequential znodes.
To see the program in action, we need to have ZooKeeper running on the local machine,
and then we can type:
% export CLASSPATH=ch14/target/classes/:$ZOOKEEPER_INSTALL/*:$ZOOKEEPER_INSTALL/lib/*:\
$ZOOKEEPER_INSTALL/conf
% java CreateGroup localhost zoo
Created /zoo

Joining a Group:

- The next part of the application is a program to register a member in a group. Each
member will run as a program and join a group. When the program exits, it should be
removed from the group, which we can do by creating an ephemeral znode that represents
it in the ZooKeeper namespace.
The JoinGroup program implements this idea, and its listing appears below. The
logic for creating and connecting to a ZooKeeper instance has been refactored into a base
class, ConnectionWatcher, which also appears below.

Example:  A program that joins a group
public class JoinGroup extends ConnectionWatcher {

  public void join(String groupName, String memberName) throws KeeperException,
      InterruptedException {
    String path = "/" + groupName + "/" + memberName;
    String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE,
      CreateMode.EPHEMERAL);
    System.out.println("Created " + createdPath);
  }

  public static void main(String[] args) throws Exception {
    JoinGroup joinGroup = new JoinGroup();
    joinGroup.connect(args[0]);
    joinGroup.join(args[1], args[2]);
 
    // stay alive until process is killed or thread is interrupted
    Thread.sleep(Long.MAX_VALUE);
  }
}

Example: A helper class that waits for the connection to ZooKeeper to be established
public class ConnectionWatcher implements Watcher {

  private static final int SESSION_TIMEOUT = 5000;
  protected ZooKeeper zk;
  private CountDownLatch connectedSignal = new CountDownLatch(1);
  public void connect(String hosts) throws IOException, InterruptedException {
    zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
    connectedSignal.await();
  }

  @Override
  public void process(WatchedEvent event) {
    if (event.getState() == KeeperState.SyncConnected) {
      connectedSignal.countDown();
    }
  }

  public void close() throws InterruptedException {
    zk.close();
  }
}

The code for JoinGroup is very similar to CreateGroup. It creates an ephemeral znode as
a child of the group znode in its join() method, then simulates doing work of some
kind by sleeping until the process is forcibly terminated. Later, you will see that upon
termination, the ephemeral znode is removed by ZooKeeper.

Listing Members in a Group:

- Example: A program to list the members in a group
public class ListGroup extends ConnectionWatcher {
 
  public void list(String groupName) throws KeeperException,
      InterruptedException {
    String path = "/" + groupName;
 
    try {
      List<String> children = zk.getChildren(path, false);
      if (children.isEmpty()) {
        System.out.printf("No members in group %s\n", groupName);

        System.exit(1);
      }
      for (String child : children) {
        System.out.println(child);
      }
    } catch (KeeperException.NoNodeException e) {
      System.out.printf("Group %s does not exist\n", groupName);
      System.exit(1);
    }
  }

  public static void main(String[] args) throws Exception {
    ListGroup listGroup = new ListGroup();
    listGroup.connect(args[0]);
    listGroup.list(args[1]);
    listGroup.close();
  }
}

In the list() method, we call getChildren() with a znode path and a watch flag to
retrieve a list of child paths for the znode, which we print out. Placing a watch on a
znode causes the registered Watcher to be triggered if the znode changes state. Although
we’re not using it here, watching a znode’s children would permit a program to get
notifications of members joining or leaving the group, or of the group being deleted.
We catch KeeperException.NoNodeException, which is thrown in the case when the
group’s znode does not exist.
Let’s see ListGroup in action. As expected, the zoo group is empty, since we haven’t
added any members yet:
% java ListGroup localhost zoo
No members in group zoo
We can use JoinGroup to add some members. We launch them as background processes,
since they don’t terminate on their own (due to the sleep statement):
% java JoinGroup localhost zoo duck &
% java JoinGroup localhost zoo cow &
% java JoinGroup localhost zoo goat &
% goat_pid=$!

The last line saves the process ID of the Java process running the program that adds
goat as a member. We need to remember the ID so that we can kill the process in a
moment, after checking the members:
% java ListGroup localhost zoo
goat
duck
cow
To remove a member, we kill its process:
% kill $goat_pid

And a few seconds later, it has disappeared from the group because the process's ZooKeeper
session has terminated (the timeout was set to 5 seconds) and its associated
ephemeral node has been removed:
% java ListGroup localhost zoo
duck
cow
Let’s stand back and see what we’ve built here. We have a way of building up a list of
a group of nodes that are participating in a distributed system. The nodes may have no
knowledge of each other. A client that wants to use the nodes in the list to perform
some work, for example, can discover the nodes without them being aware of the client’s
existence.
Finally, note that group membership is not a substitute for handling network errors
when communicating with a node. Even if a node is a group member, communications
with it may fail, and such failures must be handled in the usual ways (retrying, trying
a different member of the group, and so on).

ZooKeeper command-line tools:

ZooKeeper comes with a command-line tool for interacting with the ZooKeeper namespace.
We can use it to list the znodes under the /zoo znode as follows:
% zkCli.sh localhost ls /zoo
Processing ls
WatchedEvent: Server state change. New state: SyncConnected
[duck, cow]

Deleting a group:

- To round off the example, let’s see how to delete a group. The ZooKeeper class provides
a delete() method that takes a path and a version number. ZooKeeper will delete a
znode only if the version number specified is the same as the version number of the
znode it is trying to delete, an optimistic locking mechanism that allows clients to detect
conflicts over znode modification. You can bypass the version check, however, by using
a version number of –1 to delete the znode regardless of its version number.
There is no recursive delete operation in ZooKeeper, so you have to delete child znodes
before parents. This is what we do in the DeleteGroup class, which will remove a group
and all its members.
Example: A program to delete a group and its members
public class DeleteGroup extends ConnectionWatcher {
 
  public void delete(String groupName) throws KeeperException,
      InterruptedException {
    String path = "/" + groupName;
 
    try {
      List<String> children = zk.getChildren(path, false);
      for (String child : children) {
        zk.delete(path + "/" + child, -1);
      }
      zk.delete(path, -1);
    } catch (KeeperException.NoNodeException e) {
      System.out.printf("Group %s does not exist\n", groupName);
      System.exit(1);
    }
  }

  public static void main(String[] args) throws Exception {
    DeleteGroup deleteGroup = new DeleteGroup();
    deleteGroup.connect(args[0]);
    deleteGroup.delete(args[1]);
    deleteGroup.close();
  }
}
Finally, we can delete the zoo group that we created earlier:
% java DeleteGroup localhost zoo
% java ListGroup localhost zoo
Group zoo does not exist

ZooKeeper Service:

ZooKeeper is a highly available, high-performance coordination service. In this section,
we look at the nature of the service it provides: its model, operations, and
implementation.

Data Model:

ZooKeeper maintains a hierarchical tree of nodes called znodes. A znode stores data
and has an associated ACL. ZooKeeper is designed for coordination (which typically
uses small data files), not high-volume data storage, so there is a limit of 1 MB on the
amount of data that may be stored in any znode.
Data access is atomic. A client reading the data stored at a znode will never receive only
some of the data; either the data will be delivered in its entirety, or the read will fail.
Similarly, a write will replace all the data associated with a znode. ZooKeeper guarantees
that the write will either succeed or fail; there is no such thing as a partial write, where only
some of the data written by the client is stored.
ZooKeeper does not support an append operation. These characteristics contrast with HDFS,
which is designed for high-volume  data storage, with streaming data access, and provides an append
operation.

Znodes are referenced by paths, which in ZooKeeper are represented as slash-delimited
Unicode character strings, like filesystem paths in Unix. Paths must be absolute, so
they must begin with a slash character. Furthermore, they are canonical, which means
that each path has a single representation, and so paths do not undergo resolution. For
example, in Unix, a file with the path /a/b can equivalently be referred to by the
path /a/./b, since “.” refers to the current directory at the point it is encountered in the
path. In ZooKeeper, “.” does not have this special meaning and is actually illegal as a
path component (as is “..” for the parent of the current directory).
Path components are composed of Unicode characters, with a few restrictions (these
are spelled out in the ZooKeeper reference documentation). The string “zookeeper” is
a reserved word and may not be used as a path component. In particular, ZooKeeper
uses the /zookeeper subtree to store management information, such as information on
quotas.
Note that paths are not URIs, and they are represented in the Java API by a
java.lang.String, rather than the Hadoop Path class (or by the java.net.URI class, for
that matter).
Znodes have some properties that are very useful for building distributed applications,
which we discuss in the following sections.

Ephemeral Znodes:

Znodes can be one of two types: ephemeral or persistent. A znode’s type is set at creation
time and may not be changed later. An ephemeral znode is deleted by ZooKeeper when
the creating client’s session ends. By contrast, a persistent znode is not tied to the client’s
session and is deleted only when explicitly deleted by a client (not necessarily the one
that created it). An ephemeral znode may not have children, not even ephemeral ones.
Even though ephemeral nodes are tied to a client session, they are visible to all clients
(subject to their ACL policy, of course).
Ephemeral znodes are ideal for building applications that need to know when certain
distributed resources are available. The example earlier in this chapter uses ephemeral
znodes to implement a group membership service, so any process can discover the
members of the group at any particular time.

Sequence Numbers:

A sequential znode is given a sequence number by ZooKeeper as a part of its name. If
a znode is created with the sequential flag set, then the value of a monotonically increasing
counter (maintained by the parent znode) is appended to its name.
If a client asks to create a sequential znode with the name /a/b-, for example, then the
znode created may actually have the name /a/b-3.
 If, later on, another sequential znode
with the name /a/b- is created, then it will be given a unique name with a larger value
of the counter—for example, /a/b-5. In the Java API, the actual path given to sequential
znodes is communicated back to the client as the return value of the create() call.
Sequence numbers can be used to impose a global ordering on events in a distributed
system, and may be used by the client to infer the ordering. In “A Lock Service”
you will learn how to use sequential znodes to build a shared lock; a small illustration follows.
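
To make this concrete, here is a small sketch (my own, not from the book) that creates a sequential znode and prints the actual path ZooKeeper assigned. It assumes a connected ZooKeeper instance zk and an existing parent znode /a:

String actualPath = zk.create("/a/b-", null/*data*/, Ids.OPEN_ACL_UNSAFE,
    CreateMode.PERSISTENT_SEQUENTIAL);
// The sequence number is only known from the return value,
// e.g. "/a/b-0000000003" (ZooKeeper zero-pads the counter)
System.out.println("Created " + actualPath);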

Watches:

Watches allow clients to get notifications when a znode changes in some way. Watches
are set by operations on the ZooKeeper service, and are triggered by other operations
on the service. For example, a client might call the exists operation on a znode, placing
a watch on it at the same time. If the znode doesn’t exist, then the exists operation
will return false. If, some time later, the znode is created by a second client, then the
watch is triggered, notifying the first client of the znode’s creation. You will see precisely
which operations trigger others in the next section.
Watchers are triggered only once.
To receive multiple notifications, a client needs to
re-register the watch. If the client in the previous example wishes to receive further
notifications for the znode’s existence (to be notified when it is deleted, for example),
it needs to call the exists operation again to set a new watch.
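
As a rough sketch of re-registering a watch (my own code, assuming a connected ZooKeeper instance zk and a hypothetical znode /mynode):

Watcher existsWatcher = new Watcher() {
  @Override
  public void process(WatchedEvent event) {
    System.out.println("Got " + event.getType() + " for " + event.getPath());
    try {
      zk.exists("/mynode", this); // watches fire only once, so set a new one
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
};
Stat stat = zk.exists("/mynode", existsWatcher); // null if /mynode doesn't exist yet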

Operations:

- There are nine basic operations in ZooKeeper, listed in the table below:

Table: Operations in the ZooKeeper service

create: Creates a znode (the parent znode must already exist)
delete: Deletes a znode (the znode must not have any children)
exists: Tests whether a znode exists and retrieves its metadata
getACL, setACL: Gets/sets the ACL for a znode
getChildren: Gets a list of the children of a znode
getData, setData: Gets/sets the data associated with a znode
sync: Synchronizes a client's view of a znode with ZooKeeper

Update operations in ZooKeeper are conditional. A delete or setData operation has to
specify the version number of the znode that is being updated (which is found from a
previous exists call). If the version number does not match, the update will fail. Updates
are a nonblocking operation, so a client that loses an update (because another
process updated the znode in the meantime) can decide whether to try again or take
some other action, and it can do so without blocking the progress of any other process.
Although ZooKeeper can be viewed as a filesystem, there are some filesystem primitives
that it does away with in the name of simplicity. Because files are small and are written
and read in their entirety, there is no need to provide open, close, or seek operations.
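
To illustrate the conditional update described above, here is a short sketch (mine, not from the book), assuming a connected ZooKeeper instance zk and a znode /config that already exists:

Stat stat = zk.exists("/config", false);
try {
  // Succeeds only if the znode is still at the version we just read
  zk.setData("/config", "new-value".getBytes(), stat.getVersion());
} catch (KeeperException.BadVersionException e) {
  // Another process updated /config first; re-read and decide whether to retry
}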

Multi-update:

There is another ZooKeeper operation, called multi, which batches together multiple
primitive operations into a single unit that either succeeds or fails in its entirety. The
situation where some of the primitive operations succeed and some fail can never arise.
Multi-update is very useful for building structures in ZooKeeper that maintain some
global invariant. One example is an undirected graph. Each vertex in the graph is naturally
represented as a znode in ZooKeeper, and to add or remove an edge we need to update the two znodes corresponding to its vertices, since each has a reference to the other. If we only used primitive ZooKeeper operations, it would be possible for another client  to observe the graph in an inconsistent state where one  vertex is connected to another but the reverse connection is absent. Batching the updates on the two znodes into one multi operation ensures that the update is atomic, so a pair of vertices can
never have a dangling connection.
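
A sketch of what this might look like in the Java API (available in ZooKeeper 3.4 and later via the Op class); the paths and the byte arrays holding each vertex's edge list are hypothetical:

List<Op> ops = new ArrayList<Op>();
ops.add(Op.setData("/graph/vertexA", edgesOfA, -1)); // add the A->B reference
ops.add(Op.setData("/graph/vertexB", edgesOfB, -1)); // add the B->A reference
List<OpResult> results = zk.multi(ops); // both updates apply, or neither does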

ACLs:

A znode is created with a list of ACLs, which determines who can perform certain
operations on it.
ACLs depend on authentication, the process by which the client identifies itself to
ZooKeeper. There are a few authentication schemes that ZooKeeper provides:
digest
The client is authenticated by a username and password.
sasl
The client is authenticated using Kerberos.
ip
The client is authenticated by its IP address.
Clients may authenticate themselves after establishing a ZooKeeper session. Authentication
is optional, although a znode’s ACL may require an authenticated client, in
which case the client must authenticate itself to access the znode. Here is an example
of using the digest scheme to authenticate with a username and password:
zk.addAuthInfo("digest", "tom:secret".getBytes());
An ACL is the combination of an authentication scheme, an identity for that scheme,
and a set of permissions. For example, if we wanted to give a client with the IP address
10.0.0.1 read access to a znode, we would set an ACL on the znode with the ip scheme,
an ID of 10.0.0.1, and READ permission. In Java, we would create the ACL object as
follows:
new ACL(Perms.READ,
new Id("ip", "10.0.0.1"));
The full set of permissions is listed in the table below. Note that the exists operation is
not governed by an ACL permission, so any client may call exists to find the Stat for
a znode or to discover that a znode does not in fact exist.

Table: ACL permissions

CREATE: create (a child znode)
READ: getChildren, getData
WRITE: setData
DELETE: delete (a child znode)
ADMIN: setACL

There are a number of predefined ACLs defined in the ZooDefs.Ids class, including
OPEN_ACL_UNSAFE, which gives all permissions (except ADMIN permission) to everyone.
In addition, ZooKeeper has a pluggable authentication mechanism, which makes it
possible to integrate third-party authentication systems if needed.
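
Putting the pieces together, here is a sketch (mine, not from the book) of creating a znode whose ACL allows reads only from 10.0.0.1 and full control only from localhost; the path and data are made up:

List<ACL> acls = new ArrayList<ACL>();
acls.add(new ACL(Perms.READ, new Id("ip", "10.0.0.1"))); // read-only from this address
acls.add(new ACL(Perms.ALL, new Id("ip", "127.0.0.1"))); // full control from localhost
zk.create("/secure-config", "42".getBytes(), acls, CreateMode.PERSISTENT);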

Implementation:

The ZooKeeper service can run in two modes. In standalone mode, there is a single
ZooKeeper server, which is useful for testing due to its simplicity (it can even be
embedded in unit tests), but provides no guarantees of high-availability or resilience.
In production, ZooKeeper runs in replicated mode, on a cluster of machines called an
ensemble. ZooKeeper achieves high-availability through replication, and can provide a
service as long as a majority of the machines in the ensemble are up. For example, in a
five-node ensemble, any two machines can fail and the service will still work because
a majority of three remain. Note that a six-node ensemble can also tolerate only two
machines failing, since with three failures the remaining three do not constitute a majority
of the six. For this reason, it is usual to have an odd number of machines in an ensemble.
Conceptually, ZooKeeper is very simple: all it has to do is ensure that every modification
to the tree of znodes is replicated to a majority of the ensemble. If a minority of the
machines fail, then a minimum of one machine will survive with the latest state. The
other remaining replicas will eventually catch up with this state.
The implementation of this simple idea, however, is nontrivial. ZooKeeper uses a protocol
called Zab that runs in two phases, which may be repeated indefinitely:

Phase 1: Leader election
The machines in an ensemble go through a process of electing a distinguished
member, called the leader. The other machines are termed followers. This phase is
finished once a majority (or quorum) of followers have synchronized their state
with the leader.
Phase 2: Atomic broadcast
All write requests are forwarded to the leader, which broadcasts the update to the
followers. When a majority have persisted the change, the leader commits the update,
and the client gets a response saying the update succeeded. The protocol for achieving consensus is designed to be atomic, so a change either succeeds or fails. It resembles a two-phase commit.

Consistency:

Understanding the basis of ZooKeeper’s implementation helps in understanding the
consistency guarantees that the service makes. The terms “leader” and “follower” for
the machines in an ensemble are apt, for they make the point that a follower may lag
the leader by a number of updates. This is a consequence of the fact that only a majority
and not all of the ensemble needs to have persisted a change before it is committed. A
good mental model for ZooKeeper is of clients connected to ZooKeeper servers that
are following the leader. A client may actually be connected to the leader, but it has no
control over this, and cannot even know if this is the case.
Every update made to the znode tree is given a globally unique identifier, called a
zxid (which stands for “ZooKeeper transaction ID”). Updates are ordered, so if zxid
z1 is less than zxid z2, then z1 happened before z2, according to ZooKeeper, which is the
single authority on ordering in the distributed system.

The following guarantees for data consistency flow from ZooKeeper’s design:
Sequential consistency
Updates from any particular client are applied in the order that they are sent. This
means that if a client updates the znode z to the value a, and in a later operation,
it updates z to the value b, then no client will ever see z with value a after it has
seen it with value b (if no other updates are made to z).
Atomicity
Updates either succeed or fail. This means that if an update fails, no client will ever
see it.
Single system image
A client will see the same view of the system regardless of the server it connects to.
This means that if a client connects to a new server during the same session, it will
not see an older state of the system than the one it saw with the previous server.
When a server fails and a client tries to connect to another in the ensemble, a server
that is behind the one that failed will not accept connections from the client until
it has caught up with the failed server.
Durability
Once an update has succeeded, it will persist and will not be undone. This means
updates will survive server failures.
Timeliness
The lag in any client’s view of the system is bounded, so it will not be out of date
by more than some multiple of tens of seconds. This means that rather than allow
a client to see data that is very stale, a server will shut down, forcing the client to
switch to a more up-to-date server.
For performance reasons, reads are satisfied from a ZooKeeper server’s memory and
do not participate in the global ordering of writes. This property can lead to the appearance
of inconsistent ZooKeeper states from clients that communicate through a
mechanism outside ZooKeeper.

For example, suppose client A updates znode z from a to a' and then tells B (via a channel
outside ZooKeeper) to read z; B may read the value of z as a, not a'. This is perfectly
compatible with the guarantees that ZooKeeper makes (the condition that it does not
promise is called “Simultaneously Consistent Cross-Client Views”). To prevent this from
happening, B should call sync on z before reading z's value. The sync operation forces the
ZooKeeper server to which B is connected to “catch up” with the leader, so that when B
reads z's value, it will be the one that A set (or a later value).
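
Note that sync is asynchronous in the Java API, so a common pattern (sketched below with my own latch-based wrapper, assuming a connected ZooKeeper instance zk) is to block until the callback fires before reading:

final CountDownLatch synced = new CountDownLatch(1);
zk.sync("/z", new AsyncCallback.VoidCallback() {
  @Override
  public void processResult(int rc, String path, Object ctx) {
    synced.countDown(); // our server has now caught up with the leader
  }
}, null/*ctx*/);
synced.await();
byte[] data = zk.getData("/z", false, null); // sees A's write (or a later one)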

Sessions:

A ZooKeeper client is configured with the list of servers in the ensemble. On startup,
it tries to connect to one of the servers in the list. If the connection fails, it tries another
server in the list, and so on, until it either successfully connects to one of them or fails
if all ZooKeeper servers are unavailable.

Once a connection has been made with a ZooKeeper server, the server creates a new
session for the client. A session has a timeout period that is decided on by the application
that creates it. If the server hasn’t received a request within the timeout period, it may expire the session. Once a session has expired, it may not be reopened, and any ephemeral nodes associated with the session will be lost. Although session expiry is a comparatively rare event, since sessions are long-lived, it is important for applications to  handle it.
Sessions are kept alive by the client sending ping requests (also known as heartbeats)
whenever the session is idle for longer than a certain period. (Pings are automatically
sent by the ZooKeeper client library, so your code doesn’t need to worry about maintaining
the session.) The period is chosen to be low enough to detect server failure
(manifested by a read timeout) and reconnect to another server within the session
timeout period.

Failover to another ZooKeeper server is handled automatically by the ZooKeeper client,
and, crucially, sessions (and associated ephemeral znodes) are still valid after another
server takes over from the failed one.
During failover, the application will receive notifications of disconnections and connections
to the service. Watch notifications will not be delivered while the client is disconnected, but they will be delivered when the client successfully reconnects. Also, if the application tries to perform an operation while the client is reconnecting to another server, the operation will fail. This underlines the importance of handling connection loss exceptions in real-world ZooKeeper applications.

Time:

There are several time parameters in ZooKeeper. The tick time is the fundamental period
of time in ZooKeeper and is used by servers in the ensemble to define the schedule on
which their interactions run. Other settings are defined in terms of tick time, or are at
least constrained by it. The session timeout, for example, may not be less than 2 ticks
or more than 20. If you attempt to set a session timeout outside this range, it will be
modified to fall within the range.
A common tick time setting is 2 seconds (2,000 milliseconds). This translates to an
allowable session timeout of between 4 and 40 seconds. There are a few considerations
in selecting a session timeout.
A low session timeout leads to faster detection of machine failure. In the group membership
example, the session timeout is the time it takes for a failed machine to be
removed from the group. Beware of setting the session timeout too low, however, since
a busy network can cause packets to be delayed and may cause inadvertent session
expiry. In such an event, a machine would appear to “flap”: leaving and then rejoining
the group repeatedly in a short space of time.

Applications that create more complex ephemeral state should favor longer session
timeouts, as the cost of reconstruction is higher. In some cases, it is possible to design
the application so it can restart within the session timeout period and avoid session
expiry. (This might be desirable to perform maintenance or upgrades.) Every session
is given a unique identity and password by the server, and if these are passed to ZooKeeper
while a connection is being made, it is possible to recover a session (as long as it hasn't expired). An application can therefore arrange a graceful shutdown, whereby it stores the session identity and password to stable storage before restarting the process, then retrieves the stored session identity and password and recovers the session. You should view this feature as an optimization that can help avoid session expiry. It does not remove the need to handle session expiry, which can still occur if a machine
fails unexpectedly, or even if an application is shut down gracefully but does not restart
before its session expires—for whatever reason.

As a general rule, the larger the ZooKeeper ensemble, the larger the session timeout
should be. Connection timeouts, read timeouts, and ping periods are all defined internally
as  a function of the number of servers in the ensemble, so as the ensemble grows, these  periods decrease. Consider increasing the timeout if you experience frequent connection loss. You can monitor ZooKeeper metrics—such as request latency statistics—using JMX.

States:

The ZooKeeper object transitions through different states in its lifecycle.
You can query its state at any time by using the getState() method:
public States getState()
States is an enum representing the different states that a ZooKeeper object may be in.
(Despite the enum’s name, an instance of ZooKeeper may only be in one state at a time.)
A newly constructed ZooKeeper instance is in the CONNECTING state, while it tries to
establish a connection with the ZooKeeper service. Once a connection is established,
it goes into the CONNECTED state.

A client using the ZooKeeper object can receive notifications of the state transitions by
registering a Watcher object. On entering the CONNECTED state, the watcher receives a
WatchedEvent whose KeeperState value is SyncConnected.

The ZooKeeper instance may disconnect and reconnect to the ZooKeeper service, moving
between the CONNECTED  and CONNECTING states. If it disconnects, the watcher receives a 
Disconnected event. Note that these state transitions are initiated by the ZooKeeper
instance itself, and it will automatically try to reconnect if the connection is lost.
The ZooKeeper instance may transition to a third state, CLOSED, if either the close()
method is called or the session times out as indicated by a KeeperState of type
Expired. Once in the CLOSED state, the ZooKeeper object is no longer considered to be
alive (this can be tested using the isAlive() method on States) and cannot be reused.
To reconnect to the ZooKeeper service, the client must construct a new ZooKeeper
instance.
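
For example, a trivial check (my own sketch; hosts, SESSION_TIMEOUT, and watcher are assumed to be defined elsewhere):

ZooKeeper.States state = zk.getState();
System.out.println("Current state: " + state);
if (!state.isAlive()) {
  // CLOSED: this handle can never reconnect, so build a fresh one
  zk = new ZooKeeper(hosts, SESSION_TIMEOUT, watcher);
}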

Building Applications using ZooKeeper:

A configuration service:

One of the most basic services that a distributed application needs is a configuration
service so that common pieces of configuration information can be shared by machines
in a cluster. At the simplest level, ZooKeeper can act as a highly available store for
configuration, allowing application participants to retrieve or update configuration
files. Using ZooKeeper watches, it is possible to create an active configuration service,
where interested clients are notified of changes in configuration.
Let’s write such a service. We make a couple of assumptions that simplify the implementation
(they could be removed with a little more work). First, the only configuration values we need to store are strings, and keys are just znode paths, so we use a znode to store each key-value pair. Second, there is a single client that performs updates at any one time. 
Among other things, this model fits with the idea of a master (such as the namenode in HDFS) that wishes to update information that its workers need to follow.

We wrap the code up in a class called ActiveKeyValueStore:
public class ActiveKeyValueStore extends ConnectionWatcher {
  private static final Charset CHARSET = Charset.forName("UTF-8");
  public void write(String path, String value) throws InterruptedException,
      KeeperException {
    Stat stat = zk.exists(path, false);
    if (stat == null) {
      zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT);
    } else {
      zk.setData(path, value.getBytes(CHARSET), -1);
    }
  }
}

The contract of the write() method is that a key with the given value is written to
ZooKeeper. It hides the difference between creating a new znode and updating an existing
znode with a new value, by testing first for the znode using the exists operation and then performing the
appropriate operation. The other detail worth mentioning is the need to convert the string value to a byte array, for which we just use the getBytes() method with a UTF-8 encoding.
To illustrate the use of the ActiveKeyValueStore, consider a ConfigUpdater class that
updates a configuration property with a value. The listing appears in the example below.

Example: An application that updates a property in ZooKeeper at random times
public class ConfigUpdater {

  public static final String PATH = "/config";

  private ActiveKeyValueStore store;
  private Random random = new Random();

  public ConfigUpdater(String hosts) throws IOException, InterruptedException {
    store = new ActiveKeyValueStore();
    store.connect(hosts);
  }

  public void run() throws InterruptedException, KeeperException {
    while (true) {
      String value = random.nextInt(100) + "";
      store.write(PATH, value);
      System.out.printf("Set %s to %s\n", PATH, value);
      TimeUnit.SECONDS.sleep(random.nextInt(10));
    }
  }


  public static void main(String[] args) throws Exception {
    ConfigUpdater configUpdater = new ConfigUpdater(args[0]);
    configUpdater.run();
  }
}

The program is simple. A ConfigUpdater has an ActiveKeyValueStore that connects to
ZooKeeper in ConfigUpdater’s constructor. The run() method loops forever, updating
the /config znode at random times with random values.
Next, let’s look at how to read the /config configuration property. First, we add a read
method to ActiveKeyValueStore:
  public String read(String path, Watcher watcher) throws InterruptedException,
      KeeperException {
    byte[] data = zk.getData(path, watcher, null/*stat*/);
    return new String(data, CHARSET);
  }
The getData() method of ZooKeeper takes the path, a Watcher, and a Stat object. The
Stat object is filled in with values by getData(), and is used to pass information back
to the caller. In this way, the caller can get both the data and the metadata for a znode,
although in this case, we pass a null Stat because we are not interested in the metadata.
As a consumer of the service, ConfigWatcher (see the example below) creates an
ActiveKeyValueStore and, after starting, calls the store's read() method (in its
displayConfig() method), passing a reference to itself as the watcher. It displays the initial value of the
configuration that it reads.

Example: An application that watches for updates of a property in ZooKeeper and prints them
to the console
public class ConfigWatcher implements Watcher {

  private ActiveKeyValueStore store;

  public ConfigWatcher(String hosts) throws IOException, InterruptedException {
    store = new ActiveKeyValueStore();
    store.connect(hosts);
  }

  public void displayConfig() throws InterruptedException, KeeperException {
    String value = store.read(ConfigUpdater.PATH, this);
    System.out.printf("Read %s as %s\n", ConfigUpdater.PATH, value);
  }
  @Override
  public void process(WatchedEvent event) {
    if (event.getType() == EventType.NodeDataChanged) {
      try {
        displayConfig();
      } catch (InterruptedException e) {
        System.err.println("Interrupted. Exiting.");    
        Thread.currentThread().interrupt();

      } catch (KeeperException e) {
        System.err.printf("KeeperException: %s. Exiting.\n", e);    
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ConfigWatcher configWatcher = new ConfigWatcher(args[0]);
    configWatcher.displayConfig();
 
    // stay alive until process is killed or thread is interrupted
    Thread.sleep(Long.MAX_VALUE);
  }
}

When the ConfigUpdater updates the znode, ZooKeeper causes the watcher to fire with
an event type of EventType.NodeDataChanged. ConfigWatcher acts on this event in its
process() method by reading and displaying the latest version of the config.
Because watches are one-time signals, we tell ZooKeeper of the new watch each time
we call read() on ActiveKeyValueStore—this ensures we see future updates. Furthermore,
we are not  guaranteed to receive every update, since between the receipt of the watch event and the next read, the znode may have been updated, possibly many times, and as the client has no watch registered during that period, it is not notified. For the  configuration service, this is not a problem because clients care only about the latest value of a property, as it takes precedence over previous values, but in general you
should be aware of this potential limitation.
Let’s see the code in action. Launch the ConfigUpdater in one terminal window:
% java ConfigUpdater localhost
Set /config to 79
Set /config to 14
Set /config to 78
Then launch the ConfigWatcher in another window immediately afterward:
% java ConfigWatcher localhost
Read /config as 79
Read /config as 14
Read /config as 78

Resilient ZooKeeper Application

The first of the Fallacies of Distributed Computing states that “The network is reliable.”
As they stand, the programs so far have been assuming a reliable network, so when they run on a real network, they can fail in several ways. Let’s examine possible failure modes and what we can do to  correct them so that our programs are resilient in the face of failure.

Every ZooKeeper operation in the Java API declares two types of exception in its throws
clause: InterruptedException and KeeperException.
InterruptedException
An InterruptedException is thrown if the operation is interrupted. There is a standard
Java mechanism for canceling blocking methods, which is to call interrupt() on the
thread from which the blocking method was called. A successful cancellation will result
in an InterruptedException. ZooKeeper adheres to this standard, so you can cancel a
ZooKeeper operation in this way. Classes or libraries that use ZooKeeper should usually
propagate the InterruptedException so that their clients can cancel their operations.
An InterruptedException does not indicate a failure, but rather that the operation has
been canceled, so in the configuration application example, it is appropriate to propagate
the exception, causing the application to terminate.

KeeperException
A KeeperException is thrown if the ZooKeeper server signals an error or if there is a
communication problem with the server. There are various subclasses of
KeeperException for different error cases. For example, KeeperException.NoNodeException
is a subclass of KeeperException that is thrown if you try to perform an operation
on a znode that doesn’t exist.
Every subclass of KeeperException has a corresponding code with information about
the type of error. For example, for KeeperException.NoNodeException the code is
KeeperException.Code.NONODE (an enum value).
There are two ways then to handle KeeperException: either catch KeeperException and
test its code to determine what remedying action to take, or catch the equivalent
KeeperException subclasses and perform the appropriate action in each catch block.
KeeperExceptions fall into three broad categories.


State exceptions. A state exception occurs when the operation fails because it cannot be
applied to the znode tree. State exceptions usually happen because another process is
mutating a znode at the same time. For example, a setData operation with a version
number will fail with a KeeperException.BadVersionException if the znode is updated
by another process first, since the version number does not match. The programmer is
usually aware that this kind of conflict is possible and will code to deal with it.
Some state exceptions indicate an error in the program, such as
KeeperException.NoChildrenForEphemeralsException, which is thrown when trying to create a child
znode of an ephemeral znode.

Recoverable exceptions.
Recoverable exceptions are those from which the application can
recover within the same ZooKeeper session. A recoverable exception is manifested by
KeeperException.ConnectionLossException, which means that the connection to
ZooKeeper has been lost. ZooKeeper will try to reconnect, and in most cases the reconnection
will succeed and ensure that the session is intact.
However, ZooKeeper cannot tell whether the operation that failed with
KeeperException.ConnectionLossException was applied. This is an example of partial failure (which
we introduced at the beginning of the chapter). The onus is therefore on the programmer
to deal with the uncertainty, and the action that should be taken depends on the application.
At this point, it is useful to make a distinction between idempotent and nonidempotent operations.
An idempotent operation is one that may be applied one or more times with the same result, such as a read request or an unconditional setData. These can simply be retried.

A nonidempotent operation cannot be indiscriminately retried, as the effect of applying
it multiple times is not the same as applying it once. The program needs a way of
detecting whether its update was applied by encoding information in the znode’s path
name or its data. We shall discuss how to deal with failed nonidempotent operations
in “Recoverable exceptions” under the Lock Service section below, when we look at the implementation of a
lock service.
Unrecoverable exceptions.
In some cases, the ZooKeeper session becomes invalid—
perhaps because of a timeout or because the session was closed (both get a
KeeperException.SessionExpiredException), or perhaps because authentication failed
(KeeperException.AuthFailedException). In any case, all ephemeral nodes associated with the
session will be lost, so the application needs to rebuild its state before reconnecting to
ZooKeeper.

A reliable configuration service
Going back to the write() method in ActiveKeyValueStore, recall that it is composed
of an exists operation followed by either a create or a setData:
  public void write(String path, String value) throws InterruptedException,
      KeeperException {
    Stat stat = zk.exists(path, false);
    if (stat == null) {
      zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT);
    } else {
      zk.setData(path, value.getBytes(CHARSET), -1);
    }
  }
Taken as a whole, the write() method is idempotent, so we can afford to unconditionally
retry it. Here’s a modified version of the write() method that retries in a loop.

It is set to try a maximum number of retries (MAX_RETRIES) and sleeps for
RETRY_PERIOD_SECONDS between each attempt:
  public void write(String path, String value) throws InterruptedException,
      KeeperException {
    int retries = 0;
    while (true) {
      try {
        Stat stat = zk.exists(path, false);
        if (stat == null) {
          zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE,
              CreateMode.PERSISTENT);
        } else {
          zk.setData(path, value.getBytes(CHARSET), stat.getVersion());
        }
        return;
      } catch (KeeperException.SessionExpiredException e) {
        throw e;
      } catch (KeeperException e) {
        if (retries++ == MAX_RETRIES) {
          throw e;
        }
        // sleep then retry
        TimeUnit.SECONDS.sleep(RETRY_PERIOD_SECONDS);
      }
    }
  } 

The code is careful not to retry KeeperException.SessionExpiredException, since when
a session expires, the ZooKeeper object enters the CLOSED state, from which it can never
reconnect (see the States section above). We simply rethrow the exception
and let the caller create a new ZooKeeper instance, so that the whole write() method can be retried. A
simple way to create a new instance is to create a new ConfigUpdater (which we’ve
actually renamed ResilientConfigUpdater) to recover from an expired session:
  public static void main(String[] args) throws Exception {
    while (true) {
      try {
        ResilientConfigUpdater configUpdater =
          new ResilientConfigUpdater(args[0]);
        configUpdater.run();
      } catch (KeeperException.SessionExpiredException e) {
        // start a new session
      } catch (KeeperException e) {
        // already retried, so exit
        e.printStackTrace();
        break;
      }
    }
  }

An alternative way of dealing with session expiry would be to look for a KeeperState
of type Expired in the watcher (that would be the ConnectionWatcher in the example
here), and create a new connection when this is detected. This way, we would just keep
retrying in the write() method, even if we got a KeeperException.SessionExpiredException,
since the connection should eventually be reestablished. Regardless of the precise
mechanics of how we recover from an expired session, the important point is that it is
a different kind of failure from connection loss and needs to be handled differently.

This is just one strategy for retry handling—there are many others, such as using exponential
backoff where the period between retries is multiplied by a constant each
time. The org.apache.hadoop.io.retry package in Hadoop Core is a set of utilities for
adding retry logic into your code in a reusable way, and it may be helpful for building
ZooKeeper applications.
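
As an illustration (my own sketch, not the Hadoop retry utilities), an exponential backoff variant of the retry loop might double the sleep period on each failed attempt; store, PATH, value, and MAX_RETRIES are assumed from the earlier examples:

int retries = 0;
long delayMs = 1000; // initial backoff, doubled after each failure
while (true) {
  try {
    store.write(PATH, value);
    break;
  } catch (KeeperException e) {
    if (retries++ == MAX_RETRIES) {
      throw e;
    }
    TimeUnit.MILLISECONDS.sleep(delayMs); // wait, then retry with a longer delay
    delayMs *= 2;
  }
}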

Lock Service:

A distributed lock is a mechanism for providing mutual exclusion between a collection
of processes. At any one time, only a single process may hold the lock. Distributed locks
can be used for leader election in a large distributed system, where the leader is the
process that holds the lock at any point in time.

To implement a distributed lock using ZooKeeper, we use sequential znodes to impose
an order on the processes vying for the lock. The idea is simple: first designate a lock
znode, typically describing the entity being locked on, say /leader; then clients that want
to acquire the lock create sequential ephemeral znodes as children of the lock znode.
At any point in time, the client with the lowest sequence number holds the lock. For
example, if two clients create znodes at around the same time, /leader/lock-1
and /leader/lock-2, then the client that created /leader/lock-1 holds the lock, since its
znode has the lowest sequence number. The ZooKeeper service is the arbiter of order,
since it assigns the sequence numbers.
The lock may be released simply by deleting the znode /leader/lock-1; alternatively, if
the client process dies, it will be deleted by virtue of it being an ephemeral znode. The
client that created /leader/lock-2 will then hold the lock, since it has the next lowest
sequence number. It will be notified that it has the lock by creating a watch that fires
when znodes go away.
The pseudocode for lock acquisition is as follows (a rough Java sketch follows the list):
1. Create an ephemeral sequential znode named lock- under the lock znode and re-
member its actual path name (the return value of the create operation).
2. Get the children of the lock znode and set a watch.
3. If the path name of the znode created in 1 has the lowest number of the children
returned in 2, then the lock has been acquired. Exit.
4. Wait for the notification from the watch set in 2 and go to step 2.
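
Here is a rough Java sketch of that pseudocode (my own code, reusing the ConnectionWatcher base class from earlier; imports omitted as in the other listings). Note that it deliberately implements the naive algorithm, so it suffers from the herd effect discussed next:

public class NaiveLock extends ConnectionWatcher {

  private String lockPath; // actual path returned by create(), e.g. /leader/lock-0000000003

  public void lock(String lockZnode) throws KeeperException, InterruptedException {
    // Step 1: create an ephemeral sequential child and remember its real name
    lockPath = zk.create(lockZnode + "/lock-", null/*data*/, Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL_SEQUENTIAL);
    while (true) {
      final CountDownLatch childrenChanged = new CountDownLatch(1);
      // Step 2: get the children of the lock znode and set a watch
      List<String> children = zk.getChildren(lockZnode, new Watcher() {
        public void process(WatchedEvent event) {
          childrenChanged.countDown();
        }
      });
      Collections.sort(children); // zero-padded sequence numbers sort correctly
      // Step 3: if our znode has the lowest sequence number, we hold the lock
      if (lockPath.endsWith(children.get(0))) {
        return;
      }
      // Step 4: otherwise wait for a change in the children and try again
      childrenChanged.await();
    }
  }
}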

The herd effect
Although this algorithm is correct, there are some problems with it. The first problem
is that this implementation suffers from the herd effect. Consider hundreds or thousands
 of clients, all trying to acquire the lock. Each client places a watch on the lock znode 
for changes in its set of children. Every time the lock is released, or another
process starts the lock acquisition process, the watch fires and every client receives a
notification. The “herd effect” refers to a large number of clients being notified of the
same event, when only a small number of them can actually proceed. In this case, only
one client will successfully acquire the lock, and the process of maintaining and sending
watch events to all clients causes traffic spikes, which put pressure on the ZooKeeper
servers.
To avoid the herd effect, the condition for notification needs to be refined. The key
observation for implementing locks is that a client needs to be notified only when the
child znode with the previous sequence number goes away, not when any child znode
is deleted (or created). In our example, if clients have created the znodes /leader/
lock-1, /leader/lock-2, and /leader/lock-3, then the client holding /leader/lock-3 only
needs to be notified when /leader/lock-2 disappears. It does not need to be notified
when /leader/lock-1 disappears or when a new znode /leader/lock-4 is added.

Recoverable exceptions
Another problem with the lock algorithm as it stands is that it doesn’t handle the case
when the create operation fails due to connection loss. Recall that in this case we do
not know if the operation succeeded or failed. Creating a sequential znode is a
nonidempotent operation, so we can’t simply retry, since if the first create had
succeeded, we would have an orphaned znode that would never be deleted (until the
client session ended, at least). Deadlock would be the unfortunate result.
The problem is that after reconnecting, the client can’t tell whether it created any of
the child znodes. By embedding an identifier in the znode name, if it suffers a connection
loss, it can check to see whether any of the children of the lock node have its identifier
in their name. If a child contains its identifier, it knows that the create operation succeeded,
and it shouldn’t create another child znode. If no child has the identifier in its name, then the client can safely create a new sequential child znode.
The client’s session identifier is a long integer that is unique for the ZooKeeper service
and therefore ideal for the purpose of identifying a client across connection loss events.
The session identifier can be obtained by calling the getSessionId() method on the
ZooKeeper Java class.
The ephemeral sequential znode should be created with a name of the form lock-
<sessionId>-, so that when the sequence number is appended by ZooKeeper, the name
becomes lock-<sessionId>-<sequenceNumber>. The sequence numbers are unique to the
parent, not to the name of the child, so this technique allows the child znodes to identify
their creators as well as impose an order of creation.
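
A sketch of a create that is safe to retry across connection loss, again using the illustrative zk and lockPath fields from the earlier sketches (the helper name createLockNode() is made up for this example):

  private String createLockNode() throws KeeperException, InterruptedException {
    String prefix = "lock-" + zk.getSessionId() + "-";
    while (true) {
      try {
        // ZooKeeper appends the sequence number: lock-<sessionId>-<sequenceNumber>
        return zk.create(lockPath + "/" + prefix, new byte[0],
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
      } catch (KeeperException.ConnectionLossException e) {
        // We don't know whether the create reached the server, so look for a
        // child carrying our session ID before retrying
        for (String child : zk.getChildren(lockPath, false)) {
          if (child.startsWith(prefix)) {
            return lockPath + "/" + child; // the earlier create did succeed
          }
        }
        // No child with our session ID: the create failed, so retry is safe
      }
    }
  }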

Unrecoverable exceptions
If a client’s ZooKeeper session expires, the ephemeral znode created by the client will
be deleted, effectively relinquishing the lock or at least forfeiting the client’s turn to
acquire the lock. The application using the lock should realize that it no longer holds
the lock, clean up its state, and then start again by creating a new lock object and trying
to acquire it. Notice that it is the application that controls this process, not the lock
implementation, since it cannot second-guess how the application needs to clean up
its state.
Implementation
Implementing a distributed lock correctly is a delicate matter, since accounting for all
of the failure modes is nontrivial. ZooKeeper comes with a production-quality lock
implementation in Java called WriteLock that is very easy for clients to use.

BookKeeper and Hedwig
BookKeeper is a highly available and reliable logging service. It can be used to provide
write-ahead logging, which is a common technique for ensuring data integrity in storage
systems. In a system using write-ahead logging, every write operation is written to the
transaction log before it is applied. Using this procedure, we don’t have to write the
data to permanent storage after every write operation because in the event of a system
failure, the latest state may be recovered by replaying the transaction log for any writes
that had not been applied.
BookKeeper clients create logs called ledgers, and each record appended to a ledger is
called a ledger entry, which is simply a byte array. Ledgers are managed by bookies,
which are servers that replicate the ledger data. Note that ledger data is not stored in
ZooKeeper, only metadata is.
Traditionally, the challenge has been to make systems that use write-ahead logging
robust in the face of failure of the node writing the transaction log. This is usually done
by replicating the transaction log in some manner. Hadoop’s HDFS namenode, for
instance, writes its edit log to multiple disks, one of which is typically an NFS mounted
disk. However, in the event of failure of the primary, failover is still manual. By providing
logging as a highly available service, BookKeeper promises to make failover
transparent, since it can tolerate the loss of bookie servers. (In the case of HDFS High
Availability, a BookKeeper-based edit log will remove the requirement for using NFS
for shared storage.)
Hedwig is a topic-based publish-subscribe system built on BookKeeper. Thanks to its
ZooKeeper underpinnings, Hedwig is a highly available service and guarantees message
delivery even if subscribers are offline for extended periods of time.
BookKeeper is a ZooKeeper subproject, and you can find more information on how to
use it, and Hedwig, at http://zookeeper.apache.org/bookkeeper/.

ZooKeeper in Production:

In production, you should run ZooKeeper in replicated mode. Here we will cover some
of the considerations for running an ensemble of ZooKeeper servers. However, this
section is not exhaustive, so you should consult the ZooKeeper Administrator’s
Guide for detailed up-to-date instructions, including supported platforms, recommended
hardware, maintenance procedures, and configuration properties.

Resilience and Performance:

ZooKeeper machines should be located to minimize the impact of machine and network
failure. In practice, this means that servers should be spread across racks, power supplies,
and switches, so that the failure of any one of these does not cause the ensemble to lose a majority of its servers.
For applications that require low-latency service (on the order of a few milliseconds),
it is important to run all the servers in an ensemble in a single data center. Some use
cases don’t require low-latency responses, however, which makes it feasible to spread
servers across data centers (at least two per data center) for extra resilience. Example
applications in this category are leader election and distributed coarse-grained locking,
both of which have relatively infrequent state changes, so the overhead of a few tens of
milliseconds that inter-data-center messages incur is not significant to the overall
functioning of the service.

ZooKeeper is a highly available system, and it is critical that it can perform its functions
in a timely manner. Therefore, ZooKeeper should run on machines that are dedicated
to ZooKeeper alone. Having other applications contend for resources can cause ZooKeeper’s
performance to degrade significantly.
Configure ZooKeeper to keep its transaction log on a different disk drive from its snapshots.

By default, both go in the directory specified by the dataDir property, but by
specifying a location for dataLogDir, the transaction log will be written there. By having
its own dedicated device (not just a partition), a ZooKeeper server can maximize the
rate at which it writes log entries to disk, which it does sequentially, without seeking.
Since all writes go through the leader, write throughput does not scale by adding servers,
so it is crucial that writes are as fast as possible.
If the process swaps to disk, performance will be adversely affected. This can be avoided
by setting the Java heap size to less than the amount of unused physical memory on
the machine. The ZooKeeper scripts will source a file called java.env from the configuration
directory, and this can be used to set the JVMFLAGS environment variable, which controls
the heap size (and any other desired JVM arguments).
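
As a minimal sketch, conf/java.env might contain a single line like the following; the 2 GB heap is an arbitrary illustration and should be sized to fit within the machine's unused physical memory:
export JVMFLAGS="-Xmx2g"
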
Configuration
Each server in the ensemble of ZooKeeper servers has a numeric identifier that is unique
within the ensemble, and must fall between 1 and 255. The server number is specified
in plain text in a file named myid in the directory specified by the dataDir property.
Setting each server number is only half of the job. We also need to give all the servers
all the identities and network locations of the others in the ensemble. The ZooKeeper
configuration file must include a line for each server, of the form:
server.n=hostname:port:port
The value of n is replaced by the server number. There are two port settings: the first
is the port that followers use to connect to the leader, and the second is used for leader
election. Here is a sample configuration for a three-machine replicated ZooKeeper
ensemble:
tickTime=2000
dataDir=/disk1/zookeeper
dataLogDir=/disk2/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
server.1=zookeeper1:2888:3888
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2888:3888

Servers listen on three ports: 2181 for client connections; 2888 for follower connections,
if they are the leader; and 3888 for other server connections during the leader election
phase. When a ZooKeeper server starts up, it reads the myid file to determine which
server it is, then reads the configuration file to determine the ports it should listen on,
as well as the network addresses of the other servers in the ensemble.
Clients connecting to this ZooKeeper ensemble should use zookeeper1:2181,zoo
keeper2:2181,zookeeper3:2181 as the host string in the constructor for the ZooKeeper
object.
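
As a fragment (not a complete program), constructing the client against this ensemble might look like the following; the 15-second session timeout and the empty Watcher are illustrative choices only:
ZooKeeper zk = new ZooKeeper(
    "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181", 15000,
    new Watcher() {
      public void process(WatchedEvent event) {
        // a real application would react to connection state changes here
      }
    });
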
In replicated mode, there are two extra mandatory properties: initLimit and
syncLimit, both measured in multiples of tickTime.
initLimit is the amount of time to allow for followers to connect to and sync with the
leader. If a majority of followers fail to sync within this period, then the leader renounces
its leadership status and another leader election takes place. If this happens often (and
you can discover if this is the case because it is logged), it is a sign that the setting is too
low.

syncLimit is the amount of time to allow a follower to sync with the leader. If a follower
fails to sync within this period, it will restart itself. Clients that were attached to this
follower will connect to another one.
These are the minimum settings needed to get up and running with a cluster of ZooKeeper
servers.
There are, however, more configuration options, particularly for tuning performance, documented in the ZooKeeper Administrator’s Guide.


Thursday, May 29, 2014

HBase

HBase

- Is a distributed, column-oriented database built on top of HDFS.
- Is the Hadoop application to use when you require real-time read/write random access to very large datasets.
- It can host very large, sparsely populated tables on clusters made from commodity hardware.
- The canonical HBase use case is the webtable, a table of crawled web pages and their attributes, keyed by web page URL.
- The webtable is large, with row counts that run into the billions.
- The table is randomly accessed by crawlers running at various rates and updating random rows, while random rows are served in real time as users click on a website's cached-page feature.


Concepts:

Whirlwind Tour of the Data Model:

- Applications store data into labelled tables.
- Tables are made of rows and columns.
- Table cells (the intersection of row and column coordinates) are versioned.
- The version is a timestamp auto-assigned by HBase at the time of cell insertion.
- A table's column families must be specified up front as part of the table schema definition, but new column family members can be added on demand.

- Physically, all column family members are stored together on the filesystem.
- So, though we described HBase as a column-oriented store, it would be more accurate to describe it as a column-family-oriented store.
- Because tunings and storage specifications are done at the column family level, it is advisable that all column family members have the same general access pattern and size characteristics.
- HBase tables are like those in an RDBMS, except that cells are versioned, rows are sorted, and columns can be added on the fly by the client as long as the column family they belong to already exists.

Regions:

- Tables are partitioned horizontally by HBase into regions.
- Each region comprises a subset of a table's rows.
- A region is denoted by the table it belongs to, its first row (inclusive), and its last row (exclusive).
- Initially a table comprises a single region, but as the region grows, once it crosses a configurable size threshold it splits at a row boundary into two new regions of approximately equal size.
- Until this first split happens, all loading will be against the single server hosting the original region.
- As the table grows, the number of its regions grows.
- Regions are the units that get distributed over an HBase cluster.
- In this way, a table that is too big for any one server can be carried by a cluster of servers, with each node hosting a subset of the table's total regions.
- This is also the means by which the loading on a table gets distributed.
- The online set of sorted regions comprises the table's total content.

Locking:

- Row updates are atomic, no matter how many columns constitute the row-level transaction. This keeps the locking model simple.

Implementation:

- HBase is modeled with an HBase master node orchestrating a cluster of one or more regionserver slaves.
- The HBase master is responsible for bootstrapping a virgin install, for assigning regions to registered regionservers, and for recovering from regionserver failures.
- The master node is lightly loaded.
- Regionservers carry zero or more regions and field client read/write requests.
- They also manage region splits, informing the HBase master about the new daughter regions so it can manage the offlining of the parent region and the assignment of the replacement daughters.

- HBase depends on ZooKeeper, and by default it manages a ZooKeeper instance as the authority on cluster state.
- HBase hosts vitals in ZooKeeper, such as the location of the root catalog table and the address of the current cluster master.
- Assignment of regions is mediated via ZooKeeper in case participating servers crash mid-assignment.
- Hosting the assignment transaction state in ZooKeeper means recovery can pick up the assignment where the crashed server left off.
- At a minimum, when bootstrapping a client connection to an HBase cluster, the client must be passed the location of the ZooKeeper ensemble.
- The client then navigates the ZooKeeper hierarchy to learn cluster attributes such as server locations.

- Regionserver slave nodes are listed in the HBase conf/regionservers file, just as you would list datanodes and tasktrackers in the Hadoop conf/slaves file.
- Start and stop scripts are like those in Hadoop, using the same SSH-based mechanism for running remote commands.
- Cluster site-specific configuration is made in the HBase conf/hbase-site.xml and conf/hbase-env.sh files, which have the same format as their equivalents in the Hadoop parent project.

- HBase persists data via the Hadoop filesystem API.
- By default, HBase writes to the local filesystem.
- The local filesystem is fine for experimenting with your initial HBase install, but thereafter, usually the first configuration made in an HBase cluster involves pointing HBase at the HDFS cluster it should use.

HBase in operation:

- HBase internally keeps special catalog tables named -ROOT- and .META., within which it maintains the current list, state, and location of all regions afloat on the cluster.
- The -ROOT- table holds the list of .META. table regions.
- The .META. table holds the list of user-space regions.
- Entries in these tables are keyed by region name, where a region name is made up of the name of the table the region belongs to, the region's start row, its time of creation, and finally an MD5 hash of all of the former.
- As regions transition (are split, disabled/enabled, deleted, redeployed by the region load balancer, or redeployed due to a regionserver crash), the catalog tables are updated so the state of all regions on the cluster is kept current.
- A fresh client connects to the ZooKeeper cluster first to learn the location of -ROOT-. The client then consults -ROOT- to elicit the location of the .META. region whose scope covers the requested row.
- Next, the client does a lookup against the found .META. region to figure out the hosting user-space region and its location.
- Thereafter, the client interacts directly with the hosting regionserver.
- If, in turn, the consulted .META. region has moved, then -ROOT- is reconsulted.
- Writes arriving at a regionserver are first appended to a commit log and then added to an in-memory memstore.
- When the memstore fills, its content is flushed to the filesystem.
- The commit log is hosted on HDFS, so it remains available through a regionserver crash.
- When the master notices that a regionserver is no longer reachable (usually because the server's znode has expired in ZooKeeper), it splits the dead regionserver's commit log by region.
- When reading, the region's memstore is consulted first.
- If sufficient versions are found reading the memstore alone, the query completes there.

Installation:

Download a stable release from an Apache Download Mirror and unpack it on your
local filesystem. For example:
% tar xzf hbase-x.y.z.tar.gz

- As with Hadoop, you first need to tell HBase where Java is located on your system. If
you have the JAVA_HOME environment variable set to point to a suitable Java installation,
then that will be used, and you don’t have to configure anything further. Otherwise,
you can set the Java installation that HBase uses by editing HBase's conf/hbase-env.sh
and specifying the JAVA_HOME variable (see Appendix A for some examples) to
point to version 1.6.0 of Java.

- For convenience, add the HBase binary directory to your command-line path. For
example:
% export HBASE_HOME=/home/hbase/hbase-x.y.z
% export PATH=$PATH:$HBASE_HOME/bin
To get the list of HBase options, type:
% hbase
Usage: hbase <command>
where <command> is one of:
  shell            run the HBase shell
  master           run an HBase HMaster node
  regionserver     run an HBase HRegionServer node
  zookeeper        run a Zookeeper server
  rest             run an HBase REST server
  thrift           run an HBase Thrift server
  avro             run an HBase Avro server
  migrate          upgrade an hbase.rootdir
  hbck             run the hbase 'fsck' tool
 or
  CLASSNAME        run the class named CLASSNAME

Test Drive:

- To start a temporary instance of HBase that uses the /tmp directory on the local filesystem
for persistence, type:
% start-hbase.sh
This will launch a standalone HBase instance that persists to the local filesystem; by
default, HBase will write to /tmp/hbase-${USERID}.

To administer your HBase instance, launch the HBase shell by typing:
% hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version: 0.89.0-SNAPSHOT, ra4ea1a9a7b074a2e5b7b24f761302d4ea28ed1b2, Sun Jul 18 15:01:50 PDT 2010
hbase(main):001:0>

Now let us create a simple table, add some data, and then clean up.

To create a table, you must name your table and define its schema. A table’s schema
comprises table attributes and the list of table column families. Column families
themselves have attributes that you in turn set at schema definition time. Examples of
column family attributes include whether the family content should be compressed on
the filesystem and how many versions of a cell to keep. Schemas can later be edited by
offlining the table using the shell disable command, making the necessary alterations
using alter, and then putting the table back online with enable.
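
For example, a schema edit might look like the following in the shell; the VERSIONS setting is purely illustrative, and the exact alter syntax can vary slightly between HBase versions:
hbase(main):001:0> disable 'test'
hbase(main):002:0> alter 'test', {NAME => 'data', VERSIONS => 5}
hbase(main):003:0> enable 'test'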

To create a table named test with a single column family name data using defaults for
table and column family attributes, enter:
hbase(main):007:0> create 'test', 'data'
0 row(s) in 1.3066 seconds

To prove the new table was created successfully, run the list command. This will
output all tables in user space:
hbase(main):019:0> list
test                                                                                                         
1 row(s) in 0.1485 seconds

To insert data into three different rows and columns in the data column family, and
then list the table content, do the following:
hbase(main):021:0> put 'test', 'row1', 'data:1', 'value1'
0 row(s) in 0.0454 seconds
hbase(main):022:0> put 'test', 'row2', 'data:2', 'value2'
0 row(s) in 0.0035 seconds
hbase(main):023:0> put 'test', 'row3', 'data:3', 'value3'
0 row(s) in 0.0090 seconds
hbase(main):024:0> scan 'test'
ROW                          COLUMN+CELL                                                                     
 row1                        column=data:1, timestamp=1240148026198, value=value1                            
 row2                        column=data:2, timestamp=1240148040035, value=value2                            
 row3                        column=data:3, timestamp=1240148047497, value=value3                            
3 row(s) in 0.0825 seconds

To remove the table, you must first disable it before dropping it:
hbase(main):025:0> disable 'test'
09/04/19 06:40:13 INFO client.HBaseAdmin: Disabled test
0 row(s) in 6.0426 seconds
hbase(main):026:0> drop 'test'
09/04/19 06:40:17 INFO client.HBaseAdmin: Deleted test

0 row(s) in 0.0210 seconds
hbase(main):027:0> list
0 row(s) in 2.0645 seconds
Shut down your HBase instance by running:
% stop-hbase.sh

Clients:

There are a number of client options for interacting with an HBase cluster.

Example: Basic table administration and access
public class ExampleClient {
  public static void main(String[] args) throws IOException {
    Configuration config = HBaseConfiguration.create();
    // Create table
    HBaseAdmin admin = new HBaseAdmin(config);
    HTableDescriptor htd = new HTableDescriptor("test");
    HColumnDescriptor hcd = new HColumnDescriptor("data");
    htd.addFamily(hcd);
    admin.createTable(htd);
    byte [] tablename = htd.getName();
    HTableDescriptor [] tables = admin.listTables();
    if (!(tables.length == 1 && Bytes.equals(tablename, tables[0].getName()))) {
      throw new IOException("Failed create of table");
    }
    // Run some operations -- a put, a get, and a scan -- against the table.
    HTable table = new HTable(config, tablename);
    byte [] row1 = Bytes.toBytes("row1");
    Put p1 = new Put(row1);
    byte [] databytes = Bytes.toBytes("data");
    p1.add(databytes, Bytes.toBytes("1"), Bytes.toBytes("value1"));
    table.put(p1);
    Get g = new Get(row1);
    Result result = table.get(g);
    System.out.println("Get: " + result);
    Scan scan = new Scan();
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result scannerResult: scanner) {
        System.out.println("Scan: " + scannerResult);
      }
    } finally {
      scanner.close();
    }
   
    // Drop the table
    admin.disableTable(tablename);
    admin.deleteTable(tablename);
  }
}

This class has a main method only. For the sake of brevity, we do not include the
package name or imports. In this class, we first create an instance of
org.apache.hadoop.conf.Configuration. We ask the org.apache.hadoop.hbase.HBase
Configuration class to create the instance. It will return a Configuration that has read
HBase configuration from hbase-site.xml and hbase-default.xml files found on the
program’s classpath. This Configuration is subsequently used to create instances of
HBaseAdmin and HTable, two classes found in the org.apache.hadoop.hbase.client Java
package. HBaseAdmin is used for administering your HBase cluster, for adding and dropping
tables.
HTable is used to access a specific table. 
The Configuration instance points these classes at the cluster the code is to work against.

To create a table, we need to first create an instance of HBaseAdmin and then ask it to
create the table named test with a single column family named data. In our example,
our table schema is the default. Use methods on org.apache.hadoop.hbase.HTableDe
scriptor and org.apache.hadoop.hbase.HColumnDescriptor to change the table schema.
The code next asserts the table was actually created and then it moves to run operations
against the just-created table.

To operate on a table, we need an instance of org.apache.hadoop.hbase.client.HTable, passing it our Configuration instance and the name of the table we want to operate on. After creating an HTable, we then create an instance of org.apache.hadoop.hbase.client.Put to put a single cell value of value1 into a row
named row1 in the column named data:1. (The column name is specified in two parts:
the column family name as bytes (databytes in the code above) and the column
family qualifier, specified as Bytes.toBytes("1").) Next we create an
org.apache.hadoop.hbase.client.Get, do a get of the just-added cell, and then use an
org.apache.hadoop.hbase.client.Scan to scan over the just-created table, printing
out what we find.
Finally, we clean up by first disabling the table and then deleting it. A table must be
disabled before it can be dropped.

Example: A MapReduce application to count the number of rows in an HBase table
public class RowCounter {
  /** Name of this 'program'. */
  static final String NAME = "rowcounter";
  static class RowCounterMapper
  extends TableMapper<ImmutableBytesWritable, Result> {
    /** Counter enumeration to count the actual rows. */
    public static enum Counters {ROWS}
    @Override
    public void map(ImmutableBytesWritable row, Result values,
      Context context)
    throws IOException {
      for (KeyValue value: values.list()) {
        if (value.getValue().length > 0) {
          context.getCounter(Counters.ROWS).increment(1);
          break;
        }
      }
    }
  }
  public static Job createSubmittableJob(Configuration conf, String[] args)
  throws IOException {
    String tableName = args[0];
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(RowCounter.class);
    // Columns are space delimited
    StringBuilder sb = new StringBuilder();
    final int columnoffset = 1;
    for (int i = columnoffset; i < args.length; i++) {
      if (i > columnoffset) {
        sb.append(" ");
      }
      sb.append(args[i]);
    }
    Scan scan = new Scan();
    scan.setFilter(new FirstKeyOnlyFilter());
    if (sb.length() > 0) {
      for (String columnName :sb.toString().split(" ")) {
        String [] fields = columnName.split(":");
        if(fields.length == 1) {
          scan.addFamily(Bytes.toBytes(fields[0]));
        } else {
          scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1]));
        }
      }
    }
    // Use NullOutputFormat: the row count is reported via the ROWS counter, not job output.
    job.setOutputFormatClass(NullOutputFormat.class);
    TableMapReduceUtil.initTableMapperJob(tableName, scan,
      RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
    job.setNumReduceTasks(0);
    return job;
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 1) {
      System.err.println("ERROR: Wrong number of parameters: " + args.length);
      System.err.println("Usage: RowCounter <tablename> [<column1> <column2>...]");
      System.exit(-1);
    }
    Job job = createSubmittableJob(conf, otherArgs);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Avro, REST and Thrift

- HBase ships with Avro, REST, and Thrift interfaces.
- These are useful when the interacting application is written in a language other than Java.

REST:
- To put up a Stargate instance (the HBase REST server), start it using the following command:
% hbase-daemon.sh start rest
- This will start a server instance, by default on port 8080.
- To stop the REST server, type:
% hbase-daemon.sh stop rest

Thrift:

- Start the Thrift service by putting up a server to field Thrift clients; run the following:
% hbase-daemon.sh start thrift
This will start a server instance on the default port, 9090.
 The HBase Thrift IDL
can be found at src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift in the
HBase source code.
To stop the Thrift server, type:
% hbase-daemon.sh stop thrift

Avro:

- It is started and stopped in the same manner as the Thrift or REST services.
- The Avro server uses port 9090 by default.

Example:

- The existing weather dataset described in previous chapters contains observations for
tens of thousands of stations over 100 years and this data is growing without bound.
- In this example, we will build a simple web interface that allows a user to navigate the
different stations and page through their historical temperature observations in time
order. For the sake of this example, let us allow that the dataset is massive, that the
observations run to the billions, and that the rate at which temperature updates arrive
is significant—say hundreds to thousands of updates a second from around the world
across the whole range of weather stations. Also, let us allow that it is a requirement
that the web application must display the most up-to-date observation within a second
or so of receipt.

Schemas:

- In our example, there will be two tables:
Stations
This table holds station data. Let the row key be the stationid. Let this table have
a column family info that acts as a key/value dictionary for station information. Let
the dictionary keys be the column names info:name, info:location, and
info:description. This table is static and the info family, in this case, closely mirrors
a typical RDBMS table design.
Observations
This table holds temperature observations. Let the row key be a composite key of
stationid + reverse order timestamp. Give this table a column family data that will
contain one column airtemp with the observed temperature as the column value.

For the stations table, the choice of stationid as key is obvious because we will always
access information for a particular station by its id. The observations table, however,
uses a composite key that adds the observation timestamp at the end. This will group
all observations for a particular station together, and by using a reverse order timestamp
(Long.MAX_VALUE - epoch) and storing it as binary, observations for each station will be
ordered with most recent observation first.

In the shell, you would define your tables as follows:
hbase(main):036:0> create 'stations', {NAME => 'info', VERSIONS => 1}
0 row(s) in 0.1304 seconds
hbase(main):037:0> create 'observations', {NAME => 'data', VERSIONS => 1}
0 row(s) in 0.1332 seconds
In both cases, we are interested only in the latest version of a table cell, so set VERSIONS to
1. The default is 3.

Example: A MapReduce application to import temperature data from HDFS into an HBase table
public class HBaseTemperatureImporter extends Configured implements Tool {
  // Inner-class for map
  static class HBaseTemperatureMapper<K, V> extends MapReduceBase implements
      Mapper<LongWritable, Text, K, V> {
    private NcdcRecordParser parser = new NcdcRecordParser();
    private HTable table;
    public void map(LongWritable key, Text value,
      OutputCollector<K, V> output, Reporter reporter)
    throws IOException {
      parser.parse(value.toString());
      if (parser.isValidTemperature()) {
        byte[] rowKey = RowKeyConverter.makeObservationRowKey(parser.getStationId(),
          parser.getObservationDate().getTime());
        Put p = new Put(rowKey);
        p.add(HBaseTemperatureCli.DATA_COLUMNFAMILY,
          HBaseTemperatureCli.AIRTEMP_QUALIFIER,
          Bytes.toBytes(parser.getAirTemperature()));

        table.put(p);
      }
    }
    public void configure(JobConf jc) {
      super.configure(jc);
      // Create the HBase table client once up-front and keep it around
      // rather than create on each map invocation.

      try {
        this.table = new HTable(new HBaseConfiguration(jc), "observations");
      } catch (IOException e) {
        throw new RuntimeException("Failed HTable construction", e);
      }
    }
    @Override
    public void close() throws IOException {
      super.close();
      table.close();
    }
  }
  public int run(String[] args) throws IOException {
    if (args.length != 1) {
      System.err.println("Usage: HBaseTemperatureImporter <input>");
      return -1;
    }
    JobConf jc = new JobConf(getConf(), getClass());
    FileInputFormat.addInputPath(jc, new Path(args[0]));
    jc.setMapperClass(HBaseTemperatureMapper.class);
    jc.setNumReduceTasks(0);
    jc.setOutputFormat(NullOutputFormat.class);
    JobClient.runJob(jc);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new HBaseConfiguration(),
        new HBaseTemperatureImporter(), args);
    System.exit(exitCode);
  }
}

HBaseTemperatureImporter has an inner class named HBaseTemperatureMapper that is like
the MaxTemperatureMapper class from Chapter 5. The outer class implements Tool and
does the setup to launch the HBaseTemperatureMapper inner class. HBaseTemperatureMap
per takes the same input as MaxTemperatureMapper and does the same parse—using the
NcdcRecordParser introduced in Chapter 5—to check for valid temperatures, but rather
than add valid temperatures to the output collector as MaxTemperatureMapper does,
it instead adds valid temperatures to the observations HBase table in the data:airtemp
column. (We are using static defines for data and airtemp imported from the
HBaseTemperatureCli class described later.) In the configure() method, we create an
HTable instance once against the observations table and use it afterward in map invocations
talking to HBase. 
Finally, we call close on our HTable instance to flush out any write buffers not yet cleared.

The row key used is created in the makeObservationRowKey() method on
RowKeyConverter from the station ID and observation time:
public class RowKeyConverter {
  private static final int STATION_ID_LENGTH = 12;
  /**
   * @return A row key whose format is: <station_id> <reverse_order_epoch>
   */
  public static byte[] makeObservationRowKey(String stationId,
      long observationTime) {
    byte[] row = new byte[STATION_ID_LENGTH + Bytes.SIZEOF_LONG];
    Bytes.putBytes(row, 0, Bytes.toBytes(stationId), 0, STATION_ID_LENGTH);
    long reverseOrderEpoch = Long.MAX_VALUE - observationTime;
    Bytes.putLong(row, STATION_ID_LENGTH, reverseOrderEpoch);
    return row;
  }
}        

The conversion takes advantage of the fact that the station ID is a fixed-length string.
The Bytes class used in makeObservationRowKey() is from the HBase utility package. It
includes methods for converting between byte arrays and common Java and Hadoop
types. In makeObservationRowKey(), the Bytes.putLong() method is used to fill the key
byte array. The Bytes.SIZEOF_LONG constant is used for sizing and positioning in the
row key array.
We can run the program with the following:
% hbase HBaseTemperatureImporter input/ncdc/all

Web Queries:

To implement the web application, we will use the HBase Java API directly. Here it
becomes clear how important your choice of schema and storage format is.
The simplest query will be to get the static station information. This type of query is
simple in a traditional database, but HBase gives you additional control and flexibility.
Using the info family as a key/value dictionary (column names as keys, column values
as values), the code would look like this:
  public Map<String, String> getStationInfo(HTable table, String stationId)
      throws IOException {
    Get get = new Get(Bytes.toBytes(stationId));
    get.addFamily(INFO_COLUMNFAMILY);
    Result res = table.get(get);
    if (res == null) {
      return null;
    }

    Map<String, String> resultMap = new HashMap<String, String>();
    resultMap.put("name", getValue(res, INFO_COLUMNFAMILY, NAME_QUALIFIER));
    resultMap.put("location", getValue(res, INFO_COLUMNFAMILY, LOCATION_QUALIFIER));
    resultMap.put("description", getValue(res, INFO_COLUMNFAMILY,
      DESCRIPTION_QUALIFIER));
    return resultMap;
  }
  private static String getValue(Result res, byte [] cf, byte [] qualifier) {
    byte [] value = res.getValue(cf, qualifier);
    return value == null? "": Bytes.toString(value);
  }
In this example, getStationInfo() takes an HTable instance and a station ID. To get the
station info, we use HTable.get() passing a Get instance configured to get all the column
values for the row identified by the station ID in the defined column family, INFO_COL
UMNFAMILY.

The get() results are returned in a Result. It contains the row, and you can fetch cell
values by stipulating the column cell wanted. The getStationInfo() method converts
the Result into a more friendly Map of String keys and values.
We can already see how there is a need for utility functions when using HBase. There
are an increasing number of abstractions being built atop HBase to deal with this low-level
interaction, but it's important to understand how this works and how storage
choices make a difference.
One of the strengths of HBase over a relational database is that you don't have to
prespecify the columns. So, in the future, if each station now has at least these three
attributes but there are hundreds of optional ones, we can just insert them without
modifying the schema. Your application's reading and writing code would of course
need to be changed. The example code might change in this case to looping through
the Result rather than grabbing each value explicitly.
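
As a sketch, that looping variant might reuse res and INFO_COLUMNFAMILY from the listing above and walk the family map instead (Result.getFamilyMap() returns the family's columns keyed by qualifier):
    Map<String, String> resultMap = new HashMap<String, String>();
    NavigableMap<byte[], byte[]> familyMap = res.getFamilyMap(INFO_COLUMNFAMILY);
    for (Map.Entry<byte[], byte[]> entry : familyMap.entrySet()) {
      resultMap.put(Bytes.toString(entry.getKey()),
        Bytes.toString(entry.getValue()));
    }
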
We will make use of HBase scanners for retrieval of observations in our web
application.
Here we are after a Map<ObservationTime, ObservedTemp> result. We will use a
NavigableMap<Long, Integer> because it is sorted and has a descendingMap() method,
so we can access observations in either ascending or descending order.

Example: Methods for retrieving a range of rows of weather station observations from an HBase
table
  public NavigableMap<Long, Integer> getStationObservations(HTable table,
      String stationId, long maxStamp, int maxCount) throws IOException {
    byte[] startRow = RowKeyConverter.makeObservationRowKey(stationId, maxStamp);
    NavigableMap<Long, Integer> resultMap = new TreeMap<Long, Integer>();
    Scan scan = new Scan(startRow);
    scan.addColumn(DATA_COLUMNFAMILY, AIRTEMP_QUALIFIER);
    ResultScanner scanner = table.getScanner(scan);
    Result res = null;
    int count = 0;
    try {
      while ((res = scanner.next()) != null && count++ < maxCount) {
        byte[] row = res.getRow();
        byte[] value = res.getValue(DATA_COLUMNFAMILY, AIRTEMP_QUALIFIER);
        Long stamp = Long.MAX_VALUE -
          Bytes.toLong(row, row.length - Bytes.SIZEOF_LONG, Bytes.SIZEOF_LONG);
        Integer temp = Bytes.toInt(value);
        resultMap.put(stamp, temp);
      }
    } finally {
      scanner.close();
    }
    return resultMap;
  }

  /**
   * Return the last ten observations.
   */
  public NavigableMap<Long, Integer> getStationObservations(HTable table,
      String stationId) throws IOException {
    return getStationObservations(table, stationId, Long.MAX_VALUE, 10);
  }
The getStationObservations() method takes a station ID and a range defined by
maxStamp and a maximum number of rows (maxCount). Note that the NavigableMap that is
returned is actually now in descending time order. If you want to read through it in
ascending order, you would make use of NavigableMap.descendingMap().

Scanners:

HBase scanners are like cursors in a traditional database or Java iterators, except—
unlike the latter—they have to be closed after use. Scanners return rows in order. Users
obtain a scanner on an HBase table by calling HTable.getScanner(scan) where the
scan parameter is a configured instance of a Scan object. In the Scan instance, you can
pass the row at which to start and stop the scan, which columns in a row to return in
the row result, and optionally, a filter to run on the server side.
The ResultScanner interface, which is returned when you call HTable.getScanner(), is as follows:
public interface ResultScanner extends Closeable, Iterable<Result> {
  public Result next() throws IOException;
  public Result [] next(int nbRows) throws IOException;
  public void close(); 
}

You can ask for the next row’s results or a number of rows. Each invocation of
next() involves a trip back to the regionserver, so grabbing a bunch of rows at once can
make for significant performance savings.
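
A fragment illustrating the batched form of next(); the batch size of 100 is an arbitrary example:
    ResultScanner scanner = table.getScanner(new Scan());
    try {
      Result[] batch;
      // Fetch up to 100 rows per round trip to the regionserver
      while ((batch = scanner.next(100)) != null && batch.length > 0) {
        for (Result row : batch) {
          System.out.println("Scan: " + row);
        }
      }
    } finally {
      scanner.close();
    }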

The advantage of storing things as Long.MAX_VALUE - stamp may not be clear in the
previous example. It has more use when you want to get the newest observations for a
given offset and limit, which is often the case in web applications. If the observations
were stored with the actual stamps, we would be able to get only the oldest observations
for a given offset and limit efficiently. Getting the newest would mean getting all of
them and then grabbing them off the end. One of the prime reasons for moving from
RDBMS to HBase is to allow for these types of “early-out” scenarios.

HBase Versus RDBMS:

HBase and other column-oriented databases are often compared to more traditional
and popular relational databases or RDBMSs. Although they differ dramatically in their
implementations and in what they set out to accomplish, the fact that they are potential
solutions to the same problems means that despite their enormous differences, the
comparison is a fair one to make.
As described previously, HBase is a distributed, column-oriented data storage system.
It picks up where Hadoop left off by providing random reads and writes on top of
HDFS. It has been designed from the ground up with a focus on scale in every direction:
tall in numbers of rows (billions), wide in numbers of columns (millions), and to be
horizontally partitioned and replicated across thousands of commodity nodes automatically.
The table schemas mirror the physical storage, creating a system for efficient data structure serialization, storage, and retrieval. The burden is on the application developer to make use of this storage and retrieval in the right way.

Successful Service:

Here is a synopsis of how the typical RDBMS scaling story runs. The following list
presumes a successful growing service:
Initial public launch
Move from a local workstation to a shared, remotely hosted MySQL instance with a
well-defined schema.
Service becomes more popular; too many reads hitting the database
Add memcached to cache common queries. Reads are now no longer strictly ACID;
cached data must expire.
Service continues to grow in popularity; too many writes hitting the database
Scale MySQL vertically by buying a beefed-up server with 16 cores, 128 GB of RAM,
and banks of 15k RPM hard drives. Costly.
New features increase query complexity; now we have too many joins
Denormalize your data to reduce joins. (That’s not what they taught me in DBA
school!)
Rising popularity swamps the server; things are too slow
Stop doing any server-side computations.
Some queries are still too slow
Periodically prematerialize the most complex queries, try to stop joining in most
cases.

Reads are OK, but writes are getting slower and slower
Drop secondary indexes and triggers (no indexes?).

HBase
Enter HBase, which has the following characteristics:
No real indexes
Rows are stored sequentially, as are the columns within each row. Therefore, no
issues with index bloat, and insert performance is independent of table size.
Automatic partitioning
As your tables grow, they will automatically be split into regions and distributed
across all available nodes.
Scale linearly and automatically with new nodes
Add a node, point it to the existing cluster, and run the regionserver. Regions will
automatically rebalance and load will spread evenly.
Commodity hardware
Clusters are built on $1,000–$5,000 nodes rather than $50,000 nodes. RDBMSs
are I/O hungry, requiring more costly hardware.
Fault tolerance
Lots of nodes means each is relatively insignificant. No need to worry about individual
node downtime.
Batch processing
MapReduce integration allows fully parallel, distributed jobs against your data
with locality awareness.

If you stay up at night worrying about your database (uptime, scale, or speed), then
you should seriously consider making a jump from the RDBMS world to HBase. Utilize
a solution that was intended to scale rather than a solution based on stripping down
and throwing money at what used to work. With HBase, the software is free, the hardware
is cheap, and the distribution is intrinsic.

Life with HBase:

Our RDBMS-based system was always capable of correctly implementing our requirements;
the issue was scaling. 
When you start to focus on scale and performance rather than correctness, you end up short-cutting and optimizing for your domain-specific use cases everywhere possible. Once you start implementing your own solutions to your data problems, the overhead and complexity of an RDBMS gets in your way. The
abstraction from the storage layer and ACID requirements are an enormous barrier and
luxury that you cannot always afford when building for scale. HBase is a distributed,
column-oriented, sorted map store and not much else. The only major part that is
abstracted from the user is the distribution, and that’s exactly what we don’t want to
deal with. Business logic, on the other hand, is very specialized and optimized. With
HBase not trying to solve all of our problems, we’ve been able to solve them better
ourselves and rely on HBase for scaling our storage, not our logic. It was an extremely
liberating experience to be able to focus on our applications and logic rather than the
scaling of the data itself.

We currently have tables with hundreds of millions of rows and tens of thousands of
columns; the thought of storing billions of rows and millions of columns is exciting,
not scary.

Praxis:

Versions:

Up until HBase 0.20, HBase aligned its versioning with that of Hadoop. A particular
HBase version would run on any Hadoop that had a matching minor version, where
minor version in this context is considered the number between the periods (e.g., 20 is
the minor version of an HBase 0.20.5). HBase 0.20.5 would run on Hadoop 0.20.2,
but HBase 0.19.5 would not run on Hadoop 0.20.0.
With HBase 0.90, the version relationship was broken. The Hadoop release cycle has
slowed and no longer aligns with that of HBase developments. Also, the intent is that
now a particular HBase version can run on multiple versions of Hadoop. For example,
HBase 0.90.x will work with both Hadoop 0.20.x and 0.21.x.
This said, ensure you are running compatible versions of Hadoop and HBase. Check
the requirements section of your download. Incompatible versions will throw an exception
complaining about the version mismatch, if you are lucky. 
If they cannot talk to each other sufficiently to pass versions, you may see your HBase cluster hang indefinitely
soon after startup.
The mismatch exception or HBase hang can also happen on upgrade if older versions of either HBase or
Hadoop can still be found on the classpath because of imperfect cleanup of the old software.

HDFS:

HBase’s use of HDFS is very different from how it’s used by MapReduce. In MapReduce,
generally, HDFS files are opened, with their content streamed through a map
task and then closed. In HBase, data files are opened on cluster startup and kept open
so that we avoid paying the file open costs on each access. Because of this, HBase tends
to see issues not normally encountered by MapReduce clients:
Running out of file descriptors
Because we keep files open, on a loaded cluster, it doesn’t take long before we run
into system- and Hadoop-imposed limits. For instance, say we have a cluster that
has three nodes each running an instance of a datanode and a regionserver and
we’re running an upload into a table that is currently at 100 regions and 10 column
families. Allow that each column family has on average two flush files. Doing the
math, we can have 100 × 10 × 2, or 2,000, files open at any one time. Add to this
total miscellaneous other descriptors consumed by outstanding scanners and Java
libraries. Each open file consumes at least one descriptor over on the remote datanode.

The default limit on the number of file descriptors per process is 1,024.
When we exceed the filesystem ulimit, we’ll see the complaint about Too many
open files in logs, but often you’ll first see indeterminate behavior in HBase. The
fix requires increasing the file descriptor ulimit count.

 You can verify that the HBase process is running with sufficient file descriptors by looking at the first few
lines of a regionserver’s log. It emits vitals such as the JVM being used and environment
settings such as the file descriptor ulimit.

Running out of datanode threads
Similarly, the Hadoop datanode has an upper bound of 256 on the number of
threads it can run at any one time. Given the same table statistics quoted in the
preceding bullet, it’s easy to see how we can exceed this upper bound relatively
early, given that in the datanode as of this writing each open connection to a file
block consumes a thread. If you look in the datanode log, you’ll see a complaint
like xceiverCount 258 exceeds the limit of concurrent xcievers 256 but again, you’ll
likely see HBase act erratically before you encounter this log entry. Increase the
dfs.datanode.max.xcievers (note that the property name is misspelled) count in
HDFS and restart your cluster.
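
As a sketch, the change goes in hdfs-site.xml on the datanodes; the value of 4096 below is only an illustrative figure, not a prescription:
<property>
  <name>dfs.datanode.max.xcievers</name>
  <value>4096</value>
</property>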

Sync
You must run HBase on an HDFS that has a working sync. Otherwise, you will
lose data. This means running HBase on Hadoop 0.20.205.0 or later.

UI:

HBase runs a web server on the master to present a view on the state of your running
cluster. By default, it listens on port 60010. The master UI displays a list of basic attributes
such as software versions, cluster load, request rates, lists of cluster tables, and participating regionservers.
Click on a regionserver in the master UI and you are taken to the web server running on the individual regionserver.
It lists the regions this server is carrying and basic metrics such as resources consumed and request rates.

Schema Design:

HBase tables are like those in an RDBMS, except that cells are versioned, rows are
sorted, and columns can be added on the fly by the client as long as the column family
they belong to preexists. These factors should be considered when designing schemas
for HBase, but far and away the most important concern when designing schemas is
how the data will be accessed. All access is via the primary key, so the key design should lend itself
to how the data is going to be queried.
The other property to keep in mind when designing schemas is that a defining attribute of column(-family)-oriented stores, like HBase, is that they can host wide, sparsely populated tables at no incurred cost.

Row keys:

Take time designing your row key. In the weather data example in this chapter, the
compound row key has a station prefix that served to group temperatures by station.
The reversed timestamp suffix made it so temperatures could be scanned ordered from
most recent to oldest. A smart compound key can be used to cluster data in ways
amenable to how it will be accessed.
Designing compound keys, you may have to zero-pad number components so row keys
sort properly. Otherwise, you will run into the issue where 10 sorts before 2 when only
byte-order is considered (02 sorts before 10).
If your keys are integers, use a binary representation rather than persisting the string version
of a number, as it consumes less space.
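
A small sketch of both approaches using the HBase Bytes utility; the values are purely illustrative, and the binary form assumes non-negative numbers (the sign bit would upset byte-order sorting):
// Fixed-width binary key: 8 bytes, and 2L sorts before 10L as expected
byte[] binaryKey = Bytes.toBytes(10L);
// Zero-padded string key: also sorts correctly, human-readable, but longer
byte[] paddedKey = Bytes.toBytes(String.format("%010d", 10));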

Bulk Load:

HBase has an efficient facility for bulk loading by writing its internal data format
directly into the filesystem from MapReduce. Going this route, it's possible to load an
HBase instance at rates that are an order of magnitude or more beyond those attainable
by writing via the HBase client API. The facility is described at http://hbase.apache.org/
docs/current/bulk-loads.html. It’s also possible to bulk load into a live table.