diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 162eed79c..02cec7fe0 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -13,6 +13,10 @@ import ( const ( servicesTableName = "services" + + // serviceLastExtinctionIndexName keeps track of the last raft index when the last instance + // of any service was unregistered. This is used by blocking queries on missing services. + serviceLastExtinctionIndexName = "service_last_extinction" ) // nodesTableSchema returns a new table schema used for storing node @@ -841,19 +845,30 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string) } // maxIndexForService return the maximum Raft Index for a service -// If the index is not set for the service, it will return: -// - maxIndex(nodes, services) if checks is false -// - maxIndex(nodes, services, checks) if checks is true -func maxIndexForService(tx *memdb.Txn, serviceName string, checks bool) uint64 { - transaction, err := tx.First("index", "id", serviceIndexName(serviceName)) - if err == nil { - if idx, ok := transaction.(*IndexEntry); ok { - return idx.Value +// If the index is not set for the service, it will return the missing +// service index. +// The service_last_extinction is set to the last raft index when a service +// was unregistered (or 0 if no services were ever unregistered). This +// allows blocking queries to +// * return when the last instance of a service is removed +// * block until an instance for this service is available, or another +// service is unregistered. +func maxIndexForService(tx *memdb.Txn, serviceName string, serviceExists, checks bool) uint64 { + if !serviceExists { + res, err := tx.First("index", "id", serviceLastExtinctionIndexName) + if missingIdx, ok := res.(*IndexEntry); ok && err == nil { + return missingIdx.Value } } + + res, err := tx.First("index", "id", serviceIndexName(serviceName)) + if idx, ok := res.(*IndexEntry); ok && err == nil { + return idx.Value + } if checks { return maxIndexTxn(tx, "nodes", "services", "checks") } + return maxIndexTxn(tx, "nodes", "services") } @@ -873,9 +888,6 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool tx := s.db.Txn(false) defer tx.Abort() - // Get the table index. - idx := maxIndexForService(tx, serviceName, false) - // Function for lookup var f func() (memdb.ResultIterator, error) if !connect { @@ -905,6 +917,10 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool if err != nil { return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err) } + + // Get the table index. + idx := maxIndexForService(tx, serviceName, len(results) > 0, false) + return idx, results, nil } @@ -914,9 +930,6 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string tx := s.db.Txn(false) defer tx.Abort() - // Get the table index. - idx := maxIndexForService(tx, service, false) - // List all the services. services, err := tx.Get("services", "service", service) if err != nil { @@ -925,9 +938,11 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string ws.Add(services.WatchCh()) // Gather all the services and apply the tag filter. + serviceExists := false var results structs.ServiceNodes for service := services.Next(); service != nil; service = services.Next() { svc := service.(*structs.ServiceNode) + serviceExists = true if !serviceTagsFilter(svc, tags) { results = append(results, svc) } @@ -938,6 +953,9 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string if err != nil { return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err) } + // Get the table index. + idx := maxIndexForService(tx, service, serviceExists, false) + return idx, results, nil } @@ -1214,6 +1232,11 @@ func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err) } } + + if err := tx.Insert("index", &IndexEntry{serviceLastExtinctionIndexName, idx}); err != nil { + return fmt.Errorf("failed updating missing service index: %s", err) + } + } } else { return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err) @@ -1438,7 +1461,7 @@ func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string, defer tx.Abort() // Get the table index. - idx := maxIndexForService(tx, serviceName, true) + idx := maxIndexForService(tx, serviceName, true, true) // Return the checks. iter, err := tx.Get("checks", "service", serviceName) if err != nil { @@ -1627,9 +1650,6 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect tx := s.db.Txn(false) defer tx.Abort() - // Get the table index. - idx := maxIndexForService(tx, serviceName, true) - // Function for lookup var f func() (memdb.ResultIterator, error) if !connect { @@ -1654,6 +1674,10 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect for service := iter.Next(); service != nil; service = iter.Next() { results = append(results, service.(*structs.ServiceNode)) } + + // Get the table index. + idx := maxIndexForService(tx, serviceName, len(results) > 0, true) + return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err) } @@ -1663,9 +1687,6 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags tx := s.db.Txn(false) defer tx.Abort() - // Get the table index. - idx := maxIndexForService(tx, serviceName, true) - // Query the state store for the service. iter, err := tx.Get("services", "service", serviceName) if err != nil { @@ -1674,13 +1695,18 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags ws.Add(iter.WatchCh()) // Return the results, filtering by tag. + serviceExists := false var results structs.ServiceNodes for service := iter.Next(); service != nil; service = iter.Next() { svc := service.(*structs.ServiceNode) + serviceExists = true if !serviceTagsFilter(svc, tags) { results = append(results, svc) } } + + // Get the table index. + idx := maxIndexForService(tx, serviceName, serviceExists, true) return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err) } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 3b1314f96..2f932a053 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -2848,16 +2848,68 @@ func TestIndexIndependence(t *testing.T) { ensureServiceVersion(t, s, ws, "service_shared", 17, 0) testRegisterService(t, s, 18, "node1", "service_new") - // Since service does not exists anymore, its index should be last insert - // The behaviour is the same as all non-existing services, meaning - // we properly did collect garbage - ensureServiceVersion(t, s, ws, "service_shared", 18, 0) + + // Since service does not exists anymore, its index should be that of + // the last deleted service + ensureServiceVersion(t, s, ws, "service_shared", 17, 0) + // No index should exist anymore, it must have been garbage collected ensureIndexForService(t, s, ws, "service_shared", 0) if !watchFired(ws) { t.Fatalf("bad") } +} +func TestMissingServiceIndex(t *testing.T) { + s := testStateStore(t) + + // Querying with no matches gives an empty response + ws := memdb.NewWatchSet() + idx, res, err := s.CheckServiceNodes(ws, "service1") + require.Nil(t, err) + require.Nil(t, res) + + // index should be 0 for a non existing service at startup + require.Equal(t, uint64(0), idx) + + testRegisterNode(t, s, 0, "node1") + + // node operations should not affect missing service index + ensureServiceVersion(t, s, ws, "service1", 0, 0) + + testRegisterService(t, s, 10, "node1", "service1") + ensureServiceVersion(t, s, ws, "service1", 10, 1) + + s.DeleteService(11, "node1", "service1") + // service1 is now missing, its index is now that of the last index a service was + // deleted at + ensureServiceVersion(t, s, ws, "service1", 11, 0) + + testRegisterService(t, s, 12, "node1", "service2") + ensureServiceVersion(t, s, ws, "service2", 12, 1) + + // missing service index does not change even though another service have been + // registered + ensureServiceVersion(t, s, ws, "service1", 11, 0) + ensureServiceVersion(t, s, ws, "i_do_not_exist", 11, 0) + + // registering a service on another node does not affect missing service + // index + testRegisterNode(t, s, 13, "node2") + testRegisterService(t, s, 14, "node2", "service3") + ensureServiceVersion(t, s, ws, "service3", 14, 1) + ensureServiceVersion(t, s, ws, "service1", 11, 0) + + // unregistering a service bumps missing service index + s.DeleteService(15, "node2", "service3") + ensureServiceVersion(t, s, ws, "service3", 15, 0) + ensureServiceVersion(t, s, ws, "service2", 12, 1) + ensureServiceVersion(t, s, ws, "service1", 15, 0) + ensureServiceVersion(t, s, ws, "i_do_not_exist", 15, 0) + + // registering again a missing service correctly updates its index + testRegisterService(t, s, 16, "node1", "service1") + ensureServiceVersion(t, s, ws, "service1", 16, 1) } func TestStateStore_CheckServiceNodes(t *testing.T) {