fsm: one-time token expiration should be deterministic (#13737)
When applying a raft log to expire ACL tokens, we need to use a timestamp provided by the leader so that the result is deterministic across servers. Use leader's timestamp from RPC call
This commit is contained in:
parent
4dea14267d
commit
cfa2cb140e
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
acl: Fixed a bug where the timestamp for expiring one-time tokens was not deterministic between servers
|
||||||
|
```
|
|
@ -1013,6 +1013,8 @@ func (a *ACL) ExpireOneTimeTokens(args *structs.OneTimeTokenExpireRequest, reply
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
args.Timestamp = time.Now() // use the leader's timestamp
|
||||||
|
|
||||||
// Expire token via raft; because this is the only write in the RPC the
|
// Expire token via raft; because this is the only write in the RPC the
|
||||||
// caller can safely retry with the same token if the raft write fails
|
// caller can safely retry with the same token if the raft write fails
|
||||||
_, index, err := a.srv.raftApply(structs.OneTimeTokenExpireRequestType, args)
|
_, index, err := a.srv.raftApply(structs.OneTimeTokenExpireRequestType, args)
|
||||||
|
|
|
@ -1229,7 +1229,7 @@ func (n *nomadFSM) applyOneTimeTokenExpire(msgType structs.MessageType, buf []by
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := n.state.ExpireOneTimeTokens(msgType, index); err != nil {
|
if err := n.state.ExpireOneTimeTokens(msgType, index, req.Timestamp); err != nil {
|
||||||
n.logger.Error("ExpireOneTimeTokens failed", "error", err)
|
n.logger.Error("ExpireOneTimeTokens failed", "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -5852,11 +5852,11 @@ func (s *StateStore) DeleteOneTimeTokens(msgType structs.MessageType, index uint
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExpireOneTimeTokens deletes tokens that have expired
|
// ExpireOneTimeTokens deletes tokens that have expired
|
||||||
func (s *StateStore) ExpireOneTimeTokens(msgType structs.MessageType, index uint64) error {
|
func (s *StateStore) ExpireOneTimeTokens(msgType structs.MessageType, index uint64, timestamp time.Time) error {
|
||||||
txn := s.db.WriteTxnMsgT(msgType, index)
|
txn := s.db.WriteTxnMsgT(msgType, index)
|
||||||
defer txn.Abort()
|
defer txn.Abort()
|
||||||
|
|
||||||
iter, err := s.oneTimeTokensExpiredTxn(txn, nil)
|
iter, err := s.oneTimeTokensExpiredTxn(txn, nil, timestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -5887,14 +5887,14 @@ func (s *StateStore) ExpireOneTimeTokens(msgType structs.MessageType, index uint
|
||||||
}
|
}
|
||||||
|
|
||||||
// oneTimeTokensExpiredTxn returns an iterator over all expired one-time tokens
|
// oneTimeTokensExpiredTxn returns an iterator over all expired one-time tokens
|
||||||
func (s *StateStore) oneTimeTokensExpiredTxn(txn *txn, ws memdb.WatchSet) (memdb.ResultIterator, error) {
|
func (s *StateStore) oneTimeTokensExpiredTxn(txn *txn, ws memdb.WatchSet, timestamp time.Time) (memdb.ResultIterator, error) {
|
||||||
iter, err := txn.Get("one_time_token", "id")
|
iter, err := txn.Get("one_time_token", "id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("one-time token lookup failed: %v", err)
|
return nil, fmt.Errorf("one-time token lookup failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ws.Add(iter.WatchCh())
|
ws.Add(iter.WatchCh())
|
||||||
iter = memdb.NewFilterIterator(iter, expiredOneTimeTokenFilter(time.Now()))
|
iter = memdb.NewFilterIterator(iter, expiredOneTimeTokenFilter(timestamp))
|
||||||
return iter, nil
|
return iter, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8912,9 +8912,9 @@ func TestStateStore_OneTimeTokens(t *testing.T) {
|
||||||
|
|
||||||
// now verify expiration
|
// now verify expiration
|
||||||
|
|
||||||
getExpiredTokens := func() []*structs.OneTimeToken {
|
getExpiredTokens := func(now time.Time) []*structs.OneTimeToken {
|
||||||
txn := state.db.ReadTxn()
|
txn := state.db.ReadTxn()
|
||||||
iter, err := state.oneTimeTokensExpiredTxn(txn, nil)
|
iter, err := state.oneTimeTokensExpiredTxn(txn, nil, now)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
results := []*structs.OneTimeToken{}
|
results := []*structs.OneTimeToken{}
|
||||||
|
@ -8930,7 +8930,7 @@ func TestStateStore_OneTimeTokens(t *testing.T) {
|
||||||
return results
|
return results
|
||||||
}
|
}
|
||||||
|
|
||||||
results = getExpiredTokens()
|
results = getExpiredTokens(time.Now())
|
||||||
require.Len(t, results, 2)
|
require.Len(t, results, 2)
|
||||||
|
|
||||||
// results aren't ordered
|
// results aren't ordered
|
||||||
|
@ -8942,10 +8942,10 @@ func TestStateStore_OneTimeTokens(t *testing.T) {
|
||||||
|
|
||||||
// clear the expired tokens and verify they're gone
|
// clear the expired tokens and verify they're gone
|
||||||
index++
|
index++
|
||||||
require.NoError(t,
|
require.NoError(t, state.ExpireOneTimeTokens(
|
||||||
state.ExpireOneTimeTokens(structs.MsgTypeTestSetup, index))
|
structs.MsgTypeTestSetup, index, time.Now()))
|
||||||
|
|
||||||
results = getExpiredTokens()
|
results = getExpiredTokens(time.Now())
|
||||||
require.Len(t, results, 0)
|
require.Len(t, results, 0)
|
||||||
|
|
||||||
// query the unexpired token
|
// query the unexpired token
|
||||||
|
|
|
@ -12077,6 +12077,7 @@ type OneTimeTokenDeleteRequest struct {
|
||||||
|
|
||||||
// OneTimeTokenExpireRequest is a request to delete all expired one-time tokens
|
// OneTimeTokenExpireRequest is a request to delete all expired one-time tokens
|
||||||
type OneTimeTokenExpireRequest struct {
|
type OneTimeTokenExpireRequest struct {
|
||||||
|
Timestamp time.Time
|
||||||
WriteRequest
|
WriteRequest
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue