mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-27 11:43:49 +00:00
f7975ac733
Summary: Each assoc is identified by (id1, assocType); this pair is the rowkey. Each row has a read/write rowlock. There is a statically allocated array of 2000 read/write locks, and a rowkey is murmur-hashed to one of them. assocPut and assocDelete acquire the rowlock in write mode. The key updates are done while holding the rowlock, using an atomic no-sync batch write to leveldb. The rowlock is then released and a write-with-sync is done to sync the leveldb transaction log. Test Plan: added unit test Reviewers: heyongqiang Reviewed By: heyongqiang Differential Revision: https://reviews.facebook.net/D5859
357 lines
9.9 KiB
C++
357 lines
9.9 KiB
C++
/**
|
|
* Tests for Thrift server for leveldb
|
|
* @author Dhruba Borthakur (dhruba@gmail.com)
|
|
* Copyright 2012 Facebook
|
|
*/
|
|
#include <protocol/TBinaryProtocol.h>
|
|
#include <transport/TSocket.h>
|
|
#include <transport/TBufferTransports.h>
|
|
#include <util/testharness.h>
|
|
#include <DB.h>
|
|
#include <AssocService.h>
|
|
#include <leveldb_types.h>
|
|
#include "server_options.h"
|
|
|
|
using namespace apache::thrift;
|
|
using namespace apache::thrift::protocol;
|
|
using namespace apache::thrift::transport;
|
|
using boost::shared_ptr;
|
|
using namespace Tleveldb;
|
|
|
|
// Server entry point, defined outside this file. main() calls it with the
// process arguments and then polls server_options.getPort() until it is
// non-zero.
extern "C" void startServer(int argc, char**argv);
|
|
// Declared for completeness; nothing in this file calls it.
extern "C" void stopServer(int port);
|
|
// Server configuration object defined outside this file. This test reads
// getPort() and getDataDirectory() from it.
extern ServerOptions server_options;
|
|
|
|
// Handle for the test database; passed to DBClient::Open() in
// createDatabase() and to every later DBClient call.
static DBHandle dbhandle;
|
|
// Client for the base DB service; allocated in initialize().
static DBClient* dbclient;
|
|
// Client for the assoc service (connected on port+1); allocated in
// initialize().
static AssocServiceClient* aclient;
|
|
// Name of the database used by all tests in this file.
static const Text dbname = "test-dhruba";
|
|
// Copy of main()'s argc; stored by main() and not read elsewhere in this file.
static int ARGC;
|
|
// Copy of main()'s argv; stored by main() and not read elsewhere in this file.
static char** ARGV;
|
|
|
|
/**
 * Recursively removes a directory so that a test run starts without data
 * left over from an earlier run.
 *
 * The shell command is assembled in a std::string: it grows to fit the
 * path (a fixed-size buffer would cut a long path short and make
 * "rm -rf" act on a different, shorter path) and it is released
 * automatically when the function returns.
 *
 * @param dir path to delete. It is placed into the shell command without
 *            quoting, so only pass trusted paths that contain no spaces
 *            or shell metacharacters.
 */
