proxycfg: use rpcclient/health.Client instead of passing around cache name

This should allow us to swap out the implementation with something other
than `agent/cache` without making further code changes.
This commit is contained in:
Daniel Nephin 2021-02-23 12:52:54 -05:00
parent c33570be34
commit 2a53b8293a
7 changed files with 45 additions and 38 deletions

View File

@ -536,12 +536,9 @@ func (a *Agent) Start(ctx context.Context) error {
}
// Start the proxy config manager.
cacheName := cachetype.HealthServicesName
if a.config.UseStreamingBackend {
cacheName = cachetype.StreamingHealthServicesName
}
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
Cache: a.cache,
Health: a.rpcClientHealth,
Logger: a.logger.Named(logging.ProxyConfig),
State: a.State,
Source: &structs.QuerySource{
@ -553,9 +550,8 @@ func (a *Agent) Start(ctx context.Context) error {
Domain: a.config.DNSDomain,
AltDomain: a.config.DNSAltDomain,
},
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
ServiceHealthCacheName: cacheName,
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
})
if err != nil {
return err

View File

@ -71,18 +71,11 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
event := stream.Event{
Index: idx,
Topic: topic,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &n,
},
}
payload := EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &n,
}
// TODO: share this logic with serviceHealthToConnectEvents
if connect && n.Service.Kind == structs.ServiceKindConnectProxy {
payload.key = n.Service.Proxy.DestinationServiceName
}
event.Payload = payload
if !connect {
// append each event as a separate item so that they can be serialized

View File

@ -59,6 +59,8 @@ type ManagerConfig struct {
// Cache is the agent's cache instance that can be used to retrieve, store and
// monitor state for the proxies.
Cache *cache.Cache
// Health provides service health updates on a notification channel.
Health Health
// state is the agent's local state to be watched for new proxy registrations.
State *local.State
// source describes the current agent's identity, it's used directly for
@ -72,9 +74,6 @@ type ManagerConfig struct {
Logger hclog.Logger
TLSConfigurator *tlsutil.Configurator
// TODO: replace this field with a type that exposes Notify
ServiceHealthCacheName string
// IntentionDefaultAllow is set by the agent so that we can pass this
// information to proxies that need to make intention decisions on their
// own.
@ -191,7 +190,7 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string
}
var err error
state, err = newState(ns, token, m.ManagerConfig.ServiceHealthCacheName)
state, err = newState(ns, token)
if err != nil {
return err
}
@ -199,6 +198,7 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string
// Set the necessary dependencies
state.logger = m.Logger.With("service_id", sid.String())
state.cache = m.Cache
state.health = m.Health
state.source = m.Source
state.dnsConfig = m.DNSConfig
state.intentionDefaultAllow = m.IntentionDefaultAllow

View File

@ -14,6 +14,7 @@ import (
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/rpcclient/health"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/sdk/testutil"
@ -343,11 +344,11 @@ func testManager_BasicLifecycle(
// Create manager
m, err := NewManager(ManagerConfig{
Cache: c,
State: state,
Source: source,
Logger: logger,
ServiceHealthCacheName: cachetype.HealthServicesName,
Cache: c,
Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName},
State: state,
Source: source,
Logger: logger,
})
require.NoError(err)

View File

@ -24,6 +24,10 @@ type CacheNotifier interface {
correlationID string, ch chan<- cache.UpdateEvent) error
}
type Health interface {
Notify(ctx context.Context, req structs.ServiceSpecificRequest, correlationID string, ch chan<- cache.UpdateEvent) error
}
const (
coalesceTimeout = 200 * time.Millisecond
rootsWatchID = "roots"
@ -55,6 +59,7 @@ type state struct {
logger hclog.Logger
source *structs.QuerySource
cache CacheNotifier
health Health
dnsConfig DNSConfig
serverSNIFn ServerSNIFunc
intentionDefaultAllow bool
@ -73,9 +78,6 @@ type state struct {
proxyCfg structs.ConnectProxyConfig
token string
// TODO: replace this field with a type that exposes Notify
serviceHealthCacheName string
ch chan cache.UpdateEvent
snapCh chan ConfigSnapshot
reqCh chan chan *ConfigSnapshot
@ -124,7 +126,7 @@ func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error
//
// The returned state needs its required dependencies to be set before Watch
// can be called.
func newState(ns *structs.NodeService, token string, serviceHealthCacheName string) (*state, error) {
func newState(ns *structs.NodeService, token string) (*state, error) {
switch ns.Kind {
case structs.ServiceKindConnectProxy:
case structs.ServiceKindTerminatingGateway:
@ -160,7 +162,6 @@ func newState(ns *structs.NodeService, token string, serviceHealthCacheName stri
proxyCfg: proxyCfg,
token: token,
serviceHealthCacheName: serviceHealthCacheName,
// 10 is fairly arbitrary here but allow for the 3 mandatory and a
// reasonable number of upstream watches to all deliver their initial
// messages in parallel without blocking the cache.Notify loops. It's not a
@ -231,7 +232,7 @@ func (s *state) watchConnectProxyService(ctx context.Context, correlationId stri
var finalMeta structs.EnterpriseMeta
finalMeta.Merge(entMeta)
return s.cache.Notify(ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{
return s.health.Notify(ctx, structs.ServiceSpecificRequest{
Datacenter: dc,
QueryOptions: structs.QueryOptions{
Token: s.token,
@ -449,7 +450,7 @@ func (s *state) initWatchesMeshGateway() error {
return err
}
err = s.cache.Notify(s.ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{
err = s.health.Notify(s.ctx, structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: structs.ConsulServiceName,
@ -975,7 +976,7 @@ func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *Config
// Watch the health endpoint to discover endpoints for the service
if _, ok := snap.TerminatingGateway.WatchedServices[svc.Service]; !ok {
ctx, cancel := context.WithCancel(s.ctx)
err := s.cache.Notify(ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: svc.Service.Name,
@ -1273,7 +1274,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
if _, ok := snap.MeshGateway.WatchedServices[svc]; !ok {
ctx, cancel := context.WithCancel(s.ctx)
err := s.cache.Notify(ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: svc.Name,

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/rpcclient/health"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
)
@ -112,7 +113,7 @@ func TestStateChanged(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
state, err := newState(tt.ns, tt.token, cachetype.HealthServicesName)
state, err := newState(tt.ns, tt.token)
require.NoError(err)
otherNS, otherToken := tt.mutate(*tt.ns, tt.token)
require.Equal(tt.want, state.Changed(otherNS, otherToken))
@ -144,6 +145,10 @@ func (cn *testCacheNotifier) Notify(ctx context.Context, t string, r cache.Reque
return nil
}
func (cn *testCacheNotifier) Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error) {
panic("Get: not implemented")
}
func (cn *testCacheNotifier) getNotifierRequest(t testing.TB, correlationId string) testCacheNotifierRequest {
cn.lock.RLock()
req, ok := cn.notifiers[correlationId]
@ -1510,7 +1515,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
state, err := newState(&tc.ns, "", cachetype.HealthServicesName)
state, err := newState(&tc.ns, "")
// verify building the initial state worked
require.NoError(t, err)
@ -1522,6 +1527,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// setup a new testing cache notifier
cn := newTestCacheNotifier()
state.cache = cn
state.health = &health.Client{Cache: cn, CacheName: cachetype.HealthServicesName}
// setup the local datacenter information
state.source = &structs.QuerySource{

View File

@ -20,6 +20,7 @@ type NetRPC interface {
type CacheGetter interface {
Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error)
Notify(ctx context.Context, t string, r cache.Request, cID string, ch chan<- cache.UpdateEvent) error
}
func (c *Client) ServiceNodes(
@ -64,3 +65,12 @@ func (c *Client) getServiceNodes(
return *value, md, nil
}
func (c *Client) Notify(
ctx context.Context,
req structs.ServiceSpecificRequest,
correlationID string,
ch chan<- cache.UpdateEvent,
) error {
return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch)
}