diff --git a/agent/agent.go b/agent/agent.go index 0199f5cba..a8885b9e6 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -3954,6 +3954,15 @@ func (a *Agent) registerCache() { RefreshTimer: 0 * time.Second, RefreshTimeout: 10 * time.Minute, }) + + a.cache.RegisterType(cachetype.ConfigEntriesName, &cachetype.ConfigEntries{ + RPC: a, + }, &cache.RegisterOptions{ + // Maintain a blocking query, retry dropped connections quickly + Refresh: true, + RefreshTimer: 0 * time.Second, + RefreshTimeout: 10 * time.Minute, + }) } // defaultProxyCommand returns the default Connect managed proxy command. diff --git a/agent/cache-types/config_entry.go b/agent/cache-types/config_entry.go new file mode 100644 index 000000000..2a9397166 --- /dev/null +++ b/agent/cache-types/config_entry.go @@ -0,0 +1,51 @@ +package cachetype + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" +) + +// Recommended name for registration. +const ConfigEntriesName = "config-entries" + +// ConfigEntries supports fetching discovering configuration entries +type ConfigEntries struct { + RPC RPC +} + +func (c *ConfigEntries) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { + var result cache.FetchResult + + // The request should be a ConfigEntryQuery. + reqReal, ok := req.(*structs.ConfigEntryQuery) + if !ok { + return result, fmt.Errorf( + "Internal cache failure: request wrong type: %T", req) + } + + // Set the minimum query index to our current index so we block + reqReal.QueryOptions.MinQueryIndex = opts.MinIndex + reqReal.QueryOptions.MaxQueryTime = opts.Timeout + + // Always allow stale - there's no point in hitting leader if the request is + // going to be served from cache and endup arbitrarily stale anyway. This + // allows cached service-discover to automatically read scale across all + // servers too. + reqReal.AllowStale = true + + // Fetch + var reply structs.IndexedConfigEntries + if err := c.RPC.RPC("ConfigEntry.List", reqReal, &reply); err != nil { + return result, err + } + + result.Value = &reply + result.Index = reply.QueryMeta.Index + return result, nil +} + +func (c *ConfigEntries) SupportsBlocking() bool { + return true +} diff --git a/agent/cache-types/config_entry_test.go b/agent/cache-types/config_entry_test.go new file mode 100644 index 000000000..056c23ab8 --- /dev/null +++ b/agent/cache-types/config_entry_test.go @@ -0,0 +1,66 @@ +package cachetype + +import ( + "testing" + "time" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestConfigEntries(t *testing.T) { + rpc := TestRPC(t) + typ := &ConfigEntries{RPC: rpc} + + // Expect the proper RPC call. This also sets the expected value + // since that is return-by-pointer in the arguments. + var resp *structs.IndexedConfigEntries + rpc.On("RPC", "ConfigEntry.List", mock.Anything, mock.Anything).Return(nil). + Run(func(args mock.Arguments) { + req := args.Get(1).(*structs.ConfigEntryQuery) + require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) + require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) + require.True(t, req.AllowStale) + require.Equal(t, structs.ServiceResolver, req.Kind) + require.Equal(t, "", req.Name) + + reply := args.Get(2).(*structs.IndexedConfigEntries) + reply.Kind = structs.ServiceResolver + reply.Entries = []structs.ConfigEntry{ + &structs.ServiceResolverConfigEntry{Kind: structs.ServiceResolver, Name: "foo"}, + &structs.ServiceResolverConfigEntry{Kind: structs.ServiceResolver, Name: "bar"}, + } + reply.QueryMeta.Index = 48 + resp = reply + }) + + // Fetch + resultA, err := typ.Fetch(cache.FetchOptions{ + MinIndex: 24, + Timeout: 1 * time.Second, + }, &structs.ConfigEntryQuery{ + Datacenter: "dc1", + Kind: structs.ServiceResolver, + }) + require.NoError(t, err) + require.Equal(t, cache.FetchResult{ + Value: resp, + Index: 48, + }, resultA) + + rpc.AssertExpectations(t) +} + +func TestConfigEntries_badReqType(t *testing.T) { + rpc := TestRPC(t) + typ := &ConfigEntries{RPC: rpc} + + // Fetch + _, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest( + t, cache.RequestInfo{Key: "foo", MinIndex: 64})) + require.Error(t, err) + require.Contains(t, err.Error(), "wrong type") + rpc.AssertExpectations(t) +} diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 6ef8aec82..9cda7ec3d 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -387,6 +387,31 @@ func (c *ConfigEntryQuery) RequestDatacenter() string { return c.Datacenter } +func (r *ConfigEntryQuery) CacheInfo() cache.RequestInfo { + info := cache.RequestInfo{ + Token: r.Token, + Datacenter: r.Datacenter, + MinIndex: r.MinQueryIndex, + Timeout: r.MaxQueryTime, + MaxAge: r.MaxAge, + MustRevalidate: r.MustRevalidate, + } + + v, err := hashstructure.Hash([]interface{}{ + r.Kind, + r.Name, + r.Filter, + }, nil) + if err == nil { + // If there is an error, we don't set the key. A blank key forces + // no cache for this request so the request is forwarded directly + // to the server. + info.Key = strconv.FormatUint(v, 10) + } + + return info +} + // ServiceConfigRequest is used when requesting the resolved configuration // for a service. type ServiceConfigRequest struct {