agent: cleanup StateSyncer
This patch cleans up the state syncer code by renaming fields, adding helpers and documentation.
This commit is contained in:
parent
e7051da8d1
commit
8cc4ee886d
203
agent/ae/ae.go
203
agent/ae/ae.go
|
@ -1,4 +1,4 @@
|
|||
// Package ae provides an anti-entropy mechanism for the local state.
|
||||
// Package ae provides tools to synchronize state between local and remote consul servers.
|
||||
package ae
|
||||
|
||||
import (
|
||||
|
@ -10,35 +10,43 @@ import (
|
|||
"github.com/hashicorp/consul/lib"
|
||||
)
|
||||
|
||||
const (
|
||||
// This scale factor means we will add a minute after we cross 128 nodes,
|
||||
// another at 256, another at 512, etc. By 8192 nodes, we will scale up
|
||||
// by a factor of 8.
|
||||
//
|
||||
// If you update this, you may need to adjust the tuning of
|
||||
// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
|
||||
aeScaleThreshold = 128
|
||||
// scaleThreshold is the number of nodes after which regular sync runs are
|
||||
// spread out farther apart. The value should be a power of 2 since the
|
||||
// scale function uses log2.
|
||||
//
|
||||
// When set to 128 nodes the delay between regular runs is doubled when the
|
||||
// cluster is larger than 128 nodes. It doubles again when it passes 256
|
||||
// nodes, and again at 512 nodes and so forth. At 8192 nodes, the delay
|
||||
// factor is 8.
|
||||
//
|
||||
// If you update this, you may need to adjust the tuning of
|
||||
// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
|
||||
const scaleThreshold = 128
|
||||
|
||||
syncStaggerIntv = 3 * time.Second
|
||||
syncRetryIntv = 15 * time.Second
|
||||
)
|
||||
|
||||
// aeScale is used to scale the time interval at which anti-entropy updates take
|
||||
// place. It is used to prevent saturation as the cluster size grows.
|
||||
func aeScale(d time.Duration, n int) time.Duration {
|
||||
// Don't scale until we cross the threshold
|
||||
if n <= aeScaleThreshold {
|
||||
return d
|
||||
// scaleFactor returns a factor by which the next sync run should be delayed to
|
||||
// avoid saturation of the cluster. The larger the cluster grows the farther
|
||||
// the sync runs should be spread apart.
|
||||
//
|
||||
// The current implementation uses a log2 scale which doubles the delay between
|
||||
// runs every time the cluster doubles in size.
|
||||
func scaleFactor(nodes int) int {
|
||||
if nodes <= scaleThreshold {
|
||||
return 1.0
|
||||
}
|
||||
|
||||
mult := math.Ceil(math.Log2(float64(n))-math.Log2(aeScaleThreshold)) + 1.0
|
||||
return time.Duration(mult) * d
|
||||
return int(math.Ceil(math.Log2(float64(nodes))-math.Log2(float64(scaleThreshold))) + 1.0)
|
||||
}
|
||||
|
||||
// StateSyncer manages background synchronization of the given state.
|
||||
//
|
||||
// The state is synchronized on a regular basis or on demand when either
|
||||
// the state has changed or a new Consul server has joined the cluster.
|
||||
//
|
||||
// The regular state sychronization provides a self-healing mechanism
|
||||
// for the cluster which is also called anti-entropy.
|
||||
type StateSyncer struct {
|
||||
// paused is used to check if we are paused. Must be the first
|
||||
// element due to a go bug.
|
||||
// todo(fs): which bug? still relevant?
|
||||
// paused flags whether sync runs are temporarily disabled.
|
||||
// Must be the first element due to a go bug.
|
||||
// todo(fs): which bug? Is this still relevant?
|
||||
paused int32
|
||||
|
||||
// State contains the data that needs to be synchronized.
|
||||
|
@ -47,18 +55,18 @@ type StateSyncer struct {
|
|||
SyncChanges() error
|
||||
}
|
||||
|
||||
// Interval is the time between two sync runs.
|
||||
// Interval is the time between two regular sync runs.
|
||||
Interval time.Duration
|
||||
|
||||
// ClusterSize returns the number of members in the cluster.
|
||||
// todo(fs): we use this for staggering but what about a random number?
|
||||
// ClusterSize returns the number of members in the cluster to
|
||||
// allow staggering the sync runs based on cluster size.
|
||||
ClusterSize func() int
|
||||
|
||||
// ShutdownCh is closed when the application is shutting down.
|
||||
ShutdownCh chan struct{}
|
||||
|
||||
// ConsulCh contains data when a new consul server has been added to the cluster.
|
||||
ConsulCh chan struct{}
|
||||
// ServerUpCh contains data when a new consul server has been added to the cluster.
|
||||
ServerUpCh chan struct{}
|
||||
|
||||
// TriggerCh contains data when a sync should run immediately.
|
||||
TriggerCh chan struct{}
|
||||
|
@ -66,81 +74,112 @@ type StateSyncer struct {
|
|||
Logger *log.Logger
|
||||
}
|
||||
|
||||
// Pause is used to pause state synchronization, this can be
|
||||
// used to make batch changes
|
||||
func (ae *StateSyncer) Pause() {
|
||||
atomic.AddInt32(&ae.paused, 1)
|
||||
}
|
||||
const (
|
||||
// serverUpIntv is the max time to wait before a sync is triggered
|
||||
// when a consul server has been added to the cluster.
|
||||
serverUpIntv = 3 * time.Second
|
||||
|
||||
// Resume is used to resume state synchronization
|
||||
func (ae *StateSyncer) Resume() {
|
||||
paused := atomic.AddInt32(&ae.paused, -1)
|
||||
if paused < 0 {
|
||||
panic("unbalanced State.Resume() detected")
|
||||
// retryFailIntv is the min time to wait before a failed sync is retried.
|
||||
retryFailIntv = 15 * time.Second
|
||||
)
|
||||
|
||||
// Run is the long running method to perform state synchronization
|
||||
// between local and remote servers.
|
||||
func (s *StateSyncer) Run() {
|
||||
stagger := func(d time.Duration) time.Duration {
|
||||
f := scaleFactor(s.ClusterSize())
|
||||
return lib.RandomStagger(time.Duration(f) * d)
|
||||
}
|
||||
ae.changeMade()
|
||||
}
|
||||
|
||||
// Paused is used to check if we are paused
|
||||
func (ae *StateSyncer) Paused() bool {
|
||||
return atomic.LoadInt32(&ae.paused) > 0
|
||||
}
|
||||
|
||||
func (ae *StateSyncer) changeMade() {
|
||||
select {
|
||||
case ae.TriggerCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// antiEntropy is a long running method used to perform anti-entropy
|
||||
// between local and remote state.
|
||||
func (ae *StateSyncer) Run() {
|
||||
SYNC:
|
||||
// Sync our state with the servers
|
||||
Sync:
|
||||
for {
|
||||
err := ae.State.UpdateSyncState()
|
||||
// update the sync status
|
||||
err := s.State.UpdateSyncState()
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
ae.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
|
||||
|
||||
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
|
||||
|
||||
// retry updating sync status after some time or when a consul
|
||||
// server was added.
|
||||
select {
|
||||
case <-ae.ConsulCh:
|
||||
// Stagger the retry on leader election, avoid a thundering heard
|
||||
|
||||
// consul server added to cluster.
|
||||
// retry sooner than retryFailIntv to converge cluster quicker
|
||||
// but stagger delay to avoid thundering herd
|
||||
case <-s.ServerUpCh:
|
||||
select {
|
||||
case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, ae.ClusterSize()))):
|
||||
case <-ae.ShutdownCh:
|
||||
case <-time.After(stagger(serverUpIntv)):
|
||||
case <-s.ShutdownCh:
|
||||
return
|
||||
}
|
||||
case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, ae.ClusterSize()))):
|
||||
case <-ae.ShutdownCh:
|
||||
|
||||
// retry full sync after some time
|
||||
// todo(fs): why don't we use s.Interval here?
|
||||
case <-time.After(retryFailIntv + stagger(retryFailIntv)):
|
||||
|
||||
case <-s.ShutdownCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Force-trigger AE to pickup any changes
|
||||
ae.changeMade()
|
||||
|
||||
// Schedule the next full sync, with a random stagger
|
||||
aeIntv := aeScale(ae.Interval, ae.ClusterSize())
|
||||
aeIntv = aeIntv + lib.RandomStagger(aeIntv)
|
||||
aeTimer := time.After(aeIntv)
|
||||
// Force-trigger sync to pickup any changes
|
||||
s.triggerSync()
|
||||
|
||||
// Wait for sync events
|
||||
for {
|
||||
select {
|
||||
case <-aeTimer:
|
||||
goto SYNC
|
||||
case <-ae.TriggerCh:
|
||||
// Skip the sync if we are paused
|
||||
if ae.Paused() {
|
||||
// todo(fs): why don't we honor the ServerUpCh here as well?
|
||||
// todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv)
|
||||
// case <-s.ServerUpCh:
|
||||
// select {
|
||||
// case <-time.After(stagger(serverUpIntv)):
|
||||
// continue Sync
|
||||
// case <-s.ShutdownCh:
|
||||
// return
|
||||
// }
|
||||
|
||||
case <-time.After(s.Interval + stagger(s.Interval)):
|
||||
goto Sync
|
||||
|
||||
case <-s.TriggerCh:
|
||||
if s.Paused() {
|
||||
continue
|
||||
}
|
||||
if err := ae.State.SyncChanges(); err != nil {
|
||||
ae.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
|
||||
if err := s.State.SyncChanges(); err != nil {
|
||||
s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
|
||||
}
|
||||
case <-ae.ShutdownCh:
|
||||
|
||||
case <-s.ShutdownCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Pause temporarily disables sync runs.
|
||||
func (s *StateSyncer) Pause() {
|
||||
atomic.AddInt32(&s.paused, 1)
|
||||
}
|
||||
|
||||
// Paused returns whether sync runs are temporarily disabled.
|
||||
func (s *StateSyncer) Paused() bool {
|
||||
return atomic.LoadInt32(&s.paused) > 0
|
||||
}
|
||||
|
||||
// Resume re-enables sync runs.
|
||||
func (s *StateSyncer) Resume() {
|
||||
paused := atomic.AddInt32(&s.paused, -1)
|
||||
if paused < 0 {
|
||||
panic("unbalanced StateSyncer.Resume() detected")
|
||||
}
|
||||
s.triggerSync()
|
||||
}
|
||||
|
||||
// triggerSync queues a sync run if one has not been triggered already.
|
||||
func (s *StateSyncer) triggerSync() {
|
||||
select {
|
||||
case s.TriggerCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,24 +1,27 @@
|
|||
package ae
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestAE_scale(t *testing.T) {
|
||||
func TestAE_scaleFactor(t *testing.T) {
|
||||
t.Parallel()
|
||||
intv := time.Minute
|
||||
if v := aeScale(intv, 100); v != intv {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
tests := []struct {
|
||||
nodes int
|
||||
scale int
|
||||
}{
|
||||
{100, 1},
|
||||
{200, 2},
|
||||
{1000, 4},
|
||||
{10000, 8},
|
||||
}
|
||||
if v := aeScale(intv, 200); v != 2*intv {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
}
|
||||
if v := aeScale(intv, 1000); v != 4*intv {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
}
|
||||
if v := aeScale(intv, 10000); v != 8*intv {
|
||||
t.Fatalf("Bad: %v", v)
|
||||
for _, tt := range tests {
|
||||
t.Run(fmt.Sprintf("%d nodes", tt.nodes), func(t *testing.T) {
|
||||
if got, want := scaleFactor(tt.nodes), tt.scale; got != want {
|
||||
t.Fatalf("got scale factor %d want %d", got, want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -248,7 +248,7 @@ func (a *Agent) Start() error {
|
|||
|
||||
// create a notif channel to trigger state sychronizations
|
||||
// when a consul server was added to the cluster.
|
||||
consulCh := make(chan struct{}, 1)
|
||||
serverUpCh := make(chan struct{}, 1)
|
||||
|
||||
// create a notif channel to trigger state synchronizations
|
||||
// when the state has changed.
|
||||
|
@ -263,7 +263,7 @@ func (a *Agent) Start() error {
|
|||
State: a.state,
|
||||
Interval: c.AEInterval,
|
||||
ShutdownCh: a.shutdownCh,
|
||||
ConsulCh: consulCh,
|
||||
ServerUpCh: serverUpCh,
|
||||
TriggerCh: triggerCh,
|
||||
Logger: a.logger,
|
||||
}
|
||||
|
@ -280,7 +280,7 @@ func (a *Agent) Start() error {
|
|||
// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
|
||||
consulCfg.ServerUp = func() {
|
||||
select {
|
||||
case consulCh <- struct{}{}:
|
||||
case serverUpCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue