Merge pull request #422 from fyrz/RocksJava-Quality-Improvements

Rocks java quality improvements
This commit is contained in:
Yueh-Hsuan Chiang 2014-12-05 21:38:05 -08:00
commit 2871bc7bc8
18 changed files with 463 additions and 276 deletions

View File

@ -44,6 +44,9 @@ public class RocksDBColumnFamilySample {
db.close();
db = null;
}
if (options != null) {
options.dispose();
}
}
// open DB with two column families

View File

@ -119,7 +119,7 @@ public class RocksDBSample {
byte[] value = db.get("hello".getBytes());
assert("world".equals(new String(value)));
String str = db.getProperty("rocksdb.stats");
assert(str != null && str != "");
assert(str != null && !str.equals(""));
} catch (RocksDBException e) {
System.out.format("[ERROR] caught the unexpceted exception -- %s\n", e);
assert(db == null);

View File

@ -40,5 +40,5 @@ public class ColumnFamilyHandle extends RocksObject {
private native void disposeInternal(long handle);
private RocksDB rocksDB_;
private final RocksDB rocksDB_;
}

View File

@ -14,25 +14,59 @@ package org.rocksdb;
* compression method (if any) is used to compress a block.</p>
*/
public enum CompressionType {
NO_COMPRESSION((byte) 0),
SNAPPY_COMPRESSION((byte) 1),
ZLIB_COMPRESSION((byte) 2),
BZLIB2_COMPRESSION((byte) 3),
LZ4_COMPRESSION((byte) 4),
LZ4HC_COMPRESSION((byte) 5);
private final byte value_;
NO_COMPRESSION((byte) 0, null),
SNAPPY_COMPRESSION((byte) 1, "snappy"),
ZLIB_COMPRESSION((byte) 2, "z"),
BZLIB2_COMPRESSION((byte) 3, "bzip2"),
LZ4_COMPRESSION((byte) 4, "lz4"),
LZ4HC_COMPRESSION((byte) 5, "lz4hc");
private CompressionType(byte value) {
value_ = value;
/**
* <p>Get the CompressionType enumeration value by
* passing the library name to this method.</p>
*
* <p>If library cannot be found the enumeration
* value {@code NO_COMPRESSION} will be returned.</p>
*
* @return CompressionType instance.
*/
public static CompressionType getCompressionType(String libraryName) {
if (libraryName != null) {
for (CompressionType compressionType : CompressionType.values()) {
if (compressionType.getLibraryName() != null &&
compressionType.getLibraryName().equals(libraryName)) {
return compressionType;
}
}
}
return CompressionType.NO_COMPRESSION;
}
/**
* Returns the byte value of the enumerations value
* <p>Returns the byte value of the enumerations value.</p>
*
* @return byte representation
*/
public byte getValue() {
return value_;
}
/**
* <p>Returns the library name of the compression type
* identified by the enumeration value.</p>
*
* @return library name
*/
public String getLibraryName() {
return libraryName_;
}
private CompressionType(byte value, final String libraryName) {
value_ = value;
libraryName_ = libraryName;
}
private final byte value_;
private final String libraryName_;
}

View File

@ -18,8 +18,6 @@ import org.rocksdb.util.Environment;
public class RocksDB extends RocksObject {
public static final String DEFAULT_COLUMN_FAMILY = "default";
public static final int NOT_FOUND = -1;
private static final String[] compressionLibs_ = {
"snappy", "z", "bzip2", "lz4", "lz4hc"};
static {
RocksDB.loadLibrary();
@ -35,9 +33,11 @@ public class RocksDB extends RocksObject {
public static synchronized void loadLibrary() {
String tmpDir = System.getenv("ROCKSDB_SHAREDLIB_DIR");
// loading possibly necessary libraries.
for (String lib : compressionLibs_) {
for (CompressionType compressionType : CompressionType.values()) {
try {
System.loadLibrary(lib);
if (compressionType.getLibraryName() != null) {
System.loadLibrary(compressionType.getLibraryName());
}
} catch (UnsatisfiedLinkError e) {
// since it may be optional, we ignore its loading failure here.
}
@ -60,10 +60,14 @@ public class RocksDB extends RocksObject {
* of a library.
*/
public static synchronized void loadLibrary(List<String> paths) {
for (String lib : compressionLibs_) {
for (CompressionType compressionType : CompressionType.values()) {
if (compressionType.equals(CompressionType.NO_COMPRESSION)) {
continue;
}
for (String path : paths) {
try {
System.load(path + "/" + Environment.getSharedLibraryName(lib));
System.load(path + "/" + Environment.getSharedLibraryName(
compressionType.getLibraryName()));
break;
} catch (UnsatisfiedLinkError e) {
// since they are optional, we ignore loading fails.

View File

@ -162,5 +162,5 @@ public class RocksIterator extends RocksObject {
private native void seek0(long handle, byte[] target, int targetLen);
private native void status0(long handle);
RocksDB rocksDB_;
final RocksDB rocksDB_;
}

View File

@ -163,15 +163,6 @@ public class DbBenchmark {
EXISTING
}
enum CompressionType {
NONE,
SNAPPY,
ZLIB,
BZIP2,
LZ4,
LZ4HC
}
static {
RocksDB.loadLibrary();
}
@ -457,18 +448,16 @@ public class DbBenchmark {
// options.setPrefixSize((Integer)flags_.get(Flag.prefix_size));
// options.setKeysPerPrefix((Long)flags_.get(Flag.keys_per_prefix));
compressionType_ = (String) flags.get(Flag.compression_type);
compression_ = CompressionType.NONE;
compression_ = CompressionType.NO_COMPRESSION;
try {
if (compressionType_.equals("snappy")) {
System.loadLibrary("snappy");
} else if (compressionType_.equals("zlib")) {
System.loadLibrary("z");
} else if (compressionType_.equals("bzip2")) {
System.loadLibrary("bzip2");
} else if (compressionType_.equals("lz4")) {
System.loadLibrary("lz4");
} else if (compressionType_.equals("lz4hc")) {
System.loadLibrary("lz4hc");
if (compressionType_!=null) {
final CompressionType compressionType =
CompressionType.getCompressionType(compressionType_);
if (compressionType != null &&
compressionType != CompressionType.NO_COMPRESSION) {
System.loadLibrary(compressionType.getLibraryName());
}
}
} catch (UnsatisfiedLinkError e) {
System.err.format("Unable to load %s library:%s%n" +
@ -495,26 +484,32 @@ public class DbBenchmark {
} else {
options.setCreateIfMissing(false);
}
if (memtable_.equals("skip_list")) {
options.setMemTableConfig(new SkipListMemTableConfig());
} else if (memtable_.equals("vector")) {
options.setMemTableConfig(new VectorMemTableConfig());
} else if (memtable_.equals("hash_linkedlist")) {
options.setMemTableConfig(
new HashLinkedListMemTableConfig()
.setBucketCount(hashBucketCount_));
options.useFixedLengthPrefixExtractor(prefixSize_);
} else if (memtable_.equals("hash_skiplist") ||
memtable_.equals("prefix_hash")) {
options.setMemTableConfig(
new HashSkipListMemTableConfig()
.setBucketCount(hashBucketCount_));
options.useFixedLengthPrefixExtractor(prefixSize_);
} else {
System.err.format(
"unable to detect the specified memtable, " +
"use the default memtable factory %s%n",
options.memTableFactoryName());
switch (memtable_) {
case "skip_list":
options.setMemTableConfig(new SkipListMemTableConfig());
break;
case "vector":
options.setMemTableConfig(new VectorMemTableConfig());
break;
case "hash_linkedlist":
options.setMemTableConfig(
new HashLinkedListMemTableConfig()
.setBucketCount(hashBucketCount_));
options.useFixedLengthPrefixExtractor(prefixSize_);
break;
case "hash_skiplist":
case "prefix_hash":
options.setMemTableConfig(
new HashSkipListMemTableConfig()
.setBucketCount(hashBucketCount_));
options.useFixedLengthPrefixExtractor(prefixSize_);
break;
default:
System.err.format(
"unable to detect the specified memtable, " +
"use the default memtable factory %s%n",
options.memTableFactoryName());
break;
}
if (usePlainTable_) {
options.setTableFormatConfig(
@ -645,53 +640,65 @@ public class DbBenchmark {
int currentTaskId = 0;
boolean known = true;
if (benchmark.equals("fillseq")) {
tasks.add(new WriteSequentialTask(
currentTaskId++, randSeed_, num_, num_, writeOpt, 1));
} else if (benchmark.equals("fillbatch")) {
tasks.add(new WriteRandomTask(
currentTaskId++, randSeed_, num_ / 1000, num_, writeOpt, 1000));
} else if (benchmark.equals("fillrandom")) {
tasks.add(new WriteRandomTask(
currentTaskId++, randSeed_, num_, num_, writeOpt, 1));
} else if (benchmark.equals("filluniquerandom")) {
tasks.add(new WriteUniqueRandomTask(
currentTaskId++, randSeed_, num_, num_, writeOpt, 1));
} else if (benchmark.equals("fillsync")) {
writeOpt.setSync(true);
tasks.add(new WriteRandomTask(
currentTaskId++, randSeed_, num_ / 1000, num_ / 1000,
writeOpt, 1));
} else if (benchmark.equals("readseq")) {
for (int t = 0; t < threadNum_; ++t) {
tasks.add(new ReadSequentialTask(
currentTaskId++, randSeed_, reads_ / threadNum_, num_));
}
} else if (benchmark.equals("readrandom")) {
for (int t = 0; t < threadNum_; ++t) {
tasks.add(new ReadRandomTask(
currentTaskId++, randSeed_, reads_ / threadNum_, num_));
}
} else if (benchmark.equals("readwhilewriting")) {
WriteTask writeTask = new WriteRandomTask(
-1, randSeed_, Long.MAX_VALUE, num_, writeOpt, 1, writesPerSeconds_);
writeTask.stats_.setExcludeFromMerge();
bgTasks.add(writeTask);
for (int t = 0; t < threadNum_; ++t) {
tasks.add(new ReadRandomTask(
currentTaskId++, randSeed_, reads_ / threadNum_, num_));
}
} else if (benchmark.equals("readhot")) {
for (int t = 0; t < threadNum_; ++t) {
tasks.add(new ReadRandomTask(
currentTaskId++, randSeed_, reads_ / threadNum_, num_ / 100));
}
} else if (benchmark.equals("delete")) {
destroyDb();
open(options);
} else {
known = false;
System.err.println("Unknown benchmark: " + benchmark);
switch (benchmark) {
case "fillseq":
tasks.add(new WriteSequentialTask(
currentTaskId++, randSeed_, num_, num_, writeOpt, 1));
break;
case "fillbatch":
tasks.add(new WriteRandomTask(
currentTaskId++, randSeed_, num_ / 1000, num_, writeOpt, 1000));
break;
case "fillrandom":
tasks.add(new WriteRandomTask(
currentTaskId++, randSeed_, num_, num_, writeOpt, 1));
break;
case "filluniquerandom":
tasks.add(new WriteUniqueRandomTask(
currentTaskId++, randSeed_, num_, num_, writeOpt, 1));
break;
case "fillsync":
writeOpt.setSync(true);
tasks.add(new WriteRandomTask(
currentTaskId++, randSeed_, num_ / 1000, num_ / 1000,
writeOpt, 1));
break;
case "readseq":
for (int t = 0; t < threadNum_; ++t) {
tasks.add(new ReadSequentialTask(
currentTaskId++, randSeed_, reads_ / threadNum_, num_));
}
break;
case "readrandom":
for (int t = 0; t < threadNum_; ++t) {
tasks.add(new ReadRandomTask(
currentTaskId++, randSeed_, reads_ / threadNum_, num_));
}
break;
case "readwhilewriting":
WriteTask writeTask = new WriteRandomTask(
-1, randSeed_, Long.MAX_VALUE, num_, writeOpt, 1, writesPerSeconds_);
writeTask.stats_.setExcludeFromMerge();
bgTasks.add(writeTask);
for (int t = 0; t < threadNum_; ++t) {
tasks.add(new ReadRandomTask(
currentTaskId++, randSeed_, reads_ / threadNum_, num_));
}
break;
case "readhot":
for (int t = 0; t < threadNum_; ++t) {
tasks.add(new ReadRandomTask(
currentTaskId++, randSeed_, reads_ / threadNum_, num_ / 100));
}
break;
case "delete":
destroyDb();
open(options);
break;
default:
known = false;
System.err.println("Unknown benchmark: " + benchmark);
break;
}
if (known) {
ExecutorService executor = Executors.newCachedThreadPool();
@ -800,7 +807,7 @@ public class DbBenchmark {
System.out.printf(
"%-16s : %11.5f micros/op; %6.1f MB/s;%s %d / %d task(s) finished.\n",
benchmark, (double) elapsedSeconds / stats.done_ * 1e6,
benchmark, elapsedSeconds / stats.done_ * 1e6,
(stats.bytes_ / 1048576.0) / elapsedSeconds, extra,
taskFinishedCount, concurrentThreads);
}

View File

@ -9,7 +9,8 @@ import org.rocksdb.*;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import static org.assertj.core.api.Assertions.assertThat;
@ -40,7 +41,7 @@ public abstract class AbstractComparatorTest {
*
* @throws java.io.IOException if IO error happens.
*/
public void testRoundtrip(final Path db_path) throws IOException {
public void testRoundtrip(final Path db_path) throws IOException, RocksDBException {
Options opt = null;
RocksDB db = null;
@ -65,7 +66,6 @@ public abstract class AbstractComparatorTest {
}
db.close();
// re-open db and read from start to end
// integer keys should be in ascending
// order as defined by SimpleIntComparator
@ -84,9 +84,6 @@ public abstract class AbstractComparatorTest {
assertThat(count).isEqualTo(ITERATIONS);
} catch (final RocksDBException e) {
System.err.format("[ERROR]: %s%n", e);
e.printStackTrace();
} finally {
if (db != null) {
db.close();
@ -95,8 +92,97 @@ public abstract class AbstractComparatorTest {
if (opt != null) {
opt.dispose();
}
}
}
removeDb(db_path); // cleanup after ourselves!
/**
* Test which stores random keys into a column family
* in the database
* using an @see getAscendingIntKeyComparator
* it then checks that these keys are read back in
* ascending order
*
* @param db_path A path where we can store database
* files temporarily
*
* @throws java.io.IOException if IO error happens.
*/
public void testRoundtripCf(final Path db_path) throws IOException,
RocksDBException {
DBOptions opt = null;
RocksDB db = null;
List<ColumnFamilyDescriptor> cfDescriptors =
new ArrayList<>();
cfDescriptors.add(new ColumnFamilyDescriptor(
RocksDB.DEFAULT_COLUMN_FAMILY));
cfDescriptors.add(new ColumnFamilyDescriptor("new_cf",
new ColumnFamilyOptions().setComparator(
getAscendingIntKeyComparator())));
List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
try {
opt = new DBOptions().
setCreateIfMissing(true).
setCreateMissingColumnFamilies(true);
// store 10,000 random integer keys
final int ITERATIONS = 10000;
db = RocksDB.open(opt, db_path.toString(), cfDescriptors, cfHandles);
assertThat(cfDescriptors.size()).isEqualTo(2);
assertThat(cfHandles.size()).isEqualTo(2);
final Random random = new Random();
for (int i = 0; i < ITERATIONS; i++) {
final byte key[] = intToByte(random.nextInt());
if (i > 0 && db.get(cfHandles.get(1), key) != null) {
// does key already exist (avoid duplicates)
i--; // generate a different key
} else {
db.put(cfHandles.get(1), key, "value".getBytes());
}
}
for (ColumnFamilyHandle handle : cfHandles) {
handle.dispose();
}
cfHandles.clear();
db.close();
// re-open db and read from start to end
// integer keys should be in ascending
// order as defined by SimpleIntComparator
db = RocksDB.open(opt, db_path.toString(), cfDescriptors, cfHandles);
assertThat(cfDescriptors.size()).isEqualTo(2);
assertThat(cfHandles.size()).isEqualTo(2);
final RocksIterator it = db.newIterator(cfHandles.get(1));
it.seekToFirst();
int lastKey = Integer.MIN_VALUE;
int count = 0;
for (it.seekToFirst(); it.isValid(); it.next()) {
final int thisKey = byteToInt(it.key());
assertThat(thisKey).isGreaterThan(lastKey);
lastKey = thisKey;
count++;
}
for (ColumnFamilyHandle handle : cfHandles) {
handle.dispose();
}
cfHandles.clear();
db.close();
assertThat(count).isEqualTo(ITERATIONS);
} finally {
for (ColumnFamilyHandle handle : cfHandles) {
handle.dispose();
}
if (db != null) {
db.close();
}
if (opt != null) {
opt.dispose();
}
}
}
@ -127,43 +213,4 @@ public abstract class AbstractComparatorTest {
return result;
}
/**
* Utility method for deleting database files
*
* @param db_path The path to the database to remove
* from the filesystem
*/
private static void removeDb(final Path db_path) throws IOException {
Files.walkFileTree(db_path, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs)
throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFileFailed(final Path file, IOException exc)
throws IOException {
// try to delete the file anyway, even if its attributes
// could not be read, since delete-only access is
// theoretically possible
Files.delete(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(final Path dir, IOException exc)
throws IOException {
if (exc == null) {
Files.delete(dir);
return FileVisitResult.CONTINUE;
} else {
// directory iteration failed; propagate exception
throw exc;
}
}
});
}
}

View File

@ -215,14 +215,26 @@ public class BackupableDBTest {
bdb.createNewBackup(true);
bdb.createNewBackup(true);
bdb.createNewBackup(true);
verifyNumberOfValidBackups(bdb, 4);
List<BackupInfo> infos = verifyNumberOfValidBackups(bdb, 4);
assertThat(infos.get(1).size()).
isEqualTo(infos.get(2).size());
assertThat(infos.get(1).numberFiles()).
isEqualTo(infos.get(2).numberFiles());
long maxTimeBeforePurge = Long.MIN_VALUE;
for (BackupInfo backupInfo : infos) {
if (maxTimeBeforePurge < backupInfo.timestamp()) {
maxTimeBeforePurge = backupInfo.timestamp();
}
}
// init RestoreBackupableDB
rdb = new RestoreBackupableDB(bopt);
// the same number of backups must
// exist using RestoreBackupableDB.
verifyNumberOfValidBackups(rdb, 4);
rdb.purgeOldBackups(1);
verifyNumberOfValidBackups(rdb, 1);
infos = verifyNumberOfValidBackups(rdb, 1);
assertThat(infos.get(0).timestamp()).
isEqualTo(maxTimeBeforePurge);
} finally {
if (bdb != null) {
bdb.close();

View File

@ -84,6 +84,9 @@ public class BlockBasedTableConfigTest {
@Test
public void checksumType() {
BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig();
assertThat(ChecksumType.values().length).isEqualTo(3);
assertThat(ChecksumType.valueOf("kxxHash")).
isEqualTo(ChecksumType.kxxHash);
blockBasedTableConfig.setChecksumType(ChecksumType.kNoChecksum);
blockBasedTableConfig.setChecksumType(ChecksumType.kxxHash);
assertThat(blockBasedTableConfig.checksumType().equals(

View File

@ -171,6 +171,9 @@ public class ColumnFamilyTest {
if (db != null) {
db.close();
}
if (options != null) {
options.dispose();
}
}
}

View File

@ -26,7 +26,7 @@ public class ComparatorTest {
public TemporaryFolder dbFolder = new TemporaryFolder();
@Test
public void javaComparator() throws IOException {
public void javaComparator() throws IOException, RocksDBException {
final AbstractComparatorTest comparatorTest = new AbstractComparatorTest() {
@Override
@ -51,6 +51,32 @@ public class ComparatorTest {
dbFolder.getRoot().getAbsolutePath()));
}
@Test
public void javaComparatorCf() throws IOException, RocksDBException {
final AbstractComparatorTest comparatorTest = new AbstractComparatorTest() {
@Override
public AbstractComparator getAscendingIntKeyComparator() {
return new Comparator(new ComparatorOptions()) {
@Override
public String name() {
return "test.AscendingIntKeyComparator";
}
@Override
public int compare(final Slice a, final Slice b) {
return compareIntKeys(a.data(), b.data());
}
};
}
};
// test the round-tripability of keys written and read with the Comparator
comparatorTest.testRoundtripCf(FileSystems.getDefault().getPath(
dbFolder.getRoot().getAbsolutePath()));
}
@Test
public void builtinForwardComparator()
throws RocksDBException {
@ -195,5 +221,8 @@ public class ComparatorTest {
assertThat(
BuiltinComparator.REVERSE_BYTEWISE_COMPARATOR.ordinal())
.isEqualTo(1);
assertThat(BuiltinComparator.values().length).isEqualTo(2);
assertThat(BuiltinComparator.valueOf("BYTEWISE_COMPARATOR")).
isEqualTo(BuiltinComparator.BYTEWISE_COMPARATOR);
}
}

View File

@ -0,0 +1,22 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb.test;
import org.junit.Test;
import org.rocksdb.CompressionType;
public class CompressionOptionsTest
{
@Test
public void getCompressionType() {
for (CompressionType compressionType : CompressionType.values()) {
String libraryName = compressionType.getLibraryName();
compressionType.equals(CompressionType.getCompressionType(
libraryName));
}
}
}

View File

@ -23,7 +23,7 @@ public class DirectComparatorTest {
public TemporaryFolder dbFolder = new TemporaryFolder();
@Test
public void directComparator() throws IOException {
public void directComparator() throws IOException, RocksDBException {
final AbstractComparatorTest comparatorTest = new AbstractComparatorTest() {
@Override

View File

@ -57,6 +57,9 @@ public class InfoLogLevelTest {
if (db != null) {
db.close();
}
if (options != null) {
options.dispose();
}
}
}
@ -84,9 +87,26 @@ public class InfoLogLevelTest {
if (db != null) {
db.close();
}
if (options != null) {
options.dispose();
}
if (dbOptions != null) {
dbOptions.dispose();
}
}
}
@Test(expected = IllegalArgumentException.class)
public void failIfIllegalByteValueProvided() {
InfoLogLevel.getInfoLogLevel((byte)-1);
}
@Test
public void valueOf() {
assertThat(InfoLogLevel.valueOf("DEBUG_LEVEL")).
isEqualTo(InfoLogLevel.DEBUG_LEVEL);
}
/**
* Read LOG file contents into String.
*

View File

@ -53,6 +53,5 @@ public class MixedOptionsTest {
options.optimizeUniversalStyleCompaction(400);
options.optimizeForPointLookup(1024);
options.prepareForBulkLoad();
System.out.println("Mixed options test passed");
}
}

View File

@ -5,7 +5,6 @@
package org.rocksdb.test;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
@ -13,9 +12,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.ClassRule;
import org.junit.Test;
import org.rocksdb.WriteOptions;
import static org.assertj.core.api.Assertions.assertThat;
@ -27,143 +26,148 @@ public class WriteBatchHandlerTest {
@Test
public void writeBatchHandler() throws IOException, RocksDBException {
WriteBatch batch = null;
CapturingWriteBatchHandler handler = null;
try {
// setup test data
final List<Tuple<Action, Tuple<byte[], byte[]>>> testEvents = new ArrayList<>();
testEvents.add(new Tuple<>(Action.DELETE,
new Tuple<byte[], byte[]>("k0".getBytes(), null)));
testEvents.add(new Tuple<>(Action.PUT,
new Tuple<>("k1".getBytes(), "v1".getBytes())));
testEvents.add(new Tuple<>(Action.PUT,
new Tuple<>("k2".getBytes(), "v2".getBytes())));
testEvents.add(new Tuple<>(Action.PUT,
new Tuple<>("k3".getBytes(), "v3".getBytes())));
testEvents.add(new Tuple<>(Action.LOG,
new Tuple<byte[], byte[]>(null, "log1".getBytes())));
testEvents.add(new Tuple<>(Action.MERGE,
new Tuple<>("k2".getBytes(), "v22".getBytes())));
testEvents.add(new Tuple<>(Action.DELETE,
new Tuple<byte[], byte[]>("k3".getBytes(), null)));
// setup test data
final List<Tuple<Action, Tuple<byte[], byte[]>>> testEvents = new ArrayList<>();
testEvents.add(new Tuple<>(Action.DELETE,
new Tuple<byte[], byte[]>("k0".getBytes(), null)));
testEvents.add(new Tuple<>(Action.PUT,
new Tuple<>("k1".getBytes(), "v1".getBytes())));
testEvents.add(new Tuple<>(Action.PUT,
new Tuple<>("k2".getBytes(), "v2".getBytes())));
testEvents.add(new Tuple<>(Action.PUT,
new Tuple<>("k3".getBytes(), "v3".getBytes())));
testEvents.add(new Tuple<>(Action.LOG,
new Tuple<byte[], byte[]>(null, "log1".getBytes())));
testEvents.add(new Tuple<>(Action.MERGE,
new Tuple<>("k2".getBytes(), "v22".getBytes())));
testEvents.add(new Tuple<>(Action.DELETE,
new Tuple<byte[], byte[]>("k3".getBytes(), null)));
// load test data to the write batch
batch = new WriteBatch();
for (final Tuple<Action, Tuple<byte[], byte[]>> testEvent : testEvents) {
final Tuple<byte[], byte[]> data = testEvent.value;
switch (testEvent.key) {
// load test data to the write batch
final WriteBatch batch = new WriteBatch();
for(final Tuple<Action, Tuple<byte[], byte[]>> testEvent : testEvents) {
final Tuple<byte[], byte[]> data = testEvent.value;
switch(testEvent.key) {
case PUT:
batch.put(data.key, data.value);
break;
case PUT:
batch.put(data.key, data.value);
break;
case MERGE:
batch.merge(data.key, data.value);
break;
case MERGE:
batch.merge(data.key, data.value);
break;
case DELETE:
batch.remove(data.key);
break;
case DELETE:
batch.remove(data.key);
break;
case LOG:
batch.putLogData(data.value);
break;
}
case LOG:
batch.putLogData(data.value);
break;
}
}
// attempt to read test data back from the WriteBatch by iterating with a handler
final CapturingWriteBatchHandler handler = new CapturingWriteBatchHandler();
batch.iterate(handler);
// attempt to read test data back from the WriteBatch by iterating with a handler
handler = new CapturingWriteBatchHandler();
batch.iterate(handler);
// compare the results to the test data
final List<Tuple<Action, Tuple<byte[], byte[]>>> actualEvents = handler.getEvents();
assertThat(testEvents.size()).isSameAs(actualEvents.size());
// compare the results to the test data
final List<Tuple<Action, Tuple<byte[], byte[]>>> actualEvents = handler.getEvents();
assertThat(testEvents.size()).isSameAs(actualEvents.size());
for(int i = 0; i < testEvents.size(); i++) {
assertThat(equals(testEvents.get(i), actualEvents.get(i))).isTrue();
}
for (int i = 0; i < testEvents.size(); i++) {
assertThat(equals(testEvents.get(i), actualEvents.get(i))).isTrue();
}
} finally {
if (handler != null) {
handler.dispose();
}
if (batch != null) {
batch.dispose();
}
}
}
System.out.println("Passed WriteBatchHandler Test");
private static boolean equals(final Tuple<Action, Tuple<byte[], byte[]>> expected,
final Tuple<Action, Tuple<byte[], byte[]>> actual) {
if (!expected.key.equals(actual.key)) {
return false;
}
private static boolean equals(final Tuple<Action, Tuple<byte[], byte[]>> expected,
final Tuple<Action, Tuple<byte[], byte[]>> actual) {
if(!expected.key.equals(actual.key)) {
return false;
}
final Tuple<byte[], byte[]> expectedData = expected.value;
final Tuple<byte[], byte[]> actualData = actual.value;
final Tuple<byte[], byte[]> expectedData = expected.value;
final Tuple<byte[], byte[]> actualData = actual.value;
return equals(expectedData.key, actualData.key)
&& equals(expectedData.value, actualData.value);
}
if(equals(expectedData.key, actualData.key)) {
return equals(expectedData.value, actualData.value);
} else {
return false;
}
private static boolean equals(byte[] expected, byte[] actual) {
if (expected != null) {
return Arrays.equals(expected, actual);
} else {
return actual == null;
}
}
private static boolean equals(byte[] expected, byte[] actual) {
if(expected != null) {
return Arrays.equals(expected, actual);
} else {
return actual == null;
}
private static class Tuple<K, V> {
public final K key;
public final V value;
public Tuple(final K key, final V value) {
this.key = key;
this.value = value;
}
}
private static class Tuple<K, V> {
public final K key;
public final V value;
/**
* Enumeration of Write Batch
* event actions
*/
private enum Action {
PUT,
MERGE,
DELETE,
LOG
}
public Tuple(final K key, final V value) {
this.key = key;
this.value = value;
}
}
/**
* A simple WriteBatch Handler which adds a record
* of each event that it receives to a list
*/
private static class CapturingWriteBatchHandler extends WriteBatch.Handler {
private final List<Tuple<Action, Tuple<byte[], byte[]>>> events = new ArrayList<>();
/**
* Enumeration of Write Batch
* event actions
* Returns a copy of the current events list
*
* @return a list of the events which have happened upto now
*/
private enum Action {
PUT,
MERGE,
DELETE,
LOG
public List<Tuple<Action, Tuple<byte[], byte[]>>> getEvents() {
return new ArrayList<>(events);
}
/**
* A simple WriteBatch Handler which adds a record
* of each event that it receives to a list
*/
private static class CapturingWriteBatchHandler extends WriteBatch.Handler {
private final List<Tuple<Action, Tuple<byte[], byte[]>>> events = new ArrayList<>();
/**
* Returns a copy of the current events list
*
* @return a list of the events which have happened upto now
*/
public List<Tuple<Action, Tuple<byte[], byte[]>>> getEvents() {
return new ArrayList<>(events);
}
@Override
public void put(final byte[] key, final byte[] value) {
events.add(new Tuple<>(Action.PUT, new Tuple<>(key, value)));
}
@Override
public void merge(final byte[] key, final byte[] value) {
events.add(new Tuple<>(Action.MERGE, new Tuple<>(key, value)));
}
@Override
public void delete(final byte[] key) {
events.add(new Tuple<>(Action.DELETE, new Tuple<byte[], byte[]>(key, null)));
}
@Override
public void logData(final byte[] blob) {
events.add(new Tuple<>(Action.LOG, new Tuple<byte[], byte[]>(null, blob)));
}
@Override
public void put(final byte[] key, final byte[] value) {
events.add(new Tuple<>(Action.PUT, new Tuple<>(key, value)));
}
@Override
public void merge(final byte[] key, final byte[] value) {
events.add(new Tuple<>(Action.MERGE, new Tuple<>(key, value)));
}
@Override
public void delete(final byte[] key) {
events.add(new Tuple<>(Action.DELETE, new Tuple<byte[], byte[]>(key, null)));
}
@Override
public void logData(final byte[] blob) {
events.add(new Tuple<>(Action.LOG, new Tuple<byte[], byte[]>(null, blob)));
}
}
}

View File

@ -1874,9 +1874,9 @@ void Java_org_rocksdb_ColumnFamilyOptions_setComparatorHandle__JI(
* Method: setComparatorHandle
* Signature: (JJ)V
*/
void Java_org_rocksdb_ColumnFamilyOptions_setComparatorHandle__JI(
void Java_org_rocksdb_ColumnFamilyOptions_setComparatorHandle__JJ(
JNIEnv* env, jobject jobj, jlong jopt_handle, jlong jcomparator_handle) {
reinterpret_cast<rocksdb::Options*>(jopt_handle)->comparator =
reinterpret_cast<rocksdb::ColumnFamilyOptions*>(jopt_handle)->comparator =
reinterpret_cast<rocksdb::Comparator*>(jcomparator_handle);
}