Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KeyValue callback handlers for get and scan. #59

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion src/HBaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,23 @@ public Deferred<ArrayList<KeyValue>> get(final GetRequest request) {
return sendRpcToRegion(request).addCallbacks(got, Callback.PASSTHROUGH);
}

/**
* Retrieves data from HBase and invokes a callback for each KeyValue
* as they are processed and match the get request. It is up to the handler
* to keep or discard the KeyValue. This is a way to process items as they
* come in and free up the memory as you go.
* @param request The {@code get} request.
* @param handler The {@code Callback} to invoke, null is OK.
* @return A deferred list of key-values that matched the get request if
* handler is not null. Otherwise, the list is always empty.
*/
public Deferred<ArrayList<KeyValue>> get(final GetRequest request,
Callback<Boolean,KeyValue> kvHandler) {
request.kvHandler = kvHandler;
num_gets.increment();
return sendRpcToRegion(request).addCallbacks(got, Callback.PASSTHROUGH);
}

/** Singleton callback to handle responses of "get" RPCs. */
private static final Callback<ArrayList<KeyValue>, Object> got =
new Callback<ArrayList<KeyValue>, Object>() {
Expand All @@ -983,14 +1000,42 @@ public Scanner newScanner(final byte[] table) {
return new Scanner(this, table);
}

/**
* Creates a new {@link Scanner} for a particular table with the specified
* handler {@link Callback}. The handler is called for each KeyValue as they
* are processed. It is up to the handler to keep or discard the KeyValue.
* This is a way to process items as they come in and free up the memory as
* you go.
* @param table The name of the table you intend to scan.
* @param kvHandler The Callback to handle scanned KeyVales. null is OK.
* @return A new scanner for this table.
*/
public Scanner newScanner(final byte[] table, Callback<Boolean,KeyValue> kvHandler) {
Scanner s = new Scanner(this, table);
s.kvHandler = kvHandler;
return s;
}

/**
* Creates a new {@link Scanner} for a particular table.
* @param table The name of the table you intend to scan.
* The string is assumed to use the platform's default charset.
* @return A new scanner for this table.
*/
public Scanner newScanner(final String table) {
return new Scanner(this, table.getBytes());
return newScanner(table.getBytes(), null);
}

/**
* Creates a new {@link Scanner} for a particular table with the specified
* filtering {@link Callback}.
* @param table The name of the table you intend to scan.
* The string is assumed to use the platform's default charset.
* @param filter The filtering Callback, null is OK.
* @return A new scanner for this table.
*/
public Scanner newScanner(final String table, Callback<Boolean,KeyValue> kvHandler) {
return newScanner(table.getBytes(), kvHandler);
}

/**
Expand Down
26 changes: 26 additions & 0 deletions src/HBaseRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

/**
Expand Down Expand Up @@ -367,6 +368,12 @@ interface IsEdit {
*/
byte attempt; // package-private for RegionClient and HBaseClient only.

/**
* A Callback instance. If it is non-null it will be invoked for
* each KeyValue encountered during a scan or get.
*/
Callback<Boolean,KeyValue> kvHandler;

/**
* Package private constructor for RPCs that aren't for any region.
* @param method The name of the method to invoke on the RegionServer.
Expand Down Expand Up @@ -455,6 +462,25 @@ final boolean hasDeferred() {
return deferred != null;
}


/**
* If the KeyValue should be handled by a callback.
*/
Boolean hasKvHandler() throws Exception {
return kvHandler != null;
}

/**
* Invokes the KeyValue callback if present.
* @see RegionClient
*/
Boolean invokeKvHandler(KeyValue kv) throws Exception {
if (kvHandler != null) {
return kvHandler.call(kv);
}
return false;
}

public String toString() {
// Try to rightsize the buffer.
final StringBuilder buf = new StringBuilder(16 + method.length + 2
Expand Down
8 changes: 4 additions & 4 deletions src/MultiAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -485,12 +485,12 @@ public String toString() {
* De-serializes the response to a {@link MultiAction} RPC.
* See HBase's {@code MultiResponse}.
*/
Response responseFromBuffer(final ChannelBuffer buf) {
Response responseFromBuffer(final ChannelBuffer buf, int rpcid, RegionClient rc) {
switch (buf.readByte()) {
case 58:
return deserializeMultiPutResponse(buf);
case 67:
return deserializeMultiResponse(buf);
return deserializeMultiResponse(buf, rpcid, rc);
}
throw new NonRecoverableException("Couldn't de-serialize "
+ Bytes.pretty(buf));
Expand All @@ -500,7 +500,7 @@ Response responseFromBuffer(final ChannelBuffer buf) {
* De-serializes a {@code MultiResponse}.
* This is only used when talking to HBase 0.92 and above.
*/
Response deserializeMultiResponse(final ChannelBuffer buf) {
Response deserializeMultiResponse(final ChannelBuffer buf, final int rpcid, RegionClient rc) {
final int nregions = buf.readInt();
HBaseRpc.checkNonEmptyArrayLength(buf, nregions);
final Object[] resps = new Object[batch.size()];
Expand All @@ -517,7 +517,7 @@ Response deserializeMultiResponse(final ChannelBuffer buf) {
final HBaseException e = RegionClient.deserializeException(buf, null);
resp = e;
} else {
resp = RegionClient.deserializeObject(buf, this);
resp = rc.deserializeObject(buf, rpcid, this);
// A successful response to a `Put' will be an empty `Result'
// object, which we de-serialize as an empty `ArrayList'.
// There's no need to waste memory keeping these around.
Expand Down
Loading