diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 38d59e810..6fcb45955 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1086,6 +1086,43 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { return nil } +// This ensure all connected services to a check are updated properly +func (s *Store) alterServicesFromCheck(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck, checkServiceExists bool) error { + // If the check is associated with a service, check that we have + // a registration for the service. + if hc.ServiceID != "" { + service, err := tx.First("services", "id", hc.Node, hc.ServiceID) + // When deleting a service, service might have been removed already + if err != nil && checkServiceExists { + return fmt.Errorf("failed service lookup: %s", err) + } + if service == nil { + return ErrMissingService + } + + // Copy in the service name and tags + svc := service.(*structs.ServiceNode) + hc.ServiceName = svc.ServiceName + hc.ServiceTags = svc.ServiceTags + if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + } else { + // Update the status for all the services associated with this node + services, err := tx.Get("services", "node", hc.Node) + if err != nil { + return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err) + } + for service := services.Next(); service != nil; service = services.Next() { + svc := service.(*structs.ServiceNode).ToNodeService() + if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + } + } + return nil +} + // ensureCheckTransaction is used as the inner method to handle inserting // a health check into the state store. It ensures safety against inserting // checks with no matching node or service. @@ -1119,36 +1156,9 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec return ErrMissingNode } - // If the check is associated with a service, check that we have - // a registration for the service. - if hc.ServiceID != "" { - service, err := tx.First("services", "id", hc.Node, hc.ServiceID) - if err != nil { - return fmt.Errorf("failed service lookup: %s", err) - } - if service == nil { - return ErrMissingService - } - - // Copy in the service name and tags - svc := service.(*structs.ServiceNode) - hc.ServiceName = svc.ServiceName - hc.ServiceTags = svc.ServiceTags - if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - } else { - // Update the status for all the services associated with this node - services, err := tx.Get("services", "node", hc.Node) - if err != nil { - return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err) - } - for service := services.Next(); service != nil; service = services.Next() { - svc := service.(*structs.ServiceNode).ToNodeService() - if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - } + err = s.alterServicesFromCheck(tx, idx, hc, true) + if err != nil { + return err } // Delete any sessions for this check if the health is critical. @@ -1390,19 +1400,10 @@ func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID t return nil } existing := hc.(*structs.HealthCheck) - if existing != nil && existing.ServiceID != "" { - service, err := tx.First("services", "id", node, existing.ServiceID) + if existing != nil { + err = s.alterServicesFromCheck(tx, idx, existing, false) if err != nil { - return fmt.Errorf("failed service lookup: %s", err) - } - if service == nil { - return ErrMissingService - } - - // Updated index of service - svc := service.(*structs.ServiceNode) - if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) + return fmt.Errorf("Failed to update services linked to deleted healthcheck: %s", err) } } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index b22b82fed..1c3b1a867 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -2119,6 +2119,7 @@ func TestStateStore_DeleteCheck(t *testing.T) { // Register a node and a node-level health check. testRegisterNode(t, s, 1, "node1") testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing) + testRegisterService(t, s, 2, "node1", "service1") // Make sure the check is there. ws := memdb.NewWatchSet() @@ -2130,6 +2131,8 @@ func TestStateStore_DeleteCheck(t *testing.T) { t.Fatalf("bad: %#v", checks) } + ensureServiceVersion(t, s, ws, "service1", 2, 1) + // Delete the check. if err := s.DeleteCheck(3, "node1", "check1"); err != nil { t.Fatalf("err: %s", err) @@ -2137,6 +2140,8 @@ func TestStateStore_DeleteCheck(t *testing.T) { if !watchFired(ws) { t.Fatalf("bad") } + // All services linked to this node should have their index updated + ensureServiceVersion(t, s, ws, "service1", 3, 1) // Check is gone ws = memdb.NewWatchSet()