From c1badf103457691fb59c8e3e4ff84873ba362a5a Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Wed, 18 Oct 2017 16:50:15 +0200 Subject: [PATCH] ae: ensure that syncs are blocked when paused --- agent/ae/ae.go | 74 +++++++++++++++++++++++++------------------------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/agent/ae/ae.go b/agent/ae/ae.go index 6572d815b..4ec72ed46 100644 --- a/agent/ae/ae.go +++ b/agent/ae/ae.go @@ -2,9 +2,10 @@ package ae import ( + "errors" "log" "math" - "sync/atomic" + "sync" "time" "github.com/hashicorp/consul/lib" @@ -75,7 +76,8 @@ type StateSyncer struct { SyncChanges *Trigger // paused stores whether sync runs are temporarily disabled. - paused *toggle + pauseLock sync.Mutex + paused bool } func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer { @@ -86,7 +88,6 @@ func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, lo Logger: logger, SyncFull: NewTrigger(), SyncChanges: NewTrigger(), - paused: new(toggle), } } @@ -99,6 +100,8 @@ const ( retryFailIntv = 15 * time.Second ) +var errPaused = errors.New("paused") + // Run is the long running method to perform state synchronization // between local and remote servers. func (s *StateSyncer) Run() { @@ -114,13 +117,13 @@ func (s *StateSyncer) Run() { FullSync: for { // attempt a full sync - if err := s.State.SyncFull(); err != nil { - s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) + err := s.ifNotPausedRun(s.State.SyncFull) + if err != nil { + if err != errPaused { + s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) + } - // retry full sync after some time or when a consul - // server was added. select { - // trigger a full sync immediately. // this is usually called when a consul server was added to the cluster. // stagger the delay to avoid a thundering herd. @@ -162,10 +165,8 @@ FullSync: // do partial syncs on demand case <-s.SyncChanges.Notif(): - if s.Paused() { - continue - } - if err := s.State.SyncChanges(); err != nil { + err := s.ifNotPausedRun(s.State.SyncChanges) + if err != nil && err != errPaused { s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) } @@ -176,40 +177,39 @@ FullSync: } } +func (s *StateSyncer) ifNotPausedRun(f func() error) error { + s.pauseLock.Lock() + defer s.pauseLock.Unlock() + if s.paused { + return errPaused + } + return f() +} + // Pause temporarily disables sync runs. func (s *StateSyncer) Pause() { - s.paused.On() + s.pauseLock.Lock() + if s.paused { + panic("pause while paused") + } + s.paused = true + s.pauseLock.Unlock() } // Paused returns whether sync runs are temporarily disabled. func (s *StateSyncer) Paused() bool { - return s.paused.IsOn() + s.pauseLock.Lock() + defer s.pauseLock.Unlock() + return s.paused } // Resume re-enables sync runs. func (s *StateSyncer) Resume() { - s.paused.Off() + s.pauseLock.Lock() + if !s.paused { + panic("resume while not paused") + } + s.paused = false + s.pauseLock.Unlock() s.SyncChanges.Trigger() } - -// toggle implements an on/off switch using methods from the atomic -// package. Since fields in structs that are accessed via -// atomic.Load/Add methods need to be aligned properly on some platforms -// we move that code into a separate struct. -// -// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for details -type toggle int32 - -func (p *toggle) On() { - atomic.AddInt32((*int32)(p), 1) -} - -func (p *toggle) Off() { - if atomic.AddInt32((*int32)(p), -1) < 0 { - panic("toggle not on") - } -} - -func (p *toggle) IsOn() bool { - return atomic.LoadInt32((*int32)(p)) > 0 -}