"; */ ?>

hbase


8 Oct 15

iterator-seq: chunks and hasNext

A couple of interesting facts to keep in mind about iterator-seq:

  • it calls hasNext right away
  • it “caches” reads by chunks of 32 items

Its implementation is quite simple and returns a lazy seq, but the above two points are good to keep in mind when working with iterators:

private static final int CHUNK_SIZE = 32;
public static ISeq chunkIteratorSeq(final Iterator iter){
    if(iter.hasNext()) {
        return new LazySeq(new AFn() {
            public Object invoke() {
                Object[] arr = new Object[CHUNK_SIZE];
                int n = 0;
                while(iter.hasNext() && n < CHUNK_SIZE)
                    arr[n++] = iter.next();
                return new ChunkedCons(new ArrayChunk(arr, 0, n), chunkIteratorSeq(iter));
            }
        });
    }
    return null;
}
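
Both points are easy to see from the REPL. Here is a minimal sketch, where counted-iterator is a hypothetical helper (not part of Clojure or cbass) that simply counts how many items the underlying iterator hands out:

(def pulled (atom 0))

;; hypothetical helper: an iterator over n numbers that counts every read
(defn counted-iterator [n]
  (let [it (.iterator ^Iterable (range n))]
    (reify java.util.Iterator
      (hasNext [_] (.hasNext it))
      (next [_] (swap! pulled inc) (.next it)))))

(def s (iterator-seq (counted-iterator 1000)))

@pulled    ;; => 0   hasNext was called right away, but nothing was read yet
(first s)
@pulled    ;; => 32  realizing a single element pulled a whole 32-item chunk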

Why 32? I like 42 better!


32 is a good choice for CHUNK_SIZE since it matches the branching factor (the number of child nodes per tree node) of Clojure's persistent collections:

static public PersistentVector create(ISeq items){
    Object[] arr = new Object[32];
    int i = 0;
    for(;items != null && i < 32; items = items.next())
        arr[i++] = items.first();
 
    if(items != null) {  // >32, construct with array directly
        PersistentVector start = new PersistentVector(32, 5, EMPTY_NODE, arr);
        TransientVector ret = start.asTransient();
        for (; items != null; items = items.next())
            ret = ret.conj(items.first());
        return ret.persistent();
    } else if(i == 32) {   // exactly 32, skip copy
        return new PersistentVector(32, 5, EMPTY_NODE, arr);
    } else {  // <32, copy to minimum array and construct
        Object[] arr2 = new Object[i];
        System.arraycopy(arr, 0, arr2, 0, i);
        return new PersistentVector(i, 5, EMPTY_NODE, arr2);
    }
}
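
That branching factor is easy to check from the REPL: a seq over a vector is chunked by the vector's own 32-wide leaf arrays.

user=> (count (chunk-first (seq (vec (range 100)))))
32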

The Dark Side of “hasNext()”


But before creating a lazy seq, the first call “iterator-seq” makes is iter.hasNext(). While this makes sense (why create a seq if there is nothing to create it from?), a thing to keep in mind is the implementation of the iterator that is passed to “iterator-seq”. Here is an example from my recent HBase journey.

cbass wraps an HBase Scanner in “iterator-seq”:

(let [results (-> (.iterator (.getScanner h-table (scan-filter criteria)))
                  iterator-seq)]
  ...)

Once “iterator-seq” makes a call to iter.hasNext(), the HBase scanner goes out and fetches the first result based on its filter. While this sounds ok, internally, depending on the HBase client caching configuration, it may end up fetching lots and lots of data to “cache” locally before returning a single item. Not exactly the “lazy seq” behavior one might expect. More about it here.

To conclude: it is always good to keep a fresh copy of the Clojure source code in your head :)

8 Oct 15

HBase Scan: Let me cache it for you

HBase uses Scanners to SELECT things FROM table WHERE this AND that. An interesting bit about HBase scanners is their use of caching.

Since HBase is designed to sit between low-level HDFS and a high-level SQL front end, many of its APIs “naturally leak”, which makes it harder to come up with good names, since they have to fit both worlds.

Scanner “caching” is a good example of a feature name that makes perfect sense internally in HBase, but is more than confusing for an HBase client.

Caching scans.. what!?


Here is how it works in a nutshell:

Scanners are iterators that return results when “.next” is called on them.

Once “.next” is called on a scanner, it will go and fetch the next result that matches its filter (similar to SQL's WHERE clause).

Here is the tricky part. Before the scanner returns that single result, it caches a chunk of results, so that subsequent calls to “.next” are served from this chunk, which is effectively a local “cache”. This is done to avoid a trip to the region server on every call to “.next”:

@Override
public Result next() throws IOException {
  // If the scanner is closed and there's nothing left in the cache, next is a no-op.
  if (cache.size() == 0 && this.closed) {
    return null;
  }
  if (cache.size() == 0) {
    loadCache();
  }
 
  if (cache.size() > 0) {
    return cache.poll();
  }
 
  // if we exhausted this scanner before calling close, write out the scan metrics
  writeScanMetrics();
  return null;
}

The snippet above is from HBase's ClientScanner.

loadCache() fetches a chunk (of “cache size”) of results from a region server and stuffs it into cache, which is just a Queue<Result> that is drained by all subsequent calls to “.next”.

Oh.. and Max Result Size is in bytes


Why is that tricky? Well, the tricky part is the size of this “chunk” that will be cached by the scanner on each request.

By default, this size is Integer.MAX_VALUE, which means the scanner will try to cache as much as possible: up to Integer.MAX_VALUE rows, or up to “hbase.client.scanner.max.result.size” bytes (2MB by default), whichever limit is hit first. Depending on the size of the final result set, and on how scanners are used, this can get unwieldy pretty quickly.

“hbase.client.scanner.max.result.size” is in bytes (not a number of rows), so it is not exactly high-level SQL's “LIMIT” / “ROWNUM”. If not set properly, “caching” may end up either taking all the memory or simply returning timeouts (e.g. a “ScannerTimeoutException”, which could get routed into “OutOfOrderScannerNextException”s, etc.) from the region server. Hence getting these two, “max size” and “cache size”, to play together nicely is important for smooth scanning.
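
Both knobs can also be set per scan, which is usually safer than relying on the site-wide defaults. Here is a minimal sketch, assuming a ready HTable handle h-table (as in the cbass snippet from the previous post):

(import '[org.apache.hadoop.hbase.client Scan])

(let [scan (doto (Scan.)
             (.setCaching 500)                     ;; rows fetched per region server trip
             (.setMaxResultSize (* 2 1024 1024)))] ;; bytes fetched per trip (here 2MB)
  (with-open [scanner (.getScanner h-table scan)]
    (count (iterator-seq (.iterator scanner)))))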

Here is another good explanation by my friend Dan on why HBase caching is tricky and what it really does behind the covers.

Clients are Humans


If scanner “caching” were named a bit differently (a “fetch size”, for example, would not be too bad), it would be a bit more obvious to users of the HBase client API. Then again, behind the covers these values really are “cached” and read from that cache on each call to “scanner.next()”.

Both make sense, but I would argue that the client-facing API should take precedence here. HBase developers already know the internals, and a fetch size (as a property set on Scan) would not hurt them. “caching” can still be used internally under the HBase covers.


6 Oct 15

HBase: Delete By Anything

HBase Cookies


“Big data” is this great old buzz that no longer surprises anyone. Even politicians used it way back in 2012 to win the elections.

But the field has become so large and fragmented that simple CRUD operations, which we used to just take for granted, now require a whole new approach depending on which data store we have to deal with.

This short tale is about a tiny little feature in the universe of HBase: namely, a “delete by” feature. It is quite simple in Datomic or SQL databases, but not that trivial in HBase, due to the way its cookie crumbles.

Delete it like you mean it!


There is often a case where rows need to be deleted by a filter similar to the one used in a scan (i.e. by row key prefix, time range, etc.). HBase does not really help there, besides providing a BulkDeleteEndpoint coprocessor.

This is not ideal, as it delegates work to HBase “stored procedures” (which is effectively what coprocessors are). It really pays off during massive data manipulation, since everything happens directly on the server, but in simpler cases, which are many, coprocessors are less than ideal.

cbass achieves “deleting by anything” with a trivial flow: “scan + multi delete”, packed into a “delete-by” function that preserves the “scan” syntax:

user=> (scan conn "galaxy:planet")
{"earth"
 {:age "4.543 billion years",
  :inhabited? true,
  :population 7125000000},
 "neptune" {:age "4.503 billion years", :inhabited? :unknown},
 "pluto" {},
 "saturday" {:age "24 hours", :inhabited? :sometimes},
 "saturn" {:age "4.503 billion years", :inhabited? :unknown}}
user=> (delete-by conn "galaxy:planet" :from "sat" :to "saz")
;; deleting [saturday saturn], since they both match the 'from/to' criteria

look ma, no saturn, no saturday:

user=> (scan conn "galaxy:planet")
{"earth"
 {:age "4.543 billion years",
  :inhabited? true,
  :population 7125000000},
 "neptune" {:age "4.503 billion years", :inhabited? :unknown},
 "pluto" {}}

And of course any other criteria available in “scan” are available in “delete-by”.
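
Under the covers, the “scan + multi delete” flow could be sketched roughly like this (a simplified illustration, not cbass's actual implementation):

(import '[org.apache.hadoop.hbase.client Delete])

;; scan for matching rows, then delete them in one batch
(defn delete-by-scan [h-table scan]
  (with-open [scanner (.getScanner h-table scan)]
    (let [deletes (mapv #(Delete. (.getRow %))
                        (iterator-seq (.iterator scanner)))]
      (.delete h-table (java.util.ArrayList. deletes)))))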

Delete Row Key Function


Most of the time, HBase keys are prefixed (salted). This is done to avoid “RegionServer hotspotting“.

“delete-by” internally does a “scan” and collects the keys that matched. Hence, in order to delete these keys, they have to be “re-salted” according to the custom key design.

cbass addresses this by taking an optional delete-key-fn, which allows “putting some salt back” on those keys.

Here is a real world example:

;; HBase data
 
user=> (scan conn "table:name")
{"���|8276345793754387439|transfer" {...},
 "���|8276345793754387439|match" {...},
 "���|8276345793754387439|trade" {...},
 "�d\k^|28768787578329|transfer" {...},
 "�d\k^|28768787578329|match" {...},
 "�d\k^|28768787578329|trade" {...}}

A couple of observations about the key:

  • it is prefixed with salt
  • it is pipe-delimited

In order to delete, say, all keys that start with 8276345793754387439, besides providing :from and :to, we would need to provide a :row-key-fn that de-salts and splits the key, and then a :delete-key-fn that can reassemble it back:

(delete-by conn "table:name" :row-key-fn (comp split-key without-salt)
                             :delete-key-fn (fn [[x p]] (with-salt x p))
                             :from (-> "8276345793754387439" salt-pipe)
                             :to   (-> "8276345793754387439" salt-pipe+))

The *salt, *split and *pipe functions are not from cbass, since they are application specific. They are here to illustrate how “delete-by” can be used to take on the real world.

;; HBase data after the "delete-by"
 
user=> (scan conn "table:name")
{"�d\k^|28768787578329|transfer" {...},
 "�d\k^|28768787578329|match" {...},
 "�d\k^|28768787578329|trade" {...}}

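For the sake of completeness, here is one purely illustrative shape such helpers could take. These are hypothetical sketches (not from cbass), assuming a single printable salt character derived from a hash of the id and a pipe-delimited key:

(require '[clojure.string :as string])

(defn salt-of [id]                       ;; hypothetical: one printable salt character
  (char (+ 33 (mod (hash id) 94))))

(defn without-salt [^String k]           ;; "salt|id|type" -> "id|type"
  (subs k (inc (.indexOf k "|"))))

(defn split-key [k]                      ;; "id|type" -> ["id" "type"]
  (string/split k #"\|"))

(defn with-salt [id suffix]              ;; reassemble "salt|id|type"
  (str (salt-of id) "|" id "|" suffix))

(defn salt-pipe  [id] (str (salt-of id) "|" id "|"))      ;; scan lower bound (:from)
(defn salt-pipe+ [id] (str (salt-of id) "|" id "|" "~"))  ;; just past it (:to)
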
6 Oct 15

Adding Simple to HBase

Mutate and Complect!


The usual trend in functional programming is “immutable” => good, “mutable” => bad. Not true for all cases, but it is true for most, especially when multiple threads, processes, or machines are involved.

HBase APIs are very much based on mutation. Since there are so many different ways to, say, “scan” data, instead of using overloaded constructors or builders HBase relies on setters. Count the number of setters in Scan, for example.

This just does not sit well with the “immutable is good” feeling.

A long-time HBaser might not agree, but I believe the learning curve is quite steep for HBase newcomers. This is due to many things: the Hadoop architecture, the data model, row key design, coprocessors, all the cool things it does. But mainly, I think, it is due to a heavy set of APIs that are just not simple.

Connecting “with” HBase


Here is an example from the HBase book on how to find all columns in a row and family that start with “abc”. In SQL this would be done with something like:

SELECT * FROM <table> WHERE <row> LIKE 'abc%';

In HBase (this is a book example) it would be:

HTableInterface t = ...;
byte[] row = ...;
byte[] family = ...;
byte[] prefix = Bytes.toBytes("abc");
Scan scan = new Scan(row, row);        // (optional) limit to one row
scan.addFamily(family);                // (optional) limit to one family
Filter f = new ColumnPrefixFilter(prefix);
scan.setFilter(f);
scan.setBatch(10);                     // set this if there could be many columns returned
ResultScanner rs = t.getScanner(scan);
for (Result r = rs.next(); r != null; r = rs.next()) {
  for (KeyValue kv : r.raw()) {
    // each kv represents a column
  }
}
rs.close();

And that is given that the data is not actually read into a comprehensible data structure (that happens in the nested loop), and that concepts like row / family / column / scan, etc. are well understood. I say it is not that simple. But can it be?

I say yes, it can. How about:

(scan conn table-name :starts-with "abc")

While a connection (conn) needs to be created, and a family might be added if needed, this is a much simpler way to “connect with” HBase.
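
One way to get there (a sketch of the idea, not cbass's actual implementation) is to confine the mutable Scan setters to a single place and drive them from an immutable map of options:

(import '[org.apache.hadoop.hbase.client Scan]
        '[org.apache.hadoop.hbase.filter ColumnPrefixFilter]
        '[org.apache.hadoop.hbase.util Bytes])

;; all mutation happens here, driven by plain data
(defn map->scan [{:keys [family starts-with caching]}]
  (let [scan (Scan.)]
    (when family      (.addFamily scan (Bytes/toBytes ^String family)))
    (when starts-with (.setFilter scan (ColumnPrefixFilter. (Bytes/toBytes ^String starts-with))))
    (when caching     (.setCaching scan (int caching)))
    scan))

;; callers pass an immutable map; the mutation never leaks out
(map->scan {:family "galaxy" :starts-with "abc" :caching 100})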

These are some of the reasons cbass was created: mainly to add “simple” to HBase.