From 1379b5f7d62f882f53f0835d08b6462c6a8ff742 Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Wed, 27 Jan 2021 11:14:52 -0500 Subject: [PATCH] Upgrade raft-autopilot and wait for autopilot it to stop when revoking leadership (#9644) Fixes: 9626 --- .changelog/9626.txt | 3 + agent/consul/leader.go | 4 +- agent/consul/snapshot_endpoint_test.go | 7 + go.mod | 2 +- go.sum | 4 +- .../hashicorp/raft-autopilot/autopilot.go | 57 +++++--- .../hashicorp/raft-autopilot/go.mod | 1 + .../hashicorp/raft-autopilot/go.sum | 1 + .../hashicorp/raft-autopilot/mutex.go | 35 +++++ .../hashicorp/raft-autopilot/run.go | 102 +++++++++---- .../hashicorp/raft-autopilot/state.go | 40 ++---- .../golang.org/x/sync/semaphore/semaphore.go | 136 ++++++++++++++++++ vendor/modules.txt | 3 +- 13 files changed, 316 insertions(+), 79 deletions(-) create mode 100644 .changelog/9626.txt create mode 100644 vendor/github.com/hashicorp/raft-autopilot/mutex.go create mode 100644 vendor/golang.org/x/sync/semaphore/semaphore.go diff --git a/.changelog/9626.txt b/.changelog/9626.txt new file mode 100644 index 000000000..fdd8475d0 --- /dev/null +++ b/.changelog/9626.txt @@ -0,0 +1,3 @@ +```release-note:bug +autopilot: Fixed a bug that would cause snapshot restoration to stop autopilot on the leader. +``` \ No newline at end of file diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 6ecb18d00..e4470ac1f 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -395,7 +395,9 @@ func (s *Server) revokeLeadership() { s.resetConsistentReadReady() - s.autopilot.Stop() + // Stop returns a chan and we want to block until it is closed + // which indicates that autopilot is actually stopped. + <-s.autopilot.Stop() } // DEPRECATED (ACL-Legacy-Compat) - Remove once old ACL compatibility is removed diff --git a/agent/consul/snapshot_endpoint_test.go b/agent/consul/snapshot_endpoint_test.go index 64827245f..9c0f00d40 100644 --- a/agent/consul/snapshot_endpoint_test.go +++ b/agent/consul/snapshot_endpoint_test.go @@ -13,6 +13,8 @@ import ( "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + autopilot "github.com/hashicorp/raft-autopilot" + "github.com/stretchr/testify/require" ) // verifySnapshot is a helper that does a snapshot and restore. 
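
Note: the leader.go hunk above changes revokeLeadership to block on the channel returned by Stop(), so leadership teardown cannot continue until autopilot's goroutines have actually exited. Below is a minimal, illustrative sketch of that "Stop returns a done channel" pattern in isolation; the worker type and names are assumptions for the example, not Consul code.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// worker stands in for any component whose Stop must not signal "done"
// until its background goroutine has fully exited.
type worker struct {
	cancel context.CancelFunc
	done   chan struct{}
}

func startWorker() *worker {
	ctx, cancel := context.WithCancel(context.Background())
	w := &worker{cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(w.done) // closing done is the very last cleanup step
		for {
			select {
			case <-ctx.Done():
				return
			case <-time.After(100 * time.Millisecond):
				// periodic work would go here
			}
		}
	}()
	return w
}

// Stop cancels the goroutine and hands back the done channel so callers can
// block on it, just as revokeLeadership now does with <-s.autopilot.Stop().
func (w *worker) Stop() <-chan struct{} {
	w.cancel()
	return w.done
}

func main() {
	w := startWorker()
	<-w.Stop() // blocks until the goroutine is really gone
	fmt.Println("worker fully stopped")
}
```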
@@ -165,6 +167,11 @@ func TestSnapshot(t *testing.T) { testrpc.WaitForLeader(t, s1.RPC, "dc1") verifySnapshot(t, s1, "dc1", "") + + // ensure autopilot is still running + // https://github.com/hashicorp/consul/issues/9626 + apstatus, _ := s1.autopilot.IsRunning() + require.Equal(t, autopilot.Running, apstatus) } func TestSnapshot_LeaderState(t *testing.T) { diff --git a/go.mod b/go.mod index 8ba63deaa..ab5a9f982 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/hashicorp/memberlist v0.2.2 github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 github.com/hashicorp/raft v1.2.0 - github.com/hashicorp/raft-autopilot v0.1.1 + github.com/hashicorp/raft-autopilot v0.1.2 github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea github.com/hashicorp/serf v0.9.5 github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086 diff --git a/go.sum b/go.sum index ed143ccf6..a365fd4ba 100644 --- a/go.sum +++ b/go.sum @@ -289,8 +289,8 @@ github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mo github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= github.com/hashicorp/raft v1.2.0 h1:mHzHIrF0S91d3A7RPBvuqkgB4d/7oFJZyvf1Q4m7GA0= github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= -github.com/hashicorp/raft-autopilot v0.1.1 h1:f8Dv2y1Vq8ttuH2+oh5l87Paj/BINpMm5TBrMLx+qGQ= -github.com/hashicorp/raft-autopilot v0.1.1/go.mod h1:HUBUSYtpQRVkgjvvoOgsZPvwe6b6FZJ1xXtaftRZvrA= +github.com/hashicorp/raft-autopilot v0.1.2 h1:yeqdUjWLjVJkBM+mcVxqwxi+w+aHsb9cEON2dz69OCs= +github.com/hashicorp/raft-autopilot v0.1.2/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= github.com/hashicorp/serf v0.9.5 h1:EBWvyu9tcRszt3Bxp3KNssBMP1KuHWyO51lz9+786iM= diff --git a/vendor/github.com/hashicorp/raft-autopilot/autopilot.go b/vendor/github.com/hashicorp/raft-autopilot/autopilot.go index 99b8cc848..d0640fc85 100644 --- a/vendor/github.com/hashicorp/raft-autopilot/autopilot.go +++ b/vendor/github.com/hashicorp/raft-autopilot/autopilot.go @@ -75,6 +75,31 @@ func WithPromoter(promoter Promoter) Option { } } +// ExecutionStatus represents the current status of the autopilot background go routines +type ExecutionStatus string + +const ( + NotRunning ExecutionStatus = "not-running" + Running ExecutionStatus = "running" + ShuttingDown ExecutionStatus = "shutting-down" +) + +type execInfo struct { + // status is the current state of autopilot executation + status ExecutionStatus + + // shutdown is a function that can be execute to shutdown a running + // autopilot's go routines. + shutdown context.CancelFunc + + // done is a chan that will be closed when the running autopilot go + // routines have exited. Technically closing it is the very last + // thing done in the go routine but at that point enough state has + // been cleaned up that we would then allow it to be started + // immediately afterward + done chan struct{} +} + // Autopilot is the type to manage a running Raft instance. // // Each Raft node in the cluster will have a corresponding Autopilot instance but @@ -132,10 +157,6 @@ type Autopilot struct { // brought up. 
startTime time.Time - // running is a simple bool to indicate whether the go routines to actually - // execute autopilot are currently running - running bool - // removeDeadCh is used to trigger the running autopilot go routines to // find and remove any dead/failed servers removeDeadCh chan struct{} @@ -143,20 +164,20 @@ type Autopilot struct { // reconcileCh is used to trigger an immediate round of reconciliation. reconcileCh chan struct{} - // shutdown is a function that can be execute to shutdown a running - // autopilot's go routines. - shutdown context.CancelFunc - // done is a chan that will be closed when the running autopilot go - // routines have exited. Technically closing it is the very last - // thing done in the go routine but at that point enough state has - // been cleaned up that we would then allow it to be started - // immediately afterward - done chan struct{} + // leaderLock implements a cancellable mutex that will be used to ensure + // that only one autopilot go routine is the "leader". The leader is + // the go routine that is currently responsible for updating the + // autopilot state and performing raft promotions/demotions. + leaderLock *mutex - // runLock is meant to protect all of the fields regarding coordination - // of whether the autopilot go routines are running and - // starting/stopping them. - runLock sync.Mutex + // execution is the information about the most recent autopilot execution. + // Start will initialize this with the most recent execution and it will + // be updated by Stop and by the go routines being executed when they are + // finished. + execution *execInfo + + // execLock protects access to the execution field + execLock sync.Mutex } // New will create a new Autopilot instance utilizing the given Raft and Delegate. @@ -166,6 +187,7 @@ func New(raft Raft, delegate ApplicationIntegration, options ...Option) *Autopil a := &Autopilot{ raft: raft, delegate: delegate, + state: &State{}, promoter: DefaultPromoter(), logger: hclog.Default().Named("autopilot"), // should this be buffered? 
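
Note: with the struct changes above, each call to Start records an execInfo (status, shutdown function, done channel) behind execLock instead of a bare running bool, and the ExecutionStatus values (NotRunning, Running, ShuttingDown) become observable through IsRunning, the method the new snapshot test asserts against. The sketch below shows how a caller outside the library could use that API; the function and variable names are illustrative and not part of this patch.

```go
package main

import (
	"context"

	autopilot "github.com/hashicorp/raft-autopilot"
)

// waitThenRestart blocks until any previous autopilot run has fully wound
// down, then starts autopilot again. The done channel returned by IsRunning
// is closed as the final step of the previous run's cleanup.
func waitThenRestart(ctx context.Context, ap *autopilot.Autopilot) {
	status, done := ap.IsRunning()
	if status != autopilot.NotRunning {
		<-done
	}
	ap.Start(ctx)
}
```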
@@ -173,6 +195,7 @@ func New(raft Raft, delegate ApplicationIntegration, options ...Option) *Autopil reconcileInterval: DefaultReconcileInterval, updateInterval: DefaultUpdateInterval, time: &runtimeTimeProvider{}, + leaderLock: newMutex(), } for _, opt := range options { diff --git a/vendor/github.com/hashicorp/raft-autopilot/go.mod b/vendor/github.com/hashicorp/raft-autopilot/go.mod index 42f9498c8..2dd9ca4c0 100644 --- a/vendor/github.com/hashicorp/raft-autopilot/go.mod +++ b/vendor/github.com/hashicorp/raft-autopilot/go.mod @@ -7,4 +7,5 @@ require ( github.com/hashicorp/raft v1.2.0 github.com/stretchr/testify v1.6.1 go.uber.org/goleak v1.1.10 + golang.org/x/sync v0.0.0-20190423024810-112230192c58 ) diff --git a/vendor/github.com/hashicorp/raft-autopilot/go.sum b/vendor/github.com/hashicorp/raft-autopilot/go.sum index 86000e410..e0f4afa7c 100644 --- a/vendor/github.com/hashicorp/raft-autopilot/go.sum +++ b/vendor/github.com/hashicorp/raft-autopilot/go.sum @@ -67,6 +67,7 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/vendor/github.com/hashicorp/raft-autopilot/mutex.go b/vendor/github.com/hashicorp/raft-autopilot/mutex.go new file mode 100644 index 000000000..4ed4e20b1 --- /dev/null +++ b/vendor/github.com/hashicorp/raft-autopilot/mutex.go @@ -0,0 +1,35 @@ +/* +This code was taken from the same implementation in a branch from Consul and then +had the package updated and the mutex type unexported. +*/ +package autopilot + +import ( + "context" + + "golang.org/x/sync/semaphore" +) + +type mutex semaphore.Weighted + +// New returns a Mutex that is ready for use. +func newMutex() *mutex { + return (*mutex)(semaphore.NewWeighted(1)) +} + +func (m *mutex) Lock() { + _ = (*semaphore.Weighted)(m).Acquire(context.Background(), 1) +} + +func (m *mutex) Unlock() { + (*semaphore.Weighted)(m).Release(1) +} + +// TryLock acquires the mutex, blocking until resources are available or ctx is +// done. On success, returns nil. On failure, returns ctx.Err() and leaves the +// semaphore unchanged. +// +// If ctx is already done, Acquire may still succeed without blocking. +func (m *mutex) TryLock(ctx context.Context) error { + return (*semaphore.Weighted)(m).Acquire(ctx, 1) +} diff --git a/vendor/github.com/hashicorp/raft-autopilot/run.go b/vendor/github.com/hashicorp/raft-autopilot/run.go index 1e13d2017..382fd1146 100644 --- a/vendor/github.com/hashicorp/raft-autopilot/run.go +++ b/vendor/github.com/hashicorp/raft-autopilot/run.go @@ -9,50 +9,100 @@ import ( // When the context passed in is cancelled or the Stop method is called // then these routines will exit. 
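
Note: the new mutex.go wraps a golang.org/x/sync/semaphore.Weighted of size 1 so that lock acquisition can honor a context — Lock blocks indefinitely, while TryLock gives up when the supplied context is cancelled or times out, which a plain sync.Mutex cannot do. Below is a small behaviour sketch using the semaphore package directly (the autopilot mutex type is unexported); it is illustrative only.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	lock := semaphore.NewWeighted(1)

	// Take the lock the blocking way, as mutex.Lock does.
	_ = lock.Acquire(context.Background(), 1)

	// A second acquirer gives up after a short deadline instead of blocking
	// forever; Start uses TryLock this way before its initial state update.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	if err := lock.Acquire(ctx, 1); err != nil {
		fmt.Println("could not acquire in time:", err) // context deadline exceeded
	}

	lock.Release(1) // the mutex.Unlock equivalent
}
```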
func (a *Autopilot) Start(ctx context.Context) { - a.runLock.Lock() - defer a.runLock.Unlock() + a.execLock.Lock() + defer a.execLock.Unlock() // already running so there is nothing to do - if a.running { + if a.execution != nil && a.execution.status == Running { return } ctx, shutdown := context.WithCancel(ctx) - a.shutdown = shutdown a.startTime = a.time.Now() - a.done = make(chan struct{}) - // While a go routine executed by a.run below will periodically - // update the state, we want to go ahead and force updating it now - // so that during a leadership transfer we don't report an empty - // autopilot state. We put a pretty small timeout on this though - // so as to prevent leader establishment from taking too long - updateCtx, updateCancel := context.WithTimeout(ctx, time.Second) - defer updateCancel() - a.updateState(updateCtx) + exec := &execInfo{ + status: Running, + shutdown: shutdown, + done: make(chan struct{}), + } - go a.run(ctx) - a.running = true + if a.execution == nil || a.execution.status == NotRunning { + // In theory with a nil execution or the current execution being in the not + // running state, we should be able to immediately gain the leader lock as + // nothing else should be running and holding the lock. While true we still + // gain the lock to ensure that only one thread may even attempt to be + // modifying the autopilot state at once. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := a.leaderLock.TryLock(ctx); err == nil { + a.updateState(ctx) + a.leaderLock.Unlock() + } + } + + go a.beginExecution(ctx, exec) + a.execution = exec + return } // Stop will terminate the go routines being executed to perform autopilot. func (a *Autopilot) Stop() <-chan struct{} { - a.runLock.Lock() - defer a.runLock.Unlock() + a.execLock.Lock() + defer a.execLock.Unlock() // Nothing to do - if !a.running { + if a.execution == nil || a.execution.status == NotRunning { done := make(chan struct{}) close(done) return done } - a.shutdown() - return a.done + a.execution.shutdown() + a.execution.status = ShuttingDown + return a.execution.done } -func (a *Autopilot) run(ctx context.Context) { +// IsRunning returns the current execution status of the autopilot +// go routines as well as a chan which will be closed when the +// routines are no longer running +func (a *Autopilot) IsRunning() (ExecutionStatus, <-chan struct{}) { + a.execLock.Lock() + defer a.execLock.Unlock() + + if a.execution == nil || a.execution.status == NotRunning { + done := make(chan struct{}) + close(done) + return NotRunning, done + } + + return a.execution.status, a.execution.done +} + +func (a *Autopilot) finishExecution(exec *execInfo) { + // need to gain the lock because if this was the active execution + // then these values may be read while they are updated. + a.execLock.Lock() + defer a.execLock.Unlock() + + exec.shutdown = nil + exec.status = NotRunning + // this should be the final cleanup task as it is what notifies the rest + // of the world that we are now done + close(exec.done) + exec.done = nil +} + +func (a *Autopilot) beginExecution(ctx context.Context, exec *execInfo) { + // This will wait for any other go routine to finish executing + // before running any code ourselves to prevent any conflicting + // activity between the two. + if err := a.leaderLock.TryLock(ctx); err != nil { + a.finishExecution(exec) + return + } + a.logger.Debug("autopilot is now running") + // autopilot needs to do 3 things // // 1. 
periodically update the cluster state @@ -78,14 +128,8 @@ func (a *Autopilot) run(ctx context.Context) { a.logger.Debug("autopilot is now stopped") - a.runLock.Lock() - a.shutdown = nil - a.running = false - // this should be the final cleanup task as it is what notifies the rest - // of the world that we are now done - close(a.done) - a.done = nil - a.runLock.Unlock() + a.finishExecution(exec) + a.leaderLock.Unlock() }() reconcileTicker := time.NewTicker(a.reconcileInterval) diff --git a/vendor/github.com/hashicorp/raft-autopilot/state.go b/vendor/github.com/hashicorp/raft-autopilot/state.go index f967b32b0..035357bea 100644 --- a/vendor/github.com/hashicorp/raft-autopilot/state.go +++ b/vendor/github.com/hashicorp/raft-autopilot/state.go @@ -30,10 +30,8 @@ type nextStateInputs struct { Now time.Time StartTime time.Time Config *Config - State *State RaftConfig *raft.Configuration KnownServers map[raft.ServerID]*Server - AliveServers map[raft.ServerID]*Server LatestIndex uint64 LastTerm uint64 FetchedStats map[raft.ServerID]*ServerStats @@ -48,16 +46,10 @@ type nextStateInputs struct { // - Current state // - Raft Configuration // - Known Servers -// - Latest raft index (gatered right before the remote server stats so that they should +// - Latest raft index (gathered right before the remote server stats so that they should // be from about the same point in time) // - Stats for all non-left servers func (a *Autopilot) gatherNextStateInputs(ctx context.Context) (*nextStateInputs, error) { - // we are going to hold this lock for the entire function. In theory nothing should - // modify the state on any other go routine so this really shouldn't block anything - // else. However we want to ensure that the inputs are as consistent as possible. - a.stateLock.RLock() - defer a.stateLock.RUnlock() - // there are a lot of inputs to computing the next state so they get put into a // struct so that we don't have to return 8 values. inputs := &nextStateInputs{ @@ -72,12 +64,6 @@ func (a *Autopilot) gatherNextStateInputs(ctx context.Context) (*nextStateInputs } inputs.Config = config - // retrieve the current state - inputs.State = a.state - if inputs.State == nil { - inputs.State = &State{} - } - // retrieve the raft configuration raftConfig, err := a.getRaftConfiguration() if err != nil { @@ -125,9 +111,6 @@ func (a *Autopilot) gatherNextStateInputs(ctx context.Context) (*nextStateInputs return nil, ctx.Err() } - // filter the known servers to have a map of just the alive servers - inputs.AliveServers = aliveServers(inputs.KnownServers) - // we only allow the fetch to take place for up to half the health interval // the next health interval will attempt to fetch the stats again but if // we do not see responses within this time then we can assume they are @@ -136,7 +119,7 @@ func (a *Autopilot) gatherNextStateInputs(ctx context.Context) (*nextStateInputs fetchCtx, cancel := context.WithDeadline(ctx, d) defer cancel() - inputs.FetchedStats = a.delegate.FetchServerStats(fetchCtx, inputs.AliveServers) + inputs.FetchedStats = a.delegate.FetchServerStats(fetchCtx, aliveServers(inputs.KnownServers)) // it might be nil but we propagate the ctx.Err just in case our context was // cancelled since the last time we checked. 
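
Note: beginExecution takes the leader lock before doing any work and releases it only after finishExecution in the deferred cleanup, so if Start is called while an older run is still shutting down, the new run queues behind it instead of racing it. The sketch below illustrates that serialization with a weighted semaphore standing in for the leader lock; the names are illustrative, not library code.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

// run mimics beginExecution: acquire the leader lock (or give up if the
// context dies), do some work, and release the lock only on the way out.
func run(ctx context.Context, leaderLock *semaphore.Weighted, name string, done chan<- string) {
	if err := leaderLock.Acquire(ctx, 1); err != nil {
		done <- name + " gave up"
		return
	}
	defer leaderLock.Release(1) // analogous to a.leaderLock.Unlock() in the deferred cleanup

	time.Sleep(50 * time.Millisecond) // stand-in for reconciliation work
	done <- name + " finished"
}

func main() {
	leaderLock := semaphore.NewWeighted(1)
	done := make(chan string, 2)

	go run(context.Background(), leaderLock, "first", done)
	time.Sleep(10 * time.Millisecond) // let the first run grab the lock
	go run(context.Background(), leaderLock, "second", done)

	fmt.Println(<-done) // "first finished"
	fmt.Println(<-done) // "second finished", only after the first released the lock
}
```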
@@ -239,7 +222,7 @@ func (a *Autopilot) nextServers(inputs *nextStateInputs) map[raft.ServerID]*Serv newServers := make(map[raft.ServerID]*ServerState) for _, srv := range inputs.RaftConfig.Servers { - state := buildServerState(inputs, srv) + state := a.buildServerState(inputs, srv) // update any promoter specific information. This isn't done within // buildServerState to keep that function "pure" and not require @@ -257,7 +240,7 @@ func (a *Autopilot) nextServers(inputs *nextStateInputs) map[raft.ServerID]*Serv // buildServerState takes all the nextStateInputs and builds out a ServerState // for the given Raft server. This will take into account the raft configuration // existing state, application known servers and recently fetched stats. -func buildServerState(inputs *nextStateInputs, srv raft.Server) ServerState { +func (a *Autopilot) buildServerState(inputs *nextStateInputs, srv raft.Server) ServerState { // Note that the ordering of operations in this method are very important. // We are building up the ServerState from the least important sources // and overriding them with more up to date values. @@ -292,23 +275,24 @@ func buildServerState(inputs *nextStateInputs, srv raft.Server) ServerState { state.State = RaftLeader } - var existingHealth *ServerHealth + var previousHealthy *bool + a.stateLock.RLock() // copy some state from an existing server into the new state - most of this // should be overridden soon but at this point we are just building the base. - if existing, found := inputs.State.Servers[srv.ID]; found { + if existing, found := a.state.Servers[srv.ID]; found { state.Stats = existing.Stats state.Health = existing.Health - existingHealth = &existing.Health + previousHealthy = &state.Health.Healthy // it is is important to note that the map values we retrieved this from are // stored by value. Therefore we are modifying a copy of what is in the existing // state and not the actual state itself. We want to ensure that the Address // is what Raft will know about. - existing.Server.Address = srv.Address - state.Server = existing.Server + state.Server.Address = srv.Address } + a.stateLock.RUnlock() // pull in the latest information from the applications knowledge of the // server. Mainly we want the NodeStatus & Meta @@ -317,8 +301,8 @@ func buildServerState(inputs *nextStateInputs, srv raft.Server) ServerState { // map we retrieved this from has a non-pointer type value. We definitely // do not want to modify the current known servers but we do want to ensure // that we do not overwrite the Address - known.Address = srv.Address state.Server = *known + state.Server.Address = srv.Address } else { // TODO (mkeeler) do we need a None state. In the previous autopilot code // we would have set this to serf.StatusNone @@ -336,7 +320,7 @@ func buildServerState(inputs *nextStateInputs, srv raft.Server) ServerState { // the health status changes. No need for an else as we previously set // it when we overwrote the whole Health structure when finding a // server in the existing state - if existingHealth == nil || existingHealth.Healthy != state.Health.Healthy { + if previousHealthy == nil || *previousHealthy != state.Health.Healthy { state.Health.StableSince = inputs.Now } diff --git a/vendor/golang.org/x/sync/semaphore/semaphore.go b/vendor/golang.org/x/sync/semaphore/semaphore.go new file mode 100644 index 000000000..30f632c57 --- /dev/null +++ b/vendor/golang.org/x/sync/semaphore/semaphore.go @@ -0,0 +1,136 @@ +// Copyright 2017 The Go Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package semaphore provides a weighted semaphore implementation. +package semaphore // import "golang.org/x/sync/semaphore" + +import ( + "container/list" + "context" + "sync" +) + +type waiter struct { + n int64 + ready chan<- struct{} // Closed when semaphore acquired. +} + +// NewWeighted creates a new weighted semaphore with the given +// maximum combined weight for concurrent access. +func NewWeighted(n int64) *Weighted { + w := &Weighted{size: n} + return w +} + +// Weighted provides a way to bound concurrent access to a resource. +// The callers can request access with a given weight. +type Weighted struct { + size int64 + cur int64 + mu sync.Mutex + waiters list.List +} + +// Acquire acquires the semaphore with a weight of n, blocking until resources +// are available or ctx is done. On success, returns nil. On failure, returns +// ctx.Err() and leaves the semaphore unchanged. +// +// If ctx is already done, Acquire may still succeed without blocking. +func (s *Weighted) Acquire(ctx context.Context, n int64) error { + s.mu.Lock() + if s.size-s.cur >= n && s.waiters.Len() == 0 { + s.cur += n + s.mu.Unlock() + return nil + } + + if n > s.size { + // Don't make other Acquire calls block on one that's doomed to fail. + s.mu.Unlock() + <-ctx.Done() + return ctx.Err() + } + + ready := make(chan struct{}) + w := waiter{n: n, ready: ready} + elem := s.waiters.PushBack(w) + s.mu.Unlock() + + select { + case <-ctx.Done(): + err := ctx.Err() + s.mu.Lock() + select { + case <-ready: + // Acquired the semaphore after we were canceled. Rather than trying to + // fix up the queue, just pretend we didn't notice the cancelation. + err = nil + default: + isFront := s.waiters.Front() == elem + s.waiters.Remove(elem) + // If we're at the front and there're extra tokens left, notify other waiters. + if isFront && s.size > s.cur { + s.notifyWaiters() + } + } + s.mu.Unlock() + return err + + case <-ready: + return nil + } +} + +// TryAcquire acquires the semaphore with a weight of n without blocking. +// On success, returns true. On failure, returns false and leaves the semaphore unchanged. +func (s *Weighted) TryAcquire(n int64) bool { + s.mu.Lock() + success := s.size-s.cur >= n && s.waiters.Len() == 0 + if success { + s.cur += n + } + s.mu.Unlock() + return success +} + +// Release releases the semaphore with a weight of n. +func (s *Weighted) Release(n int64) { + s.mu.Lock() + s.cur -= n + if s.cur < 0 { + s.mu.Unlock() + panic("semaphore: released more than held") + } + s.notifyWaiters() + s.mu.Unlock() +} + +func (s *Weighted) notifyWaiters() { + for { + next := s.waiters.Front() + if next == nil { + break // No more waiters blocked. + } + + w := next.Value.(waiter) + if s.size-s.cur < w.n { + // Not enough tokens for the next waiter. We could keep going (to try to + // find a waiter with a smaller request), but under load that could cause + // starvation for large requests; instead, we leave all remaining waiters + // blocked. + // + // Consider a semaphore used as a read-write lock, with N tokens, N + // readers, and one writer. Each reader can Acquire(1) to obtain a read + // lock. The writer can Acquire(N) to obtain a write lock, excluding all + // of the readers. If we allow the readers to jump ahead in the queue, + // the writer will starve — there is always one token available for every + // reader. 
+ break + } + + s.cur += w.n + s.waiters.Remove(next) + close(w.ready) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index f62567aa2..36120d83c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -280,7 +280,7 @@ github.com/hashicorp/memberlist github.com/hashicorp/net-rpc-msgpackrpc # github.com/hashicorp/raft v1.2.0 github.com/hashicorp/raft -# github.com/hashicorp/raft-autopilot v0.1.1 +# github.com/hashicorp/raft-autopilot v0.1.2 github.com/hashicorp/raft-autopilot # github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea github.com/hashicorp/raft-boltdb @@ -507,6 +507,7 @@ golang.org/x/oauth2/jws golang.org/x/oauth2/jwt # golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a golang.org/x/sync/errgroup +golang.org/x/sync/semaphore golang.org/x/sync/singleflight # golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5 golang.org/x/sys/cpu
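
Note: the vendored semaphore's notifyWaiters deliberately stops at the first waiter whose request does not fit, which (per its own comment) is what keeps a large request from being starved by smaller ones and makes the read/write-lock shape it describes workable. The sketch below shows that shape — readers take weight 1, a writer takes the full weight — and is illustrative only, not part of the patch.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

func main() {
	const n = 4
	sem := semaphore.NewWeighted(n)
	ctx := context.Background()

	// n readers each take one token (a "read lock").
	for i := 0; i < n; i++ {
		_ = sem.Acquire(ctx, 1)
	}

	// A writer needs all n tokens at once, so it cannot proceed yet.
	fmt.Println("writer acquired:", sem.TryAcquire(n)) // false

	// Once every reader releases, the writer fits.
	for i := 0; i < n; i++ {
		sem.Release(1)
	}
	fmt.Println("writer acquired:", sem.TryAcquire(n)) // true
	sem.Release(n)
}
```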