From 08e8ed0a7e6af4b211ade831d026145549a13f43 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 30 Nov 2020 18:19:11 -0500 Subject: [PATCH] agent: remove ServiceManager goroutine The ServiceManager.Start goroutine was used to serialize calls to agent.addServiceInternal. All the goroutines which sent events to the channel would block waiting for a response from that same goroutine, which is effectively the same as a synchronous call without any channels. This commit removes the goroutine and channels, and instead calls addServiceInternal directly. Since all of these goroutines will need to take the agent.stateLock, the mutex handles the serializing of calls. --- agent/agent.go | 1 - agent/service_manager.go | 107 +++++++++------------------------------ 2 files changed, 23 insertions(+), 85 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index ed9082e42..6a908f4c0 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -512,7 +512,6 @@ func (a *Agent) Start(ctx context.Context) error { if err := a.baseDeps.AutoConfig.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil { return fmt.Errorf("AutoConf failed to start certificate monitor: %w", err) } - a.serviceManager.Start() // Load checks/services/metadata. emptyCheckSnapshot := map[structs.CheckID]*structs.HealthCheck{} diff --git a/agent/service_manager.go b/agent/service_manager.go index ba226bb44..21addb029 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -26,13 +26,6 @@ type ServiceManager struct { // services tracks all active watches for registered services services map[structs.ServiceID]*serviceConfigWatch - // registerCh is a channel for receiving service registration requests from - // from serviceConfigWatchers. - // The registrations are handled in the background when watches are notified of - // changes. All sends and receives must also obey the ctx.Done() channel to - // avoid a deadlock during shutdown. - registerCh chan *asyncRegisterRequest - // ctx is the shared context for all goroutines launched ctx context.Context @@ -46,11 +39,10 @@ type ServiceManager struct { func NewServiceManager(agent *Agent) *ServiceManager { ctx, cancel := context.WithCancel(context.Background()) return &ServiceManager{ - agent: agent, - services: make(map[structs.ServiceID]*serviceConfigWatch), - registerCh: make(chan *asyncRegisterRequest), // must be unbuffered - ctx: ctx, - cancel: cancel, + agent: agent, + services: make(map[structs.ServiceID]*serviceConfigWatch), + ctx: ctx, + cancel: cancel, } } @@ -62,36 +54,6 @@ func (s *ServiceManager) Stop() { s.running.Wait() } -// Start starts a background worker goroutine that writes back into the Agent -// state. This only exists to keep the need to lock the agent state lock out of -// the main AddService/RemoveService codepaths to avoid deadlocks. -func (s *ServiceManager) Start() { - s.running.Add(1) - - go func() { - defer s.running.Done() - for { - select { - case <-s.ctx.Done(): - return - case req := <-s.registerCh: - req.Reply <- s.registerOnce(req.Args) - } - } - }() -} - -// runOnce will process a single registration request -func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error { - s.agent.stateLock.Lock() - defer s.agent.stateLock.Unlock() - - if err := s.agent.addServiceInternal(args); err != nil { - return fmt.Errorf("error updating service registration: %v", err) - } - return nil -} - // AddService will (re)create a serviceConfigWatch on the given service. For // each call of this function the first registration will happen inline and // will read the merged global defaults for the service through the agent cache @@ -129,11 +91,7 @@ func (s *ServiceManager) AddService(req addServiceLockedRequest) error { // Get the existing global config and do the initial registration with the // merged config. - watch := &serviceConfigWatch{ - registration: req, - agent: s.agent, - registerCh: s.registerCh, - } + watch := &serviceConfigWatch{registration: req, agent: s.agent} if err := watch.register(s.ctx); err != nil { return err } @@ -168,9 +126,7 @@ func (s *ServiceManager) RemoveService(serviceID structs.ServiceID) { // service/proxy defaults. type serviceConfigWatch struct { registration addServiceLockedRequest - - agent *Agent - registerCh chan<- *asyncRegisterRequest + agent *Agent // cacheKey stores the key of the current request, when registration changes // we check to see if a new cache watch is needed. @@ -325,47 +281,30 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat return err } + // make a copy of the AddServiceRequest + req := w.registration + req.Service = merged + req.persistServiceConfig = true + + args := addServiceInternalRequest{ + addServiceLockedRequest: req, + persistService: w.registration.Service, + persistServiceDefaults: serviceDefaults, + } + + w.agent.stateLock.Lock() + defer w.agent.stateLock.Unlock() + // While we were waiting on the agent state lock we may have been shutdown. // So avoid doing a registration in that case. if err := ctx.Err(); err != nil { return nil } - // make a copy of the AddServiceRequest - req := w.registration - req.Service = merged - req.persistServiceConfig = true - - registerReq := &asyncRegisterRequest{ - Args: addServiceInternalRequest{ - addServiceLockedRequest: req, - persistService: w.registration.Service, - persistServiceDefaults: serviceDefaults, - }, - Reply: make(chan error, 1), + if err := w.agent.addServiceInternal(args); err != nil { + return fmt.Errorf("error updating service registration: %v", err) } - - select { - case <-ctx.Done(): - return nil - case w.registerCh <- registerReq: - } - - select { - case <-ctx.Done(): - return nil - - case err := <-registerReq.Reply: - if err != nil { - return fmt.Errorf("error updating service registration: %v", err) - } - return nil - } -} - -type asyncRegisterRequest struct { - Args addServiceInternalRequest - Reply chan error + return nil } func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest {