diff --git a/agent/agent.go b/agent/agent.go index 0f2fe11db..00f488a33 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -536,12 +536,9 @@ func (a *Agent) Start(ctx context.Context) error { } // Start the proxy config manager. - cacheName := cachetype.HealthServicesName - if a.config.UseStreamingBackend { - cacheName = cachetype.StreamingHealthServicesName - } a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ Cache: a.cache, + Health: a.rpcClientHealth, Logger: a.logger.Named(logging.ProxyConfig), State: a.State, Source: &structs.QuerySource{ @@ -553,9 +550,8 @@ func (a *Agent) Start(ctx context.Context) error { Domain: a.config.DNSDomain, AltDomain: a.config.DNSAltDomain, }, - TLSConfigurator: a.tlsConfigurator, - IntentionDefaultAllow: intentionDefaultAllow, - ServiceHealthCacheName: cacheName, + TLSConfigurator: a.tlsConfigurator, + IntentionDefaultAllow: intentionDefaultAllow, }) if err != nil { return err diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 20b0a69f6..3f1e1c0e2 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -71,18 +71,11 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc { event := stream.Event{ Index: idx, Topic: topic, + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &n, + }, } - payload := EventPayloadCheckServiceNode{ - Op: pbsubscribe.CatalogOp_Register, - Value: &n, - } - - // TODO: share this logic with serviceHealthToConnectEvents - if connect && n.Service.Kind == structs.ServiceKindConnectProxy { - payload.key = n.Service.Proxy.DestinationServiceName - } - - event.Payload = payload if !connect { // append each event as a separate item so that they can be serialized diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go index bcd7f6df4..4858d1f33 100644 --- a/agent/proxycfg/manager.go +++ b/agent/proxycfg/manager.go @@ -59,6 +59,8 @@ type ManagerConfig struct { // Cache is the agent's cache instance that can be used to retrieve, store and // monitor state for the proxies. Cache *cache.Cache + // Health provides service health updates on a notification channel. + Health Health // state is the agent's local state to be watched for new proxy registrations. State *local.State // source describes the current agent's identity, it's used directly for @@ -72,9 +74,6 @@ type ManagerConfig struct { Logger hclog.Logger TLSConfigurator *tlsutil.Configurator - // TODO: replace this field with a type that exposes Notify - ServiceHealthCacheName string - // IntentionDefaultAllow is set by the agent so that we can pass this // information to proxies that need to make intention decisions on their // own. @@ -191,7 +190,7 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string } var err error - state, err = newState(ns, token, m.ManagerConfig.ServiceHealthCacheName) + state, err = newState(ns, token) if err != nil { return err } @@ -199,6 +198,7 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string // Set the necessary dependencies state.logger = m.Logger.With("service_id", sid.String()) state.cache = m.Cache + state.health = m.Health state.source = m.Source state.dnsConfig = m.DNSConfig state.intentionDefaultAllow = m.IntentionDefaultAllow diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 0c30f2cf5..6d68342e4 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/local" + "github.com/hashicorp/consul/agent/rpcclient/health" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/sdk/testutil" @@ -343,11 +344,11 @@ func testManager_BasicLifecycle( // Create manager m, err := NewManager(ManagerConfig{ - Cache: c, - State: state, - Source: source, - Logger: logger, - ServiceHealthCacheName: cachetype.HealthServicesName, + Cache: c, + Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName}, + State: state, + Source: source, + Logger: logger, }) require.NoError(err) diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 27663ca08..7eeeb03f3 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -24,6 +24,10 @@ type CacheNotifier interface { correlationID string, ch chan<- cache.UpdateEvent) error } +type Health interface { + Notify(ctx context.Context, req structs.ServiceSpecificRequest, correlationID string, ch chan<- cache.UpdateEvent) error +} + const ( coalesceTimeout = 200 * time.Millisecond rootsWatchID = "roots" @@ -55,6 +59,7 @@ type state struct { logger hclog.Logger source *structs.QuerySource cache CacheNotifier + health Health dnsConfig DNSConfig serverSNIFn ServerSNIFunc intentionDefaultAllow bool @@ -73,9 +78,6 @@ type state struct { proxyCfg structs.ConnectProxyConfig token string - // TODO: replace this field with a type that exposes Notify - serviceHealthCacheName string - ch chan cache.UpdateEvent snapCh chan ConfigSnapshot reqCh chan chan *ConfigSnapshot @@ -124,7 +126,7 @@ func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error // // The returned state needs its required dependencies to be set before Watch // can be called. -func newState(ns *structs.NodeService, token string, serviceHealthCacheName string) (*state, error) { +func newState(ns *structs.NodeService, token string) (*state, error) { switch ns.Kind { case structs.ServiceKindConnectProxy: case structs.ServiceKindTerminatingGateway: @@ -160,7 +162,6 @@ func newState(ns *structs.NodeService, token string, serviceHealthCacheName stri proxyCfg: proxyCfg, token: token, - serviceHealthCacheName: serviceHealthCacheName, // 10 is fairly arbitrary here but allow for the 3 mandatory and a // reasonable number of upstream watches to all deliver their initial // messages in parallel without blocking the cache.Notify loops. It's not a @@ -231,7 +232,7 @@ func (s *state) watchConnectProxyService(ctx context.Context, correlationId stri var finalMeta structs.EnterpriseMeta finalMeta.Merge(entMeta) - return s.cache.Notify(ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{ + return s.health.Notify(ctx, structs.ServiceSpecificRequest{ Datacenter: dc, QueryOptions: structs.QueryOptions{ Token: s.token, @@ -449,7 +450,7 @@ func (s *state) initWatchesMeshGateway() error { return err } - err = s.cache.Notify(s.ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{ + err = s.health.Notify(s.ctx, structs.ServiceSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: structs.ConsulServiceName, @@ -975,7 +976,7 @@ func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *Config // Watch the health endpoint to discover endpoints for the service if _, ok := snap.TerminatingGateway.WatchedServices[svc.Service]; !ok { ctx, cancel := context.WithCancel(s.ctx) - err := s.cache.Notify(ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{ + err := s.health.Notify(ctx, structs.ServiceSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: svc.Service.Name, @@ -1273,7 +1274,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho if _, ok := snap.MeshGateway.WatchedServices[svc]; !ok { ctx, cancel := context.WithCancel(s.ctx) - err := s.cache.Notify(ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{ + err := s.health.Notify(ctx, structs.ServiceSpecificRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, ServiceName: svc.Name, diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index 3a58ac074..4b2f02404 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/consul/discoverychain" + "github.com/hashicorp/consul/agent/rpcclient/health" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/sdk/testutil" ) @@ -112,7 +113,7 @@ func TestStateChanged(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { require := require.New(t) - state, err := newState(tt.ns, tt.token, cachetype.HealthServicesName) + state, err := newState(tt.ns, tt.token) require.NoError(err) otherNS, otherToken := tt.mutate(*tt.ns, tt.token) require.Equal(tt.want, state.Changed(otherNS, otherToken)) @@ -144,6 +145,10 @@ func (cn *testCacheNotifier) Notify(ctx context.Context, t string, r cache.Reque return nil } +func (cn *testCacheNotifier) Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error) { + panic("Get: not implemented") +} + func (cn *testCacheNotifier) getNotifierRequest(t testing.TB, correlationId string) testCacheNotifierRequest { cn.lock.RLock() req, ok := cn.notifiers[correlationId] @@ -1510,7 +1515,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { for name, tc := range cases { t.Run(name, func(t *testing.T) { - state, err := newState(&tc.ns, "", cachetype.HealthServicesName) + state, err := newState(&tc.ns, "") // verify building the initial state worked require.NoError(t, err) @@ -1522,6 +1527,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { // setup a new testing cache notifier cn := newTestCacheNotifier() state.cache = cn + state.health = &health.Client{Cache: cn, CacheName: cachetype.HealthServicesName} // setup the local datacenter information state.source = &structs.QuerySource{ diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index 96b4fe632..259c5b5f9 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -20,6 +20,7 @@ type NetRPC interface { type CacheGetter interface { Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error) + Notify(ctx context.Context, t string, r cache.Request, cID string, ch chan<- cache.UpdateEvent) error } func (c *Client) ServiceNodes( @@ -64,3 +65,12 @@ func (c *Client) getServiceNodes( return *value, md, nil } + +func (c *Client) Notify( + ctx context.Context, + req structs.ServiceSpecificRequest, + correlationID string, + ch chan<- cache.UpdateEvent, +) error { + return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch) +}