mirror of https://github.com/facebook/rocksdb.git
Add an option to toggle timestamp based validation for the whole DB (#12857)
Summary: As titled. This PR adds a `TransactionDBOptions` field `enable_udt_validation` that allows the user to toggle the timestamp based validation behavior across the whole DB. When it is true, which is the default value, the existing behavior is kept. A recap of what this behavior is: `GetForUpdate` does timestamp based conflict checking to make sure no other transaction has committed a version of the key tagged with a timestamp equal to or newer than the calling transaction's `read_timestamp_`, which the user sets via `SetReadTimestampForValidation`. When this field is set to false, timestamp based validation is disabled for the whole DB. MyRocks finds it hard to choose a read timestamp for this validation API, so we added this flexibility. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12857 Test Plan: Added unit test Reviewed By: ltamasi Differential Revision: D60194134 Pulled By: jowlyzhang fbshipit-source-id: b8507f8ddc37fc7a2948cf492ce5c599ae646fef
This commit is contained in:
parent
408e8d4c85
commit
24d86f7b41
|
@ -235,6 +235,11 @@ struct TransactionDBOptions {
|
|||
const Slice& /*key*/)>
|
||||
rollback_deletion_type_callback;
|
||||
|
||||
// A flag to control for the whole DB whether user-defined timestamp based
|
||||
// validation is enabled when applicable. Only WriteCommittedTxn supports
|
||||
// user-defined timestamps so this option only applies in this case.
|
||||
bool enable_udt_validation = true;
|
||||
|
||||
private:
|
||||
// 128 entries
|
||||
// Should the default value change, please also update wp_snapshot_cache_bits
|
||||
|
|
|
@ -189,12 +189,9 @@ inline Status WriteCommittedTxn::GetForUpdateImpl(
|
|||
}
|
||||
}
|
||||
|
||||
if (!do_validate && kMaxTxnTimestamp != read_timestamp_) {
|
||||
return Status::InvalidArgument(
|
||||
"If do_validate is false then GetForUpdate with read_timestamp is not "
|
||||
"defined.");
|
||||
} else if (do_validate && kMaxTxnTimestamp == read_timestamp_) {
|
||||
return Status::InvalidArgument("read_timestamp must be set for validation");
|
||||
Status s = SanityCheckReadTimestamp(do_validate);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
if (!read_options.timestamp) {
|
||||
|
@ -237,17 +234,9 @@ Status WriteCommittedTxn::GetEntityForUpdate(const ReadOptions& read_options,
|
|||
}
|
||||
|
||||
assert(ts_sz > 0);
|
||||
|
||||
if (!do_validate) {
|
||||
if (read_timestamp_ != kMaxTxnTimestamp) {
|
||||
return Status::InvalidArgument(
|
||||
"Read timestamp must not be set if validation is disabled");
|
||||
}
|
||||
} else {
|
||||
if (read_timestamp_ == kMaxTxnTimestamp) {
|
||||
return Status::InvalidArgument(
|
||||
"Read timestamp must be set for validation");
|
||||
}
|
||||
Status s = SanityCheckReadTimestamp(do_validate);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
std::string ts_buf;
|
||||
|
@ -271,6 +260,33 @@ Status WriteCommittedTxn::GetEntityForUpdate(const ReadOptions& read_options,
|
|||
read_options, column_family, key, columns, exclusive, do_validate);
|
||||
}
|
||||
|
||||
// Verifies that `do_validate`, the transaction's `read_timestamp_` and the
// DB-wide `enable_udt_validation` option form a consistent combination.
//
// `read_timestamp_ == kMaxTxnTimestamp` is treated as "no read timestamp was
// set". The accepted combinations are:
//   - `enable_udt_validation` is false: the read timestamp must be unset
//     (`do_validate` is not consulted).
//   - `enable_udt_validation` is true and `do_validate` is true: the read
//     timestamp must be set.
//   - `enable_udt_validation` is true and `do_validate` is false: the read
//     timestamp must be unset.
//
// Returns Status::OK() for an accepted combination and
// Status::InvalidArgument() otherwise.
Status WriteCommittedTxn::SanityCheckReadTimestamp(bool do_validate) {
  const bool read_ts_is_set = (read_timestamp_ != kMaxTxnTimestamp);

  // Timestamp based validation is switched off for the whole DB, so a read
  // timestamp supplied by the caller could never be honored.
  if (!txn_db_impl_->GetTxnDBOptions().enable_udt_validation) {
    if (read_ts_is_set) {
      return Status::InvalidArgument(
          "read_timestamp is set but timestamp validation is disabled for the "
          "DB");
    }
    return Status::OK();
  }

  // Validation was requested: there must be a timestamp to validate against.
  if (do_validate) {
    if (!read_ts_is_set) {
      return Status::InvalidArgument(
          "read_timestamp must be set for validation");
    }
    return Status::OK();
  }

  // Validation was not requested: a read timestamp would be meaningless.
  if (read_ts_is_set) {
    return Status::InvalidArgument(
        "If do_validate is false then GetForUpdate with read_timestamp is not "
        "defined.");
  }
  return Status::OK();
}
|
||||
|
||||
Status WriteCommittedTxn::PutEntityImpl(ColumnFamilyHandle* column_family,
|
||||
const Slice& key,
|
||||
const WideColumns& columns,
|
||||
|
@ -496,7 +512,8 @@ Status WriteCommittedTxn::SetReadTimestampForValidation(TxnTimestamp ts) {
|
|||
}
|
||||
|
||||
Status WriteCommittedTxn::SetCommitTimestamp(TxnTimestamp ts) {
|
||||
if (read_timestamp_ < kMaxTxnTimestamp && ts <= read_timestamp_) {
|
||||
if (txn_db_impl_->GetTxnDBOptions().enable_udt_validation &&
|
||||
read_timestamp_ < kMaxTxnTimestamp && ts <= read_timestamp_) {
|
||||
return Status::InvalidArgument(
|
||||
"Cannot commit at timestamp smaller than or equal to read timestamp");
|
||||
}
|
||||
|
@ -1207,7 +1224,10 @@ Status PessimisticTransaction::ValidateSnapshot(
|
|||
|
||||
return TransactionUtil::CheckKeyForConflicts(
|
||||
db_impl_, cfh, key.ToString(), snap_seq, ts_sz == 0 ? nullptr : &ts_buf,
|
||||
false /* cache_only */);
|
||||
false /* cache_only */,
|
||||
/* snap_checker */ nullptr,
|
||||
/* min_uncommitted */ kMaxSequenceNumber,
|
||||
txn_db_impl_->GetTxnDBOptions().enable_udt_validation);
|
||||
}
|
||||
|
||||
bool PessimisticTransaction::TryStealingLocks() {
|
||||
|
|
|
@ -330,6 +330,11 @@ class WriteCommittedTxn : public PessimisticTransaction {
|
|||
|
||||
Status RollbackInternal() override;
|
||||
|
||||
// Checks if the combination of `do_validate`, the read timestamp set in
|
||||
// `read_timestamp_` and the `enable_udt_validation` flag in
|
||||
// TransactionDBOptions make sense together.
|
||||
Status SanityCheckReadTimestamp(bool do_validate);
|
||||
|
||||
// Column families that enable timestamps and whose data are written when
|
||||
// indexing_enabled_ is false. If a key is written when indexing_enabled_ is
|
||||
// true, then the corresponding column family is not added to cfs_with_ts
|
||||
|
|
|
@ -21,7 +21,8 @@ namespace ROCKSDB_NAMESPACE {
|
|||
Status TransactionUtil::CheckKeyForConflicts(
|
||||
DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key,
|
||||
SequenceNumber snap_seq, const std::string* const read_ts, bool cache_only,
|
||||
ReadCallback* snap_checker, SequenceNumber min_uncommitted) {
|
||||
ReadCallback* snap_checker, SequenceNumber min_uncommitted,
|
||||
bool enable_udt_validation) {
|
||||
Status result;
|
||||
|
||||
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
|
||||
|
@ -37,8 +38,9 @@ Status TransactionUtil::CheckKeyForConflicts(
|
|||
SequenceNumber earliest_seq =
|
||||
db_impl->GetEarliestMemTableSequenceNumber(sv, true);
|
||||
|
||||
result = CheckKey(db_impl, sv, earliest_seq, snap_seq, key, read_ts,
|
||||
cache_only, snap_checker, min_uncommitted);
|
||||
result =
|
||||
CheckKey(db_impl, sv, earliest_seq, snap_seq, key, read_ts, cache_only,
|
||||
snap_checker, min_uncommitted, enable_udt_validation);
|
||||
|
||||
db_impl->ReturnAndCleanupSuperVersion(cfd, sv);
|
||||
}
|
||||
|
@ -52,7 +54,8 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
|
|||
const std::string& key,
|
||||
const std::string* const read_ts,
|
||||
bool cache_only, ReadCallback* snap_checker,
|
||||
SequenceNumber min_uncommitted) {
|
||||
SequenceNumber min_uncommitted,
|
||||
bool enable_udt_validation) {
|
||||
// When `min_uncommitted` is provided, keys are not always committed
|
||||
// in sequence number order, and `snap_checker` is used to check whether
|
||||
// a specific sequence number in the database is visible to the transaction.
|
||||
|
@ -130,7 +133,7 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
|
|||
? snap_seq < seq
|
||||
: !snap_checker->IsVisible(seq);
|
||||
// Perform conflict checking based on timestamp if applicable.
|
||||
if (!write_conflict && read_ts != nullptr) {
|
||||
if (enable_udt_validation && !write_conflict && read_ts != nullptr) {
|
||||
ColumnFamilyData* cfd = sv->cfd;
|
||||
assert(cfd);
|
||||
const Comparator* const ucmp = cfd->user_comparator();
|
||||
|
|
|
@ -43,7 +43,8 @@ class TransactionUtil {
|
|||
const std::string& key, SequenceNumber snap_seq,
|
||||
const std::string* const ts, bool cache_only,
|
||||
ReadCallback* snap_checker = nullptr,
|
||||
SequenceNumber min_uncommitted = kMaxSequenceNumber);
|
||||
SequenceNumber min_uncommitted = kMaxSequenceNumber,
|
||||
bool enable_udt_validation = true);
|
||||
|
||||
// For each key,SequenceNumber pair tracked by the LockTracker, this function
|
||||
// will verify there have been no writes to the key in the db since that
|
||||
|
@ -70,13 +71,15 @@ class TransactionUtil {
|
|||
// seq > `snap_seq`: applicable to conflict
|
||||
// `min_uncommitted` <= seq <= `snap_seq`: call `snap_checker` to determine.
|
||||
//
|
||||
// If user-defined timestamp is enabled, a write conflict is detected if an
|
||||
// operation for `key` with timestamp greater than `ts` exists.
|
||||
// If user-defined timestamp is enabled and `enable_udt_validation` is set to
|
||||
// true, a write conflict is detected if an operation for `key` with timestamp
|
||||
// greater than `ts` exists.
|
||||
static Status CheckKey(DBImpl* db_impl, SuperVersion* sv,
|
||||
SequenceNumber earliest_seq, SequenceNumber snap_seq,
|
||||
const std::string& key, const std::string* const ts,
|
||||
bool cache_only, ReadCallback* snap_checker = nullptr,
|
||||
SequenceNumber min_uncommitted = kMaxSequenceNumber);
|
||||
SequenceNumber min_uncommitted = kMaxSequenceNumber,
|
||||
bool enable_udt_validation = true);
|
||||
};
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
|
|
@ -554,6 +554,109 @@ TEST_P(WriteCommittedTxnWithTsTest, GetForUpdate) {
|
|||
txn5.reset();
|
||||
}
|
||||
|
||||
TEST_P(WriteCommittedTxnWithTsTest, GetForUpdateUdtValidationNotEnabled) {
|
||||
ASSERT_OK(ReOpenNoDelete());
|
||||
|
||||
ColumnFamilyOptions cf_options;
|
||||
cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
||||
const std::string test_cf_name = "test_cf";
|
||||
ColumnFamilyHandle* cfh = nullptr;
|
||||
assert(db);
|
||||
ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh));
|
||||
delete cfh;
|
||||
cfh = nullptr;
|
||||
|
||||
std::vector<ColumnFamilyDescriptor> cf_descs;
|
||||
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
|
||||
cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options));
|
||||
options.avoid_flush_during_shutdown = true;
|
||||
|
||||
txn_db_options.enable_udt_validation = false;
|
||||
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
|
||||
|
||||
// Blind write a key/value for a later read via `GetForUpdate`.
|
||||
std::unique_ptr<Transaction> txn0(
|
||||
NewTxn(WriteOptions(), TransactionOptions()));
|
||||
ASSERT_OK(txn0->Put(handles_[1], "key", "value0"));
|
||||
ASSERT_OK(txn0->SetCommitTimestamp(20));
|
||||
ASSERT_OK(txn0->Commit());
|
||||
|
||||
// When timestamp validation is disabled across the whole DB
|
||||
// `SetReadTimestampForValidation` should not be called.
|
||||
std::unique_ptr<Transaction> txn1(
|
||||
NewTxn(WriteOptions(), TransactionOptions()));
|
||||
std::string value;
|
||||
ASSERT_OK(txn1->SetReadTimestampForValidation(21));
|
||||
ASSERT_TRUE(txn1->GetForUpdate(ReadOptions(), handles_[1], "key", &value,
|
||||
/* exclusive= */ true, /*do_validate=*/true)
|
||||
.IsInvalidArgument());
|
||||
txn1.reset();
|
||||
|
||||
// do_validate and no snapshot, no conflict checking at all
|
||||
std::unique_ptr<Transaction> txn2(
|
||||
NewTxn(WriteOptions(), TransactionOptions()));
|
||||
ASSERT_OK(txn2->GetForUpdate(ReadOptions(), handles_[1], "key", &value,
|
||||
/* exclusive= */ true, /*do_validate=*/true));
|
||||
ASSERT_OK(txn2->Put(handles_[1], "key", "value1"));
|
||||
ASSERT_OK(txn2->SetCommitTimestamp(21));
|
||||
ASSERT_OK(txn2->Commit());
|
||||
txn2.reset();
|
||||
|
||||
// do_validate and set snapshot, execute sequence number based conflict
|
||||
// checking and skip timestamp based conflict checking.
|
||||
std::unique_ptr<Transaction> txn3(
|
||||
NewTxn(WriteOptions(), TransactionOptions()));
|
||||
txn3->SetSnapshot();
|
||||
ASSERT_OK(txn3->GetForUpdate(ReadOptions(), handles_[1], "key", &value,
|
||||
/* exclusive= */ true, /*do_validate=*/true));
|
||||
ASSERT_OK(txn3->Put(handles_[1], "key", "value2"));
|
||||
ASSERT_OK(txn3->SetCommitTimestamp(22));
|
||||
ASSERT_OK(txn3->Commit());
|
||||
txn3.reset();
|
||||
|
||||
// Always check `ReadOptions.timestamp` to be consistent with the default
|
||||
// `read_timestamp_` if it's explicitly set, even if whole DB disables
|
||||
// timestamp validation.
|
||||
std::unique_ptr<Transaction> txn4(
|
||||
NewTxn(WriteOptions(), TransactionOptions()));
|
||||
ReadOptions ropts;
|
||||
std::string read_timestamp;
|
||||
Slice read_ts = EncodeU64Ts(27, &read_timestamp);
|
||||
ropts.timestamp = &read_ts;
|
||||
ASSERT_TRUE(txn4->GetForUpdate(ropts, handles_[1], "key", &value,
|
||||
/* exclusive= */ true, /*do_validate=*/true)
|
||||
.IsInvalidArgument());
|
||||
txn4.reset();
|
||||
|
||||
// Conflict of timestamps not caught when parallel transactions commit with
|
||||
// some out of order timestamps.
|
||||
std::unique_ptr<Transaction> txn5(
|
||||
db->BeginTransaction(WriteOptions(), TransactionOptions()));
|
||||
assert(txn5);
|
||||
|
||||
std::unique_ptr<Transaction> txn6(
|
||||
db->BeginTransaction(WriteOptions(), TransactionOptions()));
|
||||
assert(txn6);
|
||||
ASSERT_OK(txn6->GetForUpdate(ReadOptions(), handles_[1], "key", &value,
|
||||
/* exclusive= */ true, /*do_validate=*/true));
|
||||
ASSERT_OK(txn6->Put(handles_[1], "key", "value4"));
|
||||
ASSERT_OK(txn6->SetName("txn6"));
|
||||
ASSERT_OK(txn6->Prepare());
|
||||
ASSERT_OK(txn6->SetCommitTimestamp(24));
|
||||
ASSERT_OK(txn6->Commit());
|
||||
txn6.reset();
|
||||
|
||||
txn5->SetSnapshot();
|
||||
ASSERT_OK(txn5->GetForUpdate(ReadOptions(), handles_[1], "key", &value,
|
||||
/* exclusive= */ true, /*do_validate=*/true));
|
||||
ASSERT_OK(txn5->Put(handles_[1], "key", "value3"));
|
||||
ASSERT_OK(txn5->SetName("txn5"));
|
||||
// txn5 commits after txn6 but writes a smaller timestamp
|
||||
ASSERT_OK(txn5->SetCommitTimestamp(23));
|
||||
ASSERT_OK(txn5->Commit());
|
||||
txn5.reset();
|
||||
}
|
||||
|
||||
TEST_P(WriteCommittedTxnWithTsTest, BlindWrite) {
|
||||
ASSERT_OK(ReOpenNoDelete());
|
||||
|
||||
|
|
|
@ -529,7 +529,8 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
|
|||
// TODO(yanqin): support user-defined timestamp
|
||||
return TransactionUtil::CheckKeyForConflicts(
|
||||
db_impl_, cfh, key.ToString(), snap_seq, /*ts=*/nullptr,
|
||||
false /* cache_only */, &snap_checker, min_uncommitted);
|
||||
false /* cache_only */, &snap_checker, min_uncommitted,
|
||||
txn_db_impl_->GetTxnDBOptions().enable_udt_validation);
|
||||
}
|
||||
|
||||
void WritePreparedTxn::SetSnapshot() {
|
||||
|
|
|
@ -1077,7 +1077,8 @@ Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
|
|||
// TODO(yanqin): Support user-defined timestamp.
|
||||
return TransactionUtil::CheckKeyForConflicts(
|
||||
db_impl_, cfh, key.ToString(), snap_seq, /*ts=*/nullptr,
|
||||
false /* cache_only */, &snap_checker, min_uncommitted);
|
||||
false /* cache_only */, &snap_checker, min_uncommitted,
|
||||
txn_db_impl_->GetTxnDBOptions().enable_udt_validation);
|
||||
}
|
||||
|
||||
const std::map<SequenceNumber, size_t>&
|
||||
|
|
Loading…
Reference in New Issue