Merge pull request #8245 from hashicorp/dnephin/use-not-modified-in-cache

agent/cache: Use AllowNotModified in CatalogListServices
This commit is contained in:
Daniel Nephin 2020-07-20 15:30:52 -04:00 committed by GitHub
commit 9f5f8abcbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 82 additions and 14 deletions

View File

@ -38,9 +38,12 @@ func (c *CatalogListServices) Fetch(opts cache.FetchOptions, req cache.Request)
// going to be served from cache and end up arbitrarily stale anyway. This // going to be served from cache and end up arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all // allows cached service-discover to automatically read scale across all
// servers too. // servers too.
reqReal.AllowStale = true reqReal.QueryOptions.AllowStale = true
if opts.LastResult != nil {
reqReal.QueryOptions.AllowNotModifiedResponse = true
}
// Fetch
var reply structs.IndexedServices var reply structs.IndexedServices
if err := c.RPC.RPC("Catalog.ListServices", reqReal, &reply); err != nil { if err := c.RPC.RPC("Catalog.ListServices", reqReal, &reply); err != nil {
return result, err return result, err
@ -48,5 +51,6 @@ func (c *CatalogListServices) Fetch(opts cache.FetchOptions, req cache.Request)
result.Value = &reply result.Value = &reply
result.Index = reply.QueryMeta.Index result.Index = reply.QueryMeta.Index
result.NotModified = reply.QueryMeta.NotModified
return result, nil return result, nil
} }

View File

@ -1,6 +1,7 @@
package cachetype package cachetype
import ( import (
"context"
"testing" "testing"
"time" "time"
@ -60,3 +61,58 @@ func TestCatalogListServices_badReqType(t *testing.T) {
require.Contains(t, err.Error(), "wrong type") require.Contains(t, err.Error(), "wrong type")
rpc.AssertExpectations(t) rpc.AssertExpectations(t)
} }
func TestCatalogListServices_IntegrationWithCache_NotModifiedResponse(t *testing.T) {
rpc := &MockRPC{}
typ := &CatalogListServices{RPC: rpc}
services := map[string][]string{
"foo": {"prod", "linux"},
"bar": {"qa", "windows"},
}
rpc.On("RPC", "Catalog.ListServices", mock.Anything, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DCSpecificRequest)
require.True(t, req.AllowStale)
require.True(t, req.AllowNotModifiedResponse)
reply := args.Get(2).(*structs.IndexedServices)
reply.QueryMeta.Index = 44
reply.NotModified = true
})
c := cache.New(nil)
c.RegisterType(CatalogListServicesName, typ)
last := cache.FetchResult{
Value: &structs.IndexedServices{
Services: services,
QueryMeta: structs.QueryMeta{Index: 42},
},
Index: 42,
}
req := &structs.DCSpecificRequest{
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{
Token: "token",
MinQueryIndex: 44,
MaxQueryTime: time.Second,
},
}
err := c.Prepopulate(CatalogListServicesName, last, "dc1", "token", req.CacheInfo().Key)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
actual, _, err := c.Get(ctx, CatalogListServicesName, req)
require.NoError(t, err)
expected := &structs.IndexedServices{
Services: services,
QueryMeta: structs.QueryMeta{Index: 42},
}
require.Equal(t, expected, actual)
rpc.AssertExpectations(t)
}

28
agent/cache/cache.go vendored
View File

@ -18,6 +18,7 @@ import (
"container/heap" "container/heap"
"context" "context"
"fmt" "fmt"
"strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -167,15 +168,12 @@ type RegisterOptions struct {
// client requests them with MinIndex. // client requests them with MinIndex.
SupportsBlocking bool SupportsBlocking bool
// RefreshTimer is the time between attempting to refresh data. // RefreshTimer is the time to sleep between attempts to refresh data.
// If this is zero, then data is refreshed immediately when a fetch // If this is zero, then data is refreshed immediately when a fetch
// is returned. // is returned.
// //
// RefreshTimeout determines the maximum query time for a refresh // Using different values for RefreshTimer and RefreshTimeout, various
// operation. This is specified as part of the query options and is // "refresh" mechanisms can be implemented:
// expected to be implemented by the Type itself.
//
// Using these values, various "refresh" mechanisms can be implemented:
// //
// * With a high timer duration and a low timeout, a timer-based // * With a high timer duration and a low timeout, a timer-based
// refresh can be set that minimizes load on the Consul servers. // refresh can be set that minimizes load on the Consul servers.
@ -184,7 +182,11 @@ type RegisterOptions struct {
// refresh can be set so that changes in server data are recognized // refresh can be set so that changes in server data are recognized
// within the cache very quickly. // within the cache very quickly.
// //
RefreshTimer time.Duration RefreshTimer time.Duration
// RefreshTimeout is the default value for the maximum query time for a fetch
// operation. It is set as FetchOptions.Timeout so that cache.Type
// implementations can use it as the MaxQueryTime.
RefreshTimeout time.Duration RefreshTimeout time.Duration
} }
@ -473,8 +475,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
// keepalives are every 30 seconds so the RPC should fail if the packets are // keepalives are every 30 seconds so the RPC should fail if the packets are
// being blackholed for more than 30 seconds. // being blackholed for more than 30 seconds.
var connectedTimer *time.Timer var connectedTimer *time.Timer
if tEntry.Opts.Refresh && entry.Index > 0 && if tEntry.Opts.Refresh && entry.Index > 0 && tEntry.Opts.RefreshTimeout > 31*time.Second {
tEntry.Opts.RefreshTimeout > (31*time.Second) {
connectedTimer = time.AfterFunc(31*time.Second, func() { connectedTimer = time.AfterFunc(31*time.Second, func() {
c.entriesLock.Lock() c.entriesLock.Lock()
defer c.entriesLock.Unlock() defer c.entriesLock.Unlock()
@ -520,7 +521,9 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
if result.Value != nil { if result.Value != nil {
// A new value was given, so we create a brand new entry. // A new value was given, so we create a brand new entry.
newEntry.Value = result.Value if !result.NotModified {
newEntry.Value = result.Value
}
newEntry.State = result.State newEntry.State = result.State
newEntry.Index = result.Index newEntry.Index = result.Index
newEntry.FetchedAt = time.Now() newEntry.FetchedAt = time.Now()
@ -551,8 +554,9 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
// Error handling // Error handling
if err == nil { if err == nil {
metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1) labels := []metrics.Label{{Name: "result_not_modified", Value: strconv.FormatBool(result.NotModified)}}
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "fetch_success"}, 1) metrics.IncrCounterWithLabels([]string{"consul", "cache", "fetch_success"}, 1, labels)
metrics.IncrCounterWithLabels([]string{"consul", "cache", tEntry.Name, "fetch_success"}, 1, labels)
if result.Index > 0 { if result.Index > 0 {
// Reset the attempts counter so we don't have any backoff // Reset the attempts counter so we don't have any backoff

4
agent/cache/type.go vendored
View File

@ -78,4 +78,8 @@ type FetchResult struct {
// Index is the corresponding index value for this data. // Index is the corresponding index value for this data.
Index uint64 Index uint64
// NotModified indicates that the Value has not changed since LastResult, and
// the LastResult value should be used instead of Value.
NotModified bool
} }