From d5503208cf0cca33f586938c28d4a9ef1d083976 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Wed, 11 Jul 2012 14:08:46 -0700 Subject: [PATCH] Share a single cache for all the DBs served by this server. Summary: Test Plan: Reviewers: CC: Task ID: # Blame Rev: --- build_detect_platform | 2 +- thrift/if/leveldb.thrift | 3 +- thrift/openhandles.h | 10 +- thrift/server.cpp | 10 +- thrift/server_options.cpp | 11 ++ thrift/server_options.h | 212 +++++++++++++++++++++++++++++++++++++ thrift/server_utils.cpp | 50 ++++++++- thrift/test/simpletest.cpp | 36 ++++--- 8 files changed, 307 insertions(+), 27 deletions(-) create mode 100644 thrift/server_options.cpp create mode 100644 thrift/server_options.h diff --git a/build_detect_platform b/build_detect_platform index a40d78c7a0..59f6b23a2a 100755 --- a/build_detect_platform +++ b/build_detect_platform @@ -102,7 +102,7 @@ esac # prune take effect. DIRS="util db table" if test "$USE_THRIFT"; then - DIRS+=" thrift/gen-cpp thrift/server_utils.cpp" + DIRS+=" thrift/gen-cpp thrift/server_utils.cpp thrift/server_options.cpp" THRIFTSERVER=leveldb_server fi set -f # temporarily disable globbing so that our patterns aren't expanded diff --git a/thrift/if/leveldb.thrift b/thrift/if/leveldb.thrift index b0de6326ef..fbf078a121 100644 --- a/thrift/if/leveldb.thrift +++ b/thrift/if/leveldb.thrift @@ -121,7 +121,8 @@ exception LeveldbException { // The Database service service DB { - // opens the database + // opens the database. The database name cannot have "/" + // in its name. DBHandle Open(1:Text dbname, 2:DBOptions dboptions) throws (1:LeveldbException se), diff --git a/thrift/openhandles.h b/thrift/openhandles.h index 4b2f40832f..4262be2ca0 100644 --- a/thrift/openhandles.h +++ b/thrift/openhandles.h @@ -4,6 +4,9 @@ * Copyright 2012 Facebook */ +#ifndef THRIFT_LEVELDB_SERVER_H_ +#define THRIFT_LEVELDB_SERVER_H_ + #include #include #include "DB.h" @@ -142,15 +145,15 @@ class OpenHandles { // Inserts a new database into the list. // If the database is already open, increase refcount. // If the database is not already open, open and insert into list. - int64_t add(leveldb::Options& options, Text dbname) { + int64_t add(leveldb::Options& options, Text dbname, std::string dbdir) { struct onehandle* found = head_[dbname]; if (found == NULL) { found = new onehandle; found->name = dbname; found->uniqueid = dbnum_++; - fprintf(stderr, "openhandle.add: Opening leveldb directory %s\n", + fprintf(stderr, "openhandle.add: Opening leveldb DB %s\n", dbname.c_str()); - leveldb::Status status = leveldb::DB::Open(options, dbname, &found->onedb); + leveldb::Status status = leveldb::DB::Open(options, dbdir, &found->onedb); if (!status.ok()) { LeveldbException e; e.errorCode = Code::kIOError; @@ -223,5 +226,6 @@ class OpenHandles { } }; +#endif // THRIFT_LEVELDB_SERVER_H_ diff --git a/thrift/server.cpp b/thrift/server.cpp index 5ffca33ac3..2c6682edf1 100644 --- a/thrift/server.cpp +++ b/thrift/server.cpp @@ -13,6 +13,7 @@ #include #include #include "openhandles.h" +#include "server_options.h" #include "leveldb/db.h" #include "leveldb/write_batch.h" @@ -24,23 +25,22 @@ using namespace apache::thrift::server; using namespace Tleveldb; using boost::shared_ptr; -extern "C" void startServer(int port); +extern "C" void startServer(int argc, char** argv); extern "C" void stopServer(int port); - -static int port = 9090; +extern ServerOptions server_options; void signal_handler(int sig) { switch (sig) { case SIGINT: fprintf(stderr, "Received SIGINT, stopping leveldb server"); - stopServer(port); + stopServer(server_options.getPort()); break; } } int main(int argc, char **argv) { signal(SIGINT, signal_handler); - startServer(port); + startServer(argc, argv); return 0; } diff --git a/thrift/server_options.cpp b/thrift/server_options.cpp new file mode 100644 index 0000000000..4316672f45 --- /dev/null +++ b/thrift/server_options.cpp @@ -0,0 +1,11 @@ +/** + * Options for the Thrift leveldb server. + * @author Dhruba Borthakur (dhruba@gmail.com) + * Copyright 2012 Facebook + **/ +#include +#include "server_options.h" + +const std::string ServerOptions::DEFAULT_HOST = "hostname"; +const std::string ServerOptions::DEFAULT_ROOTDIR = "/tmp/ldb/"; + diff --git a/thrift/server_options.h b/thrift/server_options.h new file mode 100644 index 0000000000..3f99e716d2 --- /dev/null +++ b/thrift/server_options.h @@ -0,0 +1,212 @@ +/** + * Options for the Thrift leveldb server. + * @author Dhruba Borthakur (dhruba@gmail.com) + * Copyright 2012 Facebook + */ + +#ifndef THRIFT_LEVELDB_SERVER_OPTIONS_ +#define THRIFT_LEVELDB_SERVER_OPTIONS_ + +#include +#include + +#include "leveldb/db.h" +#include "leveldb/cache.h" + +// +// These are configuration options for the entire server. +// +class ServerOptions { + private: + int num_threads_; // number of thrift server threads + int cache_numshardbits_; // cache shards + long cache_size_; // cache size in bytes + int port_; // port number + std::string hostname_; // host name of this machine + std::string rootdir_; // root directory of all DBs + leveldb::Cache* cache_; // the block cache + + // Number of concurrent threads to run. + const static int DEFAULT_threads = 1; + + // Number of bytes to use as a cache of uncompressed data. + // Default setting of 100 MB + const static long DEFAULT_cache_size = 100 * 1024 * 1024; + + // Number of shards for the block cache is 2 ** DEFAULT_cache_numshardbits. + // Negative means use default settings. This is applied only + // if DEFAULT_cache_size is non-negative. + const static int DEFAULT_cache_numshardbits = 6; + + // default port + const static int DEFAULT_PORT = 6666; + + // default machine name + const static std::string DEFAULT_HOST; + + // default directory where the server stores all its data + const static std::string DEFAULT_ROOTDIR; + +public: + ServerOptions() : num_threads_(DEFAULT_threads), + cache_numshardbits_(DEFAULT_cache_numshardbits), + cache_size_(DEFAULT_cache_size), + port_(DEFAULT_PORT), + hostname_(DEFAULT_HOST), + rootdir_(DEFAULT_ROOTDIR + DEFAULT_HOST), + cache_(NULL) { + char buf[100]; + if (gethostname(buf, sizeof(buf)) == 0) { + hostname_ = buf; + } + } + + // + // Returns succes if all command line options are parsed successfully, + // otherwise returns false. + bool parseOptions(int argc, char** argv) { + int n; + long l; + char junk; + char* cports = NULL; + for (int i = 1; i < argc; i++) { + if (sscanf(argv[i], "--port=%d%c", &n, &junk) == 1) { + port_ = n; + } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) { + num_threads_ = n; + } else if (sscanf(argv[i], "--cache_size=%ld%c", &n, &junk) == 1) { + cache_size_ = n; + } else if (sscanf(argv[i], "--cache_numshardbits=%d%c", &n, &junk) == 1) { + cache_numshardbits_ = n; + } else if (strncmp(argv[i], "--hostname=", 10) == 0) { + hostname_ = argv[i] + 10; + } else if (strncmp(argv[i], "--rootdir=", 9) == 0) { + rootdir_ = argv[i] + 9; + } else { + fprintf(stderr, "Invalid flag '%s'\n", argv[i]); + return false; + } + } + return true; + } + + // Create the directory format on disk. + // Returns true on success, false on failure + bool createDirectories() { + mode_t mode = 0755; + const char* dir = getRootDirectory().c_str(); + if (mkpath(dir, mode) < 0) { + fprintf(stderr, "Unable to create root directory %s\n", dir); + return false; + } + dir = getDataDirectory().c_str();; + if (mkpath(dir, mode) < 0) { + fprintf(stderr, "Unable to create data directory %s\n", dir); + return false; + } + dir = getConfigDirectory().c_str();; + if (mkpath(dir, mode) < 0) { + fprintf(stderr, "Unable to create config directory %s\n", dir); + return false; + } + return true; + } + + // create a cache instance that is shared by all DBs served by this server + void createCache() { + if (cache_numshardbits_ >= 1) { + cache_ = leveldb::NewLRUCache(cache_size_, cache_numshardbits_); + } else { + cache_ = leveldb::NewLRUCache(cache_size_); + } + } + + // Returns the server port + int getPort() { + return port_; + } + + // Returns the cache + leveldb::Cache* getCache() { + return cache_; + } + + // Returns the configured number of server threads + int getNumThreads() { + return num_threads_; + } + + // Returns the root directory where the server is rooted. + // The hostname is appended to the rootdir to arrive at the directory name. + std::string getRootDirectory() { + return rootdir_ + "/" + hostname_; + } + + // Returns the directory where the server stores all users's DBs. + std::string getDataDirectory() { + return getRootDirectory() + "/userdata/"; + } + + // Returns the directory where the server stores all its configurations + std::string getConfigDirectory() { + return getRootDirectory() + "/config/"; + } + + // Returns the data directory for the specified DB + std::string getDataDirectory(const std::string& dbname) { + return getDataDirectory() + dbname; + } + + // Returns true if the DB name is valid, otherwise return false + bool isValidName(const std::string& dbname) { + // The DB name cannot have '/' in the name + if (dbname.find('/') < dbname.size()) { + return false; + } + return true; + } + + private: + static int do_mkdir(const char *path, mode_t mode) { + struct stat st; + int status = 0; + + if (stat(path, &st) != 0) { + if (mkdir(path, mode) != 0) { + status = -1; + } + } else if (!S_ISDIR(st.st_mode)) { + errno = ENOTDIR; + status = -1; + } + return(status); + } + + // mkpath - ensure all directories in path exist + static int mkpath(const char *path, mode_t mode) + { + char *pp; + char *sp; + int status; + char *newpath = strdup(path); + + status = 0; + pp = newpath; + while (status == 0 && (sp = strchr(pp, '/')) != 0) { + if (sp != pp) { + /* Neither root nor double slash in path */ + *sp = '\0'; + status = do_mkdir(newpath, mode); + *sp = '/'; + } + pp = sp + 1; + } + if (status == 0) { + status = do_mkdir(path, mode); + } + free(newpath); + return (status); + } +}; + +#endif // THRIFT_LEVELDB_SERVER_OPTIONS_ diff --git a/thrift/server_utils.cpp b/thrift/server_utils.cpp index d2f325e5cf..e409450508 100644 --- a/thrift/server_utils.cpp +++ b/thrift/server_utils.cpp @@ -17,6 +17,7 @@ #include #include #include "openhandles.h" +#include "server_options.h" #include "leveldb/db.h" #include "leveldb/write_batch.h" @@ -30,11 +31,14 @@ using namespace apache::thrift::async; using namespace Tleveldb; using boost::shared_ptr; -extern "C" void startServer(int port); +extern "C" void startServer(int argc, char** argv); extern "C" void stopServer(int port); static boost::shared_ptr tServer; +// The global object that stores the default configuration of the server +ServerOptions server_options; + class DBHandler : virtual public DBIf { public: DBHandler() { @@ -43,9 +47,21 @@ class DBHandler : virtual public DBIf { void Open(DBHandle& _return, const Text& dbname, const DBOptions& dboptions) { - printf("Open\n"); + printf("Open %s\n", dbname.c_str()); + if (!server_options.isValidName(dbname)) { + LeveldbException e; + e.errorCode = Code::kInvalidArgument; + e.message = "Bad DB name"; + fprintf(stderr, "Bad DB name %s\n", dbname.c_str()); + throw e; + } + std::string dbdir = server_options.getDataDirectory(dbname); leveldb::Options options; - leveldb::DB* db; + + // fill up per-server options + options.block_cache = server_options.getCache(); + + // fill up per-DB options options.create_if_missing = dboptions.create_if_missing; options.error_if_exists = dboptions.error_if_exists; options.write_buffer_size = dboptions.write_buffer_size; @@ -57,7 +73,7 @@ class DBHandler : virtual public DBIf { } else if (dboptions.compression == kSnappyCompression) { options.compression = leveldb::kSnappyCompression; } - int64_t session = openHandles->add(options, dbname); + int64_t session = openHandles->add(options, dbname, dbdir); _return.dbname = dbname; _return.handleid = session; } @@ -280,6 +296,14 @@ class DBHandler : virtual public DBIf { return; } + // if iterator has encountered any corruption + if (!it->status().ok()) { + thishandle->removeIterator(iterator.iteratorid); + delete it; // cleanup + _return.status = Code::kIOError; // error in data + return; + } + // find current key-value leveldb::Slice key = it->key(); leveldb::Slice value = it->value(); @@ -372,7 +396,21 @@ class DBHandler : virtual public DBIf { }; // Starts a very simple thrift server -void startServer(int port) { +void startServer(int argc, char** argv) { + + // process command line options + if (!server_options.parseOptions(argc, argv)) { + exit(1); + } + + // create directories for server + if (!server_options.createDirectories()) { + exit(1); + } + // create the server's block cache + server_options.createCache(); + + int port = server_options.getPort(); shared_ptr handler(new DBHandler()); shared_ptr processor(new DBProcessor(handler)); shared_ptr serverTransport(new TServerSocket(port)); @@ -380,6 +418,8 @@ void startServer(int port) { shared_ptr protocolFactory(new TBinaryProtocolFactory()); TSimpleServer tServer(processor, serverTransport, transportFactory, protocolFactory); + fprintf(stderr, "Server started on port %d\n", port); + tServer.serve(); } diff --git a/thrift/test/simpletest.cpp b/thrift/test/simpletest.cpp index 43d805ed29..6052655183 100644 --- a/thrift/test/simpletest.cpp +++ b/thrift/test/simpletest.cpp @@ -8,6 +8,7 @@ #include #include #include +#include "server_options.h" using namespace apache::thrift; using namespace apache::thrift::protocol; @@ -15,14 +16,24 @@ using namespace apache::thrift::transport; using boost::shared_ptr; using namespace Tleveldb; -extern "C" void startServer(int port); +extern "C" void startServer(int argc, char**argv); extern "C" void stopServer(int port); +extern ServerOptions server_options; static DBHandle dbhandle; static DBClient* dbclient; -static const Text dbname = "/tmp/leveldb/test"; -static const int myport = 9091; +static const Text dbname = "test"; static pthread_t serverThread; +static int ARGC; +static char** ARGV; + +static void cleanupDir(std::string dir) { + // remove old data, if any + int ret = unlink(dir.c_str()); + char* cleanup = new char[100]; + snprintf(cleanup, 100, "rm -rf %s", dir.c_str()); + system(cleanup); +} static void createDatabase() { DBOptions options; @@ -34,6 +45,8 @@ static void createDatabase() { options.block_restart_interval = 16; options.compression = kSnappyCompression; + cleanupDir(server_options.getDataDirectory(dbname)); + // create the database dbclient->Open(dbhandle, dbname, options); } @@ -185,27 +198,26 @@ static void testClient(int port) { static void* startTestServer(void *arg) { - printf("Server starting on port %d...\n", myport); - startServer(myport); + printf("Server test server\n"); + startServer(ARGC, ARGV); } int main(int argc, char **argv) { - // remove old data, if any - int ret = unlink(dbname.c_str()); - char* cleanup = new char[100]; - snprintf(cleanup, 100, "rm -rf %s", dbname.c_str()); - system(cleanup); + ARGC = argc; + ARGV = argv; // create a server int rc = pthread_create(&serverThread, NULL, startTestServer, NULL); printf("Server thread created.\n"); // give some time to the server to initialize itself - sleep(1); + while (server_options.getPort() == 0) { + sleep(1); + } // test client - testClient(myport); + testClient(server_options.getPort()); return 0; }