Create C API function to iterate over WriteBatch for custom Column Families (#12718)

Summary:
Create C API function for iterating over WriteBatch for custom Column Families
Add a function to the C API that exposes column-family-specific callbacks for iterating over a WriteBatch: put_cf, deleted_cf and merge_cf. This is required when one needs to read changes for any non-default column family. Without this functionality it is impossible to iterate over the changes in the WAL that are relevant to custom column families.

Fixes https://github.com/facebook/rocksdb/issues/12790

Testing:
Added WriteBatch iteration test to "columnfamilies" section of C API unit tests

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12718

Reviewed By: cbi42

Differential Revision: D59483601

Pulled By: ajkr

fbshipit-source-id: b68b900636304528a38620a8c3ad82fdce4b60cb
This commit is contained in:
Konstantin Ilin 2024-07-09 12:05:08 -07:00 committed by Facebook GitHub Bot
parent b837d41ab1
commit 5ecb92760a
4 changed files with 137 additions and 0 deletions

41
db/c.cc
View File

@ -2286,6 +2286,32 @@ class H : public WriteBatch::Handler {
} }
}; };
// WriteBatch::Handler adapter that forwards the column-family-aware PutCF,
// DeleteCF and MergeCF notifications to plain C function pointers, so that a
// C caller can observe which column family each entry belongs to.
//
// All members default to nullptr. A callback left as nullptr is simply not
// invoked for its entry type; iteration continues with the next entry.
class HCF : public WriteBatch::Handler {
 public:
  // Opaque caller-owned pointer, handed back unchanged as the first argument
  // of every callback.
  void* state_ = nullptr;
  // Invoked from PutCF() with the column family ID, key and value.
  void (*put_cf_)(void*, uint32_t cfid, const char* k, size_t klen,
                  const char* v, size_t vlen) = nullptr;
  // Invoked from DeleteCF() with the column family ID and key.
  void (*deleted_cf_)(void*, uint32_t cfid, const char* k,
                      size_t klen) = nullptr;
  // Invoked from MergeCF() with the column family ID, key and merge operand.
  void (*merge_cf_)(void*, uint32_t cfid, const char* k, size_t klen,
                    const char* v, size_t vlen) = nullptr;

