diff --git a/agent/service_manager.go b/agent/service_manager.go index 16d15255b..a20a00d19 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -168,7 +168,6 @@ func (s *ServiceManager) AddService(req *addServiceRequest) error { // merged config. watch := &serviceConfigWatch{ registration: reg, - updateCh: make(chan cache.UpdateEvent, 1), agent: s.agent, registerCh: s.registerCh, } @@ -229,9 +228,6 @@ type serviceConfigWatch struct { // we check to see if a new cache watch is needed. cacheKey string - // updateCh receives changes from cache watchers - updateCh chan cache.UpdateEvent - cancelFunc func() running sync.WaitGroup } @@ -244,8 +240,6 @@ func (w *serviceConfigWatch) RegisterAndStart( persistServiceConfig bool, wg *sync.WaitGroup, ) error { - service := w.registration.service - // Either we explicitly block waiting for defaults before registering, // or we feed it some seed data (or NO data) and bypass the blocking // operation. Either way the watcher will end up with something flagged @@ -254,7 +248,8 @@ func (w *serviceConfigWatch) RegisterAndStart( var err error serviceDefaults, err = w.fetchDefaults(ctx) if err != nil { - return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", service.ID, err) + return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", + w.registration.service.ID, err) } } @@ -318,6 +313,8 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro req := makeConfigRequest(w.agent, w.registration) w.cacheKey = req.CacheInfo().Key + updateCh := make(chan cache.UpdateEvent, 1) + // We use the cache key as the correlationID here. Notify in general will not // respond on the updateCh after the context is cancelled however there could // possible be a race where it has only just got an update and checked the @@ -329,7 +326,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro cachetype.ResolvedServiceConfigName, req, w.cacheKey, - w.updateCh, + updateCh, ) if err != nil { w.cancelFunc() @@ -338,7 +335,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro w.running.Add(1) wg.Add(1) - go w.runWatch(ctx, wg) + go w.runWatch(ctx, wg, updateCh) return nil } @@ -352,7 +349,7 @@ func (w *serviceConfigWatch) Stop() { // config watch is shut down. // // NOTE: the caller must NOT hold the Agent.stateLock! -func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup) { +func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup, updateCh chan cache.UpdateEvent) { defer wg.Done() defer w.running.Done() @@ -360,7 +357,7 @@ func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup) { select { case <-ctx.Done(): return - case event := <-w.updateCh: + case event := <-updateCh: if err := w.handleUpdate(ctx, event); err != nil { w.agent.logger.Error("error handling service update", "error", err) } @@ -401,10 +398,8 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat // While we were waiting on the agent state lock we may have been shutdown. // So avoid doing a registration in that case. - select { - case <-ctx.Done(): + if err := ctx.Err(); err != nil { return nil - default: } registerReq := &asyncRegisterRequest{