Thursday, May 29, 2014

HBase

- HBase is a distributed, column-oriented database built on top of HDFS.
- It is the Hadoop application to use when you require real-time read/write random access to very large datasets.
- It can host very large, sparsely populated tables on clusters made from commodity hardware.
- The canonical HBase use case is the webtable: a table of crawled web pages and their attributes, keyed by web page URL.
- The webtable is large, with row counts that run into the billions.
- The table is randomly accessed by crawlers running at various rates and updating random rows, while random rows are served in real time as users click on a website's cached-page feature.


Concepts:

Whirlwind Tour of the Data Model:

- Applications store data in labelled tables.
- Tables are made of rows and columns.
- Table cells (the intersection of row and column coordinates) are versioned.
- By default, a cell's version is a timestamp auto-assigned by HBase at the time of cell insertion (see the sketch after this list).
- A table's column families must be specified up front as part of the table schema definition, but new column family members (columns) can be added on demand.

- Physically, all column family members are stored together on the filesystem.
- So, although we described HBase as a column-oriented store, it would be more accurate to describe it as a column-family-oriented store.
- Because tunings and storage specifications are done at the column family level, it is advisable that all column family members have the same general access pattern and size characteristics.
- HBase tables are like those in an RDBMS, except that cells are versioned, rows are sorted, and columns can be added on the fly by the client as long as the column family they belong to already exists.
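A minimal sketch (not from the book) of how versioned cells look from the Java client API used later in this post; it reuses the test/data names from the shell example below, and imports are omitted as in the other listings:

  // Two puts to the same cell; HBase auto-assigns a timestamp to each,
  // so both versions are retained (up to the family's configured maximum).
  HTable table = new HTable(HBaseConfiguration.create(), "test");
  byte[] row = Bytes.toBytes("row1");
  byte[] family = Bytes.toBytes("data");
  byte[] qualifier = Bytes.toBytes("1");
  Put older = new Put(row);
  older.add(family, qualifier, Bytes.toBytes("value-old"));
  table.put(older);
  Put newer = new Put(row);
  newer.add(family, qualifier, Bytes.toBytes("value-new"));
  table.put(newer);

  // Ask for up to two versions; the newest version comes back first.
  Get get = new Get(row);
  get.setMaxVersions(2);
  Result result = table.get(get);
  for (KeyValue kv : result.list()) {
    System.out.println(kv.getTimestamp() + " -> " + Bytes.toString(kv.getValue()));
  }
  table.close();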

Regions:

- Tables are partitioned horizontally by HBase into regions.
- Each region comprises a subset of a table's rows.
- A region is denoted by the table it belongs to, its first row (inclusive), and its last row (exclusive).
- Initially a table comprises a single region, but as the region grows and crosses a configurable size threshold, it splits at a row boundary into two new regions of approximately equal size.
- Until this first split happens, all loading will be against the single server hosting the original region.
- As the table grows, the number of its regions grows.
- Regions are the units that get distributed over an HBase cluster.
- In this way, a table that is too big for any one server can be carried by a cluster of servers, with each node hosting a subset of the table's total regions.
- This is also the means by which the load on a table gets distributed.
- The online set of sorted regions comprises the table's total content.

Locking:

- Row updates are atomic, no matter how many columns constitute the row-level transaction.
- This keeps the locking model simple (see the sketch below).
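A hedged sketch of what this means in the client API: a single Put can carry several columns of the same row, and HBase applies them as one atomic row mutation (the family/qualifier names are made up, and table is an HTable as in the later examples):

  Put p = new Put(Bytes.toBytes("row1"));
  p.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("SFO"));
  p.add(Bytes.toBytes("info"), Bytes.toBytes("location"), Bytes.toBytes("37.6,-122.4"));
  p.add(Bytes.toBytes("info"), Bytes.toBytes("description"), Bytes.toBytes("San Francisco Intl"));
  table.put(p);  // readers see either none or all three of these cells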

Implementation:

- HBase is modeled with an HBase master node orchestrating a cluster of one or more regionserver slaves.
- The HBase master is responsible for bootstrapping a virgin install, for assigning regions to registered regionservers, and for recovering regionserver failures.
- The master node is lightly loaded.
- Regionservers carry zero or more regions and field client read/write requests.
- Regionservers also manage region splits, informing the HBase master about new daughter regions so it can manage the offlining of the parent region and the assignment of the replacement daughters.

- HBase depends on ZooKeeper, and by default it manages the ZooKeeper instance as the authority on cluster state.
- ZooKeeper hosts vitals such as the location of the root catalog table and the address of the current cluster master.
- Assignment of regions is mediated via ZooKeeper in case participating servers crash mid-assignment.
- Hosting the assignment transaction state in ZooKeeper means that, on recovery, assignment can be picked up where the crashed server left off.
- At a minimum, when bootstrapping a client connection to an HBase cluster, the client must be passed the location of the ZooKeeper ensemble (see the sketch below).
- The client then navigates the ZooKeeper hierarchy to learn cluster attributes such as server locations.
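For example, a client-side configuration pointing at the ensemble might look like the following sketch (the hostnames are placeholders; hbase.zookeeper.quorum is the standard property and 2181 is the default client port):

  Configuration config = HBaseConfiguration.create();
  config.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");      // ZooKeeper ensemble hosts
  config.set("hbase.zookeeper.property.clientPort", "2181"); // default client port
  HTable table = new HTable(config, "test");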

- Regionserver slave nodes are listed in the HBase conf/regionservers file, just as you would list datanodes and tasktrackers in the Hadoop conf/slaves file.
- Start and stop scripts are like those in Hadoop, using the same SSH-based mechanism for running remote commands.
- Cluster site-specific configuration is made in the HBase conf/hbase-site.xml and conf/hbase-env.sh files, which have the same format as their equivalents in the Hadoop parent project.

- HBase persists data via the Hadoop filesystem API.
- By default, HBase writes to the local filesystem.
- The local filesystem is fine for experimenting with your initial HBase install, but thereafter, usually the first configuration made in an HBase cluster involves pointing HBase at the HDFS cluster to use, as in the sketch below.
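A minimal conf/hbase-site.xml sketch for that first step, assuming a namenode reachable at hdfs://namenode:8020 (the host and port are placeholders):

  <configuration>
    <property>
      <name>hbase.rootdir</name>
      <value>hdfs://namenode:8020/hbase</value>
    </property>
    <property>
      <name>hbase.cluster.distributed</name>
      <value>true</value>
    </property>
  </configuration>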

HBase in operation:

- HBase internally keeps special catalog tables named -ROOT- and .META. within which it maintains the current list, state, and location of all regions afloat on the cluster.
- The -ROOT- table holds the list of .META. table regions.
- The .META. table holds the list of user-space regions.
- Entries in these tables are keyed by region name, where a region name is made up of the name of the table the region belongs to, the region's start row, its time of creation, and finally an MD5 hash of all of the former.
- As regions transition (they are split, disabled/enabled, deleted, redeployed by the region load balancer, or redeployed due to a regionserver crash), the catalog tables are updated so the state of all regions on the cluster is kept current.
- A fresh client connects to the ZooKeeper cluster first to learn the location of -ROOT-; it then consults -ROOT- to elicit the location of the .META. region whose scope covers the requested row.
- The client then does a lookup against the found .META. region to figure out the hosting user-space region and its location.
- Thereafter, the client interacts directly with the hosting regionserver.
- If, in turn, the consulted .META. region has moved, then -ROOT- is reconsulted.
- Writes arriving at a regionserver are first appended to a commit log and then added to an in-memory memstore.
- When the memstore fills, its content is flushed to the filesystem.
- The commit log is hosted on HDFS, so it remains available through a regionserver crash.
- When the master notices that a regionserver is no longer reachable, usually because the server's znode has expired in ZooKeeper, it splits the dead regionserver's commit log by region.
- When reading, the region's memstore is consulted first.
- If sufficient versions are found reading the memstore alone, the query completes there.

Installation:

Download a stable release from an Apache Download Mirror and unpack it on your
local filesystem. For example:
% tar xzf hbase-x.y.z.tar.gz

- As with Hadoop, you first need to tell HBase where Java is located on your system. If
you have the JAVA_HOME environment variable set to point to a suitable Java installation,
then that will be used, and you don’t have to configure anything further. Otherwise,
you can set the Java installation that HBase uses by editing HBase's conf/hbase-env.sh,
and specifying the JAVA_HOME variable (see Appendix A for some examples) to
point to version 1.6.0 of Java.
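For example, the relevant line in conf/hbase-env.sh might read as follows (the path below is only an illustration; point it at your own JDK):
export JAVA_HOME=/usr/lib/jvm/java-6-sun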

- For convenience, add the HBase binary directory to your command-line path. For
example:
% export HBASE_HOME=/home/hbase/hbase-x.y.z
% export PATH=$PATH:$HBASE_HOME/bin
To get the list of HBase options, type:
% hbase
Usage: hbase <command>
where <command> is one of:
  shell            run the HBase shell
  master           run an HBase HMaster node
  regionserver     run an HBase HRegionServer node
  zookeeper        run a Zookeeper server
  rest             run an HBase REST server
  thrift           run an HBase Thrift server
  avro             run an HBase Avro server
  migrate          upgrade an hbase.rootdir
  hbck             run the hbase 'fsck' tool
 or
  CLASSNAME        run the class named CLASSNAME

Test Drive:

- To start a temporary instance of HBase that uses the /tmp directory on the local filesystem
for persistence, type:
% start-hbase.sh
This will launch a standalone HBase instance that persists to the local filesystem; by
default, HBase will write to /tmp/hbase-${USERID}.

To administer your HBase instance, launch the HBase shell by typing:
% hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version: 0.89.0-SNAPSHOT, ra4ea1a9a7b074a2e5b7b24f761302d4ea28ed1b2, Sun Jul 18
15:01:50 PDT 2010 hbase(main):001:0>

Now let us create a simple table, add some data, and then clean up.

To create a table, you must name your table and define its schema. A table’s schema
comprises table attributes and the list of table column families. Column families
themselves have attributes that you in turn set at schema definition time. Examples of
column family attributes include whether the family content should be compressed on
the filesystem and how many versions of a cell to keep. Schemas can be later edited by
offlining the table using the shell disable command, making the necessary alterations
using alter, then putting the table back online with enable.

To create a table named test with a single column family named data, using defaults for
table and column family attributes, enter:
hbase(main):007:0> create 'test', 'data'
0 row(s) in 1.3066 seconds

To prove the new table was created successfully, run the list command. This will
output all tables in user space:
hbase(main):019:0> list
test                                                                                                         
1 row(s) in 0.1485 seconds

To insert data into three different rows and columns in the data column family, and
then list the table content, do the following:
hbase(main):021:0> put 'test', 'row1', 'data:1', 'value1'
0 row(s) in 0.0454 seconds
hbase(main):022:0> put 'test', 'row2', 'data:2', 'value2'
0 row(s) in 0.0035 seconds
hbase(main):023:0> put 'test', 'row3', 'data:3', 'value3'
0 row(s) in 0.0090 seconds
hbase(main):024:0> scan 'test'
ROW                          COLUMN+CELL                                                                     
 row1                        column=data:1, timestamp=1240148026198, value=value1                            
 row2                        column=data:2, timestamp=1240148040035, value=value2                            
 row3                        column=data:3, timestamp=1240148047497, value=value3                            
3 row(s) in 0.0825 seconds
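To read back a single row rather than scanning the whole table, you can also use the shell's get command, for example get 'test', 'row1', which returns the columns and timestamps for just that row (the output below is illustrative):
get 'test', 'row1'
COLUMN                       CELL
 data:1                      timestamp=1240148026198, value=value1
1 row(s) in 0.0240 seconds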

To remove the table, you must first disable it before dropping it:
hbase(main):025:0> disable 'test'
09/04/19 06:40:13 INFO client.HBaseAdmin: Disabled test
0 row(s) in 6.0426 seconds
hbase(main):026:0> drop 'test'
09/04/19 06:40:17 INFO client.HBaseAdmin: Deleted test

0 row(s) in 0.0210 seconds
hbase(main):027:0> list
0 row(s) in 2.0645 seconds
Shut down your HBase instance by running:
% stop-hbase.sh

Clients:

There are a number of client options for interacting with an HBase cluster.

Example: Basic table administration and access
public class ExampleClient {
  public static void main(String[] args) throws IOException {
    Configuration config = HBaseConfiguration.create();
    // Create table
    HBaseAdmin admin = new HBaseAdmin(config);
    HTableDescriptor htd = new HTableDescriptor("test");
    HColumnDescriptor hcd = new HColumnDescriptor("data");
    htd.addFamily(hcd);
    admin.createTable(htd);
    byte [] tablename = htd.getName();
    HTableDescriptor [] tables = admin.listTables();
    if (!(tables.length == 1 && Bytes.equals(tablename, tables[0].getName()))) {
      throw new IOException("Failed create of table");
    }
    // Run some operations -- a put, a get, and a scan -- against the table.
    HTable table = new HTable(config, tablename);
    byte [] row1 = Bytes.toBytes("row1");
    Put p1 = new Put(row1);
    byte [] databytes = Bytes.toBytes("data");
    p1.add(databytes, Bytes.toBytes("1"), Bytes.toBytes("value1"));
    table.put(p1);
    Get g = new Get(row1);
    Result result = table.get(g);
    System.out.println("Get: " + result);
    Scan scan = new Scan();
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result scannerResult: scanner) {
        System.out.println("Scan: " + scannerResult);
      }
    } finally {
      scanner.close();
    }
   
    // Drop the table
    admin.disableTable(tablename);
    admin.deleteTable(tablename);
  }
}

This class has a main method only. For the sake of brevity, we do not include the package
name or imports. In this class, we first create an instance of
org.apache.hadoop.conf.Configuration. We ask the org.apache.hadoop.hbase.HBaseConfiguration
class to create the instance. It will return a Configuration that has read the
HBase configuration from the hbase-site.xml and hbase-default.xml files found on the
program’s classpath. This Configuration is subsequently used to create instances of
HBaseAdmin and HTable, two classes found in the org.apache.hadoop.hbase.client Java
package. HBaseAdmin is used for administering your HBase cluster, for example for adding and dropping
tables.
HTable is used to access a specific table. 
The Configuration instance points these classes at the cluster the code is to work against.

To create a table, we need to first create an instance of HBaseAdmin and then ask it to
create the table named test with a single column family named data. In our example,
our table schema is the default. Use methods on org.apache.hadoop.hbase.HTableDescriptor
and org.apache.hadoop.hbase.HColumnDescriptor to change the table schema.
The code next asserts the table was actually created and then it moves to run operations
against the just-created table.

To operate on a table, we need an instance of org.apache.hadoop.hbase.client.HTable, passing it our Configuration instance and the name of the table we want to operate on. After creating an HTable, we create an instance of org.apache.hadoop.hbase.client.Put to put a single cell value of value1 into a row
named row1 on the column named data:1. (The column name is specified in two parts:
the column family name as bytes—databytes in the code above—and then the column
family qualifier specified as Bytes.toBytes("1").) Next we create an
org.apache.hadoop.hbase.client.Get, do a get of the just-added cell, and then use an
org.apache.hadoop.hbase.client.Scan to scan over the just-created table,
printing out what we find.
Finally, we clean up by first disabling the table and then deleting it. A table must be
disabled before it can be dropped.

Example: A MapReduce application to count the number of rows in an HBase table
public class RowCounter {
  /** Name of this 'program'. */
  static final String NAME = "rowcounter";
  static class RowCounterMapper
  extends TableMapper<ImmutableBytesWritable, Result> {
    /** Counter enumeration to count the actual rows. */
    public static enum Counters {ROWS}
    @Override
    public void map(ImmutableBytesWritable row, Result values,
      Context context)
    throws IOException {
      for (KeyValue value: values.list()) {
        if (value.getValue().length > 0) {
          context.getCounter(Counters.ROWS).increment(1);
          break;
        }
      }
    }
  }
  public static Job createSubmittableJob(Configuration conf, String[] args)
  throws IOException {
    String tableName = args[0];
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(RowCounter.class);
    // Columns are space delimited
    StringBuilder sb = new StringBuilder();
    final int columnoffset = 1;
    for (int i = columnoffset; i < args.length; i++) {
      if (i > columnoffset) {
        sb.append(" ");
      }
      sb.append(args[i]);
    }
    Scan scan = new Scan();
    scan.setFilter(new FirstKeyOnlyFilter());
    if (sb.length() > 0) {
      for (String columnName :sb.toString().split(" ")) {
        String [] fields = columnName.split(":");
        if(fields.length == 1) {
          scan.addFamily(Bytes.toBytes(fields[0]));
        } else {
          scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1]));
        }
      }
    }
    // No reduce output is written; the row count is reported via the ROWS counter.
    job.setOutputFormatClass(NullOutputFormat.class);
    TableMapReduceUtil.initTableMapperJob(tableName, scan,
      RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
    job.setNumReduceTasks(0);
    return job;
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 1) {
      System.err.println("ERROR: Wrong number of parameters: " + args.length);
      System.err.println("Usage: RowCounter <tablename> [<column1> <column2>...]");
      System.exit(-1);
    }
    Job job = createSubmittableJob(conf, otherArgs);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
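Assuming the compiled class (and its dependencies) are on the HBase classpath, the job could be launched via the hbase script's CLASSNAME mode shown earlier; the table name here is just a placeholder:
% hbase RowCounter observations
The row count then appears as the ROWS counter in the job's counter output rather than as output files.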

Avro, REST and Thrift

- HBase ships with Avro, REST, and Thrift interfaces.
- These are useful when the interacting application is written in a language other than Java.

REST:
- To put up a Stargate instance (the name of HBase's REST service), start it using the following command:
% hbase-daemon.sh start rest
- This will start a server instance, by default on port 8080.
- To stop the REST server, type:
% hbase-daemon.sh stop rest

Thrift:

- Start the Thrift service by putting up a server to field Thrift clients by running the following:
% hbase-daemon.sh start thrift
This will start a server instance on the default port, 9090.
The HBase Thrift IDL can be found at src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift in the
HBase source code.
To stop the Thrift server, type:
% hbase-daemon.sh stop thrift

Avro:

- The Avro server is started and stopped in the same manner as the Thrift or REST services.
- The Avro server uses port 9090 by default.

Example:

- The existing weather dataset described in previous chapters contains observations for
tens of thousands of stations over 100 years and this data is growing without bound.
- In this example, we will build a simple web interface that allows a user to navigate the
different stations and page through their historical temperature observations in time
order. For the sake of this example, let us allow that the dataset is massive, that the
observations run to the billions, and that the rate at which temperature updates arrive
is significant—say hundreds to thousands of updates a second from around the world
across the whole range of weather stations. Also, let us allow that it is a requirement
that the web application must display the most up-to-date observation within a second
or so of receipt.

Schemas:

- In our example, there will be two tables:
Stations
This table holds station data. Let the row key be the stationid. Let this table have
a column family info that acts as a key/val dictionary for station information. Let
the dictionary keys be the column names info:name, info:location, and
info:description. This table is static and the info family, in this case, closely mirrors
a typical RDBMS table design.
Observations
This table holds temperature observations. Let the row key be a composite key of
stationid + reverse order timestamp. Give this table a column family data that will
contain one column airtemp with the observed temperature as the column value.

For the stations table, the choice of stationid as key is obvious because we will always
access information for a particular station by its id. The observations table, however,
uses a composite key that adds the observation timestamp at the end. This will group
all observations for a particular station together, and by using a reverse order timestamp
(Long.MAX_VALUE - epoch) and storing it as binary, observations for each station will be
ordered with most recent observation first.

In the shell, you would define your tables as follows:
hbase(main):036:0> create 'stations', {NAME => 'info', VERSIONS => 1}
0 row(s) in 0.1304 seconds
hbase(main):037:0> create 'observations', {NAME => 'data', VERSIONS => 1}
0 row(s) in 0.1332 seconds
In both cases, we are interested only in the latest version of a table cell, so set VERSIONS to
1. The default is 3.
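The same schemas could also be created from the Java client API used elsewhere in this post; the following is a hedged sketch using HBaseAdmin and HColumnDescriptor.setMaxVersions(), with error handling omitted:

  Configuration config = HBaseConfiguration.create();
  HBaseAdmin admin = new HBaseAdmin(config);

  HTableDescriptor stations = new HTableDescriptor("stations");
  HColumnDescriptor info = new HColumnDescriptor("info");
  info.setMaxVersions(1);            // keep only the latest cell version
  stations.addFamily(info);
  admin.createTable(stations);

  HTableDescriptor observations = new HTableDescriptor("observations");
  HColumnDescriptor data = new HColumnDescriptor("data");
  data.setMaxVersions(1);
  observations.addFamily(data);
  admin.createTable(observations);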

Example: A MapReduce application to import temperature data from HDFS into an HBase table
public class HBaseTemperatureImporter extends Configured implements Tool {
  // Inner-class for map
  static class HBaseTemperatureMapper<K, V> extends MapReduceBase implements
      Mapper<LongWritable, Text, K, V> {
    private NcdcRecordParser parser = new NcdcRecordParser();
    private HTable table;
    public void map(LongWritable key, Text value,
      OutputCollector<K, V> output, Reporter reporter)
    throws IOException {
      parser.parse(value.toString());
      if (parser.isValidTemperature()) {
        byte[] rowKey = RowKeyConverter.makeObservationRowKey(parser.getStationId(),
          parser.getObservationDate().getTime());
        Put p = new Put(rowKey);
        p.add(HBaseTemperatureCli.DATA_COLUMNFAMILY,
          HBaseTemperatureCli.AIRTEMP_QUALIFIER,
          Bytes.toBytes(parser.getAirTemperature()));
        table.put(p);
      }
    }
    public void configure(JobConf jc) {
      super.configure(jc);
      // Create the HBase table client once up-front and keep it around
      // rather than create on each map invocation.

      try {
        this.table = new HTable(new HBaseConfiguration(jc), "observations");
      } catch (IOException e) {
        throw new RuntimeException("Failed HTable construction", e);
      }
    }
    @Override
    public void close() throws IOException {
      super.close();
      table.close();
    }
  }
  public int run(String[] args) throws IOException {
    if (args.length != 1) {
      System.err.println("Usage: HBaseTemperatureImporter <input>");
      return -1;
    }
    JobConf jc = new JobConf(getConf(), getClass());
    FileInputFormat.addInputPath(jc, new Path(args[0]));
    jc.setMapperClass(HBaseTemperatureMapper.class);
    jc.setNumReduceTasks(0);
    jc.setOutputFormat(NullOutputFormat.class);
    JobClient.runJob(jc);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new HBaseConfiguration(),
        new HBaseTemperatureImporter(), args);
    System.exit(exitCode);
  }
}

HBaseTemperatureImporter has an inner class named HBaseTemperatureMapper that is like
the MaxTemperatureMapper class from Chapter 5. The outer class implements Tool and
does the setup to launch the HBaseTemperatureMapper inner class. HBaseTemperatureMapper
takes the same input as MaxTemperatureMapper and does the same parse—using the
NcdcRecordParser introduced in Chapter 5—to check for valid temperatures, but rather
than add valid temperatures to the output collector as MaxTemperatureMapper does,
it adds valid temperatures to the observations HBase table, into the data:airtemp
column. (We are using static defines for data and airtemp imported from the
HBaseTemperatureCli class described later.) In the configure() method, we create an
HTable instance once against the observations table and use it afterward in map invocations
talking to HBase.
Finally, we call close on our HTable instance to flush out any write buffers not yet cleared.

The row key used is created in the makeObservationRowKey() method on RowKey
Converter from the station ID and observation time:
public class RowKeyConverter {
  private static final int STATION_ID_LENGTH = 12;
  /**
   * @return A row key whose format is: <station_id> <reverse_order_epoch>
   */
  public static byte[] makeObservationRowKey(String stationId,
      long observationTime) {
    byte[] row = new byte[STATION_ID_LENGTH + Bytes.SIZEOF_LONG];
    Bytes.putBytes(row, 0, Bytes.toBytes(stationId), 0, STATION_ID_LENGTH);
    long reverseOrderEpoch = Long.MAX_VALUE - observationTime;
    Bytes.putLong(row, STATION_ID_LENGTH, reverseOrderEpoch);
    return row;
  }
}        

The conversion takes advantage of the fact that the station ID is a fixed-length string.
The Bytes class used in makeObservationRowKey() is from the HBase utility package. It
includes methods for converting between byte arrays and common Java and Hadoop
types. In makeObservationRowKey(), the Bytes.putLong() method is used to fill the key
byte array. The Bytes.SIZEOF_LONG constant is used for sizing and positioning in the
row key array.
We can run the program with the following:
% hbase HBaseTemperatureImporter input/ncdc/all

Web Queries:

To implement the web application, we will use the HBase Java API directly. Here it
becomes clear how important your choice of schema and storage format is.
The simplest query will be to get the static station information. This type of query is
simple in a traditional database, but HBase gives you additional control and flexibility.
Using the info family as a key/value dictionary (column names as keys, column values
as values), the code would look like this:
  public Map<String, String> getStationInfo(HTable table, String stationId)
      throws IOException {
    Get get = new Get(Bytes.toBytes(stationId));
    get.addFamily(INFO_COLUMNFAMILY);
    Result res = table.get(get);
    if (res == null) {
      return null;
    }

    Map<String, String> resultMap = new HashMap<String, String>();
    resultMap.put("name", getValue(res, INFO_COLUMNFAMILY, NAME_QUALIFIER));
    resultMap.put("location", getValue(res, INFO_COLUMNFAMILY, LOCATION_QUALIFIER));
    resultMap.put("description", getValue(res, INFO_COLUMNFAMILY,
      DESCRIPTION_QUALIFIER));
    return resultMap;
  }
  private static String getValue(Result res, byte [] cf, byte [] qualifier) {
    byte [] value = res.getValue(cf, qualifier);
    return value == null? "": Bytes.toString(value);
  }
In this example, getStationInfo() takes an HTable instance and a station ID. To get the
station info, we use HTable.get(), passing a Get instance configured to retrieve all the
column values for the row identified by the station ID in the defined column family,
INFO_COLUMNFAMILY.

The get() results are returned in a Result. It contains the row, and you can fetch cell
values by stipulating the column cell wanted. The getStationInfo() method converts
the Result into a friendlier Map of String keys and values.
We can already see how there is a need for utility functions when using HBase. There
are an increasing number of abstractions being built atop HBase to deal with this low-level
interaction, but it’s important to understand how this works and how storage
choices make a difference.
One of the strengths of HBase over a relational database is that you don’t have to
prespecify the columns. So, in the future, if each station now has at least these three
attributes but there are hundreds of optional ones, we can just insert them without
modifying the schema. Your application's reading and writing code would, of course,
need to be changed. The example code might change in this case to looping through
the Result rather than grabbing each value explicitly, as in the sketch below.
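A minimal sketch of that looping approach, using Result.getFamilyMap() to walk whatever columns happen to be present in the info family; the method name getAllStationInfo is made up for this illustration, and the constants come from the earlier example:

  public Map<String, String> getAllStationInfo(HTable table, String stationId)
      throws IOException {
    Get get = new Get(Bytes.toBytes(stationId));
    get.addFamily(INFO_COLUMNFAMILY);
    Result res = table.get(get);
    Map<String, String> resultMap = new HashMap<String, String>();
    NavigableMap<byte[], byte[]> family = res.getFamilyMap(INFO_COLUMNFAMILY);
    if (family != null) {
      for (Map.Entry<byte[], byte[]> entry : family.entrySet()) {
        // The column qualifier becomes the map key, the cell value the map value.
        resultMap.put(Bytes.toString(entry.getKey()), Bytes.toString(entry.getValue()));
      }
    }
    return resultMap;
  }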
We will make use of HBase scanners for retrieval of observations in our web
application.
Here we are after a Map<ObservationTime, ObservedTemp> result. We will use a
NavigableMap<Long, Integer> because it is sorted and has a descendingMap() method,
so we can access observations in either ascending or descending order.

Example: Methods for retrieving a range of rows of weather station observations from an HBase
table
  public NavigableMap<Long, Integer> getStationObservations(HTable table,
      String stationId, long maxStamp, int maxCount) throws IOException {
    byte[] startRow = RowKeyConverter.makeObservationRowKey(stationId, maxStamp);
    NavigableMap<Long, Integer> resultMap = new TreeMap<Long, Integer>();
    Scan scan = new Scan(startRow);
    scan.addColumn(DATA_COLUMNFAMILY, AIRTEMP_QUALIFIER);
    ResultScanner scanner = table.getScanner(scan);
    Result res = null;
    int count = 0;
    try {
      while ((res = scanner.next()) != null && count++ < maxCount) {
        byte[] row = res.getRow();
        byte[] value = res.getValue(DATA_COLUMNFAMILY, AIRTEMP_QUALIFIER);
        Long stamp = Long.MAX_VALUE -
          Bytes.toLong(row, row.length - Bytes.SIZEOF_LONG, Bytes.SIZEOF_LONG);
        Integer temp = Bytes.toInt(value);
        resultMap.put(stamp, temp);
      }
    } finally {
      scanner.close();
    }
    return resultMap;
  }

  /**
   * Return the last ten observations.
   */
  public NavigableMap<Long, Integer> getStationObservations(HTable table,
      String stationId) throws IOException {
    return getStationObservations(table, stationId, Long.MAX_VALUE, 10);
  }
The getStationObservations() method takes a station ID and a range defined by maxStamp
and a maximum number of rows (maxCount). Because the scan starts at the newest row key
for the station, the newest observations are the ones collected; with the plain TreeMap
used here, the returned NavigableMap holds them in ascending time order, and you can use
NavigableMap.descendingMap() to read through them newest first.

Scanners:

HBase scanners are like cursors in a traditional database or Java iterators, except—
unlike the latter—they have to be closed after use. Scanners return rows in order. Users
obtain a scanner on an HBase table by calling HTable.getScanner(scan) where the
scan parameter is a configured instance of a Scan object. In the Scan instance, you can
pass the row at which to start and stop the scan, which columns in a row to return in
the row result, and optionally, a filter to run on the server side.
The ResultScanner interface, which is returned when you call HTable.getScanner(), is as follows:
public interface ResultScanner extends Closeable, Iterable<Result> {
  public Result next() throws IOException;
  public Result [] next(int nbRows) throws IOException;
  public void close(); 
}

You can ask for the next row's result or a number of rows at a time. Each invocation of
next() involves a trip back to the regionserver, so grabbing a bunch of rows at once can
make for significant performance savings (see the sketch below).
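A hedged sketch of batching: Scan.setCaching() controls how many rows the regionserver ships per next() round trip, and the next(int) variant pulls a block of rows on the client side (the numbers and the table variable are illustrative):

  Scan scan = new Scan();
  scan.setCaching(100);                 // rows transferred per RPC to the regionserver
  ResultScanner scanner = table.getScanner(scan);
  try {
    Result[] batch;
    while ((batch = scanner.next(25)).length > 0) {   // take 25 rows at a time
      for (Result r : batch) {
        System.out.println("Scanned: " + r);
      }
    }
  } finally {
    scanner.close();
  }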

The advantage of storing things as Long.MAX_VALUE - stamp may not be clear in the
previous example. It has more use when you want to get the newest observations for a
given offset and limit, which is often the case in web applications. If the observations
were stored with the actual stamps, we would be able to get only the oldest observations
for a given offset and limit efficiently. Getting the newest would mean getting all of
them and then grabbing them off the end. One of the prime reasons for moving from
RDBMS to HBase is to allow for these types of “early-out” scenarios.

HBase Versus RDBMS:

HBase and other column-oriented databases are often compared to more traditional
and popular relational databases or RDBMSs. Although they differ dramatically in their
implementations and in what they set out to accomplish, the fact that they are potential
solutions to the same problems means that despite their enormous differences, the
comparison is a fair one to make.
As described previously, HBase is a distributed, column-oriented data storage system.
It picks up where Hadoop left off by providing random reads and writes on top of
HDFS. It has been designed from the ground up with a focus on scale in every direction:
tall in numbers of rows (billions), wide in numbers of columns (millions), and to be
horizontally partitioned and replicated across thousands of commodity nodes automatically.
The table schemas mirror the physical storage, creating a system for efficient data structure serialization, storage, and retrieval. The burden is on the application developer to make use of this storage and retrieval in the right way.

Successful Service:

Here is a synopsis of how the typical RDBMS scaling story runs. The following list
presumes a successful growing service:
Initial public launch
Move from local workstation to a shared, remote hosted MySQL instance with a
well-defined schema.
Service becomes more popular; too many reads hitting the database
Add memcached to cache common queries. Reads are now no longer strictly ACID;
cached data must expire.
Service continues to grow in popularity; too many writes hitting the database
Scale MySQL vertically by buying a beefed-up server with 16 cores, 128 GB of RAM,
and banks of 15k RPM hard drives. Costly.
New features increase query complexity; now we have too many joins
Denormalize your data to reduce joins. (That’s not what they taught me in DBA
school!)
Rising popularity swamps the server; things are too slow
Stop doing any server-side computations.
Some queries are still too slow
Periodically prematerialize the most complex queries, and try to stop joining in most
cases.

Reads are OK, but writes are getting slower and slower
Drop secondary indexes and triggers (no indexes?).

HBase
Enter HBase, which has the following characteristics:
No real indexes
Rows are stored sequentially, as are the columns within each row. Therefore, no
issues with index bloat, and insert performance is independent of table size.
Automatic partitioning
As your tables grow, they will automatically be split into regions and distributed
across all available nodes.
Scale linearly and automatically with new nodes
Add a node, point it to the existing cluster, and run the regionserver. Regions will
automatically rebalance and load will spread evenly.
Commodity hardware
Clusters are built on $1,000–$5,000 nodes rather than $50,000 nodes. RDBMSs
are I/O hungry, requiring more costly hardware.
Fault tolerance
Lots of nodes means each is relatively insignificant. No need to worry about individual
node downtime.
Batch processing
MapReduce integration allows fully parallel, distributed jobs against your data
with locality awareness.

If you stay up at night worrying about your database (uptime, scale, or speed), then
you should seriously consider making a jump from the RDBMS world to HBase. Utilize
a solution that was intended to scale rather than a solution based on stripping down
and throwing money at what used to work. With HBase, the software is free, the hardware
is cheap, and the distribution is intrinsic.

Life with HBase:

Our RDBMS-based system was always capable of correctly implementing our requirements;
the issue was scaling. 
When you start to focus on scale and performance rather than correctness, you end up short-cutting and optimizing for your domain-specific use cases everywhere possible. Once you start implementing your own solutions to your data problems, the overhead and complexity of an RDBMS gets in your way. The
abstraction from the storage layer and ACID requirements are an enormous barrier and
luxury that you cannot always afford when building for scale. HBase is a distributed,
column-oriented, sorted map store and not much else. The only major part that is
abstracted from the user is the distribution, and that’s exactly what we don’t want to
deal with. Business logic, on the other hand, is very specialized and optimized. With
HBase not trying to solve all of our problems, we’ve been able to solve them better
ourselves and rely on HBase for scaling our storage, not our logic. It was an extremely
liberating experience to be able to focus on our applications and logic rather than the
scaling of the data itself.

We currently have tables with hundreds of millions of rows and tens of thousands of
columns; the thought of storing billions of rows and millions of columns is exciting,
not scary.

Praxis:

Versions:

Up until HBase 0.20, HBase aligned its versioning with that of Hadoop. A particular
HBase version would run on any Hadoop that had a matching minor version, where
minor version in this context is considered the number between the periods (e.g., 20 is
the minor version of HBase 0.20.5). HBase 0.20.5 would run on Hadoop 0.20.2,
but HBase 0.19.5 would not run on Hadoop 0.20.0.
With HBase 0.90, the version relationship was broken. The Hadoop release cycle has
slowed and no longer aligns with that of HBase developments. Also, the intent is that
now a particular HBase version can run on multiple versions of Hadoop. For example,
HBase 0.90.x will work with both Hadoop 0.20.x and 0.21.x.
This said, ensure you are running compatible versions of Hadoop and HBase. Check
the requirements section of your download. Incompatible versions will throw an exception
complaining about the version mismatch, if you are lucky.
If they cannot talk to each other sufficiently to pass versions, you may see your HBase cluster hang indefinitely
soon after startup.
The mismatch exception or HBase hang can also happen on upgrade if older versions of either HBase or
Hadoop can still be found on the classpath because of imperfect cleanup of the old software.

HDFS:

HBase’s use of HDFS is very different from how it’s used by MapReduce. In MapReduce,
generally, HDFS files are opened, with their content streamed through a map
task and then closed. In HBase, data files are opened on cluster startup and kept open
so that we avoid paying the file open costs on each access. Because of this, HBase tends
to see issues not normally encountered by MapReduce clients:
Running out of file descriptors
Because we keep files open, on a loaded cluster, it doesn’t take long before we run
into system- and Hadoop-imposed limits. For instance, say we have a cluster that
has three nodes each running an instance of a datanode and a regionserver and
we’re running an upload into a table that is currently at 100 regions and 10 column
families. Allow that each column family has on average two flush files. Doing the
math, we can have 100 × 10 × 2, or 2,000, files open at any one time. Add to this
total miscellaneous other descriptors consumed by outstanding scanners and Java
libraries. Each open file consumes at least one descriptor over on the remote datanode.

The default limit on the number of file descriptors per process is 1,024.
When we exceed the filesystem ulimit, we’ll see the complaint about Too many
open files in logs, but often you’ll first see indeterminate behavior in HBase. The
fix requires increasing the file descriptor ulimit count.
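To check the current limit for your shell (and raise it for the current session) on a typical Linux system, you might run something like the following; making the change permanent is usually done in /etc/security/limits.conf:
% ulimit -n
1024
% ulimit -n 10240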

 You can verify that the HBase process is running with sufficient file descriptors by looking at the first few
lines of a regionserver’s log. It emits vitals such as the JVM being used and environment
settings such as the file descriptor ulimit.

Running out of datanode threads
Similarly, the Hadoop datanode has an upper bound of 256 on the number of
threads it can run at any one time. Given the same table statistics quoted in the
preceding bullet, it’s easy to see how we can exceed this upper bound relatively
early, given that in the datanode as of this writing each open connection to a file
block consumes a thread. If you look in the datanode log, you’ll see a complaint
like xceiverCount 258 exceeds the limit of concurrent xcievers 256 but again, you’ll
likely see HBase act erratically before you encounter this log entry. Increase the
dfs.datanode.max.xcievers (note that the property name is misspelled) count in
HDFS and restart your cluster.

Sync
You must run HBase on an HDFS that has a working sync. Otherwise, you will
lose data. This means running HBase on Hadoop 0.20.205.0 or later.

UI:

HBase runs a web server on the master to present a view on the state of your running
cluster. By default, it listens on port 60010. The master UI displays a list of basic attributes
such as software versions, cluster load, request rates, lists of cluster tables, and participating regionservers.
Click on a regionserver in the master UI and you are taken to the web server running on the individual regionserver.
It lists the regions this server is carrying and basic metrics such as resources consumed and request rates.

Schema Design:

HBase tables are like those in an RDBMS, except that cells are versioned, rows are
sorted, and columns can be added on the fly by the client as long as the column family
they belong to preexists. These factors should be considered when designing schemas
for HBase, but far and away the most important concern when designing schemas is
how the data will be accessed. All access is via the primary key, so the key design should lend itself
to how the data is going to be queried.
The other property to keep in mind when designing schemas is that a defining attribute of column(-family)-oriented stores like HBase is that they can host wide, sparsely populated tables at no incurred cost.

Row keys:

Take time designing your row key. In the weather data example in this chapter, the
compound row key has a station prefix that served to group temperatures by station.
The reversed timestamp suffix made it so temperatures could be scanned ordered from
most recent to oldest. A smart compound key can be used to cluster data in ways
amenable to how it will be accessed.
When designing compound keys, you may have to zero-pad number components so row keys
sort properly. Otherwise, you will run into the issue where 10 sorts before 2 when only
byte order is considered (02 sorts before 10).
If your keys are integers, use a binary representation rather than persisting the string version
of a number—it consumes less space (see the sketch below).
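A small sketch of the difference using the Bytes utility class from the earlier examples; the IDs are made up, and the byte-order argument for Bytes.toBytes(long) only holds for non-negative values:

  // String keys: "10" sorts before "2" in byte order unless zero-padded to a fixed width.
  byte[] unpadded = Bytes.toBytes(String.valueOf(10));         // sorts before "2"
  byte[] padded   = Bytes.toBytes(String.format("%08d", 10));  // "00000010" sorts after "00000002"

  // Binary keys: a fixed-width 8-byte long sorts numerically and is smaller
  // than its decimal string form.
  byte[] binary   = Bytes.toBytes(10L);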

Bulk Load:

HBase has an efficient facility for bulk loading by writing its internal data format
directly into the filesystem from MapReduce. Going this route, it's possible to load an
HBase instance at rates that are an order of magnitude or more beyond those attainable
by writing via the HBase client API. The facility is described at http://hbase.apache.org/
docs/current/bulk-loads.html. It's also possible to bulk load into a live table.