  Status PutCF(uint32_t column_family_id, const Slice& key,
               const Slice& value) override {
    // Guard against a caller that passed NULL for a callback it does not
    // care about; calling through a null function pointer would crash.
    if (put_cf_ != nullptr) {
      (*put_cf_)(state_, column_family_id, key.data(), key.size(),
                 value.data(), value.size());
    }
    return Status::OK();
  }

  Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
    if (deleted_cf_ != nullptr) {
      (*deleted_cf_)(state_, column_family_id, key.data(), key.size());
    }
    return Status::OK();
  }

  Status MergeCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override {
    if (merge_cf_ != nullptr) {
      (*merge_cf_)(state_, column_family_id, key.data(), key.size(),
                   value.data(), value.size());
    }
    return Status::OK();
  }
};
void rocksdb_writebatch_iterate(rocksdb_writebatch_t* b, void* state, void rocksdb_writebatch_iterate(rocksdb_writebatch_t* b, void* state,
void (*put)(void*, const char* k, size_t klen, void (*put)(void*, const char* k, size_t klen,
const char* v, size_t vlen), const char* v, size_t vlen),
@ -2298,6 +2324,21 @@ void rocksdb_writebatch_iterate(rocksdb_writebatch_t* b, void* state,
b->rep.Iterate(&handler); b->rep.Iterate(&handler);
} }
// Iterates over the entries of write batch `b`, routing each one through an
// HCF handler so that `put_cf`, `deleted_cf` and `merge_cf` receive the
// entry's column family ID along with its key (and value, where present).
// `state` is forwarded untouched as the first argument of every callback.
// Whatever Iterate() returns is discarded; nothing is reported back to the
// caller.
void rocksdb_writebatch_iterate_cf(
    rocksdb_writebatch_t* b, void* state,
    void (*put_cf)(void*, uint32_t cfid, const char* k, size_t klen,
                   const char* v, size_t vlen),
    void (*deleted_cf)(void*, uint32_t cfid, const char* k, size_t klen),
    void (*merge_cf)(void*, uint32_t cfid, const char* k, size_t klen,
                     const char* v, size_t vlen)) {
  HCF cf_handler;

  // Wire the C callbacks into the handler.
  cf_handler.put_cf_ = put_cf;
  cf_handler.deleted_cf_ = deleted_cf;
  cf_handler.merge_cf_ = merge_cf;

  // Context pointer echoed back to each callback.
  cf_handler.state_ = state;

  b->rep.Iterate(&cf_handler);
}
const char* rocksdb_writebatch_data(rocksdb_writebatch_t* b, size_t* size) { const char* rocksdb_writebatch_data(rocksdb_writebatch_t* b, size_t* size) {
*size = b->rep.GetDataSize(); *size = b->rep.GetDataSize();
return b->rep.Data().c_str(); return b->rep.Data().c_str();

View File

@ -199,6 +199,79 @@ static void CheckDel(void* ptr, const char* k, size_t klen) {
(*state)++; (*state)++;
} }
// Put callback for rocksdb_writebatch_iterate_cf().
// `ptr` points at an int counting how many callbacks have run so far. Puts
// are expected at positions 0, 1, 4 and 6; each position pins the key, value
// and column family ID. Any other position hits CheckCondition(false). The
// counter is advanced by one before returning.
static void CheckPutCF(void* ptr, uint32_t cfid, const char* k, size_t klen,
                       const char* v, size_t vlen) {
  int* step = (int*)ptr;
  if (*step == 0) {
    CheckEqual("bar", k, klen);
    CheckEqual("b", v, vlen);
    CheckCondition(cfid == 1);
  } else if (*step == 1) {
    CheckEqual("box", k, klen);
    CheckEqual("c", v, vlen);
    CheckCondition(cfid == 1);
  } else if (*step == 4) {
    CheckEqual("foo", k, klen);
    CheckEqual("f", v, vlen);
    CheckCondition(cfid == 0);
  } else if (*step == 6) {
    CheckEqual("baz", k, klen);
    CheckEqual("a", v, vlen);
    CheckCondition(cfid == 0);
  } else {
    CheckCondition(false);
  }
  ++(*step);
}
// Delete callback for rocksdb_writebatch_iterate_cf().
// `ptr` points at an int counting how many callbacks have run so far.
// Deletes are expected at positions 2 and 5; each position pins the key and
// column family ID. Any other position hits CheckCondition(false). The
// counter is advanced by one before returning.
static void CheckDelCF(void* ptr, uint32_t cfid, const char* k, size_t klen) {
  int* step = (int*)ptr;
  if (*step == 2) {
    CheckEqual("bar", k, klen);
    CheckCondition(cfid == 1);
  } else if (*step == 5) {
    CheckEqual("foo", k, klen);
    CheckCondition(cfid == 0);
  } else {
    CheckCondition(false);
  }
  ++(*step);
}
// Merge callback for rocksdb_writebatch_iterate_cf().
// `ptr` points at an int counting how many callbacks have run so far.
// Merges are expected at positions 3 and 7; each position pins the key,
// merge operand and column family ID. Any other position hits
// CheckCondition(false). The counter is advanced by one before returning.
static void CheckMergeCF(void* ptr, uint32_t cfid, const char* k, size_t klen,
                         const char* v, size_t vlen) {
  int* step = (int*)ptr;
  if (*step == 3) {
    CheckEqual("box", k, klen);
    CheckEqual("cc", v, vlen);
    CheckCondition(cfid == 1);
  } else if (*step == 7) {
    CheckEqual("baz", k, klen);
    CheckEqual("aa", v, vlen);
    CheckCondition(cfid == 0);
  } else {
    CheckCondition(false);
  }
  ++(*step);
}
static void CmpDestroy(void* arg) { (void)arg; } static void CmpDestroy(void* arg) { (void)arg; }
static int CmpCompare(void* arg, const char* a, size_t alen, const char* b, static int CmpCompare(void* arg, const char* a, size_t alen, const char* b,
@ -1688,6 +1761,21 @@ int main(int argc, char** argv) {
CheckPinGetCF(db, roptions, handles[1], "bar", NULL); CheckPinGetCF(db, roptions, handles[1], "bar", NULL);
CheckPinGetCF(db, roptions, handles[1], "box", "c"); CheckPinGetCF(db, roptions, handles[1], "box", "c");
CheckPinGetCF(db, roptions, handles[1], "buff", "rocksdb"); CheckPinGetCF(db, roptions, handles[1], "buff", "rocksdb");
rocksdb_writebatch_clear(wb);
// Test WriteBatch iteration with Column Family
int pos = 0;
rocksdb_writebatch_put_cf(wb, handles[1], "bar", 3, "b", 1);
rocksdb_writebatch_put_cf(wb, handles[1], "box", 3, "c", 1);
rocksdb_writebatch_delete_cf(wb, handles[1], "bar", 3);
rocksdb_writebatch_merge_cf(wb, handles[1], "box", 3, "cc", 2);
rocksdb_writebatch_put(wb, "foo", 3, "f", 1);
rocksdb_writebatch_delete(wb, "foo", 3);
rocksdb_writebatch_put(wb, "baz", 3, "a", 1);
rocksdb_writebatch_merge(wb, "baz", 3, "aa", 2);
rocksdb_writebatch_iterate_cf(wb, &pos, CheckPutCF, CheckDelCF,
CheckMergeCF);
CheckCondition(pos == 8);
rocksdb_writebatch_clear(wb);
rocksdb_writebatch_destroy(wb); rocksdb_writebatch_destroy(wb);
rocksdb_flush_wal(db, 1, &err); rocksdb_flush_wal(db, 1, &err);

View File

@ -863,6 +863,13 @@ extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_iterate(
rocksdb_writebatch_t*, void* state, rocksdb_writebatch_t*, void* state,
void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen), void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen),
void (*deleted)(void*, const char* k, size_t klen)); void (*deleted)(void*, const char* k, size_t klen));
/* Column-family-aware counterpart of rocksdb_writebatch_iterate(): walks the
 * entries of the write batch and calls put_cf, deleted_cf or merge_cf for
 * each put, delete or merge entry. `cfid` is the ID of the column family the
 * entry targets. `state` is forwarded unchanged as the first argument of
 * every callback. Keys and values are passed as pointer/length pairs. */
extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_iterate_cf(
rocksdb_writebatch_t*, void* state,
void (*put_cf)(void*, uint32_t cfid, const char* k, size_t klen,
const char* v, size_t vlen),
void (*deleted_cf)(void*, uint32_t cfid, const char* k, size_t klen),
void (*merge_cf)(void*, uint32_t cfid, const char* k, size_t klen,
const char* v, size_t vlen));
extern ROCKSDB_LIBRARY_API const char* rocksdb_writebatch_data( extern ROCKSDB_LIBRARY_API const char* rocksdb_writebatch_data(
rocksdb_writebatch_t*, size_t* size); rocksdb_writebatch_t*, size_t* size);
extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_set_save_point( extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_set_save_point(

View File

@ -0,0 +1 @@
Introduced a new C API function, rocksdb_writebatch_iterate_cf, for column-family-aware iteration over the contents of a WriteBatch.