diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 9f066d417..16fb62cdf 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -517,7 +517,11 @@ func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error } var sids []string for service := services.Next(); service != nil; service = services.Next() { - sids = append(sids, service.(*structs.ServiceNode).ServiceID) + svc := service.(*structs.ServiceNode) + sids = append(sids, svc.ServiceID) + if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } } // Do the delete in a separate loop so we don't trash the iterator. @@ -638,6 +642,9 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } + if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } return nil } @@ -752,14 +759,29 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string) return idx, results, nil } +// maxIndexForService return the maximum transaction number for a service +// If the transaction is not set for the service, it will return the max +// transaction number of "nodes", "services" +func maxIndexForService(tx *memdb.Txn, serviceName string) (uint64, error) { + transaction, err := tx.First("index", "id", serviceIndexName(serviceName)) + if err == nil { + if idx, ok := transaction.(*IndexEntry); ok { + return idx.Value, nil + } + } + return maxIndexTxn(tx, "nodes", "services"), nil +} + // ServiceNodes returns the nodes associated with a given service name. func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) { tx := s.db.Txn(false) defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, "nodes", "services") - + idx, err := maxIndexForService(tx, serviceName) + if err != nil { + panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", serviceName, err)) + } // List all the services. services, err := tx.Get("services", "service", serviceName) if err != nil { @@ -787,7 +809,10 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) ( defer tx.Abort() // Get the table index. - idx := maxIndexTxn(tx, "nodes", "services") + idx, err := maxIndexForService(tx, service) + if err != nil { + panic(fmt.Sprintf("Could not retrieve maxIndex for %s: %s", service, err)) + } // List all the services. services, err := tx.Get("services", "service", service) @@ -979,6 +1004,10 @@ func (s *Store) DeleteService(idx uint64, nodeName, serviceID string) error { return nil } +func serviceIndexName(name string) string { + return fmt.Sprintf("service.%s", name) +} + // deleteServiceTxn is the inner method called to remove a service // registration within an existing transaction. func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error { @@ -1022,6 +1051,26 @@ func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID return fmt.Errorf("failed updating index: %s", err) } + svc := service.(*structs.ServiceNode) + if remainingServicesItr, err := tx.Get("services", "service", svc.ServiceName); err == nil { + if remainingServicesItr != nil && remainingServicesItr.Next() != nil { + // We have at least one remaining service, update the index + if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + } else { + // There are no more service instances, cleanup the service. index + serviceIndex, err := tx.First("index", "id", serviceIndexName(svc.ServiceName)) + if err == nil && serviceIndex != nil { + // we found service. index, garbage collect it + if errW := tx.Delete("index", serviceIndex); errW != nil { + return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err) + } + } + } + } else { + return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err) + } return nil } @@ -1087,6 +1136,21 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec svc := service.(*structs.ServiceNode) hc.ServiceName = svc.ServiceName hc.ServiceTags = svc.ServiceTags + if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + } else { + // Update the status for all the services associated with this node + services, err := tx.Get("services", "node", hc.Node) + if err != nil { + return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err) + } + for service := services.Next(); service != nil; service = services.Next() { + svc := service.(*structs.ServiceNode).ToNodeService() + if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + } } // Delete any sessions for this check if the health is critical. @@ -1328,6 +1392,22 @@ func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID t if hc == nil { return nil } + existing := hc.(*structs.HealthCheck) + if existing != nil && existing.ServiceID != "" { + service, err := tx.First("services", "id", node, existing.ServiceID) + if err != nil { + return fmt.Errorf("failed service lookup: %s", err) + } + if service == nil { + return ErrMissingService + } + + // Updated index of service + svc := service.(*structs.ServiceNode) + if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + } // Delete the check from the DB and update the index. if err := tx.Delete("checks", hc); err != nil { diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index e948b092c..f3d9f3e8d 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -2166,6 +2166,101 @@ func TestStateStore_DeleteCheck(t *testing.T) { } } +func ensureServiceVersion(t *testing.T, s *Store, ws memdb.WatchSet, serviceID string, expectedIdx uint64, expectedSize int) { + idx, services, err := s.ServiceNodes(ws, serviceID) + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != expectedIdx { + t.Fatalf("bad: %d, expected %d", idx, expectedIdx) + } + if len(services) != expectedSize { + t.Fatalf("expected size: %d, but was %d", expectedSize, len(services)) + } +} + +// TestIndexIndependance test that changes on a given service does not impact the +// index of other services. It allows to have huge benefits for watches since +// watchers are notified ONLY when there are changes in the given service +func TestIndexIndependance(t *testing.T) { + s := testStateStore(t) + + // Querying with no matches gives an empty response + ws := memdb.NewWatchSet() + idx, res, err := s.CheckServiceNodes(ws, "service1") + if idx != 0 || res != nil || err != nil { + t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) + } + + // Register some nodes. + testRegisterNode(t, s, 0, "node1") + testRegisterNode(t, s, 1, "node2") + + // Register node-level checks. These should be the final result. + testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing) + testRegisterCheck(t, s, 3, "node2", "", "check2", api.HealthPassing) + + // Register a service against the nodes. + testRegisterService(t, s, 4, "node1", "service1") + testRegisterService(t, s, 5, "node2", "service2") + ensureServiceVersion(t, s, ws, "service2", 5, 1) + + // Register checks against the services. + testRegisterCheck(t, s, 6, "node1", "service1", "check3", api.HealthPassing) + testRegisterCheck(t, s, 7, "node2", "service2", "check4", api.HealthPassing) + // Index must be updated when checks are updated + ensureServiceVersion(t, s, ws, "service1", 6, 1) + ensureServiceVersion(t, s, ws, "service2", 7, 1) + + if !watchFired(ws) { + t.Fatalf("bad") + } + // We ensure the idx for service2 has not been changed + testRegisterCheck(t, s, 8, "node2", "service2", "check4", api.HealthWarning) + ensureServiceVersion(t, s, ws, "service2", 8, 1) + testRegisterCheck(t, s, 9, "node2", "service2", "check4", api.HealthPassing) + ensureServiceVersion(t, s, ws, "service2", 9, 1) + + // Add a new check on node1, while not on service, it should impact + // indexes of all services running on node1, aka service1 + testRegisterCheck(t, s, 10, "node1", "", "check_node", api.HealthPassing) + + // Service2 should not be modified + ensureServiceVersion(t, s, ws, "service2", 9, 1) + // Service1 should be modified + ensureServiceVersion(t, s, ws, "service1", 10, 1) + + if !watchFired(ws) { + t.Fatalf("bad") + } + + testRegisterService(t, s, 11, "node1", "service_shared") + ensureServiceVersion(t, s, ws, "service_shared", 11, 1) + testRegisterService(t, s, 12, "node2", "service_shared") + ensureServiceVersion(t, s, ws, "service_shared", 12, 2) + + testRegisterCheck(t, s, 13, "node2", "service_shared", "check_service_shared", api.HealthCritical) + ensureServiceVersion(t, s, ws, "service_shared", 13, 2) + testRegisterCheck(t, s, 14, "node2", "service_shared", "check_service_shared", api.HealthPassing) + ensureServiceVersion(t, s, ws, "service_shared", 14, 2) + + s.DeleteCheck(15, "node2", types.CheckID("check_service_shared")) + ensureServiceVersion(t, s, ws, "service_shared", 15, 2) + s.DeleteService(16, "node2", "service_shared") + ensureServiceVersion(t, s, ws, "service_shared", 16, 1) + s.DeleteService(17, "node1", "service_shared") + ensureServiceVersion(t, s, ws, "service_shared", 17, 0) + + testRegisterService(t, s, 18, "node1", "service_new") + // Since service does not exists anymore, its index should be last insert + // The behaviour is the same as all non-existing services, meaning + // we properly did collect garbage + ensureServiceVersion(t, s, ws, "service_shared", 18, 0) + if !watchFired(ws) { + t.Fatalf("bad") + } +} + func TestStateStore_CheckServiceNodes(t *testing.T) { s := testStateStore(t) @@ -2197,9 +2292,13 @@ func TestStateStore_CheckServiceNodes(t *testing.T) { t.Fatalf("bad") } + // We ensure the idx for service2 has not been changed + ensureServiceVersion(t, s, ws, "service2", 7, 1) + // Query the state store for nodes and checks which have been registered // with a specific service. ws = memdb.NewWatchSet() + ensureServiceVersion(t, s, ws, "service1", 6, 1) idx, results, err := s.CheckServiceNodes(ws, "service1") if err != nil { t.Fatalf("err: %s", err)