ae: ensure that syncs are blocked when paused

This commit is contained in:
Frank Schroeder 2017-10-18 16:50:15 +02:00
parent f187c37c27
commit 6064a2a764
No known key found for this signature in database
GPG Key ID: 4D65C6EAEC87DECD
1 changed file with 37 additions and 37 deletions

View File

@ -2,9 +2,10 @@
package ae package ae
import ( import (
"errors"
"log" "log"
"math" "math"
"sync/atomic" "sync"
"time" "time"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
@ -75,7 +76,8 @@ type StateSyncer struct {
SyncChanges *Trigger SyncChanges *Trigger
// paused stores whether sync runs are temporarily disabled. // paused stores whether sync runs are temporarily disabled.
paused *toggle pauseLock sync.Mutex
paused bool
} }
func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer { func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer {
@ -86,7 +88,6 @@ func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, lo
Logger: logger, Logger: logger,
SyncFull: NewTrigger(), SyncFull: NewTrigger(),
SyncChanges: NewTrigger(), SyncChanges: NewTrigger(),
paused: new(toggle),
} }
} }
@ -99,6 +100,8 @@ const (
retryFailIntv = 15 * time.Second retryFailIntv = 15 * time.Second
) )
var errPaused = errors.New("paused")
// Run is the long running method to perform state synchronization // Run is the long running method to perform state synchronization
// between local and remote servers. // between local and remote servers.
func (s *StateSyncer) Run() { func (s *StateSyncer) Run() {
@ -114,13 +117,13 @@ func (s *StateSyncer) Run() {
FullSync: FullSync:
for { for {
// attempt a full sync // attempt a full sync
if err := s.State.SyncFull(); err != nil { err := s.ifNotPausedRun(s.State.SyncFull)
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) if err != nil {
if err != errPaused {
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
}
// retry full sync after some time or when a consul
// server was added.
select { select {
// trigger a full sync immediately. // trigger a full sync immediately.
// this is usually called when a consul server was added to the cluster. // this is usually called when a consul server was added to the cluster.
// stagger the delay to avoid a thundering herd. // stagger the delay to avoid a thundering herd.
@ -162,10 +165,8 @@ FullSync:
// do partial syncs on demand // do partial syncs on demand
case <-s.SyncChanges.Notif(): case <-s.SyncChanges.Notif():
if s.Paused() { err := s.ifNotPausedRun(s.State.SyncChanges)
continue if err != nil && err != errPaused {
}
if err := s.State.SyncChanges(); err != nil {
s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
} }
@ -176,40 +177,39 @@ FullSync:
} }
} }
// ifNotPausedRun invokes f unless the syncer is paused. pauseLock is
// held for the entire call, including while f executes, so the paused
// flag cannot be flipped in the middle of a run. When the syncer is
// paused f is not called and errPaused is returned; otherwise the
// result of f is returned unchanged.
func (s *StateSyncer) ifNotPausedRun(f func() error) error {
	s.pauseLock.Lock()
	defer s.pauseLock.Unlock()

	if !s.paused {
		return f()
	}
	return errPaused
}
// Pause temporarily disables sync runs. // Pause temporarily disables sync runs.
func (s *StateSyncer) Pause() { func (s *StateSyncer) Pause() {
s.paused.On() s.pauseLock.Lock()
if s.paused {
panic("pause while paused")
}
s.paused = true
s.pauseLock.Unlock()
} }
// Paused returns whether sync runs are temporarily disabled. // Paused returns whether sync runs are temporarily disabled.
func (s *StateSyncer) Paused() bool { func (s *StateSyncer) Paused() bool {
return s.paused.IsOn() s.pauseLock.Lock()
defer s.pauseLock.Unlock()
return s.paused
} }
// Resume re-enables sync runs. // Resume re-enables sync runs.
func (s *StateSyncer) Resume() { func (s *StateSyncer) Resume() {
s.paused.Off() s.pauseLock.Lock()
if !s.paused {
panic("resume while not paused")
}
s.paused = false
s.pauseLock.Unlock()
s.SyncChanges.Trigger() s.SyncChanges.Trigger()
} }
// toggle is an on/off switch backed by an int32 that is only touched
// through the sync/atomic functions. It is a counter rather than a
// single bit: every On adds one, every Off subtracts one, and the
// switch counts as on while the value is positive.
//
// The original author kept this in a dedicated type because values
// accessed via atomic.Load/Add need to be aligned properly on some
// platforms.
//
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for details
type toggle int32

// On atomically increments the counter by one.
func (p *toggle) On() {
	counter := (*int32)(p)
	atomic.AddInt32(counter, 1)
}

// Off atomically decrements the counter by one and panics with
// "toggle not on" when the result drops below zero.
func (p *toggle) Off() {
	counter := (*int32)(p)
	remaining := atomic.AddInt32(counter, -1)
	if remaining >= 0 {
		return
	}
	panic("toggle not on")
}

// IsOn atomically reads the counter and reports whether it is
// greater than zero.
func (p *toggle) IsOn() bool {
	counter := (*int32)(p)
	return atomic.LoadInt32(counter) > 0
}