Fix ingested file and direcotry not being sync (#5435)

Summary:
It it not safe to assume application had sync the SST file before ingest it into DB. Also the directory to put the ingested file needs to be fsync, otherwise the file can be lost. For integrity of RocksDB we need to sync the ingested file and directory before apply the change to manifest.

Also syncing after writing global sequence when write_global_seqno=true was removed in https://github.com/facebook/rocksdb/issues/4172. Adding it back.

Fixes https://github.com/facebook/rocksdb/issues/5287.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5435

Test Plan:
Test ingest file with ldb command and observe fsync/fdatasync in strace output. Tried both move_files=true and move_files=false.
https://gist.github.com/yiwu-arbug/650a4023f57979056d83485fa863bef9

More test suggestions are welcome.

Differential Revision: D15941675

Pulled By: riversand963

fbshipit-source-id: 389533f3923065a96df2cdde23ff4724a1810d78
This commit is contained in:
Yi Wu 2019-06-21 10:12:29 -07:00 committed by Facebook Github Bot
parent 1bfeffab2d
commit 2730fe693e
9 changed files with 270 additions and 44 deletions

View File

@ -29,6 +29,7 @@
* Fix a bug in WAL replay of secondary instance by skipping write batches with older sequence numbers than the current last sequence number.
* Fix flush's/compaction's merge processing logic which allowed `Put`s covered by range tombstones to reappear. Note `Put`s may exist even if the user only ever called `Merge()` due to an internal conversion during compaction to the bottommost level.
* Fix/improve memtable earliest sequence assignment and WAL replay so that WAL entries of unflushed column families will not be skipped after replaying the MANIFEST and increasing db sequence due to another flushed/compacted column family.
* Fix ingested file and directory not being fsync.
* Return TryAgain status in place of Corruption when new tail is not visible to TransactionLogIterator.
* Fix a bug caused by secondary not skipping the beginning of new MANIFEST.

View File

@ -861,16 +861,6 @@ Directory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
return ret_dir;
}
Directory* DBImpl::Directories::GetDataDir(size_t path_id) const {
assert(path_id < data_dirs_.size());
Directory* ret_dir = data_dirs_[path_id].get();
if (ret_dir == nullptr) {
// Should use db_dir_
return db_dir_.get();
}
return ret_dir;
}
Status DBImpl::SetOptions(
ColumnFamilyHandle* column_family,
const std::unordered_map<std::string, std::string>& options_map) {
@ -3644,7 +3634,7 @@ Status DBImpl::IngestExternalFiles(
auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
ingestion_jobs.emplace_back(env_, versions_.get(), cfd,
immutable_db_options_, env_options_,
&snapshots_, arg.options);
&snapshots_, arg.options, &directories_);
}
std::vector<std::pair<bool, Status>> exec_results;
for (size_t i = 0; i != num_cfs; ++i) {

View File

@ -77,6 +77,38 @@ struct JobContext;
struct ExternalSstFileInfo;
struct MemTableInfo;
// Class to maintain directories for all database paths other than main one.
class Directories {
public:
Status SetDirectories(Env* env, const std::string& dbname,
const std::string& wal_dir,
const std::vector<DbPath>& data_paths);
Directory* GetDataDir(size_t path_id) const {
assert(path_id < data_dirs_.size());
Directory* ret_dir = data_dirs_[path_id].get();
if (ret_dir == nullptr) {
// Should use db_dir_
return db_dir_.get();
}
return ret_dir;
}
Directory* GetWalDir() {
if (wal_dir_) {
return wal_dir_.get();
}
return db_dir_.get();
}
Directory* GetDbDir() { return db_dir_.get(); }
private:
std::unique_ptr<Directory> db_dir_;
std::vector<std::unique_ptr<Directory>> data_dirs_;
std::unique_ptr<Directory> wal_dir_;
};
// While DB is the public interface of RocksDB, and DBImpl is the actual
// class implementing it. It's the entrance of the core RocksdB engine.
// All other DB implementations, e.g. TransactionDB, BlobDB, etc, wrap a
@ -1047,30 +1079,6 @@ class DBImpl : public DB {
}
};
// Class to maintain directories for all database paths other than main one.
class Directories {
public:
Status SetDirectories(Env* env, const std::string& dbname,
const std::string& wal_dir,
const std::vector<DbPath>& data_paths);
Directory* GetDataDir(size_t path_id) const;
Directory* GetWalDir() {
if (wal_dir_) {
return wal_dir_.get();
}
return db_dir_.get();
}
Directory* GetDbDir() { return db_dir_.get(); }
private:
std::unique_ptr<Directory> db_dir_;
std::vector<std::unique_ptr<Directory>> data_dirs_;
std::unique_ptr<Directory> wal_dir_;
};
struct LogFileNumberSize {
explicit LogFileNumberSize(uint64_t _number) : number(_number) {}
void AddSize(uint64_t new_size) { size += new_size; }

View File

@ -265,9 +265,9 @@ Status DBImpl::CreateAndNewDirectory(Env* env, const std::string& dirname,
return env->NewDirectory(dirname, directory);
}
Status DBImpl::Directories::SetDirectories(
Env* env, const std::string& dbname, const std::string& wal_dir,
const std::vector<DbPath>& data_paths) {
Status Directories::SetDirectories(Env* env, const std::string& dbname,
const std::string& wal_dir,
const std::vector<DbPath>& data_paths) {
Status s = DBImpl::CreateAndNewDirectory(env, dbname, &db_dir_);
if (!s.ok()) {
return s;

View File

@ -9,6 +9,7 @@
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/sst_file_writer.h"
#include "test_util/fault_injection_test_env.h"
#include "test_util/testutil.h"
namespace rocksdb {
@ -20,6 +21,7 @@ class ExternalSSTFileBasicTest
public:
ExternalSSTFileBasicTest() : DBTestBase("/external_sst_file_basic_test") {
sst_files_dir_ = dbname_ + "/sst_files/";
fault_injection_test_env_.reset(new FaultInjectionTestEnv(Env::Default()));
DestroyAndRecreateExternalSSTFilesDir();
}
@ -140,6 +142,7 @@ class ExternalSSTFileBasicTest
protected:
std::string sst_files_dir_;
std::unique_ptr<FaultInjectionTestEnv> fault_injection_test_env_;
};
TEST_F(ExternalSSTFileBasicTest, Basic) {
@ -689,6 +692,59 @@ TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(ExternalSSTFileBasicTest, SyncFailure) {
Options options;
options.create_if_missing = true;
options.env = fault_injection_test_env_.get();
std::vector<std::pair<std::string, std::string>> test_cases = {
{"ExternalSstFileIngestionJob::BeforeSyncIngestedFile",
"ExternalSstFileIngestionJob::AfterSyncIngestedFile"},
{"ExternalSstFileIngestionJob::BeforeSyncDir",
"ExternalSstFileIngestionJob::AfterSyncDir"},
{"ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno",
"ExternalSstFileIngestionJob::AfterSyncGlobalSeqno"}};
for (size_t i = 0; i < test_cases.size(); i++) {
SyncPoint::GetInstance()->SetCallBack(test_cases[i].first, [&](void*) {
fault_injection_test_env_->SetFilesystemActive(false);
});
SyncPoint::GetInstance()->SetCallBack(test_cases[i].second, [&](void*) {
fault_injection_test_env_->SetFilesystemActive(true);
});
SyncPoint::GetInstance()->EnableProcessing();
DestroyAndReopen(options);
if (i == 2) {
ASSERT_OK(Put("foo", "v1"));
}
Options sst_file_writer_options;
std::unique_ptr<SstFileWriter> sst_file_writer(
new SstFileWriter(EnvOptions(), sst_file_writer_options));
std::string file_name =
sst_files_dir_ + "sync_failure_test_" + ToString(i) + ".sst";
ASSERT_OK(sst_file_writer->Open(file_name));
ASSERT_OK(sst_file_writer->Put("bar", "v2"));
ASSERT_OK(sst_file_writer->Finish());
IngestExternalFileOptions ingest_opt;
if (i == 0) {
ingest_opt.move_files = true;
}
const Snapshot* snapshot = db_->GetSnapshot();
if (i == 2) {
ingest_opt.write_global_seqno = true;
}
ASSERT_FALSE(db_->IngestExternalFile({file_name}, ingest_opt).ok());
db_->ReleaseSnapshot(snapshot);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Destroy(options);
}
}
TEST_P(ExternalSSTFileBasicTest, IngestionWithRangeDeletions) {
int kNumLevels = 7;
Options options = CurrentOptions();

View File

@ -7,11 +7,13 @@
#include "db/external_sst_file_ingestion_job.h"
#include <cinttypes>
#include <algorithm>
#include <cinttypes>
#include <string>
#include <unordered_set>
#include <vector>
#include "db/db_impl/db_impl.h"
#include "db/version_edit.h"
#include "file/file_util.h"
#include "table/merging_iterator.h"
@ -86,6 +88,7 @@ Status ExternalSstFileIngestionJob::Prepare(
}
// Copy/Move external files into DB
std::unordered_set<size_t> ingestion_path_ids;
for (IngestedFileInfo& f : files_to_ingest_) {
f.fd = FileDescriptor(next_file_number++, 0, f.file_size);
f.copy_file = false;
@ -95,8 +98,26 @@ Status ExternalSstFileIngestionJob::Prepare(
f.fd.GetPathId());
if (ingestion_options_.move_files) {
status = env_->LinkFile(path_outside_db, path_inside_db);
if (status.IsNotSupported() &&
ingestion_options_.failed_move_fall_back_to_copy) {
if (status.ok()) {
// It is unsafe to assume application had sync the file and file
// directory before ingest the file. For integrity of RocksDB we need
// to sync the file.
std::unique_ptr<WritableFile> file_to_sync;
status = env_->ReopenWritableFile(path_inside_db, &file_to_sync,
env_options_);
if (status.ok()) {
TEST_SYNC_POINT(
"ExternalSstFileIngestionJob::BeforeSyncIngestedFile");
status = SyncIngestedFile(file_to_sync.get());
TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncIngestedFile");
if (!status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to sync ingested file %s: %s",
path_inside_db.c_str(), status.ToString().c_str());
}
}
} else if (status.IsNotSupported() &&
ingestion_options_.failed_move_fall_back_to_copy) {
// Original file is on a different FS, use copy instead of hard linking.
f.copy_file = true;
}
@ -107,6 +128,7 @@ Status ExternalSstFileIngestionJob::Prepare(
if (f.copy_file) {
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",
nullptr);
// CopyFile also sync the new file.
status = CopyFile(env_, path_outside_db, path_inside_db, 0,
db_options_.use_fsync);
}
@ -115,8 +137,25 @@ Status ExternalSstFileIngestionJob::Prepare(
break;
}
f.internal_file_path = path_inside_db;
ingestion_path_ids.insert(f.fd.GetPathId());
}
TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir");
if (status.ok()) {
for (auto path_id : ingestion_path_ids) {
status = directories_->GetDataDir(path_id)->Fsync();
if (!status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to sync directory %" ROCKSDB_PRIszt
" while ingest file: %s",
path_id, status.ToString().c_str());
break;
}
}
}
TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncDir");
// TODO: The following is duplicated with Cleanup().
if (!status.ok()) {
// We failed, remove all files that we copied into the db
for (IngestedFileInfo& f : files_to_ingest_) {
@ -559,6 +598,18 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
std::string seqno_val;
PutFixed64(&seqno_val, seqno);
status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val);
if (status.ok()) {
TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno");
status = SyncIngestedFile(rwfile.get());
TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncGlobalSeqno");
if (!status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to sync ingested file %s after writing global "
"sequence number: %s",
file_to_ingest->internal_file_path.c_str(),
status.ToString().c_str());
}
}
if (!status.ok()) {
return status;
}
@ -599,6 +650,16 @@ bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
return true;
}
template <typename TWritableFile>
Status ExternalSstFileIngestionJob::SyncIngestedFile(TWritableFile* file) {
assert(file != nullptr);
if (db_options_.use_fsync) {
return file->Fsync();
} else {
return file->Sync();
}
}
} // namespace rocksdb
#endif // !ROCKSDB_LITE

View File

@ -20,6 +20,8 @@
namespace rocksdb {
class Directories;
struct IngestedFileInfo {
// External file path
std::string external_file_path;
@ -77,7 +79,8 @@ class ExternalSstFileIngestionJob {
Env* env, VersionSet* versions, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options, const EnvOptions& env_options,
SnapshotList* db_snapshots,
const IngestExternalFileOptions& ingestion_options)
const IngestExternalFileOptions& ingestion_options,
Directories* directories)
: env_(env),
versions_(versions),
cfd_(cfd),
@ -85,8 +88,11 @@ class ExternalSstFileIngestionJob {
env_options_(env_options),
db_snapshots_(db_snapshots),
ingestion_options_(ingestion_options),
directories_(directories),
job_start_time_(env_->NowMicros()),
consumed_seqno_(false) {}
consumed_seqno_(false) {
assert(directories != nullptr);
}
// Prepare the job by copying external files into the DB.
Status Prepare(const std::vector<std::string>& external_files_paths,
@ -153,6 +159,10 @@ class ExternalSstFileIngestionJob {
bool IngestedFileFitInLevel(const IngestedFileInfo* file_to_ingest,
int level);
// Helper method to sync given file.
template <typename TWritableFile>
Status SyncIngestedFile(TWritableFile* file);
Env* env_;
VersionSet* versions_;
ColumnFamilyData* cfd_;
@ -161,6 +171,7 @@ class ExternalSstFileIngestionJob {
SnapshotList* db_snapshots_;
autovector<IngestedFileInfo> files_to_ingest_;
const IngestExternalFileOptions& ingestion_options_;
Directories* directories_;
VersionEdit edit_;
uint64_t job_start_time_;
bool consumed_seqno_;

View File

@ -98,6 +98,9 @@ Status FileState::DropRandomUnsyncedData(Env* env, Random* rand) const {
}
Status TestDirectory::Fsync() {
if (!env_->IsFilesystemActive()) {
return env_->GetError();
}
env_->SyncDir(dirname_);
return dir_->Fsync();
}
@ -158,6 +161,53 @@ Status TestWritableFile::Sync() {
return Status::OK();
}
TestRandomRWFile::TestRandomRWFile(const std::string& /*fname*/,
std::unique_ptr<RandomRWFile>&& f,
FaultInjectionTestEnv* env)
: target_(std::move(f)), file_opened_(true), env_(env) {
assert(target_ != nullptr);
}
TestRandomRWFile::~TestRandomRWFile() {
if (file_opened_) {
Close();
}
}
Status TestRandomRWFile::Write(uint64_t offset, const Slice& data) {
if (!env_->IsFilesystemActive()) {
return env_->GetError();
}
return target_->Write(offset, data);
}
Status TestRandomRWFile::Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
if (!env_->IsFilesystemActive()) {
return env_->GetError();
}
return target_->Read(offset, n, result, scratch);
}
Status TestRandomRWFile::Close() {
file_opened_ = false;
return target_->Close();
}
Status TestRandomRWFile::Flush() {
if (!env_->IsFilesystemActive()) {
return env_->GetError();
}
return target_->Flush();
}
Status TestRandomRWFile::Sync() {
if (!env_->IsFilesystemActive()) {
return env_->GetError();
}
return target_->Sync();
}
Status FaultInjectionTestEnv::NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) {
std::unique_ptr<Directory> r;
@ -220,6 +270,27 @@ Status FaultInjectionTestEnv::ReopenWritableFile(
return s;
}
Status FaultInjectionTestEnv::NewRandomRWFile(
const std::string& fname, std::unique_ptr<RandomRWFile>* result,
const EnvOptions& soptions) {
if (!IsFilesystemActive()) {
return GetError();
}
Status s = target()->NewRandomRWFile(fname, result, soptions);
if (s.ok()) {
result->reset(new TestRandomRWFile(fname, std::move(*result), this));
// WritableFileWriter* file is opened
// again then it will be truncated - so forget our saved state.
UntrackFile(fname);
MutexLock l(&mutex_);
open_files_.insert(fname);
auto dir_and_name = GetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list.insert(dir_and_name.second);
}
return s;
}
Status FaultInjectionTestEnv::NewRandomAccessFile(
const std::string& fname, std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& soptions) {
@ -238,7 +309,6 @@ Status FaultInjectionTestEnv::DeleteFile(const std::string& f) {
fprintf(stderr, "Cannot delete file %s: %s\n", f.c_str(),
s.ToString().c_str());
}
assert(s.ok());
if (s.ok()) {
UntrackFile(f);
}

View File

@ -82,6 +82,31 @@ class TestWritableFile : public WritableFile {
FaultInjectionTestEnv* env_;
};
// A wrapper around WritableFileWriter* file
// is written to or sync'ed.
class TestRandomRWFile : public RandomRWFile {
public:
explicit TestRandomRWFile(const std::string& fname,
std::unique_ptr<RandomRWFile>&& f,
FaultInjectionTestEnv* env);
virtual ~TestRandomRWFile();
Status Write(uint64_t offset, const Slice& data) override;
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override;
Status Close() override;
Status Flush() override;
Status Sync() override;
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
bool use_direct_io() const override { return target_->use_direct_io(); };
private:
std::unique_ptr<RandomRWFile> target_;
bool file_opened_;
FaultInjectionTestEnv* env_;
};
class TestDirectory : public Directory {
public:
explicit TestDirectory(FaultInjectionTestEnv* env, std::string dirname,
@ -114,6 +139,10 @@ class FaultInjectionTestEnv : public EnvWrapper {
std::unique_ptr<WritableFile>* result,
const EnvOptions& soptions) override;
Status NewRandomRWFile(const std::string& fname,
std::unique_ptr<RandomRWFile>* result,
const EnvOptions& soptions) override;
Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& soptions) override;