consul: Adding session invalidation

This commit is contained in:
Armon Dadgar 2014-05-15 11:51:31 -07:00
parent 2e3d3ec2f8
commit 679e4e6e70
2 changed files with 250 additions and 23 deletions

View File

@ -366,8 +366,7 @@ func (s *StateStore) Nodes() (uint64, structs.Nodes) {
// EnsureService is used to ensure a given node exposes a service
func (s *StateStore) EnsureService(index uint64, node string, ns *structs.NodeService) error {
tables := MDBTables{s.nodeTable, s.serviceTable}
tx, err := tables.StartTxn(false)
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
@ -461,8 +460,7 @@ func (s *StateStore) parseNodeServices(tables MDBTables, tx *MDBTxn, name string
// DeleteNodeService is used to delete a node service
func (s *StateStore) DeleteNodeService(index uint64, node, id string) error {
tables := MDBTables{s.serviceTable, s.checkTable}
tx, err := tables.StartTxn(false)
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
@ -476,6 +474,19 @@ func (s *StateStore) DeleteNodeService(index uint64, node, id string) error {
}
defer s.watch[s.serviceTable].Notify()
}
// Invalidate any sessions using these checks
checks, err := s.checkTable.GetTxn(tx, "node", node, id)
if err != nil {
return err
}
for _, c := range checks {
check := c.(*structs.HealthCheck)
if err := s.invalidateCheck(index, tx, node, check.CheckID); err != nil {
return err
}
}
if n, err := s.checkTable.DeleteTxn(tx, "node", node, id); err != nil {
return err
} else if n > 0 {
@ -489,13 +500,17 @@ func (s *StateStore) DeleteNodeService(index uint64, node, id string) error {
// DeleteNode is used to delete a node and all it's services
func (s *StateStore) DeleteNode(index uint64, node string) error {
tables := MDBTables{s.nodeTable, s.serviceTable, s.checkTable}
tx, err := tables.StartTxn(false)
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
defer tx.Abort()
// Invalidate any sessions held by the node
if err := s.invalidateNode(index, tx, node); err != nil {
return err
}
if n, err := s.serviceTable.DeleteTxn(tx, "id", node); err != nil {
return err
} else if n > 0 {
@ -633,8 +648,7 @@ func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error
}
// Start the txn
tables := MDBTables{s.nodeTable, s.serviceTable, s.checkTable}
tx, err := tables.StartTxn(false)
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
@ -663,6 +677,14 @@ func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error
check.ServiceName = srv.ServiceName
}
// Invalidate any sessions if status is critical
if check.Status == structs.HealthCritical {
err := s.invalidateCheck(index, tx, check.Node, check.CheckID)
if err != nil {
return err
}
}
// Ensure the check is set
if err := s.checkTable.InsertTxn(tx, check); err != nil {
return err
@ -676,12 +698,17 @@ func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error
// DeleteNodeCheck is used to delete a node health check
func (s *StateStore) DeleteNodeCheck(index uint64, node, id string) error {
tx, err := s.checkTable.StartTxn(false, nil)
tx, err := s.tables.StartTxn(false)
if err != nil {
return err
}
defer tx.Abort()
// Invalidate any sessions held by this check
if err := s.invalidateCheck(index, tx, node, id); err != nil {
return err
}
if n, err := s.checkTable.DeleteTxn(tx, "id", node, id); err != nil {
return err
} else if n > 0 {
@ -1101,9 +1128,7 @@ func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error
session.CreateIndex = index
// Start the transaction
tables := MDBTables{s.nodeTable, s.checkTable,
s.sessionTable, s.sessionCheckTable}
tx, err := tables.StartTxn(false)
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
@ -1172,9 +1197,7 @@ func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error
// doing a restore, otherwise SessionCreate should be used.
func (s *StateStore) SessionRestore(session *structs.Session) error {
// Start the transaction
tables := MDBTables{s.nodeTable, s.checkTable,
s.sessionTable, s.sessionCheckTable}
tx, err := tables.StartTxn(false)
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
@ -1235,14 +1258,53 @@ func (s *StateStore) NodeSessions(node string) (uint64, []*structs.Session, erro
// SessionDelete is used to destroy a session.
func (s *StateStore) SessionDestroy(index uint64, id string) error {
// Start the transaction
tables := MDBTables{s.sessionTable, s.sessionCheckTable}
tx, err := tables.StartTxn(false)
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
defer tx.Abort()
if err := s.invalidateSession(index, tx, id); err != nil {
return err
}
return tx.Commit()
}
// invalideNode is used to invalide all sessions belonging to a node
// All tables should be locked in the tx.
func (s *StateStore) invalidateNode(index uint64, tx *MDBTxn, node string) error {
sessions, err := s.sessionTable.GetTxn(tx, "node", node)
if err != nil {
return err
}
for _, sess := range sessions {
session := sess.(*structs.Session).ID
if err := s.invalidateSession(index, tx, session); err != nil {
return err
}
}
return nil
}
// invalidateCheck is used to invalide all sessions belonging to a check
// All tables should be locked in the tx.
func (s *StateStore) invalidateCheck(index uint64, tx *MDBTxn, node, check string) error {
sessionChecks, err := s.sessionCheckTable.GetTxn(tx, "id", node, check)
if err != nil {
return err
}
for _, sc := range sessionChecks {
session := sc.(*sessionCheck).Session
if err := s.invalidateSession(index, tx, session); err != nil {
return err
}
}
return nil
}
// invalidateSession is used to invalide a session within a given txn
// All tables should be locked in the tx.
func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) error {
// Get the session
res, err := s.sessionTable.GetTxn(tx, "id", id)
if err != nil {
@ -1272,8 +1334,8 @@ func (s *StateStore) SessionDestroy(index uint64, id string) error {
if err := s.sessionTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.sessionTable].Notify()
return tx.Commit()
tx.Defer(func() { s.watch[s.sessionTable].Notify() })
return nil
}
// Snapshot is used to create a point in time snapshot

View File

@ -717,11 +717,11 @@ func TestStoreSnapshot(t *testing.T) {
ServiceID: "db",
}
if err := store.EnsureCheck(17, checkAfter); err != nil {
t.Fatalf("err: %v")
t.Fatalf("err: %v", err)
}
if err := store.KVSDelete(18, "/web/a"); err != nil {
t.Fatalf("err: %v")
t.Fatalf("err: %v", err)
}
// Check snapshot has old values
@ -1630,7 +1630,7 @@ func TestSessionCreate_Invalid(t *testing.T) {
Status: structs.HealthCritical,
}
if err := store.EnsureCheck(13, check); err != nil {
t.Fatalf("err: %v")
t.Fatalf("err: %v", err)
}
if err := store.SessionCreate(1000, session); err.Error() != "Check 'bar' is in critical state" {
t.Fatalf("err: %v", err)
@ -1719,3 +1719,168 @@ func TestSession_Lookups(t *testing.T) {
t.Fatalf("bad: %v %v", ids, out)
}
}
func TestSessionInvalidate_CriticalHealthCheck(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
check := &structs.HealthCheck{
Node: "foo",
CheckID: "bar",
Status: structs.HealthPassing,
}
if err := store.EnsureCheck(13, check); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{
Node: "foo",
Checks: []string{"bar"},
}
if err := store.SessionCreate(14, session); err != nil {
t.Fatalf("err: %v", err)
}
// Invalidate the check
check.Status = structs.HealthCritical
if err := store.EnsureCheck(15, check); err != nil {
t.Fatalf("err: %v", err)
}
// Lookup by ID, should be nil
_, s2, err := store.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
}
func TestSessionInvalidate_DeleteHealthCheck(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
check := &structs.HealthCheck{
Node: "foo",
CheckID: "bar",
Status: structs.HealthPassing,
}
if err := store.EnsureCheck(13, check); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{
Node: "foo",
Checks: []string{"bar"},
}
if err := store.SessionCreate(14, session); err != nil {
t.Fatalf("err: %v", err)
}
// Delete the check
if err := store.DeleteNodeCheck(15, "foo", "bar"); err != nil {
t.Fatalf("err: %v", err)
}
// Lookup by ID, should be nil
_, s2, err := store.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
}
func TestSessionInvalidate_DeleteNode(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{
Node: "foo",
}
if err := store.SessionCreate(14, session); err != nil {
t.Fatalf("err: %v", err)
}
// Delete the node
if err := store.DeleteNode(15, "foo"); err != nil {
t.Fatalf("err: %v")
}
// Lookup by ID, should be nil
_, s2, err := store.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
}
func TestSessionInvalidate_DeleteNodeService(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(11, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService(12, "foo", &structs.NodeService{"api", "api", nil, 5000}); err != nil {
t.Fatalf("err: %v", err)
}
check := &structs.HealthCheck{
Node: "foo",
CheckID: "api",
Name: "Can connect",
Status: structs.HealthPassing,
ServiceID: "api",
}
if err := store.EnsureCheck(13, check); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{
Node: "foo",
Checks: []string{"api"},
}
if err := store.SessionCreate(14, session); err != nil {
t.Fatalf("err: %v", err)
}
// Should invalidate the session
if err := store.DeleteNodeService(15, "foo", "api"); err != nil {
t.Fatalf("err: %v", err)
}
// Lookup by ID, should be nil
_, s2, err := store.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
}