From a5f6ac0df4ca31a4654dbcf03f638b7e7c411fb4 Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Mon, 19 Mar 2018 14:14:03 +0100 Subject: [PATCH 1/6] =?UTF-8?q?[BUGFIX]=C2=A0When=20a=20node=20level=20che?= =?UTF-8?q?ck=20is=20removed,=20ensure=20all=20services=20of=20node=20are?= =?UTF-8?q?=20notified?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bugfix for https://github.com/hashicorp/consul/pull/3899 When a node level check is removed (example: maintenance), some watchers on services might have to recompute their state. If those nodes are performing blocking queries, they have to be notified. While their state was updated when node-level state did change or was added this was not the case when the check was removed. This fixes it. --- agent/consul/state/catalog.go | 85 +++++++++++++++--------------- agent/consul/state/catalog_test.go | 5 ++ 2 files changed, 48 insertions(+), 42 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 38d59e810..6fcb45955 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1086,6 +1086,43 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { return nil } +// This ensure all connected services to a check are updated properly +func (s *Store) alterServicesFromCheck(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck, checkServiceExists bool) error { + // If the check is associated with a service, check that we have + // a registration for the service. + if hc.ServiceID != "" { + service, err := tx.First("services", "id", hc.Node, hc.ServiceID) + // When deleting a service, service might have been removed already + if err != nil && checkServiceExists { + return fmt.Errorf("failed service lookup: %s", err) + } + if service == nil { + return ErrMissingService + } + + // Copy in the service name and tags + svc := service.(*structs.ServiceNode) + hc.ServiceName = svc.ServiceName + hc.ServiceTags = svc.ServiceTags + if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + } else { + // Update the status for all the services associated with this node + services, err := tx.Get("services", "node", hc.Node) + if err != nil { + return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err) + } + for service := services.Next(); service != nil; service = services.Next() { + svc := service.(*structs.ServiceNode).ToNodeService() + if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + } + } + return nil +} + // ensureCheckTransaction is used as the inner method to handle inserting // a health check into the state store. It ensures safety against inserting // checks with no matching node or service. @@ -1119,36 +1156,9 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec return ErrMissingNode } - // If the check is associated with a service, check that we have - // a registration for the service. - if hc.ServiceID != "" { - service, err := tx.First("services", "id", hc.Node, hc.ServiceID) - if err != nil { - return fmt.Errorf("failed service lookup: %s", err) - } - if service == nil { - return ErrMissingService - } - - // Copy in the service name and tags - svc := service.(*structs.ServiceNode) - hc.ServiceName = svc.ServiceName - hc.ServiceTags = svc.ServiceTags - if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - } else { - // Update the status for all the services associated with this node - services, err := tx.Get("services", "node", hc.Node) - if err != nil { - return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err) - } - for service := services.Next(); service != nil; service = services.Next() { - svc := service.(*structs.ServiceNode).ToNodeService() - if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - } + err = s.alterServicesFromCheck(tx, idx, hc, true) + if err != nil { + return err } // Delete any sessions for this check if the health is critical. @@ -1390,19 +1400,10 @@ func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID t return nil } existing := hc.(*structs.HealthCheck) - if existing != nil && existing.ServiceID != "" { - service, err := tx.First("services", "id", node, existing.ServiceID) + if existing != nil { + err = s.alterServicesFromCheck(tx, idx, existing, false) if err != nil { - return fmt.Errorf("failed service lookup: %s", err) - } - if service == nil { - return ErrMissingService - } - - // Updated index of service - svc := service.(*structs.ServiceNode) - if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) + return fmt.Errorf("Failed to update services linked to deleted healthcheck: %s", err) } } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index b22b82fed..1c3b1a867 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -2119,6 +2119,7 @@ func TestStateStore_DeleteCheck(t *testing.T) { // Register a node and a node-level health check. testRegisterNode(t, s, 1, "node1") testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing) + testRegisterService(t, s, 2, "node1", "service1") // Make sure the check is there. ws := memdb.NewWatchSet() @@ -2130,6 +2131,8 @@ func TestStateStore_DeleteCheck(t *testing.T) { t.Fatalf("bad: %#v", checks) } + ensureServiceVersion(t, s, ws, "service1", 2, 1) + // Delete the check. if err := s.DeleteCheck(3, "node1", "check1"); err != nil { t.Fatalf("err: %s", err) @@ -2137,6 +2140,8 @@ func TestStateStore_DeleteCheck(t *testing.T) { if !watchFired(ws) { t.Fatalf("bad") } + // All services linked to this node should have their index updated + ensureServiceVersion(t, s, ws, "service1", 3, 1) // Check is gone ws = memdb.NewWatchSet() From eb2a4eaea3bf3472c10ccdf45b6b19dd68140ccc Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Mon, 19 Mar 2018 16:12:54 +0100 Subject: [PATCH 2/6] Refactoring to have clearer code without weird bool --- agent/consul/state/catalog.go | 83 +++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 37 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 6fcb45955..b83265a03 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1086,39 +1086,17 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { return nil } -// This ensure all connected services to a check are updated properly -func (s *Store) alterServicesFromCheck(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck, checkServiceExists bool) error { - // If the check is associated with a service, check that we have - // a registration for the service. - if hc.ServiceID != "" { - service, err := tx.First("services", "id", hc.Node, hc.ServiceID) - // When deleting a service, service might have been removed already - if err != nil && checkServiceExists { - return fmt.Errorf("failed service lookup: %s", err) - } - if service == nil { - return ErrMissingService - } - - // Copy in the service name and tags - svc := service.(*structs.ServiceNode) - hc.ServiceName = svc.ServiceName - hc.ServiceTags = svc.ServiceTags - if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil { +// updateIndexForAllServicesOfNode updates the status for all the services associated with this node +func (s *Store) updateAllServiceIndexesOfNode(tx *memdb.Txn, idx uint64, nodeID string) error { + services, err := tx.Get("services", "node", nodeID) + if err != nil { + return fmt.Errorf("failed updating services for node %s: %s", nodeID, err) + } + for service := services.Next(); service != nil; service = services.Next() { + svc := service.(*structs.ServiceNode).ToNodeService() + if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } - } else { - // Update the status for all the services associated with this node - services, err := tx.Get("services", "node", hc.Node) - if err != nil { - return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err) - } - for service := services.Next(); service != nil; service = services.Next() { - svc := service.(*structs.ServiceNode).ToNodeService() - if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - } } return nil } @@ -1156,9 +1134,30 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec return ErrMissingNode } - err = s.alterServicesFromCheck(tx, idx, hc, true) - if err != nil { - return err + // If the check is associated with a service, check that we have + // a registration for the service. + if hc.ServiceID != "" { + service, err := tx.First("services", "id", hc.Node, hc.ServiceID) + if err != nil { + return fmt.Errorf("failed service lookup: %s", err) + } + if service == nil { + return ErrMissingService + } + + // Copy in the service name and tags + svc := service.(*structs.ServiceNode) + hc.ServiceName = svc.ServiceName + hc.ServiceTags = svc.ServiceTags + if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + } else { + // Update the status for all the services associated with this node + err = s.updateAllServiceIndexesOfNode(tx, idx, hc.Node) + if err != nil { + return err + } } // Delete any sessions for this check if the health is critical. @@ -1401,9 +1400,19 @@ func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID t } existing := hc.(*structs.HealthCheck) if existing != nil { - err = s.alterServicesFromCheck(tx, idx, existing, false) - if err != nil { - return fmt.Errorf("Failed to update services linked to deleted healthcheck: %s", err) + // When no service is linked to this service, update all services of node + if existing.ServiceID != "" { + if err = tx.Insert("index", &IndexEntry{serviceIndexName(existing.ServiceName), idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + } else { + err = s.updateAllServiceIndexesOfNode(tx, idx, existing.Node) + if err != nil { + return fmt.Errorf("Failed to update services linked to deleted healthcheck: %s", err) + } + if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } } } From 3eb287f57d28fdbf2cafcdaba73ff038fb4db253 Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Mon, 19 Mar 2018 17:12:08 +0100 Subject: [PATCH 3/6] Fixed typo in comments --- agent/consul/state/catalog.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index b83265a03..d40143de6 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1086,7 +1086,7 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { return nil } -// updateIndexForAllServicesOfNode updates the status for all the services associated with this node +// updateAllServiceIndexesOfNode updates the status for all the services associated with this node func (s *Store) updateAllServiceIndexesOfNode(tx *memdb.Txn, idx uint64, nodeID string) error { services, err := tx.Get("services", "node", nodeID) if err != nil { From a8b66fb7aaee760b6b089ec1e0bc71f269a60832 Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Thu, 22 Mar 2018 10:30:05 +0100 Subject: [PATCH 4/6] Fixed minor typo in comments Might fix unstable travis build --- agent/consul/state/catalog.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index d40143de6..4b563cba9 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1086,7 +1086,7 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { return nil } -// updateAllServiceIndexesOfNode updates the status for all the services associated with this node +// updateAllServiceIndexesOfNode updates the Raft index of all the services associated with this node func (s *Store) updateAllServiceIndexesOfNode(tx *memdb.Txn, idx uint64, nodeID string) error { services, err := tx.Get("services", "node", nodeID) if err != nil { From 7e8e4e014bc7f9e0f627cabe4b2c6b09a85e546c Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Thu, 22 Mar 2018 12:20:25 +0100 Subject: [PATCH 5/6] Added new test regarding checks index --- agent/consul/state/catalog_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 1c3b1a867..818670908 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -2137,6 +2137,9 @@ func TestStateStore_DeleteCheck(t *testing.T) { if err := s.DeleteCheck(3, "node1", "check1"); err != nil { t.Fatalf("err: %s", err) } + if idx := s.maxIndex("checks"); idx != 3 { + t.Fatalf("bad index: %d", idx) + } if !watchFired(ws) { t.Fatalf("bad") } From 9cc9dce8481ad71ff205562acc9c0fe919641d7e Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Thu, 22 Mar 2018 12:41:06 +0100 Subject: [PATCH 6/6] More test cases --- agent/consul/state/catalog_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 818670908..4d4500d64 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -2137,8 +2137,11 @@ func TestStateStore_DeleteCheck(t *testing.T) { if err := s.DeleteCheck(3, "node1", "check1"); err != nil { t.Fatalf("err: %s", err) } + if idx, check, err := s.NodeCheck("node1", "check1"); idx != 3 || err != nil || check != nil { + t.Fatalf("Node check should have been deleted idx=%d, node=%v, err=%s", idx, check, err) + } if idx := s.maxIndex("checks"); idx != 3 { - t.Fatalf("bad index: %d", idx) + t.Fatalf("bad index for checks: %d", idx) } if !watchFired(ws) { t.Fatalf("bad")