From 6aa022c1cd75fa6f14d8b600aac2fa6c27327aa1 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 17 Apr 2019 21:35:19 -0700 Subject: [PATCH 1/6] Add the service registration manager to the agent --- agent/agent.go | 7 + agent/cache-types/resolved_service_config.go | 52 +++++ .../resolved_service_config_test.go | 67 +++++++ agent/service_manager.go | 183 ++++++++++++++++++ agent/structs/config_entry.go | 27 +++ 5 files changed, 336 insertions(+) create mode 100644 agent/cache-types/resolved_service_config.go create mode 100644 agent/cache-types/resolved_service_config_test.go create mode 100644 agent/service_manager.go diff --git a/agent/agent.go b/agent/agent.go index aa9de46bb..771f8a411 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -243,6 +243,8 @@ type Agent struct { // directly. proxyConfig *proxycfg.Manager + serviceManager *ServiceManager + // xdsServer is the Server instance that serves xDS gRPC API. xdsServer *xds.Server @@ -473,6 +475,9 @@ func (a *Agent) Start() error { } }() + // Start the service registration manager. + a.serviceManager = NewServiceManager(a) + // Start watching for critical services to deregister, based on their // checks. go a.reapServices() @@ -1892,6 +1897,7 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error { func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { a.stateLock.Lock() defer a.stateLock.Unlock() + a.serviceManager.AddService(service, chkTypes, persist, token, source) return a.addServiceLocked(service, chkTypes, persist, token, source) } @@ -2055,6 +2061,7 @@ func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.Check func (a *Agent) RemoveService(serviceID string, persist bool) error { a.stateLock.Lock() defer a.stateLock.Unlock() + a.serviceManager.RemoveService(serviceID) return a.removeServiceLocked(serviceID, persist) } diff --git a/agent/cache-types/resolved_service_config.go b/agent/cache-types/resolved_service_config.go new file mode 100644 index 000000000..d1038283f --- /dev/null +++ b/agent/cache-types/resolved_service_config.go @@ -0,0 +1,52 @@ +package cachetype + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" +) + +// Recommended name for registration. +const ResolvedServiceConfigName = "resolved-service-config" + +// ResolvedServiceConfig supports fetching the config for a service resolved from +// the global proxy defaults and the centrally registered service config. +type ResolvedServiceConfig struct { + RPC RPC +} + +func (c *ResolvedServiceConfig) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { + var result cache.FetchResult + + // The request should be a ServiceConfigRequest. + reqReal, ok := req.(*structs.ServiceConfigRequest) + if !ok { + return result, fmt.Errorf( + "Internal cache failure: request wrong type: %T", req) + } + + // Set the minimum query index to our current index so we block + reqReal.QueryOptions.MinQueryIndex = opts.MinIndex + reqReal.QueryOptions.MaxQueryTime = opts.Timeout + + // Allways allow stale - there's no point in hitting leader if the request is + // going to be served from cache and endup arbitrarily stale anyway. This + // allows cached service-discover to automatically read scale across all + // servers too. 
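+	//
+	// (Blocking semantics are unaffected: the MinQueryIndex set above still
+	// makes the query wait for a change; stale only affects which server
+	// may answer.)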
+ reqReal.AllowStale = true + + // Fetch + var reply structs.ServiceConfigResponse + if err := c.RPC.RPC("ConfigEntry.ResolveServiceConfig", reqReal, &reply); err != nil { + return result, err + } + + result.Value = &reply + result.Index = reply.QueryMeta.Index + return result, nil +} + +func (c *ResolvedServiceConfig) SupportsBlocking() bool { + return true +} diff --git a/agent/cache-types/resolved_service_config_test.go b/agent/cache-types/resolved_service_config_test.go new file mode 100644 index 000000000..f89002393 --- /dev/null +++ b/agent/cache-types/resolved_service_config_test.go @@ -0,0 +1,67 @@ +package cachetype + +import ( + "testing" + "time" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestResolvedServiceConfig(t *testing.T) { + require := require.New(t) + rpc := TestRPC(t) + defer rpc.AssertExpectations(t) + typ := &ResolvedServiceConfig{RPC: rpc} + + // Expect the proper RPC call. This also sets the expected value + // since that is return-by-pointer in the arguments. + var resp *structs.ServiceConfigResponse + rpc.On("RPC", "ConfigEntry.ResolveServiceConfig", mock.Anything, mock.Anything).Return(nil). + Run(func(args mock.Arguments) { + req := args.Get(1).(*structs.ServiceConfigRequest) + require.Equal(uint64(24), req.QueryOptions.MinQueryIndex) + require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime) + require.Equal("foo", req.Name) + require.True(req.AllowStale) + + reply := args.Get(2).(*structs.ServiceConfigResponse) + reply.Definition = structs.ServiceDefinition{ + ID: "1234", + Name: "foo", + } + + reply.QueryMeta.Index = 48 + resp = reply + }) + + // Fetch + resultA, err := typ.Fetch(cache.FetchOptions{ + MinIndex: 24, + Timeout: 1 * time.Second, + }, &structs.ServiceConfigRequest{ + Datacenter: "dc1", + Name: "foo", + }) + require.NoError(err) + require.Equal(cache.FetchResult{ + Value: resp, + Index: 48, + }, resultA) +} + +func TestResolvedServiceConfig_badReqType(t *testing.T) { + require := require.New(t) + rpc := TestRPC(t) + defer rpc.AssertExpectations(t) + typ := &ResolvedServiceConfig{RPC: rpc} + + // Fetch + _, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest( + t, cache.RequestInfo{Key: "foo", MinIndex: 64})) + require.Error(err) + require.Contains(err.Error(), "wrong type") + +} diff --git a/agent/service_manager.go b/agent/service_manager.go new file mode 100644 index 000000000..ea131dd10 --- /dev/null +++ b/agent/service_manager.go @@ -0,0 +1,183 @@ +package agent + +import ( + "fmt" + "sync" + + "github.com/hashicorp/consul/agent/cache" + cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/structs" + "golang.org/x/net/context" +) + +type ServiceManager struct { + services map[string]*serviceConfigWatch + agent *Agent + + sync.Mutex +} + +func NewServiceManager(agent *Agent) *ServiceManager { + return &ServiceManager{ + services: make(map[string]*serviceConfigWatch), + agent: agent, + } +} + +func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) { + s.Lock() + defer s.Unlock() + + reg := serviceRegistration{ + service: service, + chkTypes: chkTypes, + persist: persist, + token: token, + source: source, + } + + // If a service watch already exists, update the registration. Otherwise, + // start a new config watcher. 
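+	// Note: registrations are keyed by service ID here, so re-adding an
+	// existing ID updates the running watch in place rather than spawning
+	// a second one.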
+ watch, ok := s.services[service.ID] + if ok { + watch.updateRegistration(®) + } else { + watch := &serviceConfigWatch{ + registration: ®, + updateCh: make(chan cache.UpdateEvent, 1), + agent: s.agent, + } + + s.services[service.ID] = watch + watch.Start() + } +} + +func (s *ServiceManager) RemoveService(serviceID string) { + s.Lock() + defer s.Unlock() + + serviceWatch, ok := s.services[serviceID] + if !ok { + return + } + + serviceWatch.Stop() + delete(s.services, serviceID) +} + +type serviceRegistration struct { + service *structs.NodeService + chkTypes []*structs.CheckType + persist bool + token string + source configSource +} + +type serviceConfigWatch struct { + registration *serviceRegistration + config *structs.ServiceDefinition + + agent *Agent + + updateCh chan cache.UpdateEvent + ctx context.Context + cancelFunc func() + + sync.RWMutex +} + +func (s *serviceConfigWatch) Start() error { + s.ctx, s.cancelFunc = context.WithCancel(context.Background()) + if err := s.startConfigWatch(); err != nil { + return err + } + go s.runWatch() + + return nil +} + +func (s *serviceConfigWatch) runWatch() { + for { + select { + case <-s.ctx.Done(): + return + case event := <-s.updateCh: + s.handleUpdate(event) + } + } +} + +func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) { + switch event.Result.(type) { + case serviceRegistration: + s.Lock() + s.registration = event.Result.(*serviceRegistration) + s.Unlock() + case structs.ServiceConfigResponse: + s.Lock() + s.config = &event.Result.(*structs.ServiceConfigResponse).Definition + s.Unlock() + default: + s.agent.logger.Printf("[ERR] unknown update event type: %T", event) + } + + service := s.mergeServiceConfig() + s.agent.logger.Printf("[INFO] updating service registration: %v, %v", service.ID, service.Meta) + /*err := s.agent.AddService(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source) + if err != nil { + s.agent.logger.Printf("[ERR] error updating service registration: %v", err) + }*/ +} + +func (s *serviceConfigWatch) startConfigWatch() error { + s.RLock() + name := s.registration.service.Service + s.RUnlock() + + req := &structs.ServiceConfigRequest{ + Name: name, + Datacenter: s.agent.config.Datacenter, + } + err := s.agent.cache.Notify(s.ctx, cachetype.ResolvedServiceConfigName, req, fmt.Sprintf("service-config:%s", name), s.updateCh) + + return err +} + +func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) { + s.updateCh <- cache.UpdateEvent{ + Result: registration, + } +} + +func (s *serviceConfigWatch) mergeServiceConfig() *structs.NodeService { + return nil +} + +func (s *serviceConfigWatch) Stop() { + s.cancelFunc() +} + +/* +// Construct the service config request. This will be re-used with an updated + // index to watch for changes in the effective service config. 
+ req := structs.ServiceConfigRequest{ + Name: s.registration.service.Service, + Datacenter: s.agent.config.Datacenter, + QueryOptions: structs.QueryOptions{Token: s.agent.tokens.AgentToken()}, + } + + consul.RetryLoopBackoff(s.shutdownCh, func() error { + var reply structs.ServiceConfigResponse + if err := s.agent.RPC("ConfigEntry.ResolveServiceConfig", &req, &reply); err != nil { + return err + } + + s.updateConfig(&reply.Definition) + + req.QueryOptions.MinQueryIndex = reply.QueryMeta.Index + return nil + }, func(err error) { + s.agent.logger.Printf("[ERR] Error getting service config: %v", err) + }) +*/ diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 7f571109d..e1ee3fd5c 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -2,10 +2,13 @@ package structs import ( "fmt" + "strconv" "strings" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/go-msgpack/codec" + "github.com/mitchellh/hashstructure" ) const ( @@ -265,6 +268,30 @@ func (s *ServiceConfigRequest) RequestDatacenter() string { return s.Datacenter } +func (r *ServiceConfigRequest) CacheInfo() cache.RequestInfo { + info := cache.RequestInfo{ + Token: r.Token, + Datacenter: r.Datacenter, + MinIndex: r.MinQueryIndex, + Timeout: r.MaxQueryTime, + MaxAge: r.MaxAge, + MustRevalidate: r.MustRevalidate, + } + + // To calculate the cache key we only hash the service name. The + // datacenter is handled by the cache framework. The other fields are + // not, but should not be used in any cache types. + v, err := hashstructure.Hash(r.Name, nil) + if err == nil { + // If there is an error, we don't set the key. A blank key forces + // no cache for this request so the request is forwarded directly + // to the server. + info.Key = strconv.FormatUint(v, 10) + } + + return info +} + type ServiceConfigResponse struct { Definition ServiceDefinition From 6faa8ba4517c093cabc40576a3a348f88f359b7a Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Mon, 22 Apr 2019 23:39:02 -0700 Subject: [PATCH 2/6] Fill out the service manager functionality and fix tests --- agent/agent.go | 110 ++++++++++++++++---------- agent/consul/config_endpoint.go | 19 +++-- agent/service_manager.go | 133 +++++++++++++++++++------------- agent/service_manager_test.go | 55 +++++++++++++ agent/structs/structs.go | 51 ++++++++++++ 5 files changed, 267 insertions(+), 101 deletions(-) create mode 100644 agent/service_manager_test.go diff --git a/agent/agent.go b/agent/agent.go index 771f8a411..69a6ffd82 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -289,6 +289,7 @@ func New(c *config.RuntimeConfig) (*Agent, error) { endpoints: make(map[string]string), tokens: new(token.Store), } + a.serviceManager = NewServiceManager(a) if err := a.initializeACLs(); err != nil { return nil, err @@ -475,9 +476,6 @@ func (a *Agent) Start() error { } }() - // Start the service registration manager. - a.serviceManager = NewServiceManager(a) - // Start watching for critical services to deregister, based on their // checks. 
go a.reapServices() @@ -1897,53 +1895,22 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error { func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { a.stateLock.Lock() defer a.stateLock.Unlock() - a.serviceManager.AddService(service, chkTypes, persist, token, source) return a.addServiceLocked(service, chkTypes, persist, token, source) } func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { - if service.Service == "" { - return fmt.Errorf("Service name missing") - } - if service.ID == "" && service.Service != "" { - service.ID = service.Service - } - for _, check := range chkTypes { - if err := check.Validate(); err != nil { - return fmt.Errorf("Check is not valid: %v", err) - } + if err := a.validateService(service, chkTypes); err != nil { + return err } - // Set default weights if not specified. This is important as it ensures AE - // doesn't consider the service different since it has nil weights. - if service.Weights == nil { - service.Weights = &structs.Weights{Passing: 1, Warning: 1} + if err := a.serviceManager.AddService(service, chkTypes, persist, token, source); err != nil { + return err } - // Warn if the service name is incompatible with DNS - if InvalidDnsRe.MatchString(service.Service) { - a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+ - "via DNS due to invalid characters. Valid characters include "+ - "all alpha-numerics and dashes.", service.Service) - } else if len(service.Service) > MaxDNSLabelLength { - a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+ - "via DNS due to it being too long. Valid lengths are between "+ - "1 and 63 bytes.", service.Service) - } - - // Warn if any tags are incompatible with DNS - for _, tag := range service.Tags { - if InvalidDnsRe.MatchString(tag) { - a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+ - "via DNS due to invalid characters. Valid characters include "+ - "all alpha-numerics and dashes.", tag) - } else if len(tag) > MaxDNSLabelLength { - a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+ - "via DNS due to it being too long. Valid lengths are between "+ - "1 and 63 bytes.", tag) - } - } + return nil +} +func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { // Pause the service syncs during modification a.PauseSync() defer a.ResumeSync() @@ -2033,6 +2000,54 @@ func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*struc return nil } +// validateService validates an service and its checks, either returning an error or emitting a +// warning based on the nature of the error. +func (a *Agent) validateService(service *structs.NodeService, chkTypes []*structs.CheckType) error { + if service.Service == "" { + return fmt.Errorf("Service name missing") + } + if service.ID == "" && service.Service != "" { + service.ID = service.Service + } + for _, check := range chkTypes { + if err := check.Validate(); err != nil { + return fmt.Errorf("Check is not valid: %v", err) + } + } + + // Set default weights if not specified. This is important as it ensures AE + // doesn't consider the service different since it has nil weights. 
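+	// ("AE" is the agent's anti-entropy sync of local state to the servers;
+	// see PauseSync/ResumeSync.)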
+ if service.Weights == nil { + service.Weights = &structs.Weights{Passing: 1, Warning: 1} + } + + // Warn if the service name is incompatible with DNS + if InvalidDnsRe.MatchString(service.Service) { + a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+ + "via DNS due to invalid characters. Valid characters include "+ + "all alpha-numerics and dashes.", service.Service) + } else if len(service.Service) > MaxDNSLabelLength { + a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+ + "via DNS due to it being too long. Valid lengths are between "+ + "1 and 63 bytes.", service.Service) + } + + // Warn if any tags are incompatible with DNS + for _, tag := range service.Tags { + if InvalidDnsRe.MatchString(tag) { + a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+ + "via DNS due to invalid characters. Valid characters include "+ + "all alpha-numerics and dashes.", tag) + } else if len(tag) > MaxDNSLabelLength { + a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+ + "via DNS due to it being too long. Valid lengths are between "+ + "1 and 63 bytes.", tag) + } + } + + return nil +} + // cleanupRegistration is called on registration error to ensure no there are no // leftovers after a partial failure func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.CheckID) { @@ -2061,7 +2076,6 @@ func (a *Agent) cleanupRegistration(serviceIDs []string, checksIDs []types.Check func (a *Agent) RemoveService(serviceID string, persist bool) error { a.stateLock.Lock() defer a.stateLock.Unlock() - a.serviceManager.RemoveService(serviceID) return a.removeServiceLocked(serviceID, persist) } @@ -2073,6 +2087,9 @@ func (a *Agent) removeServiceLocked(serviceID string, persist bool) error { return fmt.Errorf("ServiceID missing") } + // Shut down the config watch in the service manager. + a.serviceManager.RemoveService(serviceID) + checks := a.State.Checks() var checkIDs []types.CheckID for id, check := range checks { @@ -3677,6 +3694,15 @@ func (a *Agent) registerCache() { RefreshTimer: 0 * time.Second, RefreshTimeout: 10 * time.Minute, }) + + a.cache.RegisterType(cachetype.ResolvedServiceConfigName, &cachetype.ResolvedServiceConfig{ + RPC: a, + }, &cache.RegisterOptions{ + // Maintain a blocking query, retry dropped connections quickly + Refresh: true, + RefreshTimer: 0 * time.Second, + RefreshTimeout: 10 * time.Minute, + }) } // defaultProxyCommand returns the default Connect managed proxy command. 
diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index 566a932f2..720e0df03 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -191,18 +191,25 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r if err != nil { return err } - serviceConf, ok := serviceEntry.(*structs.ServiceConfigEntry) - if !ok { - return fmt.Errorf("invalid service config type %T", serviceEntry) + var serviceConf *structs.ServiceConfigEntry + var ok bool + if serviceEntry != nil { + serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry) + if !ok { + return fmt.Errorf("invalid service config type %T", serviceEntry) + } } _, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal) if err != nil { return err } - proxyConf, ok := proxyEntry.(*structs.ProxyConfigEntry) - if !ok { - return fmt.Errorf("invalid proxy config type %T", serviceEntry) + var proxyConf *structs.ProxyConfigEntry + if proxyEntry != nil { + proxyConf, ok = proxyEntry.(*structs.ProxyConfigEntry) + if !ok { + return fmt.Errorf("invalid proxy config type %T", proxyEntry) + } } // Resolve the service definition by overlaying the service config onto the global diff --git a/agent/service_manager.go b/agent/service_manager.go index ea131dd10..f86adf66c 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -24,7 +24,7 @@ func NewServiceManager(agent *Agent) *ServiceManager { } } -func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) { +func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { s.Lock() defer s.Unlock() @@ -40,17 +40,45 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st // start a new config watcher. watch, ok := s.services[service.ID] if ok { - watch.updateRegistration(®) + s.agent.logger.Printf("[DEBUG] agent: updating local registration for service %q", service.ID) + if err := watch.updateRegistration(®); err != nil { + return err + } } else { + // This is a new entry, so get the existing global config and do the initial + // registration with the merged config. + args := structs.ServiceConfigRequest{ + Name: service.Service, + Datacenter: s.agent.config.Datacenter, + QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken}, + } + if token != "" { + args.QueryOptions.Token = token + } + var resp structs.ServiceConfigResponse + if err := s.agent.RPC("ConfigEntry.ResolveServiceConfig", &args, &resp); err != nil { + s.agent.logger.Printf("[WARN] agent: could not retrieve central configuration for service %q: %v", + service.Service, err) + } + watch := &serviceConfigWatch{ - registration: ®, - updateCh: make(chan cache.UpdateEvent, 1), - agent: s.agent, + updateCh: make(chan cache.UpdateEvent, 1), + agent: s.agent, + config: &resp.Definition, + } + + // Force an update/register immediately. 
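+		// This performs the initial registration synchronously, using the
+		// merged config fetched above, so the service is in local state
+		// before the background watch takes over.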
+ if err := watch.updateRegistration(®); err != nil { + return err } s.services[service.ID] = watch - watch.Start() + if err := watch.Start(); err != nil { + return err + } } + + return nil } func (s *ServiceManager) RemoveService(serviceID string) { @@ -84,7 +112,7 @@ type serviceConfigWatch struct { ctx context.Context cancelFunc func() - sync.RWMutex + sync.Mutex } func (s *serviceConfigWatch) Start() error { @@ -103,81 +131,80 @@ func (s *serviceConfigWatch) runWatch() { case <-s.ctx.Done(): return case event := <-s.updateCh: - s.handleUpdate(event) + if err := s.handleUpdate(event, false); err != nil { + s.agent.logger.Printf("[ERR] agent: error handling service update: %v", err) + continue + } } } } -func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent) { +func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) error { + s.Lock() + defer s.Unlock() + + if event.Err != nil { + return fmt.Errorf("error watching service config: %v", event.Err) + } + switch event.Result.(type) { - case serviceRegistration: - s.Lock() + case *serviceRegistration: s.registration = event.Result.(*serviceRegistration) - s.Unlock() - case structs.ServiceConfigResponse: - s.Lock() - s.config = &event.Result.(*structs.ServiceConfigResponse).Definition - s.Unlock() + case *structs.ServiceConfigResponse: + resp := event.Result.(*structs.ServiceConfigResponse) + s.config = &resp.Definition default: - s.agent.logger.Printf("[ERR] unknown update event type: %T", event) + return fmt.Errorf("unknown update event type: %T", event) } service := s.mergeServiceConfig() - s.agent.logger.Printf("[INFO] updating service registration: %v, %v", service.ID, service.Meta) - /*err := s.agent.AddService(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source) + + if !locked { + s.agent.stateLock.Lock() + defer s.agent.stateLock.Unlock() + } + + err := s.agent.addServiceInternal(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source) if err != nil { - s.agent.logger.Printf("[ERR] error updating service registration: %v", err) - }*/ + return fmt.Errorf("error updating service registration: %v", err) + } + + return nil } func (s *serviceConfigWatch) startConfigWatch() error { - s.RLock() name := s.registration.service.Service - s.RUnlock() req := &structs.ServiceConfigRequest{ - Name: name, - Datacenter: s.agent.config.Datacenter, + Name: name, + Datacenter: s.agent.config.Datacenter, + QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken}, + } + if s.registration.token != "" { + req.QueryOptions.Token = s.registration.token } err := s.agent.cache.Notify(s.ctx, cachetype.ResolvedServiceConfigName, req, fmt.Sprintf("service-config:%s", name), s.updateCh) return err } -func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) { - s.updateCh <- cache.UpdateEvent{ +func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error { + return s.handleUpdate(cache.UpdateEvent{ Result: registration, - } + }, true) } func (s *serviceConfigWatch) mergeServiceConfig() *structs.NodeService { - return nil + if s.config == nil { + return s.registration.service + } + + svc := s.config.NodeService() + svc.Merge(s.registration.service) + + return svc } func (s *serviceConfigWatch) Stop() { s.cancelFunc() } - -/* -// Construct the service config request. This will be re-used with an updated - // index to watch for changes in the effective service config. 
- req := structs.ServiceConfigRequest{ - Name: s.registration.service.Service, - Datacenter: s.agent.config.Datacenter, - QueryOptions: structs.QueryOptions{Token: s.agent.tokens.AgentToken()}, - } - - consul.RetryLoopBackoff(s.shutdownCh, func() error { - var reply structs.ServiceConfigResponse - if err := s.agent.RPC("ConfigEntry.ResolveServiceConfig", &req, &reply); err != nil { - return err - } - - s.updateConfig(&reply.Definition) - - req.QueryOptions.MinQueryIndex = reply.QueryMeta.Index - return nil - }, func(err error) { - s.agent.logger.Printf("[ERR] Error getting service config: %v", err) - }) -*/ diff --git a/agent/service_manager_test.go b/agent/service_manager_test.go new file mode 100644 index 000000000..d7487defa --- /dev/null +++ b/agent/service_manager_test.go @@ -0,0 +1,55 @@ +package agent + +import ( + "testing" + + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/testrpc" + "github.com/stretchr/testify/require" +) + +func TestServiceManager_RegisterService(t *testing.T) { + require := require.New(t) + + a := NewTestAgent(t, t.Name(), "") + defer a.Shutdown() + + testrpc.WaitForLeader(t, a.RPC, "dc1") + + // Register some global proxy config + args := &structs.ConfigEntryRequest{ + Datacenter: "dc1", + Entry: &structs.ProxyConfigEntry{ + Config: map[string]interface{}{ + "foo": 1, + }, + }, + } + var out struct{} + require.NoError(a.RPC("ConfigEntry.Apply", args, &out)) + + // Now register a service locally and make sure the resulting State entry + // has the global config in it. + svc := &structs.NodeService{ + ID: "redis", + Service: "redis", + Port: 8000, + } + require.NoError(a.AddService(svc, nil, false, "", ConfigSourceLocal)) + mergedService := a.State.Service("redis") + require.NotNil(mergedService) + require.Equal(&structs.NodeService{ + ID: "redis", + Service: "redis", + Port: 8000, + Proxy: structs.ConnectProxyConfig{ + Config: map[string]interface{}{ + "foo": int64(1), + }, + }, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + }, mergedService) +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 9e7b6acf1..06988679b 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -765,6 +765,57 @@ type ServiceConnect struct { SidecarService *ServiceDefinition `json:",omitempty" bexpr:"-"` } +// Merge overlays the given node's attributes onto the existing node. +func (s *NodeService) Merge(other *NodeService) { + if other.Kind != "" { + s.Kind = other.Kind + } + if other.ID != "" { + s.ID = other.ID + } + if other.Service != "" { + s.Service = other.Service + } + for _, tag := range other.Tags { + s.Tags = append(s.Tags, tag) + } + if other.Address != "" { + s.Address = other.Address + } + if s.Meta == nil { + s.Meta = other.Meta + } else { + for k, v := range other.Meta { + s.Meta[k] = v + } + } + if other.Port != 0 { + s.Port = other.Port + } + if other.Weights != nil { + s.Weights = other.Weights + } + s.EnableTagOverride = other.EnableTagOverride + if other.ProxyDestination != "" { + s.ProxyDestination = other.ProxyDestination + } + + // Take the incoming service's proxy fields and merge the config map. + proxyConf := s.Proxy.Config + s.Proxy = other.Proxy + if proxyConf == nil { + proxyConf = other.Proxy.Config + } else { + for k, v := range other.Proxy.Config { + proxyConf[k] = v + } + } + s.Proxy.Config = proxyConf + + s.Connect = other.Connect + s.LocallyRegisteredAsSidecar = other.LocallyRegisteredAsSidecar +} + // Validate validates the node service configuration. 
// // NOTE(mitchellh): This currently only validates fields for a ConnectProxy. From f89aa69b9dccd63cdd92aea153804d3448df9fa8 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Tue, 23 Apr 2019 03:31:24 -0700 Subject: [PATCH 3/6] Fix a race in the service updates --- agent/service_manager.go | 59 +++++++++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 16 deletions(-) diff --git a/agent/service_manager.go b/agent/service_manager.go index f86adf66c..9e49748b7 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -10,6 +10,11 @@ import ( "golang.org/x/net/context" ) +// The ServiceManager is a layer for service registration in between the agent +// and the local state. Any services must be registered with the ServiceManager, +// which then maintains a long-running watch of any globally-set service or proxy +// configuration that applies to the service in order to register the final, merged +// service configuration locally in the agent state. type ServiceManager struct { services map[string]*serviceConfigWatch agent *Agent @@ -24,10 +29,11 @@ func NewServiceManager(agent *Agent) *ServiceManager { } } +// AddService starts a new serviceConfigWatch if the service has not been registered, and +// updates the existing registration if it has. For a new service, a call will also be made +// to fetch the merged global defaults that apply to the service in order to compose the +// initial registration. func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { - s.Lock() - defer s.Unlock() - reg := serviceRegistration{ service: service, chkTypes: chkTypes, @@ -38,12 +44,14 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st // If a service watch already exists, update the registration. Otherwise, // start a new config watcher. + s.Lock() watch, ok := s.services[service.ID] + s.Unlock() if ok { - s.agent.logger.Printf("[DEBUG] agent: updating local registration for service %q", service.ID) if err := watch.updateRegistration(®); err != nil { return err } + s.agent.logger.Printf("[DEBUG] agent.manager: updated local registration for service %q", service.ID) } else { // This is a new entry, so get the existing global config and do the initial // registration with the merged config. @@ -72,10 +80,13 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st return err } + s.Lock() s.services[service.ID] = watch + s.Unlock() if err := watch.Start(); err != nil { return err } + s.agent.logger.Printf("[DEBUG] agent.manager: adding local registration for service %q", service.ID) } return nil @@ -94,6 +105,7 @@ func (s *ServiceManager) RemoveService(serviceID string) { delete(s.services, serviceID) } +// serviceRegistration represents a locally registered service. type serviceRegistration struct { service *structs.NodeService chkTypes []*structs.CheckType @@ -102,6 +114,9 @@ type serviceRegistration struct { source configSource } +// serviceConfigWatch is a long running helper for composing the end config +// for a given service from both the local registration and the global +// service/proxy defaults. 
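+//
+// Updates arrive on updateCh from two sources - local registration changes
+// and resolved-config updates from the cache - and each one triggers a
+// re-merge and re-registration of the service in local state.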
type serviceConfigWatch struct { registration *serviceRegistration config *structs.ServiceDefinition @@ -125,6 +140,12 @@ func (s *serviceConfigWatch) Start() error { return nil } +func (s *serviceConfigWatch) Stop() { + s.cancelFunc() +} + +// runWatch handles any update events from the cache.Notify until the +// config watch is shut down. func (s *serviceConfigWatch) runWatch() { for { select { @@ -132,14 +153,24 @@ func (s *serviceConfigWatch) runWatch() { return case event := <-s.updateCh: if err := s.handleUpdate(event, false); err != nil { - s.agent.logger.Printf("[ERR] agent: error handling service update: %v", err) - continue + s.agent.logger.Printf("[ERR] agent.manager: error handling service update: %v", err) } } } } +// handleUpdate receives an update event about either the service registration or the +// global config defaults, updates the local state and re-registers the service with +// the newly merged config. This function takes the serviceConfigWatch lock to ensure +// only one update can be happening at a time. func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) error { + // Take the agent state lock if needed. This is done before the local config watch + // lock in order to prevent a race between this config watch and others - the config + // watch lock is the inner lock and the agent stateLock is the outer lock. + if !locked { + s.agent.stateLock.Lock() + defer s.agent.stateLock.Unlock() + } s.Lock() defer s.Unlock() @@ -158,12 +189,6 @@ func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) } service := s.mergeServiceConfig() - - if !locked { - s.agent.stateLock.Lock() - defer s.agent.stateLock.Unlock() - } - err := s.agent.addServiceInternal(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source) if err != nil { return fmt.Errorf("error updating service registration: %v", err) @@ -172,6 +197,8 @@ func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) return nil } +// startConfigWatch starts a cache.Notify goroutine to run a continuous blocking query +// on the resolved service config for this service. func (s *serviceConfigWatch) startConfigWatch() error { name := s.registration.service.Service @@ -188,12 +215,16 @@ func (s *serviceConfigWatch) startConfigWatch() error { return err } +// updateRegistration does a synchronous update of the local service registration and +// returns the result. func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error { return s.handleUpdate(cache.UpdateEvent{ Result: registration, }, true) } +// mergeServiceConfig returns the final effective config for the watched service, +// including the latest known global defaults from the servers. 
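+//
+// For example, with a global proxy-defaults entry carrying Config
+// {"foo": 1}, a plain local registration for "redis" merges into a
+// NodeService whose Proxy.Config contains "foo" while keeping the local
+// ID, port and checks; on conflict, the local registration's fields win.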
func (s *serviceConfigWatch) mergeServiceConfig() *structs.NodeService { if s.config == nil { return s.registration.service @@ -204,7 +235,3 @@ func (s *serviceConfigWatch) mergeServiceConfig() *structs.NodeService { return svc } - -func (s *serviceConfigWatch) Stop() { - s.cancelFunc() -} From 1fc96c770badce9fba8bb03118efd134160e17e9 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 24 Apr 2019 06:11:08 -0700 Subject: [PATCH 4/6] Make central service config opt-in and rework the initial registration --- agent/agent.go | 9 +- agent/cache-types/resolved_service_config.go | 4 +- agent/cache/watch.go | 2 +- agent/config/builder.go | 1 + agent/config/config.go | 1 + agent/config/runtime.go | 6 + agent/config/runtime_test.go | 4 + agent/service_manager.go | 126 ++++++++++++------- agent/service_manager_test.go | 45 ++++++- agent/structs/structs.go | 27 +++- agent/structs/structs_test.go | 97 ++++++++++++++ agent/testagent.go | 2 +- 12 files changed, 265 insertions(+), 59 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 69a6ffd82..a2fa2b885 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1898,18 +1898,21 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.Che return a.addServiceLocked(service, chkTypes, persist, token, source) } +// addServiceLocked adds a service entry to the service manager if enabled, or directly +// to the local state if it is not. This function assumes the state lock is already held. func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { if err := a.validateService(service, chkTypes); err != nil { return err } - if err := a.serviceManager.AddService(service, chkTypes, persist, token, source); err != nil { - return err + if a.config.EnableCentralServiceConfig { + return a.serviceManager.AddService(service, chkTypes, persist, token, source) } - return nil + return a.addServiceInternal(service, chkTypes, persist, token, source) } +// addServiceInternal adds the given service and checks to the local state. func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { // Pause the service syncs during modification a.PauseSync() diff --git a/agent/cache-types/resolved_service_config.go b/agent/cache-types/resolved_service_config.go index d1038283f..c1bba9606 100644 --- a/agent/cache-types/resolved_service_config.go +++ b/agent/cache-types/resolved_service_config.go @@ -30,9 +30,9 @@ func (c *ResolvedServiceConfig) Fetch(opts cache.FetchOptions, req cache.Request reqReal.QueryOptions.MinQueryIndex = opts.MinIndex reqReal.QueryOptions.MaxQueryTime = opts.Timeout - // Allways allow stale - there's no point in hitting leader if the request is + // Always allow stale - there's no point in hitting leader if the request is // going to be served from cache and endup arbitrarily stale anyway. This - // allows cached service-discover to automatically read scale across all + // allows cached resolved-service-config to automatically read scale across all // servers too. 
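 	//
 	// (Blocking semantics are unaffected: the MinQueryIndex set above still
 	// makes the query wait for a change; stale only affects which server
 	// may answer.)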
reqReal.AllowStale = true diff --git a/agent/cache/watch.go b/agent/cache/watch.go index b4664fcfa..47476c37b 100644 --- a/agent/cache/watch.go +++ b/agent/cache/watch.go @@ -95,7 +95,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co // Check the index of the value returned in the cache entry to be sure it // changed - if index < meta.Index { + if index == 0 || index < meta.Index { u := UpdateEvent{correlationID, res, meta, err} select { case ch <- u: diff --git a/agent/config/builder.go b/agent/config/builder.go index 96b424a2f..18706ffe9 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -793,6 +793,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput), DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale), EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks), + EnableCentralServiceConfig: b.boolVal(c.EnableCentralServiceConfig), EnableDebug: b.boolVal(c.EnableDebug), EnableRemoteScriptChecks: enableRemoteScriptChecks, EnableLocalScriptChecks: enableLocalScriptChecks, diff --git a/agent/config/config.go b/agent/config/config.go index 8b7d2d9cf..855ce9194 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -201,6 +201,7 @@ type Config struct { DiscoveryMaxStale *string `json:"discovery_max_stale" hcl:"discovery_max_stale" mapstructure:"discovery_max_stale"` EnableACLReplication *bool `json:"enable_acl_replication,omitempty" hcl:"enable_acl_replication" mapstructure:"enable_acl_replication"` EnableAgentTLSForChecks *bool `json:"enable_agent_tls_for_checks,omitempty" hcl:"enable_agent_tls_for_checks" mapstructure:"enable_agent_tls_for_checks"` + EnableCentralServiceConfig *bool `json:"enable_central_service_config,omitempty" hcl:"enable_central_service_config" mapstructure:"enable_central_service_config"` EnableDebug *bool `json:"enable_debug,omitempty" hcl:"enable_debug" mapstructure:"enable_debug"` EnableScriptChecks *bool `json:"enable_script_checks,omitempty" hcl:"enable_script_checks" mapstructure:"enable_script_checks"` EnableLocalScriptChecks *bool `json:"enable_local_script_checks,omitempty" hcl:"enable_local_script_checks" mapstructure:"enable_local_script_checks"` diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 39e7a0877..dc93b614d 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -669,6 +669,12 @@ type RuntimeConfig struct { // and key). EnableAgentTLSForChecks bool + // EnableCentralServiceConfig controls whether the agent should incorporate + // centralized config such as service-defaults into local service registrations. + // + // hcl: (enable) + EnableCentralServiceConfig bool + // EnableDebug is used to enable various debugging features. 
// // hcl: enable_debug = (true|false) diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 329abfb6a..59ab47de7 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -3073,6 +3073,7 @@ func TestFullConfig(t *testing.T) { }, "enable_acl_replication": true, "enable_agent_tls_for_checks": true, + "enable_central_service_config": true, "enable_debug": true, "enable_script_checks": true, "enable_local_script_checks": true, @@ -3629,6 +3630,7 @@ func TestFullConfig(t *testing.T) { } enable_acl_replication = true enable_agent_tls_for_checks = true + enable_central_service_config = true enable_debug = true enable_script_checks = true enable_local_script_checks = true @@ -4270,6 +4272,7 @@ func TestFullConfig(t *testing.T) { DiscardCheckOutput: true, DiscoveryMaxStale: 5 * time.Second, EnableAgentTLSForChecks: true, + EnableCentralServiceConfig: true, EnableDebug: true, EnableRemoteScriptChecks: true, EnableLocalScriptChecks: true, @@ -5067,6 +5070,7 @@ func TestSanitize(t *testing.T) { "DiscoveryMaxStale": "0s", "EnableAgentTLSForChecks": false, "EnableDebug": false, + "EnableCentralServiceConfig": false, "EnableLocalScriptChecks": false, "EnableRemoteScriptChecks": false, "EnableSyslog": false, diff --git a/agent/service_manager.go b/agent/service_manager.go index 9e49748b7..163aa78a1 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -19,7 +19,7 @@ type ServiceManager struct { services map[string]*serviceConfigWatch agent *Agent - sync.Mutex + lock sync.Mutex } func NewServiceManager(agent *Agent) *ServiceManager { @@ -44,9 +44,9 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st // If a service watch already exists, update the registration. Otherwise, // start a new config watcher. - s.Lock() + s.lock.Lock() watch, ok := s.services[service.ID] - s.Unlock() + s.lock.Unlock() if ok { if err := watch.updateRegistration(®); err != nil { return err @@ -55,46 +55,39 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st } else { // This is a new entry, so get the existing global config and do the initial // registration with the merged config. - args := structs.ServiceConfigRequest{ - Name: service.Service, - Datacenter: s.agent.config.Datacenter, - QueryOptions: structs.QueryOptions{Token: s.agent.config.ACLAgentToken}, - } - if token != "" { - args.QueryOptions.Token = token - } - var resp structs.ServiceConfigResponse - if err := s.agent.RPC("ConfigEntry.ResolveServiceConfig", &args, &resp); err != nil { - s.agent.logger.Printf("[WARN] agent: could not retrieve central configuration for service %q: %v", - service.Service, err) - } - watch := &serviceConfigWatch{ - updateCh: make(chan cache.UpdateEvent, 1), - agent: s.agent, - config: &resp.Definition, + registration: ®, + readyCh: make(chan error), + updateCh: make(chan cache.UpdateEvent, 1), + agent: s.agent, } - // Force an update/register immediately. - if err := watch.updateRegistration(®); err != nil { - return err - } - - s.Lock() - s.services[service.ID] = watch - s.Unlock() + // Start the config watch, which starts a blocking query for the resolved service config + // in the background. if err := watch.Start(); err != nil { return err } - s.agent.logger.Printf("[DEBUG] agent.manager: adding local registration for service %q", service.ID) + + // Call ReadyWait to block until the cache has returned the initial config and the service + // has been registered. 
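+		// This means AddService blocks until the first resolved config (or
+		// an error) comes back, so callers always see the fully merged
+		// registration once this returns.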
+ if err := watch.ReadyWait(); err != nil { + watch.Stop() + return err + } + + s.lock.Lock() + s.services[service.ID] = watch + s.lock.Unlock() + + s.agent.logger.Printf("[DEBUG] agent.manager: added local registration for service %q", service.ID) } return nil } func (s *ServiceManager) RemoveService(serviceID string) { - s.Lock() - defer s.Unlock() + s.lock.Lock() + defer s.lock.Unlock() serviceWatch, ok := s.services[serviceID] if !ok { @@ -123,13 +116,21 @@ type serviceConfigWatch struct { agent *Agent + // readyCh is used for ReadyWait in order to block until the first update + // for the resolved service config is received from the cache. Both this + // and ready are protected the lock. + readyCh chan error + ready bool + updateCh chan cache.UpdateEvent ctx context.Context cancelFunc func() - sync.Mutex + lock sync.Mutex } +// Start starts the config watch and a goroutine to handle updates over +// the updateCh. This is not safe to call more than once. func (s *serviceConfigWatch) Start() error { s.ctx, s.cancelFunc = context.WithCancel(context.Background()) if err := s.startConfigWatch(); err != nil { @@ -144,6 +145,14 @@ func (s *serviceConfigWatch) Stop() { s.cancelFunc() } +// ReadyWait blocks until the readyCh is closed, which means the initial +// registration of the service has been completed. If there was an error +// with the initial registration, it will be returned. +func (s *serviceConfigWatch) ReadyWait() error { + err := <-s.readyCh + return err +} + // runWatch handles any update events from the cache.Notify until the // config watch is shut down. func (s *serviceConfigWatch) runWatch() { @@ -166,34 +175,55 @@ func (s *serviceConfigWatch) runWatch() { func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) error { // Take the agent state lock if needed. This is done before the local config watch // lock in order to prevent a race between this config watch and others - the config - // watch lock is the inner lock and the agent stateLock is the outer lock. - if !locked { + // watch lock is the inner lock and the agent stateLock is the outer lock. In the case + // where s.ready == false we assume the lock is already held, since this update is being + // waited on for the initial registration by a call from the agent that already holds + // the lock. + if !locked && s.ready { s.agent.stateLock.Lock() defer s.agent.stateLock.Unlock() } - s.Lock() - defer s.Unlock() + s.lock.Lock() + defer s.lock.Unlock() + // If we got an error, log a warning if this is the first update; otherwise return the error. + // We want the initial update to cause a service registration no matter what. 
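+	// Otherwise an agent whose servers are unreachable at start-up could
+	// never register its local services.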
if event.Err != nil { - return fmt.Errorf("error watching service config: %v", event.Err) - } - - switch event.Result.(type) { - case *serviceRegistration: - s.registration = event.Result.(*serviceRegistration) - case *structs.ServiceConfigResponse: - resp := event.Result.(*structs.ServiceConfigResponse) - s.config = &resp.Definition - default: - return fmt.Errorf("unknown update event type: %T", event) + if !s.ready { + s.agent.logger.Printf("[WARN] could not retrieve initial service_defaults config for service %q: %v", + s.registration.service.ID, event.Err) + } else { + return fmt.Errorf("error watching service config: %v", event.Err) + } + } else { + switch event.Result.(type) { + case *serviceRegistration: + s.registration = event.Result.(*serviceRegistration) + case *structs.ServiceConfigResponse: + resp := event.Result.(*structs.ServiceConfigResponse) + s.config = &resp.Definition + default: + return fmt.Errorf("unknown update event type: %T", event) + } } service := s.mergeServiceConfig() err := s.agent.addServiceInternal(service, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source) if err != nil { + // If this is the initial registration, return the error through the readyCh + // so it can be passed back to the original caller. + if !s.ready { + s.readyCh <- err + } return fmt.Errorf("error updating service registration: %v", err) } + // If this is the first registration, set the ready status by closing the channel. + if !s.ready { + close(s.readyCh) + s.ready = true + } + return nil } @@ -216,7 +246,7 @@ func (s *serviceConfigWatch) startConfigWatch() error { } // updateRegistration does a synchronous update of the local service registration and -// returns the result. +// returns the result. The agent stateLock should be held when calling this function. func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error { return s.handleUpdate(cache.UpdateEvent{ Result: registration, diff --git a/agent/service_manager_test.go b/agent/service_manager_test.go index d7487defa..9fc2b2196 100644 --- a/agent/service_manager_test.go +++ b/agent/service_manager_test.go @@ -11,7 +11,7 @@ import ( func TestServiceManager_RegisterService(t *testing.T) { require := require.New(t) - a := NewTestAgent(t, t.Name(), "") + a := NewTestAgent(t, t.Name(), "enable_central_service_config = true") defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") @@ -53,3 +53,46 @@ func TestServiceManager_RegisterService(t *testing.T) { }, }, mergedService) } + +func TestServiceManager_Disabled(t *testing.T) { + require := require.New(t) + + a := NewTestAgent(t, t.Name(), "enable_central_service_config = false") + defer a.Shutdown() + + testrpc.WaitForLeader(t, a.RPC, "dc1") + + // Register some global proxy config + args := &structs.ConfigEntryRequest{ + Datacenter: "dc1", + Entry: &structs.ProxyConfigEntry{ + Config: map[string]interface{}{ + "foo": 1, + }, + }, + } + var out struct{} + require.NoError(a.RPC("ConfigEntry.Apply", args, &out)) + + // Now register a service locally and make sure the resulting State entry + // has the global config in it. + svc := &structs.NodeService{ + ID: "redis", + Service: "redis", + Port: 8000, + } + require.NoError(a.AddService(svc, nil, false, "", ConfigSourceLocal)) + mergedService := a.State.Service("redis") + require.NotNil(mergedService) + // The proxy config map shouldn't be present; the agent should ignore global + // defaults here. 
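+	// (Contrast with TestServiceManager_RegisterService above, where the
+	// merged service picks up Proxy.Config["foo"] from the global entry.)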
+ require.Equal(&structs.NodeService{ + ID: "redis", + Service: "redis", + Port: 8000, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + }, mergedService) +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 06988679b..94d5b564b 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -765,7 +765,9 @@ type ServiceConnect struct { SidecarService *ServiceDefinition `json:",omitempty" bexpr:"-"` } -// Merge overlays the given node's attributes onto the existing node. +// Merge overlays any non-empty fields of other onto s. Tags, metadata and proxy +// config are unioned together instead of overwritten. The Connect field and the +// non-config proxy fields are taken from other. func (s *NodeService) Merge(other *NodeService) { if other.Kind != "" { s.Kind = other.Kind @@ -776,9 +778,26 @@ func (s *NodeService) Merge(other *NodeService) { if other.Service != "" { s.Service = other.Service } - for _, tag := range other.Tags { - s.Tags = append(s.Tags, tag) + + if s.Tags == nil { + s.Tags = other.Tags + } else if other.Tags != nil { + // Both nodes have tags, so deduplicate and merge them. + tagSet := make(map[string]struct{}) + for _, tag := range s.Tags { + tagSet[tag] = struct{}{} + } + for _, tag := range other.Tags { + tagSet[tag] = struct{}{} + } + tags := make([]string, 0, len(tagSet)) + for tag, _ := range tagSet { + tags = append(tags, tag) + } + sort.Strings(tags) + s.Tags = tags } + if other.Address != "" { s.Address = other.Address } @@ -812,6 +831,8 @@ func (s *NodeService) Merge(other *NodeService) { } s.Proxy.Config = proxyConf + // Just take the entire Connect block from the other node. + // We can revisit this when adding more fields to centralized config. s.Connect = other.Connect s.LocallyRegisteredAsSidecar = other.LocallyRegisteredAsSidecar } diff --git a/agent/structs/structs_test.go b/agent/structs/structs_test.go index e407f758c..37167e780 100644 --- a/agent/structs/structs_test.go +++ b/agent/structs/structs_test.go @@ -561,6 +561,103 @@ func TestStructs_NodeService_IsSame(t *testing.T) { } } +func TestStructs_NodeService_Merge(t *testing.T) { + a := &NodeService{ + Kind: "service", + ID: "foo:1", + Service: "foo", + Tags: []string{"a", "b"}, + Address: "127.0.0.1", + Meta: map[string]string{"a": "b"}, + Port: 1234, + Weights: &Weights{ + Passing: 1, + Warning: 1, + }, + EnableTagOverride: false, + ProxyDestination: "asdf", + Proxy: ConnectProxyConfig{ + DestinationServiceName: "baz", + DestinationServiceID: "baz:1", + LocalServiceAddress: "127.0.0.1", + LocalServicePort: 2345, + Config: map[string]interface{}{ + "foo": 1, + }, + }, + Connect: ServiceConnect{ + Native: false, + }, + LocallyRegisteredAsSidecar: false, + } + + b := &NodeService{ + Kind: "other", + ID: "bar:1", + Service: "bar", + Tags: []string{"c", "d"}, + Address: "127.0.0.2", + Meta: map[string]string{"c": "d"}, + Port: 4567, + Weights: &Weights{ + Passing: 2, + Warning: 2, + }, + EnableTagOverride: true, + ProxyDestination: "qwer", + Proxy: ConnectProxyConfig{ + DestinationServiceName: "zoo", + DestinationServiceID: "zoo:1", + LocalServiceAddress: "127.0.0.2", + LocalServicePort: 6789, + Config: map[string]interface{}{ + "bar": 2, + }, + }, + Connect: ServiceConnect{ + Native: true, + }, + LocallyRegisteredAsSidecar: true, + } + + expected := &NodeService{ + Kind: "other", + ID: "bar:1", + Service: "bar", + Tags: []string{"a", "b", "c", "d"}, + Address: "127.0.0.2", + Meta: map[string]string{ + "a": "b", + "c": "d", + }, + Port: 4567, + Weights: 
&Weights{ + Passing: 2, + Warning: 2, + }, + EnableTagOverride: true, + ProxyDestination: "qwer", + Proxy: ConnectProxyConfig{ + DestinationServiceName: "zoo", + DestinationServiceID: "zoo:1", + LocalServiceAddress: "127.0.0.2", + LocalServicePort: 6789, + Config: map[string]interface{}{ + "foo": 1, + "bar": 2, + }, + }, + Connect: ServiceConnect{ + Native: true, + }, + LocallyRegisteredAsSidecar: true, + } + + a.Merge(b) + + require.Equal(t, expected, a) +} + func TestStructs_HealthCheck_IsSame(t *testing.T) { hc := &HealthCheck{ Node: "node1", diff --git a/agent/testagent.go b/agent/testagent.go index bc74aad3e..cf47fd4b3 100644 --- a/agent/testagent.go +++ b/agent/testagent.go @@ -23,8 +23,8 @@ import ( "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/logger" + "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/stretchr/testify/require" From c02716ac2cff57b453ca8f669abbfb81892e692e Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 24 Apr 2019 06:46:30 -0700 Subject: [PATCH 5/6] Fix a race in the ready logic --- agent/agent.go | 2 ++ agent/service_manager.go | 35 ++++++++++++++++------------------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index a2fa2b885..da059801e 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -243,6 +243,8 @@ type Agent struct { // directly. proxyConfig *proxycfg.Manager + // serviceManager is the manager for combining local service registrations with + // the centrally configured proxy/service defaults. serviceManager *ServiceManager // xdsServer is the Server instance that serves xDS gRPC API. diff --git a/agent/service_manager.go b/agent/service_manager.go index 163aa78a1..a227ff158 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -34,6 +34,9 @@ func NewServiceManager(agent *Agent) *ServiceManager { // to fetch the merged global defaults that apply to the service in order to compose the // initial registration. func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error { + s.lock.Lock() + defer s.lock.Unlock() + reg := serviceRegistration{ service: service, chkTypes: chkTypes, @@ -44,9 +47,7 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st // If a service watch already exists, update the registration. Otherwise, // start a new config watcher. - s.lock.Lock() watch, ok := s.services[service.ID] - s.lock.Unlock() if ok { if err := watch.updateRegistration(®); err != nil { return err @@ -75,9 +76,7 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st return err } - s.lock.Lock() s.services[service.ID] = watch - s.lock.Unlock() s.agent.logger.Printf("[DEBUG] agent.manager: added local registration for service %q", service.ID) } @@ -117,10 +116,8 @@ type serviceConfigWatch struct { agent *Agent // readyCh is used for ReadyWait in order to block until the first update - // for the resolved service config is received from the cache. Both this - // and ready are protected the lock. + // for the resolved service config is received from the cache. 
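+	// handleUpdate sends any initial-registration error on this channel and
+	// closes it on success, so ReadyWait returns nil exactly once the first
+	// registration has completed.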
readyCh chan error - ready bool updateCh chan cache.UpdateEvent ctx context.Context @@ -156,14 +153,16 @@ func (s *serviceConfigWatch) ReadyWait() error { // runWatch handles any update events from the cache.Notify until the // config watch is shut down. func (s *serviceConfigWatch) runWatch() { + firstRun := true for { select { case <-s.ctx.Done(): return case event := <-s.updateCh: - if err := s.handleUpdate(event, false); err != nil { + if err := s.handleUpdate(event, false, firstRun); err != nil { s.agent.logger.Printf("[ERR] agent.manager: error handling service update: %v", err) } + firstRun = false } } } @@ -172,14 +171,13 @@ func (s *serviceConfigWatch) runWatch() { // global config defaults, updates the local state and re-registers the service with // the newly merged config. This function takes the serviceConfigWatch lock to ensure // only one update can be happening at a time. -func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) error { +func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked, firstRun bool) error { // Take the agent state lock if needed. This is done before the local config watch // lock in order to prevent a race between this config watch and others - the config - // watch lock is the inner lock and the agent stateLock is the outer lock. In the case - // where s.ready == false we assume the lock is already held, since this update is being - // waited on for the initial registration by a call from the agent that already holds - // the lock. - if !locked && s.ready { + // watch lock is the inner lock and the agent stateLock is the outer lock. If this is the + // first run we also don't need to take the stateLock, as this is being waited on + // synchronously by a caller that already holds it. + if !locked && !firstRun { s.agent.stateLock.Lock() defer s.agent.stateLock.Unlock() } @@ -189,7 +187,7 @@ func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) // If we got an error, log a warning if this is the first update; otherwise return the error. // We want the initial update to cause a service registration no matter what. if event.Err != nil { - if !s.ready { + if firstRun { s.agent.logger.Printf("[WARN] could not retrieve initial service_defaults config for service %q: %v", s.registration.service.ID, event.Err) } else { @@ -212,16 +210,15 @@ func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked bool) if err != nil { // If this is the initial registration, return the error through the readyCh // so it can be passed back to the original caller. - if !s.ready { + if firstRun { s.readyCh <- err } return fmt.Errorf("error updating service registration: %v", err) } // If this is the first registration, set the ready status by closing the channel. 
- if !s.ready { + if firstRun { close(s.readyCh) - s.ready = true } return nil @@ -250,7 +247,7 @@ func (s *serviceConfigWatch) startConfigWatch() error { func (s *serviceConfigWatch) updateRegistration(registration *serviceRegistration) error { return s.handleUpdate(cache.UpdateEvent{ Result: registration, - }, true) + }, true, false) } // mergeServiceConfig returns the final effective config for the watched service, From a113d8ca1feffdf730473d9070a46bd95e62f666 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Thu, 25 Apr 2019 02:11:07 -0700 Subject: [PATCH 6/6] Test an index=0 value in cache.Notify --- agent/agent.go | 6 ++++-- agent/cache/watch_test.go | 40 +++++++++++++++++++++++++++------------ agent/config/runtime.go | 2 +- 3 files changed, 33 insertions(+), 15 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index da059801e..71a1fb8a4 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -2092,8 +2092,10 @@ func (a *Agent) removeServiceLocked(serviceID string, persist bool) error { return fmt.Errorf("ServiceID missing") } - // Shut down the config watch in the service manager. - a.serviceManager.RemoveService(serviceID) + // Shut down the config watch in the service manager if enabled. + if a.config.EnableCentralServiceConfig { + a.serviceManager.RemoveService(serviceID) + } checks := a.State.Checks() var checkIDs []types.CheckID diff --git a/agent/cache/watch_test.go b/agent/cache/watch_test.go index 54b542fe6..1995987ce 100644 --- a/agent/cache/watch_test.go +++ b/agent/cache/watch_test.go @@ -2,6 +2,7 @@ package cache import ( "context" + "errors" "fmt" "sync/atomic" "testing" @@ -23,11 +24,15 @@ func TestCacheNotify(t *testing.T) { }) // Setup triggers to control when "updates" should be delivered - trigger := make([]chan time.Time, 4) + trigger := make([]chan time.Time, 5) for i := range trigger { trigger[i] = make(chan time.Time) } + // Send an error to fake a situation where the servers aren't reachable + // initially. + typ.Static(FetchResult{Value: nil, Index: 0}, errors.New("no servers available")).Once() + // Configure the type typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once().Run(func(args mock.Arguments) { // Assert the right request type - all real Fetch implementations do this so @@ -35,16 +40,16 @@ func TestCacheNotify(t *testing.T) { // break in real life (hint: it did on the first attempt) _, ok := args.Get(1).(*MockRequest) require.True(t, ok) - }) - typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[0]) + }).WaitUntil(trigger[0]) typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[1]) - typ.Static(FetchResult{Value: 42, Index: 7}, nil).Once().WaitUntil(trigger[2]) + typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[2]) + typ.Static(FetchResult{Value: 42, Index: 7}, nil).Once().WaitUntil(trigger[3]) // It's timing dependent whether the blocking loop manages to make another // call before we cancel so don't require it. We need to have a higher index // here because if the index is the same then the cache Get will not return // until the full 10 min timeout expires. This causes the last fetch to return // after cancellation as if it had timed out. 
- typ.Static(FetchResult{Value: 42, Index: 8}, nil).WaitUntil(trigger[3]) + typ.Static(FetchResult{Value: 42, Index: 8}, nil).WaitUntil(trigger[4]) require := require.New(t) @@ -56,12 +61,12 @@ func TestCacheNotify(t *testing.T) { err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello"}), "test", ch) require.NoError(err) - // Should receive the first result pretty soon + // Should receive the error with index == 0 first. TestCacheNotifyChResult(t, ch, UpdateEvent{ CorrelationID: "test", - Result: 1, - Meta: ResultMeta{Hit: false, Index: 4}, - Err: nil, + Result: nil, + Meta: ResultMeta{Hit: false, Index: 0}, + Err: errors.New("no servers available"), }) // There should be no more updates delivered yet @@ -70,6 +75,17 @@ func TestCacheNotify(t *testing.T) { // Trigger blocking query to return a "change" close(trigger[0]) + // Should receive the first real update next. + TestCacheNotifyChResult(t, ch, UpdateEvent{ + CorrelationID: "test", + Result: 1, + Meta: ResultMeta{Hit: false, Index: 4}, + Err: nil, + }) + + // Trigger blocking query to return a "change" + close(trigger[1]) + // Should receive the next result pretty soon TestCacheNotifyChResult(t, ch, UpdateEvent{ CorrelationID: "test", @@ -99,7 +115,7 @@ func TestCacheNotify(t *testing.T) { // We could wait for a full timeout but we can't directly observe it so // simulate the behavior by triggering a response with the same value and // index as the last one. - close(trigger[1]) + close(trigger[2]) // We should NOT be notified about that. Note this is timing dependent but // it's only a sanity check, if we somehow _do_ get the change delivered later @@ -108,7 +124,7 @@ func TestCacheNotify(t *testing.T) { require.Len(ch, 0) // Trigger final update - close(trigger[2]) + close(trigger[3]) TestCacheNotifyChResult(t, ch, UpdateEvent{ CorrelationID: "test", @@ -134,7 +150,7 @@ func TestCacheNotify(t *testing.T) { // have no way to interrupt a blocking query. In practice it's fine to know // that after 10 mins max the blocking query will return and the resources // will be cleaned. - close(trigger[3]) + close(trigger[4]) // I want to test that canceling the context cleans up goroutines (which it // does from manual verification with debugger etc). I had a check based on a diff --git a/agent/config/runtime.go b/agent/config/runtime.go index dc93b614d..ba3689a2c 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -672,7 +672,7 @@ type RuntimeConfig struct { // EnableCentralServiceConfig controls whether the agent should incorporate // centralized config such as service-defaults into local service registrations. // - // hcl: (enable) + // hcl: enable_central_service_config = (true|false) EnableCentralServiceConfig bool // EnableDebug is used to enable various debugging features.
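
For readers working through the series, two of the changes above are easier to
see in isolation. The tag handling added to NodeService.Merge in the structs.go
hunk unions the two tag lists through a set and sorts the result so merged
registrations come out deterministic. Here is a minimal standalone sketch of
that logic; mergeTags and its parameter names are illustrative, not Consul
APIs:

package main

import (
	"fmt"
	"sort"
)

// mergeTags mirrors the Tags branch of NodeService.Merge: if either side has
// no tags, the other side wins outright; otherwise the two lists are unioned,
// deduplicated, and sorted.
func mergeTags(local, central []string) []string {
	if local == nil {
		return central
	}
	if central == nil {
		return local
	}
	tagSet := make(map[string]struct{}, len(local)+len(central))
	for _, tag := range local {
		tagSet[tag] = struct{}{}
	}
	for _, tag := range central {
		tagSet[tag] = struct{}{}
	}
	tags := make([]string, 0, len(tagSet))
	for tag := range tagSet {
		tags = append(tags, tag)
	}
	sort.Strings(tags)
	return tags
}

func main() {
	fmt.Println(mergeTags([]string{"a", "b"}, []string{"c", "d", "b"}))
	// Prints: [a b c d]
}

The race fix in patch 5 replaces the lock-protected ready bool with a firstRun
flag that lives entirely in the watch goroutine's loop, so only that goroutine
ever decides when readiness is signaled. The sketch below shows just the
signaling pattern, with hypothetical types; the real watch also merges config
and re-registers the service before signaling, and it sends the registration
error (not the fetch error) back to the waiter:

package main

import (
	"errors"
	"fmt"
	"time"
)

type configWatch struct {
	readyCh  chan error // signaled exactly once, only by the run loop
	updateCh chan error
}

// run owns the firstRun flag, so the ready transition cannot race with
// readers the way a shared, mutex-guarded bool could.
func (w *configWatch) run() {
	firstRun := true
	for err := range w.updateCh {
		if firstRun {
			firstRun = false
			if err != nil {
				w.readyCh <- err // hand the initial failure to ReadyWait
			} else {
				close(w.readyCh) // signal success exactly once
			}
			continue
		}
		if err != nil {
			fmt.Println("update error:", err) // later errors are only logged
		}
	}
}

// ReadyWait blocks until the first update has been processed.
func (w *configWatch) ReadyWait() error { return <-w.readyCh }

func main() {
	w := &configWatch{readyCh: make(chan error, 1), updateCh: make(chan error)}
	go w.run()

	w.updateCh <- nil // first update succeeds
	fmt.Println("ready err:", w.ReadyWait())

	w.updateCh <- errors.New("transient fetch failure")
	time.Sleep(10 * time.Millisecond) // let the loop log before exiting
	close(w.updateCh)
}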