consul: Enforce lock-delay with delete behavior

This commit is contained in:
Armon Dadgar 2015-01-07 15:48:27 -08:00
parent 95b3837b75
commit 5b972294f8
2 changed files with 54 additions and 16 deletions

View File

@ -1784,23 +1784,19 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro
} }
session := res[0].(*structs.Session) session := res[0].(*structs.Session)
// Enforce the MaxLockDelay
delay := session.LockDelay
if delay > structs.MaxLockDelay {
delay = structs.MaxLockDelay
}
// Invalidate any held locks
if session.Behavior == structs.SessionKeysDelete { if session.Behavior == structs.SessionKeysDelete {
// delete the keys held by the session if err := s.deleteLocks(index, tx, delay, id); err != nil {
if err := s.kvsDeleteWithIndexTxn(index, tx, "session", id); err != nil {
return err
}
} else { // default to release
// Enforce the MaxLockDelay
delay := session.LockDelay
if delay > structs.MaxLockDelay {
delay = structs.MaxLockDelay
}
// Invalidate any held locks
if err := s.invalidateLocks(index, tx, delay, id); err != nil {
return err return err
} }
} else if err := s.invalidateLocks(index, tx, delay, id); err != nil {
return err
} }
// Nuke the session // Nuke the session
@ -1867,6 +1863,42 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn,
return nil return nil
} }
// deleteLocks is used to delete all the locks held by a session
// within a given txn. All tables should be locked in the tx.
func (s *StateStore) deleteLocks(index uint64, tx *MDBTxn,
lockDelay time.Duration, id string) error {
pairs, err := s.kvsTable.GetTxn(tx, "session", id)
if err != nil {
return err
}
var expires time.Time
if lockDelay > 0 {
s.lockDelayLock.Lock()
defer s.lockDelayLock.Unlock()
expires = time.Now().Add(lockDelay)
}
for _, pair := range pairs {
kv := pair.(*structs.DirEntry)
if err := s.kvsDeleteWithIndexTxn(index, tx, "id", kv.Key); err != nil {
return err
}
// If there is a lock delay, prevent acquisition
// for at least lockDelay period
if lockDelay > 0 {
s.lockDelay[kv.Key] = expires
time.AfterFunc(lockDelay, func() {
s.lockDelayLock.Lock()
delete(s.lockDelay, kv.Key)
s.lockDelayLock.Unlock()
})
}
}
return nil
}
// ACLSet is used to create or update an ACL entry // ACLSet is used to create or update an ACL entry
func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error { func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error {
// Check for an ID // Check for an ID

View File

@ -2731,7 +2731,7 @@ func TestSessionInvalidate_KeyDelete(t *testing.T) {
} }
notify1 := make(chan struct{}, 1) notify1 := make(chan struct{}, 1)
store.WatchKV("/f", notify1) store.WatchKV("/b", notify1)
// Delete the node // Delete the node
if err := store.DeleteNode(6, "foo"); err != nil { if err := store.DeleteNode(6, "foo"); err != nil {
@ -2748,7 +2748,13 @@ func TestSessionInvalidate_KeyDelete(t *testing.T) {
select { select {
case <-notify1: case <-notify1:
default: default:
t.Fatalf("should notify /f") t.Fatalf("should notify /b")
}
// Key should have a lock delay
expires := store.KVSLockDelay("/bar")
if expires.Before(time.Now().Add(30 * time.Millisecond)) {
t.Fatalf("Bad: %v", expires)
} }
} }