diff --git a/Makefile b/Makefile index 63b467b393..1514c0b77e 100644 --- a/Makefile +++ b/Makefile @@ -130,8 +130,12 @@ am__v_AR_ = $(am__v_AR_$(AM_DEFAULT_VERBOSITY)) am__v_AR_0 = @echo " AR " $@; am__v_AR_1 = -AM_LINK = $(AM_V_CCLD)$(CXX) $^ $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) +ifdef ROCKSDB_USE_LIBRADOS +LIB_SOURCES += utilities/env_librados.cc +LDFLAGS += -lrados +endif +AM_LINK = $(AM_V_CCLD)$(CXX) $^ $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) # detect what platform we're building on dummy := $(shell (export ROCKSDB_ROOT="$(CURDIR)"; "$(CURDIR)/build_tools/build_detect_platform" "$(CURDIR)/make_config.mk")) # this file is generated by the previous line to set build flags and sources @@ -997,6 +1001,11 @@ spatial_db_test: utilities/spatialdb/spatial_db_test.o $(LIBOBJECTS) $(TESTHARNE env_mirror_test: utilities/env_mirror_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +ifdef ROCKSDB_USE_LIBRADOS +env_librados_test: utilities/env_librados_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_V_CCLD)$(CXX) $^ $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) +endif + env_registry_test: utilities/env_registry_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/include/rocksdb/utilities/env_librados.h b/include/rocksdb/utilities/env_librados.h new file mode 100644 index 0000000000..5c10ea7ccf --- /dev/null +++ b/include/rocksdb/utilities/env_librados.h @@ -0,0 +1,186 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef ROCKSDB_UTILITIES_ENV_LIBRADOS_H +#define ROCKSDB_UTILITIES_ENV_LIBRADOS_H + +#include +#include + +#include "rocksdb/status.h" +#include "rocksdb/utilities/env_mirror.h" + +#include + +namespace rocksdb { +class LibradosWritableFile; + +class EnvLibrados : public EnvWrapper { +public: + // Create a brand new sequentially-readable file with the specified name. + // On success, stores a pointer to the new file in *result and returns OK. + // On failure stores nullptr in *result and returns non-OK. If the file does + // not exist, returns a non-OK status. + // + // The returned file will only be accessed by one thread at a time. + Status NewSequentialFile( + const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options); + + // Create a brand new random access read-only file with the + // specified name. On success, stores a pointer to the new file in + // *result and returns OK. On failure stores nullptr in *result and + // returns non-OK. If the file does not exist, returns a non-OK + // status. + // + // The returned file may be concurrently accessed by multiple threads. + Status NewRandomAccessFile( + const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options); + + // Create an object that writes to a new file with the specified + // name. Deletes any existing file with the same name and creates a + // new file. On success, stores a pointer to the new file in + // *result and returns OK. On failure stores nullptr in *result and + // returns non-OK. + // + // The returned file will only be accessed by one thread at a time. + Status NewWritableFile( + const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options); + + // Reuse an existing file by renaming it and opening it as writable. + Status ReuseWritableFile( + const std::string& fname, + const std::string& old_fname, + std::unique_ptr* result, + const EnvOptions& options); + + // Create an object that represents a directory. Will fail if directory + // doesn't exist. If the directory exists, it will open the directory + // and create a new Directory object. + // + // On success, stores a pointer to the new Directory in + // *result and returns OK. On failure stores nullptr in *result and + // returns non-OK. + Status NewDirectory( + const std::string& name, + std::unique_ptr* result); + + // Returns OK if the named file exists. + // NotFound if the named file does not exist, + // the calling process does not have permission to determine + // whether this file exists, or if the path is invalid. + // IOError if an IO Error was encountered + Status FileExists(const std::string& fname); + + // Store in *result the names of the children of the specified directory. + // The names are relative to "dir". + // Original contents of *results are dropped. + Status GetChildren(const std::string& dir, + std::vector* result); + + // Delete the named file. + Status DeleteFile(const std::string& fname); + + // Create the specified directory. Returns error if directory exists. + Status CreateDir(const std::string& dirname); + + // Creates directory if missing. Return Ok if it exists, or successful in + // Creating. + Status CreateDirIfMissing(const std::string& dirname); + + // Delete the specified directory. + Status DeleteDir(const std::string& dirname); + + // Store the size of fname in *file_size. + Status GetFileSize(const std::string& fname, uint64_t* file_size); + + // Store the last modification time of fname in *file_mtime. + Status GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime); + // Rename file src to target. + Status RenameFile(const std::string& src, + const std::string& target); + // Hard Link file src to target. + Status LinkFile(const std::string& src, const std::string& target); + + // Lock the specified file. Used to prevent concurrent access to + // the same db by multiple processes. On failure, stores nullptr in + // *lock and returns non-OK. + // + // On success, stores a pointer to the object that represents the + // acquired lock in *lock and returns OK. The caller should call + // UnlockFile(*lock) to release the lock. If the process exits, + // the lock will be automatically released. + // + // If somebody else already holds the lock, finishes immediately + // with a failure. I.e., this call does not wait for existing locks + // to go away. + // + // May create the named file if it does not already exist. + Status LockFile(const std::string& fname, FileLock** lock); + + // Release the lock acquired by a previous successful call to LockFile. + // REQUIRES: lock was returned by a successful LockFile() call + // REQUIRES: lock has not already been unlocked. + Status UnlockFile(FileLock* lock); + + // Get full directory name for this db. + Status GetAbsolutePath(const std::string& db_path, + std::string* output_path); + + // Generate unique id + std::string GenerateUniqueId(); + + // Get default EnvLibrados + static EnvLibrados* Default(); + + explicit EnvLibrados(const std::string& db_name, + const std::string& config_path, + const std::string& db_pool); + + explicit EnvLibrados(const std::string& client_name, // first 3 parameters are for RADOS client init + const std::string& cluster_name, + const uint64_t flags, + const std::string& db_name, + const std::string& config_path, + const std::string& db_pool, + const std::string& wal_dir, + const std::string& wal_pool, + const uint64_t write_buffer_size); + ~EnvLibrados() { + _rados.shutdown(); + } +private: + std::string _client_name; + std::string _cluster_name; + uint64_t _flags; + std::string _db_name; // get from user, readable string; Also used as db_id for db metadata + std::string _config_path; + librados::Rados _rados; // RADOS client + std::string _db_pool_name; + librados::IoCtx _db_pool_ioctx; // IoCtx for connecting db_pool + std::string _wal_dir; // WAL dir path + std::string _wal_pool_name; + librados::IoCtx _wal_pool_ioctx; // IoCtx for connecting wal_pool + uint64_t _write_buffer_size; // WritableFile buffer max size + + /* private function to communicate with rados */ + std::string _CreateFid(); + Status _GetFid(const std::string& fname, std::string& fid); + Status _GetFid(const std::string& fname, std::string& fid, int fid_len); + Status _RenameFid(const std::string& old_fname, const std::string& new_fname); + Status _AddFid(const std::string& fname, const std::string& fid); + Status _DelFid(const std::string& fname); + Status _GetSubFnames( + const std::string& dirname, + std::vector * result + ); + librados::IoCtx* _GetIoctx(const std::string& prefix); + friend class LibradosWritableFile; +}; +} +#endif diff --git a/utilities/env_librados.cc b/utilities/env_librados.cc new file mode 100644 index 0000000000..6aca09f4fa --- /dev/null +++ b/utilities/env_librados.cc @@ -0,0 +1,1498 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "rocksdb/utilities/env_librados.h" +#include "util/random.h" +#include +#include + +namespace rocksdb { +/* GLOBAL DIFINE */ +// #define DEBUG +#ifdef DEBUG +#include +#include +#include +#define LOG_DEBUG(...) do{\ + printf("[%ld:%s:%i:%s]", syscall(SYS_gettid), __FILE__, __LINE__, __FUNCTION__);\ + printf(__VA_ARGS__);\ + }while(0) +#else +#define LOG_DEBUG(...) +#endif + +/* GLOBAL CONSTANT */ +const char *default_db_name = "default_envlibrados_db"; +const char *default_pool_name = "default_envlibrados_pool"; +const char *default_config_path = "CEPH_CONFIG_PATH"; // the env variable name of ceph configure file +// maximum dir/file that can store in the fs +const int MAX_ITEMS_IN_FS = 1 << 30; +// root dir tag +const std::string ROOT_DIR_KEY = "/"; +const std::string DIR_ID_VALUE = ""; + +/** + * @brief convert error code to status + * @details Convert internal linux error code to Status + * + * @param r [description] + * @return [description] + */ +Status err_to_status(int r) +{ + switch (r) { + case 0: + return Status::OK(); + case -ENOENT: + return Status::IOError(); + case -ENODATA: + case -ENOTDIR: + return Status::NotFound(Status::kNone); + case -EINVAL: + return Status::InvalidArgument(Status::kNone); + case -EIO: + return Status::IOError(Status::kNone); + default: + // FIXME :( + assert(0 == "unrecognized error code"); + return Status::NotSupported(Status::kNone); + } +} + +/** + * @brief split file path into dir path and file name + * @details + * Because rocksdb only need a 2-level structure (dir/file), all input path will be shortened to dir/file format + * For example: + * b/c => dir '/b', file 'c' + * /a/b/c => dir '/b', file 'c' + * + * @param fn [description] + * @param dir [description] + * @param file [description] + */ +void split(const std::string &fn, std::string *dir, std::string *file) { + LOG_DEBUG("[IN]%s\n", fn.c_str()); + int pos = fn.size() - 1; + while ('/' == fn[pos]) --pos; + size_t fstart = fn.rfind('/', pos); + *file = fn.substr(fstart + 1, pos - fstart); + + pos = fstart; + while (pos >= 0 && '/' == fn[pos]) --pos; + + if (pos < 0) { + *dir = "/"; + } else { + size_t dstart = fn.rfind('/', pos); + *dir = fn.substr(dstart + 1, pos - dstart); + *dir = std::string("/") + *dir; + } + + LOG_DEBUG("[OUT]%s | %s\n", dir->c_str(), file->c_str()); +} + +// A file abstraction for reading sequentially through a file +class LibradosSequentialFile : public SequentialFile { + librados::IoCtx * _io_ctx; + std::string _fid; + std::string _hint; + int _offset; +public: + LibradosSequentialFile(librados::IoCtx * io_ctx, std::string fid, std::string hint): + _io_ctx(io_ctx), _fid(fid), _hint(hint), _offset(0) {} + + ~LibradosSequentialFile() {} + + /** + * @brief read file + * @details + * Read up to "n" bytes from the file. "scratch[0..n-1]" may be + * written by this routine. Sets "*result" to the data that was + * read (including if fewer than "n" bytes were successfully read). + * May set "*result" to point at data in "scratch[0..n-1]", so + * "scratch[0..n-1]" must be live when "*result" is used. + * If an error was encountered, returns a non-OK status. + * + * REQUIRES: External synchronization + * + * @param n [description] + * @param result [description] + * @param scratch [description] + * @return [description] + */ + Status Read(size_t n, Slice* result, char* scratch) { + LOG_DEBUG("[IN]%i\n", (int)n); + librados::bufferlist buffer; + Status s; + int r = _io_ctx->read(_fid, buffer, n, _offset); + if (r >= 0) { + buffer.copy(0, r, scratch); + *result = Slice(scratch, r); + _offset += r; + s = Status::OK(); + } else { + s = err_to_status(r); + if (s == Status::IOError()) { + *result = Slice(); + s = Status::OK(); + } + } + LOG_DEBUG("[OUT]%s, %i, %s\n", s.ToString().c_str(), (int)r, buffer.c_str()); + return s; + } + + /** + * @brief skip "n" bytes from the file + * @details + * Skip "n" bytes from the file. This is guaranteed to be no + * slower that reading the same data, but may be faster. + * + * If end of file is reached, skipping will stop at the end of the + * file, and Skip will return OK. + * + * REQUIRES: External synchronization + * + * @param n [description] + * @return [description] + */ + Status Skip(uint64_t n) { + _offset += n; + return Status::OK(); + } + + /** + * @brief noop + * @details + * rocksdb has it's own caching capabilities that we should be able to use, + * without relying on a cache here. This can safely be a no-op. + * + * @param offset [description] + * @param length [description] + * + * @return [description] + */ + Status InvalidateCache(size_t offset, size_t length) { + return Status::OK(); + } +}; + +// A file abstraction for randomly reading the contents of a file. +class LibradosRandomAccessFile : public RandomAccessFile { + librados::IoCtx * _io_ctx; + std::string _fid; + std::string _hint; +public: + LibradosRandomAccessFile(librados::IoCtx * io_ctx, std::string fid, std::string hint): + _io_ctx(io_ctx), _fid(fid), _hint(hint) {} + + ~LibradosRandomAccessFile() {} + + /** + * @brief read file + * @details similar to LibradosSequentialFile::Read + * + * @param offset [description] + * @param n [description] + * @param result [description] + * @param scratch [description] + * @return [description] + */ + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + LOG_DEBUG("[IN]%i\n", (int)n); + librados::bufferlist buffer; + Status s; + int r = _io_ctx->read(_fid, buffer, n, offset); + if (r >= 0) { + buffer.copy(0, r, scratch); + *result = Slice(scratch, r); + s = Status::OK(); + } else { + s = err_to_status(r); + if (s == Status::IOError()) { + *result = Slice(); + s = Status::OK(); + } + } + LOG_DEBUG("[OUT]%s, %i, %s\n", s.ToString().c_str(), (int)r, buffer.c_str()); + return s; + } + + // Used by the file_reader_writer to decide if the ReadAhead wrapper + // should simply forward the call and do not enact buffering or locking. + bool ShouldForwardRawRequest() const { + return false; + } + + // For cases when read-ahead is implemented in the platform dependent + // layer + void EnableReadAhead() {} + + /** + * @brief [brief description] + * @details Get unique id for each file and guarantee this id is different for each file + * + * @param id [description] + * @param max_size max size of id, it shoud be larger than 16 + * + * @return [description] + */ + size_t GetUniqueId(char* id, size_t max_size) const { + // All fid has the same db_id prefix, so we need to ignore db_id prefix + size_t s = std::min(max_size, _fid.size()); + strncpy(id, _fid.c_str() + (_fid.size() - s), s); + id[s - 1] = '\0'; + return s; + }; + + //enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED }; + void Hint(AccessPattern pattern) { + /* Do nothing */ + } + + /** + * @brief noop + * @details [long description] + * + * @param offset [description] + * @param length [description] + * + * @return [description] + */ + Status InvalidateCache(size_t offset, size_t length) { + return Status::OK(); + } +}; + + +// A file abstraction for sequential writing. The implementation +// must provide buffering since callers may append small fragments +// at a time to the file. +class LibradosWritableFile : public WritableFile { + librados::IoCtx * _io_ctx; + std::string _fid; + std::string _hint; + const EnvLibrados * const _env; + + std::mutex _mutex; // used to protect modification of all following variables + librados::bufferlist _buffer; // write buffer + uint64_t _buffer_size; // write buffer size + uint64_t _file_size; // this file size doesn't include buffer size + + /** + * @brief assuming caller holds lock + * @details [long description] + * @return [description] + */ + int _SyncLocked() { + // 1. sync append data to RADOS + int r = _io_ctx->append(_fid, _buffer, _buffer_size); + assert(r >= 0); + + // 2. update local variables + if (0 == r) { + _buffer.clear(); + _file_size += _buffer_size; + _buffer_size = 0; + } + + return r; + } + +public: + LibradosWritableFile(librados::IoCtx * io_ctx, + std::string fid, + std::string hint, + const EnvLibrados * const env) + : _io_ctx(io_ctx), _fid(fid), _hint(hint), _env(env), _buffer(), _buffer_size(0), _file_size(0) { + int ret = _io_ctx->stat(_fid, &_file_size, nullptr); + + // if file not exist + if (ret < 0) { + _file_size = 0; + } + } + + ~LibradosWritableFile() { + // sync before closeing writable file + Sync(); + } + + /** + * @brief append data to file + * @details + * Append will save all written data in buffer util buffer size + * reaches buffer max size. Then, it will write buffer into rados + * + * @param data [description] + * @return [description] + */ + Status Append(const Slice& data) { + // append buffer + LOG_DEBUG("[IN] %i | %s\n", (int)data.size(), data.data()); + int r = 0; + + std::lock_guard lock(_mutex); + _buffer.append(data.data(), data.size()); + _buffer_size += data.size(); + + if (_buffer_size > _env->_write_buffer_size) { + r = _SyncLocked(); + } + + LOG_DEBUG("[OUT] %i\n", r); + return err_to_status(r); + } + + /** + * @brief not supported + * @details [long description] + * @return [description] + */ + Status PositionedAppend( + const Slice& /* data */, + uint64_t /* offset */) { + return Status::NotSupported(); + } + + /** + * @brief truncate file to assigned size + * @details [long description] + * + * @param size [description] + * @return [description] + */ + Status Truncate(uint64_t size) { + LOG_DEBUG("[IN]%lld|%lld|%lld\n", (long long)size, (long long)_file_size, (long long)_buffer_size); + int r = 0; + + std::lock_guard lock(_mutex); + if (_file_size > size) { + r = _io_ctx->trunc(_fid, size); + + if (r == 0) { + _buffer.clear(); + _buffer_size = 0; + _file_size = size; + } + } else if (_file_size == size) { + _buffer.clear(); + _buffer_size = 0; + } else { + librados::bufferlist tmp; + tmp.claim(_buffer); + _buffer.substr_of(tmp, 0, size - _file_size); + _buffer_size = size - _file_size; + } + + LOG_DEBUG("[OUT] %i\n", r); + return err_to_status(r); + } + + /** + * @brief close file + * @details [long description] + * @return [description] + */ + Status Close() { + LOG_DEBUG("%s | %lld | %lld\n", _hint.c_str(), (long long)_buffer_size, (long long)_file_size); + return Sync(); + } + + /** + * @brief flush file, + * @details initiate an aio write and not wait + * + * @return [description] + */ + Status Flush() { + librados::AioCompletion *write_completion = librados::Rados::aio_create_completion(); + int r = 0; + + std::lock_guard lock(_mutex); + r = _io_ctx->aio_append(_fid, write_completion, _buffer, _buffer_size); + + if (0 == r) { + _file_size += _buffer_size; + _buffer.clear(); + _buffer_size = 0; + } + + write_completion->release(); + + return err_to_status(r); + } + + /** + * @brief write buffer data to rados + * @details initiate an aio write and wait for result + * @return [description] + */ + Status Sync() { // sync data + int r = 0; + + std::lock_guard lock(_mutex); + if (_buffer_size > 0) { + r = _SyncLocked(); + } + + return err_to_status(r); + } + + /** + * @brief [brief description] + * @details [long description] + * @return true if Sync() and Fsync() are safe to call concurrently with Append()and Flush(). + */ + bool IsSyncThreadSafe() const { + return true; + } + + /** + * @brief Indicates the upper layers if the current WritableFile implementation uses direct IO. + * @details [long description] + * @return [description] + */ + bool UseDirectIO() const { + return false; + } + + /** + * @brief Get file size + * @details + * This API will use cached file_size. + * @return [description] + */ + uint64_t GetFileSize() { + LOG_DEBUG("%lld|%lld\n", (long long)_buffer_size, (long long)_file_size); + + std::lock_guard lock(_mutex); + int file_size = _file_size + _buffer_size; + + return file_size; + } + + /** + * @brief For documentation, refer to RandomAccessFile::GetUniqueId() + * @details [long description] + * + * @param id [description] + * @param max_size [description] + * + * @return [description] + */ + size_t GetUniqueId(char* id, size_t max_size) const { + // All fid has the same db_id prefix, so we need to ignore db_id prefix + size_t s = std::min(max_size, _fid.size()); + strncpy(id, _fid.c_str() + (_fid.size() - s), s); + id[s - 1] = '\0'; + return s; + } + + /** + * @brief noop + * @details [long description] + * + * @param offset [description] + * @param length [description] + * + * @return [description] + */ + Status InvalidateCache(size_t offset, size_t length) { + return Status::OK(); + } + + using WritableFile::RangeSync; + /** + * @brief No RangeSync support, just call Sync() + * @details [long description] + * + * @param offset [description] + * @param nbytes [description] + * + * @return [description] + */ + Status RangeSync(off_t offset, off_t nbytes) { + return Sync(); + } + +protected: + using WritableFile::Allocate; + /** + * @brief noop + * @details [long description] + * + * @param offset [description] + * @param len [description] + * + * @return [description] + */ + Status Allocate(off_t offset, off_t len) { + return Status::OK(); + } +}; + + +// Directory object represents collection of files and implements +// filesystem operations that can be executed on directories. +class LibradosDirectory : public Directory { + librados::IoCtx * _io_ctx; + std::string _fid; +public: + explicit LibradosDirectory(librados::IoCtx * io_ctx, std::string fid): + _io_ctx(io_ctx), _fid(fid) {} + + // Fsync directory. Can be called concurrently from multiple threads. + Status Fsync() { + return Status::OK(); + } +}; + +// Identifies a locked file. +// This is exclusive lock and can't nested lock by same thread +class LibradosFileLock : public FileLock { + librados::IoCtx * _io_ctx; + const std::string _obj_name; + const std::string _lock_name; + const std::string _cookie; + int lock_state; +public: + LibradosFileLock( + librados::IoCtx * io_ctx, + const std::string obj_name): + _io_ctx(io_ctx), + _obj_name(obj_name), + _lock_name("lock_name"), + _cookie("cookie") { + + // TODO: the lock will never expire. It may cause problem if the process crash or abnormally exit. + while (!_io_ctx->lock_exclusive( + _obj_name, + _lock_name, + _cookie, + "description", nullptr, 0)); + } + + ~LibradosFileLock() { + _io_ctx->unlock(_obj_name, _lock_name, _cookie); + } +}; + + +// -------------------- +// --- EnvLibrados ---- +// -------------------- +/** + * @brief EnvLibrados ctor + * @details [long description] + * + * @param db_name unique database name + * @param config_path the configure file path for rados + */ +EnvLibrados::EnvLibrados(const std::string& db_name, + const std::string& config_path, + const std::string& db_pool) + : EnvLibrados("client.admin", + "ceph", + 0, + db_name, + config_path, + db_pool, + "/wal", + db_pool, + 1 << 20) {} + +/** + * @brief EnvLibrados ctor + * @details [long description] + * + * @param client_name first 3 parameters is for RADOS client init + * @param cluster_name + * @param flags + * @param db_name unique database name, used as db_id key + * @param config_path the configure file path for rados + * @param db_pool the pool for db data + * @param wal_pool the pool for WAL data + * @param write_buffer_size WritableFile buffer max size + */ +EnvLibrados::EnvLibrados(const std::string& client_name, + const std::string& cluster_name, + const uint64_t flags, + const std::string& db_name, + const std::string& config_path, + const std::string& db_pool, + const std::string& wal_dir, + const std::string& wal_pool, + const uint64_t write_buffer_size) + : EnvWrapper(Env::Default()), + _client_name(client_name), + _cluster_name(cluster_name), + _flags(flags), + _db_name(db_name), + _config_path(config_path), + _db_pool_name(db_pool), + _wal_dir(wal_dir), + _wal_pool_name(wal_pool), + _write_buffer_size(write_buffer_size) { + int ret = 0; + + // 1. create a Rados object and initialize it + ret = _rados.init2(_client_name.c_str(), _cluster_name.c_str(), _flags); // just use the client.admin keyring + if (ret < 0) { // let's handle any error that might have come back + std::cerr << "couldn't initialize rados! error " << ret << std::endl; + ret = EXIT_FAILURE; + goto out; + } + + // 2. read configure file + ret = _rados.conf_read_file(_config_path.c_str()); + if (ret < 0) { + // This could fail if the config file is malformed, but it'd be hard. + std::cerr << "failed to parse config file " << _config_path + << "! error" << ret << std::endl; + ret = EXIT_FAILURE; + goto out; + } + + // 3. we actually connect to the cluster + ret = _rados.connect(); + if (ret < 0) { + std::cerr << "couldn't connect to cluster! error " << ret << std::endl; + ret = EXIT_FAILURE; + goto out; + } + + // 4. create db_pool if not exist + ret = _rados.pool_create(_db_pool_name.c_str()); + if (ret < 0 && ret != -EEXIST && ret != -EPERM) { + std::cerr << "couldn't create pool! error " << ret << std::endl; + goto out; + } + + // 5. create db_pool_ioctx + ret = _rados.ioctx_create(_db_pool_name.c_str(), _db_pool_ioctx); + if (ret < 0) { + std::cerr << "couldn't set up ioctx! error " << ret << std::endl; + ret = EXIT_FAILURE; + goto out; + } + + // 6. create wal_pool if not exist + ret = _rados.pool_create(_wal_pool_name.c_str()); + if (ret < 0 && ret != -EEXIST && ret != -EPERM) { + std::cerr << "couldn't create pool! error " << ret << std::endl; + goto out; + } + + // 7. create wal_pool_ioctx + ret = _rados.ioctx_create(_wal_pool_name.c_str(), _wal_pool_ioctx); + if (ret < 0) { + std::cerr << "couldn't set up ioctx! error " << ret << std::endl; + ret = EXIT_FAILURE; + goto out; + } + + // 8. add root dir + _AddFid(ROOT_DIR_KEY, DIR_ID_VALUE); + +out: + LOG_DEBUG("rados connect result code : %i\n", ret); +} + +/**************************************************** + private functions to handle fid operation. + Dir also have fid, but the value is DIR_ID_VALUE +****************************************************/ + +/** + * @brief generate a new fid + * @details [long description] + * @return [description] + */ +std::string EnvLibrados::_CreateFid() { + return _db_name + "." + GenerateUniqueId(); +} + +/** + * @brief get fid + * @details [long description] + * + * @param fname [description] + * @param fid [description] + * + * @return + * Status::OK() + * Status::NotFound() + */ +Status EnvLibrados::_GetFid( + const std::string &fname, + std::string& fid) { + std::set keys; + std::map kvs; + keys.insert(fname); + int r = _db_pool_ioctx.omap_get_vals_by_keys(_db_name, keys, &kvs); + + if (0 == r && 0 == kvs.size()) { + return Status::NotFound(); + } else if (0 == r && 0 != kvs.size()) { + fid.assign(kvs[fname].c_str(), kvs[fname].length()); + return Status::OK(); + } else { + return err_to_status(r); + } +} + +/** + * @brief rename fid + * @details Only modify object in rados once, + * so this rename operation is atomic in term of rados + * + * @param old_fname [description] + * @param new_fname [description] + * + * @return [description] + */ +Status EnvLibrados::_RenameFid(const std::string& old_fname, + const std::string& new_fname) { + std::string fid; + Status s = _GetFid(old_fname, fid); + + if (Status::OK() != s) { + return s; + } + + librados::bufferlist bl; + std::set keys; + std::map kvs; + librados::ObjectWriteOperation o; + bl.append(fid); + keys.insert(old_fname); + kvs[new_fname] = bl; + o.omap_rm_keys(keys); + o.omap_set(kvs); + int r = _db_pool_ioctx.operate(_db_name, &o); + return err_to_status(r); +} + +/** + * @brief add to metadata object. It may overwrite exist key. + * @details [long description] + * + * @param fname [description] + * @param fid [description] + * + * @return [description] + */ +Status EnvLibrados::_AddFid( + const std::string& fname, + const std::string& fid) { + std::map kvs; + librados::bufferlist value; + value.append(fid); + kvs[fname] = value; + int r = _db_pool_ioctx.omap_set(_db_name, kvs); + return err_to_status(r); +} + +/** + * @brief return subfile names of dir. + * @details + * RocksDB has a 2-level structure, so all keys + * that have dir as prefix are subfiles of dir. + * So we can just return these files' name. + * + * @param dir [description] + * @param result [description] + * + * @return [description] + */ +Status EnvLibrados::_GetSubFnames( + const std::string& dir, + std::vector * result +) { + std::string start_after(dir); + std::string filter_prefix(dir); + std::map kvs; + _db_pool_ioctx.omap_get_vals(_db_name, + start_after, filter_prefix, + MAX_ITEMS_IN_FS, &kvs); + + result->clear(); + for (auto i = kvs.begin(); i != kvs.end(); i++) { + result->push_back(i->first.substr(dir.size() + 1)); + } + return Status::OK(); +} + +/** + * @brief delete key fname from metadata object + * @details [long description] + * + * @param fname [description] + * @return [description] + */ +Status EnvLibrados::_DelFid( + const std::string& fname) { + std::set keys; + keys.insert(fname); + int r = _db_pool_ioctx.omap_rm_keys(_db_name, keys); + return err_to_status(r); +} + +/** + * @brief get match IoCtx from _prefix_pool_map + * @details [long description] + * + * @param prefix [description] + * @return [description] + * + */ +librados::IoCtx* EnvLibrados::_GetIoctx(const std::string& fpath) { + auto is_prefix = [](const std::string & s1, const std::string & s2) { + auto it1 = s1.begin(), it2 = s2.begin(); + while (it1 != s1.end() && it2 != s2.end() && *it1 == *it2) ++it1, ++it2; + return it1 == s1.end(); + }; + + if (is_prefix(_wal_dir, fpath)) { + return &_wal_pool_ioctx; + } else { + return &_db_pool_ioctx; + } +} + +/************************************************************ + public functions +************************************************************/ +/** + * @brief generate unique id + * @details Combine system time and random number. + * @return [description] + */ +std::string EnvLibrados::GenerateUniqueId() { + Random64 r(time(nullptr)); + uint64_t random_uuid_portion = + r.Uniform(std::numeric_limits::max()); + uint64_t nanos_uuid_portion = NowNanos(); + char uuid2[200]; + snprintf(uuid2, + 200, + "%16lx-%16lx", + (unsigned long)nanos_uuid_portion, + (unsigned long)random_uuid_portion); + return uuid2; +} + +/** + * @brief create a new sequential read file handler + * @details it will check the existence of fname + * + * @param fname [description] + * @param result [description] + * @param options [description] + * @return [description] + */ +Status EnvLibrados::NewSequentialFile( + const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) +{ + LOG_DEBUG("[IN]%s\n", fname.c_str()); + std::string dir, file, fid; + split(fname, &dir, &file); + Status s; + std::string fpath = dir + "/" + file; + do { + s = _GetFid(dir, fid); + + if (!s.ok() || fid != DIR_ID_VALUE) { + if (fid != DIR_ID_VALUE) s = Status::IOError(); + break; + } + + s = _GetFid(fpath, fid); + + if (Status::NotFound() == s) { + s = Status::IOError(); + errno = ENOENT; + break; + } + + result->reset(new LibradosSequentialFile(_GetIoctx(fpath), fid, fpath)); + s = Status::OK(); + } while (0); + + LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); + return s; +} + +/** + * @brief create a new random access file handler + * @details it will check the existence of fname + * + * @param fname [description] + * @param result [description] + * @param options [description] + * @return [description] + */ +Status EnvLibrados::NewRandomAccessFile( + const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) +{ + LOG_DEBUG("[IN]%s\n", fname.c_str()); + std::string dir, file, fid; + split(fname, &dir, &file); + Status s; + std::string fpath = dir + "/" + file; + do { + s = _GetFid(dir, fid); + + if (!s.ok() || fid != DIR_ID_VALUE) { + s = Status::IOError(); + break; + } + + s = _GetFid(fpath, fid); + + if (Status::NotFound() == s) { + s = Status::IOError(); + errno = ENOENT; + break; + } + + result->reset(new LibradosRandomAccessFile(_GetIoctx(fpath), fid, fpath)); + s = Status::OK(); + } while (0); + + LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); + return s; +} + +/** + * @brief create a new write file handler + * @details it will check the existence of fname + * + * @param fname [description] + * @param result [description] + * @param options [description] + * @return [description] + */ +Status EnvLibrados::NewWritableFile( + const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) +{ + LOG_DEBUG("[IN]%s\n", fname.c_str()); + std::string dir, file, fid; + split(fname, &dir, &file); + Status s; + std::string fpath = dir + "/" + file; + + do { + // 1. check if dir exist + s = _GetFid(dir, fid); + if (!s.ok()) { + break; + } + + if (fid != DIR_ID_VALUE) { + s = Status::IOError(); + break; + } + + // 2. check if file exist. + // 2.1 exist, use it + // 2.2 not exist, create it + s = _GetFid(fpath, fid); + if (Status::NotFound() == s) { + fid = _CreateFid(); + _AddFid(fpath, fid); + } + + result->reset(new LibradosWritableFile(_GetIoctx(fpath), fid, fpath, this)); + s = Status::OK(); + } while (0); + + LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); + return s; +} + +/** + * @brief reuse write file handler + * @details + * This function will rename old_fname to new_fname, + * then return the handler of new_fname + * + * @param new_fname [description] + * @param old_fname [description] + * @param result [description] + * @param options [description] + * @return [description] + */ +Status EnvLibrados::ReuseWritableFile( + const std::string& new_fname, + const std::string& old_fname, + std::unique_ptr* result, + const EnvOptions& options) +{ + LOG_DEBUG("[IN]%s => %s\n", old_fname.c_str(), new_fname.c_str()); + std::string src_fid, tmp_fid, src_dir, src_file, dst_dir, dst_file; + split(old_fname, &src_dir, &src_file); + split(new_fname, &dst_dir, &dst_file); + + std::string src_fpath = src_dir + "/" + src_file; + std::string dst_fpath = dst_dir + "/" + dst_file; + Status r = Status::OK(); + do { + r = _RenameFid(src_fpath, + dst_fpath); + if (!r.ok()) { + break; + } + + result->reset(new LibradosWritableFile(_GetIoctx(dst_fpath), src_fid, dst_fpath, this)); + } while (0); + + LOG_DEBUG("[OUT]%s\n", r.ToString().c_str()); + return r; +} + +/** + * @brief create a new directory handler + * @details [long description] + * + * @param name [description] + * @param result [description] + * + * @return [description] + */ +Status EnvLibrados::NewDirectory( + const std::string& name, + std::unique_ptr* result) +{ + LOG_DEBUG("[IN]%s\n", name.c_str()); + std::string fid, dir, file; + /* just want to get dir name */ + split(name + "/tmp", &dir, &file); + Status s; + + do { + s = _GetFid(dir, fid); + + if (!s.ok() || DIR_ID_VALUE != fid) { + s = Status::IOError(name, strerror(-ENOENT)); + break; + } + + if (Status::NotFound() == s) { + s = _AddFid(dir, DIR_ID_VALUE); + if (!s.ok()) break; + } else if (!s.ok()) { + break; + } + + result->reset(new LibradosDirectory(_GetIoctx(dir), dir)); + s = Status::OK(); + } while (0); + + LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); + return s; +} + +/** + * @brief check if fname is exist + * @details [long description] + * + * @param fname [description] + * @return [description] + */ +Status EnvLibrados::FileExists(const std::string& fname) +{ + LOG_DEBUG("[IN]%s\n", fname.c_str()); + std::string fid, dir, file; + split(fname, &dir, &file); + Status s = _GetFid(dir + "/" + file, fid); + + if (s.ok() && fid != DIR_ID_VALUE) { + s = Status::OK(); + } + + LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); + return s; +} + +/** + * @brief get subfile name of dir_in + * @details [long description] + * + * @param dir_in [description] + * @param result [description] + * + * @return [description] + */ +Status EnvLibrados::GetChildren( + const std::string& dir_in, + std::vector* result) +{ + LOG_DEBUG("[IN]%s\n", dir_in.c_str()); + std::string fid, dir, file; + split(dir_in + "/temp", &dir, &file); + Status s; + + do { + s = _GetFid(dir, fid); + if (!s.ok()) { + break; + } + + if (fid != DIR_ID_VALUE) { + s = Status::IOError(); + break; + } + + s = _GetSubFnames(dir, result); + } while (0); + + LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); + return s; +} + +/** + * @brief delete fname + * @details [long description] + * + * @param fname [description] + * @return [description] + */ +Status EnvLibrados::DeleteFile(const std::string& fname) +{ + LOG_DEBUG("[IN]%s\n", fname.c_str()); + std::string fid, dir, file; + split(fname, &dir, &file); + Status s = _GetFid(dir + "/" + file, fid); + + if (s.ok() && DIR_ID_VALUE != fid) { + s = _DelFid(dir + "/" + file); + } else { + s = Status::NotFound(); + } + LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); + return s; +} + +/** + * @brief create new dir + * @details [long description] + * + * @param dirname [description] + * @return [description] + */ +Status EnvLibrados::CreateDir(const std::string& dirname) +{ + LOG_DEBUG("[IN]%s\n", dirname.c_str()); + std::string fid, dir, file; + split(dirname + "/temp", &dir, &file); + Status s = _GetFid(dir + "/" + file, fid); + + do { + if (Status::NotFound() != s && fid != DIR_ID_VALUE) { + break; + } else if (Status::OK() == s && fid == DIR_ID_VALUE) { + break; + } + + s = _AddFid(dir, DIR_ID_VALUE); + } while (0); + + LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); + return s; +} + +/** + * @brief create dir if missing + * @details [long description] + * + * @param dirname [description] + * @return [description] + */ +Status EnvLibrados::CreateDirIfMissing(const std::string& dirname) +{ + LOG_DEBUG("[IN]%s\n", dirname.c_str()); + std::string fid, dir, file; + split(dirname + "/temp", &dir, &file); + Status s = Status::OK(); + + do { + s = _GetFid(dir, fid); + if (Status::NotFound() != s) { + break; + } + + s = _AddFid(dir, DIR_ID_VALUE); + } while (0); + + LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); + return s; +} + +/** + * @brief delete dir + * @details + * + * @param dirname [description] + * @return [description] + */ +Status EnvLibrados::DeleteDir(const std::string& dirname) +{ + LOG_DEBUG("[IN]%s\n", dirname.c_str()); + std::string fid, dir, file; + split(dirname + "/temp", &dir, &file); + Status s = Status::OK(); + + s = _GetFid(dir, fid); + + if (s.ok() && DIR_ID_VALUE == fid) { + std::vector subs; + s = _GetSubFnames(dir, &subs); + // if subfiles exist, can't delete dir + if (subs.size() > 0) { + s = Status::IOError(); + } else { + s = _DelFid(dir); + } + } else { + s = Status::NotFound(); + } + + LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); + return s; +} + +/** + * @brief return file size + * @details [long description] + * + * @param fname [description] + * @param file_size [description] + * + * @return [description] + */ +Status EnvLibrados::GetFileSize( + const std::string& fname, + uint64_t* file_size) +{ + LOG_DEBUG("[IN]%s\n", fname.c_str()); + std::string fid, dir, file; + split(fname, &dir, &file); + time_t mtime; + Status s; + + do { + std::string fpath = dir + "/" + file; + s = _GetFid(fpath, fid); + + if (!s.ok()) { + break; + } + + int ret = _GetIoctx(fpath)->stat(fid, file_size, &mtime); + if (ret < 0) { + LOG_DEBUG("%i\n", ret); + if (-ENOENT == ret) { + *file_size = 0; + s = Status::OK(); + } else { + s = err_to_status(ret); + } + } else { + s = Status::OK(); + } + } while (0); + + LOG_DEBUG("[OUT]%s|%lld\n", s.ToString().c_str(), (long long)*file_size); + return s; +} + +/** + * @brief get file modification time + * @details [long description] + * + * @param fname [description] + * @param file_mtime [description] + * + * @return [description] + */ +Status EnvLibrados::GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) +{ + LOG_DEBUG("[IN]%s\n", fname.c_str()); + std::string fid, dir, file; + split(fname, &dir, &file); + time_t mtime; + uint64_t file_size; + Status s = Status::OK(); + do { + std::string fpath = dir + "/" + file; + s = _GetFid(dir + "/" + file, fid); + + if (!s.ok()) { + break; + } + + int ret = _GetIoctx(fpath)->stat(fid, &file_size, &mtime); + if (ret < 0) { + if (Status::NotFound() == err_to_status(ret)) { + *file_mtime = static_cast(mtime); + s = Status::OK(); + } else { + s = err_to_status(ret); + } + } else { + s = Status::OK(); + } + } while (0); + + LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); + return s; +} + +/** + * @brief rename file + * @details + * + * @param src [description] + * @param target_in [description] + * + * @return [description] + */ +Status EnvLibrados::RenameFile( + const std::string& src, + const std::string& target_in) +{ + LOG_DEBUG("[IN]%s => %s\n", src.c_str(), target_in.c_str()); + std::string src_fid, tmp_fid, src_dir, src_file, dst_dir, dst_file; + split(src, &src_dir, &src_file); + split(target_in, &dst_dir, &dst_file); + + auto s = _RenameFid(src_dir + "/" + src_file, + dst_dir + "/" + dst_file); + LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); + return s; +} + +/** + * @brief not support + * @details [long description] + * + * @param src [description] + * @param target_in [description] + * + * @return [description] + */ +Status EnvLibrados::LinkFile( + const std::string& src, + const std::string& target_in) +{ + LOG_DEBUG("[IO]%s => %s\n", src.c_str(), target_in.c_str()); + return Status::NotSupported(); +} + +/** + * @brief lock file. create if missing. + * @details [long description] + * + * It seems that LockFile is used for preventing other instance of RocksDB + * from opening up the database at the same time. From RocksDB source code, + * the invokes of LockFile are at following locations: + * + * ./db/db_impl.cc:1159: s = env_->LockFile(LockFileName(dbname_), &db_lock_); // DBImpl::Recover + * ./db/db_impl.cc:5839: Status result = env->LockFile(lockname, &lock); // Status DestroyDB + * + * When db recovery and db destroy, RocksDB will call LockFile + * + * @param fname [description] + * @param lock [description] + * + * @return [description] + */ +Status EnvLibrados::LockFile( + const std::string& fname, + FileLock** lock) +{ + LOG_DEBUG("[IN]%s\n", fname.c_str()); + std::string fid, dir, file; + split(fname, &dir, &file); + Status s = Status::OK(); + + do { + std::string fpath = dir + "/" + file; + s = _GetFid(fpath, fid); + + if (Status::OK() != s && + Status::NotFound() != s) { + break; + } else if (Status::NotFound() == s) { + s = _AddFid(fpath, _CreateFid()); + if (!s.ok()) { + break; + } + } else if (Status::OK() == s && DIR_ID_VALUE == fid) { + s = Status::IOError(); + break; + } + + *lock = new LibradosFileLock(_GetIoctx(fpath), fpath); + } while (0); + + LOG_DEBUG("[OUT]%s\n", s.ToString().c_str()); + return s; +} + +/** + * @brief unlock file + * @details [long description] + * + * @param lock [description] + * @return [description] + */ +Status EnvLibrados::UnlockFile(FileLock* lock) +{ + LOG_DEBUG("[IO]%p\n", lock); + if (nullptr != lock) { + delete lock; + } + return Status::OK(); +} + + +/** + * @brief not support + * @details [long description] + * + * @param db_path [description] + * @param output_path [description] + * + * @return [description] + */ +Status EnvLibrados::GetAbsolutePath( + const std::string& db_path, + std::string* output_path) +{ + LOG_DEBUG("[IO]%s\n", db_path.c_str()); + return Status::NotSupported(); +} + +/** + * @brief Get default EnvLibrados + * @details [long description] + * @return [description] + */ +EnvLibrados* EnvLibrados::Default() { + static EnvLibrados default_env(default_db_name, + std::getenv(default_config_path), + default_pool_name); + return &default_env; +} +} \ No newline at end of file diff --git a/utilities/env_librados.md b/utilities/env_librados.md new file mode 100644 index 0000000000..45a2a7badb --- /dev/null +++ b/utilities/env_librados.md @@ -0,0 +1,122 @@ +# Introduce to EnvLibrados +EnvLibrados is a customized RocksDB Env to use RADOS as the backend file system of RocksDB. It overrides all file system related API of default Env. The easiest way to use it is just like following: +```c++ +std::string db_name = "test_db"; +std::string config_path = "path/to/ceph/config"; +DB* db; +Options options; +options.env = EnvLibrados(db_name, config_path); +Status s = DB::Open(options, kDBPath, &db); +... +``` +Then EnvLibrados will forward all file read/write operation to the RADOS cluster assigned by config_path. Default pool is db_name+"_pool". + +# Options for EnvLibrados +There are some options that users could set for EnvLibrados. +- write_buffer_size. This variable is the max buffer size for WritableFile. After reaching the buffer_max_size, EnvLibrados will sync buffer content to RADOS, then clear buffer. +- db_pool. Rather than using default pool, users could set their own db pool name +- wal_dir. The dir for WAL files. Because RocksDB only has 2-level structure (dir_name/file_name), the format of wal_dir is "/dir_name"(CAN'T be "/dir1/dir2"). Default wal_dir is "/wal". +- wal_pool. Corresponding pool name for WAL files. Default value is db_name+"_wal_pool" + +The example of setting options looks like following: +```c++ +db_name = "test_db"; +db_pool = db_name+"_pool"; +wal_dir = "/wal"; +wal_pool = db_name+"_wal_pool"; +write_buffer_size = 1 << 20; +env_ = new EnvLibrados(db_name, config, db_pool, wal_dir, wal_pool, write_buffer_size); + +DB* db; +Options options; +options.env = env_; +// The last level dir name should match the dir name in prefix_pool_map +options.wal_dir = "/tmp/wal"; + +// open DB +Status s = DB::Open(options, kDBPath, &db); +... +``` + +# Performance Test +## Compile +Check this [link](https://github.com/facebook/rocksdb/blob/master/INSTALL.md) to install the dependencies of RocksDB. Then you can compile it by running `$ make env_librados_test ROCKSDB_USE_LIBRADOS=1` under `rocksdb\`. The configure file used by env_librados_test is `../ceph/src/ceph.conf`. For Ubuntu 14.04, just run following commands: +```bash +$ sudo apt-get install libgflags-dev +$ sudo apt-get install libsnappy-dev +$ sudo apt-get install zlib1g-dev +$ sudo apt-get install libbz2-dev +$ make env_librados_test ROCKSDB_USE_LIBRADOS=1 +``` + +## Test Result +My test environment is Ubuntu 14.04 in VirtualBox with 8 cores and 8G RAM. Following is the test result. + +1. Write (1<<20) keys in random order. The time of writing under default env is around 10s while the time of writing under EnvLibrados is varying from 10s to 30s. + +2. Write (1<<20) keys in sequential order. The time of writing under default env drops to arround 1s. But the time of writing under EnvLibrados is not changed. + +3. Read (1<<16) keys from (1<<20) keys in random order. The time of reading under both Envs are roughly the same, around 1.8s. + +# MyRocks Test +## Compile Ceph +See [link](http://docs.ceph.com/docs/master/install/build-ceph/) + +## Start RADOS + +```bash +cd ceph-path/src +( ( ./stop.sh; rm -rf dev/*; CEPH_NUM_OSD=3 ./vstart.sh --short --localhost -n +-x -d ; ) ) 2>&1 +``` + +## Compile MySQL + +```bash +sudo apt-get update +sudo apt-get install g++ cmake libbz2-dev libaio-dev bison \ +zlib1g-dev libsnappy-dev +sudo apt-get install libgflags-dev libreadline6-dev libncurses5-dev \ +libssl-dev liblz4-dev gdb git + +git clone https://github.com/facebook/mysql-5.6.git +cd mysql-5.6 +git submodule init +git submodule update +cmake . -DCMAKE_BUILD_TYPE=RelWithDebInfo -DWITH_SSL=system \ +-DWITH_ZLIB=bundled -DMYSQL_MAINTAINER_MODE=0 -DENABLED_LOCAL_INFILE=1 -DROCKSDB_USE_LIBRADOS=1 +make install -j8 +``` + +Check this [link](https://github.com/facebook/mysql-5.6/wiki/Build-Steps) for latest compile steps. + +## Configure MySQL +Following is the steps of configuration of MySQL. + +```bash +mkdir -p /etc/mysql +mkdir -p /var/lib/mysql +mkdir -p /etc/mysql/conf.d +echo -e '[mysqld_safe]\nsyslog' > /etc/mysql/conf.d/mysqld_safe_syslog.cnf +cp /usr/share/mysql/my-medium.cnf /etc/mysql/my.cnf +sed -i 's#.*datadir.*#datadir = /var/lib/mysql#g' /etc/mysql/my.cnf +chown mysql:mysql -R /var/lib/mysql + +mysql_install_db --user=mysql --ldata=/var/lib/mysql/ +export CEPH_CONFIG_PATH="path/of/ceph/config/file" +mysqld_safe -user=mysql --skip-innodb --rocksdb --default-storage-engine=rocksdb --default-tmp-storage-engine=MyISAM & +mysqladmin -u root password +mysql -u root -p +``` + +Check this [link](https://gist.github.com/shichao-an/f5639ecd551496ac2d70) for detail information. + +```sql +show databases; +create database testdb; +use testdb; +show tables; +CREATE TABLE tbl (id INT AUTO_INCREMENT primary key, str VARCHAR(32)); +insert into tbl values (1, "val2"); +select * from tbl; +``` diff --git a/utilities/env_librados_test.cc b/utilities/env_librados_test.cc new file mode 100644 index 0000000000..9535401c4a --- /dev/null +++ b/utilities/env_librados_test.cc @@ -0,0 +1,1146 @@ +// Copyright (c) 2016, Red Hat, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE + +#include +#include "rocksdb/utilities/env_librados.h" +#include "util/mock_env.h" +#include "util/testharness.h" + +#include "rocksdb/db.h" +#include "rocksdb/slice.h" +#include "rocksdb/options.h" +#include "util/random.h" +#include +#include +#include "rocksdb/utilities/transaction_db.h" + +class Timer { + typedef std::chrono::high_resolution_clock high_resolution_clock; + typedef std::chrono::milliseconds milliseconds; +public: + explicit Timer(bool run = false) + { + if (run) + Reset(); + } + void Reset() + { + _start = high_resolution_clock::now(); + } + milliseconds Elapsed() const + { + return std::chrono::duration_cast(high_resolution_clock::now() - _start); + } + template + friend std::basic_ostream& operator<<(std::basic_ostream& out, const Timer& timer) + { + return out << timer.Elapsed().count(); + } +private: + high_resolution_clock::time_point _start; +}; + +namespace rocksdb { + +class EnvLibradosTest : public testing::Test { +public: + // we will use all of these below + const std::string db_name = "env_librados_test_db"; + const std::string db_pool = db_name + "_pool"; + const char *keyring = "admin"; + const char *config = "../ceph/src/ceph.conf"; + + EnvLibrados* env_; + const EnvOptions soptions_; + + EnvLibradosTest() + : env_(new EnvLibrados(db_name, config, db_pool)) { + } + ~EnvLibradosTest() { + delete env_; + librados::Rados rados; + int ret = 0; + do { + ret = rados.init("admin"); // just use the client.admin keyring + if (ret < 0) { // let's handle any error that might have come back + std::cerr << "couldn't initialize rados! error " << ret << std::endl; + ret = EXIT_FAILURE; + break; + } + + ret = rados.conf_read_file(config); + if (ret < 0) { + // This could fail if the config file is malformed, but it'd be hard. + std::cerr << "failed to parse config file " << config + << "! error" << ret << std::endl; + ret = EXIT_FAILURE; + break; + } + + /* + * next, we actually connect to the cluster + */ + + ret = rados.connect(); + if (ret < 0) { + std::cerr << "couldn't connect to cluster! error " << ret << std::endl; + ret = EXIT_FAILURE; + break; + } + + /* + * And now we're done, so let's remove our pool and then + * shut down the connection gracefully. + */ + int delete_ret = rados.pool_delete(db_pool.c_str()); + if (delete_ret < 0) { + // be careful not to + std::cerr << "We failed to delete our test pool!" << db_pool << delete_ret << std::endl; + ret = EXIT_FAILURE; + } + } while (0); + } +}; + +TEST_F(EnvLibradosTest, Basics) { + uint64_t file_size; + unique_ptr writable_file; + std::vector children; + + ASSERT_OK(env_->CreateDir("/dir")); + // Check that the directory is empty. + ASSERT_EQ(Status::NotFound(), env_->FileExists("/dir/non_existent")); + ASSERT_TRUE(!env_->GetFileSize("/dir/non_existent", &file_size).ok()); + ASSERT_OK(env_->GetChildren("/dir", &children)); + ASSERT_EQ(0U, children.size()); + + // Create a file. + ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_)); + writable_file.reset(); + + // Check that the file exists. + ASSERT_OK(env_->FileExists("/dir/f")); + ASSERT_OK(env_->GetFileSize("/dir/f", &file_size)); + ASSERT_EQ(0U, file_size); + ASSERT_OK(env_->GetChildren("/dir", &children)); + ASSERT_EQ(1U, children.size()); + ASSERT_EQ("f", children[0]); + + // Write to the file. + ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_)); + ASSERT_OK(writable_file->Append("abc")); + writable_file.reset(); + + + // Check for expected size. + ASSERT_OK(env_->GetFileSize("/dir/f", &file_size)); + ASSERT_EQ(3U, file_size); + + + // Check that renaming works. + ASSERT_TRUE(!env_->RenameFile("/dir/non_existent", "/dir/g").ok()); + ASSERT_OK(env_->RenameFile("/dir/f", "/dir/g")); + ASSERT_EQ(Status::NotFound(), env_->FileExists("/dir/f")); + ASSERT_OK(env_->FileExists("/dir/g")); + ASSERT_OK(env_->GetFileSize("/dir/g", &file_size)); + ASSERT_EQ(3U, file_size); + + // Check that opening non-existent file fails. + unique_ptr seq_file; + unique_ptr rand_file; + ASSERT_TRUE( + !env_->NewSequentialFile("/dir/non_existent", &seq_file, soptions_).ok()); + ASSERT_TRUE(!seq_file); + ASSERT_TRUE(!env_->NewRandomAccessFile("/dir/non_existent", &rand_file, + soptions_).ok()); + ASSERT_TRUE(!rand_file); + + // Check that deleting works. + ASSERT_TRUE(!env_->DeleteFile("/dir/non_existent").ok()); + ASSERT_OK(env_->DeleteFile("/dir/g")); + ASSERT_EQ(Status::NotFound(), env_->FileExists("/dir/g")); + ASSERT_OK(env_->GetChildren("/dir", &children)); + ASSERT_EQ(0U, children.size()); + ASSERT_OK(env_->DeleteDir("/dir")); +} + +TEST_F(EnvLibradosTest, ReadWrite) { + unique_ptr writable_file; + unique_ptr seq_file; + unique_ptr rand_file; + Slice result; + char scratch[100]; + + ASSERT_OK(env_->CreateDir("/dir")); + + ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_)); + ASSERT_OK(writable_file->Append("hello ")); + ASSERT_OK(writable_file->Append("world")); + writable_file.reset(); + + // Read sequentially. + ASSERT_OK(env_->NewSequentialFile("/dir/f", &seq_file, soptions_)); + ASSERT_OK(seq_file->Read(5, &result, scratch)); // Read "hello". + ASSERT_EQ(0, result.compare("hello")); + ASSERT_OK(seq_file->Skip(1)); + ASSERT_OK(seq_file->Read(1000, &result, scratch)); // Read "world". + ASSERT_EQ(0, result.compare("world")); + ASSERT_OK(seq_file->Read(1000, &result, scratch)); // Try reading past EOF. + ASSERT_EQ(0U, result.size()); + ASSERT_OK(seq_file->Skip(100)); // Try to skip past end of file. + ASSERT_OK(seq_file->Read(1000, &result, scratch)); + ASSERT_EQ(0U, result.size()); + + // Random reads. + ASSERT_OK(env_->NewRandomAccessFile("/dir/f", &rand_file, soptions_)); + ASSERT_OK(rand_file->Read(6, 5, &result, scratch)); // Read "world". + ASSERT_EQ(0, result.compare("world")); + ASSERT_OK(rand_file->Read(0, 5, &result, scratch)); // Read "hello". + ASSERT_EQ(0, result.compare("hello")); + ASSERT_OK(rand_file->Read(10, 100, &result, scratch)); // Read "d". + ASSERT_EQ(0, result.compare("d")); + + // Too high offset. + ASSERT_OK(rand_file->Read(1000, 5, &result, scratch)); +} + +TEST_F(EnvLibradosTest, Locks) { + FileLock* lock = nullptr; + unique_ptr writable_file; + + ASSERT_OK(env_->CreateDir("/dir")); + + ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file, soptions_)); + + // These are no-ops, but we test they return success. + ASSERT_OK(env_->LockFile("some file", &lock)); + ASSERT_OK(env_->UnlockFile(lock)); + + ASSERT_OK(env_->LockFile("/dir/f", &lock)); + ASSERT_OK(env_->UnlockFile(lock)); +} + +TEST_F(EnvLibradosTest, Misc) { + std::string test_dir; + ASSERT_OK(env_->GetTestDirectory(&test_dir)); + ASSERT_TRUE(!test_dir.empty()); + + unique_ptr writable_file; + ASSERT_TRUE(!env_->NewWritableFile("/a/b", &writable_file, soptions_).ok()); + + ASSERT_OK(env_->NewWritableFile("/a", &writable_file, soptions_)); + // These are no-ops, but we test they return success. + ASSERT_OK(writable_file->Sync()); + ASSERT_OK(writable_file->Flush()); + ASSERT_OK(writable_file->Close()); + writable_file.reset(); +} + +TEST_F(EnvLibradosTest, LargeWrite) { + const size_t kWriteSize = 300 * 1024; + char* scratch = new char[kWriteSize * 2]; + + std::string write_data; + for (size_t i = 0; i < kWriteSize; ++i) { + write_data.append(1, 'h'); + } + + unique_ptr writable_file; + ASSERT_OK(env_->CreateDir("/dir")); + ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_)); + ASSERT_OK(writable_file->Append("foo")); + ASSERT_OK(writable_file->Append(write_data)); + writable_file.reset(); + + unique_ptr seq_file; + Slice result; + ASSERT_OK(env_->NewSequentialFile("/dir/g", &seq_file, soptions_)); + ASSERT_OK(seq_file->Read(3, &result, scratch)); // Read "foo". + ASSERT_EQ(0, result.compare("foo")); + + size_t read = 0; + std::string read_data; + while (read < kWriteSize) { + ASSERT_OK(seq_file->Read(kWriteSize - read, &result, scratch)); + read_data.append(result.data(), result.size()); + read += result.size(); + } + ASSERT_TRUE(write_data == read_data); + delete[] scratch; +} + +TEST_F(EnvLibradosTest, FrequentlySmallWrite) { + const size_t kWriteSize = 1 << 10; + char* scratch = new char[kWriteSize * 2]; + + std::string write_data; + for (size_t i = 0; i < kWriteSize; ++i) { + write_data.append(1, 'h'); + } + + unique_ptr writable_file; + ASSERT_OK(env_->CreateDir("/dir")); + ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_)); + ASSERT_OK(writable_file->Append("foo")); + + for (size_t i = 0; i < kWriteSize; ++i) { + ASSERT_OK(writable_file->Append("h")); + } + writable_file.reset(); + + unique_ptr seq_file; + Slice result; + ASSERT_OK(env_->NewSequentialFile("/dir/g", &seq_file, soptions_)); + ASSERT_OK(seq_file->Read(3, &result, scratch)); // Read "foo". + ASSERT_EQ(0, result.compare("foo")); + + size_t read = 0; + std::string read_data; + while (read < kWriteSize) { + ASSERT_OK(seq_file->Read(kWriteSize - read, &result, scratch)); + read_data.append(result.data(), result.size()); + read += result.size(); + } + ASSERT_TRUE(write_data == read_data); + delete[] scratch; +} + +TEST_F(EnvLibradosTest, Truncate) { + const size_t kWriteSize = 300 * 1024; + const size_t truncSize = 1024; + std::string write_data; + for (size_t i = 0; i < kWriteSize; ++i) { + write_data.append(1, 'h'); + } + + unique_ptr writable_file; + ASSERT_OK(env_->CreateDir("/dir")); + ASSERT_OK(env_->NewWritableFile("/dir/g", &writable_file, soptions_)); + ASSERT_OK(writable_file->Append(write_data)); + ASSERT_EQ(writable_file->GetFileSize(), kWriteSize); + ASSERT_OK(writable_file->Truncate(truncSize)); + ASSERT_EQ(writable_file->GetFileSize(), truncSize); + writable_file.reset(); +} + +TEST_F(EnvLibradosTest, DBBasics) { + std::string kDBPath = "/tmp/DBBasics"; + DB* db; + Options options; + // Optimize RocksDB. This is the easiest way to get RocksDB to perform well + options.IncreaseParallelism(); + options.OptimizeLevelStyleCompaction(); + // create the DB if it's not already present + options.create_if_missing = true; + options.env = env_; + + // open DB + Status s = DB::Open(options, kDBPath, &db); + assert(s.ok()); + + // Put key-value + s = db->Put(WriteOptions(), "key1", "value"); + assert(s.ok()); + std::string value; + // get value + s = db->Get(ReadOptions(), "key1", &value); + assert(s.ok()); + assert(value == "value"); + + // atomically apply a set of updates + { + WriteBatch batch; + batch.Delete("key1"); + batch.Put("key2", value); + s = db->Write(WriteOptions(), &batch); + } + + s = db->Get(ReadOptions(), "key1", &value); + assert(s.IsNotFound()); + + db->Get(ReadOptions(), "key2", &value); + assert(value == "value"); + + delete db; +} + +TEST_F(EnvLibradosTest, DBLoadKeysInRandomOrder) { + char key[20] = {0}, value[20] = {0}; + int max_loop = 1 << 10; + Timer timer(false); + std::cout << "Test size : loop(" << max_loop << ")" << std::endl; + /********************************** + use default env + ***********************************/ + std::string kDBPath1 = "/tmp/DBLoadKeysInRandomOrder1"; + DB* db1; + Options options1; + // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well + options1.IncreaseParallelism(); + options1.OptimizeLevelStyleCompaction(); + // create the DB if it's not already present + options1.create_if_missing = true; + + // open DB + Status s1 = DB::Open(options1, kDBPath1, &db1); + assert(s1.ok()); + + rocksdb::Random64 r1(time(nullptr)); + + timer.Reset(); + for (int i = 0; i < max_loop; ++i) { + snprintf(key, + 20, + "%16lx", + (unsigned long)r1.Uniform(std::numeric_limits::max())); + snprintf(value, + 20, + "%16lx", + (unsigned long)r1.Uniform(std::numeric_limits::max())); + // Put key-value + s1 = db1->Put(WriteOptions(), key, value); + assert(s1.ok()); + } + std::cout << "Time by default : " << timer << "ms" << std::endl; + delete db1; + + /********************************** + use librados env + ***********************************/ + std::string kDBPath2 = "/tmp/DBLoadKeysInRandomOrder2"; + DB* db2; + Options options2; + // Optimize RocksDB. This is the easiest way to get RocksDB to perform well + options2.IncreaseParallelism(); + options2.OptimizeLevelStyleCompaction(); + // create the DB if it's not already present + options2.create_if_missing = true; + options2.env = env_; + + // open DB + Status s2 = DB::Open(options2, kDBPath2, &db2); + assert(s2.ok()); + + rocksdb::Random64 r2(time(nullptr)); + + timer.Reset(); + for (int i = 0; i < max_loop; ++i) { + snprintf(key, + 20, + "%16lx", + (unsigned long)r2.Uniform(std::numeric_limits::max())); + snprintf(value, + 20, + "%16lx", + (unsigned long)r2.Uniform(std::numeric_limits::max())); + // Put key-value + s2 = db2->Put(WriteOptions(), key, value); + assert(s2.ok()); + } + std::cout << "Time by librados : " << timer << "ms" << std::endl; + delete db2; +} + +TEST_F(EnvLibradosTest, DBBulkLoadKeysInRandomOrder) { + char key[20] = {0}, value[20] = {0}; + int max_loop = 1 << 6; + int bulk_size = 1 << 15; + Timer timer(false); + std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size << ")" << std::endl; + /********************************** + use default env + ***********************************/ + std::string kDBPath1 = "/tmp/DBBulkLoadKeysInRandomOrder1"; + DB* db1; + Options options1; + // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well + options1.IncreaseParallelism(); + options1.OptimizeLevelStyleCompaction(); + // create the DB if it's not already present + options1.create_if_missing = true; + + // open DB + Status s1 = DB::Open(options1, kDBPath1, &db1); + assert(s1.ok()); + + rocksdb::Random64 r1(time(nullptr)); + + timer.Reset(); + for (int i = 0; i < max_loop; ++i) { + WriteBatch batch; + for (int j = 0; j < bulk_size; ++j) { + snprintf(key, + 20, + "%16lx", + (unsigned long)r1.Uniform(std::numeric_limits::max())); + snprintf(value, + 20, + "%16lx", + (unsigned long)r1.Uniform(std::numeric_limits::max())); + batch.Put(key, value); + } + s1 = db1->Write(WriteOptions(), &batch); + assert(s1.ok()); + } + std::cout << "Time by default : " << timer << "ms" << std::endl; + delete db1; + + /********************************** + use librados env + ***********************************/ + std::string kDBPath2 = "/tmp/DBBulkLoadKeysInRandomOrder2"; + DB* db2; + Options options2; + // Optimize RocksDB. This is the easiest way to get RocksDB to perform well + options2.IncreaseParallelism(); + options2.OptimizeLevelStyleCompaction(); + // create the DB if it's not already present + options2.create_if_missing = true; + options2.env = env_; + + // open DB + Status s2 = DB::Open(options2, kDBPath2, &db2); + assert(s2.ok()); + + rocksdb::Random64 r2(time(nullptr)); + + timer.Reset(); + for (int i = 0; i < max_loop; ++i) { + WriteBatch batch; + for (int j = 0; j < bulk_size; ++j) { + snprintf(key, + 20, + "%16lx", + (unsigned long)r2.Uniform(std::numeric_limits::max())); + snprintf(value, + 20, + "%16lx", + (unsigned long)r2.Uniform(std::numeric_limits::max())); + batch.Put(key, value); + } + s2 = db2->Write(WriteOptions(), &batch); + assert(s2.ok()); + } + std::cout << "Time by librados : " << timer << "ms" << std::endl; + delete db2; +} + +TEST_F(EnvLibradosTest, DBBulkLoadKeysInSequentialOrder) { + char key[20] = {0}, value[20] = {0}; + int max_loop = 1 << 6; + int bulk_size = 1 << 15; + Timer timer(false); + std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size << ")" << std::endl; + /********************************** + use default env + ***********************************/ + std::string kDBPath1 = "/tmp/DBBulkLoadKeysInSequentialOrder1"; + DB* db1; + Options options1; + // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well + options1.IncreaseParallelism(); + options1.OptimizeLevelStyleCompaction(); + // create the DB if it's not already present + options1.create_if_missing = true; + + // open DB + Status s1 = DB::Open(options1, kDBPath1, &db1); + assert(s1.ok()); + + rocksdb::Random64 r1(time(nullptr)); + + timer.Reset(); + for (int i = 0; i < max_loop; ++i) { + WriteBatch batch; + for (int j = 0; j < bulk_size; ++j) { + snprintf(key, + 20, + "%019lld", + (long long)(i * bulk_size + j)); + snprintf(value, + 20, + "%16lx", + (unsigned long)r1.Uniform(std::numeric_limits::max())); + batch.Put(key, value); + } + s1 = db1->Write(WriteOptions(), &batch); + assert(s1.ok()); + } + std::cout << "Time by default : " << timer << "ms" << std::endl; + delete db1; + + /********************************** + use librados env + ***********************************/ + std::string kDBPath2 = "/tmp/DBBulkLoadKeysInSequentialOrder2"; + DB* db2; + Options options2; + // Optimize RocksDB. This is the easiest way to get RocksDB to perform well + options2.IncreaseParallelism(); + options2.OptimizeLevelStyleCompaction(); + // create the DB if it's not already present + options2.create_if_missing = true; + options2.env = env_; + + // open DB + Status s2 = DB::Open(options2, kDBPath2, &db2); + assert(s2.ok()); + + rocksdb::Random64 r2(time(nullptr)); + + timer.Reset(); + for (int i = 0; i < max_loop; ++i) { + WriteBatch batch; + for (int j = 0; j < bulk_size; ++j) { + snprintf(key, + 20, + "%16lx", + (unsigned long)r2.Uniform(std::numeric_limits::max())); + snprintf(value, + 20, + "%16lx", + (unsigned long)r2.Uniform(std::numeric_limits::max())); + batch.Put(key, value); + } + s2 = db2->Write(WriteOptions(), &batch); + assert(s2.ok()); + } + std::cout << "Time by librados : " << timer << "ms" << std::endl; + delete db2; +} + +TEST_F(EnvLibradosTest, DBRandomRead) { + char key[20] = {0}, value[20] = {0}; + int max_loop = 1 << 6; + int bulk_size = 1 << 10; + int read_loop = 1 << 20; + Timer timer(false); + std::cout << "Test size : keys_num(" << max_loop << ", " << bulk_size << "); read_loop(" << read_loop << ")" << std::endl; + /********************************** + use default env + ***********************************/ + std::string kDBPath1 = "/tmp/DBRandomRead1"; + DB* db1; + Options options1; + // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well + options1.IncreaseParallelism(); + options1.OptimizeLevelStyleCompaction(); + // create the DB if it's not already present + options1.create_if_missing = true; + + // open DB + Status s1 = DB::Open(options1, kDBPath1, &db1); + assert(s1.ok()); + + rocksdb::Random64 r1(time(nullptr)); + + + for (int i = 0; i < max_loop; ++i) { + WriteBatch batch; + for (int j = 0; j < bulk_size; ++j) { + snprintf(key, + 20, + "%019lld", + (long long)(i * bulk_size + j)); + snprintf(value, + 20, + "%16lx", + (unsigned long)r1.Uniform(std::numeric_limits::max())); + batch.Put(key, value); + } + s1 = db1->Write(WriteOptions(), &batch); + assert(s1.ok()); + } + timer.Reset(); + int base1 = 0, offset1 = 0; + for (int i = 0; i < read_loop; ++i) { + base1 = r1.Uniform(max_loop); + offset1 = r1.Uniform(bulk_size); + std::string value1; + snprintf(key, + 20, + "%019lld", + (long long)(base1 * bulk_size + offset1)); + s1 = db1->Get(ReadOptions(), key, &value1); + assert(s1.ok()); + } + std::cout << "Time by default : " << timer << "ms" << std::endl; + delete db1; + + /********************************** + use librados env + ***********************************/ + std::string kDBPath2 = "/tmp/DBRandomRead2"; + DB* db2; + Options options2; + // Optimize RocksDB. This is the easiest way to get RocksDB to perform well + options2.IncreaseParallelism(); + options2.OptimizeLevelStyleCompaction(); + // create the DB if it's not already present + options2.create_if_missing = true; + options2.env = env_; + + // open DB + Status s2 = DB::Open(options2, kDBPath2, &db2); + assert(s2.ok()); + + rocksdb::Random64 r2(time(nullptr)); + + for (int i = 0; i < max_loop; ++i) { + WriteBatch batch; + for (int j = 0; j < bulk_size; ++j) { + snprintf(key, + 20, + "%019lld", + (long long)(i * bulk_size + j)); + snprintf(value, + 20, + "%16lx", + (unsigned long)r2.Uniform(std::numeric_limits::max())); + batch.Put(key, value); + } + s2 = db2->Write(WriteOptions(), &batch); + assert(s2.ok()); + } + + timer.Reset(); + int base2 = 0, offset2 = 0; + for (int i = 0; i < read_loop; ++i) { + base2 = r2.Uniform(max_loop); + offset2 = r2.Uniform(bulk_size); + std::string value2; + snprintf(key, + 20, + "%019lld", + (long long)(base2 * bulk_size + offset2)); + s2 = db2->Get(ReadOptions(), key, &value2); + if (!s2.ok()) { + std::cout << s2.ToString() << std::endl; + } + assert(s2.ok()); + } + std::cout << "Time by librados : " << timer << "ms" << std::endl; + delete db2; +} + +class EnvLibradosMutipoolTest : public testing::Test { +public: + // we will use all of these below + const std::string client_name = "client.admin"; + const std::string cluster_name = "ceph"; + const uint64_t flags = 0; + const std::string db_name = "env_librados_test_db"; + const std::string db_pool = db_name + "_pool"; + const std::string wal_dir = "/wal"; + const std::string wal_pool = db_name + "_wal_pool"; + const size_t write_buffer_size = 1 << 20; + const char *keyring = "admin"; + const char *config = "../ceph/src/ceph.conf"; + + EnvLibrados* env_; + const EnvOptions soptions_; + + EnvLibradosMutipoolTest() { + env_ = new EnvLibrados(client_name, cluster_name, flags, db_name, config, db_pool, wal_dir, wal_pool, write_buffer_size); + } + ~EnvLibradosMutipoolTest() { + delete env_; + librados::Rados rados; + int ret = 0; + do { + ret = rados.init("admin"); // just use the client.admin keyring + if (ret < 0) { // let's handle any error that might have come back + std::cerr << "couldn't initialize rados! error " << ret << std::endl; + ret = EXIT_FAILURE; + break; + } + + ret = rados.conf_read_file(config); + if (ret < 0) { + // This could fail if the config file is malformed, but it'd be hard. + std::cerr << "failed to parse config file " << config + << "! error" << ret << std::endl; + ret = EXIT_FAILURE; + break; + } + + /* + * next, we actually connect to the cluster + */ + + ret = rados.connect(); + if (ret < 0) { + std::cerr << "couldn't connect to cluster! error " << ret << std::endl; + ret = EXIT_FAILURE; + break; + } + + /* + * And now we're done, so let's remove our pool and then + * shut down the connection gracefully. + */ + int delete_ret = rados.pool_delete(db_pool.c_str()); + if (delete_ret < 0) { + // be careful not to + std::cerr << "We failed to delete our test pool!" << db_pool << delete_ret << std::endl; + ret = EXIT_FAILURE; + } + delete_ret = rados.pool_delete(wal_pool.c_str()); + if (delete_ret < 0) { + // be careful not to + std::cerr << "We failed to delete our test pool!" << wal_pool << delete_ret << std::endl; + ret = EXIT_FAILURE; + } + } while (0); + } +}; + +TEST_F(EnvLibradosMutipoolTest, Basics) { + uint64_t file_size; + unique_ptr writable_file; + std::vector children; + std::vector v = {"/tmp/dir1", "/tmp/dir2", "/tmp/dir3", "/tmp/dir4", "dir"}; + + for (size_t i = 0; i < v.size(); ++i) { + std::string dir = v[i]; + std::string dir_non_existent = dir + "/non_existent"; + std::string dir_f = dir + "/f"; + std::string dir_g = dir + "/g"; + + ASSERT_OK(env_->CreateDir(dir.c_str())); + // Check that the directory is empty. + ASSERT_EQ(Status::NotFound(), env_->FileExists(dir_non_existent.c_str())); + ASSERT_TRUE(!env_->GetFileSize(dir_non_existent.c_str(), &file_size).ok()); + ASSERT_OK(env_->GetChildren(dir.c_str(), &children)); + ASSERT_EQ(0U, children.size()); + + // Create a file. + ASSERT_OK(env_->NewWritableFile(dir_f.c_str(), &writable_file, soptions_)); + writable_file.reset(); + + // Check that the file exists. + ASSERT_OK(env_->FileExists(dir_f.c_str())); + ASSERT_OK(env_->GetFileSize(dir_f.c_str(), &file_size)); + ASSERT_EQ(0U, file_size); + ASSERT_OK(env_->GetChildren(dir.c_str(), &children)); + ASSERT_EQ(1U, children.size()); + ASSERT_EQ("f", children[0]); + + // Write to the file. + ASSERT_OK(env_->NewWritableFile(dir_f.c_str(), &writable_file, soptions_)); + ASSERT_OK(writable_file->Append("abc")); + writable_file.reset(); + + + // Check for expected size. + ASSERT_OK(env_->GetFileSize(dir_f.c_str(), &file_size)); + ASSERT_EQ(3U, file_size); + + + // Check that renaming works. + ASSERT_TRUE(!env_->RenameFile(dir_non_existent.c_str(), dir_g.c_str()).ok()); + ASSERT_OK(env_->RenameFile(dir_f.c_str(), dir_g.c_str())); + ASSERT_EQ(Status::NotFound(), env_->FileExists(dir_f.c_str())); + ASSERT_OK(env_->FileExists(dir_g.c_str())); + ASSERT_OK(env_->GetFileSize(dir_g.c_str(), &file_size)); + ASSERT_EQ(3U, file_size); + + // Check that opening non-existent file fails. + unique_ptr seq_file; + unique_ptr rand_file; + ASSERT_TRUE( + !env_->NewSequentialFile(dir_non_existent.c_str(), &seq_file, soptions_).ok()); + ASSERT_TRUE(!seq_file); + ASSERT_TRUE(!env_->NewRandomAccessFile(dir_non_existent.c_str(), &rand_file, + soptions_).ok()); + ASSERT_TRUE(!rand_file); + + // Check that deleting works. + ASSERT_TRUE(!env_->DeleteFile(dir_non_existent.c_str()).ok()); + ASSERT_OK(env_->DeleteFile(dir_g.c_str())); + ASSERT_EQ(Status::NotFound(), env_->FileExists(dir_g.c_str())); + ASSERT_OK(env_->GetChildren(dir.c_str(), &children)); + ASSERT_EQ(0U, children.size()); + ASSERT_OK(env_->DeleteDir(dir.c_str())); + } +} + +TEST_F(EnvLibradosMutipoolTest, DBBasics) { + std::string kDBPath = "/tmp/DBBasics"; + std::string walPath = "/tmp/wal"; + DB* db; + Options options; + // Optimize RocksDB. This is the easiest way to get RocksDB to perform well + options.IncreaseParallelism(); + options.OptimizeLevelStyleCompaction(); + // create the DB if it's not already present + options.create_if_missing = true; + options.env = env_; + options.wal_dir = walPath; + + // open DB + Status s = DB::Open(options, kDBPath, &db); + assert(s.ok()); + + // Put key-value + s = db->Put(WriteOptions(), "key1", "value"); + assert(s.ok()); + std::string value; + // get value + s = db->Get(ReadOptions(), "key1", &value); + assert(s.ok()); + assert(value == "value"); + + // atomically apply a set of updates + { + WriteBatch batch; + batch.Delete("key1"); + batch.Put("key2", value); + s = db->Write(WriteOptions(), &batch); + } + + s = db->Get(ReadOptions(), "key1", &value); + assert(s.IsNotFound()); + + db->Get(ReadOptions(), "key2", &value); + assert(value == "value"); + + delete db; +} + +TEST_F(EnvLibradosMutipoolTest, DBBulkLoadKeysInRandomOrder) { + char key[20] = {0}, value[20] = {0}; + int max_loop = 1 << 6; + int bulk_size = 1 << 15; + Timer timer(false); + std::cout << "Test size : loop(" << max_loop << "); bulk_size(" << bulk_size << ")" << std::endl; + /********************************** + use default env + ***********************************/ + std::string kDBPath1 = "/tmp/DBBulkLoadKeysInRandomOrder1"; + std::string walPath = "/tmp/wal"; + DB* db1; + Options options1; + // Optimize Rocksdb. This is the easiest way to get RocksDB to perform well + options1.IncreaseParallelism(); + options1.OptimizeLevelStyleCompaction(); + // create the DB if it's not already present + options1.create_if_missing = true; + + // open DB + Status s1 = DB::Open(options1, kDBPath1, &db1); + assert(s1.ok()); + + rocksdb::Random64 r1(time(nullptr)); + + timer.Reset(); + for (int i = 0; i < max_loop; ++i) { + WriteBatch batch; + for (int j = 0; j < bulk_size; ++j) { + snprintf(key, + 20, + "%16lx", + (unsigned long)r1.Uniform(std::numeric_limits::max())); + snprintf(value, + 20, + "%16lx", + (unsigned long)r1.Uniform(std::numeric_limits::max())); + batch.Put(key, value); + } + s1 = db1->Write(WriteOptions(), &batch); + assert(s1.ok()); + } + std::cout << "Time by default : " << timer << "ms" << std::endl; + delete db1; + + /********************************** + use librados env + ***********************************/ + std::string kDBPath2 = "/tmp/DBBulkLoadKeysInRandomOrder2"; + DB* db2; + Options options2; + // Optimize RocksDB. This is the easiest way to get RocksDB to perform well + options2.IncreaseParallelism(); + options2.OptimizeLevelStyleCompaction(); + // create the DB if it's not already present + options2.create_if_missing = true; + options2.env = env_; + options2.wal_dir = walPath; + + // open DB + Status s2 = DB::Open(options2, kDBPath2, &db2); + if (!s2.ok()) { + std::cerr << s2.ToString() << std::endl; + } + assert(s2.ok()); + + rocksdb::Random64 r2(time(nullptr)); + + timer.Reset(); + for (int i = 0; i < max_loop; ++i) { + WriteBatch batch; + for (int j = 0; j < bulk_size; ++j) { + snprintf(key, + 20, + "%16lx", + (unsigned long)r2.Uniform(std::numeric_limits::max())); + snprintf(value, + 20, + "%16lx", + (unsigned long)r2.Uniform(std::numeric_limits::max())); + batch.Put(key, value); + } + s2 = db2->Write(WriteOptions(), &batch); + assert(s2.ok()); + } + std::cout << "Time by librados : " << timer << "ms" << std::endl; + delete db2; +} + +TEST_F(EnvLibradosMutipoolTest, DBTransactionDB) { + std::string kDBPath = "/tmp/DBTransactionDB"; + // open DB + Options options; + TransactionDBOptions txn_db_options; + options.create_if_missing = true; + options.env = env_; + TransactionDB* txn_db; + + Status s = TransactionDB::Open(options, txn_db_options, kDBPath, &txn_db); + assert(s.ok()); + + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + std::string value; + + //////////////////////////////////////////////////////// + // + // Simple OptimisticTransaction Example ("Read Committed") + // + //////////////////////////////////////////////////////// + + // Start a transaction + Transaction* txn = txn_db->BeginTransaction(write_options); + assert(txn); + + // Read a key in this transaction + s = txn->Get(read_options, "abc", &value); + assert(s.IsNotFound()); + + // Write a key in this transaction + s = txn->Put("abc", "def"); + assert(s.ok()); + + // Read a key OUTSIDE this transaction. Does not affect txn. + s = txn_db->Get(read_options, "abc", &value); + + // Write a key OUTSIDE of this transaction. + // Does not affect txn since this is an unrelated key. If we wrote key 'abc' + // here, the transaction would fail to commit. + s = txn_db->Put(write_options, "xyz", "zzz"); + + // Commit transaction + s = txn->Commit(); + assert(s.ok()); + delete txn; + + //////////////////////////////////////////////////////// + // + // "Repeatable Read" (Snapshot Isolation) Example + // -- Using a single Snapshot + // + //////////////////////////////////////////////////////// + + // Set a snapshot at start of transaction by setting set_snapshot=true + txn_options.set_snapshot = true; + txn = txn_db->BeginTransaction(write_options, txn_options); + + const Snapshot* snapshot = txn->GetSnapshot(); + + // Write a key OUTSIDE of transaction + s = txn_db->Put(write_options, "abc", "xyz"); + assert(s.ok()); + + // Attempt to read a key using the snapshot. This will fail since + // the previous write outside this txn conflicts with this read. + read_options.snapshot = snapshot; + s = txn->GetForUpdate(read_options, "abc", &value); + assert(s.IsBusy()); + + txn->Rollback(); + + delete txn; + // Clear snapshot from read options since it is no longer valid + read_options.snapshot = nullptr; + snapshot = nullptr; + + //////////////////////////////////////////////////////// + // + // "Read Committed" (Monotonic Atomic Views) Example + // --Using multiple Snapshots + // + //////////////////////////////////////////////////////// + + // In this example, we set the snapshot multiple times. This is probably + // only necessary if you have very strict isolation requirements to + // implement. + + // Set a snapshot at start of transaction + txn_options.set_snapshot = true; + txn = txn_db->BeginTransaction(write_options, txn_options); + + // Do some reads and writes to key "x" + read_options.snapshot = txn_db->GetSnapshot(); + s = txn->Get(read_options, "x", &value); + txn->Put("x", "x"); + + // Do a write outside of the transaction to key "y" + s = txn_db->Put(write_options, "y", "y"); + + // Set a new snapshot in the transaction + txn->SetSnapshot(); + txn->SetSavePoint(); + read_options.snapshot = txn_db->GetSnapshot(); + + // Do some reads and writes to key "y" + // Since the snapshot was advanced, the write done outside of the + // transaction does not conflict. + s = txn->GetForUpdate(read_options, "y", &value); + txn->Put("y", "y"); + + // Decide we want to revert the last write from this transaction. + txn->RollbackToSavePoint(); + + // Commit. + s = txn->Commit(); + assert(s.ok()); + delete txn; + // Clear snapshot from read options since it is no longer valid + read_options.snapshot = nullptr; + + // Cleanup + delete txn_db; + DestroyDB(kDBPath, options); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int argc, char** argv) { + fprintf(stderr, "SKIPPED as EnvMirror is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !ROCKSDB_LITE