Ability to take a file-level snapshot from leveldb.

Summary:
A set of APIs that allow an application to back up data from the
leveldb database as a set of files.

Test Plan: unit test attached. More coming soon.

Reviewers: heyongqiang

Reviewed By: heyongqiang

Differential Revision: https://reviews.facebook.net/D5439
This commit is contained in:
Dhruba Borthakur 2012-09-14 17:11:35 -07:00
parent b85cdca690
commit ba55d77b5d
7 changed files with 167 additions and 1 deletions

61
db/db_filesnapshot.cc Normal file
View file

@ -0,0 +1,61 @@
// Copyright (c) 2012 Facebook.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "db/db_impl.h"
#include "db/filename.h"
#include <string>
#include <stdint.h>
#include "db/version_set.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "port/port.h"
#include "util/mutexlock.h"
namespace leveldb {
// Turns off deletion of obsolete files by raising the
// disable_delete_obsolete_files_ flag while holding mutex_.
// Always returns Status::OK().
Status DBImpl::DisableFileDeletions() {
  MutexLock guard(&mutex_);
  disable_delete_obsolete_files_ = true;
  return Status::OK();
}
// Turns deletion of obsolete files back on by clearing the
// disable_delete_obsolete_files_ flag while holding mutex_.
// Always returns Status::OK().
Status DBImpl::EnableFileDeletions() {
  MutexLock guard(&mutex_);
  disable_delete_obsolete_files_ = false;
  return Status::OK();
}
// Flushes the database and then replaces the contents of "ret" with the
// names of the files that make up the current version: one entry per live
// table file, followed by the CURRENT file and the MANIFEST file.
// The names are built with an empty directory prefix, i.e. they are
// relative to dbname_ rather than absolute paths.
// Returns the flush error (after logging it) if the flush fails,
// otherwise Status::OK().
Status DBImpl::GetLiveFiles(std::vector<std::string>& ret) {
  // Push all dirty data to disk before enumerating files.
  const Status flush_status = Flush(FlushOptions());
  if (!flush_status.ok()) {
    Log(options_.info_log, "Cannot Flush data %s\n",
        flush_status.ToString().c_str());
    return flush_status;
  }

  MutexLock guard(&mutex_);

  // Collect the numbers of all live *.sst files in the current version.
  std::set<uint64_t> sst_numbers;
  versions_->AddLiveFilesCurrentVersion(&sst_numbers);

  // Output layout: *.sst entries first, then CURRENT, then MANIFEST.
  ret.clear();
  ret.reserve(sst_numbers.size() + 2);
  for (std::set<uint64_t>::const_iterator num = sst_numbers.begin();
       num != sst_numbers.end(); ++num) {
    ret.push_back(TableFileName("", *num));
  }
  ret.push_back(CurrentFileName(""));
  ret.push_back(DescriptorFileName("", versions_->ManifestFileNumber()));

  return Status::OK();
}
}

View file

