consul: Adding support for lock-delay in sessions
This commit is contained in:
parent
a53cb6e1dd
commit
3b0d3b76c2
|
@ -25,6 +25,23 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
|
||||||
return fmt.Errorf("Must provide key")
|
return fmt.Errorf("Must provide key")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If this is a lock, we must check for a lock-delay. Since lock-delay
|
||||||
|
// is based on wall-time, each peer expires the lock-delay at a slightly
|
||||||
|
// different time. This means the enforcement of lock-delay cannot be done
|
||||||
|
// after the raft log is committed as it would lead to inconsistent FSMs.
|
||||||
|
// Instead, the lock-delay must be enforced before commit. This means that
|
||||||
|
// only the wall-time of the leader node is used, preventing any inconsistencies.
|
||||||
|
if args.Op == structs.KVSLock {
|
||||||
|
state := k.srv.fsm.State()
|
||||||
|
expires := state.KVSLockDelay(args.DirEnt.Key)
|
||||||
|
if expires.After(time.Now()) {
|
||||||
|
k.srv.logger.Printf("[WARN] consul.kvs: Rejecting lock of %s due to lock-delay until %v",
|
||||||
|
args.DirEnt.Key, expires)
|
||||||
|
*reply = false
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Apply the update
|
// Apply the update
|
||||||
resp, err := k.srv.raftApply(structs.KVSRequestType, args)
|
resp, err := k.srv.raftApply(structs.KVSRequestType, args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"github.com/hashicorp/consul/testutil"
|
"github.com/hashicorp/consul/testutil"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestKVS_Apply(t *testing.T) {
|
func TestKVS_Apply(t *testing.T) {
|
||||||
|
@ -224,5 +225,72 @@ func TestKVSEndpoint_ListKeys(t *testing.T) {
|
||||||
if dirent.Keys[2] != "/test/sub/" {
|
if dirent.Keys[2] != "/test/sub/" {
|
||||||
t.Fatalf("Bad: %v", dirent.Keys)
|
t.Fatalf("Bad: %v", dirent.Keys)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestKVS_Apply_LockDelay(t *testing.T) {
|
||||||
|
dir1, s1 := testServer(t)
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
client := rpcClient(t, s1)
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||||
|
|
||||||
|
// Create and invalidate a session with a lock
|
||||||
|
state := s1.fsm.State()
|
||||||
|
if err := state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||||
|
t.Fatalf("err: %v")
|
||||||
|
}
|
||||||
|
session := &structs.Session{
|
||||||
|
Node: "foo",
|
||||||
|
LockDelay: 50 * time.Millisecond,
|
||||||
|
}
|
||||||
|
if err := state.SessionCreate(2, session); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
id := session.ID
|
||||||
|
d := &structs.DirEntry{
|
||||||
|
Key: "test",
|
||||||
|
Session: id,
|
||||||
|
}
|
||||||
|
if ok, err := state.KVSLock(3, d); err != nil || !ok {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if err := state.SessionDestroy(4, id); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make a new session that is valid
|
||||||
|
if err := state.SessionCreate(5, session); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
validId := session.ID
|
||||||
|
|
||||||
|
// Make a lock request
|
||||||
|
arg := structs.KVSRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.KVSLock,
|
||||||
|
DirEnt: structs.DirEntry{
|
||||||
|
Key: "test",
|
||||||
|
Session: validId,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var out bool
|
||||||
|
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if out != false {
|
||||||
|
t.Fatalf("should not acquire")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for lock-delay
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
|
||||||
|
// Should acquire
|
||||||
|
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if out != true {
|
||||||
|
t.Fatalf("should acquire")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,8 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -54,6 +56,22 @@ type StateStore struct {
|
||||||
tables MDBTables
|
tables MDBTables
|
||||||
watch map[*MDBTable]*NotifyGroup
|
watch map[*MDBTable]*NotifyGroup
|
||||||
queryTables map[string]MDBTables
|
queryTables map[string]MDBTables
|
||||||
|
|
||||||
|
// lockDelay is used to mark certain locks as unacquirable.
|
||||||
|
// When a lock is forcefully released (failing health
|
||||||
|
// check, destroyed session, etc), it is subject to the LockDelay
|
||||||
|
// imposed by the session. This prevents another session from
|
||||||
|
// acquiring the lock for some period of time as a protection against
|
||||||
|
// split-brains. This is inspired by the lock-delay in Chubby.
|
||||||
|
// Because this relies on wall-time, we cannot assume all peers
|
||||||
|
// perceive time as flowing uniformly. This means KVSLock MUST ignore
|
||||||
|
// lockDelay, since the lockDelay may have expired on the leader,
|
||||||
|
// but not on the follower. Rejecting the lock could result in
|
||||||
|
// inconsistencies in the FSMs due to differing rates of time progression. Instead,
|
||||||
|
// only the opinion of the leader is respected, and the Raft log
|
||||||
|
// is never questioned.
|
||||||
|
lockDelay map[string]time.Time
|
||||||
|
lockDelayLock sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// StateSnapshot is used to provide a point-in-time snapshot
|
// StateSnapshot is used to provide a point-in-time snapshot
|
||||||
|
@ -94,10 +112,11 @@ func NewStateStore(logOutput io.Writer) (*StateStore, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
s := &StateStore{
|
s := &StateStore{
|
||||||
logger: log.New(logOutput, "", log.LstdFlags),
|
logger: log.New(logOutput, "", log.LstdFlags),
|
||||||
path: path,
|
path: path,
|
||||||
env: env,
|
env: env,
|
||||||
watch: make(map[*MDBTable]*NotifyGroup),
|
watch: make(map[*MDBTable]*NotifyGroup),
|
||||||
|
lockDelay: make(map[string]time.Time),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure we can initialize
|
// Ensure we can initialize
|
||||||
|
@ -1076,6 +1095,17 @@ func (s *StateStore) KVSUnlock(index uint64, d *structs.DirEntry) (bool, error)
|
||||||
return s.kvsSet(index, d, kvUnlock)
|
return s.kvsSet(index, d, kvUnlock)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// KVSLockDelay returns the expiration time of a key lock delay. A key may
|
||||||
|
// have a lock delay if it was unlocked due to a session invalidation instead
|
||||||
|
// of a graceful unlock. This must be checked on the leader node, and not in
|
||||||
|
// KVSLock due to the variability of clocks.
|
||||||
|
func (s *StateStore) KVSLockDelay(key string) time.Time {
|
||||||
|
s.lockDelayLock.RLock()
|
||||||
|
expires := s.lockDelay[key]
|
||||||
|
s.lockDelayLock.RUnlock()
|
||||||
|
return expires
|
||||||
|
}
|
||||||
|
|
||||||
// kvsSet is the internal setter
|
// kvsSet is the internal setter
|
||||||
func (s *StateStore) kvsSet(
|
func (s *StateStore) kvsSet(
|
||||||
index uint64,
|
index uint64,
|
||||||
|
@ -1367,8 +1397,14 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro
|
||||||
}
|
}
|
||||||
session := res[0].(*structs.Session)
|
session := res[0].(*structs.Session)
|
||||||
|
|
||||||
|
// Enforce the MaxLockDelay
|
||||||
|
delay := session.LockDelay
|
||||||
|
if delay > structs.MaxLockDelay {
|
||||||
|
delay = structs.MaxLockDelay
|
||||||
|
}
|
||||||
|
|
||||||
// Invalidate any held locks
|
// Invalidate any held locks
|
||||||
if err := s.invalidateLocks(index, tx, id); err != nil {
|
if err := s.invalidateLocks(index, tx, delay, id); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1395,11 +1431,20 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro
|
||||||
|
|
||||||
// invalidateLocks is used to invalidate all the locks held by a session
|
// invalidateLocks is used to invalidate all the locks held by a session
|
||||||
// within a given txn. All tables should be locked in the tx.
|
// within a given txn. All tables should be locked in the tx.
|
||||||
func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, id string) error {
|
func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn,
|
||||||
|
lockDelay time.Duration, id string) error {
|
||||||
pairs, err := s.kvsTable.GetTxn(tx, "session", id)
|
pairs, err := s.kvsTable.GetTxn(tx, "session", id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var expires time.Time
|
||||||
|
if lockDelay > 0 {
|
||||||
|
s.lockDelayLock.Lock()
|
||||||
|
defer s.lockDelayLock.Unlock()
|
||||||
|
expires = time.Now().Add(lockDelay)
|
||||||
|
}
|
||||||
|
|
||||||
for _, pair := range pairs {
|
for _, pair := range pairs {
|
||||||
kv := pair.(*structs.DirEntry)
|
kv := pair.(*structs.DirEntry)
|
||||||
kv.Session = "" // Clear the lock
|
kv.Session = "" // Clear the lock
|
||||||
|
@ -1407,6 +1452,16 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, id string) error
|
||||||
if err := s.kvsTable.InsertTxn(tx, kv); err != nil {
|
if err := s.kvsTable.InsertTxn(tx, kv); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// If there is a lock delay, prevent acquisition
|
||||||
|
// for at least lockDelay period
|
||||||
|
if lockDelay > 0 {
|
||||||
|
s.lockDelay[kv.Key] = expires
|
||||||
|
time.AfterFunc(lockDelay, func() {
|
||||||
|
s.lockDelayLock.Lock()
|
||||||
|
delete(s.lockDelay, kv.Key)
|
||||||
|
s.lockDelayLock.Unlock()
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if len(pairs) > 0 {
|
if len(pairs) > 0 {
|
||||||
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func testStateStore() (*StateStore, error) {
|
func testStateStore() (*StateStore, error) {
|
||||||
|
@ -2059,7 +2060,7 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) {
|
||||||
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
|
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
session := &structs.Session{Node: "foo"}
|
session := &structs.Session{Node: "foo", LockDelay: 50 * time.Millisecond}
|
||||||
if err := store.SessionCreate(4, session); err != nil {
|
if err := store.SessionCreate(4, session); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -2095,4 +2096,10 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) {
|
||||||
if d2.Session != "" {
|
if d2.Session != "" {
|
||||||
t.Fatalf("bad: %v", *d2)
|
t.Fatalf("bad: %v", *d2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Key should have a lock delay
|
||||||
|
expires := store.KVSLockDelay("/foo")
|
||||||
|
if expires.Before(time.Now().Add(30 * time.Millisecond)) {
|
||||||
|
t.Fatalf("Bad: %v", expires)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,12 @@ const (
|
||||||
HealthCritical = "critical"
|
HealthCritical = "critical"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// MaxLockDelay provides a maximum LockDelay value for
|
||||||
|
// a session. Any larger value is capped at MaxLockDelay.
|
||||||
|
MaxLockDelay = 60 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
// RPCInfo is used to describe common information about query
|
// RPCInfo is used to describe common information about query
|
||||||
type RPCInfo interface {
|
type RPCInfo interface {
|
||||||
RequestDatacenter() string
|
RequestDatacenter() string
|
||||||
|
@ -343,10 +349,11 @@ type IndexedKeyList struct {
|
||||||
// Session is used to represent an open session in the KV store.
|
// Session is used to represent an open session in the KV store.
|
||||||
// This is used to associate node checks with acquired locks.
|
// This is used to associate node checks with acquired locks.
|
||||||
type Session struct {
|
type Session struct {
|
||||||
|
CreateIndex uint64
|
||||||
ID string
|
ID string
|
||||||
Node string
|
Node string
|
||||||
Checks []string
|
Checks []string
|
||||||
CreateIndex uint64
|
LockDelay time.Duration
|
||||||
}
|
}
|
||||||
type Sessions []*Session
|
type Sessions []*Session
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue