diff --git a/agent/ae/ae.go b/agent/ae/ae.go
new file mode 100644
index 000000000..d5f56c37a
--- /dev/null
+++ b/agent/ae/ae.go
@@ -0,0 +1,146 @@
+// Package ae provides an anti-entropy mechanism for the local state.
+package ae
+
+import (
+    "log"
+    "math"
+    "sync/atomic"
+    "time"
+
+    "github.com/hashicorp/consul/lib"
+)
+
+const (
+    // This scale factor means we will add a minute after we cross 128 nodes,
+    // another at 256, another at 512, etc. By 8192 nodes, we will scale up
+    // by a factor of 8.
+    //
+    // If you update this, you may need to adjust the tuning of
+    // CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
+    aeScaleThreshold = 128
+
+    syncStaggerIntv = 3 * time.Second
+    syncRetryIntv   = 15 * time.Second
+)
+
+// aeScale is used to scale the time interval at which anti-entropy updates take
+// place. It is used to prevent saturation as the cluster size grows.
+func aeScale(d time.Duration, n int) time.Duration {
+    // Don't scale until we cross the threshold
+    if n <= aeScaleThreshold {
+        return d
+    }
+
+    mult := math.Ceil(math.Log2(float64(n))-math.Log2(aeScaleThreshold)) + 1.0
+    return time.Duration(mult) * d
+}
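+
+// For example, with a one-minute base interval the scaled intervals come
+// out as follows (these are exactly the values exercised by TestAE_scale
+// in ae_test.go):
+//
+//    aeScale(time.Minute, 100)   == 1 * time.Minute   (below the threshold)
+//    aeScale(time.Minute, 200)   == 2 * time.Minute
+//    aeScale(time.Minute, 1000)  == 4 * time.Minute
+//    aeScale(time.Minute, 10000) == 8 * time.Minute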
+
+type StateSyncer struct {
+    // paused is used to check if we are paused. Must be the first
+    // element due to a go bug.
+    // todo(fs): which bug? still relevant?
+    paused int32
+
+    // State contains the data that needs to be synchronized.
+    State interface {
+        UpdateSyncState() error
+        SyncChanges() error
+    }
+
+    // Interval is the time between two sync runs.
+    Interval time.Duration
+
+    // ClusterSize returns the number of members in the cluster.
+    // todo(fs): we use this for staggering but what about a random number?
+    ClusterSize func() int
+
+    // ShutdownCh is closed when the application is shutting down.
+    ShutdownCh chan struct{}
+
+    // ConsulCh contains data when a new consul server has been added to the cluster.
+    ConsulCh chan struct{}
+
+    // TriggerCh contains data when a sync should run immediately.
+    TriggerCh chan struct{}
+
+    Logger *log.Logger
+}
+
+// Pause is used to pause state synchronization. This can be
+// used to make batch changes.
+func (ae *StateSyncer) Pause() {
+    atomic.AddInt32(&ae.paused, 1)
+}
+
+// Resume is used to resume state synchronization.
+func (ae *StateSyncer) Resume() {
+    paused := atomic.AddInt32(&ae.paused, -1)
+    if paused < 0 {
+        panic("unbalanced StateSyncer.Resume() detected")
+    }
+    ae.changeMade()
+}
+
+// Paused is used to check if we are paused.
+func (ae *StateSyncer) Paused() bool {
+    return atomic.LoadInt32(&ae.paused) > 0
+}
+
+// changeMade is used to trigger an anti-entropy run without
+// blocking the caller.
+func (ae *StateSyncer) changeMade() {
+    select {
+    case ae.TriggerCh <- struct{}{}:
+    default:
+    }
+}
+
+// Run is a long-running method used to perform anti-entropy
+// between local and remote state.
+func (ae *StateSyncer) Run() {
+SYNC:
+    // Sync our state with the servers
+    for {
+        err := ae.State.UpdateSyncState()
+        if err == nil {
+            break
+        }
+        ae.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
+        select {
+        case <-ae.ConsulCh:
+            // Stagger the retry on leader election, avoid a thundering herd
+            select {
+            case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, ae.ClusterSize()))):
+            case <-ae.ShutdownCh:
+                return
+            }
+        case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, ae.ClusterSize()))):
+        case <-ae.ShutdownCh:
+            return
+        }
+    }
+
+    // Force-trigger AE to pick up any changes
+    ae.changeMade()
+
+    // Schedule the next full sync, with a random stagger
+    aeIntv := aeScale(ae.Interval, ae.ClusterSize())
+    aeIntv = aeIntv + lib.RandomStagger(aeIntv)
+    aeTimer := time.After(aeIntv)
+
+    // Wait for sync events
+    for {
+        select {
+        case <-aeTimer:
+            goto SYNC
+        case <-ae.TriggerCh:
+            // Skip the sync if we are paused
+            if ae.Paused() {
+                continue
+            }
+            if err := ae.State.SyncChanges(); err != nil {
+                ae.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
+            }
+        case <-ae.ShutdownCh:
+            return
+        }
+    }
+}
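A minimal sketch (not part of this change) of how a caller could drive the new syncer. The mockState type is a hypothetical State implementation for illustration; the authoritative wiring is the agent.go change below, which uses a.state and c.AEInterval:

```go
package main

import (
	"log"
	"os"
	"time"

	"github.com/hashicorp/consul/agent/ae"
)

// mockState is a hypothetical State implementation for illustration.
type mockState struct{}

func (s *mockState) UpdateSyncState() error { return nil } // pull the remote view
func (s *mockState) SyncChanges() error     { return nil } // push local diffs

func main() {
	shutdownCh := make(chan struct{})
	syncer := &ae.StateSyncer{
		State:       &mockState{},
		Interval:    time.Minute,
		ClusterSize: func() int { return 1 },
		ShutdownCh:  shutdownCh,
		ConsulCh:    make(chan struct{}, 1),
		TriggerCh:   make(chan struct{}, 1),
		Logger:      log.New(os.Stderr, "", log.LstdFlags),
	}
	go syncer.Run()

	// Batch several changes: pause, mutate, resume.
	// The final Resume triggers a single partial sync via TriggerCh.
	syncer.Pause()
	// ... apply bulk changes to the state ...
	syncer.Resume()

	close(shutdownCh) // stops Run
}
```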
diff --git a/agent/ae/ae_test.go b/agent/ae/ae_test.go
new file mode 100644
index 000000000..5d91f2f9b
--- /dev/null
+++ b/agent/ae/ae_test.go
@@ -0,0 +1,55 @@
+package ae
+
+import (
+    "testing"
+    "time"
+)
+
+func TestAE_scale(t *testing.T) {
+    t.Parallel()
+    intv := time.Minute
+    if v := aeScale(intv, 100); v != intv {
+        t.Fatalf("Bad: %v", v)
+    }
+    if v := aeScale(intv, 200); v != 2*intv {
+        t.Fatalf("Bad: %v", v)
+    }
+    if v := aeScale(intv, 1000); v != 4*intv {
+        t.Fatalf("Bad: %v", v)
+    }
+    if v := aeScale(intv, 10000); v != 8*intv {
+        t.Fatalf("Bad: %v", v)
+    }
+}
+
+func TestAE_nestedPauseResume(t *testing.T) {
+    t.Parallel()
+    l := new(StateSyncer)
+    if l.Paused() != false {
+        t.Fatal("syncer should be unPaused after init")
+    }
+    l.Pause()
+    if l.Paused() != true {
+        t.Fatal("syncer should be Paused after first call to Pause()")
+    }
+    l.Pause()
+    if l.Paused() != true {
+        t.Fatal("syncer should STILL be Paused after second call to Pause()")
+    }
+    l.Resume()
+    if l.Paused() != true {
+        t.Fatal("syncer should STILL be Paused after FIRST call to Resume()")
+    }
+    l.Resume()
+    if l.Paused() != false {
+        t.Fatal("syncer should NOT be Paused after SECOND call to Resume()")
+    }
+
+    defer func() {
+        err := recover()
+        if err == nil {
+            t.Fatal("unbalanced Resume() should cause a panic()")
+        }
+    }()
+    l.Resume()
+}
diff --git a/agent/agent.go b/agent/agent.go
index c2abbb76d..1238359f3 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -20,6 +20,7 @@ import (
     "github.com/armon/go-metrics"
     "github.com/hashicorp/consul/acl"
+    "github.com/hashicorp/consul/agent/ae"
     "github.com/hashicorp/consul/agent/config"
     "github.com/hashicorp/consul/agent/consul"
     "github.com/hashicorp/consul/agent/structs"
@@ -109,6 +110,10 @@ type Agent struct {
     // services and checks. Used for anti-entropy.
     state *localState
 
+    // sync manages the synchronization of the local
+    // and the remote state.
+    sync *ae.StateSyncer
+
     // checkReapAfter maps the check ID to a timeout after which we should
     // reap its associated service
     checkReapAfter map[types.CheckID]time.Duration
@@ -241,8 +246,27 @@ func (a *Agent) Start() error {
         return fmt.Errorf("Failed to setup node ID: %v", err)
     }
 
+    // create a notification channel to trigger state synchronizations
+    // when a consul server is added to the cluster.
+    consulCh := make(chan struct{}, 1)
+
+    // create a notification channel to trigger state synchronizations
+    // when the state has changed.
+    triggerCh := make(chan struct{}, 1)
+
     // create the local state
-    a.state = NewLocalState(c, a.logger, a.tokens)
+    a.state = NewLocalState(c, a.logger, a.tokens, triggerCh)
+
+    // create the state synchronization manager which performs
+    // regular and on-demand state synchronizations (anti-entropy).
+    a.sync = &ae.StateSyncer{
+        State:      a.state,
+        Interval:   c.AEInterval,
+        ShutdownCh: a.shutdownCh,
+        ConsulCh:   consulCh,
+        TriggerCh:  triggerCh,
+        Logger:     a.logger,
+    }
 
     // create the config for the rpc server/client
     consulCfg, err := a.consulConfig()
@@ -250,8 +274,16 @@ func (a *Agent) Start() error {
         return err
     }
 
-    // link consul client/server with the state
-    consulCfg.ServerUp = a.state.ConsulServerUp
+    // ServerUp is used to inform that a new consul server is now
+    // up. This can be used to speed up the sync process if we are blocked
+    // waiting to discover a consul server
+    // todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
+    consulCfg.ServerUp = func() {
+        select {
+        case consulCh <- struct{}{}:
+        default:
+        }
+    }
 
     // Setup either the client or the server.
     if c.ServerMode {
@@ -262,6 +294,7 @@ func (a *Agent) Start() error {
 
         a.delegate = server
         a.state.delegate = server
+        a.sync.ClusterSize = func() int { return len(server.LANMembers()) }
     } else {
         client, err := consul.NewClientLogger(consulCfg, a.logger)
         if err != nil {
@@ -270,6 +303,7 @@ func (a *Agent) Start() error {
 
         a.delegate = client
         a.state.delegate = client
+        a.sync.ClusterSize = func() int { return len(client.LANMembers()) }
     }
 
     // Load checks/services/metadata.
@@ -1264,18 +1298,18 @@ func (a *Agent) WANMembers() []serf.Member {
 // StartSync is called once Services and Checks are registered.
 // This is called to prevent a race between clients and the anti-entropy routines
 func (a *Agent) StartSync() {
-    // Start the anti entropy routine
-    go a.state.antiEntropy(a.shutdownCh)
+    go a.sync.Run()
+    a.logger.Printf("[INFO] agent: starting state syncer")
 }
 
 // PauseSync is used to pause anti-entropy while bulk changes are make
 func (a *Agent) PauseSync() {
-    a.state.Pause()
+    a.sync.Pause()
 }
 
 // ResumeSync is used to unpause anti-entropy after bulk changes are make
 func (a *Agent) ResumeSync() {
-    a.state.Resume()
+    a.sync.Resume()
 }
 
 // GetLANCoordinate returns the coordinates of this node in the local pools
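Both consulCh and triggerCh rely on the same idiom: a buffer of one plus a non-blocking send, so repeated notifications coalesce into a single pending signal instead of blocking the notifier. A standalone sketch of the pattern (the notify helper is illustrative, not part of this change):

```go
package main

import "fmt"

// notify performs a non-blocking, coalescing send: if a signal is
// already pending in the buffered channel, the new one is dropped.
func notify(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default: // a notification is already pending
	}
}

func main() {
	ch := make(chan struct{}, 1)
	notify(ch)
	notify(ch) // coalesces with the first, does not block
	<-ch
	fmt.Println("one signal observed for two notifications")
}
```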
diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go
index 271f53da6..13dcfb062 100644
--- a/agent/agent_endpoint.go
+++ b/agent/agent_endpoint.go
@@ -304,7 +304,7 @@ func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request
 // services and checks to the server. If the operation fails, we only
 // only warn because the write did succeed and anti-entropy will sync later.
 func (s *HTTPServer) syncChanges() {
-    if err := s.agent.state.syncChanges(); err != nil {
+    if err := s.agent.state.SyncChanges(); err != nil {
         s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
     }
 }
diff --git a/agent/local.go b/agent/local.go
index 2a29a9c6c..58da5f7f3 100644
--- a/agent/local.go
+++ b/agent/local.go
@@ -18,11 +18,6 @@ import (
     "github.com/hashicorp/consul/types"
 )
 
-const (
-    syncStaggerIntv = 3 * time.Second
-    syncRetryIntv   = 15 * time.Second
-)
-
 // syncStatus is used to represent the difference between
 // the local and remote state, and if action needs to be taken
 type syncStatus struct {
@@ -33,7 +28,6 @@ type syncStatus struct {
 // populated during NewLocalAgent from the agent configuration to avoid
 // race conditions with the agent configuration.
 type localStateConfig struct {
-    AEInterval          time.Duration
     AdvertiseAddr       string
     CheckUpdateInterval time.Duration
     Datacenter          string
@@ -47,10 +41,6 @@ type localStateConfig struct {
 // and checks. We used it to perform anti-entropy with the
 // catalog representation
 type localState struct {
-    // paused is used to check if we are paused. Must be the first
-    // element due to a go bug.
-    paused int32
-
     sync.RWMutex
     logger *log.Logger
@@ -81,10 +71,6 @@ type localState struct {
     // metadata tracks the local metadata fields
     metadata map[string]string
 
-    // consulCh is used to inform of a change to the known
-    // consul nodes. This may be used to retry a sync run
-    consulCh chan struct{}
-
     // triggerCh is used to inform of a change to local state
     // that requires anti-entropy with the server
     triggerCh chan struct{}
@@ -95,9 +81,8 @@
 }
 
 // NewLocalState creates a is used to initialize the local state
-func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store) *localState {
+func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *localState {
     lc := localStateConfig{
-        AEInterval:          c.AEInterval,
         AdvertiseAddr:       c.AdvertiseAddrLAN.String(),
         CheckUpdateInterval: c.CheckUpdateInterval,
         Datacenter:          c.Datacenter,
@@ -122,8 +107,7 @@ func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store)
         checkCriticalTime: make(map[types.CheckID]time.Time),
         deferCheck:        make(map[types.CheckID]*time.Timer),
         metadata:          make(map[string]string),
-        consulCh:          make(chan struct{}, 1),
-        triggerCh:         make(chan struct{}, 1),
+        triggerCh:         triggerCh,
     }
     l.discardCheckOutput.Store(c.DiscardCheckOutput)
     return l
@@ -131,42 +115,13 @@ func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store)
 
 // changeMade is used to trigger an anti-entropy run
 func (l *localState) changeMade() {
+    // todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
     select {
     case l.triggerCh <- struct{}{}:
     default:
     }
 }
 
-// ConsulServerUp is used to inform that a new consul server is now
-// up. This can be used to speed up the sync process if we are blocking
-// waiting to discover a consul server
-func (l *localState) ConsulServerUp() {
-    select {
-    case l.consulCh <- struct{}{}:
-    default:
-    }
-}
-
-// Pause is used to pause state synchronization, this can be
-// used to make batch changes
-func (l *localState) Pause() {
-    atomic.AddInt32(&l.paused, 1)
-}
-
-// Resume is used to resume state synchronization
-func (l *localState) Resume() {
-    paused := atomic.AddInt32(&l.paused, -1)
-    if paused < 0 {
-        panic("unbalanced localState.Resume() detected")
-    }
-    l.changeMade()
-}
-
-// isPaused is used to check if we are paused
-func (l *localState) isPaused() bool {
-    return atomic.LoadInt32(&l.paused) > 0
-}
-
 func (l *localState) SetDiscardCheckOutput(b bool) {
     l.discardCheckOutput.Store(b)
 }
@@ -412,61 +367,12 @@ func (l *localState) Metadata() map[string]string {
     return metadata
 }
 
-// antiEntropy is a long running method used to perform anti-entropy
-// between local and remote state.
-func (l *localState) antiEntropy(shutdownCh chan struct{}) {
-SYNC:
-    // Sync our state with the servers
-    for {
-        err := l.setSyncState()
-        if err == nil {
-            break
-        }
-        l.logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
-        select {
-        case <-l.consulCh:
-            // Stagger the retry on leader election, avoid a thundering heard
-            select {
-            case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, len(l.delegate.LANMembers())))):
-            case <-shutdownCh:
-                return
-            }
-        case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, len(l.delegate.LANMembers())))):
-        case <-shutdownCh:
-            return
-        }
+// UpdateSyncState does a read of the server state, and updates
+// the local sync status as appropriate
+func (l *localState) UpdateSyncState() error {
+    if l == nil {
+        panic("localState == nil")
     }
-
-    // Force-trigger AE to pickup any changes
-    l.changeMade()
-
-    // Schedule the next full sync, with a random stagger
-    aeIntv := aeScale(l.config.AEInterval, len(l.delegate.LANMembers()))
-    aeIntv = aeIntv + lib.RandomStagger(aeIntv)
-    aeTimer := time.After(aeIntv)
-
-    // Wait for sync events
-    for {
-        select {
-        case <-aeTimer:
-            goto SYNC
-        case <-l.triggerCh:
-            // Skip the sync if we are paused
-            if l.isPaused() {
-                continue
-            }
-            if err := l.syncChanges(); err != nil {
-                l.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
-            }
-        case <-shutdownCh:
-            return
-        }
-    }
-}
-
-// setSyncState does a read of the server state, and updates
-// the local syncStatus as appropriate
-func (l *localState) setSyncState() error {
     req := structs.NodeSpecificRequest{
         Datacenter: l.config.Datacenter,
         Node:       l.config.NodeName,
@@ -590,9 +496,9 @@
     return nil
 }
 
-// syncChanges is used to scan the status our local services and checks
+// SyncChanges is used to scan the status of our local services and checks
 // and update any that are out of sync with the server
-func (l *localState) syncChanges() error {
+func (l *localState) SyncChanges() error {
     l.Lock()
     defer l.Unlock()
diff --git a/agent/local_test.go b/agent/local_test.go
index 2d0d2fb02..baad74a1e 100644
--- a/agent/local_test.go
+++ b/agent/local_test.go
@@ -1482,7 +1482,7 @@ func TestAgent_serviceTokens(t *testing.T) {
 
     tokens := new(token.Store)
     tokens.UpdateUserToken("default")
-    l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens)
+    l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1))
 
     l.AddService(&structs.NodeService{
         ID: "redis",
@@ -1511,7 +1511,7 @@ func TestAgent_checkTokens(t *testing.T) {
 
     tokens := new(token.Store)
     tokens.UpdateUserToken("default")
-    l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens)
+    l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1))
 
     // Returns default when no token is set
     if token := l.CheckToken("mem"); token != "default" {
@@ -1533,7 +1533,7 @@ func TestAgent_checkTokens(t *testing.T) {
 
 func TestAgent_checkCriticalTime(t *testing.T) {
     t.Parallel()
-    l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store))
+    l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1))
 
     svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000}
     l.AddService(svc, "")
@@ -1595,7 +1595,7 @@ func TestAgent_checkCriticalTime(t *testing.T) {
 
 func TestAgent_AddCheckFailure(t *testing.T) {
     t.Parallel()
-    l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store))
+    l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1))
 
     // Add a check for a service that does not exist and verify that it fails
     checkID := types.CheckID("redis:1")
@@ -1613,38 +1613,6 @@ func TestAgent_AddCheckFailure(t *testing.T) {
     }
 }
 
-func TestAgent_nestedPauseResume(t *testing.T) {
-    t.Parallel()
-    l := new(localState)
-    if l.isPaused() != false {
-        t.Fatal("localState should be unPaused after init")
-    }
-    l.Pause()
-    if l.isPaused() != true {
-        t.Fatal("localState should be Paused after first call to Pause()")
-    }
-    l.Pause()
-    if l.isPaused() != true {
-        t.Fatal("localState should STILL be Paused after second call to Pause()")
-    }
-    l.Resume()
-    if l.isPaused() != true {
-        t.Fatal("localState should STILL be Paused after FIRST call to Resume()")
-    }
-    l.Resume()
-    if l.isPaused() != false {
-        t.Fatal("localState should NOT be Paused after SECOND call to Resume()")
-    }
-
-    defer func() {
-        err := recover()
-        if err == nil {
-            t.Fatal("unbalanced Resume() should cause a panic()")
-        }
-    }()
-    l.Resume()
-}
-
 func TestAgent_sendCoordinate(t *testing.T) {
     t.Parallel()
     a := NewTestAgent(t.Name(), `
diff --git a/agent/util.go b/agent/util.go
index 3a13c2172..ae7c3c154 100644
--- a/agent/util.go
+++ b/agent/util.go
@@ -4,28 +4,16 @@ import (
     "bytes"
     "crypto/md5"
     "fmt"
-    "math"
     "os"
     "os/exec"
     "os/signal"
     osuser "os/user"
     "strconv"
-    "time"
 
     "github.com/hashicorp/consul/types"
     "github.com/hashicorp/go-msgpack/codec"
 )
 
-const (
-    // This scale factor means we will add a minute after we cross 128 nodes,
-    // another at 256, another at 512, etc. By 8192 nodes, we will scale up
-    // by a factor of 8.
-    //
-    // If you update this, you may need to adjust the tuning of
-    // CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
-    aeScaleThreshold = 128
-)
-
 // msgpackHandle is a shared handle for encoding/decoding of
 // messages
 var msgpackHandle = &codec.MsgpackHandle{
@@ -33,18 +21,6 @@ var msgpackHandle = &codec.MsgpackHandle{
     WriteExt: true,
 }
 
-// aeScale is used to scale the time interval at which anti-entropy updates take
-// place. It is used to prevent saturation as the cluster size grows.
-func aeScale(interval time.Duration, n int) time.Duration {
-    // Don't scale until we cross the threshold
-    if n <= aeScaleThreshold {
-        return interval
-    }
-
-    multiplier := math.Ceil(math.Log2(float64(n))-math.Log2(aeScaleThreshold)) + 1.0
-    return time.Duration(multiplier) * interval
-}
-
 // decodeMsgPack is used to decode a MsgPack encoded object
 func decodeMsgPack(buf []byte, out interface{}) error {
     return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)
diff --git a/agent/util_test.go b/agent/util_test.go
index ea12c708d..b10f274c0 100644
--- a/agent/util_test.go
+++ b/agent/util_test.go
@@ -4,28 +4,10 @@ import (
     "os"
     "runtime"
     "testing"
-    "time"
 
     "github.com/hashicorp/consul/testutil"
 )
 
-func TestAEScale(t *testing.T) {
-    t.Parallel()
-    intv := time.Minute
-    if v := aeScale(intv, 100); v != intv {
-        t.Fatalf("Bad: %v", v)
-    }
-    if v := aeScale(intv, 200); v != 2*intv {
-        t.Fatalf("Bad: %v", v)
-    }
-    if v := aeScale(intv, 1000); v != 4*intv {
-        t.Fatalf("Bad: %v", v)
-    }
-    if v := aeScale(intv, 10000); v != 8*intv {
-        t.Fatalf("Bad: %v", v)
-    }
-}
-
 func TestStringHash(t *testing.T) {
     t.Parallel()
     in := "hello world"
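The scheduling math in Run() is unchanged from the old antiEntropy(): a full sync is scheduled after aeScale(Interval, n) plus a random stagger of up to the same amount, so with a one-minute interval (the value the tests use; the agent's actual Interval comes from c.AEInterval) and a 1000-node cluster, the next full sync lands between 4 and 8 minutes out. A self-contained sketch, reimplementing the unexported aeScale formula for illustration only:

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// scale mirrors ae.aeScale, which is unexported; shown here for illustration.
func scale(d time.Duration, n int) time.Duration {
	if n <= 128 { // aeScaleThreshold
		return d
	}
	mult := math.Ceil(math.Log2(float64(n))-math.Log2(128)) + 1.0
	return time.Duration(mult) * d
}

func main() {
	base := scale(time.Minute, 1000) // 4m0s
	// lib.RandomStagger(base) is effectively a uniform draw in [0, base).
	next := base + time.Duration(rand.Int63n(int64(base)))
	fmt.Printf("next full sync in %s (within [%s, %s))\n", next, base, 2*base)
}
```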