rocksdb/thrift/server_utils.cpp

492 lines
16 KiB
C++

/**
* Thrift server for leveldb
* @author Dhruba Borthakur (dhruba@gmail.com)
* Copyright 2012 Facebook
*/
#include <signal.h>
#include <DB.h>
#include <protocol/TBinaryProtocol.h>
#include <server/TSimpleServer.h>
#include <server/TConnectionContext.h>
#include <transport/TServerSocket.h>
#include <transport/TBufferTransports.h>
#include <async/TEventServer.h>
#include <async/TAsyncProcessor.h>
#include <async/TSyncToAsyncProcessor.h>
#include <util/TEventServerCreator.h>
#include <leveldb_types.h>
#include "openhandles.h"
#include "server_options.h"
#include "assoc.h"
#include "leveldb/db.h"
#include "leveldb/write_batch.h"
using namespace apache::thrift;
using namespace apache::thrift::util;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::server;
using namespace apache::thrift::async;
using namespace Tleveldb;
using boost::shared_ptr;
extern "C" void startServer(int argc, char** argv);
extern "C" void stopServer(int port);
static boost::shared_ptr<TServer> baseServer;
static boost::shared_ptr<TServer> assocServer;
// The global object that stores the default configuration of the server
ServerOptions server_options;
class DBHandler : virtual public DBIf {
public:
DBHandler(OpenHandles* oh) {
openHandles = oh;
}
void Open(DBHandle& _return, const Text& dbname,
const DBOptions& dboptions) {
printf("Open %s\n", dbname.c_str());
if (!server_options.isValidName(dbname)) {
LeveldbException e;
e.errorCode = Code::kInvalidArgument;
e.message = "Bad DB name";
fprintf(stderr, "Bad DB name %s\n", dbname.c_str());
throw e;
}
std::string dbdir = server_options.getDataDirectory(dbname);
leveldb::Options options;
// fill up per-server options
options.block_cache = server_options.getCache();
// fill up per-DB options
options.create_if_missing = dboptions.create_if_missing;
options.error_if_exists = dboptions.error_if_exists;
options.write_buffer_size = dboptions.write_buffer_size;
options.max_open_files = dboptions.max_open_files;
options.block_size = dboptions.block_size;
options.block_restart_interval = dboptions.block_restart_interval;
if (dboptions.compression == kNoCompression) {
options.compression = leveldb::kNoCompression;
} else if (dboptions.compression == kSnappyCompression) {
options.compression = leveldb::kSnappyCompression;
}
if (dboptions.num_levels > 0)
options.num_levels = dboptions.num_levels;
if (dboptions.level0_file_num_compaction_trigger > 0)
options.level0_file_num_compaction_trigger = dboptions.level0_file_num_compaction_trigger;
if (dboptions.level0_slowdown_writes_trigger > 0)
options.level0_slowdown_writes_trigger = dboptions.level0_slowdown_writes_trigger;
if (dboptions.level0_stop_writes_trigger)
options.level0_stop_writes_trigger = dboptions.level0_stop_writes_trigger;
if (dboptions.target_file_size_base > 0)
options.target_file_size_base = dboptions.target_file_size_base;
if (dboptions.target_file_size_multiplier > 0)
options.target_file_size_multiplier = dboptions.target_file_size_multiplier;
if (dboptions.max_bytes_for_level_base)
options.max_bytes_for_level_base = dboptions.max_bytes_for_level_base;
if (dboptions.max_bytes_for_level_multiplier)
options.max_bytes_for_level_multiplier = dboptions.max_bytes_for_level_multiplier;
if (dboptions.max_grandparent_overlap_factor)
options.max_grandparent_overlap_factor = dboptions.max_grandparent_overlap_factor;
if (dboptions.disableDataSync)
options.disableDataSync = dboptions.disableDataSync;
openHandles->add(options, dbname, dbdir);
_return.dbname = dbname;
}
Code Close(const DBHandle& dbhandle, const Text& dbname) {
//
// We do not close any handles for now, otherwise we have to do
// some locking that will degrade performance in the normal case.
//
return Code::kNotSupported;
}
Code Put(const DBHandle& dbhandle, const kv& kv,
const WriteOptions& options) {
leveldb::WriteOptions woptions;
woptions.sync = options.sync;
woptions.disableWAL = options.disableWAL;
leveldb::Slice key, value;
key.data_ = kv.key.data.data();
key.size_ = kv.key.size;
value.data_ = kv.value.data.data();
value.size_ = kv.value.size;
leveldb::DB* db = openHandles->get(dbhandle.dbname, NULL);
if (db == NULL) {
return Code::kNotFound;
}
leveldb::Status status = db->Put(woptions, key, value);
if (status.ok()) {
return Code::kOk;
}
return Code::kIOError;
}
Code Delete(const DBHandle& dbhandle, const Slice& kv,
const WriteOptions& options) {
leveldb::WriteOptions woptions;
woptions.sync = options.sync;
woptions.disableWAL = options.disableWAL;
leveldb::Slice key;
key.data_ = kv.data.data();
key.size_ = kv.size;
leveldb::DB* db = openHandles->get(dbhandle.dbname, NULL);
if (db == NULL) {
return Code::kNotFound;
}
leveldb::Status status = db->Delete(woptions, key);
if (status.ok()) {
return Code::kOk;
}
return Code::kIOError;
}
Code Write(const DBHandle& dbhandle, const std::vector<kv> & batch,
const WriteOptions& options) {
leveldb::WriteOptions woptions;
leveldb::WriteBatch lbatch;
woptions.sync = options.sync;
woptions.disableWAL = options.disableWAL;
leveldb::Slice key, value;
for (unsigned int i = 0; i < batch.size(); i++) {
kv one = batch[i];
key.data_ = one.key.data.data();
key.size_ = one.key.size;
value.data_ = one.value.data.data();
value.size_ = one.value.size;
lbatch.Put(key, value);
}
leveldb::DB* db = openHandles->get(dbhandle.dbname, NULL);
if (db == NULL) {
return Code::kNotFound;
}
leveldb::Status status = db->Write(woptions, &lbatch);
if (status.ok()) {
return Code::kOk;
}
return Code::kIOError;
}
void Get(ResultItem& _return, const DBHandle& dbhandle, const Slice& inputkey,
const ReadOptions& options) {
struct onehandle* thishandle;
_return.status = Code::kNotFound;
std::string ret;
leveldb::Slice ikey;
ikey.data_ = inputkey.data.data();
ikey.size_ = inputkey.size;
leveldb::DB* db = openHandles->get(dbhandle.dbname, &thishandle);
if (db == NULL) {
return;
}
assert(thishandle != NULL);
const leveldb::Snapshot* s = NULL;
if (options.snapshot.snapshotid > 0) {
s = thishandle->lookupSnapshot(options.snapshot.snapshotid);
assert(s != NULL);
if (s == NULL) {
return;
}
}
leveldb::ReadOptions roptions;
roptions.verify_checksums = options.verify_checksums;
roptions.fill_cache = options.fill_cache;
roptions.snapshot = s;
leveldb::Status status = db->Get(roptions, ikey, &ret);
if (status.ok()) {
_return.value.data = ret.data();
_return.value.size = ret.size();
_return.status = Code::kOk;
}
}
void NewIterator(ResultIterator& _return, const DBHandle& dbhandle,
const ReadOptions& options, IteratorType iteratorType,
const Slice& target) {
struct onehandle* thishandle;
_return.status = Code::kNotFound;
leveldb::DB* db = openHandles->get(dbhandle.dbname, &thishandle);
if (db == NULL) {
return;
}
assert(thishandle != NULL);
// check to see if snapshot is specified
const leveldb::Snapshot* s = NULL;
if (options.snapshot.snapshotid > 0) {
s = thishandle->lookupSnapshot(options.snapshot.snapshotid);
assert(s != NULL);
if (s == NULL) {
return;
}
}
// create leveldb iterator
leveldb::ReadOptions roptions;
roptions.verify_checksums = options.verify_checksums;
roptions.fill_cache = options.fill_cache;
roptions.snapshot = s;
leveldb::Iterator* iter = db->NewIterator(roptions);
if (iter == NULL) {
return;
}
// position iterator at right place
if (iteratorType == IteratorType::seekToFirst) {
iter->SeekToFirst();
} else if (iteratorType == IteratorType::seekToLast) {
iter->SeekToLast();
} else if (iteratorType == IteratorType::seekToKey) {
leveldb::Slice key;
key.data_ = target.data.data();
key.size_ = target.size;
iter->Seek(key);
} else {
delete iter;
_return.status = Code::kInvalidArgument;
return;
}
// insert iterator into openhandle list, get unique id
int64_t id = thishandle->addIterator(iter);
_return.iterator.iteratorid = id;
_return.status = kOk;
}
// Delete existing iterator
Code DeleteIterator(const DBHandle& dbhandle, const Iterator& iterator) {
// find the db
struct onehandle* thishandle;
leveldb::DB* db = openHandles->get(dbhandle.dbname, &thishandle);
if (db == NULL) {
return kNotFound;
}
assert(thishandle != NULL);
// find the leveldb iterator for this db
leveldb::Iterator* it =
thishandle->lookupIterator(iterator.iteratorid);
if (it == NULL) {
// this must have been cleaned up by the last call to GetNext
return kOk;
}
thishandle->removeIterator(iterator.iteratorid);
delete it; // cleanup
return kOk;
}
// read the next value from the iterator
void GetAnother(ResultPair& _return, const DBHandle& dbhandle,
const Iterator& iterator, const bool doNext) {
// find the db
struct onehandle* thishandle;
_return.status = Code::kNotFound;
leveldb::DB* db = openHandles->get(dbhandle.dbname, &thishandle);
if (db == NULL) {
return;
}
assert(thishandle != NULL);
// find the leveldb iterator for this db
leveldb::Iterator* it =
thishandle->lookupIterator(iterator.iteratorid);
assert(it != NULL);
if (it == NULL) {
return;
}
// If the iterator has reached the end close it rightaway.
// There is no need for the application to make another thrift
// call to cleanup the iterator.
if (!it->Valid()) {
thishandle->removeIterator(iterator.iteratorid);
delete it; // cleanup
_return.status = Code::kEnd; // no more elements
return;
}
// if iterator has encountered any corruption
if (!it->status().ok()) {
thishandle->removeIterator(iterator.iteratorid);
delete it; // cleanup
_return.status = Code::kIOError; // error in data
return;
}
// find current key-value
leveldb::Slice key = it->key();
leveldb::Slice value = it->value();
// pack results back to client
_return.keyvalue.key.data.assign(key.data_, key.size_);
_return.keyvalue.key.size = key.size_;
_return.keyvalue.value.data.assign(value.data_, value.size_);
_return.keyvalue.value.size = value.size_;
_return.status = Code::kOk; // success
// move to next or previous value
if (doNext) {
it->Next();
} else {
it->Prev();
}
}
// read the next value from the iterator
void GetNext(ResultPair& _return, const DBHandle& dbhandle,
const Iterator& iterator) {
GetAnother(_return, dbhandle, iterator, 1);
}
// read the prev value from the iterator
void GetPrev(ResultPair& _return, const DBHandle& dbhandle,
const Iterator& iterator) {
GetAnother(_return, dbhandle, iterator, 0);
}
void GetSnapshot(ResultSnapshot& _return, const DBHandle& dbhandle) {
_return.status = kIOError;
struct onehandle* thishandle;
leveldb::DB* db = openHandles->get(dbhandle.dbname, &thishandle);
if (db == NULL) {
return;
}
// create leveldb snapshot
const leveldb::Snapshot* s = db->GetSnapshot();
// store snapshot in dbhandle, get unique id.
int64_t id = thishandle->addSnapshot(s);
_return.snapshot.snapshotid = id;
_return.status = kOk;
}
Code ReleaseSnapshot(const DBHandle& dbhandle, const Snapshot& snapshot) {
struct onehandle* thishandle;
leveldb::DB* db = openHandles->get(dbhandle.dbname, &thishandle);
if (db == NULL) {
return kNotFound;
}
const leveldb::Snapshot* s = thishandle->removeSnapshot(snapshot.snapshotid);
if (s == NULL) {
return Code::kNotFound;
}
db->ReleaseSnapshot(s); // release leveldb snapshot
return Code::kOk;
}
Code CompactRange(const DBHandle& dbhandle, const Slice& begin,
const Slice& end) {
leveldb::DB* db = openHandles->get(dbhandle.dbname, NULL);
if (db == NULL) {
return Code::kNotFound;
}
leveldb::Slice k1, *start = &k1;
k1.data_ = begin.data.data();
k1.size_ = begin.size;
leveldb::Slice k2, *stop = &k2;
k2.data_ = begin.data.data();
k2.size_ = begin.size;
// check special ranges.
if (start->size_ == 0) {
start = NULL;
}
if (stop->size_ == 0) {
stop = NULL;
}
db->CompactRange(start, stop);
return Code::kOk;
}
private:
OpenHandles* openHandles;
};
//
// starts a service
static void* startOneService(void *arg) {
TSimpleServer* t = (TSimpleServer *)arg;
t->serve();
return NULL;
}
// Starts a very simple thrift server
void startServer(int argc, char** argv) {
// process command line options
if (!server_options.parseOptions(argc, argv)) {
exit(1);
}
// create directories for server
if (!server_options.createDirectories()) {
exit(1);
}
// create the server's block cache
server_options.createCache();
// data structure to record ope databases
OpenHandles* openHandles = new OpenHandles();
shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
// create the service to process the normal get/put to leveldb.
int port = server_options.getPort();
fprintf(stderr, "Server starting on port %d\n", port);
shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
shared_ptr<DBHandler> handler(new DBHandler(openHandles));
shared_ptr<TProcessor> processor(new DBProcessor(handler));
TSimpleServer* baseServer = new TSimpleServer(processor, serverTransport,
transportFactory, protocolFactory);
pthread_t dbServerThread;
int rc = pthread_create(&dbServerThread, NULL, startOneService,
(void *)baseServer);
if (rc != 0) {
fprintf(stderr, "Unable to start DB server on port %d\n.", port);
exit(1);
}
// create the service to process the assoc get/put to leveldb.
int assocport = server_options.getAssocPort();
fprintf(stderr, "Assoc Service starting on port %d\n", assocport);
shared_ptr<TServerTransport> assocTransport(new TServerSocket(assocport));
shared_ptr<AssocServiceHandler> assocHandler(new AssocServiceHandler(openHandles));
shared_ptr<TProcessor> assocProcessor(new AssocServiceProcessor(assocHandler));
TSimpleServer* assocServer = new TSimpleServer(assocProcessor,
assocTransport, transportFactory, protocolFactory);
pthread_t assocServerThread;
rc = pthread_create(&assocServerThread, NULL, startOneService,
(void *)assocServer);
if (rc != 0) {
fprintf(stderr, "Unable to start assoc server on port %d\n.", port);
exit(1);
}
}
/**
void startEventServer(int port) {
shared_ptr<DBHandler> handler(new DBHandler());
shared_ptr<TProcessor> processor(new DBProcessor(handler));
shared_ptr<TAsyncProcessor> asyncProcessor(new TSyncToAsyncProcessor(processor));
TEventServerCreator creator(asyncProcessor, (uint16_t)port, 2);
tServer = creator.createServer();
tServer.serve();
}
**/
// Stops the thrift server
void stopServer(int port) {
baseServer->stop();
assocServer->stop();
}