diff --git a/build_tools/build_detect_platform b/build_tools/build_detect_platform index e17c9a0fd8..057f77ec53 100755 --- a/build_tools/build_detect_platform +++ b/build_tools/build_detect_platform @@ -518,8 +518,8 @@ if test "$USE_HDFS"; then echo "JAVA_HOME has to be set for HDFS usage." exit 1 fi - HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS" - HDFS_LDFLAGS="$HDFS_LDFLAGS -lhdfs -L$JAVA_HOME/jre/lib/amd64" + HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS -I$HADOOP_HOME/include" + HDFS_LDFLAGS="$HDFS_LDFLAGS -lhdfs -L$JAVA_HOME/jre/lib/amd64 -L$HADOOP_HOME/lib/native" HDFS_LDFLAGS="$HDFS_LDFLAGS -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib" HDFS_LDFLAGS="$HDFS_LDFLAGS -ldl -lverify -ljava -ljvm" COMMON_FLAGS="$COMMON_FLAGS $HDFS_CCFLAGS" diff --git a/env/env_hdfs.cc b/env/env_hdfs.cc index 7c0e14fe23..5acf9301c6 100644 --- a/env/env_hdfs.cc +++ b/env/env_hdfs.cc @@ -11,13 +11,14 @@ #ifndef ROCKSDB_HDFS_FILE_C #define ROCKSDB_HDFS_FILE_C -#include #include #include #include +#include #include #include #include "rocksdb/status.h" +#include "util/logging.h" #include "util/string_util.h" #define HDFS_EXISTS 0 @@ -224,7 +225,7 @@ class HdfsWritableFile: public WritableFile { filename_.c_str()); const char* src = data.data(); size_t left = data.size(); - size_t ret = hdfsWrite(fileSys_, hfile_, src, left); + size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast(left)); ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n", filename_.c_str()); if (ret != left) { @@ -254,7 +255,8 @@ class HdfsWritableFile: public WritableFile { // This is used by HdfsLogger to write data to the debug log file virtual Status Append(const char* src, size_t size) { - if (hdfsWrite(fileSys_, hfile_, src, size) != (tSize)size) { + if (hdfsWrite(fileSys_, hfile_, src, static_cast(size)) != + static_cast(size)) { return IOError(filename_, errno); } return Status::OK(); @@ -282,11 +284,10 @@ class HdfsLogger : public Logger { Status HdfsCloseHelper() { ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n", file_->getName().c_str()); - Status s = file_->Close(); if (mylog != nullptr && mylog == this) { mylog = nullptr; } - return s; + return Status::OK(); } protected: @@ -299,14 +300,15 @@ class HdfsLogger : public Logger { file_->getName().c_str()); } - virtual ~HdfsLogger() { + ~HdfsLogger() override { if (!closed_) { closed_ = true; HdfsCloseHelper(); } } - virtual void Logv(const char* format, va_list ap) { + using Logger::Logv; + void Logv(const char* format, va_list ap) override { const uint64_t thread_id = (*gettid_)(); // We try twice: the first time with a fixed-size stack allocated buffer, @@ -384,7 +386,7 @@ const std::string HdfsEnv::pathsep = "/"; // open a file for sequential reading Status HdfsEnv::NewSequentialFile(const std::string& fname, std::unique_ptr* result, - const EnvOptions& options) { + const EnvOptions& /*options*/) { result->reset(); HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname); if (f == nullptr || !f->isValid()) { @@ -399,7 +401,7 @@ Status HdfsEnv::NewSequentialFile(const std::string& fname, // open a file for random reading Status HdfsEnv::NewRandomAccessFile(const std::string& fname, std::unique_ptr* result, - const EnvOptions& options) { + const EnvOptions& /*options*/) { result->reset(); HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname); if (f == nullptr || !f->isValid()) { @@ -414,7 +416,7 @@ Status HdfsEnv::NewRandomAccessFile(const std::string& fname, // create a new file for writing Status HdfsEnv::NewWritableFile(const std::string& fname, std::unique_ptr* result, - const EnvOptions& options) { + const EnvOptions& /*options*/) { result->reset(); Status s; HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname); @@ -432,7 +434,9 @@ class HdfsDirectory : public Directory { explicit HdfsDirectory(int fd) : fd_(fd) {} ~HdfsDirectory() {} - virtual Status Fsync() { return Status::OK(); } + Status Fsync() override { return Status::OK(); } + + int GetFd() const { return fd_; } private: int fd_; @@ -477,10 +481,10 @@ Status HdfsEnv::GetChildren(const std::string& path, pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries); if (numEntries >= 0) { for(int i = 0; i < numEntries; i++) { - char* pathname = pHdfsFileInfo[i].mName; - char* filename = std::rindex(pathname, '/'); - if (filename != nullptr) { - result->push_back(filename+1); + std::string pathname(pHdfsFileInfo[i].mName); + size_t pos = pathname.rfind("/"); + if (std::string::npos != pos) { + result->push_back(pathname.substr(pos + 1)); } } if (pHdfsFileInfo != nullptr) { @@ -571,16 +575,14 @@ Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) { return IOError(src, errno); } -Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) { +Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) { // there isn's a very good way to atomically check and create // a file via libhdfs *lock = nullptr; return Status::OK(); } -Status HdfsEnv::UnlockFile(FileLock* lock) { - return Status::OK(); -} +Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); } Status HdfsEnv::NewLogger(const std::string& fname, std::shared_ptr* result) { diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index a77c42e0af..903e32ef92 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -54,110 +54,109 @@ class HdfsEnv : public Env { hdfsDisconnect(fileSys_); } - virtual Status NewSequentialFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options); + Status NewSequentialFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; - virtual Status NewRandomAccessFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options); + Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; - virtual Status NewWritableFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options); + Status NewWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; - virtual Status NewDirectory(const std::string& name, - std::unique_ptr* result); + Status NewDirectory(const std::string& name, + std::unique_ptr* result) override; - virtual Status FileExists(const std::string& fname); + Status FileExists(const std::string& fname) override; - virtual Status GetChildren(const std::string& path, - std::vector* result); + Status GetChildren(const std::string& path, + std::vector* result) override; - virtual Status DeleteFile(const std::string& fname); + Status DeleteFile(const std::string& fname) override; - virtual Status CreateDir(const std::string& name); + Status CreateDir(const std::string& name) override; - virtual Status CreateDirIfMissing(const std::string& name); + Status CreateDirIfMissing(const std::string& name) override; - virtual Status DeleteDir(const std::string& name); + Status DeleteDir(const std::string& name) override; - virtual Status GetFileSize(const std::string& fname, uint64_t* size); + Status GetFileSize(const std::string& fname, uint64_t* size) override; - virtual Status GetFileModificationTime(const std::string& fname, - uint64_t* file_mtime); + Status GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) override; - virtual Status RenameFile(const std::string& src, const std::string& target); + Status RenameFile(const std::string& src, const std::string& target) override; - virtual Status LinkFile(const std::string& src, const std::string& target) { + Status LinkFile(const std::string& /*src*/, + const std::string& /*target*/) override { return Status::NotSupported(); // not supported } - virtual Status LockFile(const std::string& fname, FileLock** lock); + Status LockFile(const std::string& fname, FileLock** lock) override; - virtual Status UnlockFile(FileLock* lock); + Status UnlockFile(FileLock* lock) override; - virtual Status NewLogger(const std::string& fname, - std::shared_ptr* result); + Status NewLogger(const std::string& fname, + std::shared_ptr* result) override; - virtual void Schedule(void (*function)(void* arg), void* arg, - Priority pri = LOW, void* tag = nullptr, void (*unschedFunction)(void* arg) = 0) { + void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW, + void* tag = nullptr, + void (*unschedFunction)(void* arg) = 0) override { posixEnv->Schedule(function, arg, pri, tag, unschedFunction); } - virtual int UnSchedule(void* tag, Priority pri) { + int UnSchedule(void* tag, Priority pri) override { return posixEnv->UnSchedule(tag, pri); } - virtual void StartThread(void (*function)(void* arg), void* arg) { + void StartThread(void (*function)(void* arg), void* arg) override { posixEnv->StartThread(function, arg); } - virtual void WaitForJoin() { posixEnv->WaitForJoin(); } + void WaitForJoin() override { posixEnv->WaitForJoin(); } - virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const - override { + unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override { return posixEnv->GetThreadPoolQueueLen(pri); } - virtual Status GetTestDirectory(std::string* path) { + Status GetTestDirectory(std::string* path) override { return posixEnv->GetTestDirectory(path); } - virtual uint64_t NowMicros() { - return posixEnv->NowMicros(); - } + uint64_t NowMicros() override { return posixEnv->NowMicros(); } - virtual void SleepForMicroseconds(int micros) { + void SleepForMicroseconds(int micros) override { posixEnv->SleepForMicroseconds(micros); } - virtual Status GetHostName(char* name, uint64_t len) { + Status GetHostName(char* name, uint64_t len) override { return posixEnv->GetHostName(name, len); } - virtual Status GetCurrentTime(int64_t* unix_time) { + Status GetCurrentTime(int64_t* unix_time) override { return posixEnv->GetCurrentTime(unix_time); } - virtual Status GetAbsolutePath(const std::string& db_path, - std::string* output_path) { + Status GetAbsolutePath(const std::string& db_path, + std::string* output_path) override { return posixEnv->GetAbsolutePath(db_path, output_path); } - virtual void SetBackgroundThreads(int number, Priority pri = LOW) { + void SetBackgroundThreads(int number, Priority pri = LOW) override { posixEnv->SetBackgroundThreads(number, pri); } - virtual int GetBackgroundThreads(Priority pri = LOW) { + int GetBackgroundThreads(Priority pri = LOW) override { return posixEnv->GetBackgroundThreads(pri); } - virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override { + void IncBackgroundThreadsIfNeeded(int number, Priority pri) override { posixEnv->IncBackgroundThreadsIfNeeded(number, pri); } - virtual std::string TimeToString(uint64_t number) { + std::string TimeToString(uint64_t number) override { return posixEnv->TimeToString(number); } @@ -166,9 +165,7 @@ class HdfsEnv : public Env { return (uint64_t)pthread_self(); } - virtual uint64_t GetThreadID() const override { - return HdfsEnv::gettid(); - } + uint64_t GetThreadID() const override { return HdfsEnv::gettid(); } private: std::string fsname_; // string of the form "hdfs://hostname:port/" @@ -206,7 +203,7 @@ class HdfsEnv : public Env { std::string host(parts[0]); std::string remaining(parts[1]); - int rem = remaining.find(pathsep); + int rem = static_cast(remaining.find(pathsep)); std::string portStr = (rem == 0 ? remaining : remaining.substr(0, rem)); diff --git a/hdfs/setup.sh b/hdfs/setup.sh old mode 100644 new mode 100755 index f071b7e314..ba76ec2090 --- a/hdfs/setup.sh +++ b/hdfs/setup.sh @@ -1,8 +1,8 @@ # shellcheck disable=SC2148 export USE_HDFS=1 -export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:/usr/lib/hadoop/lib/native +export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:$HADOOP_HOME/lib/native -export CLASSPATH= +export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob` for f in `find /usr/lib/hadoop-hdfs | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done for f in `find /usr/lib/hadoop | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done for f in `find /usr/lib/hadoop/client | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 1d079a7c7f..7f8c4b53f7 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -1403,7 +1403,14 @@ class StressTest { FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]); } } - DestroyDB(FLAGS_db, Options()); + Options options; + options.env = FLAGS_env; + Status s = DestroyDB(FLAGS_db, options); + if (!s.ok()) { + fprintf(stderr, "Cannot destroy original db: %s\n", + s.ToString().c_str()); + exit(1); + } } }