Fix database open with column family. (#12167)

Summary:
When is RocksDB is opened with Column Family descriptors, the default column family must be set properly. If it was not, then the flush operation will fail.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12167

Reviewed By: ajkr

Differential Revision: D53104007

Pulled By: cbi42

fbshipit-source-id: dffa8e34a4b2a438553ee4ea308f3fa2e22e46f7
This commit is contained in:
Radek Hubner 2024-01-26 09:13:03 -08:00 committed by Facebook GitHub Bot
parent 2233a2f4c0
commit f2ddb92750
3 changed files with 108 additions and 11 deletions

View File

@ -41,7 +41,7 @@ public class RocksDB extends RocksObject {
private ColumnFamilyHandle defaultColumnFamilyHandle_;
private final ReadOptions defaultReadOptions_ = new ReadOptions();
private final List<ColumnFamilyHandle> ownedColumnFamilyHandles = new ArrayList<>();
final List<ColumnFamilyHandle> ownedColumnFamilyHandles = new ArrayList<>();
/**
* Loads the necessary library files.
@ -305,11 +305,19 @@ public class RocksDB extends RocksObject {
final byte[][] cfNames = new byte[columnFamilyDescriptors.size()][];
final long[] cfOptionHandles = new long[columnFamilyDescriptors.size()];
int defaultColumnFamilyIndex = -1;
for (int i = 0; i < columnFamilyDescriptors.size(); i++) {
final ColumnFamilyDescriptor cfDescriptor = columnFamilyDescriptors
.get(i);
cfNames[i] = cfDescriptor.getName();
cfOptionHandles[i] = cfDescriptor.getOptions().nativeHandle_;
if (Arrays.equals(cfDescriptor.getName(), RocksDB.DEFAULT_COLUMN_FAMILY)) {
defaultColumnFamilyIndex = i;
}
}
if (defaultColumnFamilyIndex < 0) {
new IllegalArgumentException(
"You must provide the default column family in your columnFamilyDescriptors");
}
final long[] handles = open(options.nativeHandle_, path, cfNames,
@ -324,8 +332,7 @@ public class RocksDB extends RocksObject {
}
db.ownedColumnFamilyHandles.addAll(columnFamilyHandles);
db.storeDefaultColumnFamilyHandle(db.makeDefaultColumnFamilyHandle());
db.storeDefaultColumnFamilyHandle(columnFamilyHandles.get(defaultColumnFamilyIndex));
return db;
}

View File

@ -5,6 +5,7 @@
package org.rocksdb;
import java.util.Arrays;
import java.util.List;
/**
@ -84,7 +85,10 @@ public class TtlDB extends RocksDB {
*/
public static TtlDB open(final Options options, final String db_path,
final int ttl, final boolean readOnly) throws RocksDBException {
return new TtlDB(open(options.nativeHandle_, db_path, ttl, readOnly));
final TtlDB db = new TtlDB(open(options.nativeHandle_, db_path, ttl, readOnly));
db.storeOptionsInstance(options);
db.storeDefaultColumnFamilyHandle(db.makeDefaultColumnFamilyHandle());
return db;
}
/**
@ -116,6 +120,7 @@ public class TtlDB extends RocksDB {
+ " family handle.");
}
int defaultColumnFamilyIndex = -1;
final byte[][] cfNames = new byte[columnFamilyDescriptors.size()][];
final long[] cfOptionHandles = new long[columnFamilyDescriptors.size()];
for (int i = 0; i < columnFamilyDescriptors.size(); i++) {
@ -123,6 +128,13 @@ public class TtlDB extends RocksDB {
columnFamilyDescriptors.get(i);
cfNames[i] = cfDescriptor.getName();
cfOptionHandles[i] = cfDescriptor.getOptions().nativeHandle_;
if (Arrays.equals(cfDescriptor.getName(), RocksDB.DEFAULT_COLUMN_FAMILY)) {
defaultColumnFamilyIndex = i;
}
}
if (defaultColumnFamilyIndex < 0) {
new IllegalArgumentException(
"You must provide the default column family in your columnFamilyDescriptors");
}
final int[] ttlVals = new int[ttlValues.size()];
@ -136,6 +148,10 @@ public class TtlDB extends RocksDB {
for (int i = 1; i < handles.length; i++) {
columnFamilyHandles.add(new ColumnFamilyHandle(ttlDB, handles[i]));
}
ttlDB.storeOptionsInstance(options);
ttlDB.ownedColumnFamilyHandles.addAll(columnFamilyHandles);
ttlDB.storeDefaultColumnFamilyHandle(columnFamilyHandles.get(defaultColumnFamilyIndex));
return ttlDB;
}

View File

@ -5,19 +5,20 @@
package org.rocksdb;
import static org.assertj.core.api.Assertions.assertThat;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
public class TtlDBTest {
private static final int BATCH_ITERATION = 16;
@ClassRule
public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE =
@ -109,4 +110,77 @@ public class TtlDBTest {
assertThat(ttlDB.get(columnFamilyHandle, "key".getBytes())).isNull();
}
}
@Test
public void writeBatchWithFlush() throws RocksDBException {
try (final Options dbOptions = new Options()) {
dbOptions.setCreateIfMissing(true);
dbOptions.setCreateMissingColumnFamilies(true);
try (final RocksDB db =
TtlDB.open(dbOptions, dbFolder.getRoot().getAbsolutePath(), 100, false)) {
try (WriteBatch wb = new WriteBatch()) {
for (int i = 0; i < BATCH_ITERATION; i++) {
wb.put(("key" + i).getBytes(StandardCharsets.UTF_8),
("value" + i).getBytes(StandardCharsets.UTF_8));
}
try (WriteOptions writeOptions = new WriteOptions()) {
db.write(writeOptions, wb);
}
try (FlushOptions fOptions = new FlushOptions()) {
db.flush(fOptions);
}
}
for (int i = 0; i < BATCH_ITERATION; i++) {
assertThat(db.get(("key" + i).getBytes(StandardCharsets.UTF_8)))
.isEqualTo(("value" + i).getBytes(StandardCharsets.UTF_8));
}
}
}
}
@Test
public void writeBatchWithFlushAndColumnFamily() throws RocksDBException {
try (final DBOptions dbOptions = new DBOptions()) {
System.out.println("Test start");
dbOptions.setCreateIfMissing(true);
dbOptions.setCreateMissingColumnFamilies(true);
final List<ColumnFamilyDescriptor> cfNames =
Arrays.asList(new ColumnFamilyDescriptor("new_cf".getBytes()),
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
final List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<>();
final List<Integer> ttlValues = Arrays.asList(0, 1);
try (final RocksDB db = TtlDB.open(dbOptions, dbFolder.getRoot().getAbsolutePath(), cfNames,
columnFamilyHandleList, ttlValues, false)) {
try {
assertThat(columnFamilyHandleList.get(1).isDefaultColumnFamily()).isTrue();
try (WriteBatch wb = new WriteBatch()) {
for (int i = 0; i < BATCH_ITERATION; i++) {
wb.put(("key" + i).getBytes(StandardCharsets.UTF_8),
("value" + i).getBytes(StandardCharsets.UTF_8));
}
try (WriteOptions writeOptions = new WriteOptions()) {
db.write(writeOptions, wb);
}
try (FlushOptions fOptions = new FlushOptions()) {
// Test both flush options, db.flush(fOptions) slush only default CF
db.flush(fOptions);
db.flush(fOptions, columnFamilyHandleList);
}
}
for (int i = 0; i < BATCH_ITERATION; i++) {
assertThat(db.get(("key" + i).getBytes(StandardCharsets.UTF_8)))
.isEqualTo(("value" + i).getBytes(StandardCharsets.UTF_8));
}
} finally {
// All CF handles must be closed before we close DB.
columnFamilyHandleList.stream().forEach(ColumnFamilyHandle::close);
}
}
}
}
}