mirror of https://github.com/facebook/rocksdb.git
Fix Java API ComparatorOptions use after delete error (#11176)
Summary: The problem ------------- ComparatorOptions is AutoCloseable. AbstractComparator does not hold a reference to its ComparatorOptions, but the native C++ ComparatorJniCallback holds a reference to the ComparatorOptions’ native C++ options structure. This gets deleted when the ComparatorOptions is closed, either explicitly, or as part of try-with-resources. Later, the deleted C++ options structure gets used by the callback and the comparator options are effectively random. The original bug report https://github.com/facebook/rocksdb/issues/8715 was caused by a GC-initiated finalization closing the still-in-use ComparatorOptions. As of 7.0, finalization of RocksDB objects no longer closes them, which worked round the reported bug, but still left ComparatorOptions with a potentially broken lifetime. In any case, we encourage API clients to use the try-with-resources model, and so we need it to work. And if they don't use it, they leak resources. The solution ------------- The solution implemented here is to make a copy of the native C++ options object into the ComparatorJniCallback, rather than a reference. Then the deletion of the native object held by ComparatorOptions is *correctly* deleted when its scope is closed in try/finally. Testing ------- We added a regression unit test based on the original test for the reported ticket. This checkin closes https://github.com/facebook/rocksdb/issues/8715 We expect that there are more instances of "lifecycle" bugs in the Java API. They are a major source of support time/cost, and we note that they could be addressed as a whole using the model proposed/prototyped in https://github.com/facebook/rocksdb/pull/10736 Pull Request resolved: https://github.com/facebook/rocksdb/pull/11176 Reviewed By: cbi42 Differential Revision: D43160885 Pulled By: pdillinger fbshipit-source-id: 60b54215a02ad9abb17363319650328c00a9ad62
This commit is contained in:
parent
b6640c3117
commit
d47126875b
|
@ -110,6 +110,7 @@ JAVA_TESTS = \
|
|||
org.rocksdb.BlobOptionsTest\
|
||||
org.rocksdb.BlockBasedTableConfigTest\
|
||||
org.rocksdb.BuiltinComparatorTest\
|
||||
org.rocksdb.ByteBufferUnsupportedOperationTest\
|
||||
org.rocksdb.BytewiseComparatorRegressionTest\
|
||||
org.rocksdb.util.BytewiseComparatorTest\
|
||||
org.rocksdb.util.BytewiseComparatorIntTest\
|
||||
|
|
|
@ -14,7 +14,8 @@ namespace ROCKSDB_NAMESPACE {
|
|||
ComparatorJniCallback::ComparatorJniCallback(
|
||||
JNIEnv* env, jobject jcomparator,
|
||||
const ComparatorJniCallbackOptions* options)
|
||||
: JniCallback(env, jcomparator), m_options(options) {
|
||||
: JniCallback(env, jcomparator),
|
||||
m_options(std::make_unique<ComparatorJniCallbackOptions>(*options)) {
|
||||
// cache the AbstractComparatorJniBridge class as we will reuse it many times
|
||||
// for each callback
|
||||
m_abstract_comparator_jni_bridge_clazz = static_cast<jclass>(
|
||||
|
|
|
@ -45,15 +45,12 @@ enum ReusedSynchronisationType {
|
|||
struct ComparatorJniCallbackOptions {
|
||||
// Set the synchronisation type used to guard the reused buffers.
|
||||
// Only used if max_reused_buffer_size > 0.
|
||||
// Default: ADAPTIVE_MUTEX
|
||||
ReusedSynchronisationType reused_synchronisation_type =
|
||||
ReusedSynchronisationType::ADAPTIVE_MUTEX;
|
||||
ReusedSynchronisationType reused_synchronisation_type = ADAPTIVE_MUTEX;
|
||||
|
||||
// Indicates if a direct byte buffer (i.e. outside of the normal
|
||||
// garbage-collected heap) is used for the callbacks to Java,
|
||||
// as opposed to a non-direct byte buffer which is a wrapper around
|
||||
// an on-heap byte[].
|
||||
// Default: true
|
||||
bool direct_buffer = true;
|
||||
|
||||
// Maximum size of a buffer (in bytes) that will be reused.
|
||||
|
@ -63,7 +60,6 @@ struct ComparatorJniCallbackOptions {
|
|||
// if it requires less than max_reused_buffer_size, then an
|
||||
// existing buffer will be reused, else a new buffer will be
|
||||
// allocated just for that callback. -1 to disable.
|
||||
// Default: 64 bytes
|
||||
int32_t max_reused_buffer_size = 64;
|
||||
};
|
||||
|
||||
|
@ -92,7 +88,7 @@ class ComparatorJniCallback : public JniCallback, public Comparator {
|
|||
virtual void FindShortestSeparator(std::string* start,
|
||||
const Slice& limit) const;
|
||||
virtual void FindShortSuccessor(std::string* key) const;
|
||||
const ComparatorJniCallbackOptions* m_options;
|
||||
const std::unique_ptr<ComparatorJniCallbackOptions> m_options;
|
||||
|
||||
private:
|
||||
struct ThreadLocalBuf {
|
||||
|
|
|
@ -20,8 +20,8 @@ public abstract class AbstractComparator
|
|||
super();
|
||||
}
|
||||
|
||||
protected AbstractComparator(final ComparatorOptions copt) {
|
||||
super(copt.nativeHandle_);
|
||||
protected AbstractComparator(final ComparatorOptions comparatorOptions) {
|
||||
super(comparatorOptions.nativeHandle_);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -11,8 +11,8 @@ import java.util.*;
|
|||
/**
|
||||
* ColumnFamilyOptions to control the behavior of a database. It will be used
|
||||
* during the creation of a {@link org.rocksdb.RocksDB} (i.e., RocksDB.open()).
|
||||
*
|
||||
* As a descendent of {@link AbstractNativeReference}, this class is {@link AutoCloseable}
|
||||
* <p>
|
||||
* As a descendant of {@link AbstractNativeReference}, this class is {@link AutoCloseable}
|
||||
* and will be automatically released if opened in the preamble of a try with resources block.
|
||||
*/
|
||||
public class ColumnFamilyOptions extends RocksObject
|
||||
|
@ -24,7 +24,7 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
|
||||
/**
|
||||
* Construct ColumnFamilyOptions.
|
||||
*
|
||||
* <p>
|
||||
* This constructor will create (by allocating a block of memory)
|
||||
* an {@code rocksdb::ColumnFamilyOptions} in the c++ side.
|
||||
*/
|
||||
|
@ -34,13 +34,13 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
|
||||
/**
|
||||
* Copy constructor for ColumnFamilyOptions.
|
||||
*
|
||||
* <p>
|
||||
* NOTE: This does a shallow copy, which means comparator, merge_operator, compaction_filter,
|
||||
* compaction_filter_factory and other pointers will be cloned!
|
||||
*
|
||||
* @param other The ColumnFamilyOptions to copy.
|
||||
*/
|
||||
public ColumnFamilyOptions(ColumnFamilyOptions other) {
|
||||
public ColumnFamilyOptions(final ColumnFamilyOptions other) {
|
||||
super(copyColumnFamilyOptions(other.nativeHandle_));
|
||||
this.memTableConfig_ = other.memTableConfig_;
|
||||
this.tableFormatConfig_ = other.tableFormatConfig_;
|
||||
|
@ -707,7 +707,7 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnFamilyOptions setBloomLocality(int bloomLocality) {
|
||||
public ColumnFamilyOptions setBloomLocality(final int bloomLocality) {
|
||||
setBloomLocality(nativeHandle_, bloomLocality);
|
||||
return this;
|
||||
}
|
||||
|
@ -742,9 +742,7 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnFamilyOptions
|
||||
setMemtableHugePageSize(
|
||||
long memtableHugePageSize) {
|
||||
public ColumnFamilyOptions setMemtableHugePageSize(final long memtableHugePageSize) {
|
||||
setMemtableHugePageSize(nativeHandle_,
|
||||
memtableHugePageSize);
|
||||
return this;
|
||||
|
@ -756,7 +754,8 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnFamilyOptions setSoftPendingCompactionBytesLimit(long softPendingCompactionBytesLimit) {
|
||||
public ColumnFamilyOptions setSoftPendingCompactionBytesLimit(
|
||||
final long softPendingCompactionBytesLimit) {
|
||||
setSoftPendingCompactionBytesLimit(nativeHandle_,
|
||||
softPendingCompactionBytesLimit);
|
||||
return this;
|
||||
|
@ -768,7 +767,8 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnFamilyOptions setHardPendingCompactionBytesLimit(long hardPendingCompactionBytesLimit) {
|
||||
public ColumnFamilyOptions setHardPendingCompactionBytesLimit(
|
||||
final long hardPendingCompactionBytesLimit) {
|
||||
setHardPendingCompactionBytesLimit(nativeHandle_, hardPendingCompactionBytesLimit);
|
||||
return this;
|
||||
}
|
||||
|
@ -779,7 +779,8 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnFamilyOptions setLevel0FileNumCompactionTrigger(int level0FileNumCompactionTrigger) {
|
||||
public ColumnFamilyOptions setLevel0FileNumCompactionTrigger(
|
||||
final int level0FileNumCompactionTrigger) {
|
||||
setLevel0FileNumCompactionTrigger(nativeHandle_, level0FileNumCompactionTrigger);
|
||||
return this;
|
||||
}
|
||||
|
@ -790,7 +791,7 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnFamilyOptions setLevel0SlowdownWritesTrigger(int level0SlowdownWritesTrigger) {
|
||||
public ColumnFamilyOptions setLevel0SlowdownWritesTrigger(final int level0SlowdownWritesTrigger) {
|
||||
setLevel0SlowdownWritesTrigger(nativeHandle_, level0SlowdownWritesTrigger);
|
||||
return this;
|
||||
}
|
||||
|
@ -801,7 +802,7 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnFamilyOptions setLevel0StopWritesTrigger(int level0StopWritesTrigger) {
|
||||
public ColumnFamilyOptions setLevel0StopWritesTrigger(final int level0StopWritesTrigger) {
|
||||
setLevel0StopWritesTrigger(nativeHandle_, level0StopWritesTrigger);
|
||||
return this;
|
||||
}
|
||||
|
@ -812,7 +813,8 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnFamilyOptions setMaxBytesForLevelMultiplierAdditional(int[] maxBytesForLevelMultiplierAdditional) {
|
||||
public ColumnFamilyOptions setMaxBytesForLevelMultiplierAdditional(
|
||||
final int[] maxBytesForLevelMultiplierAdditional) {
|
||||
setMaxBytesForLevelMultiplierAdditional(nativeHandle_, maxBytesForLevelMultiplierAdditional);
|
||||
return this;
|
||||
}
|
||||
|
@ -823,7 +825,7 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnFamilyOptions setParanoidFileChecks(boolean paranoidFileChecks) {
|
||||
public ColumnFamilyOptions setParanoidFileChecks(final boolean paranoidFileChecks) {
|
||||
setParanoidFileChecks(nativeHandle_, paranoidFileChecks);
|
||||
return this;
|
||||
}
|
||||
|
@ -931,7 +933,8 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnFamilyOptions setSstPartitionerFactory(SstPartitionerFactory sstPartitionerFactory) {
|
||||
public ColumnFamilyOptions setSstPartitionerFactory(
|
||||
final SstPartitionerFactory sstPartitionerFactory) {
|
||||
setSstPartitionerFactory(nativeHandle_, sstPartitionerFactory.nativeHandle_);
|
||||
this.sstPartitionerFactory_ = sstPartitionerFactory;
|
||||
return this;
|
||||
|
@ -967,9 +970,9 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
* for reads. See also the options min_blob_size, blob_file_size,
|
||||
* blob_compression_type, enable_blob_garbage_collection, and
|
||||
* blob_garbage_collection_age_cutoff below.
|
||||
*
|
||||
* <p>
|
||||
* Default: false
|
||||
*
|
||||
* <p>
|
||||
* Dynamically changeable through
|
||||
* {@link RocksDB#setOptions(ColumnFamilyHandle, MutableColumnFamilyOptions)}.
|
||||
*
|
||||
|
@ -990,9 +993,9 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
* for reads. See also the options min_blob_size, blob_file_size,
|
||||
* blob_compression_type, enable_blob_garbage_collection, and
|
||||
* blob_garbage_collection_age_cutoff below.
|
||||
*
|
||||
* <p>
|
||||
* Default: false
|
||||
*
|
||||
* <p>
|
||||
* Dynamically changeable through
|
||||
* {@link RocksDB#setOptions(ColumnFamilyHandle, MutableColumnFamilyOptions)}.
|
||||
*
|
||||
|
@ -1008,9 +1011,9 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
* alongside the keys in SST files in the usual fashion. A value of zero for
|
||||
* this option means that all values are stored in blob files. Note that
|
||||
* enable_blob_files has to be set in order for this option to have any effect.
|
||||
*
|
||||
* <p>
|
||||
* Default: 0
|
||||
*
|
||||
* <p>
|
||||
* Dynamically changeable through
|
||||
* {@link RocksDB#setOptions(ColumnFamilyHandle, MutableColumnFamilyOptions)}.
|
||||
*
|
||||
|
@ -1029,9 +1032,9 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
* alongside the keys in SST files in the usual fashion. A value of zero for
|
||||
* this option means that all values are stored in blob files. Note that
|
||||
* enable_blob_files has to be set in order for this option to have any effect.
|
||||
*
|
||||
* <p>
|
||||
* Default: 0
|
||||
*
|
||||
* <p>
|
||||
* Dynamically changeable through
|
||||
* {@link RocksDB#setOptions(ColumnFamilyHandle, MutableColumnFamilyOptions)}.
|
||||
*
|
||||
|
@ -1046,9 +1049,9 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
* Set the size limit for blob files. When writing blob files, a new file is opened
|
||||
* once this limit is reached. Note that enable_blob_files has to be set in
|
||||
* order for this option to have any effect.
|
||||
*
|
||||
* <p>
|
||||
* Default: 256 MB
|
||||
*
|
||||
* <p>
|
||||
* Dynamically changeable through
|
||||
* {@link RocksDB#setOptions(ColumnFamilyHandle, MutableColumnFamilyOptions)}.
|
||||
*
|
||||
|
@ -1066,9 +1069,9 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
* Get the size limit for blob files. When writing blob files, a new file is opened
|
||||
* once this limit is reached. Note that enable_blob_files has to be set in
|
||||
* order for this option to have any effect.
|
||||
*
|
||||
* <p>
|
||||
* Default: 256 MB
|
||||
*
|
||||
* <p>
|
||||
* Dynamically changeable through
|
||||
* {@link RocksDB#setOptions(ColumnFamilyHandle, MutableColumnFamilyOptions)}.
|
||||
*
|
||||
|
@ -1083,9 +1086,9 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
* Set the compression algorithm to use for large values stored in blob files. Note
|
||||
* that enable_blob_files has to be set in order for this option to have any
|
||||
* effect.
|
||||
*
|
||||
* <p>
|
||||
* Default: no compression
|
||||
*
|
||||
* <p>
|
||||
* Dynamically changeable through
|
||||
* {@link RocksDB#setOptions(ColumnFamilyHandle, MutableColumnFamilyOptions)}.
|
||||
*
|
||||
|
@ -1103,9 +1106,9 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
* Get the compression algorithm to use for large values stored in blob files. Note
|
||||
* that enable_blob_files has to be set in order for this option to have any
|
||||
* effect.
|
||||
*
|
||||
* <p>
|
||||
* Default: no compression
|
||||
*
|
||||
* <p>
|
||||
* Dynamically changeable through
|
||||
* {@link RocksDB#setOptions(ColumnFamilyHandle, MutableColumnFamilyOptions)}.
|
||||
*
|
||||
|
@ -1122,7 +1125,7 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
* relocated to new files as they are encountered during compaction, which makes
|
||||
* it possible to clean up blob files once they contain nothing but
|
||||
* obsolete/garbage blobs. See also blob_garbage_collection_age_cutoff below.
|
||||
*
|
||||
* <p>
|
||||
* Default: false
|
||||
*
|
||||
* @param enableBlobGarbageCollection true iff blob garbage collection is to be enabled
|
||||
|
@ -1142,7 +1145,7 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
* relocated to new files as they are encountered during compaction, which makes
|
||||
* it possible to clean up blob files once they contain nothing but
|
||||
* obsolete/garbage blobs. See also blob_garbage_collection_age_cutoff below.
|
||||
*
|
||||
* <p>
|
||||
* Default: false
|
||||
*
|
||||
* @return true iff blob garbage collection is currently enabled
|
||||
|
@ -1158,7 +1161,7 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
* where N = garbage_collection_cutoff * number_of_blob_files. Note that
|
||||
* enable_blob_garbage_collection has to be set in order for this option to have
|
||||
* any effect.
|
||||
*
|
||||
* <p>
|
||||
* Default: 0.25
|
||||
*
|
||||
* @param blobGarbageCollectionAgeCutoff the new blob garbage collection age cutoff
|
||||
|
@ -1178,7 +1181,7 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
* where N = garbage_collection_cutoff * number_of_blob_files. Note that
|
||||
* enable_blob_garbage_collection has to be set in order for this option to have
|
||||
* any effect.
|
||||
*
|
||||
* <p>
|
||||
* Default: 0.25
|
||||
*
|
||||
* @return the current blob garbage collection age cutoff
|
||||
|
@ -1194,12 +1197,12 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
* the blob files in question, assuming they are all eligible based on the
|
||||
* value of {@link #blobGarbageCollectionAgeCutoff} above. This option is
|
||||
* currently only supported with leveled compactions.
|
||||
*
|
||||
* <p>
|
||||
* Note that {@link #enableBlobGarbageCollection} has to be set in order for this
|
||||
* option to have any effect.
|
||||
*
|
||||
* <p>
|
||||
* Default: 1.0
|
||||
*
|
||||
* <p>
|
||||
* Dynamically changeable through the SetOptions() API
|
||||
*
|
||||
* @param blobGarbageCollectionForceThreshold new value for the threshold
|
||||
|
@ -1223,9 +1226,9 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
|
||||
/**
|
||||
* Set compaction readahead for blob files.
|
||||
*
|
||||
* <p>
|
||||
* Default: 0
|
||||
*
|
||||
* <p>
|
||||
* Dynamically changeable through
|
||||
* {@link RocksDB#setOptions(ColumnFamilyHandle, MutableColumnFamilyOptions)}.
|
||||
*
|
||||
|
@ -1252,9 +1255,9 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
|
||||
/**
|
||||
* Set a certain LSM tree level to enable blob files.
|
||||
*
|
||||
* <p>
|
||||
* Default: 0
|
||||
*
|
||||
* <p>
|
||||
* Dynamically changeable through
|
||||
* {@link RocksDB#setOptions(ColumnFamilyHandle, MutableColumnFamilyOptions)}.
|
||||
*
|
||||
|
@ -1270,7 +1273,7 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
|
||||
/**
|
||||
* Get the starting LSM tree level to enable blob files.
|
||||
*
|
||||
* <p>
|
||||
* Default: 0
|
||||
*
|
||||
* @return the current LSM tree level to enable blob files.
|
||||
|
@ -1282,9 +1285,9 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
|
||||
/**
|
||||
* Set a certain prepopulate blob cache option.
|
||||
*
|
||||
* <p>
|
||||
* Default: 0
|
||||
*
|
||||
* <p>
|
||||
* Dynamically changeable through
|
||||
* {@link RocksDB#setOptions(ColumnFamilyHandle, MutableColumnFamilyOptions)}.
|
||||
*
|
||||
|
@ -1301,7 +1304,7 @@ public class ColumnFamilyOptions extends RocksObject
|
|||
|
||||
/**
|
||||
* Get the prepopulate blob cache option.
|
||||
*
|
||||
* <p>
|
||||
* Default: 0
|
||||
*
|
||||
* @return the current prepopulate blob cache option.
|
||||
|
|
|
@ -0,0 +1,132 @@
|
|||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
package org.rocksdb;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.rocksdb.util.ReverseBytewiseComparator;
|
||||
|
||||
public class ByteBufferUnsupportedOperationTest {
|
||||
@ClassRule
|
||||
public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE =
|
||||
new RocksNativeLibraryResource();
|
||||
|
||||
@Rule public TemporaryFolder dbFolder = new TemporaryFolder();
|
||||
|
||||
public static class Handler {
|
||||
private final RocksDB database;
|
||||
private final Map<UUID, ColumnFamilyHandle> columnFamilies;
|
||||
|
||||
public Handler(final String path, final Options options) throws RocksDBException {
|
||||
RocksDB.destroyDB(path, options);
|
||||
this.database = RocksDB.open(options, path);
|
||||
this.columnFamilies = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
public void addTable(final UUID streamID) throws RocksDBException {
|
||||
final ColumnFamilyOptions tableOptions = new ColumnFamilyOptions();
|
||||
tableOptions.optimizeUniversalStyleCompaction();
|
||||
try (final ComparatorOptions comparatorOptions = new ComparatorOptions()) {
|
||||
// comparatorOptions.setReusedSynchronisationType(ReusedSynchronisationType.ADAPTIVE_MUTEX);
|
||||
tableOptions.setComparator(new ReverseBytewiseComparator(comparatorOptions));
|
||||
final ColumnFamilyDescriptor tableDescriptor = new ColumnFamilyDescriptor(
|
||||
streamID.toString().getBytes(StandardCharsets.UTF_8), tableOptions);
|
||||
final ColumnFamilyHandle tableHandle = database.createColumnFamily(tableDescriptor);
|
||||
columnFamilies.put(streamID, tableHandle);
|
||||
}
|
||||
}
|
||||
|
||||
public void updateAll(final List<byte[][]> keyValuePairs, final UUID streamID)
|
||||
throws RocksDBException {
|
||||
final ColumnFamilyHandle currTable = columnFamilies.get(streamID);
|
||||
try (final WriteBatch batchedWrite = new WriteBatch();
|
||||
final WriteOptions writeOptions = new WriteOptions()) {
|
||||
for (final byte[][] pair : keyValuePairs) {
|
||||
final byte[] keyBytes = pair[0];
|
||||
final byte[] valueBytes = pair[1];
|
||||
batchedWrite.put(currTable, keyBytes, valueBytes);
|
||||
}
|
||||
database.write(writeOptions, batchedWrite);
|
||||
}
|
||||
}
|
||||
public boolean containsValue(final byte[] encodedValue, final UUID streamID) {
|
||||
try (final RocksIterator iter = database.newIterator(columnFamilies.get(streamID))) {
|
||||
iter.seekToFirst();
|
||||
while (iter.isValid()) {
|
||||
final byte[] val = iter.value();
|
||||
if (Arrays.equals(val, encodedValue)) {
|
||||
return true;
|
||||
}
|
||||
iter.next();
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
for (final ColumnFamilyHandle handle : columnFamilies.values()) {
|
||||
handle.close();
|
||||
}
|
||||
database.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void inner(final int numRepeats) throws RocksDBException {
|
||||
final Options opts = new Options();
|
||||
opts.setCreateIfMissing(true);
|
||||
final Handler handler = new Handler("testDB", opts);
|
||||
final UUID stream1 = UUID.randomUUID();
|
||||
|
||||
final List<byte[][]> entries = new ArrayList<>();
|
||||
for (int i = 0; i < numRepeats; i++) {
|
||||
final byte[] value = value(i);
|
||||
final byte[] key = key(i);
|
||||
entries.add(new byte[][] {key, value});
|
||||
}
|
||||
handler.addTable(stream1);
|
||||
handler.updateAll(entries, stream1);
|
||||
|
||||
for (int i = 0; i < numRepeats; i++) {
|
||||
final byte[] val = value(i);
|
||||
final boolean hasValue = handler.containsValue(val, stream1);
|
||||
if (!hasValue) {
|
||||
throw new IllegalStateException("not has value " + i);
|
||||
}
|
||||
}
|
||||
|
||||
handler.close();
|
||||
}
|
||||
|
||||
private static byte[] key(final int i) {
|
||||
return ("key" + i).getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
private static byte[] value(final int i) {
|
||||
return ("value" + i).getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void unsupportedOperation() throws RocksDBException {
|
||||
final int numRepeats = 1000;
|
||||
final int repeatTest = 10;
|
||||
|
||||
// the error is not always reproducible... let's try to increase the odds by repeating the main
|
||||
// test body
|
||||
for (int i = 0; i < repeatTest; i++) {
|
||||
try {
|
||||
inner(numRepeats);
|
||||
} catch (final RuntimeException runtimeException) {
|
||||
System.out.println("Exception on repeat " + i);
|
||||
throw runtimeException;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue