Merge pull request #9302 from hashicorp/dnephin/add-service-3

agent: remove ServiceManager.Start goroutine
Daniel Nephin 2021-01-28 16:59:41 -05:00 committed by GitHub
commit 2eea58bcc4
9 changed files with 274 additions and 111 deletions

agent/agent.go

@@ -48,6 +48,7 @@ import (
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/file"
"github.com/hashicorp/consul/lib/mutex"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
@@ -222,7 +223,7 @@ type Agent struct {
exposedPorts map[string]int
// stateLock protects the agent state
stateLock sync.Mutex
stateLock *mutex.Mutex
// dockerClient is the client for performing docker health checks.
dockerClient *checks.DockerClient
@@ -358,6 +359,7 @@ func New(bd BaseDeps) (*Agent, error) {
retryJoinCh: make(chan error),
shutdownCh: make(chan struct{}),
endpoints: make(map[string]string),
stateLock: mutex.New(),
baseDeps: bd,
tokens: bd.Tokens,
@@ -512,7 +514,6 @@ func (a *Agent) Start(ctx context.Context) error {
if err := a.baseDeps.AutoConfig.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
return fmt.Errorf("AutoConf failed to start certificate monitor: %w", err)
}
a.serviceManager.Start()
// Load checks/services/metadata.
emptyCheckSnapshot := map[structs.CheckID]*structs.HealthCheck{}

agent/service_manager.go

@@ -26,13 +26,6 @@ type ServiceManager struct {
// services tracks all active watches for registered services
services map[structs.ServiceID]*serviceConfigWatch
// registerCh is a channel for receiving service registration requests from
// serviceConfigWatchers.
// The registrations are handled in the background when watches are notified of
// changes. All sends and receives must also obey the ctx.Done() channel to
// avoid a deadlock during shutdown.
registerCh chan *asyncRegisterRequest
// ctx is the shared context for all goroutines launched
ctx context.Context
@@ -46,11 +39,10 @@ type ServiceManager struct {
func NewServiceManager(agent *Agent) *ServiceManager {
ctx, cancel := context.WithCancel(context.Background())
return &ServiceManager{
agent: agent,
services: make(map[structs.ServiceID]*serviceConfigWatch),
registerCh: make(chan *asyncRegisterRequest), // must be unbuffered
ctx: ctx,
cancel: cancel,
agent: agent,
services: make(map[structs.ServiceID]*serviceConfigWatch),
ctx: ctx,
cancel: cancel,
}
}
@@ -62,36 +54,6 @@ func (s *ServiceManager) Stop() {
s.running.Wait()
}
// Start starts a background worker goroutine that writes back into the Agent
// state. This only exists to keep the need to lock the agent state lock out of
// the main AddService/RemoveService codepaths to avoid deadlocks.
func (s *ServiceManager) Start() {
s.running.Add(1)
go func() {
defer s.running.Done()
for {
select {
case <-s.ctx.Done():
return
case req := <-s.registerCh:
req.Reply <- s.registerOnce(req.Args)
}
}
}()
}
// registerOnce processes a single registration request
func (s *ServiceManager) registerOnce(args addServiceInternalRequest) error {
s.agent.stateLock.Lock()
defer s.agent.stateLock.Unlock()
if err := s.agent.addServiceInternal(args); err != nil {
return fmt.Errorf("error updating service registration: %v", err)
}
return nil
}
// AddService will (re)create a serviceConfigWatch on the given service. For
// each call of this function the first registration will happen inline and
// will read the merged global defaults for the service through the agent cache
@@ -129,12 +91,11 @@ func (s *ServiceManager) AddService(req addServiceLockedRequest) error {
// Get the existing global config and do the initial registration with the
// merged config.
watch := &serviceConfigWatch{
registration: req,
agent: s.agent,
registerCh: s.registerCh,
watch := &serviceConfigWatch{registration: req, agent: s.agent}
if err := watch.register(s.ctx); err != nil {
return err
}
if err := watch.RegisterAndStart(s.ctx, &s.running); err != nil {
if err := watch.start(s.ctx, &s.running); err != nil {
return err
}
@@ -165,9 +126,7 @@ func (s *ServiceManager) RemoveService(serviceID structs.ServiceID) {
// service/proxy defaults.
type serviceConfigWatch struct {
registration addServiceLockedRequest
agent *Agent
registerCh chan<- *asyncRegisterRequest
agent *Agent
// cacheKey stores the key of the current request, when registration changes
// we check to see if a new cache watch is needed.
@@ -178,7 +137,7 @@ type serviceConfigWatch struct {
}
// NOTE: this is called while holding the Agent.stateLock
func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.WaitGroup) error {
func (w *serviceConfigWatch) register(ctx context.Context) error {
serviceDefaults, err := w.registration.serviceDefaults(ctx)
if err != nil {
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v",
@@ -204,10 +163,7 @@ func (w *serviceConfigWatch) RegisterAndStart(ctx context.Context, wg *sync.Wait
if err != nil {
return fmt.Errorf("error updating service registration: %v", err)
}
// Start the config watch, which starts a blocking query for the
// resolved service config in the background.
return w.start(ctx, wg)
return nil
}
func serviceDefaultsFromStruct(v *structs.ServiceConfigResponse) func(context.Context) (*structs.ServiceConfigResponse, error) {
@@ -256,13 +212,7 @@ func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) erro
// context before we cancel and so might still deliver the old event. Using
// the cacheKey allows us to ignore updates from the old cache watch and makes
// even this rare edge case safe.
err := w.agent.cache.Notify(
ctx,
cachetype.ResolvedServiceConfigName,
req,
w.cacheKey,
updateCh,
)
err := w.agent.cache.Notify(ctx, cachetype.ResolvedServiceConfigName, req, w.cacheKey, updateCh)
if err != nil {
w.cancelFunc()
return err
@@ -331,47 +281,31 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
return err
}
// While we were waiting on the agent state lock we may have been shutdown.
// So avoid doing a registration in that case.
if err := ctx.Err(); err != nil {
return nil
}
// make a copy of the AddServiceRequest
req := w.registration
req.Service = merged
req.persistServiceConfig = true
registerReq := &asyncRegisterRequest{
Args: addServiceInternalRequest{
addServiceLockedRequest: req,
persistService: w.registration.Service,
persistServiceDefaults: serviceDefaults,
},
Reply: make(chan error, 1),
args := addServiceInternalRequest{
addServiceLockedRequest: req,
persistService: w.registration.Service,
persistServiceDefaults: serviceDefaults,
}
select {
case <-ctx.Done():
return nil
case w.registerCh <- registerReq:
}
select {
case <-ctx.Done():
return nil
case err := <-registerReq.Reply:
if err != nil {
return fmt.Errorf("error updating service registration: %v", err)
}
if err := w.agent.stateLock.TryLock(ctx); err != nil {
return nil
}
}
defer w.agent.stateLock.Unlock()
type asyncRegisterRequest struct {
Args addServiceInternalRequest
Reply chan error
// The context may have been cancelled after the lock was acquired.
if err := ctx.Err(); err != nil {
return nil
}
if err := w.agent.addServiceInternal(args); err != nil {
return fmt.Errorf("error updating service registration: %v", err)
}
return nil
}
func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest {
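For context: the code above replaces the old round trip through registerCh (build an asyncRegisterRequest, send it to the Start goroutine, wait on Reply) with a direct call made under a cancellable agent state lock. A minimal sketch of the new lock-then-recheck idiom, using the lib/mutex package added in this commit; registerWithLock and the register callback are illustrative stand-ins, not names from the change:

package main

import (
	"context"
	"fmt"

	"github.com/hashicorp/consul/lib/mutex"
)

// registerWithLock mirrors the tail of handleUpdate: acquire the agent
// state lock in a cancellable way, then re-check the context, because
// cancellation may have raced with the lock acquisition.
func registerWithLock(ctx context.Context, stateLock *mutex.Mutex, register func() error) error {
	if err := stateLock.TryLock(ctx); err != nil {
		return nil // shutting down; drop the update instead of deadlocking
	}
	defer stateLock.Unlock()

	// The context may have been cancelled after the lock was acquired.
	if err := ctx.Err(); err != nil {
		return nil
	}
	return register()
}

func main() {
	lock := mutex.New()
	err := registerWithLock(context.Background(), lock, func() error {
		fmt.Println("registered")
		return nil
	})
	fmt.Println("err:", err)
}

Because Lock on a plain sync.Mutex cannot be interrupted, a watcher blocked on it during shutdown had no way out; TryLock(ctx) turns shutdown into an ordinary early return, which is what made the dedicated worker goroutine unnecessary.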

agent/service_manager_test.go

@@ -387,12 +387,19 @@ func TestServiceManager_PersistService_API(t *testing.T) {
configFile := filepath.Join(a.Config.DataDir, serviceConfigDir, svcID.StringHash())
// Service is not persisted unless requested, but we always persist service configs.
require.NoError(a.addServiceFromSource(svc, nil, false, "", ConfigSourceRemote))
err = a.AddService(AddServiceRequest{Service: svc, Source: ConfigSourceRemote})
require.NoError(err)
requireFileIsAbsent(t, svcFile)
requireFileIsPresent(t, configFile)
// Persists to file if requested
require.NoError(a.addServiceFromSource(svc, nil, true, "mytoken", ConfigSourceRemote))
err = a.AddService(AddServiceRequest{
Service: svc,
persist: true,
token: "mytoken",
Source: ConfigSourceRemote,
})
require.NoError(err)
requireFileIsPresent(t, svcFile)
requireFileIsPresent(t, configFile)

go.mod

@@ -84,7 +84,7 @@ require (
golang.org/x/crypto v0.0.0-20200930160638-afb6bcd081ae
golang.org/x/net v0.0.0-20200930145003-4acb6c075d10
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5
golang.org/x/text v0.3.3 // indirect
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e

go.sum

@@ -583,8 +583,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

lib/mutex/mutex.go (new file)

@@ -0,0 +1,36 @@
/*
Package mutex implements the sync.Locker interface using x/sync/semaphore. It
may be used as a replacement for sync.Mutex when one or more goroutines need to
allow their calls to Lock to be cancelled by context cancellation.
*/
package mutex
import (
"context"
"golang.org/x/sync/semaphore"
)
type Mutex semaphore.Weighted
// New returns a Mutex that is ready for use.
func New() *Mutex {
return (*Mutex)(semaphore.NewWeighted(1))
}
func (m *Mutex) Lock() {
_ = (*semaphore.Weighted)(m).Acquire(context.Background(), 1)
}
func (m *Mutex) Unlock() {
(*semaphore.Weighted)(m).Release(1)
}
// TryLock acquires the mutex, blocking until it is available or ctx is
// done. On success, returns nil. On failure, returns ctx.Err() and leaves
// the mutex unlocked.
//
// If ctx is already done, TryLock may still succeed without blocking.
func (m *Mutex) TryLock(ctx context.Context) error {
return (*semaphore.Weighted)(m).Acquire(ctx, 1)
}
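A short usage sketch of the package (illustrative; the 50ms timeout is arbitrary):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/hashicorp/consul/lib/mutex"
)

func main() {
	m := mutex.New()
	m.Lock() // simulate another goroutine holding the lock

	// TryLock blocks like Lock, but gives up when ctx is done.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	if err := m.TryLock(ctx); errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("gave up waiting:", err)
	}

	m.Unlock()
	if err := m.TryLock(context.Background()); err == nil {
		fmt.Println("lock acquired")
		m.Unlock()
	}
}

The type-conversion implementation works because a semaphore.Weighted with capacity 1 behaves as a mutex; Lock simply passes context.Background(), which can never be cancelled.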

lib/mutex/mutex_test.go (new file)

@@ -0,0 +1,93 @@
package mutex
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestMutex(t *testing.T) {
t.Run("starts unlocked", func(t *testing.T) {
m := New()
canLock(t, m)
})
t.Run("Lock blocks when locked", func(t *testing.T) {
m := New()
m.Lock()
lockIsBlocked(t, m)
})
t.Run("Unlock unblocks Lock", func(t *testing.T) {
m := New()
m.Lock()
m.Unlock() // nolint:staticcheck // SA2001 is not relevant here
canLock(t, m)
})
t.Run("TryLock acquires lock", func(t *testing.T) {
m := New()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
t.Cleanup(cancel)
require.NoError(t, m.TryLock(ctx))
lockIsBlocked(t, m)
})
t.Run("TryLock blocks until timeout when locked", func(t *testing.T) {
m := New()
m.Lock()
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
t.Cleanup(cancel)
err := m.TryLock(ctx)
require.Equal(t, context.DeadlineExceeded, err)
})
t.Run("TryLock acquires lock before timeout", func(t *testing.T) {
m := New()
m.Lock()
go func() {
time.Sleep(20 * time.Millisecond)
m.Unlock()
}()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
t.Cleanup(cancel)
err := m.TryLock(ctx)
require.NoError(t, err)
})
}
func canLock(t *testing.T, m *Mutex) {
t.Helper()
chDone := make(chan struct{})
go func() {
m.Lock()
close(chDone)
}()
select {
case <-chDone:
case <-time.After(20 * time.Millisecond):
t.Fatal("failed to acquire lock before timeout")
}
}
func lockIsBlocked(t *testing.T, m *Mutex) {
t.Helper()
chDone := make(chan struct{})
go func() {
m.Lock()
close(chDone)
}()
select {
case <-chDone:
t.Fatal("expected Lock to block")
case <-time.After(20 * time.Millisecond):
}
}

vendor/golang.org/x/sync/singleflight/singleflight.go

@@ -6,7 +6,42 @@
// mechanism.
package singleflight // import "golang.org/x/sync/singleflight"
import "sync"
import (
"bytes"
"errors"
"fmt"
"runtime"
"runtime/debug"
"sync"
)
// errGoexit indicates that runtime.Goexit was called in
// the user-given function.
var errGoexit = errors.New("runtime.Goexit was called")
// A panicError is an arbitrary value recovered from a panic,
// with the stack trace from the execution of the given function.
type panicError struct {
value interface{}
stack []byte
}
// Error implements error interface.
func (p *panicError) Error() string {
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}
func newPanicError(v interface{}) error {
stack := debug.Stack()
// The first line of the stack trace is of the form "goroutine N [status]:"
// but by the time the panic reaches Do the goroutine may no longer exist
// and its status will have changed. Trim out the misleading line.
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
stack = stack[line+1:]
}
return &panicError{value: v, stack: stack}
}
// call is an in-flight or completed singleflight.Do call
type call struct {
@@ -57,6 +92,12 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
@@ -70,6 +111,8 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
@@ -94,17 +137,66 @@ func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn()
c.wg.Done()
normalReturn := false
recovered := false
g.mu.Lock()
if !c.forgotten {
delete(g.m, key)
// use double-defer to distinguish panic from runtime.Goexit;
// for more details see https://golang.org/cl/134395
defer func() {
// the given function invoked runtime.Goexit
if !normalReturn && !recovered {
c.err = errGoexit
}
c.wg.Done()
g.mu.Lock()
defer g.mu.Unlock()
if !c.forgotten {
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
// To prevent the waiting channels from being blocked forever, we
// need to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
// Ideally, we would wait to take a stack trace until we've determined
// whether this is a panic or a runtime.Goexit.
//
// Unfortunately, the only way we can distinguish the two is to see
// whether the recover stopped the goroutine from terminating, and by
// the time we know that, the part of the stack trace relevant to the
// panic has been discarded.
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
recovered = true
}
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}
// Forget tells the singleflight to forget about a key. Future calls
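The double-defer in doCall is subtle enough to deserve a standalone sketch. A recover in the inner defer can stop a panic but not runtime.Goexit, while the outer defer runs in every case; that asymmetry is what lets the three outcomes be told apart. The classify helper below is illustrative only, not part of the change:

package main

import (
	"fmt"
	"runtime"
)

// classify reports whether fn returned normally, panicked, or called
// runtime.Goexit. It runs fn on its own goroutine because Goexit
// terminates the goroutine that calls it.
func classify(fn func()) string {
	var outcome string
	normalReturn := false
	recovered := false

	done := make(chan struct{})
	go func() {
		// Outer defer: still runs after runtime.Goexit, which is the
		// only case in which neither flag has been set.
		defer func() {
			if !normalReturn && !recovered {
				outcome = "goexit"
			}
			close(done)
		}()
		func() {
			// Inner defer: recover stops a panic here, but cannot stop
			// a Goexit — recover returns nil for it.
			defer func() {
				if !normalReturn {
					if r := recover(); r != nil {
						outcome = "panic"
					}
				}
			}()
			fn()
			normalReturn = true
			outcome = "normal"
		}()
		// Reached only on a normal return or a recovered panic,
		// never after runtime.Goexit.
		if !normalReturn {
			recovered = true
		}
	}()
	<-done
	return outcome
}

func main() {
	fmt.Println(classify(func() {}))                   // normal
	fmt.Println(classify(func() { panic("boom") }))    // panic
	fmt.Println(classify(func() { runtime.Goexit() })) // goexit
}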

vendor/modules.txt (vendored)

@@ -505,7 +505,7 @@ golang.org/x/oauth2/google
golang.org/x/oauth2/internal
golang.org/x/oauth2/jws
golang.org/x/oauth2/jwt
# golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
# golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sync/errgroup
golang.org/x/sync/semaphore
golang.org/x/sync/singleflight