Merge pull request #3970 from pierresouchay/node_health_should_change_service_index

[BUGFIX] When a node level check is removed, ensure all services of node are notified
This commit is contained in:
Paul Banks 2018-05-08 16:44:50 +01:00 committed by GitHub
commit c55885efd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 21 deletions

View File

@ -1089,6 +1089,21 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
return nil return nil
} }
// updateAllServiceIndexesOfNode updates the Raft index of all the services associated with this node
func (s *Store) updateAllServiceIndexesOfNode(tx *memdb.Txn, idx uint64, nodeID string) error {
services, err := tx.Get("services", "node", nodeID)
if err != nil {
return fmt.Errorf("failed updating services for node %s: %s", nodeID, err)
}
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode).ToNodeService()
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
return nil
}
// ensureCheckTransaction is used as the inner method to handle inserting // ensureCheckTransaction is used as the inner method to handle inserting
// a health check into the state store. It ensures safety against inserting // a health check into the state store. It ensures safety against inserting
// checks with no matching node or service. // checks with no matching node or service.
@ -1142,15 +1157,9 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
} }
} else { } else {
// Update the status for all the services associated with this node // Update the status for all the services associated with this node
services, err := tx.Get("services", "node", hc.Node) err = s.updateAllServiceIndexesOfNode(tx, idx, hc.Node)
if err != nil { if err != nil {
return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err) return err
}
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode).ToNodeService()
if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
} }
} }
@ -1393,20 +1402,21 @@ func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID t
return nil return nil
} }
existing := hc.(*structs.HealthCheck) existing := hc.(*structs.HealthCheck)
if existing != nil && existing.ServiceID != "" { if existing != nil {
service, err := tx.First("services", "id", node, existing.ServiceID) // When no service is linked to this service, update all services of node
if err != nil { if existing.ServiceID != "" {
return fmt.Errorf("failed service lookup: %s", err) if err = tx.Insert("index", &IndexEntry{serviceIndexName(existing.ServiceName), idx}); err != nil {
}
if service == nil {
return ErrMissingService
}
// Updated index of service
svc := service.(*structs.ServiceNode)
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
} else {
err = s.updateAllServiceIndexesOfNode(tx, idx, existing.Node)
if err != nil {
return fmt.Errorf("Failed to update services linked to deleted healthcheck: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
} }
// Delete the check from the DB and update the index. // Delete the check from the DB and update the index.

View File

@ -2132,6 +2132,7 @@ func TestStateStore_DeleteCheck(t *testing.T) {
// Register a node and a node-level health check. // Register a node and a node-level health check.
testRegisterNode(t, s, 1, "node1") testRegisterNode(t, s, 1, "node1")
testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing) testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing)
testRegisterService(t, s, 2, "node1", "service1")
// Make sure the check is there. // Make sure the check is there.
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
@ -2143,13 +2144,23 @@ func TestStateStore_DeleteCheck(t *testing.T) {
t.Fatalf("bad: %#v", checks) t.Fatalf("bad: %#v", checks)
} }
ensureServiceVersion(t, s, ws, "service1", 2, 1)
// Delete the check. // Delete the check.
if err := s.DeleteCheck(3, "node1", "check1"); err != nil { if err := s.DeleteCheck(3, "node1", "check1"); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if idx, check, err := s.NodeCheck("node1", "check1"); idx != 3 || err != nil || check != nil {
t.Fatalf("Node check should have been deleted idx=%d, node=%v, err=%s", idx, check, err)
}
if idx := s.maxIndex("checks"); idx != 3 {
t.Fatalf("bad index for checks: %d", idx)
}
if !watchFired(ws) { if !watchFired(ws) {
t.Fatalf("bad") t.Fatalf("bad")
} }
// All services linked to this node should have their index updated
ensureServiceVersion(t, s, ws, "service1", 3, 1)
// Check is gone // Check is gone
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()