diff --git a/agent/agent.go b/agent/agent.go
index ed9082e42..201dc7aee 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -48,6 +48,7 @@ import (
 	"github.com/hashicorp/consul/ipaddr"
 	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/consul/lib/file"
+	"github.com/hashicorp/consul/lib/mutex"
 	"github.com/hashicorp/consul/logging"
 	"github.com/hashicorp/consul/tlsutil"
 	"github.com/hashicorp/consul/types"
@@ -222,7 +223,7 @@ type Agent struct {
 	exposedPorts map[string]int
 
 	// stateLock protects the agent state
-	stateLock sync.Mutex
+	stateLock *mutex.Mutex
 
 	// dockerClient is the client for performing docker health checks.
 	dockerClient *checks.DockerClient
@@ -358,6 +359,7 @@ func New(bd BaseDeps) (*Agent, error) {
 		retryJoinCh:     make(chan error),
 		shutdownCh:      make(chan struct{}),
 		endpoints:       make(map[string]string),
+		stateLock:       mutex.New(),
 
 		baseDeps:        bd,
 		tokens:          bd.Tokens,
@@ -512,7 +514,6 @@ func (a *Agent) Start(ctx context.Context) error {
 	if err := a.baseDeps.AutoConfig.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
 		return fmt.Errorf("AutoConf failed to start certificate monitor: %w", err)
 	}
-	a.serviceManager.Start()
 
 	// Load checks/services/metadata.
 	emptyCheckSnapshot := map[structs.CheckID]*structs.HealthCheck{}
diff --git a/agent/service_manager.go b/agent/service_manager.go
index eddc12cfe..591c0981f 100644
--- a/agent/service_manager.go
+++ b/agent/service_manager.go
@@ -26,13 +26,6 @@ type ServiceManager struct {
 	// services tracks all active watches for registered services
 	services map[structs.ServiceID]*serviceConfigWatch
 
-	// registerCh is a channel for receiving service registration requests from
-	// from serviceConfigWatchers.
-	// The registrations are handled in the background when watches are notified of
-	// changes. All sends and receives must also obey the ctx.Done() channel to
-	// avoid a deadlock during shutdown.
-	registerCh chan *asyncRegisterRequest
-
 	// ctx is the shared context for all goroutines launched
 	ctx context.Context
 
@@ -46,11 +39,10 @@ type ServiceManager struct {
 func NewServiceManager(agent *Agent) *ServiceManager {
 	ctx, cancel := context.WithCancel(context.Background())
 	return &ServiceManager{
-		agent:      agent,
-		services:   make(map[structs.ServiceID]*serviceConfigWatch),
-		registerCh: make(chan *asyncRegisterRequest), // must be unbuffered
-		ctx:        ctx,
-		cancel:     cancel,
+		agent:    agent,
+		services: make(map[structs.ServiceID]*serviceConfigWatch),
+		ctx:      ctx,
+		cancel:   cancel,
 	}
 }
 
@@ -62,36 +54,6 @@ func (s *ServiceManager) Stop() {
 	s.running.Wait()
 }
 
-// Start starts a background worker goroutine that writes back into the Agent
-// state. This only exists to keep the need to lock the agent state lock out of
-// the main AddService/RemoveService codepaths to avoid deadlocks.
-func (s *ServiceManager) Start() {
-	s.running.Add(1)
-
-	go func() {
-		defer s.running.Done()
-		for {
-			select {
-			case <-s.ctx.Done():
-				return
-			case req := <-s.registerCh:
-				req.Reply <- s.registerOnce(req.Args)
-			}
-		}
-	}()
-}
-
-// runOnce will process a single registration request
-func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error {
-	s.agent.stateLock.Lock()
-	defer s.agent.stateLock.Unlock()
-
-	if err := s.agent.addServiceInternal(args); err != nil {
-		return fmt.Errorf("error updating service registration: %v", err)
-	}
-	return nil
-}
-
 // AddService will (re)create a serviceConfigWatch on the given service. For
 // each call of this function the first registration will happen inline and
 // will read the merged global defaults for the service through the agent cache
@@ -129,12 +91,11 @@ func (s *ServiceManager) AddService(req addServiceLockedRequest) error {
 
 	// Get the existing global config and do the initial registration with the
 	// merged config.
-	watch := &serviceConfigWatch{
-		registration: req,
-		agent:        s.agent,
-		registerCh:   s.registerCh,
+	watch := &serviceConfigWatch{registration: req, agent: s.agent}
+	if err := watch.register(s.ctx); err != nil {
+		return err
 	}
-	if err := watch.RegisterAndStart(s.ctx, &s.running); err != nil {
+	if err := watch.start(s.ctx, &s.running); err != nil {
 		return err
 	}
 
@@ -165,9 +126,7 @@ func (s *ServiceManager) RemoveService(serviceID structs.ServiceID) {
 // service/proxy defaults.
 type serviceConfigWatch struct {
 	registration addServiceLockedRequest
-
-	agent      *Agent
-	registerCh chan<- *asyncRegisterRequest
+	agent        *Agent
 
 	// cacheKey stores the key of the current request, when registration changes
 	// we check to see if a new cache watch is needed.
@@ -178,7 +137,7 @@ type serviceConfigWatch struct {
 }
 
 // NOTE: this is called while holding the Agent.stateLock
-func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.WaitGroup) error {
+func (w *serviceConfigWatch) register(ctx context.Context) error {
 	serviceDefaults, err := w.registration.serviceDefaults(ctx)
 	if err != nil {
 		return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v",
@@ -204,10 +163,7 @@ func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.Wait
 	if err != nil {
 		return fmt.Errorf("error updating service registration: %v", err)
 	}
-
-	// Start the config watch, which starts a blocking query for the
-	// resolved service config in the background.
-	return w.start(ctx, wg)
+	return nil
 }
 
 func serviceDefaultsFromStruct(v *structs.ServiceConfigResponse) func(context.Context) (*structs.ServiceConfigResponse, error) {
@@ -256,13 +212,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
 	// context before we cancel and so might still deliver the old event. Using
 	// the cacheKey allows us to ignore updates from the old cache watch and makes
 	// even this rare edge case safe.
-	err := w.agent.cache.Notify(
-		ctx,
-		cachetype.ResolvedServiceConfigName,
-		req,
-		w.cacheKey,
-		updateCh,
-	)
+	err := w.agent.cache.Notify(ctx, cachetype.ResolvedServiceConfigName, req, w.cacheKey, updateCh)
 	if err != nil {
 		w.cancelFunc()
 		return err
@@ -331,47 +281,31 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
 		return err
 	}
 
-	// While we were waiting on the agent state lock we may have been shutdown.
-	// So avoid doing a registration in that case.
-	if err := ctx.Err(); err != nil {
-		return nil
-	}
-
 	// make a copy of the AddServiceRequest
 	req := w.registration
 	req.Service = merged
 	req.persistServiceConfig = true
 
-	registerReq := &asyncRegisterRequest{
-		Args: addServiceInternalRequest{
-			addServiceLockedRequest: req,
-			persistService:          w.registration.Service,
-			persistServiceDefaults:  serviceDefaults,
-		},
-		Reply: make(chan error, 1),
+	args := addServiceInternalRequest{
+		addServiceLockedRequest: req,
+		persistService:          w.registration.Service,
+		persistServiceDefaults:  serviceDefaults,
 	}
 
-	select {
-	case <-ctx.Done():
-		return nil
-	case w.registerCh <- registerReq:
-	}
-
-	select {
-	case <-ctx.Done():
-		return nil
-
-	case err := <-registerReq.Reply:
-		if err != nil {
-			return fmt.Errorf("error updating service registration: %v", err)
-		}
+	if err := w.agent.stateLock.TryLock(ctx); err != nil {
 		return nil
 	}
-}
+	defer w.agent.stateLock.Unlock()
 
-type asyncRegisterRequest struct {
-	Args  addServiceInternalRequest
-	Reply chan error
+	// The context may have been cancelled after the lock was acquired.
+	if err := ctx.Err(); err != nil {
+		return nil
+	}
+
+	if err := w.agent.addServiceInternal(args); err != nil {
+		return fmt.Errorf("error updating service registration: %v", err)
+	}
+	return nil
 }
 
 func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest {
diff --git a/agent/service_manager_test.go b/agent/service_manager_test.go
index ea1d6c4f1..8ed22e9f5 100644
--- a/agent/service_manager_test.go
+++ b/agent/service_manager_test.go
@@ -387,12 +387,19 @@ func TestServiceManager_PersistService_API(t *testing.T) {
 	configFile := filepath.Join(a.Config.DataDir, serviceConfigDir, svcID.StringHash())
 
 	// Service is not persisted unless requested, but we always persist service configs.
-	require.NoError(a.addServiceFromSource(svc, nil, false, "", ConfigSourceRemote))
+	err = a.AddService(AddServiceRequest{Service: svc, Source: ConfigSourceRemote})
+	require.NoError(err)
 	requireFileIsAbsent(t, svcFile)
 	requireFileIsPresent(t, configFile)
 
 	// Persists to file if requested
-	require.NoError(a.addServiceFromSource(svc, nil, true, "mytoken", ConfigSourceRemote))
+	err = a.AddService(AddServiceRequest{
+		Service: svc,
+		persist: true,
+		token:   "mytoken",
+		Source:  ConfigSourceRemote,
+	})
+	require.NoError(err)
 	requireFileIsPresent(t, svcFile)
 	requireFileIsPresent(t, configFile)
 
diff --git a/go.mod b/go.mod
index ab5a9f982..cb4e79192 100644
--- a/go.mod
+++ b/go.mod
@@ -84,7 +84,7 @@ require (
 	golang.org/x/crypto v0.0.0-20200930160638-afb6bcd081ae
 	golang.org/x/net v0.0.0-20200930145003-4acb6c075d10
 	golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
-	golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
+	golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
 	golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5
 	golang.org/x/text v0.3.3 // indirect
 	golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
diff --git a/go.sum b/go.sum
index a365fd4ba..b3e7c9a34 100644
--- a/go.sum
+++ b/go.sum
@@ -583,8 +583,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o=
-golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/lib/mutex/mutex.go b/lib/mutex/mutex.go
new file mode 100644
index 000000000..9841acb49
--- /dev/null
+++ b/lib/mutex/mutex.go
@@ -0,0 +1,36 @@
+/*
+Package mutex implements the sync.Locker interface using x/sync/semaphore. It
+may be used as a replacement for sync.Mutex when one or more goroutines need to
+allow their calls to Lock to be cancelled by context cancellation.
+*/
+package mutex
+
+import (
+	"context"
+
+	"golang.org/x/sync/semaphore"
+)
+
+type Mutex semaphore.Weighted
+
+// New returns a Mutex that is ready for use.
+func New() *Mutex {
+	return (*Mutex)(semaphore.NewWeighted(1))
+}
+
+func (m *Mutex) Lock() {
+	_ = (*semaphore.Weighted)(m).Acquire(context.Background(), 1)
+}
+
+func (m *Mutex) Unlock() {
+	(*semaphore.Weighted)(m).Release(1)
+}
+
+// TryLock acquires the mutex, blocking until resources are available or ctx is
+// done. On success, returns nil. On failure, returns ctx.Err() and leaves the
+// semaphore unchanged.
+//
+// If ctx is already done, Acquire may still succeed without blocking.
+func (m *Mutex) TryLock(ctx context.Context) error {
+	return (*semaphore.Weighted)(m).Acquire(ctx, 1)
+}
diff --git a/lib/mutex/mutex_test.go b/lib/mutex/mutex_test.go
new file mode 100644
index 000000000..0324b3900
--- /dev/null
+++ b/lib/mutex/mutex_test.go
@@ -0,0 +1,93 @@
+package mutex
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestMutex(t *testing.T) {
+	t.Run("starts unlocked", func(t *testing.T) {
+		m := New()
+		canLock(t, m)
+	})
+
+	t.Run("Lock blocks when locked", func(t *testing.T) {
+		m := New()
+		m.Lock()
+		lockIsBlocked(t, m)
+	})
+
+	t.Run("Unlock unblocks Lock", func(t *testing.T) {
+		m := New()
+		m.Lock()
+		m.Unlock() // nolint:staticcheck // SA2001 is not relevant here
+		canLock(t, m)
+	})
+
+	t.Run("TryLock acquires lock", func(t *testing.T) {
+		m := New()
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		t.Cleanup(cancel)
+		require.NoError(t, m.TryLock(ctx))
+		lockIsBlocked(t, m)
+	})
+
+	t.Run("TryLock blocks until timeout when locked", func(t *testing.T) {
+		m := New()
+		m.Lock()
+
+		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
+		t.Cleanup(cancel)
+		err := m.TryLock(ctx)
+		require.Equal(t, err, context.DeadlineExceeded)
+	})
+
+	t.Run("TryLock acquires lock before timeout", func(t *testing.T) {
+		m := New()
+		m.Lock()
+
+		go func() {
+			time.Sleep(20 * time.Millisecond)
+			m.Unlock()
+		}()
+
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		t.Cleanup(cancel)
+		err := m.TryLock(ctx)
+		require.NoError(t, err)
+	})
+
+}
+
+func canLock(t *testing.T, m *Mutex) {
+	t.Helper()
+	chDone := make(chan struct{})
+	go func() {
+		m.Lock()
+		close(chDone)
+	}()
+
+	select {
+	case <-chDone:
+	case <-time.After(20 * time.Millisecond):
+		t.Fatal("failed to acquire lock before timeout")
+	}
+}
+
+func lockIsBlocked(t *testing.T, m *Mutex) {
+	t.Helper()
+	chDone := make(chan struct{})
+	go func() {
+		m.Lock()
+		close(chDone)
+	}()
+
+	select {
+	case <-chDone:
+		t.Fatal("expected Lock to block")
+	case <-time.After(20 * time.Millisecond):
+	}
+}
diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go b/vendor/golang.org/x/sync/singleflight/singleflight.go
index 97a1aa4bb..690eb8501 100644
--- a/vendor/golang.org/x/sync/singleflight/singleflight.go
+++ b/vendor/golang.org/x/sync/singleflight/singleflight.go
@@ -6,7 +6,42 @@
 // mechanism.
 package singleflight // import "golang.org/x/sync/singleflight"
 
-import "sync"
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"runtime"
+	"runtime/debug"
+	"sync"
+)
+
+// errGoexit indicates the runtime.Goexit was called in
+// the user given function.
+var errGoexit = errors.New("runtime.Goexit was called")
+
+// A panicError is an arbitrary value recovered from a panic
+// with the stack trace during the execution of given function.
+type panicError struct {
+	value interface{}
+	stack []byte
+}
+
+// Error implements error interface.
+func (p *panicError) Error() string {
+	return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
+}
+
+func newPanicError(v interface{}) error {
+	stack := debug.Stack()
+
+	// The first line of the stack trace is of the form "goroutine N [status]:"
+	// but by the time the panic reaches Do the goroutine may no longer exist
+	// and its status will have changed. Trim out the misleading line.
+	if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
+		stack = stack[line+1:]
+	}
+	return &panicError{value: v, stack: stack}
+}
 
 // call is an in-flight or completed singleflight.Do call
 type call struct {
@@ -57,6 +92,12 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e
 		c.dups++
 		g.mu.Unlock()
 		c.wg.Wait()
+
+		if e, ok := c.err.(*panicError); ok {
+			panic(e)
+		} else if c.err == errGoexit {
+			runtime.Goexit()
+		}
 		return c.val, c.err, true
 	}
 	c := new(call)
@@ -70,6 +111,8 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e
 
 // DoChan is like Do but returns a channel that will receive the
 // results when they are ready.
+//
+// The returned channel will not be closed.
 func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
 	ch := make(chan Result, 1)
 	g.mu.Lock()
@@ -94,17 +137,66 @@ func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
 
 // doCall handles the single call for a key.
 func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
-	c.val, c.err = fn()
-	c.wg.Done()
+	normalReturn := false
+	recovered := false
 
-	g.mu.Lock()
-	if !c.forgotten {
-		delete(g.m, key)
+	// use double-defer to distinguish panic from runtime.Goexit,
+	// more details see https://golang.org/cl/134395
+	defer func() {
+		// the given function invoked runtime.Goexit
+		if !normalReturn && !recovered {
+			c.err = errGoexit
+		}
+
+		c.wg.Done()
+		g.mu.Lock()
+		defer g.mu.Unlock()
+		if !c.forgotten {
+			delete(g.m, key)
+		}
+
+		if e, ok := c.err.(*panicError); ok {
+			// In order to prevent the waiting channels from being blocked forever,
+			// needs to ensure that this panic cannot be recovered.
+			if len(c.chans) > 0 {
+				go panic(e)
+				select {} // Keep this goroutine around so that it will appear in the crash dump.
+			} else {
+				panic(e)
+			}
+		} else if c.err == errGoexit {
+			// Already in the process of goexit, no need to call again
+		} else {
+			// Normal return
+			for _, ch := range c.chans {
+				ch <- Result{c.val, c.err, c.dups > 0}
+			}
+		}
+	}()
+
+	func() {
+		defer func() {
+			if !normalReturn {
+				// Ideally, we would wait to take a stack trace until we've determined
+				// whether this is a panic or a runtime.Goexit.
+				//
+				// Unfortunately, the only way we can distinguish the two is to see
+				// whether the recover stopped the goroutine from terminating, and by
+				// the time we know that, the part of the stack trace relevant to the
+				// panic has been discarded.
+				if r := recover(); r != nil {
+					c.err = newPanicError(r)
+				}
+			}
+		}()
+
+		c.val, c.err = fn()
+		normalReturn = true
+	}()
+
+	if !normalReturn {
+		recovered = true
 	}
-	for _, ch := range c.chans {
-		ch <- Result{c.val, c.err, c.dups > 0}
-	}
-	g.mu.Unlock()
 }
 
 // Forget tells the singleflight to forget about a key. Future calls
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 36120d83c..4bd16dde9 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -505,7 +505,7 @@ golang.org/x/oauth2/google
 golang.org/x/oauth2/internal
 golang.org/x/oauth2/jws
 golang.org/x/oauth2/jwt
-# golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
+# golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
 golang.org/x/sync/errgroup
 golang.org/x/sync/semaphore
 golang.org/x/sync/singleflight
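
Note (not part of the patch): a minimal sketch of how the context-aware lock
introduced in lib/mutex is intended to be used, based only on the API added
above (mutex.New, Lock, Unlock, TryLock). The timeout value and the printed
message are illustrative and not taken from the Consul code.

	package main

	import (
		"context"
		"fmt"
		"time"

		"github.com/hashicorp/consul/lib/mutex"
	)

	func main() {
		m := mutex.New()

		// Lock and Unlock behave like sync.Mutex.
		m.Lock()

		// TryLock blocks until the lock is free or the context is done. The lock
		// is still held here, so this call returns context.DeadlineExceeded after
		// 10ms instead of blocking forever; serviceConfigWatch.handleUpdate relies
		// on the same behaviour to bail out cleanly while the agent shuts down.
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		defer cancel()
		if err := m.TryLock(ctx); err != nil {
			fmt.Println("lock not acquired:", err)
		}

		m.Unlock()
	}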