diff --git a/agent/ae/ae.go b/agent/ae/ae.go
index d5f56c37a..78bf93e25 100644
--- a/agent/ae/ae.go
+++ b/agent/ae/ae.go
@@ -1,4 +1,4 @@
-// Package ae provides an anti-entropy mechanism for the local state.
+// Package ae provides tools to synchronize state between local and remote Consul servers.
 package ae
 
 import (
@@ -10,35 +10,43 @@ import (
 	"github.com/hashicorp/consul/lib"
 )
 
-const (
-	// This scale factor means we will add a minute after we cross 128 nodes,
-	// another at 256, another at 512, etc. By 8192 nodes, we will scale up
-	// by a factor of 8.
-	//
-	// If you update this, you may need to adjust the tuning of
-	// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
-	aeScaleThreshold = 128
+// scaleThreshold is the number of nodes after which regular sync runs are
+// spread out farther apart. The value should be a power of 2 since the
+// scale function uses log2.
+//
+// When set to 128, the delay between regular runs is doubled once the
+// cluster grows beyond 128 nodes. It doubles again when it passes 256
+// nodes, and again at 512 nodes, and so forth. At 8192 nodes the delay
+// factor is 8.
+//
+// If you update this, you may need to adjust the tuning of
+// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
+const scaleThreshold = 128
 
-	syncStaggerIntv = 3 * time.Second
-	syncRetryIntv   = 15 * time.Second
-)
-
-// aeScale is used to scale the time interval at which anti-entropy updates take
-// place. It is used to prevent saturation as the cluster size grows.
-func aeScale(d time.Duration, n int) time.Duration {
-	// Don't scale until we cross the threshold
-	if n <= aeScaleThreshold {
-		return d
+// scaleFactor returns a factor by which the next sync run should be delayed to
+// avoid saturation of the cluster. The larger the cluster grows, the farther
+// apart the sync runs should be spread.
+//
+// The current implementation uses a log2 scale which doubles the delay between
+// runs every time the cluster doubles in size.
+func scaleFactor(nodes int) int {
+	if nodes <= scaleThreshold {
+		return 1
 	}
-
-	mult := math.Ceil(math.Log2(float64(n))-math.Log2(aeScaleThreshold)) + 1.0
-	return time.Duration(mult) * d
+	return int(math.Ceil(math.Log2(float64(nodes))-math.Log2(float64(scaleThreshold))) + 1.0)
 }
 
+// StateSyncer manages background synchronization of the given state.
+//
+// The state is synchronized on a regular basis or on demand when either
+// the state has changed or a new Consul server has joined the cluster.
+//
+// The regular state synchronization provides a self-healing mechanism
+// for the cluster which is also called anti-entropy.
 type StateSyncer struct {
-	// paused is used to check if we are paused. Must be the first
-	// element due to a go bug.
-	// todo(fs): which bug? still relevant?
+	// paused flags whether sync runs are temporarily disabled.
+	// Must be the first element due to a Go bug.
+	// todo(fs): which bug? Is this still relevant?
 	paused int32
 
 	// State contains the data that needs to be synchronized.
@@ -47,18 +55,18 @@ type StateSyncer struct {
 		SyncChanges() error
 	}
 
-	// Interval is the time between two sync runs.
+	// Interval is the time between two regular sync runs.
 	Interval time.Duration
 
-	// ClusterSize returns the number of members in the cluster.
-	// todo(fs): we use this for staggering but what about a random number?
+	// ClusterSize returns the number of members in the cluster to
+	// allow staggering the sync runs based on cluster size.
 	ClusterSize func() int
 
 	// ShutdownCh is closed when the application is shutting down.
 	ShutdownCh chan struct{}
 
-	// ConsulCh contains data when a new consul server has been added to the cluster.
-	ConsulCh chan struct{}
+	// ServerUpCh contains data when a new consul server has been added to the cluster.
+	ServerUpCh chan struct{}
 
 	// TriggerCh contains data when a sync should run immediately.
 	TriggerCh chan struct{}
@@ -66,81 +74,112 @@ type StateSyncer struct {
 	Logger *log.Logger
 }
 
-// Pause is used to pause state synchronization, this can be
-// used to make batch changes
-func (ae *StateSyncer) Pause() {
-	atomic.AddInt32(&ae.paused, 1)
-}
+const (
+	// serverUpIntv is the max time to wait before a sync is triggered
+	// when a consul server has been added to the cluster.
+	serverUpIntv = 3 * time.Second
 
-// Resume is used to resume state synchronization
-func (ae *StateSyncer) Resume() {
-	paused := atomic.AddInt32(&ae.paused, -1)
-	if paused < 0 {
-		panic("unbalanced State.Resume() detected")
+	// retryFailIntv is the min time to wait before a failed sync is retried.
+	retryFailIntv = 15 * time.Second
+)
+
+// Run is the long-running method to perform state synchronization
+// between local and remote servers.
+func (s *StateSyncer) Run() {
+	stagger := func(d time.Duration) time.Duration {
+		f := scaleFactor(s.ClusterSize())
+		return lib.RandomStagger(time.Duration(f) * d)
 	}
-	ae.changeMade()
-}
 
-// Paused is used to check if we are paused
-func (ae *StateSyncer) Paused() bool {
-	return atomic.LoadInt32(&ae.paused) > 0
-}
-
-func (ae *StateSyncer) changeMade() {
-	select {
-	case ae.TriggerCh <- struct{}{}:
-	default:
-	}
-}
-
-// antiEntropy is a long running method used to perform anti-entropy
-// between local and remote state.
-func (ae *StateSyncer) Run() {
-SYNC:
-	// Sync our state with the servers
+Sync:
 	for {
-		err := ae.State.UpdateSyncState()
+		// update the sync status
+		err := s.State.UpdateSyncState()
 		if err == nil {
 			break
 		}
-		ae.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
+
+		s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
+
+		// retry updating sync status after some time or when a consul
+		// server was added.
 		select {
-		case <-ae.ConsulCh:
-			// Stagger the retry on leader election, avoid a thundering heard
+
+		// consul server added to the cluster.
+		// retry sooner than retryFailIntv to converge the cluster quicker
+		// but stagger the delay to avoid a thundering herd
+		case <-s.ServerUpCh:
 			select {
-			case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, ae.ClusterSize()))):
-			case <-ae.ShutdownCh:
+			case <-time.After(stagger(serverUpIntv)):
+			case <-s.ShutdownCh:
 				return
 			}
-		case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, ae.ClusterSize()))):
-		case <-ae.ShutdownCh:
+
+		// retry full sync after some time
+		// todo(fs): why don't we use s.Interval here?
+		case <-time.After(retryFailIntv + stagger(retryFailIntv)):
+
+		case <-s.ShutdownCh:
 			return
 		}
 	}
 
-	// Force-trigger AE to pickup any changes
-	ae.changeMade()
-
-	// Schedule the next full sync, with a random stagger
-	aeIntv := aeScale(ae.Interval, ae.ClusterSize())
-	aeIntv = aeIntv + lib.RandomStagger(aeIntv)
-	aeTimer := time.After(aeIntv)
+	// Force-trigger sync to pick up any changes
+	s.triggerSync()
 
 	// Wait for sync events
 	for {
 		select {
-		case <-aeTimer:
-			goto SYNC
-		case <-ae.TriggerCh:
-			// Skip the sync if we are paused
-			if ae.Paused() {
+		// todo(fs): why don't we honor the ServerUpCh here as well?
+		// todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv)
+		// case <-s.ServerUpCh:
+		// 	select {
+		// 	case <-time.After(stagger(serverUpIntv)):
+		// 		continue Sync
+		// 	case <-s.ShutdownCh:
+		// 		return
+		// 	}
+
+		case <-time.After(s.Interval + stagger(s.Interval)):
+			goto Sync
+
+		case <-s.TriggerCh:
+			if s.Paused() {
 				continue
 			}
-			if err := ae.State.SyncChanges(); err != nil {
-				ae.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
+			if err := s.State.SyncChanges(); err != nil {
+				s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
 			}
-		case <-ae.ShutdownCh:
+
+		case <-s.ShutdownCh:
 			return
 		}
 	}
 }
+
+// Pause temporarily disables sync runs.
+func (s *StateSyncer) Pause() {
+	atomic.AddInt32(&s.paused, 1)
+}
+
+// Paused returns whether sync runs are temporarily disabled.
+func (s *StateSyncer) Paused() bool {
+	return atomic.LoadInt32(&s.paused) > 0
+}
+
+// Resume re-enables sync runs.
+func (s *StateSyncer) Resume() {
+	paused := atomic.AddInt32(&s.paused, -1)
+	if paused < 0 {
+		panic("unbalanced StateSyncer.Resume() detected")
+	}
+	s.triggerSync()
+}
+
+// triggerSync queues a sync run if one has not been triggered already.
+func (s *StateSyncer) triggerSync() {
+	select {
+	case s.TriggerCh <- struct{}{}:
+	default:
+	}
+}
diff --git a/agent/ae/ae_test.go b/agent/ae/ae_test.go
index 5d91f2f9b..7246e7e24 100644
--- a/agent/ae/ae_test.go
+++ b/agent/ae/ae_test.go
@@ -1,24 +1,27 @@
 package ae
 
 import (
+	"fmt"
 	"testing"
-	"time"
 )
 
-func TestAE_scale(t *testing.T) {
+func TestAE_scaleFactor(t *testing.T) {
 	t.Parallel()
-	intv := time.Minute
-	if v := aeScale(intv, 100); v != intv {
-		t.Fatalf("Bad: %v", v)
+	tests := []struct {
+		nodes int
+		scale int
+	}{
+		{100, 1},
+		{200, 2},
+		{1000, 4},
+		{10000, 8},
 	}
-	if v := aeScale(intv, 200); v != 2*intv {
-		t.Fatalf("Bad: %v", v)
-	}
-	if v := aeScale(intv, 1000); v != 4*intv {
-		t.Fatalf("Bad: %v", v)
-	}
-	if v := aeScale(intv, 10000); v != 8*intv {
-		t.Fatalf("Bad: %v", v)
+	for _, tt := range tests {
+		t.Run(fmt.Sprintf("%d nodes", tt.nodes), func(t *testing.T) {
+			if got, want := scaleFactor(tt.nodes), tt.scale; got != want {
+				t.Fatalf("got scale factor %d want %d", got, want)
+			}
+		})
 	}
 }
diff --git a/agent/agent.go b/agent/agent.go
index 1238359f3..ddd438a6b 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -248,7 +248,7 @@ func (a *Agent) Start() error {
 	// create a notif channel to trigger state sychronizations
 	// when a consul server was added to the cluster.
-	consulCh := make(chan struct{}, 1)
+	serverUpCh := make(chan struct{}, 1)
 
 	// create a notif channel to trigger state synchronizations
 	// when the state has changed.
@@ -263,7 +263,7 @@ func (a *Agent) Start() error {
 		State:      a.state,
 		Interval:   c.AEInterval,
 		ShutdownCh: a.shutdownCh,
-		ConsulCh:   consulCh,
+		ServerUpCh: serverUpCh,
 		TriggerCh:  triggerCh,
 		Logger:     a.logger,
 	}
@@ -280,7 +280,7 @@ func (a *Agent) Start() error {
 	// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
 	consulCfg.ServerUp = func() {
 		select {
-		case consulCh <- struct{}{}:
+		case serverUpCh <- struct{}{}:
 		default:
 		}
 	}
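
Review note: to make the scaling behavior concrete, here is a small, self-contained sketch, not part of the patch. scaleFactor is copied from agent/ae/ae.go above, and the 60s interval matches the default AEInterval mentioned in the todo. Assuming lib.RandomStagger returns a duration in [0, d), the wait before the next regular run in Run falls in [Interval, Interval + factor*Interval):

package main

import (
	"fmt"
	"math"
	"time"
)

const scaleThreshold = 128

// scaleFactor as introduced in agent/ae/ae.go above.
func scaleFactor(nodes int) int {
	if nodes <= scaleThreshold {
		return 1
	}
	return int(math.Ceil(math.Log2(float64(nodes))-math.Log2(float64(scaleThreshold))) + 1.0)
}

func main() {
	interval := 60 * time.Second // default AEInterval per the todo above
	for _, n := range []int{100, 200, 1000, 10000} {
		f := scaleFactor(n)
		// Run waits Interval + RandomStagger(f*Interval) between regular
		// runs, i.e. a duration in [Interval, Interval+f*Interval).
		fmt.Printf("%6d nodes: factor %d, next regular run in [%v, %v)\n",
			n, f, interval, interval+time.Duration(f)*interval)
	}
}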
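Review note: the removed comment on Pause said it "can be used to make batch changes", and that still holds with the renamed methods. Below is a minimal sketch of that pattern, assuming the package imports as github.com/hashicorp/consul/agent/ae; the mock state and the direct sends on TriggerCh are illustrative only:

package main

import (
	"log"
	"os"
	"time"

	"github.com/hashicorp/consul/agent/ae"
)

// mockState satisfies the syncer's anonymous State interface
// and counts how often changes are synced.
type mockState struct{ syncs int }

func (m *mockState) UpdateSyncState() error { return nil }
func (m *mockState) SyncChanges() error     { m.syncs++; return nil }

func main() {
	shutdownCh := make(chan struct{})
	s := &ae.StateSyncer{
		State:       &mockState{},
		Interval:    time.Minute,
		ShutdownCh:  shutdownCh,
		ServerUpCh:  make(chan struct{}, 1),
		TriggerCh:   make(chan struct{}, 1),
		ClusterSize: func() int { return 1 },
		Logger:      log.New(os.Stderr, "", log.LstdFlags),
	}
	go s.Run()

	// Batch several changes: while paused, trigger events are drained
	// but ignored, and Resume queues exactly one sync run at the end.
	s.Pause()
	for i := 0; i < 10; i++ {
		select { // non-blocking send, same idiom as agent.go
		case s.TriggerCh <- struct{}{}:
		default:
		}
	}
	s.Resume()

	time.Sleep(100 * time.Millisecond) // give the syncer a moment to run
	close(shutdownCh)
}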
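Review note: on the todo(fs) in agent.go about hiding the non-blocking send in the syncer, one possible shape is a counterpart to triggerSync. This is a sketch of a hypothetical method, not something this patch adds:

// SyncServerUp queues a server-up notification if one is not already
// pending, so callers don't have to know about the channel semantics.
func (s *StateSyncer) SyncServerUp() {
	select {
	case s.ServerUpCh <- struct{}{}:
	default:
	}
}

With that, agent.go could set consulCfg.ServerUp to the syncer's SyncServerUp method directly instead of wrapping the channel in a closure.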