Upgrade raft-autopilot and wait for autopilot it to stop when revoking leadership (#9644)
Fixes: 9626
This commit is contained in:
parent
614c57a9c6
commit
1379b5f7d6
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
autopilot: Fixed a bug that would cause snapshot restoration to stop autopilot on the leader.
|
||||
```
|
|
@ -395,7 +395,9 @@ func (s *Server) revokeLeadership() {
|
|||
|
||||
s.resetConsistentReadReady()
|
||||
|
||||
s.autopilot.Stop()
|
||||
// Stop returns a chan and we want to block until it is closed
|
||||
// which indicates that autopilot is actually stopped.
|
||||
<-s.autopilot.Stop()
|
||||
}
|
||||
|
||||
// DEPRECATED (ACL-Legacy-Compat) - Remove once old ACL compatibility is removed
|
||||
|
|
|
@ -13,6 +13,8 @@ import (
|
|||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
autopilot "github.com/hashicorp/raft-autopilot"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// verifySnapshot is a helper that does a snapshot and restore.
|
||||
|
@ -165,6 +167,11 @@ func TestSnapshot(t *testing.T) {
|
|||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
verifySnapshot(t, s1, "dc1", "")
|
||||
|
||||
// ensure autopilot is still running
|
||||
// https://github.com/hashicorp/consul/issues/9626
|
||||
apstatus, _ := s1.autopilot.IsRunning()
|
||||
require.Equal(t, autopilot.Running, apstatus)
|
||||
}
|
||||
|
||||
func TestSnapshot_LeaderState(t *testing.T) {
|
||||
|
|
2
go.mod
2
go.mod
|
@ -53,7 +53,7 @@ require (
|
|||
github.com/hashicorp/memberlist v0.2.2
|
||||
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
|
||||
github.com/hashicorp/raft v1.2.0
|
||||
github.com/hashicorp/raft-autopilot v0.1.1
|
||||
github.com/hashicorp/raft-autopilot v0.1.2
|
||||
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
|
||||
github.com/hashicorp/serf v0.9.5
|
||||
github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086
|
||||
|
|
4
go.sum
4
go.sum
|
@ -289,8 +289,8 @@ github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mo
|
|||
github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
|
||||
github.com/hashicorp/raft v1.2.0 h1:mHzHIrF0S91d3A7RPBvuqkgB4d/7oFJZyvf1Q4m7GA0=
|
||||
github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
|
||||
github.com/hashicorp/raft-autopilot v0.1.1 h1:f8Dv2y1Vq8ttuH2+oh5l87Paj/BINpMm5TBrMLx+qGQ=
|
||||
github.com/hashicorp/raft-autopilot v0.1.1/go.mod h1:HUBUSYtpQRVkgjvvoOgsZPvwe6b6FZJ1xXtaftRZvrA=
|
||||
github.com/hashicorp/raft-autopilot v0.1.2 h1:yeqdUjWLjVJkBM+mcVxqwxi+w+aHsb9cEON2dz69OCs=
|
||||
github.com/hashicorp/raft-autopilot v0.1.2/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw=
|
||||
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=
|
||||
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
|
||||
github.com/hashicorp/serf v0.9.5 h1:EBWvyu9tcRszt3Bxp3KNssBMP1KuHWyO51lz9+786iM=
|
||||
|
|
|
@ -75,6 +75,31 @@ func WithPromoter(promoter Promoter) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// ExecutionStatus represents the current status of the autopilot background go routines
|
||||
type ExecutionStatus string
|
||||
|
||||
const (
|
||||
NotRunning ExecutionStatus = "not-running"
|
||||
Running ExecutionStatus = "running"
|
||||
ShuttingDown ExecutionStatus = "shutting-down"
|
||||
)
|
||||
|
||||
type execInfo struct {
|
||||
// status is the current state of autopilot executation
|
||||
status ExecutionStatus
|
||||
|
||||
// shutdown is a function that can be execute to shutdown a running
|
||||
// autopilot's go routines.
|
||||
shutdown context.CancelFunc
|
||||
|
||||
// done is a chan that will be closed when the running autopilot go
|
||||
// routines have exited. Technically closing it is the very last
|
||||
// thing done in the go routine but at that point enough state has
|
||||
// been cleaned up that we would then allow it to be started
|
||||
// immediately afterward
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// Autopilot is the type to manage a running Raft instance.
|
||||
//
|
||||
// Each Raft node in the cluster will have a corresponding Autopilot instance but
|
||||
|
@ -132,10 +157,6 @@ type Autopilot struct {
|
|||
// brought up.
|
||||
startTime time.Time
|
||||
|
||||
// running is a simple bool to indicate whether the go routines to actually
|
||||
// execute autopilot are currently running
|
||||
running bool
|
||||
|
||||
// removeDeadCh is used to trigger the running autopilot go routines to
|
||||
// find and remove any dead/failed servers
|
||||
removeDeadCh chan struct{}
|
||||
|
@ -143,20 +164,20 @@ type Autopilot struct {
|
|||
// reconcileCh is used to trigger an immediate round of reconciliation.
|
||||
reconcileCh chan struct{}
|
||||
|
||||
// shutdown is a function that can be execute to shutdown a running
|
||||
// autopilot's go routines.
|
||||
shutdown context.CancelFunc
|
||||
// done is a chan that will be closed when the running autopilot go
|
||||
// routines have exited. Technically closing it is the very last
|
||||
// thing done in the go routine but at that point enough state has
|
||||
// been cleaned up that we would then allow it to be started
|
||||
// immediately afterward
|
||||
done chan struct{}
|
||||
// leaderLock implements a cancellable mutex that will be used to ensure
|
||||
// that only one autopilot go routine is the "leader". The leader is
|
||||
// the go routine that is currently responsible for updating the
|
||||
// autopilot state and performing raft promotions/demotions.
|
||||
leaderLock *mutex
|
||||
|
||||
// runLock is meant to protect all of the fields regarding coordination
|
||||
// of whether the autopilot go routines are running and
|
||||
// starting/stopping them.
|
||||
runLock sync.Mutex
|
||||
// execution is the information about the most recent autopilot execution.
|
||||
// Start will initialize this with the most recent execution and it will
|
||||
// be updated by Stop and by the go routines being executed when they are
|
||||
// finished.
|
||||
execution *execInfo
|
||||
|
||||
// execLock protects access to the execution field
|
||||
execLock sync.Mutex
|
||||
}
|
||||
|
||||
// New will create a new Autopilot instance utilizing the given Raft and Delegate.
|
||||
|
@ -166,6 +187,7 @@ func New(raft Raft, delegate ApplicationIntegration, options ...Option) *Autopil
|
|||
a := &Autopilot{
|
||||
raft: raft,
|
||||
delegate: delegate,
|
||||
state: &State{},
|
||||
promoter: DefaultPromoter(),
|
||||
logger: hclog.Default().Named("autopilot"),
|
||||
// should this be buffered?
|
||||
|
@ -173,6 +195,7 @@ func New(raft Raft, delegate ApplicationIntegration, options ...Option) *Autopil
|
|||
reconcileInterval: DefaultReconcileInterval,
|
||||
updateInterval: DefaultUpdateInterval,
|
||||
time: &runtimeTimeProvider{},
|
||||
leaderLock: newMutex(),
|
||||
}
|
||||
|
||||
for _, opt := range options {
|
||||
|
|
|
@ -7,4 +7,5 @@ require (
|
|||
github.com/hashicorp/raft v1.2.0
|
||||
github.com/stretchr/testify v1.6.1
|
||||
go.uber.org/goleak v1.1.10
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58
|
||||
)
|
||||
|
|
|
@ -67,6 +67,7 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
|
|||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
This code was taken from the same implementation in a branch from Consul and then
|
||||
had the package updated and the mutex type unexported.
|
||||
*/
|
||||
package autopilot
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
type mutex semaphore.Weighted
|
||||
|
||||
// New returns a Mutex that is ready for use.
|
||||
func newMutex() *mutex {
|
||||
return (*mutex)(semaphore.NewWeighted(1))
|
||||
}
|
||||
|
||||
func (m *mutex) Lock() {
|
||||
_ = (*semaphore.Weighted)(m).Acquire(context.Background(), 1)
|
||||
}
|
||||
|
||||
func (m *mutex) Unlock() {
|
||||
(*semaphore.Weighted)(m).Release(1)
|
||||
}
|
||||
|
||||
// TryLock acquires the mutex, blocking until resources are available or ctx is
|
||||
// done. On success, returns nil. On failure, returns ctx.Err() and leaves the
|
||||
// semaphore unchanged.
|
||||
//
|
||||
// If ctx is already done, Acquire may still succeed without blocking.
|
||||
func (m *mutex) TryLock(ctx context.Context) error {
|
||||
return (*semaphore.Weighted)(m).Acquire(ctx, 1)
|
||||
}
|
|
@ -9,50 +9,100 @@ import (
|
|||
// When the context passed in is cancelled or the Stop method is called
|
||||
// then these routines will exit.
|
||||
func (a *Autopilot) Start(ctx context.Context) {
|
||||
a.runLock.Lock()
|
||||
defer a.runLock.Unlock()
|
||||
a.execLock.Lock()
|
||||
defer a.execLock.Unlock()
|
||||
|
||||
// already running so there is nothing to do
|
||||
if a.running {
|
||||
if a.execution != nil && a.execution.status == Running {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, shutdown := context.WithCancel(ctx)
|
||||
a.shutdown = shutdown
|
||||
a.startTime = a.time.Now()
|
||||
a.done = make(chan struct{})
|
||||
|
||||
// While a go routine executed by a.run below will periodically
|
||||
// update the state, we want to go ahead and force updating it now
|
||||
// so that during a leadership transfer we don't report an empty
|
||||
// autopilot state. We put a pretty small timeout on this though
|
||||
// so as to prevent leader establishment from taking too long
|
||||
updateCtx, updateCancel := context.WithTimeout(ctx, time.Second)
|
||||
defer updateCancel()
|
||||
a.updateState(updateCtx)
|
||||
exec := &execInfo{
|
||||
status: Running,
|
||||
shutdown: shutdown,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
go a.run(ctx)
|
||||
a.running = true
|
||||
if a.execution == nil || a.execution.status == NotRunning {
|
||||
// In theory with a nil execution or the current execution being in the not
|
||||
// running state, we should be able to immediately gain the leader lock as
|
||||
// nothing else should be running and holding the lock. While true we still
|
||||
// gain the lock to ensure that only one thread may even attempt to be
|
||||
// modifying the autopilot state at once.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
if err := a.leaderLock.TryLock(ctx); err == nil {
|
||||
a.updateState(ctx)
|
||||
a.leaderLock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
go a.beginExecution(ctx, exec)
|
||||
a.execution = exec
|
||||
return
|
||||
}
|
||||
|
||||
// Stop will terminate the go routines being executed to perform autopilot.
|
||||
func (a *Autopilot) Stop() <-chan struct{} {
|
||||
a.runLock.Lock()
|
||||
defer a.runLock.Unlock()
|
||||
a.execLock.Lock()
|
||||
defer a.execLock.Unlock()
|
||||
|
||||
// Nothing to do
|
||||
if !a.running {
|
||||
if a.execution == nil || a.execution.status == NotRunning {
|
||||
done := make(chan struct{})
|
||||
close(done)
|
||||
return done
|
||||
}
|
||||
|
||||
a.shutdown()
|
||||
return a.done
|
||||
a.execution.shutdown()
|
||||
a.execution.status = ShuttingDown
|
||||
return a.execution.done
|
||||
}
|
||||
|
||||
func (a *Autopilot) run(ctx context.Context) {
|
||||
// IsRunning returns the current execution status of the autopilot
|
||||
// go routines as well as a chan which will be closed when the
|
||||
// routines are no longer running
|
||||
func (a *Autopilot) IsRunning() (ExecutionStatus, <-chan struct{}) {
|
||||
a.execLock.Lock()
|
||||
defer a.execLock.Unlock()
|
||||
|
||||
if a.execution == nil || a.execution.status == NotRunning {
|
||||
done := make(chan struct{})
|
||||
close(done)
|
||||
return NotRunning, done
|
||||
}
|
||||
|
||||
return a.execution.status, a.execution.done
|
||||
}
|
||||
|
||||
func (a *Autopilot) finishExecution(exec *execInfo) {
|
||||
// need to gain the lock because if this was the active execution
|
||||
// then these values may be read while they are updated.
|
||||
a.execLock.Lock()
|
||||
defer a.execLock.Unlock()
|
||||
|
||||
exec.shutdown = nil
|
||||
exec.status = NotRunning
|
||||
// this should be the final cleanup task as it is what notifies the rest
|
||||
// of the world that we are now done
|
||||
close(exec.done)
|
||||
exec.done = nil
|
||||
}
|
||||
|
||||
func (a *Autopilot) beginExecution(ctx context.Context, exec *execInfo) {
|
||||
// This will wait for any other go routine to finish executing
|
||||
// before running any code ourselves to prevent any conflicting
|
||||
// activity between the two.
|
||||
if err := a.leaderLock.TryLock(ctx); err != nil {
|
||||
a.finishExecution(exec)
|
||||
return
|
||||
}
|
||||
|
||||
a.logger.Debug("autopilot is now running")
|
||||
|
||||
// autopilot needs to do 3 things
|
||||
//
|
||||
// 1. periodically update the cluster state
|
||||
|
@ -78,14 +128,8 @@ func (a *Autopilot) run(ctx context.Context) {
|
|||
|
||||
a.logger.Debug("autopilot is now stopped")
|
||||
|
||||
a.runLock.Lock()
|
||||
a.shutdown = nil
|
||||
a.running = false
|
||||
// this should be the final cleanup task as it is what notifies the rest
|
||||
// of the world that we are now done
|
||||
close(a.done)
|
||||
a.done = nil
|
||||
a.runLock.Unlock()
|
||||
a.finishExecution(exec)
|
||||
a.leaderLock.Unlock()
|
||||
}()
|
||||
|
||||
reconcileTicker := time.NewTicker(a.reconcileInterval)
|
||||
|
|
|
@ -30,10 +30,8 @@ type nextStateInputs struct {
|
|||
Now time.Time
|
||||
StartTime time.Time
|
||||
Config *Config
|
||||
State *State
|
||||
RaftConfig *raft.Configuration
|
||||
KnownServers map[raft.ServerID]*Server
|
||||
AliveServers map[raft.ServerID]*Server
|
||||
LatestIndex uint64
|
||||
LastTerm uint64
|
||||
FetchedStats map[raft.ServerID]*ServerStats
|
||||
|
@ -48,16 +46,10 @@ type nextStateInputs struct {
|
|||
// - Current state
|
||||
// - Raft Configuration
|
||||
// - Known Servers
|
||||
// - Latest raft index (gatered right before the remote server stats so that they should
|
||||
// - Latest raft index (gathered right before the remote server stats so that they should
|
||||
// be from about the same point in time)
|
||||
// - Stats for all non-left servers
|
||||
func (a *Autopilot) gatherNextStateInputs(ctx context.Context) (*nextStateInputs, error) {
|
||||
// we are going to hold this lock for the entire function. In theory nothing should
|
||||
// modify the state on any other go routine so this really shouldn't block anything
|
||||
// else. However we want to ensure that the inputs are as consistent as possible.
|
||||
a.stateLock.RLock()
|
||||
defer a.stateLock.RUnlock()
|
||||
|
||||
// there are a lot of inputs to computing the next state so they get put into a
|
||||
// struct so that we don't have to return 8 values.
|
||||
inputs := &nextStateInputs{
|
||||
|
@ -72,12 +64,6 @@ func (a *Autopilot) gatherNextStateInputs(ctx context.Context) (*nextStateInputs
|
|||
}
|
||||
inputs.Config = config
|
||||
|
||||
// retrieve the current state
|
||||
inputs.State = a.state
|
||||
if inputs.State == nil {
|
||||
inputs.State = &State{}
|
||||
}
|
||||
|
||||
// retrieve the raft configuration
|
||||
raftConfig, err := a.getRaftConfiguration()
|
||||
if err != nil {
|
||||
|
@ -125,9 +111,6 @@ func (a *Autopilot) gatherNextStateInputs(ctx context.Context) (*nextStateInputs
|
|||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
// filter the known servers to have a map of just the alive servers
|
||||
inputs.AliveServers = aliveServers(inputs.KnownServers)
|
||||
|
||||
// we only allow the fetch to take place for up to half the health interval
|
||||
// the next health interval will attempt to fetch the stats again but if
|
||||
// we do not see responses within this time then we can assume they are
|
||||
|
@ -136,7 +119,7 @@ func (a *Autopilot) gatherNextStateInputs(ctx context.Context) (*nextStateInputs
|
|||
fetchCtx, cancel := context.WithDeadline(ctx, d)
|
||||
defer cancel()
|
||||
|
||||
inputs.FetchedStats = a.delegate.FetchServerStats(fetchCtx, inputs.AliveServers)
|
||||
inputs.FetchedStats = a.delegate.FetchServerStats(fetchCtx, aliveServers(inputs.KnownServers))
|
||||
|
||||
// it might be nil but we propagate the ctx.Err just in case our context was
|
||||
// cancelled since the last time we checked.
|
||||
|
@ -239,7 +222,7 @@ func (a *Autopilot) nextServers(inputs *nextStateInputs) map[raft.ServerID]*Serv
|
|||
newServers := make(map[raft.ServerID]*ServerState)
|
||||
|
||||
for _, srv := range inputs.RaftConfig.Servers {
|
||||
state := buildServerState(inputs, srv)
|
||||
state := a.buildServerState(inputs, srv)
|
||||
|
||||
// update any promoter specific information. This isn't done within
|
||||
// buildServerState to keep that function "pure" and not require
|
||||
|
@ -257,7 +240,7 @@ func (a *Autopilot) nextServers(inputs *nextStateInputs) map[raft.ServerID]*Serv
|
|||
// buildServerState takes all the nextStateInputs and builds out a ServerState
|
||||
// for the given Raft server. This will take into account the raft configuration
|
||||
// existing state, application known servers and recently fetched stats.
|
||||
func buildServerState(inputs *nextStateInputs, srv raft.Server) ServerState {
|
||||
func (a *Autopilot) buildServerState(inputs *nextStateInputs, srv raft.Server) ServerState {
|
||||
// Note that the ordering of operations in this method are very important.
|
||||
// We are building up the ServerState from the least important sources
|
||||
// and overriding them with more up to date values.
|
||||
|
@ -292,23 +275,24 @@ func buildServerState(inputs *nextStateInputs, srv raft.Server) ServerState {
|
|||
state.State = RaftLeader
|
||||
}
|
||||
|
||||
var existingHealth *ServerHealth
|
||||
var previousHealthy *bool
|
||||
|
||||
a.stateLock.RLock()
|
||||
// copy some state from an existing server into the new state - most of this
|
||||
// should be overridden soon but at this point we are just building the base.
|
||||
if existing, found := inputs.State.Servers[srv.ID]; found {
|
||||
if existing, found := a.state.Servers[srv.ID]; found {
|
||||
state.Stats = existing.Stats
|
||||
state.Health = existing.Health
|
||||
existingHealth = &existing.Health
|
||||
previousHealthy = &state.Health.Healthy
|
||||
|
||||
// it is is important to note that the map values we retrieved this from are
|
||||
// stored by value. Therefore we are modifying a copy of what is in the existing
|
||||
// state and not the actual state itself. We want to ensure that the Address
|
||||
// is what Raft will know about.
|
||||
existing.Server.Address = srv.Address
|
||||
|
||||
state.Server = existing.Server
|
||||
state.Server.Address = srv.Address
|
||||
}
|
||||
a.stateLock.RUnlock()
|
||||
|
||||
// pull in the latest information from the applications knowledge of the
|
||||
// server. Mainly we want the NodeStatus & Meta
|
||||
|
@ -317,8 +301,8 @@ func buildServerState(inputs *nextStateInputs, srv raft.Server) ServerState {
|
|||
// map we retrieved this from has a non-pointer type value. We definitely
|
||||
// do not want to modify the current known servers but we do want to ensure
|
||||
// that we do not overwrite the Address
|
||||
known.Address = srv.Address
|
||||
state.Server = *known
|
||||
state.Server.Address = srv.Address
|
||||
} else {
|
||||
// TODO (mkeeler) do we need a None state. In the previous autopilot code
|
||||
// we would have set this to serf.StatusNone
|
||||
|
@ -336,7 +320,7 @@ func buildServerState(inputs *nextStateInputs, srv raft.Server) ServerState {
|
|||
// the health status changes. No need for an else as we previously set
|
||||
// it when we overwrote the whole Health structure when finding a
|
||||
// server in the existing state
|
||||
if existingHealth == nil || existingHealth.Healthy != state.Health.Healthy {
|
||||
if previousHealthy == nil || *previousHealthy != state.Health.Healthy {
|
||||
state.Health.StableSince = inputs.Now
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,136 @@
|
|||
// Copyright 2017 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package semaphore provides a weighted semaphore implementation.
|
||||
package semaphore // import "golang.org/x/sync/semaphore"
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type waiter struct {
|
||||
n int64
|
||||
ready chan<- struct{} // Closed when semaphore acquired.
|
||||
}
|
||||
|
||||
// NewWeighted creates a new weighted semaphore with the given
|
||||
// maximum combined weight for concurrent access.
|
||||
func NewWeighted(n int64) *Weighted {
|
||||
w := &Weighted{size: n}
|
||||
return w
|
||||
}
|
||||
|
||||
// Weighted provides a way to bound concurrent access to a resource.
|
||||
// The callers can request access with a given weight.
|
||||
type Weighted struct {
|
||||
size int64
|
||||
cur int64
|
||||
mu sync.Mutex
|
||||
waiters list.List
|
||||
}
|
||||
|
||||
// Acquire acquires the semaphore with a weight of n, blocking until resources
|
||||
// are available or ctx is done. On success, returns nil. On failure, returns
|
||||
// ctx.Err() and leaves the semaphore unchanged.
|
||||
//
|
||||
// If ctx is already done, Acquire may still succeed without blocking.
|
||||
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
|
||||
s.mu.Lock()
|
||||
if s.size-s.cur >= n && s.waiters.Len() == 0 {
|
||||
s.cur += n
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
if n > s.size {
|
||||
// Don't make other Acquire calls block on one that's doomed to fail.
|
||||
s.mu.Unlock()
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
ready := make(chan struct{})
|
||||
w := waiter{n: n, ready: ready}
|
||||
elem := s.waiters.PushBack(w)
|
||||
s.mu.Unlock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err := ctx.Err()
|
||||
s.mu.Lock()
|
||||
select {
|
||||
case <-ready:
|
||||
// Acquired the semaphore after we were canceled. Rather than trying to
|
||||
// fix up the queue, just pretend we didn't notice the cancelation.
|
||||
err = nil
|
||||
default:
|
||||
isFront := s.waiters.Front() == elem
|
||||
s.waiters.Remove(elem)
|
||||
// If we're at the front and there're extra tokens left, notify other waiters.
|
||||
if isFront && s.size > s.cur {
|
||||
s.notifyWaiters()
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return err
|
||||
|
||||
case <-ready:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// TryAcquire acquires the semaphore with a weight of n without blocking.
|
||||
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
|
||||
func (s *Weighted) TryAcquire(n int64) bool {
|
||||
s.mu.Lock()
|
||||
success := s.size-s.cur >= n && s.waiters.Len() == 0
|
||||
if success {
|
||||
s.cur += n
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return success
|
||||
}
|
||||
|
||||
// Release releases the semaphore with a weight of n.
|
||||
func (s *Weighted) Release(n int64) {
|
||||
s.mu.Lock()
|
||||
s.cur -= n
|
||||
if s.cur < 0 {
|
||||
s.mu.Unlock()
|
||||
panic("semaphore: released more than held")
|
||||
}
|
||||
s.notifyWaiters()
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Weighted) notifyWaiters() {
|
||||
for {
|
||||
next := s.waiters.Front()
|
||||
if next == nil {
|
||||
break // No more waiters blocked.
|
||||
}
|
||||
|
||||
w := next.Value.(waiter)
|
||||
if s.size-s.cur < w.n {
|
||||
// Not enough tokens for the next waiter. We could keep going (to try to
|
||||
// find a waiter with a smaller request), but under load that could cause
|
||||
// starvation for large requests; instead, we leave all remaining waiters
|
||||
// blocked.
|
||||
//
|
||||
// Consider a semaphore used as a read-write lock, with N tokens, N
|
||||
// readers, and one writer. Each reader can Acquire(1) to obtain a read
|
||||
// lock. The writer can Acquire(N) to obtain a write lock, excluding all
|
||||
// of the readers. If we allow the readers to jump ahead in the queue,
|
||||
// the writer will starve — there is always one token available for every
|
||||
// reader.
|
||||
break
|
||||
}
|
||||
|
||||
s.cur += w.n
|
||||
s.waiters.Remove(next)
|
||||
close(w.ready)
|
||||
}
|
||||
}
|
|
@ -280,7 +280,7 @@ github.com/hashicorp/memberlist
|
|||
github.com/hashicorp/net-rpc-msgpackrpc
|
||||
# github.com/hashicorp/raft v1.2.0
|
||||
github.com/hashicorp/raft
|
||||
# github.com/hashicorp/raft-autopilot v0.1.1
|
||||
# github.com/hashicorp/raft-autopilot v0.1.2
|
||||
github.com/hashicorp/raft-autopilot
|
||||
# github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
|
||||
github.com/hashicorp/raft-boltdb
|
||||
|
@ -507,6 +507,7 @@ golang.org/x/oauth2/jws
|
|||
golang.org/x/oauth2/jwt
|
||||
# golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
|
||||
golang.org/x/sync/errgroup
|
||||
golang.org/x/sync/semaphore
|
||||
golang.org/x/sync/singleflight
|
||||
# golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5
|
||||
golang.org/x/sys/cpu
|
||||
|
|
Loading…
Reference in New Issue