From 081173652ce5a3ec6bd20e9ecc26dc9f0a20bea2 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 30 Nov 2020 17:50:06 -0500 Subject: [PATCH 1/4] agent: Minor cosmetic changes in ServiceManager Also use the non-deprecated func in a test --- agent/service_manager.go | 20 +++++++------------- agent/service_manager_test.go | 11 +++++++++-- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/agent/service_manager.go b/agent/service_manager.go index eddc12cfe..ba226bb44 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -134,7 +134,10 @@ func (s *ServiceManager) AddService(req addServiceLockedRequest) error { agent: s.agent, registerCh: s.registerCh, } - if err := watch.RegisterAndStart(s.ctx, &s.running); err != nil { + if err := watch.register(s.ctx); err != nil { + return err + } + if err := watch.start(s.ctx, &s.running); err != nil { return err } @@ -178,7 +181,7 @@ type serviceConfigWatch struct { } // NOTE: this is called while holding the Agent.stateLock -func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.WaitGroup) error { +func (w *serviceConfigWatch) register(ctx context.Context) error { serviceDefaults, err := w.registration.serviceDefaults(ctx) if err != nil { return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", @@ -204,10 +207,7 @@ func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.Wait if err != nil { return fmt.Errorf("error updating service registration: %v", err) } - - // Start the config watch, which starts a blocking query for the - // resolved service config in the background. - return w.start(ctx, wg) + return nil } func serviceDefaultsFromStruct(v *structs.ServiceConfigResponse) func(context.Context) (*structs.ServiceConfigResponse, error) { @@ -256,13 +256,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro // context before we cancel and so might still deliver the old event. Using // the cacheKey allows us to ignore updates from the old cache watch and makes // even this rare edge case safe. - err := w.agent.cache.Notify( - ctx, - cachetype.ResolvedServiceConfigName, - req, - w.cacheKey, - updateCh, - ) + err := w.agent.cache.Notify(ctx, cachetype.ResolvedServiceConfigName, req, w.cacheKey, updateCh) if err != nil { w.cancelFunc() return err diff --git a/agent/service_manager_test.go b/agent/service_manager_test.go index ea1d6c4f1..8ed22e9f5 100644 --- a/agent/service_manager_test.go +++ b/agent/service_manager_test.go @@ -387,12 +387,19 @@ func TestServiceManager_PersistService_API(t *testing.T) { configFile := filepath.Join(a.Config.DataDir, serviceConfigDir, svcID.StringHash()) // Service is not persisted unless requested, but we always persist service configs. 
- require.NoError(a.addServiceFromSource(svc, nil, false, "", ConfigSourceRemote)) + err = a.AddService(AddServiceRequest{Service: svc, Source: ConfigSourceRemote}) + require.NoError(err) requireFileIsAbsent(t, svcFile) requireFileIsPresent(t, configFile) // Persists to file if requested - require.NoError(a.addServiceFromSource(svc, nil, true, "mytoken", ConfigSourceRemote)) + err = a.AddService(AddServiceRequest{ + Service: svc, + persist: true, + token: "mytoken", + Source: ConfigSourceRemote, + }) + require.NoError(err) requireFileIsPresent(t, svcFile) requireFileIsPresent(t, configFile) From 08e8ed0a7e6af4b211ade831d026145549a13f43 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 30 Nov 2020 18:19:11 -0500 Subject: [PATCH 2/4] agent: remove ServiceManager goroutine The ServiceManager.Start goroutine was used to serialize calls to agent.addServiceInternal. All the goroutines which sent events to the channel would block waiting for a response from that same goroutine, which is effectively the same as a synchronous call without any channels. This commit removes the goroutine and channels, and instead calls addServiceInternal directly. Since all of these goroutines will need to take the agent.stateLock, the mutex handles the serializing of calls. --- agent/agent.go | 1 - agent/service_manager.go | 107 +++++++++------------------------------ 2 files changed, 23 insertions(+), 85 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index ed9082e42..6a908f4c0 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -512,7 +512,6 @@ func (a *Agent) Start(ctx context.Context) error { if err := a.baseDeps.AutoConfig.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil { return fmt.Errorf("AutoConf failed to start certificate monitor: %w", err) } - a.serviceManager.Start() // Load checks/services/metadata. emptyCheckSnapshot := map[structs.CheckID]*structs.HealthCheck{} diff --git a/agent/service_manager.go b/agent/service_manager.go index ba226bb44..21addb029 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -26,13 +26,6 @@ type ServiceManager struct { // services tracks all active watches for registered services services map[structs.ServiceID]*serviceConfigWatch - // registerCh is a channel for receiving service registration requests from - // from serviceConfigWatchers. - // The registrations are handled in the background when watches are notified of - // changes. All sends and receives must also obey the ctx.Done() channel to - // avoid a deadlock during shutdown. - registerCh chan *asyncRegisterRequest - // ctx is the shared context for all goroutines launched ctx context.Context @@ -46,11 +39,10 @@ type ServiceManager struct { func NewServiceManager(agent *Agent) *ServiceManager { ctx, cancel := context.WithCancel(context.Background()) return &ServiceManager{ - agent: agent, - services: make(map[structs.ServiceID]*serviceConfigWatch), - registerCh: make(chan *asyncRegisterRequest), // must be unbuffered - ctx: ctx, - cancel: cancel, + agent: agent, + services: make(map[structs.ServiceID]*serviceConfigWatch), + ctx: ctx, + cancel: cancel, } } @@ -62,36 +54,6 @@ func (s *ServiceManager) Stop() { s.running.Wait() } -// Start starts a background worker goroutine that writes back into the Agent -// state. This only exists to keep the need to lock the agent state lock out of -// the main AddService/RemoveService codepaths to avoid deadlocks. 
-func (s *ServiceManager) Start() { - s.running.Add(1) - - go func() { - defer s.running.Done() - for { - select { - case <-s.ctx.Done(): - return - case req := <-s.registerCh: - req.Reply <- s.registerOnce(req.Args) - } - } - }() -} - -// runOnce will process a single registration request -func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error { - s.agent.stateLock.Lock() - defer s.agent.stateLock.Unlock() - - if err := s.agent.addServiceInternal(args); err != nil { - return fmt.Errorf("error updating service registration: %v", err) - } - return nil -} - // AddService will (re)create a serviceConfigWatch on the given service. For // each call of this function the first registration will happen inline and // will read the merged global defaults for the service through the agent cache @@ -129,11 +91,7 @@ func (s *ServiceManager) AddService(req addServiceLockedRequest) error { // Get the existing global config and do the initial registration with the // merged config. - watch := &serviceConfigWatch{ - registration: req, - agent: s.agent, - registerCh: s.registerCh, - } + watch := &serviceConfigWatch{registration: req, agent: s.agent} if err := watch.register(s.ctx); err != nil { return err } @@ -168,9 +126,7 @@ func (s *ServiceManager) RemoveService(serviceID structs.ServiceID) { // service/proxy defaults. type serviceConfigWatch struct { registration addServiceLockedRequest - - agent *Agent - registerCh chan<- *asyncRegisterRequest + agent *Agent // cacheKey stores the key of the current request, when registration changes // we check to see if a new cache watch is needed. @@ -325,47 +281,30 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat return err } + // make a copy of the AddServiceRequest + req := w.registration + req.Service = merged + req.persistServiceConfig = true + + args := addServiceInternalRequest{ + addServiceLockedRequest: req, + persistService: w.registration.Service, + persistServiceDefaults: serviceDefaults, + } + + w.agent.stateLock.Lock() + defer w.agent.stateLock.Unlock() + // While we were waiting on the agent state lock we may have been shutdown. // So avoid doing a registration in that case. 
if err := ctx.Err(); err != nil { return nil } - // make a copy of the AddServiceRequest - req := w.registration - req.Service = merged - req.persistServiceConfig = true - - registerReq := &asyncRegisterRequest{ - Args: addServiceInternalRequest{ - addServiceLockedRequest: req, - persistService: w.registration.Service, - persistServiceDefaults: serviceDefaults, - }, - Reply: make(chan error, 1), + if err := w.agent.addServiceInternal(args); err != nil { + return fmt.Errorf("error updating service registration: %v", err) } - - select { - case <-ctx.Done(): - return nil - case w.registerCh <- registerReq: - } - - select { - case <-ctx.Done(): - return nil - - case err := <-registerReq.Reply: - if err != nil { - return fmt.Errorf("error updating service registration: %v", err) - } - return nil - } -} - -type asyncRegisterRequest struct { - Args addServiceInternalRequest - Reply chan error + return nil } func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest { From 3685f3997015c5a7f599abb7ffb113b72ba944bc Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 4 Dec 2020 18:56:03 -0500 Subject: [PATCH 3/4] lib/mutex: add mutex with TryLock and update vendor --- go.mod | 2 +- go.sum | 4 +- lib/mutex/mutex.go | 36 +++++ lib/mutex/mutex_test.go | 93 ++++++++++++ .../golang.org/x/sync/semaphore/semaphore.go | 136 ++++++++++++++++++ .../x/sync/singleflight/singleflight.go | 112 +++++++++++++-- vendor/modules.txt | 3 +- 7 files changed, 372 insertions(+), 14 deletions(-) create mode 100644 lib/mutex/mutex.go create mode 100644 lib/mutex/mutex_test.go create mode 100644 vendor/golang.org/x/sync/semaphore/semaphore.go diff --git a/go.mod b/go.mod index 8ba63deaa..8eceb4990 100644 --- a/go.mod +++ b/go.mod @@ -84,7 +84,7 @@ require ( golang.org/x/crypto v0.0.0-20200930160638-afb6bcd081ae golang.org/x/net v0.0.0-20200930145003-4acb6c075d10 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 - golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5 golang.org/x/text v0.3.3 // indirect golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e diff --git a/go.sum b/go.sum index ed143ccf6..3a39744a8 100644 --- a/go.sum +++ b/go.sum @@ -583,8 +583,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o= -golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/lib/mutex/mutex.go b/lib/mutex/mutex.go new file mode 100644 index 
000000000..9841acb49 --- /dev/null +++ b/lib/mutex/mutex.go @@ -0,0 +1,36 @@ +/* +Package mutex implements the sync.Locker interface using x/sync/semaphore. It +may be used as a replacement for sync.Mutex when one or more goroutines need to +allow their calls to Lock to be cancelled by context cancellation. +*/ +package mutex + +import ( + "context" + + "golang.org/x/sync/semaphore" +) + +type Mutex semaphore.Weighted + +// New returns a Mutex that is ready for use. +func New() *Mutex { + return (*Mutex)(semaphore.NewWeighted(1)) +} + +func (m *Mutex) Lock() { + _ = (*semaphore.Weighted)(m).Acquire(context.Background(), 1) +} + +func (m *Mutex) Unlock() { + (*semaphore.Weighted)(m).Release(1) +} + +// TryLock acquires the mutex, blocking until resources are available or ctx is +// done. On success, returns nil. On failure, returns ctx.Err() and leaves the +// semaphore unchanged. +// +// If ctx is already done, Acquire may still succeed without blocking. +func (m *Mutex) TryLock(ctx context.Context) error { + return (*semaphore.Weighted)(m).Acquire(ctx, 1) +} diff --git a/lib/mutex/mutex_test.go b/lib/mutex/mutex_test.go new file mode 100644 index 000000000..0324b3900 --- /dev/null +++ b/lib/mutex/mutex_test.go @@ -0,0 +1,93 @@ +package mutex + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestMutex(t *testing.T) { + t.Run("starts unlocked", func(t *testing.T) { + m := New() + canLock(t, m) + }) + + t.Run("Lock blocks when locked", func(t *testing.T) { + m := New() + m.Lock() + lockIsBlocked(t, m) + }) + + t.Run("Unlock unblocks Lock", func(t *testing.T) { + m := New() + m.Lock() + m.Unlock() // nolint:staticcheck // SA2001 is not relevant here + canLock(t, m) + }) + + t.Run("TryLock acquires lock", func(t *testing.T) { + m := New() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + t.Cleanup(cancel) + require.NoError(t, m.TryLock(ctx)) + lockIsBlocked(t, m) + }) + + t.Run("TryLock blocks until timeout when locked", func(t *testing.T) { + m := New() + m.Lock() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + t.Cleanup(cancel) + err := m.TryLock(ctx) + require.Equal(t, err, context.DeadlineExceeded) + }) + + t.Run("TryLock acquires lock before timeout", func(t *testing.T) { + m := New() + m.Lock() + + go func() { + time.Sleep(20 * time.Millisecond) + m.Unlock() + }() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + t.Cleanup(cancel) + err := m.TryLock(ctx) + require.NoError(t, err) + }) + +} + +func canLock(t *testing.T, m *Mutex) { + t.Helper() + chDone := make(chan struct{}) + go func() { + m.Lock() + close(chDone) + }() + + select { + case <-chDone: + case <-time.After(20 * time.Millisecond): + t.Fatal("failed to acquire lock before timeout") + } +} + +func lockIsBlocked(t *testing.T, m *Mutex) { + t.Helper() + chDone := make(chan struct{}) + go func() { + m.Lock() + close(chDone) + }() + + select { + case <-chDone: + t.Fatal("expected Lock to block") + case <-time.After(20 * time.Millisecond): + } +} diff --git a/vendor/golang.org/x/sync/semaphore/semaphore.go b/vendor/golang.org/x/sync/semaphore/semaphore.go new file mode 100644 index 000000000..30f632c57 --- /dev/null +++ b/vendor/golang.org/x/sync/semaphore/semaphore.go @@ -0,0 +1,136 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+ +// Package semaphore provides a weighted semaphore implementation. +package semaphore // import "golang.org/x/sync/semaphore" + +import ( + "container/list" + "context" + "sync" +) + +type waiter struct { + n int64 + ready chan<- struct{} // Closed when semaphore acquired. +} + +// NewWeighted creates a new weighted semaphore with the given +// maximum combined weight for concurrent access. +func NewWeighted(n int64) *Weighted { + w := &Weighted{size: n} + return w +} + +// Weighted provides a way to bound concurrent access to a resource. +// The callers can request access with a given weight. +type Weighted struct { + size int64 + cur int64 + mu sync.Mutex + waiters list.List +} + +// Acquire acquires the semaphore with a weight of n, blocking until resources +// are available or ctx is done. On success, returns nil. On failure, returns +// ctx.Err() and leaves the semaphore unchanged. +// +// If ctx is already done, Acquire may still succeed without blocking. +func (s *Weighted) Acquire(ctx context.Context, n int64) error { + s.mu.Lock() + if s.size-s.cur >= n && s.waiters.Len() == 0 { + s.cur += n + s.mu.Unlock() + return nil + } + + if n > s.size { + // Don't make other Acquire calls block on one that's doomed to fail. + s.mu.Unlock() + <-ctx.Done() + return ctx.Err() + } + + ready := make(chan struct{}) + w := waiter{n: n, ready: ready} + elem := s.waiters.PushBack(w) + s.mu.Unlock() + + select { + case <-ctx.Done(): + err := ctx.Err() + s.mu.Lock() + select { + case <-ready: + // Acquired the semaphore after we were canceled. Rather than trying to + // fix up the queue, just pretend we didn't notice the cancelation. + err = nil + default: + isFront := s.waiters.Front() == elem + s.waiters.Remove(elem) + // If we're at the front and there're extra tokens left, notify other waiters. + if isFront && s.size > s.cur { + s.notifyWaiters() + } + } + s.mu.Unlock() + return err + + case <-ready: + return nil + } +} + +// TryAcquire acquires the semaphore with a weight of n without blocking. +// On success, returns true. On failure, returns false and leaves the semaphore unchanged. +func (s *Weighted) TryAcquire(n int64) bool { + s.mu.Lock() + success := s.size-s.cur >= n && s.waiters.Len() == 0 + if success { + s.cur += n + } + s.mu.Unlock() + return success +} + +// Release releases the semaphore with a weight of n. +func (s *Weighted) Release(n int64) { + s.mu.Lock() + s.cur -= n + if s.cur < 0 { + s.mu.Unlock() + panic("semaphore: released more than held") + } + s.notifyWaiters() + s.mu.Unlock() +} + +func (s *Weighted) notifyWaiters() { + for { + next := s.waiters.Front() + if next == nil { + break // No more waiters blocked. + } + + w := next.Value.(waiter) + if s.size-s.cur < w.n { + // Not enough tokens for the next waiter. We could keep going (to try to + // find a waiter with a smaller request), but under load that could cause + // starvation for large requests; instead, we leave all remaining waiters + // blocked. + // + // Consider a semaphore used as a read-write lock, with N tokens, N + // readers, and one writer. Each reader can Acquire(1) to obtain a read + // lock. The writer can Acquire(N) to obtain a write lock, excluding all + // of the readers. If we allow the readers to jump ahead in the queue, + // the writer will starve — there is always one token available for every + // reader. 
+ break + } + + s.cur += w.n + s.waiters.Remove(next) + close(w.ready) + } +} diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go b/vendor/golang.org/x/sync/singleflight/singleflight.go index 97a1aa4bb..690eb8501 100644 --- a/vendor/golang.org/x/sync/singleflight/singleflight.go +++ b/vendor/golang.org/x/sync/singleflight/singleflight.go @@ -6,7 +6,42 @@ // mechanism. package singleflight // import "golang.org/x/sync/singleflight" -import "sync" +import ( + "bytes" + "errors" + "fmt" + "runtime" + "runtime/debug" + "sync" +) + +// errGoexit indicates the runtime.Goexit was called in +// the user given function. +var errGoexit = errors.New("runtime.Goexit was called") + +// A panicError is an arbitrary value recovered from a panic +// with the stack trace during the execution of given function. +type panicError struct { + value interface{} + stack []byte +} + +// Error implements error interface. +func (p *panicError) Error() string { + return fmt.Sprintf("%v\n\n%s", p.value, p.stack) +} + +func newPanicError(v interface{}) error { + stack := debug.Stack() + + // The first line of the stack trace is of the form "goroutine N [status]:" + // but by the time the panic reaches Do the goroutine may no longer exist + // and its status will have changed. Trim out the misleading line. + if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { + stack = stack[line+1:] + } + return &panicError{value: v, stack: stack} +} // call is an in-flight or completed singleflight.Do call type call struct { @@ -57,6 +92,12 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e c.dups++ g.mu.Unlock() c.wg.Wait() + + if e, ok := c.err.(*panicError); ok { + panic(e) + } else if c.err == errGoexit { + runtime.Goexit() + } return c.val, c.err, true } c := new(call) @@ -70,6 +111,8 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e // DoChan is like Do but returns a channel that will receive the // results when they are ready. +// +// The returned channel will not be closed. func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() @@ -94,17 +137,66 @@ func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result // doCall handles the single call for a key. func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { - c.val, c.err = fn() - c.wg.Done() + normalReturn := false + recovered := false - g.mu.Lock() - if !c.forgotten { - delete(g.m, key) + // use double-defer to distinguish panic from runtime.Goexit, + // more details see https://golang.org/cl/134395 + defer func() { + // the given function invoked runtime.Goexit + if !normalReturn && !recovered { + c.err = errGoexit + } + + c.wg.Done() + g.mu.Lock() + defer g.mu.Unlock() + if !c.forgotten { + delete(g.m, key) + } + + if e, ok := c.err.(*panicError); ok { + // In order to prevent the waiting channels from being blocked forever, + // needs to ensure that this panic cannot be recovered. + if len(c.chans) > 0 { + go panic(e) + select {} // Keep this goroutine around so that it will appear in the crash dump. 
+			} else {
+				panic(e)
+			}
+		} else if c.err == errGoexit {
+			// Already in the process of goexit, no need to call again
+		} else {
+			// Normal return
+			for _, ch := range c.chans {
+				ch <- Result{c.val, c.err, c.dups > 0}
+			}
+		}
+	}()
+
+	func() {
+		defer func() {
+			if !normalReturn {
+				// Ideally, we would wait to take a stack trace until we've determined
+				// whether this is a panic or a runtime.Goexit.
+				//
+				// Unfortunately, the only way we can distinguish the two is to see
+				// whether the recover stopped the goroutine from terminating, and by
+				// the time we know that, the part of the stack trace relevant to the
+				// panic has been discarded.
+				if r := recover(); r != nil {
+					c.err = newPanicError(r)
+				}
+			}
+		}()
+
+		c.val, c.err = fn()
+		normalReturn = true
+	}()
+
+	if !normalReturn {
+		recovered = true
 	}
-	for _, ch := range c.chans {
-		ch <- Result{c.val, c.err, c.dups > 0}
-	}
-	g.mu.Unlock()
 }
 
 // Forget tells the singleflight to forget about a key. Future calls
diff --git a/vendor/modules.txt b/vendor/modules.txt
index f62567aa2..689f191f0 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -505,8 +505,9 @@ golang.org/x/oauth2/google
 golang.org/x/oauth2/internal
 golang.org/x/oauth2/jws
 golang.org/x/oauth2/jwt
-# golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
+# golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
 golang.org/x/sync/errgroup
+golang.org/x/sync/semaphore
 golang.org/x/sync/singleflight
 # golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5
 golang.org/x/sys/cpu

From e1e94ce69cfa8b7a549d8948601d2935f4d1b198 Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Fri, 4 Dec 2020 19:06:47 -0500
Subject: [PATCH 4/4] agent: use the new lib/mutex for stateLock

Previously the ServiceManager had to run a separate goroutine so that it
could block on a channel send/receive instead of a lock. Using this mutex
with TryLock allows us to cancel the lock when the serviceConfigWatch is
stopped.

Without this change removing the ServiceManager.Start goroutine would not
be possible because when AddService is called it acquires the stateLock.
While that lock is held, if there are existing watches for the service,
the old watch will be stopped, and the goroutine holding the lock will
attempt to wait for that watcher goroutine to exit.

If the goroutine is handling an update (serviceConfigWatch.handleUpdate)
then it can block on acquiring the stateLock and deadlock the agent. With
this change the context is cancelled, and the goroutine will exit instead
of waiting on the stateLock.
---
 agent/agent.go           | 4 +++-
 agent/service_manager.go | 7 ++++---
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/agent/agent.go b/agent/agent.go
index 6a908f4c0..201dc7aee 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -48,6 +48,7 @@ import (
 	"github.com/hashicorp/consul/ipaddr"
 	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/consul/lib/file"
+	"github.com/hashicorp/consul/lib/mutex"
 	"github.com/hashicorp/consul/logging"
 	"github.com/hashicorp/consul/tlsutil"
 	"github.com/hashicorp/consul/types"
@@ -222,7 +223,7 @@ type Agent struct {
 	exposedPorts map[string]int
 
 	// stateLock protects the agent state
-	stateLock sync.Mutex
+	stateLock *mutex.Mutex
 
 	// dockerClient is the client for performing docker health checks.
dockerClient *checks.DockerClient @@ -358,6 +359,7 @@ func New(bd BaseDeps) (*Agent, error) { retryJoinCh: make(chan error), shutdownCh: make(chan struct{}), endpoints: make(map[string]string), + stateLock: mutex.New(), baseDeps: bd, tokens: bd.Tokens, diff --git a/agent/service_manager.go b/agent/service_manager.go index 21addb029..591c0981f 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -292,11 +292,12 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat persistServiceDefaults: serviceDefaults, } - w.agent.stateLock.Lock() + if err := w.agent.stateLock.TryLock(ctx); err != nil { + return nil + } defer w.agent.stateLock.Unlock() - // While we were waiting on the agent state lock we may have been shutdown. - // So avoid doing a registration in that case. + // The context may have been cancelled after the lock was acquired. if err := ctx.Err(); err != nil { return nil }
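
The deadlock described in PATCH 4/4, and the way lib/mutex avoids it, can be exercised outside the agent. Below is a minimal, self-contained sketch; it is not part of the patches. It rebuilds the same semaphore-backed mutex on top of golang.org/x/sync/semaphore and stands in simplified code for AddService and serviceConfigWatch.handleUpdate; the names stateLock, watchCtx, and the printed messages are illustrative only. With a plain sync.Mutex the watch goroutine would block forever on Lock while the lock holder waits for it to exit; with TryLock it returns as soon as its context is cancelled.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

// Mutex mirrors lib/mutex from PATCH 3/4: a mutual-exclusion lock whose
// acquisition can also be attempted with a context via TryLock.
type Mutex semaphore.Weighted

// New returns a Mutex that is ready for use.
func New() *Mutex { return (*Mutex)(semaphore.NewWeighted(1)) }

func (m *Mutex) Lock()   { _ = (*semaphore.Weighted)(m).Acquire(context.Background(), 1) }
func (m *Mutex) Unlock() { (*semaphore.Weighted)(m).Release(1) }

// TryLock blocks until the lock is acquired or ctx is done, in which case it
// returns ctx.Err() without holding the lock.
func (m *Mutex) TryLock(ctx context.Context) error {
	return (*semaphore.Weighted)(m).Acquire(ctx, 1)
}

func main() {
	stateLock := New()

	// Stand-in for AddService: it holds the agent state lock while it stops
	// an existing watch and waits for the watch goroutine to exit.
	stateLock.Lock()

	// Stand-in for the serviceConfigWatch context that gets cancelled when
	// the watch is stopped.
	watchCtx, stopWatch := context.WithCancel(context.Background())

	done := make(chan struct{})
	go func() {
		defer close(done)
		// Stand-in for handleUpdate: with a plain sync.Mutex this would be a
		// bare Lock() and the two goroutines would deadlock, because the
		// holder of stateLock is waiting on <-done below.
		if err := stateLock.TryLock(watchCtx); err != nil {
			fmt.Println("watch goroutine exiting without the lock:", err)
			return
		}
		defer stateLock.Unlock()
		fmt.Println("watch goroutine re-registered the service")
	}()

	stopWatch() // the watch is being stopped, so cancel its context ...
	<-done      // ... and only then wait for its goroutine to exit

	stateLock.Unlock()
}

The same shape appears in handleUpdate after PATCH 4/4: a TryLock(ctx) failure is treated as a clean shutdown and the function simply returns nil.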