Add support for non-direct ByteBuffer in WriteBatch.

This commit is contained in:
Radek Hubner 2024-08-09 18:07:04 +04:00
parent 7e0a9aadcc
commit 0a98bf32fb
8 changed files with 226 additions and 59 deletions

View file

@ -2198,8 +2198,8 @@ class JniUtil {
std::function<ROCKSDB_NAMESPACE::Status(ROCKSDB_NAMESPACE::Slice,
ROCKSDB_NAMESPACE::Slice)>
op,
JNIEnv* env, jbyteArray jkey, jint jkey_len, jbyteArray jvalue,
jint jvalue_len) {
JNIEnv* env, jbyteArray jkey, jint jkey_offset, jint jkey_len,
jbyteArray jvalue, jint jvalue_offset, jint jvalue_len) {
jbyte* key = env->GetByteArrayElements(jkey, nullptr);
if (env->ExceptionCheck()) {
// exception thrown: OutOfMemoryError
@ -2215,8 +2215,11 @@ class JniUtil {
return nullptr;
}
ROCKSDB_NAMESPACE::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
ROCKSDB_NAMESPACE::Slice value_slice(reinterpret_cast<char*>(value),
ROCKSDB_NAMESPACE::Slice key_slice(
reinterpret_cast<char*>(key) + static_cast<size_t>(jkey_offset),
jkey_len);
ROCKSDB_NAMESPACE::Slice value_slice(
reinterpret_cast<char*>(value) + static_cast<size_t>(jvalue_offset),
jvalue_len);
auto status = op(key_slice, value_slice);
@ -2232,6 +2235,21 @@ class JniUtil {
new ROCKSDB_NAMESPACE::Status(status));
}
/*
* Helper for operations on a key and value
* for example WriteBatch->Put
*
* TODO(AR) could be used for RocksDB->Put etc.
*/
static std::unique_ptr<ROCKSDB_NAMESPACE::Status> kv_op(
std::function<ROCKSDB_NAMESPACE::Status(ROCKSDB_NAMESPACE::Slice,
ROCKSDB_NAMESPACE::Slice)>
op,
JNIEnv* env, jbyteArray jkey, jint jkey_len, jbyteArray jvalue,
jint jvalue_len) {
return kv_op(op, env, jkey, 0, jkey_len, jvalue, 0, jvalue_len);
}
/*
* Helper for operations on a key
* for example WriteBatch->Delete

View file

@ -156,9 +156,10 @@ void Java_org_rocksdb_WriteBatch_setMaxBytesJni(JNIEnv* /*env*/,
* Method: put
* Signature: (J[BI[BI)V
*/
void Java_org_rocksdb_WriteBatch_putJni__J_3BI_3BI(
JNIEnv* env, jclass, jlong jwb_handle, jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len) {
void Java_org_rocksdb_WriteBatch_putJni__J_3BII_3BII(
JNIEnv* env, jclass, jlong jwb_handle, jbyteArray jkey, jint jkey_offset,
jint jkey_len, jbyteArray jentry_value, jint jvalue_offset,
jint jentry_value_len) {
auto* wb = reinterpret_cast<ROCKSDB_NAMESPACE::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
auto put = [&wb](ROCKSDB_NAMESPACE::Slice key,
@ -166,7 +167,8 @@ void Java_org_rocksdb_WriteBatch_putJni__J_3BI_3BI(
return wb->Put(key, value);
};
std::unique_ptr<ROCKSDB_NAMESPACE::Status> status =
ROCKSDB_NAMESPACE::JniUtil::kv_op(put, env, jkey, jkey_len, jentry_value,
ROCKSDB_NAMESPACE::JniUtil::kv_op(put, env, jkey, jkey_offset, jkey_len,
jentry_value, jvalue_offset,
jentry_value_len);
if (status != nullptr && !status->ok()) {
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, status);
@ -178,9 +180,10 @@ void Java_org_rocksdb_WriteBatch_putJni__J_3BI_3BI(
* Method: put
* Signature: (J[BI[BIJ)V
*/
void Java_org_rocksdb_WriteBatch_putJni__J_3BI_3BIJ(
JNIEnv* env, jclass, jlong jwb_handle, jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) {
void Java_org_rocksdb_WriteBatch_putJni__J_3BII_3BIIJ(
JNIEnv* env, jclass, jlong jwb_handle, jbyteArray jkey, jint jkey_offset,
jint jkey_len, jbyteArray jentry_value, jint jvalue_offset,
jint jentry_value_len, jlong jcf_handle) {
auto* wb = reinterpret_cast<ROCKSDB_NAMESPACE::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
auto* cf_handle =
@ -191,7 +194,8 @@ void Java_org_rocksdb_WriteBatch_putJni__J_3BI_3BIJ(
return wb->Put(cf_handle, key, value);
};
std::unique_ptr<ROCKSDB_NAMESPACE::Status> status =
ROCKSDB_NAMESPACE::JniUtil::kv_op(put, env, jkey, jkey_len, jentry_value,
ROCKSDB_NAMESPACE::JniUtil::kv_op(put, env, jkey, jkey_offset, jkey_len,
jentry_value, jvalue_offset,
jentry_value_len);
if (status != nullptr && !status->ok()) {
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, status);

View file

@ -85,11 +85,12 @@ jint Java_org_rocksdb_WriteBatchWithIndex_count0Jni(JNIEnv* /*env*/,
/*
* Class: org_rocksdb_WriteBatchWithIndex
* Method: put
* Signature: (J[BI[BI)V
* Signature: (J[BII[BII)V
*/
void Java_org_rocksdb_WriteBatchWithIndex_putJni__J_3BI_3BI(
JNIEnv* env, jclass, jlong jwbwi_handle, jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len) {
void Java_org_rocksdb_WriteBatchWithIndex_putJni__J_3BII_3BII(
JNIEnv* env, jclass, jlong jwbwi_handle, jbyteArray jkey, jint jkey_offset,
jint jkey_len, jbyteArray jentry_value, jint jvalue_offset,
jint jentry_value_len) {
auto* wbwi =
reinterpret_cast<ROCKSDB_NAMESPACE::WriteBatchWithIndex*>(jwbwi_handle);
assert(wbwi != nullptr);
@ -98,7 +99,8 @@ void Java_org_rocksdb_WriteBatchWithIndex_putJni__J_3BI_3BI(
return wbwi->Put(key, value);
};
std::unique_ptr<ROCKSDB_NAMESPACE::Status> status =
ROCKSDB_NAMESPACE::JniUtil::kv_op(put, env, jkey, jkey_len, jentry_value,
ROCKSDB_NAMESPACE::JniUtil::kv_op(put, env, jkey, jkey_offset, jkey_len,
jentry_value, jvalue_offset,
jentry_value_len);
if (status != nullptr && !status->ok()) {
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, status);
@ -108,11 +110,12 @@ void Java_org_rocksdb_WriteBatchWithIndex_putJni__J_3BI_3BI(
/*
* Class: org_rocksdb_WriteBatchWithIndex
* Method: put
* Signature: (J[BI[BIJ)V
* Signature: (J[BII[BIIJ)V
*/
void Java_org_rocksdb_WriteBatchWithIndex_putJni__J_3BI_3BIJ(
JNIEnv* env, jclass, jlong jwbwi_handle, jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) {
void Java_org_rocksdb_WriteBatchWithIndex_putJni__J_3BII_3BIIJ(
JNIEnv* env, jclass, jlong jwbwi_handle, jbyteArray jkey, jint jkey_offset,
jint jkey_len, jbyteArray jentry_value, jint jentry_value_offset,
jint jentry_value_len, jlong jcf_handle) {
auto* wbwi =
reinterpret_cast<ROCKSDB_NAMESPACE::WriteBatchWithIndex*>(jwbwi_handle);
assert(wbwi != nullptr);
@ -124,7 +127,8 @@ void Java_org_rocksdb_WriteBatchWithIndex_putJni__J_3BI_3BIJ(
return wbwi->Put(cf_handle, key, value);
};
std::unique_ptr<ROCKSDB_NAMESPACE::Status> status =
ROCKSDB_NAMESPACE::JniUtil::kv_op(put, env, jkey, jkey_len, jentry_value,
ROCKSDB_NAMESPACE::JniUtil::kv_op(put, env, jkey, jkey_offset, jkey_len,
jentry_value, jentry_value_offset,
jentry_value_len);
if (status != nullptr && !status->ok()) {
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, status);

View file

@ -21,13 +21,13 @@ public abstract class AbstractWriteBatch extends RocksObject
@Override
public void put(final byte[] key, final byte[] value) throws RocksDBException {
put(nativeHandle_, key, key.length, value, value.length);
put(nativeHandle_, key, 0, key.length, value, 0, value.length);
}
@Override
public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final byte[] value)
throws RocksDBException {
put(nativeHandle_, key, key.length, value, value.length,
put(nativeHandle_, key, 0, key.length, value, 0, value.length,
columnFamilyHandle.nativeHandle_);
}
@ -45,9 +45,15 @@ public abstract class AbstractWriteBatch extends RocksObject
@Override
public void put(final ByteBuffer key, final ByteBuffer value) throws RocksDBException {
assert key.isDirect() && value.isDirect();
if (key.isDirect() && value.isDirect()) {
putDirect(nativeHandle_, key, key.position(), key.remaining(), value, value.position(),
value.remaining(), 0);
} else if (key.hasArray() && value.hasArray()) {
put(nativeHandle_, key.array(), key.arrayOffset() + key.position(), key.remaining(),
value.array(), value.arrayOffset() + value.position(), value.remaining());
} else {
throw new RocksDBException(RocksDB.BB_ALL_DIRECT_OR_INDIRECT);
}
key.position(key.limit());
value.position(value.limit());
}
@ -55,9 +61,16 @@ public abstract class AbstractWriteBatch extends RocksObject
@Override
public void put(final ColumnFamilyHandle columnFamilyHandle, final ByteBuffer key,
final ByteBuffer value) throws RocksDBException {
assert key.isDirect() && value.isDirect();
if (key.isDirect() && value.isDirect()) {
putDirect(nativeHandle_, key, key.position(), key.remaining(), value, value.position(),
value.remaining(), columnFamilyHandle.nativeHandle_);
} else if (key.hasArray() && value.hasArray()) {
put(nativeHandle_, key.array(), key.arrayOffset() + key.position(), key.remaining(),
value.array(), value.arrayOffset() + value.position(), value.remaining(),
columnFamilyHandle.nativeHandle_);
} else {
throw new RocksDBException(RocksDB.BB_ALL_DIRECT_OR_INDIRECT);
}
key.position(key.limit());
value.position(value.limit());
}
@ -75,16 +88,30 @@ public abstract class AbstractWriteBatch extends RocksObject
@Override
public void delete(final ByteBuffer key) throws RocksDBException {
if (key.isDirect()) {
deleteDirect(nativeHandle_, key, key.position(), key.remaining(), 0);
} else if (key.hasArray()) {
delete(nativeHandle_, key.array(), key.arrayOffset() + key.position(), key.remaining());
} else {
throw new RocksDBException(RocksDB.BB_ALL_DIRECT_OR_INDIRECT);
}
key.position(key.limit());
}
@Override
public void delete(final ColumnFamilyHandle columnFamilyHandle, final ByteBuffer key)
throws RocksDBException {
if (key.isDirect()) {
deleteDirect(
nativeHandle_, key, key.position(), key.remaining(), columnFamilyHandle.nativeHandle_);
key.position(key.limit());
} else if (key.hasArray()) {
// TODO - Refactor, add native method
ByteBuffer buffer = ByteBuffer.allocateDirect(key.remaining());
buffer.put(key).flip();
deleteDirect(nativeHandle_, buffer, buffer.position(), buffer.remaining(),
columnFamilyHandle.nativeHandle_);
}
}
@Override
@ -147,11 +174,11 @@ public abstract class AbstractWriteBatch extends RocksObject
abstract int count0(final long handle);
abstract void put(final long handle, final byte[] key, final int keyLen,
final byte[] value, final int valueLen) throws RocksDBException;
abstract void put(final long handle, final byte[] key, final int keyOffset, final int keyLen,
final byte[] value, final int valueOffset, final int valueLen) throws RocksDBException;
abstract void put(final long handle, final byte[] key, final int keyLen,
final byte[] value, final int valueLen, final long cfHandle)
abstract void put(final long handle, final byte[] key, final int keyOffset, final int keyLen,
final byte[] value, final int valueOffset, final int valueLen, final long cfHandle)
throws RocksDBException;
abstract void putDirect(final long handle, final ByteBuffer key, final int keyOffset,

View file

@ -232,21 +232,22 @@ public class WriteBatch extends AbstractWriteBatch {
private static native int count0Jni(final long handle);
@Override
final void put(final long handle, final byte[] key, final int keyLen, final byte[] value,
final int valueLen) {
putJni(handle, key, keyLen, value, valueLen);
final void put(final long handle, final byte[] key, final int ketOffset, final int keyLen,
final byte[] value, final int valueOffset, final int valueLen) {
putJni(handle, key, ketOffset, keyLen, value, valueOffset, valueLen);
}
private static native void putJni(final long handle, final byte[] key, final int keyLen,
final byte[] value, final int valueLen);
private static native void putJni(final long handle, final byte[] key, final int keyOffset,
final int keyLen, final byte[] value, final int valueOffset, final int valueLen);
@Override
final void put(final long handle, final byte[] key, final int keyLen, final byte[] value,
final int valueLen, final long cfHandle) {
putJni(handle, key, keyLen, value, valueLen, cfHandle);
final void put(final long handle, final byte[] key, final int ketOffset, final int keyLen,
final byte[] value, final int valueOffset, final int valueLen, final long cfHandle) {
putJni(handle, key, ketOffset, keyLen, value, valueOffset, valueLen, cfHandle);
}
private static native void putJni(final long handle, final byte[] key, final int keyLen,
final byte[] value, final int valueLen, final long cfHandle);
private static native void putJni(final long handle, final byte[] key, final int keyOffset,
final int keyLen, final byte[] value, final int valueOffset, final int valueLen,
final long cfHandle);
@Override
final void putDirect(final long handle, final ByteBuffer key, final int keyOffset,

View file

@ -304,22 +304,23 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
private static native int count0Jni(final long handle);
@Override
final void put(final long handle, final byte[] key, final int keyLen, final byte[] value,
final int valueLen) {
putJni(handle, key, keyLen, value, valueLen);
final void put(final long handle, final byte[] key, final int keyOffset, final int keyLen,
final byte[] value, final int valueOffset, final int valueLen) {
putJni(handle, key, keyOffset, keyLen, value, valueOffset, valueLen);
}
private static native void putJni(final long handle, final byte[] key, final int keyLen,
final byte[] value, final int valueLen);
private static native void putJni(final long handle, final byte[] key, final int keyOffset,
final int keyLen, final byte[] value, final int valueOffset, final int valueLen);
@Override
final void put(final long handle, final byte[] key, final int keyLen, final byte[] value,
final int valueLen, final long cfHandle) {
putJni(handle, key, keyLen, value, valueLen, cfHandle);
final void put(final long handle, final byte[] key, final int keyOffset, final int keyLen,
final byte[] value, final int valueOffset, final int valueLen, final long cfHandle) {
putJni(handle, key, keyOffset, keyLen, value, valueOffset, valueLen, cfHandle);
}
private static native void putJni(final long handle, final byte[] key, final int keyLen,
final byte[] value, final int valueLen, final long cfHandle);
private static native void putJni(final long handle, final byte[] key, final int keyOffset,
final int keyLen, final byte[] value, final int valueOffset, final int valueLen,
final long cfHandle);
@Override
final void putDirect(final long handle, final ByteBuffer key, final int keyOffset,

View file

@ -10,6 +10,7 @@ package org.rocksdb;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.rocksdb.util.CapturingWriteBatchHandler.Action.DELETE;
import static org.rocksdb.util.CapturingWriteBatchHandler.Action.DELETE_RANGE;
import static org.rocksdb.util.CapturingWriteBatchHandler.Action.LOG;
@ -482,6 +483,55 @@ public class WriteBatchTest {
}
}
@Test
public void byteBuffers() throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true);
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
try (WriteBatch writeBatch = new WriteBatch()) {
ByteBuffer key = ByteBuffer.allocateDirect(16);
ByteBuffer value = ByteBuffer.allocateDirect(16);
key.put("key".getBytes(UTF_8)).flip();
value.put("value".getBytes(UTF_8)).flip();
writeBatch.put(key, value);
}
try (WriteBatch writeBatch = new WriteBatch()) {
ByteBuffer key = ByteBuffer.allocateDirect(16);
ByteBuffer value = ByteBuffer.allocateDirect(16);
key.put("key".getBytes(UTF_8)).flip();
value.put("value".getBytes(UTF_8)).flip();
writeBatch.put(db.getDefaultColumnFamily(), key, value);
}
try (WriteBatch writeBatch = new WriteBatch()) {
byte[] keyByffer = new byte[16];
byte[] valueByffer = new byte[16];
ByteBuffer key = ByteBuffer.wrap(keyByffer, 2, 6).slice();
ByteBuffer value = ByteBuffer.wrap(valueByffer, 2, 6).slice();
key.put("key".getBytes(UTF_8)).flip();
value.put("value".getBytes(UTF_8)).flip();
writeBatch.put(key, value);
}
try (WriteBatch writeBatch = new WriteBatch()) {
byte[] keyByffer = new byte[16];
byte[] valueByffer = new byte[16];
ByteBuffer key = ByteBuffer.wrap(keyByffer, 2, 6).slice();
ByteBuffer value = ByteBuffer.wrap(valueByffer, 2, 6).slice();
key.put("key".getBytes(UTF_8)).flip();
value.put("value".getBytes(UTF_8)).flip();
writeBatch.put(db.getDefaultColumnFamily(), key, value);
}
try (WriteBatch writeBatch = new WriteBatch()) {
ByteBuffer key = ByteBuffer.allocateDirect(16);
ByteBuffer value = ByteBuffer.allocate(16);
key.put("key".getBytes(UTF_8)).flip();
value.put("value".getBytes(UTF_8)).flip();
assertThatThrownBy(() -> writeBatch.put(db.getDefaultColumnFamily(), key, value))
.isInstanceOf(RocksDBException.class)
.hasMessage(RocksDB.BB_ALL_DIRECT_OR_INDIRECT);
}
}
}
static byte[] getContents(final WriteBatch wb) {
return getContents(wb.nativeHandle_);
}

View file

@ -11,6 +11,7 @@ package org.rocksdb;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -1065,4 +1066,65 @@ public class WriteBatchWithIndexTest {
}
}
}
@Test
public void byteBuffers() throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true);
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
try (WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
final DBOptions dbOptions = new DBOptions()) {
ByteBuffer key = ByteBuffer.allocateDirect(16);
ByteBuffer value = ByteBuffer.allocateDirect(16);
key.put("key".getBytes(UTF_8)).flip();
value.put("value".getBytes(UTF_8)).flip();
wbwi.put(key, value);
byte[] readBackValue = wbwi.getFromBatch(dbOptions, "key".getBytes(UTF_8));
assertThat(readBackValue).isEqualTo("value".getBytes(UTF_8));
}
try (WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
final DBOptions dbOptions = new DBOptions()) {
ByteBuffer key = ByteBuffer.allocateDirect(16);
ByteBuffer value = ByteBuffer.allocateDirect(16);
key.put("key".getBytes(UTF_8)).flip();
value.put("value".getBytes(UTF_8)).flip();
wbwi.put(db.getDefaultColumnFamily(), key, value);
byte[] readBackValue = wbwi.getFromBatch(dbOptions, "key".getBytes(UTF_8));
assertThat(readBackValue).isEqualTo("value".getBytes(UTF_8));
}
try (WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
final DBOptions dbOptions = new DBOptions()) {
byte[] keyByffer = new byte[16];
byte[] valueByffer = new byte[16];
ByteBuffer key = ByteBuffer.wrap(keyByffer, 2, 6).slice();
ByteBuffer value = ByteBuffer.wrap(valueByffer, 2, 6).slice();
key.put("key".getBytes(UTF_8)).flip();
value.put("value".getBytes(UTF_8)).flip();
wbwi.put(key, value);
byte[] readBackValue = wbwi.getFromBatch(dbOptions, "key".getBytes(UTF_8));
assertThat(readBackValue).isEqualTo("value".getBytes(UTF_8));
}
try (WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
final DBOptions dbOptions = new DBOptions()) {
byte[] keyByffer = new byte[16];
byte[] valueByffer = new byte[16];
ByteBuffer key = ByteBuffer.wrap(keyByffer, 2, 6).slice();
ByteBuffer value = ByteBuffer.wrap(valueByffer, 2, 6).slice();
key.put("key".getBytes(UTF_8)).flip();
value.put("value".getBytes(UTF_8)).flip();
wbwi.put(db.getDefaultColumnFamily(), key, value);
byte[] readBackValue = wbwi.getFromBatch(dbOptions, "key".getBytes(UTF_8));
assertThat(readBackValue).isEqualTo("value".getBytes(UTF_8));
}
try (WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true)) {
ByteBuffer key = ByteBuffer.allocateDirect(16);
ByteBuffer value = ByteBuffer.allocate(16);
key.put("key".getBytes(UTF_8)).flip();
value.put("value".getBytes(UTF_8)).flip();
assertThatThrownBy(() -> wbwi.put(db.getDefaultColumnFamily(), key, value))
.isInstanceOf(RocksDBException.class)
.hasMessage(RocksDB.BB_ALL_DIRECT_OR_INDIRECT);
}
}
}
}