diff --git a/agent/agent.go b/agent/agent.go index ed9082e42..6a908f4c0 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -512,7 +512,6 @@ func (a *Agent) Start(ctx context.Context) error { if err := a.baseDeps.AutoConfig.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil { return fmt.Errorf("AutoConf failed to start certificate monitor: %w", err) } - a.serviceManager.Start() // Load checks/services/metadata. emptyCheckSnapshot := map[structs.CheckID]*structs.HealthCheck{} diff --git a/agent/service_manager.go b/agent/service_manager.go index ba226bb44..21addb029 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -26,13 +26,6 @@ type ServiceManager struct { // services tracks all active watches for registered services services map[structs.ServiceID]*serviceConfigWatch - // registerCh is a channel for receiving service registration requests from - // from serviceConfigWatchers. - // The registrations are handled in the background when watches are notified of - // changes. All sends and receives must also obey the ctx.Done() channel to - // avoid a deadlock during shutdown. - registerCh chan *asyncRegisterRequest - // ctx is the shared context for all goroutines launched ctx context.Context @@ -46,11 +39,10 @@ type ServiceManager struct { func NewServiceManager(agent *Agent) *ServiceManager { ctx, cancel := context.WithCancel(context.Background()) return &ServiceManager{ - agent: agent, - services: make(map[structs.ServiceID]*serviceConfigWatch), - registerCh: make(chan *asyncRegisterRequest), // must be unbuffered - ctx: ctx, - cancel: cancel, + agent: agent, + services: make(map[structs.ServiceID]*serviceConfigWatch), + ctx: ctx, + cancel: cancel, } } @@ -62,36 +54,6 @@ func (s *ServiceManager) Stop() { s.running.Wait() } -// Start starts a background worker goroutine that writes back into the Agent -// state. This only exists to keep the need to lock the agent state lock out of -// the main AddService/RemoveService codepaths to avoid deadlocks. -func (s *ServiceManager) Start() { - s.running.Add(1) - - go func() { - defer s.running.Done() - for { - select { - case <-s.ctx.Done(): - return - case req := <-s.registerCh: - req.Reply <- s.registerOnce(req.Args) - } - } - }() -} - -// runOnce will process a single registration request -func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error { - s.agent.stateLock.Lock() - defer s.agent.stateLock.Unlock() - - if err := s.agent.addServiceInternal(args); err != nil { - return fmt.Errorf("error updating service registration: %v", err) - } - return nil -} - // AddService will (re)create a serviceConfigWatch on the given service. For // each call of this function the first registration will happen inline and // will read the merged global defaults for the service through the agent cache @@ -129,11 +91,7 @@ func (s *ServiceManager) AddService(req addServiceLockedRequest) error { // Get the existing global config and do the initial registration with the // merged config. - watch := &serviceConfigWatch{ - registration: req, - agent: s.agent, - registerCh: s.registerCh, - } + watch := &serviceConfigWatch{registration: req, agent: s.agent} if err := watch.register(s.ctx); err != nil { return err } @@ -168,9 +126,7 @@ func (s *ServiceManager) RemoveService(serviceID structs.ServiceID) { // service/proxy defaults. type serviceConfigWatch struct { registration addServiceLockedRequest - - agent *Agent - registerCh chan<- *asyncRegisterRequest + agent *Agent // cacheKey stores the key of the current request, when registration changes // we check to see if a new cache watch is needed. @@ -325,47 +281,30 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat return err } + // make a copy of the AddServiceRequest + req := w.registration + req.Service = merged + req.persistServiceConfig = true + + args := addServiceInternalRequest{ + addServiceLockedRequest: req, + persistService: w.registration.Service, + persistServiceDefaults: serviceDefaults, + } + + w.agent.stateLock.Lock() + defer w.agent.stateLock.Unlock() + // While we were waiting on the agent state lock we may have been shutdown. // So avoid doing a registration in that case. if err := ctx.Err(); err != nil { return nil } - // make a copy of the AddServiceRequest - req := w.registration - req.Service = merged - req.persistServiceConfig = true - - registerReq := &asyncRegisterRequest{ - Args: addServiceInternalRequest{ - addServiceLockedRequest: req, - persistService: w.registration.Service, - persistServiceDefaults: serviceDefaults, - }, - Reply: make(chan error, 1), + if err := w.agent.addServiceInternal(args); err != nil { + return fmt.Errorf("error updating service registration: %v", err) } - - select { - case <-ctx.Done(): - return nil - case w.registerCh <- registerReq: - } - - select { - case <-ctx.Done(): - return nil - - case err := <-registerReq.Reply: - if err != nil { - return fmt.Errorf("error updating service registration: %v", err) - } - return nil - } -} - -type asyncRegisterRequest struct { - Args addServiceInternalRequest - Reply chan error + return nil } func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest {