From 8fb4751d504379c8316fada9d647cce328daf37c Mon Sep 17 00:00:00 2001 From: Adam Retter Date: Thu, 23 Oct 2014 16:19:38 +0100 Subject: [PATCH] Iterator support for Write Batches --- java/Makefile | 2 +- java/org/rocksdb/WriteBatch.java | 48 ++++++++- java/rocksjni/portal.h | 74 ++++++++++++++ java/rocksjni/write_batch.cc | 44 +++++++++ java/rocksjni/writebatchhandlerjnicallback.cc | 98 +++++++++++++++++++ java/rocksjni/writebatchhandlerjnicallback.h | 47 +++++++++ 6 files changed, 310 insertions(+), 3 deletions(-) create mode 100644 java/rocksjni/writebatchhandlerjnicallback.cc create mode 100644 java/rocksjni/writebatchhandlerjnicallback.h diff --git a/java/Makefile b/java/Makefile index a6d3c95f3d..2b3e904bbc 100644 --- a/java/Makefile +++ b/java/Makefile @@ -1,4 +1,4 @@ -NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.DBOptions org.rocksdb.WriteBatch org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.BlockBasedTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.ComparatorOptions org.rocksdb.AbstractComparator org.rocksdb.Comparator org.rocksdb.DirectComparator org.rocksdb.AbstractSlice org.rocksdb.Slice org.rocksdb.DirectSlice org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv org.rocksdb.GenericRateLimiterConfig org.rocksdb.ColumnFamilyHandle org.rocksdb.MergeOperator org.rocksdb.StringAppendOperator org.rocksdb.ComparatorOptions org.rocksdb.AbstractComparator org.rocksdb.Comparator org.rocksdb.DirectComparator org.rocksdb.AbstractSlice org.rocksdb.Slice org.rocksdb.DirectSlice +NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.DBOptions org.rocksdb.WriteBatch org.rocksdb.WriteBatch.Handler org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.BlockBasedTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.ComparatorOptions org.rocksdb.AbstractComparator org.rocksdb.Comparator org.rocksdb.DirectComparator org.rocksdb.AbstractSlice org.rocksdb.Slice org.rocksdb.DirectSlice org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv org.rocksdb.GenericRateLimiterConfig org.rocksdb.ColumnFamilyHandle org.rocksdb.MergeOperator org.rocksdb.StringAppendOperator ROCKSDB_MAJOR = $(shell egrep "ROCKSDB_MAJOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3) ROCKSDB_MINOR = $(shell egrep "ROCKSDB_MINOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3) diff --git a/java/org/rocksdb/WriteBatch.java b/java/org/rocksdb/WriteBatch.java index 118695512a..68049aded6 100644 --- a/java/org/rocksdb/WriteBatch.java +++ b/java/org/rocksdb/WriteBatch.java @@ -5,8 +5,6 @@ package org.rocksdb; -import java.util.*; - /** * WriteBatch holds a collection of updates to apply atomically to a DB. * @@ -105,6 +103,13 @@ public class WriteBatch extends RocksObject { putLogData(blob, blob.length); } + /** + * Support for iterating over the contents of a batch. + */ + public void iterate(Handler handler) { + iterate(handler.nativeHandle_); + } + /** * Clear all updates buffered in this batch */ @@ -133,7 +138,46 @@ public class WriteBatch extends RocksObject { private native void remove(byte[] key, int keyLen, long cfHandle); private native void putLogData(byte[] blob, int blobLen); + private native void iterate(long handlerHandle); private native void disposeInternal(long handle); + + /** + * Handler callback for iterating over the contents of a batch. + */ + public static abstract class Handler extends RocksObject { + public Handler() { + super(); + createNewHandler0(); + } + + public abstract void put(byte[] key, byte[] value); + public abstract void merge(byte[] key, byte[] value); + public abstract void delete(byte[] key); + public abstract void logData(byte[] blob); + + /** + * shouldContinue is called by the underlying iterator + * (WriteBatch::Iterate.If it returns false, + * iteration is halted. Otherwise, it continues + * iterating. The default implementation always + * returns true. + */ + public boolean shouldContinue() { + return true; + } + + /** + * Deletes underlying C++ handler pointer. + */ + @Override + protected void disposeInternal() { + assert(isInitialized()); + disposeInternal(nativeHandle_); + } + + private native void createNewHandler0(); + private native void disposeInternal(long handle); + } } /** diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index 03c15cb241..54b3b27662 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -20,6 +20,7 @@ #include "rocksdb/status.h" #include "rocksdb/utilities/backupable_db.h" #include "rocksjni/comparatorjnicallback.h" +#include "rocksjni/writebatchhandlerjnicallback.h" namespace rocksdb { @@ -288,6 +289,79 @@ class WriteBatchJni { } }; +class WriteBatchHandlerJni { + public: + static jclass getJClass(JNIEnv* env) { + jclass jclazz = env->FindClass("org/rocksdb/WriteBatch$Handler"); + assert(jclazz != nullptr); + return jclazz; + } + + static jfieldID getHandleFieldID(JNIEnv* env) { + static jfieldID fid = env->GetFieldID( + getJClass(env), "nativeHandle_", "J"); + assert(fid != nullptr); + return fid; + } + + // Get the java method `put` of org.rocksdb.WriteBatch.Handler. + static jmethodID getPutMethodId(JNIEnv* env) { + static jmethodID mid = env->GetMethodID( + getJClass(env), "put", "([B[B)V"); + assert(mid != nullptr); + return mid; + } + + // Get the java method `merge` of org.rocksdb.WriteBatch.Handler. + static jmethodID getMergeMethodId(JNIEnv* env) { + static jmethodID mid = env->GetMethodID( + getJClass(env), "merge", "([B[B)V"); + assert(mid != nullptr); + return mid; + } + + // Get the java method `delete` of org.rocksdb.WriteBatch.Handler. + static jmethodID getDeleteMethodId(JNIEnv* env) { + static jmethodID mid = env->GetMethodID( + getJClass(env), "delete", "([B)V"); + assert(mid != nullptr); + return mid; + } + + // Get the java method `logData` of org.rocksdb.WriteBatch.Handler. + static jmethodID getLogDataMethodId(JNIEnv* env) { + static jmethodID mid = env->GetMethodID( + getJClass(env), "logData", "([B)V"); + assert(mid != nullptr); + return mid; + } + + // Get the java method `shouldContinue` of org.rocksdb.WriteBatch.Handler. + static jmethodID getContinueMethodId(JNIEnv* env) { + static jmethodID mid = env->GetMethodID( + getJClass(env), "shouldContinue", "()Z"); + assert(mid != nullptr); + return mid; + } + + // Get the pointer to rocksdb::WriteBatchHandlerJniCallback of the specified + // org.rocksdb.WriteBatchHandler. + static rocksdb::WriteBatchHandlerJniCallback* getHandle( + JNIEnv* env, jobject jobj) { + return reinterpret_cast( + env->GetLongField(jobj, getHandleFieldID(env))); + } + + // Pass the rocksdb::WriteBatchHandlerJniCallback pointer to the java side. + static void setHandle( + JNIEnv* env, jobject jobj, + const rocksdb::WriteBatchHandlerJniCallback* op) { + env->SetLongField( + jobj, getHandleFieldID(env), + reinterpret_cast(op)); + } +}; + class HistogramDataJni { public: static jmethodID getConstructorMethodId(JNIEnv* env, jclass jclazz) { diff --git a/java/rocksjni/write_batch.cc b/java/rocksjni/write_batch.cc index aea85fab92..57f4cb1364 100644 --- a/java/rocksjni/write_batch.cc +++ b/java/rocksjni/write_batch.cc @@ -8,13 +8,16 @@ #include #include "include/org_rocksdb_WriteBatch.h" +#include "include/org_rocksdb_WriteBatch_Handler.h" #include "include/org_rocksdb_WriteBatchInternal.h" #include "include/org_rocksdb_WriteBatchTest.h" #include "rocksjni/portal.h" +#include "rocksjni/writebatchhandlerjnicallback.h" #include "rocksdb/db.h" #include "rocksdb/immutable_options.h" #include "db/memtable.h" #include "rocksdb/write_batch.h" +#include "rocksdb/status.h" #include "db/write_batch_internal.h" #include "rocksdb/env.h" #include "rocksdb/memtablerep.h" @@ -224,6 +227,25 @@ void Java_org_rocksdb_WriteBatch_putLogData( env->ReleaseByteArrayElements(jblob, blob, JNI_ABORT); } +/* + * Class: org_rocksdb_WriteBatch + * Method: iterate + * Signature: (J)V + */ +void Java_org_rocksdb_WriteBatch_iterate( + JNIEnv* env , jobject jobj, jlong handlerHandle) { + rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj); + assert(wb != nullptr); + + rocksdb::Status s = wb->Iterate( + reinterpret_cast(handlerHandle)); + + if (s.ok()) { + return; + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); +} + /* * Class: org_rocksdb_WriteBatch * Method: disposeInternal @@ -276,6 +298,28 @@ void Java_org_rocksdb_WriteBatchInternal_append( rocksdb::WriteBatchInternal::Append(wb1, wb2); } +/* + * Class: org_rocksdb_WriteBatch_Handler + * Method: createNewHandler0 + * Signature: ()V + */ +void Java_org_rocksdb_WriteBatch_00024Handler_createNewHandler0( + JNIEnv* env, jobject jobj) { + const rocksdb::WriteBatchHandlerJniCallback* h = + new rocksdb::WriteBatchHandlerJniCallback(env, jobj); + rocksdb::WriteBatchHandlerJni::setHandle(env, jobj, h); +} + +/* + * Class: org_rocksdb_WriteBatch_Handler + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_WriteBatch_00024Handler_disposeInternal( + JNIEnv* env, jobject jobj, jlong handle) { + delete reinterpret_cast(handle); +} + /* * Class: org_rocksdb_WriteBatchTest * Method: getContents diff --git a/java/rocksjni/writebatchhandlerjnicallback.cc b/java/rocksjni/writebatchhandlerjnicallback.cc new file mode 100644 index 0000000000..475ab18f17 --- /dev/null +++ b/java/rocksjni/writebatchhandlerjnicallback.cc @@ -0,0 +1,98 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// This file implements the callback "bridge" between Java and C++ for +// rocksdb::Comparator. + +#include "rocksjni/writebatchhandlerjnicallback.h" +#include "rocksjni/portal.h" + +namespace rocksdb { +WriteBatchHandlerJniCallback::WriteBatchHandlerJniCallback( + JNIEnv* env, jobject jWriteBatchHandler) { + + // Note: WriteBatchHandler methods may be accessed by multiple threads, + // so we ref the jvm not the env + const jint rs = env->GetJavaVM(&m_jvm); + assert(rs == JNI_OK); + + // Note: we want to access the Java WriteBatchHandler instance + // across multiple method calls, so we create a global ref + m_jWriteBatchHandler = env->NewGlobalRef(jWriteBatchHandler); + + m_jPutMethodId = WriteBatchHandlerJni::getPutMethodId(env); + m_jMergeMethodId = WriteBatchHandlerJni::getMergeMethodId(env); + m_jDeleteMethodId = WriteBatchHandlerJni::getDeleteMethodId(env); + m_jLogDataMethodId = WriteBatchHandlerJni::getLogDataMethodId(env); + m_jContinueMethodId = WriteBatchHandlerJni::getContinueMethodId(env); +} + +/** + * Attach/Get a JNIEnv for the current native thread + */ +JNIEnv* WriteBatchHandlerJniCallback::getJniEnv() const { + JNIEnv *env; + jint rs = m_jvm->AttachCurrentThread(reinterpret_cast(&env), NULL); + assert(rs == JNI_OK); + return env; +} + +void WriteBatchHandlerJniCallback::Put(const Slice& key, const Slice& value) { + getJniEnv()->CallVoidMethod( + m_jWriteBatchHandler, + m_jPutMethodId, + sliceToJArray(key), + sliceToJArray(value)); +} + +void WriteBatchHandlerJniCallback::Merge(const Slice& key, const Slice& value) { + getJniEnv()->CallVoidMethod( + m_jWriteBatchHandler, + m_jMergeMethodId, + sliceToJArray(key), + sliceToJArray(value)); +} + +void WriteBatchHandlerJniCallback::Delete(const Slice& key) { + getJniEnv()->CallVoidMethod( + m_jWriteBatchHandler, + m_jDeleteMethodId, + sliceToJArray(key)); +} + +void WriteBatchHandlerJniCallback::LogData(const Slice& blob) { + getJniEnv()->CallVoidMethod( + m_jWriteBatchHandler, + m_jLogDataMethodId, + sliceToJArray(blob)); +} + +bool WriteBatchHandlerJniCallback::Continue() { + jboolean jContinue = getJniEnv()->CallBooleanMethod( + m_jWriteBatchHandler, + m_jContinueMethodId); + + return static_cast(jContinue == JNI_TRUE); +} + +jbyteArray WriteBatchHandlerJniCallback::sliceToJArray(const Slice& s) { + jbyteArray ja = getJniEnv()->NewByteArray(s.size()); + getJniEnv()->SetByteArrayRegion( + ja, 0, s.size(), + reinterpret_cast(s.data())); + return ja; +} + +WriteBatchHandlerJniCallback::~WriteBatchHandlerJniCallback() { + JNIEnv* m_env = getJniEnv(); + + m_env->DeleteGlobalRef(m_jWriteBatchHandler); + + // Note: do not need to explicitly detach, as this function is effectively + // called from the Java class's disposeInternal method, and so already + // has an attached thread, getJniEnv above is just a no-op Attach to get + // the env jvm->DetachCurrentThread(); +} +} // namespace rocksdb diff --git a/java/rocksjni/writebatchhandlerjnicallback.h b/java/rocksjni/writebatchhandlerjnicallback.h new file mode 100644 index 0000000000..69f68a5333 --- /dev/null +++ b/java/rocksjni/writebatchhandlerjnicallback.h @@ -0,0 +1,47 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// This file implements the callback "bridge" between Java and C++ for +// rocksdb::WriteBatch::Handler. + +#ifndef JAVA_ROCKSJNI_WRITEBATCHHANDLERJNICALLBACK_H_ +#define JAVA_ROCKSJNI_WRITEBATCHHANDLERJNICALLBACK_H_ + +#include +#include "rocksdb/write_batch.h" + +namespace rocksdb { +/** + * This class acts as a bridge between C++ + * and Java. The methods in this class will be + * called back from the RocksDB storage engine (C++) + * we then callback to the appropriate Java method + * this enables Write Batch Handlers to be implemented in Java. + */ +class WriteBatchHandlerJniCallback : public WriteBatch::Handler { + public: + WriteBatchHandlerJniCallback( + JNIEnv* env, jobject jWriteBackHandler); + ~WriteBatchHandlerJniCallback(); + void Put(const Slice& key, const Slice& value); + void Merge(const Slice& key, const Slice& value); + void Delete(const Slice& key); + void LogData(const Slice& blob); + bool Continue(); + + private: + JavaVM* m_jvm; + jobject m_jWriteBatchHandler; + JNIEnv* getJniEnv() const; + jbyteArray sliceToJArray(const Slice& s); + jmethodID m_jPutMethodId; + jmethodID m_jMergeMethodId; + jmethodID m_jDeleteMethodId; + jmethodID m_jLogDataMethodId; + jmethodID m_jContinueMethodId; +}; +} // namespace rocksdb + +#endif // JAVA_ROCKSJNI_WRITEBATCHHANDLERJNICALLBACK_H_