diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc
index e0b1b54712..e954e70071 100644
--- a/tools/ldb_cmd.cc
+++ b/tools/ldb_cmd.cc
@@ -114,11 +114,91 @@ const std::string LDBCommand::ARG_READ_TIMESTAMP = "read_timestamp";
 const char* LDBCommand::DELIM = " ==> ";
 
 namespace {
+// Helper class to iterate WAL logs in a directory in chronological order.
+// Entries of `filenames` that do not parse as WAL file names are skipped; the
+// rest are visited in ascending file number order.
+class WALFileIterator {
+ public:
+  explicit WALFileIterator(const std::string& parent_dir,
+                           const std::vector<std::string>& filenames);
+  // Returns the next WAL file name prefixed with the parent directory and
+  // advances the iterator.
+  // REQUIRES Valid() == true
+  std::string GetNextWAL();
+  // True while GetNextWAL() still has a file to return.
+  bool Valid() const { return wal_file_iter_ != log_files_.end(); }
 
-void DumpWalFile(Options options, std::string wal_file, bool print_header,
-                 bool print_values, bool is_write_committed,
+ private:
+  // Directory the WAL files live in.
+  std::string parent_dir_;
+  // WAL log file name(s), sorted by file number.
+  std::vector<std::string> log_files_;
+  // The file GetNextWAL() returns next.
+  std::vector<std::string>::const_iterator wal_file_iter_;
+};
+
+WALFileIterator::WALFileIterator(const std::string& parent_dir,
+                                 const std::vector<std::string>& filenames)
+    : parent_dir_(parent_dir) {
+  assert(!filenames.empty());
+  // Keep only the WAL files, remembering each file number so that a name is
+  // parsed once rather than on every sort comparison.
+  std::vector<std::pair<uint64_t, std::string>> numbered_wals;
+  for (const auto& fname : filenames) {
+    uint64_t file_num = 0;
+    FileType file_type;
+    if (ParseFileName(fname, &file_num, &file_type) && file_type == kWalFile) {
+      numbered_wals.emplace_back(file_num, fname);
+    }
+  }
+
+  std::sort(numbered_wals.begin(), numbered_wals.end());
+  log_files_.reserve(numbered_wals.size());
+  for (auto& numbered_wal : numbered_wals) {
+    log_files_.push_back(std::move(numbered_wal.second));
+  }
+  wal_file_iter_ = log_files_.begin();
+}
+
+std::string WALFileIterator::GetNextWAL() {
+  assert(Valid());
+  std::string ret;
+  // Re-checked so that a release build (asserts compiled out) returns an empty
+  // string instead of dereferencing the end iterator.
+  if (wal_file_iter_ != log_files_.end()) {
+    ret.assign(parent_dir_);
+    // back() on an empty string is undefined behavior, so only look at the
+    // last character when there is one.
+    if (!ret.empty() && ret.back() != kFilePathSeparator) {
+      ret.push_back(kFilePathSeparator);
+    }
+    ret.append(*wal_file_iter_);
+    ++wal_file_iter_;
+  }
+  return ret;
+}
+
+// Dumps the WAL file(s) at `dir_or_file`. If it is a directory with children,
+// every WAL file in it is dumped in ascending file number order; otherwise it
+// must be an existing file whose name ends in ".log". Errors detected here are
+// stored in `exec_state` when it is non-null.
+void DumpWalFiles(Options options, const std::string& dir_or_file,
+                  bool print_header, bool print_values,
+                  bool only_print_seqno_gaps, bool is_write_committed,
+                  const std::map<uint32_t, const Comparator*>& ucmps,
+                  LDBCommandExecuteResult* exec_state);
+
+// Dumps one WAL file. `prev_batch_seqno` and `prev_batch_count` must be
+// non-null; they hold the sequence number and count of the last batch seen and
+// are updated as batches are read, so passing the same pair for consecutive
+// files lets `only_print_seqno_gaps` detect gaps across file boundaries.
+void DumpWalFile(Options options, const std::string& wal_file,
+                 bool print_header, bool print_values,
+                 bool only_print_seqno_gaps, bool is_write_committed,
                  const std::map<uint32_t, const Comparator*>& ucmps,
-                 LDBCommandExecuteResult* exec_state);
+                 LDBCommandExecuteResult* exec_state,
+                 std::optional<SequenceNumber>* prev_batch_seqno,
+                 std::optional<uint32_t>* prev_batch_count);
 
 void DumpSstFile(Options options, std::string filename, bool output_hex,
                  bool show_properties, bool decode_blob_index,
@@ -2213,9 +2293,10 @@ void DBDumperCommand::DoCommand() {
   switch (type) {
     case kWalFile:
       // TODO(myabandeh): allow configuring is_write_commited
-      DumpWalFile(options_, path_, /* print_header_ */ true,
-                  /* print_values_ */ true, true /* is_write_commited */,
-                  ucmps_, &exec_state_);
+      DumpWalFiles(options_, path_, /* print_header_ */ true,
+                   /* print_values_ */ true,
+                   /* only_print_seqno_gaps */ false,
+                   true /* is_write_commited */, ucmps_, &exec_state_);
       break;
     case kTableFile:
       DumpSstFile(options_, path_, is_key_hex_, /* show_properties */ true,
@@ -2842,10 +2923,73 @@ class InMemoryHandler : public WriteBatch::Handler {
   const std::map<uint32_t, const Comparator*> ucmps_;
 };
 
-void DumpWalFile(Options options, std::string wal_file, bool print_header,
-                 bool print_values, bool is_write_committed,
+void DumpWalFiles(Options options, const std::string& dir_or_file,
+                  bool print_header, bool print_values,
+                  bool only_print_seqno_gaps, bool is_write_committed,
+                  const std::map<uint32_t, const Comparator*>& ucmps,
+                  LDBCommandExecuteResult* exec_state) {
+  std::vector<std::string> filenames;
+  ROCKSDB_NAMESPACE::Env* env = options.env;
+  ROCKSDB_NAMESPACE::Status st = env->GetChildren(dir_or_file, &filenames);
+  // Shared by every DumpWalFile() call below so that sequence number gaps are
+  // also detected across file boundaries.
+  std::optional<SequenceNumber> prev_batch_seqno;
+  std::optional<uint32_t> prev_batch_count;
+  if (!st.ok() || filenames.empty()) {
+    // dir_or_file does not exist or does not contain children
+    // Check its existence first
+    Status s = env->FileExists(dir_or_file);
+    // dir_or_file does not exist
+    if (!s.ok()) {
+      if (exec_state) {
+        *exec_state = LDBCommandExecuteResult::Failed(
+            dir_or_file + ": No such file or directory");
+      }
+      return;
+    }
+    // If it exists and doesn't have children, it should be a log file.
+    if (dir_or_file.length() <= 4 ||
+        dir_or_file.rfind(".log") != dir_or_file.length() - 4) {
+      if (exec_state) {
+        *exec_state = LDBCommandExecuteResult::Failed(
+            dir_or_file + ": Invalid log file name");
+      }
+      return;
+    }
+    DumpWalFile(options, dir_or_file, print_header, print_values,
+                only_print_seqno_gaps, is_write_committed, ucmps, exec_state,
+                &prev_batch_seqno, &prev_batch_count);
+  } else {
+    WALFileIterator wal_file_iter(dir_or_file, filenames);
+    if (!wal_file_iter.Valid()) {
+      if (exec_state) {
+        *exec_state = LDBCommandExecuteResult::Failed(
+            dir_or_file + ": No valid wal logs found");
+      }
+      return;
+    }
+    while (wal_file_iter.Valid()) {
+      const std::string wal_file = wal_file_iter.GetNextWAL();
+      std::cout << "Checking wal file: " << wal_file << std::endl;
+      DumpWalFile(options, wal_file, print_header, print_values,
+                  only_print_seqno_gaps, is_write_committed, ucmps, exec_state,
+                  &prev_batch_seqno, &prev_batch_count);
+      // exec_state may be null (it is null-checked everywhere above), so guard
+      // the dereference. Stop at the first file that fails to dump.
+      if (exec_state && exec_state->IsFailed()) {
+        return;
+      }
+    }
+  }
+}
+
+void DumpWalFile(Options options, const std::string& wal_file,
+                 bool print_header, bool print_values,
+                 bool only_print_seqno_gaps, bool is_write_committed,
                  const std::map<uint32_t, const Comparator*>& ucmps,
-                 LDBCommandExecuteResult* exec_state) {
+                 LDBCommandExecuteResult* exec_state,
+                 std::optional<SequenceNumber>* prev_batch_seqno,
+                 std::optional<uint32_t>* prev_batch_count) {
   const auto& fs = options.env->GetFileSystem();
   FileOptions soptions(options);
   std::unique_ptr<SequentialFileReader> wal_file_reader;
@@ -2948,8 +3092,32 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header,
           break;
         }
       }
-      row << WriteBatchInternal::Sequence(&batch) << ",";
-      row << WriteBatchInternal::Count(&batch) << ",";
+      SequenceNumber sequence_number = WriteBatchInternal::Sequence(&batch);
+      uint32_t batch_count = WriteBatchInternal::Count(&batch);
+      assert(prev_batch_seqno);
+      assert(prev_batch_count);
+      assert(prev_batch_seqno->has_value() == prev_batch_count->has_value());
+      // TODO(yuzhangyu): handle pessimistic transactions case.
+      if (only_print_seqno_gaps) {
+        // A gap exists when this batch does not start at the previous batch's
+        // sequence number plus its count. The first batch seen has no
+        // predecessor and is therefore never reported.
+        const bool has_gap =
+            prev_batch_seqno->has_value() && prev_batch_count->has_value() &&
+            prev_batch_seqno->value() + prev_batch_count->value() !=
+                sequence_number;
+        if (!has_gap) {
+          *prev_batch_seqno = sequence_number;
+          *prev_batch_count = batch_count;
+          continue;
+        }
+        row << "Prev batch sequence number: " << prev_batch_seqno->value()
+            << ", prev batch count: " << prev_batch_count->value() << ", ";
+      }
+      row << sequence_number << ",";
+      row << batch_count << ",";
+      *prev_batch_seqno = sequence_number;
+      *prev_batch_count = batch_count;
       row << WriteBatchInternal::ByteSize(&batch) << ",";
       row << reader.LastRecordOffset() << ",";
       ColumnFamilyCollector cf_collector;
@@ -3003,6 +3171,8 @@ const std::string WALDumperCommand::ARG_WAL_FILE = "walfile";
 const std::string WALDumperCommand::ARG_WRITE_COMMITTED = "write_committed";
 const std::string WALDumperCommand::ARG_PRINT_VALUE = "print_value";
 const std::string WALDumperCommand::ARG_PRINT_HEADER = "header";
+const std::string WALDumperCommand::ARG_ONLY_PRINT_SEQNO_GAPS =
+    "only_print_seqno_gaps";
 
 WALDumperCommand::WALDumperCommand(
     const std::vector<std::string>& /*params*/,
@@ -3010,9 +3180,11 @@ WALDumperCommand::WALDumperCommand(
     const std::vector<std::string>& flags)
     : LDBCommand(options, flags, true,
                  BuildCmdLineOptions({ARG_WAL_FILE, ARG_DB, ARG_WRITE_COMMITTED,
-                                      ARG_PRINT_HEADER, ARG_PRINT_VALUE})),
+                                      ARG_PRINT_HEADER, ARG_PRINT_VALUE,
+                                      ARG_ONLY_PRINT_SEQNO_GAPS})),
       print_header_(false),
       print_values_(false),
+      only_print_seqno_gaps_(false),
       is_write_committed_(false) {
   wal_file_.clear();
 
@@ -3023,6 +3195,7 @@ WALDumperCommand::WALDumperCommand(
 
   print_header_ = IsFlagPresent(flags, ARG_PRINT_HEADER);
   print_values_ = IsFlagPresent(flags, ARG_PRINT_VALUE);
+  only_print_seqno_gaps_ = IsFlagPresent(flags, ARG_ONLY_PRINT_SEQNO_GAPS);
   is_write_committed_ = ParseBooleanOption(options, ARG_WRITE_COMMITTED, true);
 
   if (wal_file_.empty()) {
@@ -3038,18 +3211,22 @@ WALDumperCommand::WALDumperCommand(
 void WALDumperCommand::Help(std::string& ret) {
   ret.append("  ");
   ret.append(WALDumperCommand::Name());
-  ret.append(" --" + ARG_WAL_FILE + "=<write_ahead_log_file_path>");
+  ret.append(" --" + ARG_WAL_FILE +
+             "=<write_ahead_log_file_path_or_directory>");
   ret.append(" [--" + ARG_DB + "=<db_path>]");
   ret.append(" [--" + ARG_PRINT_HEADER + "] ");
   ret.append(" [--" + ARG_PRINT_VALUE + "] ");
+  ret.append(" [--" + ARG_ONLY_PRINT_SEQNO_GAPS +
+             "] (only correct if not using pessimistic transactions)");
   ret.append(" [--" + ARG_WRITE_COMMITTED + "=true|false] ");
   ret.append("\n");
 }
 
 void WALDumperCommand::DoCommand() {
   PrepareOptions();
-  DumpWalFile(options_, wal_file_, print_header_, print_values_,
-              is_write_committed_, ucmps_, &exec_state_);
+  DumpWalFiles(options_, wal_file_, print_header_, print_values_,
+               only_print_seqno_gaps_, is_write_committed_, ucmps_,
+               &exec_state_);
 }
 
 // ----------------------------------------------------------------------------
@@ -4540,13 +4717,17 @@ void DBFileDumperCommand::DoCommand() {
     } else {
       wal_dir = NormalizePath(options_.wal_dir + "/");
     }
+    std::optional<SequenceNumber> prev_batch_seqno;
+    std::optional<uint32_t> prev_batch_count;
     for (auto& wal : wal_files) {
       // TODO(qyang): option.wal_dir should be passed into ldb command
       std::string filename = wal_dir + wal->PathName();
       std::cout << filename << std::endl;
       // TODO(myabandeh): allow configuring is_write_commited
-      DumpWalFile(options_, filename, true, true, true /* is_write_commited */,
-                  ucmps_, &exec_state_);
+      DumpWalFile(
+          options_, filename, true /* print_header */, true /* print_values */,
+          false /* only_print_seqno_gaps */, true /* is_write_commited */,
+          ucmps_, &exec_state_, &prev_batch_seqno, &prev_batch_count);
     }
   }
 }
diff --git a/tools/ldb_cmd_impl.h b/tools/ldb_cmd_impl.h
index 32bf4da598..73130401e2 100644
--- a/tools/ldb_cmd_impl.h
+++ b/tools/ldb_cmd_impl.h
@@ -379,6 +379,7 @@ class WALDumperCommand : public LDBCommand {
   bool print_header_;
   std::string wal_file_;
   bool print_values_;
+  bool only_print_seqno_gaps_;
   bool is_write_committed_;  // default will be set to true
   bool no_db_open_ = true;
 
@@ -386,6 +387,7 @@ class WALDumperCommand : public LDBCommand {
   static const std::string ARG_WRITE_COMMITTED;
   static const std::string ARG_PRINT_HEADER;
   static const std::string ARG_PRINT_VALUE;
+  static const std::string ARG_ONLY_PRINT_SEQNO_GAPS;
 };
 
 class GetCommand : public LDBCommand {
diff --git a/tools/ldb_test.py b/tools/ldb_test.py
index 0d6b125f02..09ab9b799f 100644
--- a/tools/ldb_test.py
+++ b/tools/ldb_test.py
@@ -562,7 +562,7 @@ class LDBTestCase(unittest.TestCase):
             0
             == run_err_null(
                 "./ldb dump_wal --db=%s --walfile=%s --header"
-                % (origDbPath, os.path.join(origDbPath, "LOG"))
+                % (origDbPath, origDbPath)
             )
         )
         self.assertRunOK("scan", "x1 ==> y1\nx2 ==> y2\nx3 ==> y3\nx4 ==> y4")