agent/service_manager: remove 'updateCh' field from serviceConfigWatch

Passing the channel to the function that uses it significantly reduces
the scope of the variable and makes its usage more explicit. It also
moves the initialization of the channel closer to where it is used.

Also includes a couple of very small cleanups: remove a local variable,
and read the error from `ctx.Err()` directly instead of selecting on
`ctx.Done()` to check for cancellation.
Daniel Nephin 2020-04-17 17:04:58 -04:00
parent 1c7fce73a8
commit bd866d694c
1 changed file with 9 additions and 14 deletions
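
Before the diff, here is a minimal standalone sketch of the scope-narrowing pattern the commit message describes (the `event`, `watcher`, `notify`, and `runLoop` names are illustrative stand-ins, not Consul's types): the channel is created locally by the function that starts the watch and handed to the producer and the consumer as parameters, so nothing else can reach it.

package main

import (
	"context"
	"fmt"
	"time"
)

// event stands in for cache.UpdateEvent in this illustration.
type event struct{ key string }

type watcher struct{}

// start creates the channel in the narrowest scope that needs it and passes it
// to both the producer and the consumer instead of storing it on the struct.
func (w *watcher) start(ctx context.Context) {
	updateCh := make(chan event, 1) // only start, notify, and runLoop ever see it
	go notify(ctx, updateCh)        // producer
	go w.runLoop(ctx, updateCh)     // consumer receives the channel explicitly
}

// notify emits updates until the context is cancelled.
func notify(ctx context.Context, ch chan<- event) {
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return
		case ch <- event{key: fmt.Sprintf("update-%d", i)}:
			time.Sleep(100 * time.Millisecond)
		}
	}
}

// runLoop consumes updates from the channel it was explicitly given.
func (w *watcher) runLoop(ctx context.Context, updateCh <-chan event) {
	for {
		select {
		case <-ctx.Done():
			return
		case ev := <-updateCh:
			fmt.Println("handled", ev.key)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	w := &watcher{}
	w.start(ctx)
	<-ctx.Done()
}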

agent/service_manager.go

@@ -168,7 +168,6 @@ func (s *ServiceManager) AddService(req *addServiceRequest) error {
 	// merged config.
 	watch := &serviceConfigWatch{
 		registration: reg,
-		updateCh:     make(chan cache.UpdateEvent, 1),
 		agent:        s.agent,
 		registerCh:   s.registerCh,
 	}
@@ -229,9 +228,6 @@ type serviceConfigWatch struct {
 	// we check to see if a new cache watch is needed.
 	cacheKey string
 
-	// updateCh receives changes from cache watchers
-	updateCh chan cache.UpdateEvent
-
 	cancelFunc func()
 	running    sync.WaitGroup
 }
@@ -244,8 +240,6 @@ func (w *serviceConfigWatch) RegisterAndStart(
 	persistServiceConfig bool,
 	wg *sync.WaitGroup,
 ) error {
-	service := w.registration.service
-
 	// Either we explicitly block waiting for defaults before registering,
 	// or we feed it some seed data (or NO data) and bypass the blocking
 	// operation. Either way the watcher will end up with something flagged
@@ -254,7 +248,8 @@ func (w *serviceConfigWatch) RegisterAndStart(
 		var err error
 		serviceDefaults, err = w.fetchDefaults(ctx)
 		if err != nil {
-			return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", service.ID, err)
+			return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v",
+				w.registration.service.ID, err)
 		}
 	}
@@ -318,6 +313,8 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
 	req := makeConfigRequest(w.agent, w.registration)
 	w.cacheKey = req.CacheInfo().Key
 
+	updateCh := make(chan cache.UpdateEvent, 1)
+
 	// We use the cache key as the correlationID here. Notify in general will not
 	// respond on the updateCh after the context is cancelled however there could
 	// possible be a race where it has only just got an update and checked the
@@ -329,7 +326,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
 		cachetype.ResolvedServiceConfigName,
 		req,
 		w.cacheKey,
-		w.updateCh,
+		updateCh,
 	)
 	if err != nil {
 		w.cancelFunc()
@@ -338,7 +335,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
 	w.running.Add(1)
 	wg.Add(1)
-	go w.runWatch(ctx, wg)
+	go w.runWatch(ctx, wg, updateCh)
 
 	return nil
 }
@@ -352,7 +349,7 @@ func (w *serviceConfigWatch) Stop() {
 // config watch is shut down.
 //
 // NOTE: the caller must NOT hold the Agent.stateLock!
-func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup) {
+func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup, updateCh chan cache.UpdateEvent) {
 	defer wg.Done()
 	defer w.running.Done()
@@ -360,7 +357,7 @@ func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup) {
 		select {
 		case <-ctx.Done():
 			return
-		case event := <-w.updateCh:
+		case event := <-updateCh:
 			if err := w.handleUpdate(ctx, event); err != nil {
 				w.agent.logger.Error("error handling service update", "error", err)
 			}
@@ -401,10 +398,8 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
 	// While we were waiting on the agent state lock we may have been shutdown.
 	// So avoid doing a registration in that case.
-	select {
-	case <-ctx.Done():
+	if err := ctx.Err(); err != nil {
 		return nil
-	default:
 	}
 
 	registerReq := &asyncRegisterRequest{
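
The last hunk replaces a non-blocking select on `ctx.Done()` with a direct `ctx.Err()` check. Below is a minimal standalone comparison of the two idioms (the function names are illustrative, not from Consul):

package main

import (
	"context"
	"fmt"
)

// checkedWithSelect is the old idiom: a non-blocking select against ctx.Done(),
// using a default case so the check never blocks.
func checkedWithSelect(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

// checkedWithErr is the replacement: ctx.Err() is non-nil exactly when the
// context has been cancelled or has timed out, so no select is needed.
func checkedWithErr(ctx context.Context) bool {
	return ctx.Err() != nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	fmt.Println(checkedWithSelect(ctx), checkedWithErr(ctx)) // false false
	cancel()
	fmt.Println(checkedWithSelect(ctx), checkedWithErr(ctx)) // true true
}

Both checks behave the same for this purpose; the `ctx.Err()` form is shorter and avoids the empty default case.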