static void cleanupDir(std::string dir) {
  // remove old data, if any
  const std::string command = "rm -rf " + dir;
  if (system(command.c_str()) != 0) {
    // best effort: report the problem but let the caller continue
    fprintf(stderr, "cleanupDir: command failed: %s\n", command.c_str());
  }
}
|
|
|
|
static void createDatabase() {
|
|
DBOptions options;
|
|
options.create_if_missing = true; // create
|
|
options.error_if_exists = false; // overwrite
|
|
options.write_buffer_size = (4<<20); // 4 MB
|
|
options.max_open_files = 1000;
|
|
options.block_size = 4096;
|
|
options.block_restart_interval = 16;
|
|
options.compression = kSnappyCompression;
|
|
|
|
cleanupDir(server_options.getDataDirectory(dbname));
|
|
|
|
// create the database
|
|
dbclient->Open(dbhandle, dbname, options);
|
|
}
|
|
|
|
static void initialize(int port) {
|
|
boost::shared_ptr<TSocket> socket1(new TSocket("localhost", port));
|
|
boost::shared_ptr<TTransport> transport1(new TBufferedTransport(socket1));
|
|
boost::shared_ptr<TProtocol> protocol1(new TBinaryProtocol(transport1));
|
|
|
|
// open database
|
|
dbclient = new DBClient(protocol1);
|
|
transport1->open();
|
|
|
|
boost::shared_ptr<TSocket> socket2(new TSocket("localhost", port+1));
|
|
boost::shared_ptr<TTransport> transport2(new TBufferedTransport(socket2));
|
|
boost::shared_ptr<TProtocol> protocol2(new TBinaryProtocol(transport2));
|
|
aclient = new AssocServiceClient(protocol2);
|
|
transport2->open();
|
|
|
|
createDatabase();
|
|
printf("Database created.\n");
|
|
}
|
|
|
|
//
|
|
// Run base leveldb thrift server get/put/iter/scan tests
|
|
//
|
|
static void testClient() {
|
|
WriteOptions writeOptions;
|
|
printf("Running base leveldb operations .................\n");
|
|
|
|
// insert record into leveldb
|
|
Slice key;
|
|
key.data = "Key1";
|
|
key.size = 4;
|
|
Slice value;
|
|
value.data = "value1";
|
|
value.size = 6;
|
|
kv keyvalue;
|
|
keyvalue.key = key;
|
|
keyvalue.value = value;
|
|
int ret = dbclient->Put(dbhandle, keyvalue, writeOptions);
|
|
ASSERT_TRUE(ret == Code::kOk);
|
|
printf("Put Key1 suceeded.\n");
|
|
|
|
//read it back
|
|
ReadOptions readOptions;
|
|
ResultItem rValue;
|
|
dbclient->Get(rValue, dbhandle, key, readOptions);
|
|
ASSERT_TRUE(rValue.status == Code::kOk);
|
|
ASSERT_TRUE(value.data.compare(rValue.value.data) == 0);
|
|
ASSERT_TRUE(value.size == rValue.value.size);
|
|
printf("Get suceeded.\n");
|
|
|
|
// get a snapshot
|
|
ResultSnapshot rsnap;
|
|
dbclient->GetSnapshot(rsnap, dbhandle);
|
|
ASSERT_TRUE(rsnap.status == Code::kOk);
|
|
ASSERT_TRUE(rsnap.snapshot.snapshotid > 0);
|
|
printf("Snapshot suceeded.\n");
|
|
|
|
// insert a new record into leveldb
|
|
Slice key2;
|
|
key2.data = "Key2";
|
|
key2.size = 4;
|
|
Slice value2;
|
|
value2.data = "value2";
|
|
value2.size = 6;
|
|
keyvalue.key = key2;
|
|
keyvalue.value = value2;
|
|
ret = dbclient->Put(dbhandle, keyvalue, writeOptions);
|
|
ASSERT_TRUE(ret == Code::kOk);
|
|
printf("Put Key2 suceeded.\n");
|
|
|
|
// verify that a get done with a previous snapshot does not find Key2
|
|
readOptions.snapshot = rsnap.snapshot;
|
|
dbclient->Get(rValue, dbhandle, key2, readOptions);
|
|
ASSERT_TRUE(rValue.status == Code::kNotFound);
|
|
printf("Get with snapshot succeeded.\n");
|
|
|
|
// release snapshot
|
|
ret = dbclient->ReleaseSnapshot(dbhandle, rsnap.snapshot);
|
|
ASSERT_TRUE(ret == Code::kOk);
|
|
printf("Snapshot released.\n");
|
|
|
|
// if we try to re-release the same snapshot, it should fail
|
|
ret = dbclient->ReleaseSnapshot(dbhandle, rsnap.snapshot);
|
|
ASSERT_TRUE(ret == Code::kNotFound);
|
|
|
|
// compact whole database
|
|
Slice range;
|
|
range.size = 0;
|
|
ret = dbclient->CompactRange(dbhandle, range, range);
|
|
ASSERT_TRUE(ret == Code::kOk);
|
|
printf("Compaction trigger suceeded.\n");
|
|
|
|
// create a new iterator to scan all keys from the start
|
|
Slice target;
|
|
ResultIterator ri;
|
|
readOptions.snapshot.snapshotid = 0;
|
|
dbclient->NewIterator(ri, dbhandle, readOptions,
|
|
IteratorType::seekToFirst, target);
|
|
ASSERT_TRUE(ri.status == Code::kOk);
|
|
int foundPairs = 0;
|
|
while (true) {
|
|
ResultPair pair;
|
|
dbclient->GetNext(pair, dbhandle, ri.iterator);
|
|
if (pair.status == Code::kOk) {
|
|
foundPairs++;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
ASSERT_TRUE(foundPairs == 2);
|
|
ret = dbclient->DeleteIterator(dbhandle, ri.iterator);
|
|
ASSERT_TRUE(ret == Code::kOk);
|
|
printf("Iterator scan-all forward passes.\n");
|
|
|
|
// create a new iterator, position at end and scan backwards
|
|
readOptions.snapshot.snapshotid = 0;
|
|
dbclient->NewIterator(ri, dbhandle, readOptions,
|
|
IteratorType::seekToLast, target);
|
|
ASSERT_TRUE(ri.status == Code::kOk);
|
|
foundPairs = 0;
|
|
while (true) {
|
|
ResultPair pair;
|
|
dbclient->GetPrev(pair, dbhandle, ri.iterator);
|
|
if (pair.status == Code::kOk) {
|
|
foundPairs++;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
ASSERT_TRUE(foundPairs == 2);
|
|
ret = dbclient->DeleteIterator(dbhandle, ri.iterator);
|
|
ASSERT_TRUE(ret == Code::kOk);
|
|
printf("Iterator scan-all backward passes.\n");
|
|
|
|
// create a new iterator, position at middle
|
|
readOptions.snapshot.snapshotid = 0;
|
|
target = key;
|
|
dbclient->NewIterator(ri, dbhandle, readOptions,
|
|
IteratorType::seekToKey, target);
|
|
ASSERT_TRUE(ri.status == Code::kOk);
|
|
foundPairs = 0;
|
|
while (true) {
|
|
ResultPair pair;
|
|
dbclient->GetPrev(pair, dbhandle, ri.iterator);
|
|
if (pair.status == Code::kOk) {
|
|
foundPairs++;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
ASSERT_TRUE(foundPairs == 1);
|
|
ret = dbclient->DeleteIterator(dbhandle, ri.iterator);
|
|
ASSERT_TRUE(ret == Code::kOk);
|
|
printf("Iterator scan-selective backward passes.\n");
|
|
}
|
|
|
|
// clean up all data that we inserted as part of this test
|
|
void clearDatabase() {
|
|
WriteOptions writeOptions;
|
|
ReadOptions readOptions;
|
|
readOptions.snapshot.snapshotid = 0;
|
|
printf("Clearing entire database.\n");
|
|
|
|
ResultIterator ri;
|
|
Slice dummy;
|
|
dbclient->NewIterator(ri, dbhandle, readOptions,
|
|
IteratorType::seekToFirst, dummy);
|
|
ASSERT_TRUE(ri.status == Code::kOk);
|
|
while (true) {
|
|
ResultPair pair;
|
|
dbclient->GetNext(pair, dbhandle, ri.iterator);
|
|
if (pair.status == Code::kOk) {
|
|
Slice key = pair.keyvalue.key;
|
|
Code code = dbclient->Delete(dbhandle, key, writeOptions);
|
|
ASSERT_EQ(code, Code::kOk);
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
// no need to invoke DeleteIterator because we scanned
|
|
// till the end using the iterator and ist is auto-deleted
|
|
// by the server.
|
|
}
|
|
|
|
//
|
|
// Run assoc tests
|
|
//
|
|
static void testAssocs() {
|
|
WriteOptions writeOptions;
|
|
printf("Running assoc leveldb operations ................\n");
|
|
|
|
// insert record into leveldb
|
|
int64_t assocType = 100;
|
|
int64_t id1 = 1;
|
|
int64_t id2 = 2;
|
|
int64_t id1Type = 101;
|
|
int64_t id2Type = 102;
|
|
int64_t ts =3333;
|
|
AssocVisibility vis = AssocVisibility::VISIBLE;
|
|
bool update_count = true;
|
|
int64_t dataVersion = 5;
|
|
const Text data = "data......";
|
|
const Text wormhole_comments = "wormhole...";
|
|
int64_t count = aclient->taoAssocPut(dbname, assocType,
|
|
id1, id2, id1Type, id2Type,
|
|
ts, vis, update_count,
|
|
dataVersion, data, wormhole_comments);
|
|
ASSERT_GE(count, 0);
|
|
printf("AssocPut first record suceeded.\n");
|
|
|
|
// verify assoc counts.
|
|
int64_t cnt = aclient->taoAssocCount(dbname, assocType, id1);
|
|
ASSERT_EQ(cnt, 1);
|
|
printf("AssocCount suceeded.\n");
|
|
|
|
// verify that we can read back what we inserted earlier
|
|
std::vector<int64_t> id2list(1);
|
|
id2list[0] = id2;
|
|
std::vector<TaoAssocGetResult> readback(1);
|
|
aclient->taoAssocGet(readback, dbname,
|
|
assocType, id1, id2list);
|
|
printf("AssocGet suceeded.\n");
|
|
ASSERT_EQ((unsigned int)1, readback.size());
|
|
ASSERT_EQ(id1Type, readback[0].id1Type);
|
|
ASSERT_EQ(id2Type, readback[0].id2Type);
|
|
ASSERT_EQ(ts, readback[0].time);
|
|
ASSERT_EQ(dataVersion, readback[0].dataVersion);
|
|
ASSERT_EQ(readback[0].data.compare(data), 0);
|
|
|
|
// add one more assoc
|
|
const Text data1 = "data1......";
|
|
count = aclient->taoAssocPut(dbname, assocType,
|
|
id1, id2+2, id1Type+1, id2Type+1,
|
|
ts+1, vis, update_count,
|
|
dataVersion+1, data1, wormhole_comments);
|
|
ASSERT_EQ(count, 2);
|
|
printf("AssocPut second record suceeded.\n");
|
|
|
|
// verify assoc count is 2
|
|
cnt = aclient->taoAssocCount(dbname, assocType, id1);
|
|
ASSERT_EQ(cnt, 2);
|
|
|
|
// do a range get for id1+type and verify that there
|
|
// are two assocs.
|
|
readback.clear();
|
|
int64_t offset = 0;
|
|
int64_t limit = 1000;
|
|
aclient->taoAssocRangeGet(readback, dbname, assocType,
|
|
id1, LONG_MAX, 0,
|
|
offset, limit);
|
|
ASSERT_EQ((unsigned int)2, readback.size());
|
|
|
|
// Delete the most recent assoc
|
|
int c = aclient->taoAssocDelete(dbname, assocType,
|
|
id1, id2+2, AssocVisibility::HIDDEN, true, "");
|
|
ASSERT_EQ(c, 1);
|
|
|
|
// verify assoc falls back to 1.
|
|
cnt = aclient->taoAssocCount(dbname, assocType, id1);
|
|
ASSERT_EQ(cnt, 1);
|
|
printf("AssocCount suceeded.\n");
|
|
}
|
|
|
|
//
|
|
// close all resources
|
|
//
|
|
static void close() {
|
|
// close database
|
|
dbclient->Close(dbhandle, dbname);
|
|
// transport->close();
|
|
}
|
|
|
|
|
|
/**
 * Test driver: starts the server, waits until it publishes a non-zero
 * port, connects the clients, runs the base DB tests and the assoc tests
 * (emptying the database after each), and finally closes the database.
 *
 * @return 0 once all tests have run
 */
int main(int argc, char **argv) {

  // keep a copy of the command line at file scope
  ARGV = argv;
  ARGC = argc;

  // create a server
  startServer(argc, argv);
  printf("Server thread created.\n");

  // give some time to the server to initialize itself:
  // poll once per second until a port has been assigned
  for (;;) {
    if (server_options.getPort() != 0) {
      break;
    }
    sleep(1);
  }

  // test client
  initialize(server_options.getPort());

  // run all tests, wiping the database after each suite
  testClient();
  clearDatabase();

  testAssocs();
  clearDatabase();

  // done all tests
  close();

  return 0;
}
|
|
|