From 3685f3997015c5a7f599abb7ffb113b72ba944bc Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 4 Dec 2020 18:56:03 -0500 Subject: [PATCH] lib/mutex: add mutex with TryLock and update vendor --- go.mod | 2 +- go.sum | 4 +- lib/mutex/mutex.go | 36 +++++ lib/mutex/mutex_test.go | 93 ++++++++++++ .../golang.org/x/sync/semaphore/semaphore.go | 136 ++++++++++++++++++ .../x/sync/singleflight/singleflight.go | 112 +++++++++++++-- vendor/modules.txt | 3 +- 7 files changed, 372 insertions(+), 14 deletions(-) create mode 100644 lib/mutex/mutex.go create mode 100644 lib/mutex/mutex_test.go create mode 100644 vendor/golang.org/x/sync/semaphore/semaphore.go diff --git a/go.mod b/go.mod index 8ba63deaa..8eceb4990 100644 --- a/go.mod +++ b/go.mod @@ -84,7 +84,7 @@ require ( golang.org/x/crypto v0.0.0-20200930160638-afb6bcd081ae golang.org/x/net v0.0.0-20200930145003-4acb6c075d10 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 - golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5 golang.org/x/text v0.3.3 // indirect golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e diff --git a/go.sum b/go.sum index ed143ccf6..3a39744a8 100644 --- a/go.sum +++ b/go.sum @@ -583,8 +583,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o= -golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/lib/mutex/mutex.go b/lib/mutex/mutex.go new file mode 100644 index 000000000..9841acb49 --- /dev/null +++ b/lib/mutex/mutex.go @@ -0,0 +1,36 @@ +/* +Package mutex implements the sync.Locker interface using x/sync/semaphore. It +may be used as a replacement for sync.Mutex when one or more goroutines need to +allow their calls to Lock to be cancelled by context cancellation. +*/ +package mutex + +import ( + "context" + + "golang.org/x/sync/semaphore" +) + +type Mutex semaphore.Weighted + +// New returns a Mutex that is ready for use. +func New() *Mutex { + return (*Mutex)(semaphore.NewWeighted(1)) +} + +func (m *Mutex) Lock() { + _ = (*semaphore.Weighted)(m).Acquire(context.Background(), 1) +} + +func (m *Mutex) Unlock() { + (*semaphore.Weighted)(m).Release(1) +} + +// TryLock acquires the mutex, blocking until resources are available or ctx is +// done. On success, returns nil. On failure, returns ctx.Err() and leaves the +// semaphore unchanged. +// +// If ctx is already done, Acquire may still succeed without blocking. +func (m *Mutex) TryLock(ctx context.Context) error { + return (*semaphore.Weighted)(m).Acquire(ctx, 1) +} diff --git a/lib/mutex/mutex_test.go b/lib/mutex/mutex_test.go new file mode 100644 index 000000000..0324b3900 --- /dev/null +++ b/lib/mutex/mutex_test.go @@ -0,0 +1,93 @@ +package mutex + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestMutex(t *testing.T) { + t.Run("starts unlocked", func(t *testing.T) { + m := New() + canLock(t, m) + }) + + t.Run("Lock blocks when locked", func(t *testing.T) { + m := New() + m.Lock() + lockIsBlocked(t, m) + }) + + t.Run("Unlock unblocks Lock", func(t *testing.T) { + m := New() + m.Lock() + m.Unlock() // nolint:staticcheck // SA2001 is not relevant here + canLock(t, m) + }) + + t.Run("TryLock acquires lock", func(t *testing.T) { + m := New() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + t.Cleanup(cancel) + require.NoError(t, m.TryLock(ctx)) + lockIsBlocked(t, m) + }) + + t.Run("TryLock blocks until timeout when locked", func(t *testing.T) { + m := New() + m.Lock() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + t.Cleanup(cancel) + err := m.TryLock(ctx) + require.Equal(t, err, context.DeadlineExceeded) + }) + + t.Run("TryLock acquires lock before timeout", func(t *testing.T) { + m := New() + m.Lock() + + go func() { + time.Sleep(20 * time.Millisecond) + m.Unlock() + }() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + t.Cleanup(cancel) + err := m.TryLock(ctx) + require.NoError(t, err) + }) + +} + +func canLock(t *testing.T, m *Mutex) { + t.Helper() + chDone := make(chan struct{}) + go func() { + m.Lock() + close(chDone) + }() + + select { + case <-chDone: + case <-time.After(20 * time.Millisecond): + t.Fatal("failed to acquire lock before timeout") + } +} + +func lockIsBlocked(t *testing.T, m *Mutex) { + t.Helper() + chDone := make(chan struct{}) + go func() { + m.Lock() + close(chDone) + }() + + select { + case <-chDone: + t.Fatal("expected Lock to block") + case <-time.After(20 * time.Millisecond): + } +} diff --git a/vendor/golang.org/x/sync/semaphore/semaphore.go b/vendor/golang.org/x/sync/semaphore/semaphore.go new file mode 100644 index 000000000..30f632c57 --- /dev/null +++ b/vendor/golang.org/x/sync/semaphore/semaphore.go @@ -0,0 +1,136 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package semaphore provides a weighted semaphore implementation. +package semaphore // import "golang.org/x/sync/semaphore" + +import ( + "container/list" + "context" + "sync" +) + +type waiter struct { + n int64 + ready chan<- struct{} // Closed when semaphore acquired. +} + +// NewWeighted creates a new weighted semaphore with the given +// maximum combined weight for concurrent access. +func NewWeighted(n int64) *Weighted { + w := &Weighted{size: n} + return w +} + +// Weighted provides a way to bound concurrent access to a resource. +// The callers can request access with a given weight. +type Weighted struct { + size int64 + cur int64 + mu sync.Mutex + waiters list.List +} + +// Acquire acquires the semaphore with a weight of n, blocking until resources +// are available or ctx is done. On success, returns nil. On failure, returns +// ctx.Err() and leaves the semaphore unchanged. +// +// If ctx is already done, Acquire may still succeed without blocking. +func (s *Weighted) Acquire(ctx context.Context, n int64) error { + s.mu.Lock() + if s.size-s.cur >= n && s.waiters.Len() == 0 { + s.cur += n + s.mu.Unlock() + return nil + } + + if n > s.size { + // Don't make other Acquire calls block on one that's doomed to fail. + s.mu.Unlock() + <-ctx.Done() + return ctx.Err() + } + + ready := make(chan struct{}) + w := waiter{n: n, ready: ready} + elem := s.waiters.PushBack(w) + s.mu.Unlock() + + select { + case <-ctx.Done(): + err := ctx.Err() + s.mu.Lock() + select { + case <-ready: + // Acquired the semaphore after we were canceled. Rather than trying to + // fix up the queue, just pretend we didn't notice the cancelation. + err = nil + default: + isFront := s.waiters.Front() == elem + s.waiters.Remove(elem) + // If we're at the front and there're extra tokens left, notify other waiters. + if isFront && s.size > s.cur { + s.notifyWaiters() + } + } + s.mu.Unlock() + return err + + case <-ready: + return nil + } +} + +// TryAcquire acquires the semaphore with a weight of n without blocking. +// On success, returns true. On failure, returns false and leaves the semaphore unchanged. +func (s *Weighted) TryAcquire(n int64) bool { + s.mu.Lock() + success := s.size-s.cur >= n && s.waiters.Len() == 0 + if success { + s.cur += n + } + s.mu.Unlock() + return success +} + +// Release releases the semaphore with a weight of n. +func (s *Weighted) Release(n int64) { + s.mu.Lock() + s.cur -= n + if s.cur < 0 { + s.mu.Unlock() + panic("semaphore: released more than held") + } + s.notifyWaiters() + s.mu.Unlock() +} + +func (s *Weighted) notifyWaiters() { + for { + next := s.waiters.Front() + if next == nil { + break // No more waiters blocked. + } + + w := next.Value.(waiter) + if s.size-s.cur < w.n { + // Not enough tokens for the next waiter. We could keep going (to try to + // find a waiter with a smaller request), but under load that could cause + // starvation for large requests; instead, we leave all remaining waiters + // blocked. + // + // Consider a semaphore used as a read-write lock, with N tokens, N + // readers, and one writer. Each reader can Acquire(1) to obtain a read + // lock. The writer can Acquire(N) to obtain a write lock, excluding all + // of the readers. If we allow the readers to jump ahead in the queue, + // the writer will starve — there is always one token available for every + // reader. + break + } + + s.cur += w.n + s.waiters.Remove(next) + close(w.ready) + } +} diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go b/vendor/golang.org/x/sync/singleflight/singleflight.go index 97a1aa4bb..690eb8501 100644 --- a/vendor/golang.org/x/sync/singleflight/singleflight.go +++ b/vendor/golang.org/x/sync/singleflight/singleflight.go @@ -6,7 +6,42 @@ // mechanism. package singleflight // import "golang.org/x/sync/singleflight" -import "sync" +import ( + "bytes" + "errors" + "fmt" + "runtime" + "runtime/debug" + "sync" +) + +// errGoexit indicates the runtime.Goexit was called in +// the user given function. +var errGoexit = errors.New("runtime.Goexit was called") + +// A panicError is an arbitrary value recovered from a panic +// with the stack trace during the execution of given function. +type panicError struct { + value interface{} + stack []byte +} + +// Error implements error interface. +func (p *panicError) Error() string { + return fmt.Sprintf("%v\n\n%s", p.value, p.stack) +} + +func newPanicError(v interface{}) error { + stack := debug.Stack() + + // The first line of the stack trace is of the form "goroutine N [status]:" + // but by the time the panic reaches Do the goroutine may no longer exist + // and its status will have changed. Trim out the misleading line. + if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { + stack = stack[line+1:] + } + return &panicError{value: v, stack: stack} +} // call is an in-flight or completed singleflight.Do call type call struct { @@ -57,6 +92,12 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e c.dups++ g.mu.Unlock() c.wg.Wait() + + if e, ok := c.err.(*panicError); ok { + panic(e) + } else if c.err == errGoexit { + runtime.Goexit() + } return c.val, c.err, true } c := new(call) @@ -70,6 +111,8 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e // DoChan is like Do but returns a channel that will receive the // results when they are ready. +// +// The returned channel will not be closed. func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() @@ -94,17 +137,66 @@ func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result // doCall handles the single call for a key. func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { - c.val, c.err = fn() - c.wg.Done() + normalReturn := false + recovered := false - g.mu.Lock() - if !c.forgotten { - delete(g.m, key) + // use double-defer to distinguish panic from runtime.Goexit, + // more details see https://golang.org/cl/134395 + defer func() { + // the given function invoked runtime.Goexit + if !normalReturn && !recovered { + c.err = errGoexit + } + + c.wg.Done() + g.mu.Lock() + defer g.mu.Unlock() + if !c.forgotten { + delete(g.m, key) + } + + if e, ok := c.err.(*panicError); ok { + // In order to prevent the waiting channels from being blocked forever, + // needs to ensure that this panic cannot be recovered. + if len(c.chans) > 0 { + go panic(e) + select {} // Keep this goroutine around so that it will appear in the crash dump. + } else { + panic(e) + } + } else if c.err == errGoexit { + // Already in the process of goexit, no need to call again + } else { + // Normal return + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + } + }() + + func() { + defer func() { + if !normalReturn { + // Ideally, we would wait to take a stack trace until we've determined + // whether this is a panic or a runtime.Goexit. + // + // Unfortunately, the only way we can distinguish the two is to see + // whether the recover stopped the goroutine from terminating, and by + // the time we know that, the part of the stack trace relevant to the + // panic has been discarded. + if r := recover(); r != nil { + c.err = newPanicError(r) + } + } + }() + + c.val, c.err = fn() + normalReturn = true + }() + + if !normalReturn { + recovered = true } - for _, ch := range c.chans { - ch <- Result{c.val, c.err, c.dups > 0} - } - g.mu.Unlock() } // Forget tells the singleflight to forget about a key. Future calls diff --git a/vendor/modules.txt b/vendor/modules.txt index f62567aa2..689f191f0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -505,8 +505,9 @@ golang.org/x/oauth2/google golang.org/x/oauth2/internal golang.org/x/oauth2/jws golang.org/x/oauth2/jwt -# golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a +# golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 golang.org/x/sync/errgroup +golang.org/x/sync/semaphore golang.org/x/sync/singleflight # golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5 golang.org/x/sys/cpu