Merge pull request #393 from fyrz/RocksJava-Flush

[RocksJava] Memtable flush functionality
This commit is contained in:
Yueh-Hsuan Chiang 2014-11-11 01:11:59 -08:00
commit 625e162c69
7 changed files with 313 additions and 17 deletions

View File

@ -1,4 +1,39 @@
NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.DBOptions org.rocksdb.WriteBatch org.rocksdb.WriteBatch.Handler org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.BlockBasedTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.ComparatorOptions org.rocksdb.AbstractComparator org.rocksdb.Comparator org.rocksdb.DirectComparator org.rocksdb.AbstractSlice org.rocksdb.Slice org.rocksdb.DirectSlice org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv org.rocksdb.GenericRateLimiterConfig org.rocksdb.ColumnFamilyHandle org.rocksdb.MergeOperator org.rocksdb.StringAppendOperator
NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\
org.rocksdb.AbstractSlice\
org.rocksdb.BackupableDB\
org.rocksdb.BackupableDBOptions\
org.rocksdb.BlockBasedTableConfig\
org.rocksdb.BloomFilter\
org.rocksdb.ColumnFamilyHandle\
org.rocksdb.Comparator\
org.rocksdb.ComparatorOptions\
org.rocksdb.DBOptions\
org.rocksdb.DirectComparator\
org.rocksdb.DirectSlice\
org.rocksdb.FlushOptions\
org.rocksdb.Filter\
org.rocksdb.GenericRateLimiterConfig\
org.rocksdb.HashLinkedListMemTableConfig\
org.rocksdb.HashSkipListMemTableConfig\
org.rocksdb.MergeOperator\
org.rocksdb.Options\
org.rocksdb.PlainTableConfig\
org.rocksdb.ReadOptions\
org.rocksdb.RestoreBackupableDB\
org.rocksdb.RestoreOptions\
org.rocksdb.RocksDB\
org.rocksdb.RocksEnv\
org.rocksdb.RocksIterator\
org.rocksdb.SkipListMemTableConfig\
org.rocksdb.Slice\
org.rocksdb.Statistics\
org.rocksdb.VectorMemTableConfig\
org.rocksdb.StringAppendOperator\
org.rocksdb.WriteBatch\
org.rocksdb.WriteBatch.Handler\
org.rocksdb.WriteBatchInternal\
org.rocksdb.WriteBatchTest\
org.rocksdb.WriteOptions\
ROCKSDB_MAJOR = $(shell egrep "ROCKSDB_MAJOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)
ROCKSDB_MINOR = $(shell egrep "ROCKSDB_MINOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)
@ -43,6 +78,7 @@ test: java
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.DBOptionsTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ColumnFamilyTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.FilterTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.FlushTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.KeyMayExistTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.MemTableTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.OptionsTest

View File

@ -0,0 +1,51 @@
package org.rocksdb;
/**
* FlushOptions to be passed to flush operations of
* {@link org.rocksdb.RocksDB}.
*/
public class FlushOptions extends RocksObject {
/**
* Construct a new instance of FlushOptions.
*/
public FlushOptions(){
super();
newFlushOptions();
}
/**
* Set if the flush operation shall block until it terminates.
*
* @param waitForFlush boolean value indicating if the flush
* operations waits for termination of the flush process.
*
* @return instance of current FlushOptions.
*/
public FlushOptions setWaitForFlush(boolean waitForFlush) {
assert(isInitialized());
waitForFlush(nativeHandle_);
return this;
}
/**
* Wait for flush to finished.
*
* @return boolean value indicating if the flush operation
* waits for termination of the flush process.
*/
public boolean waitForFlush() {
assert(isInitialized());
return waitForFlush(nativeHandle_);
}
@Override protected void disposeInternal() {
disposeInternal(nativeHandle_);
}
private native void newFlushOptions();
private native void disposeInternal(long handle);
private native void setWaitForFlush(long handle,
boolean wait);
private native boolean waitForFlush(long handle);
}

View File

@ -1085,6 +1085,40 @@ public class RocksDB extends RocksObject {
dropColumnFamily(nativeHandle_, columnFamilyHandle.nativeHandle_);
}
/**
* <p>Flush all memory table data.</p>
*
* <p>Note: it must be ensured that the FlushOptions instance
* is not GC'ed before this method finishes. If the wait parameter is
* set to false, flush processing is asynchronous.</p>
*
* @param flushOptions {@link org.rocksdb.FlushOptions} instance.
* @throws RocksDBException thrown if an error occurs within the native
* part of the library.
*/
public void flush(FlushOptions flushOptions)
throws RocksDBException {
flush(nativeHandle_, flushOptions.nativeHandle_);
}
/**
* <p>Flush all memory table data.</p>
*
* <p>Note: it must be ensured that the FlushOptions instance
* is not GC'ed before this method finishes. If the wait parameter is
* set to false, flush processing is asynchronous.</p>
*
* @param flushOptions {@link org.rocksdb.FlushOptions} instance.
* @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle} instance.
* @throws RocksDBException thrown if an error occurs within the native
* part of the library.
*/
public void flush(FlushOptions flushOptions,
ColumnFamilyHandle columnFamilyHandle) throws RocksDBException {
flush(nativeHandle_, flushOptions.nativeHandle_,
columnFamilyHandle.nativeHandle_);
}
/**
* Private constructor.
*/
@ -1197,10 +1231,13 @@ public class RocksDB extends RocksObject {
protected native void releaseSnapshot(
long nativeHandle, long snapshotHandle);
private native void disposeInternal(long handle);
private native long createColumnFamily(long handle, long opt_handle,
String name) throws RocksDBException;
private native void dropColumnFamily(long handle, long cfHandle) throws RocksDBException;
private native void flush(long handle, long flushOptHandle)
throws RocksDBException;
private native void flush(long handle, long flushOptHandle,
long cfHandle) throws RocksDBException;
protected Options options_;
}

View File

@ -0,0 +1,47 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb.test;
import org.rocksdb.*;
public class FlushTest {
static final String db_path = "/tmp/rocksdbjni_flush_test";
static {
RocksDB.loadLibrary();
}
public static void main(String[] args) {
RocksDB db = null;
Options options = new Options();
WriteOptions wOpt = new WriteOptions();
FlushOptions flushOptions = new FlushOptions();
try {
// Setup options
options.setCreateIfMissing(true);
options.setMaxWriteBufferNumber(10);
options.setMinWriteBufferNumberToMerge(10);
flushOptions.setWaitForFlush(true);
wOpt.setDisableWAL(true);
db = RocksDB.open(options, db_path);
db.put(wOpt, "key1".getBytes(), "value1".getBytes());
db.put(wOpt, "key2".getBytes(), "value2".getBytes());
db.put(wOpt, "key3".getBytes(), "value3".getBytes());
db.put(wOpt, "key4".getBytes(), "value4".getBytes());
assert(db.getProperty("rocksdb.num-entries-active-mem-table").equals("4"));
db.flush(flushOptions);
assert(db.getProperty("rocksdb.num-entries-active-mem-table").equals("0"));
} catch (RocksDBException e) {
assert(false);
}
db.close();
options.dispose();
wOpt.dispose();
flushOptions.dispose();
}
}

View File

@ -18,9 +18,11 @@
#include "include/org_rocksdb_WriteOptions.h"
#include "include/org_rocksdb_ReadOptions.h"
#include "include/org_rocksdb_ComparatorOptions.h"
#include "include/org_rocksdb_FlushOptions.h"
#include "rocksjni/comparatorjnicallback.h"
#include "rocksjni/portal.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"
@ -3607,6 +3609,32 @@ jboolean Java_org_rocksdb_ReadOptions_tailing(
return reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->tailing;
}
/*
* Class: org_rocksdb_ReadOptions
* Method: setSnapshot
* Signature: (JJ)V
*/
void Java_org_rocksdb_ReadOptions_setSnapshot(
JNIEnv* env, jobject jobj, jlong jhandle, jlong jsnapshot) {
reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->snapshot =
reinterpret_cast<rocksdb::Snapshot*>(jsnapshot);
}
/*
* Class: org_rocksdb_ReadOptions
* Method: snapshot
* Signature: (J)J
*/
jlong Java_org_rocksdb_ReadOptions_snapshot(
JNIEnv* env, jobject jobj, jlong jhandle) {
auto& snapshot =
reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->snapshot;
return reinterpret_cast<jlong>(snapshot);
}
/////////////////////////////////////////////////////////////////////
// rocksdb::ComparatorOptions
/*
* Class: org_rocksdb_ComparatorOptions
* Method: newComparatorOptions
@ -3651,25 +3679,49 @@ void Java_org_rocksdb_ComparatorOptions_disposeInternal(
rocksdb::ComparatorOptionsJni::setHandle(env, jobj, nullptr);
}
/////////////////////////////////////////////////////////////////////
// rocksdb::FlushOptions
/*
* Class: org_rocksdb_ReadOptions
* Method: setSnapshot
* Signature: (JJ)V
* Class: org_rocksdb_FlushOptions
* Method: newFlushOptions
* Signature: ()V
*/
void Java_org_rocksdb_ReadOptions_setSnapshot(
JNIEnv* env, jobject jobj, jlong jhandle, jlong jsnapshot) {
reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->snapshot =
reinterpret_cast<rocksdb::Snapshot*>(jsnapshot);
void Java_org_rocksdb_FlushOptions_newFlushOptions(
JNIEnv* env, jobject jobj) {
auto flush_opt = new rocksdb::FlushOptions();
rocksdb::FlushOptionsJni::setHandle(env, jobj, flush_opt);
}
/*
* Class: org_rocksdb_ReadOptions
* Method: snapshot
* Signature: (J)J
* Class: org_rocksdb_FlushOptions
* Method: setWaitForFlush
* Signature: (JZ)V
*/
jlong Java_org_rocksdb_ReadOptions_snapshot(
JNIEnv* env, jobject jobj, jlong jhandle) {
auto& snapshot =
reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->snapshot;
return reinterpret_cast<jlong>(snapshot);
void Java_org_rocksdb_FlushOptions_setWaitForFlush(
JNIEnv * env, jobject jobj, jlong jhandle, jboolean jwait) {
reinterpret_cast<rocksdb::FlushOptions*>(jhandle)
->wait = static_cast<bool>(jwait);
}
/*
* Class: org_rocksdb_FlushOptions
* Method: waitForFlush
* Signature: (J)Z
*/
jboolean Java_org_rocksdb_FlushOptions_waitForFlush(
JNIEnv * env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::FlushOptions*>(jhandle)
->wait;
}
/*
* Class: org_rocksdb_FlushOptions
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_FlushOptions_disposeInternal(
JNIEnv * env, jobject jobj, jlong jhandle) {
delete reinterpret_cast<rocksdb::FlushOptions*>(jhandle);
rocksdb::FlushOptionsJni::setHandle(env, jobj, nullptr);
}

View File

@ -505,6 +505,34 @@ class ColumnFamilyHandleJni {
}
};
class FlushOptionsJni {
public:
// Get the java class id of org.rocksdb.FlushOptions.
static jclass getJClass(JNIEnv* env) {
jclass jclazz = env->FindClass("org/rocksdb/FlushOptions");
assert(jclazz != nullptr);
return jclazz;
}
// Get the field id of the member variable of org.rocksdb.FlushOptions
// that stores the pointer to rocksdb::FlushOptions.
static jfieldID getHandleFieldID(JNIEnv* env) {
static jfieldID fid = env->GetFieldID(
getJClass(env), "nativeHandle_", "J");
assert(fid != nullptr);
return fid;
}
// Pass the FlushOptions pointer to the java side.
static void setHandle(
JNIEnv* env, jobject jobj,
const rocksdb::FlushOptions* op) {
env->SetLongField(
jobj, getHandleFieldID(env),
reinterpret_cast<jlong>(op));
}
};
class ComparatorOptionsJni {
public:
// Get the java class id of org.rocksdb.ComparatorOptions.

View File

@ -1255,3 +1255,48 @@ jstring Java_org_rocksdb_RocksDB_getProperty0__JJLjava_lang_String_2I(
return env->NewStringUTF(property_value.data());
}
//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::Flush
void rocksdb_flush_helper(
JNIEnv* env, rocksdb::DB* db, const rocksdb::FlushOptions& flush_options,
rocksdb::ColumnFamilyHandle* column_family_handle) {
rocksdb::Status s;
if (column_family_handle != nullptr) {
s = db->Flush(flush_options, column_family_handle);
} else {
s = db->Flush(flush_options);
}
if (!s.ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
}
/*
* Class: org_rocksdb_RocksDB
* Method: flush
* Signature: (JJ)V
*/
void Java_org_rocksdb_RocksDB_flush__JJ(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jlong jflush_options) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto flush_options = reinterpret_cast<rocksdb::FlushOptions*>(jflush_options);
rocksdb_flush_helper(env, db, *flush_options, nullptr);
}
/*
* Class: org_rocksdb_RocksDB
* Method: flush
* Signature: (JJJ)V
*/
void Java_org_rocksdb_RocksDB_flush__JJJ(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jlong jflush_options, jlong jcf_handle) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto flush_options = reinterpret_cast<rocksdb::FlushOptions*>(jflush_options);
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
rocksdb_flush_helper(env, db, *flush_options, cf_handle);
}