@ -140,7 +140,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
bg_compaction_scheduled_(false),
bg_logstats_scheduled_(false),
manual_compaction_(NULL),
logger_(NULL) {
logger_(NULL),
disable_delete_obsolete_files_(false) {
mem_->Ref();
has_imm_.Release_Store(NULL);
@ -244,6 +245,11 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
}
void DBImpl::DeleteObsoleteFiles() {
// if deletion is disabled, do nothing
if (disable_delete_obsolete_files_) {
return;
}
// Make a set of all of the live files
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);

View file

@ -49,6 +49,9 @@ class DBImpl : public DB {
virtual int MaxMemCompactionLevel();
virtual int Level0StopWriteTrigger();
virtual Status Flush(const FlushOptions& options);
virtual Status DisableFileDeletions();
virtual Status EnableFileDeletions();
virtual Status GetLiveFiles(std::vector<std::string>&);
// Extra methods (for testing) that are not in the public DB interface
@ -190,6 +193,9 @@ class DBImpl : public DB {
int64_t volatile last_log_ts;
// shall we disable deletion of obsolete files
bool disable_delete_obsolete_files_;
// Per level compaction stats. stats_[level] stores the stats for
// compactions that produced data for the specified "level".
struct CompactionStats {

View file

@ -1685,6 +1685,62 @@ TEST(DBTest, BloomFilter) {
delete options.filter_policy;
}
// Verifies that a file-level snapshot obtained through
// DisableFileDeletions() + GetLiveFiles() can be copied into a separate
// directory and opened there as a database holding the same key/values.
TEST(DBTest, SnapshotFiles) {
  Options options = CurrentOptions();
  options.write_buffer_size = 100000000;  // Large write buffer
  Reopen(&options);

  Random rnd(301);

  // Write 8MB (80 values, each 100K)
  ASSERT_EQ(NumTableFilesAtLevel(0), 0);
  std::vector<std::string> values;
  for (int i = 0; i < 80; i++) {
    values.push_back(RandomString(&rnd, 100000));
    ASSERT_OK(Put(Key(i), values[i]));
  }

  // assert that nothing makes it to disk yet.
  ASSERT_EQ(NumTableFilesAtLevel(0), 0);

  // get a file snapshot; both calls report a Status that must be checked,
  // otherwise a failed flush would only show up as a confusing size mismatch.
  std::vector<std::string> files;
  ASSERT_OK(dbfull()->DisableFileDeletions());
  ASSERT_OK(dbfull()->GetLiveFiles(files));

  // CURRENT, MANIFEST, *.sst files
  ASSERT_EQ(files.size(), 3U);

  // copy these files to a new snapshot directory
  std::string snapdir = dbname_ + ".snapdir/";
  std::string mkdir = "mkdir -p " + snapdir;
  ASSERT_EQ(system(mkdir.c_str()), 0);
  for (size_t i = 0; i < files.size(); i++) {
    std::string src = dbname_ + "/" + files[i];
    std::string dest = snapdir + "/" + files[i];
    std::string cmd = "cp " + src + " " + dest;
    ASSERT_EQ(system(cmd.c_str()), 0);
  }

  // release file snapshot: deletions must be re-enabled here (the previous
  // code called DisableFileDeletions() a second time and never released it).
  ASSERT_OK(dbfull()->EnableFileDeletions());

  // verify that data in the snapshot are correct
  Options opts;
  DB* snapdb;
  opts.create_if_missing = false;
  Status stat = DB::Open(opts, snapdir, &snapdb);
  ASSERT_OK(stat);

  ReadOptions roptions;
  std::string val;
  for (int i = 0; i < 80; i++) {
    stat = snapdb->Get(roptions, Key(i), &val);
    // A failed read must not be mistaken for a value mismatch.
    ASSERT_OK(stat);
    ASSERT_EQ(values[i].compare(val), 0);
  }
  delete snapdb;
}
// Multi-threaded test:
namespace {
@ -1874,6 +1930,16 @@ class ModelDB: public DB {
return ret;
}
// Stub implementation: does nothing and reports success.
virtual Status DisableFileDeletions() {
return Status::OK();
}
// Stub implementation: does nothing and reports success.
virtual Status EnableFileDeletions() {
return Status::OK();
}
// Stub implementation: leaves the caller's vector untouched and
// reports success.
virtual Status GetLiveFiles(std::vector<std::string>&) {
return Status::OK();
}
private:
class ModelIter: public Iterator {
public:

View file

@ -1220,6 +1220,16 @@ void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
}
}
void VersionSet::AddLiveFilesCurrentVersion(std::set<uint64_t>* live) {
Version* v = current_;
for (int level = 0; level < NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = v->files_[level];
for (size_t i = 0; i < files.size(); i++) {
live->insert(files[i]->number);
}
}
}
int64_t VersionSet::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < NumberLevels());

View file

@ -236,6 +236,9 @@ class VersionSet {
// May also mutate some internal state.
void AddLiveFiles(std::set<uint64_t>* live);
// Add all files listed in the current version to *live.
void AddLiveFilesCurrentVersion(std::set<uint64_t>* live);
// Return the approximate offset in the database of the data for
// "key" as of version "v".
uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key);

View file

@ -7,6 +7,7 @@
#include <stdint.h>
#include <stdio.h>
#include <vector>
#include "leveldb/iterator.h"
#include "leveldb/options.h"
@ -154,6 +155,19 @@ class DB {
// Flush all mem-table data.
virtual Status Flush(const FlushOptions& options) = 0;
// Prevent file deletions. Compactions will continue to occur,
// but no obsolete files will be deleted. Calling this multiple
// times has the same effect as calling it once.
virtual Status DisableFileDeletions() = 0;
// Allow compactions to delete obsolete files.
virtual Status EnableFileDeletions() = 0;
// Retrieve the list of all files in the database. The file names are
// relative to the dbname and are not absolute paths. This list
// can be used to generate a backup.
virtual Status GetLiveFiles(std::vector<std::string>&) = 0;
private:
// No copying allowed
DB(const DB&);