diff --git a/agent/cache-types/catalog_list_services.go b/agent/cache-types/catalog_list_services.go index 7d38f23ae..1c602cdb2 100644 --- a/agent/cache-types/catalog_list_services.go +++ b/agent/cache-types/catalog_list_services.go @@ -38,9 +38,12 @@ func (c *CatalogListServices) Fetch(opts cache.FetchOptions, req cache.Request) // going to be served from cache and end up arbitrarily stale anyway. This // allows cached service-discover to automatically read scale across all // servers too. - reqReal.AllowStale = true + reqReal.QueryOptions.AllowStale = true + + if opts.LastResult != nil { + reqReal.QueryOptions.AllowNotModifiedResponse = true + } - // Fetch var reply structs.IndexedServices if err := c.RPC.RPC("Catalog.ListServices", reqReal, &reply); err != nil { return result, err @@ -48,5 +51,6 @@ func (c *CatalogListServices) Fetch(opts cache.FetchOptions, req cache.Request) result.Value = &reply result.Index = reply.QueryMeta.Index + result.NotModified = reply.QueryMeta.NotModified return result, nil } diff --git a/agent/cache-types/catalog_list_services_test.go b/agent/cache-types/catalog_list_services_test.go index ee806ff88..efb1ff33f 100644 --- a/agent/cache-types/catalog_list_services_test.go +++ b/agent/cache-types/catalog_list_services_test.go @@ -1,6 +1,7 @@ package cachetype import ( + "context" "testing" "time" @@ -60,3 +61,58 @@ func TestCatalogListServices_badReqType(t *testing.T) { require.Contains(t, err.Error(), "wrong type") rpc.AssertExpectations(t) } + +func TestCatalogListServices_IntegrationWithCache_NotModifiedResponse(t *testing.T) { + rpc := &MockRPC{} + typ := &CatalogListServices{RPC: rpc} + + services := map[string][]string{ + "foo": {"prod", "linux"}, + "bar": {"qa", "windows"}, + } + rpc.On("RPC", "Catalog.ListServices", mock.Anything, mock.Anything). + Return(nil). + Run(func(args mock.Arguments) { + req := args.Get(1).(*structs.DCSpecificRequest) + require.True(t, req.AllowStale) + require.True(t, req.AllowNotModifiedResponse) + + reply := args.Get(2).(*structs.IndexedServices) + reply.QueryMeta.Index = 44 + reply.NotModified = true + }) + + c := cache.New(nil) + c.RegisterType(CatalogListServicesName, typ) + last := cache.FetchResult{ + Value: &structs.IndexedServices{ + Services: services, + QueryMeta: structs.QueryMeta{Index: 42}, + }, + Index: 42, + } + req := &structs.DCSpecificRequest{ + Datacenter: "dc1", + QueryOptions: structs.QueryOptions{ + Token: "token", + MinQueryIndex: 44, + MaxQueryTime: time.Second, + }, + } + + err := c.Prepopulate(CatalogListServicesName, last, "dc1", "token", req.CacheInfo().Key) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + actual, _, err := c.Get(ctx, CatalogListServicesName, req) + require.NoError(t, err) + + expected := &structs.IndexedServices{ + Services: services, + QueryMeta: structs.QueryMeta{Index: 42}, + } + require.Equal(t, expected, actual) + + rpc.AssertExpectations(t) +} diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 15e7b1a3d..2b1f3c120 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -18,6 +18,7 @@ import ( "container/heap" "context" "fmt" + "strconv" "sync" "sync/atomic" "time" @@ -520,7 +521,9 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign if result.Value != nil { // A new value was given, so we create a brand new entry. - newEntry.Value = result.Value + if !result.NotModified { + newEntry.Value = result.Value + } newEntry.State = result.State newEntry.Index = result.Index newEntry.FetchedAt = time.Now() @@ -551,8 +554,9 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign // Error handling if err == nil { - metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1) - metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "fetch_success"}, 1) + labels := []metrics.Label{{Name: "result_not_modified", Value: strconv.FormatBool(result.NotModified)}} + metrics.IncrCounterWithLabels([]string{"consul", "cache", "fetch_success"}, 1, labels) + metrics.IncrCounterWithLabels([]string{"consul", "cache", tEntry.Name, "fetch_success"}, 1, labels) if result.Index > 0 { // Reset the attempts counter so we don't have any backoff diff --git a/agent/cache/type.go b/agent/cache/type.go index deae8a573..febdc18d7 100644 --- a/agent/cache/type.go +++ b/agent/cache/type.go @@ -78,4 +78,8 @@ type FetchResult struct { // Index is the corresponding index value for this data. Index uint64 + + // NotModified indicates that the Value has not changed since LastResult, and + // the LastResult value should be used instead of Value. + NotModified bool }