diff --git a/agent/ae/ae.go b/agent/ae/ae.go
deleted file mode 100644
index d5f56c37a..000000000
--- a/agent/ae/ae.go
+++ /dev/null
@@ -1,146 +0,0 @@
-// Package ae provides an anti-entropy mechanism for the local state.
-package ae
-
-import (
-	"log"
-	"math"
-	"sync/atomic"
-	"time"
-
-	"github.com/hashicorp/consul/lib"
-)
-
-const (
-	// This scale factor means we will add a minute after we cross 128 nodes,
-	// another at 256, another at 512, etc. By 8192 nodes, we will scale up
-	// by a factor of 8.
-	//
-	// If you update this, you may need to adjust the tuning of
-	// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
-	aeScaleThreshold = 128
-
-	syncStaggerIntv = 3 * time.Second
-	syncRetryIntv   = 15 * time.Second
-)
-
-// aeScale is used to scale the time interval at which anti-entropy updates take
-// place. It is used to prevent saturation as the cluster size grows.
-func aeScale(d time.Duration, n int) time.Duration {
-	// Don't scale until we cross the threshold
-	if n <= aeScaleThreshold {
-		return d
-	}
-
-	mult := math.Ceil(math.Log2(float64(n))-math.Log2(aeScaleThreshold)) + 1.0
-	return time.Duration(mult) * d
-}
-
-type StateSyncer struct {
-	// paused is used to check if we are paused. Must be the first
-	// element due to a go bug.
-	// todo(fs): which bug? still relevant?
-	paused int32
-
-	// State contains the data that needs to be synchronized.
-	State interface {
-		UpdateSyncState() error
-		SyncChanges() error
-	}
-
-	// Interval is the time between two sync runs.
-	Interval time.Duration
-
-	// ClusterSize returns the number of members in the cluster.
-	// todo(fs): we use this for staggering but what about a random number?
-	ClusterSize func() int
-
-	// ShutdownCh is closed when the application is shutting down.
-	ShutdownCh chan struct{}
-
-	// ConsulCh contains data when a new consul server has been added to the cluster.
-	ConsulCh chan struct{}
-
-	// TriggerCh contains data when a sync should run immediately.
-	TriggerCh chan struct{}
-
-	Logger *log.Logger
-}
-
-// Pause is used to pause state synchronization, this can be
-// used to make batch changes
-func (ae *StateSyncer) Pause() {
-	atomic.AddInt32(&ae.paused, 1)
-}
-
-// Resume is used to resume state synchronization
-func (ae *StateSyncer) Resume() {
-	paused := atomic.AddInt32(&ae.paused, -1)
-	if paused < 0 {
-		panic("unbalanced State.Resume() detected")
-	}
-	ae.changeMade()
-}
-
-// Paused is used to check if we are paused
-func (ae *StateSyncer) Paused() bool {
-	return atomic.LoadInt32(&ae.paused) > 0
-}
-
-func (ae *StateSyncer) changeMade() {
-	select {
-	case ae.TriggerCh <- struct{}{}:
-	default:
-	}
-}
-
-// antiEntropy is a long running method used to perform anti-entropy
-// between local and remote state.
-func (ae *StateSyncer) Run() {
-SYNC:
-	// Sync our state with the servers
-	for {
-		err := ae.State.UpdateSyncState()
-		if err == nil {
-			break
-		}
-		ae.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
-		select {
-		case <-ae.ConsulCh:
-			// Stagger the retry on leader election, avoid a thundering heard
-			select {
-			case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, ae.ClusterSize()))):
-			case <-ae.ShutdownCh:
-				return
-			}
-		case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, ae.ClusterSize()))):
-		case <-ae.ShutdownCh:
-			return
-		}
-	}
-
-	// Force-trigger AE to pickup any changes
-	ae.changeMade()
-
-	// Schedule the next full sync, with a random stagger
-	aeIntv := aeScale(ae.Interval, ae.ClusterSize())
-	aeIntv = aeIntv + lib.RandomStagger(aeIntv)
-	aeTimer := time.After(aeIntv)
-
-	// Wait for sync events
-	for {
-		select {
-		case <-aeTimer:
-			goto SYNC
-		case <-ae.TriggerCh:
-			// Skip the sync if we are paused
-			if ae.Paused() {
-				continue
-			}
-			if err := ae.State.SyncChanges(); err != nil {
-				ae.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
-			}
-		case <-ae.ShutdownCh:
-			return
-		}
-	}
-}
diff --git a/agent/ae/ae_test.go b/agent/ae/ae_test.go
deleted file mode 100644
index 5d91f2f9b..000000000
--- a/agent/ae/ae_test.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package ae
-
-import (
-	"testing"
-	"time"
-)
-
-func TestAE_scale(t *testing.T) {
-	t.Parallel()
-	intv := time.Minute
-	if v := aeScale(intv, 100); v != intv {
-		t.Fatalf("Bad: %v", v)
-	}
-	if v := aeScale(intv, 200); v != 2*intv {
-		t.Fatalf("Bad: %v", v)
-	}
-	if v := aeScale(intv, 1000); v != 4*intv {
-		t.Fatalf("Bad: %v", v)
-	}
-	if v := aeScale(intv, 10000); v != 8*intv {
-		t.Fatalf("Bad: %v", v)
-	}
-}
-
-func TestAE_nestedPauseResume(t *testing.T) {
-	t.Parallel()
-	l := new(StateSyncer)
-	if l.Paused() != false {
-		t.Fatal("syncer should be unPaused after init")
-	}
-	l.Pause()
-	if l.Paused() != true {
-		t.Fatal("syncer should be Paused after first call to Pause()")
-	}
-	l.Pause()
-	if l.Paused() != true {
-		t.Fatal("syncer should STILL be Paused after second call to Pause()")
-	}
-	l.Resume()
-	if l.Paused() != true {
-		t.Fatal("syncer should STILL be Paused after FIRST call to Resume()")
-	}
-	l.Resume()
-	if l.Paused() != false {
-		t.Fatal("syncer should NOT be Paused after SECOND call to Resume()")
-	}
-
-	defer func() {
-		err := recover()
-		if err == nil {
-			t.Fatal("unbalanced Resume() should cause a panic()")
-		}
-	}()
-	l.Resume()
-}
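Both the deleted `StateSyncer` and the `localState` that reabsorbs it below track pausing as a nested counter rather than a boolean, so overlapping bulk operations can each call Pause/Resume without stepping on one another. A minimal standalone sketch of that pattern — the `pauser` type and its names are illustrative; only the atomic-counter idea comes from the code above:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type pauser struct{ n int32 }

// Pause increments the counter; calls may nest.
func (p *pauser) Pause() { atomic.AddInt32(&p.n, 1) }

// Resume decrements the counter and panics on an unbalanced call,
// mirroring the behavior pinned down by TestAE_nestedPauseResume.
func (p *pauser) Resume() {
	if atomic.AddInt32(&p.n, -1) < 0 {
		panic("unbalanced Resume() detected")
	}
}

// Paused reports whether at least one Pause is still outstanding.
func (p *pauser) Paused() bool { return atomic.LoadInt32(&p.n) > 0 }

func main() {
	var p pauser
	p.Pause()
	p.Pause()
	p.Resume()
	fmt.Println(p.Paused()) // true: one Pause still outstanding
	p.Resume()
	fmt.Println(p.Paused()) // false: balanced again
}
```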
diff --git a/agent/agent.go b/agent/agent.go
index 1238359f3..c2abbb76d 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -20,7 +20,6 @@ import (
 
 	"github.com/armon/go-metrics"
 	"github.com/hashicorp/consul/acl"
-	"github.com/hashicorp/consul/agent/ae"
 	"github.com/hashicorp/consul/agent/config"
 	"github.com/hashicorp/consul/agent/consul"
 	"github.com/hashicorp/consul/agent/structs"
@@ -110,10 +109,6 @@ type Agent struct {
 	// services and checks. Used for anti-entropy.
 	state *localState
 
-	// sync manages the synchronization of the local
-	// and the remote state.
-	sync *ae.StateSyncer
-
 	// checkReapAfter maps the check ID to a timeout after which we should
 	// reap its associated service
 	checkReapAfter map[types.CheckID]time.Duration
@@ -246,27 +241,8 @@ func (a *Agent) Start() error {
 		return fmt.Errorf("Failed to setup node ID: %v", err)
 	}
 
-	// create a notif channel to trigger state sychronizations
-	// when a consul server was added to the cluster.
-	consulCh := make(chan struct{}, 1)
-
-	// create a notif channel to trigger state synchronizations
-	// when the state has changed.
-	triggerCh := make(chan struct{}, 1)
-
 	// create the local state
-	a.state = NewLocalState(c, a.logger, a.tokens, triggerCh)
-
-	// create the state synchronization manager which performs
-	// regular and on-demand state synchronizations (anti-entropy).
-	a.sync = &ae.StateSyncer{
-		State:      a.state,
-		Interval:   c.AEInterval,
-		ShutdownCh: a.shutdownCh,
-		ConsulCh:   consulCh,
-		TriggerCh:  triggerCh,
-		Logger:     a.logger,
-	}
+	a.state = NewLocalState(c, a.logger, a.tokens)
 
 	// create the config for the rpc server/client
 	consulCfg, err := a.consulConfig()
@@ -274,16 +250,8 @@
 		return err
 	}
 
-	// ServerUp is used to inform that a new consul server is now
-	// up. This can be used to speed up the sync process if we are blocking
-	// waiting to discover a consul server
-	// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
-	consulCfg.ServerUp = func() {
-		select {
-		case consulCh <- struct{}{}:
-		default:
-		}
-	}
+	// link consul client/server with the state
+	consulCfg.ServerUp = a.state.ConsulServerUp
 
 	// Setup either the client or the server.
 	if c.ServerMode {
@@ -294,7 +262,6 @@
 
 		a.delegate = server
 		a.state.delegate = server
-		a.sync.ClusterSize = func() int { return len(server.LANMembers()) }
 	} else {
 		client, err := consul.NewClientLogger(consulCfg, a.logger)
 		if err != nil {
@@ -303,7 +270,6 @@
 
 		a.delegate = client
 		a.state.delegate = client
-		a.sync.ClusterSize = func() int { return len(client.LANMembers()) }
 	}
 
 	// Load checks/services/metadata.
@@ -1298,18 +1264,18 @@ func (a *Agent) WANMembers() []serf.Member {
 // StartSync is called once Services and Checks are registered.
 // This is called to prevent a race between clients and the anti-entropy routines
 func (a *Agent) StartSync() {
-	go a.sync.Run()
-	a.logger.Printf("[INFO] agent: starting state syncer")
+	// Start the anti entropy routine
+	go a.state.antiEntropy(a.shutdownCh)
}
 
 // PauseSync is used to pause anti-entropy while bulk changes are make
 func (a *Agent) PauseSync() {
-	a.sync.Pause()
+	a.state.Pause()
 }
 
 // ResumeSync is used to unpause anti-entropy after bulk changes are make
 func (a *Agent) ResumeSync() {
-	a.sync.Resume()
+	a.state.Resume()
 }
 
 // GetLANCoordinate returns the coordinates of this node in the local pools
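`PauseSync` and `ResumeSync` now delegate straight to the counter on `localState`. A hedged sketch of how a bulk caller is expected to use the pair — `registerAll` is invented for illustration (it would live in package `agent`), while `PauseSync`, `ResumeSync`, and `AddService(service, token)` are the signatures visible in this diff:

```go
// registerAll is a hypothetical helper: it registers many services under
// one paused window so anti-entropy runs once for the whole batch rather
// than once per service.
func registerAll(a *Agent, svcs []*structs.NodeService) {
	a.PauseSync()        // suspend on-demand sync runs
	defer a.ResumeSync() // the final Resume fires a single coalesced sync

	for _, svc := range svcs {
		a.state.AddService(svc, "") // empty token, as in the tests below
	}
}
```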
diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go
index 13dcfb062..271f53da6 100644
--- a/agent/agent_endpoint.go
+++ b/agent/agent_endpoint.go
@@ -304,7 +304,7 @@ func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request
 // services and checks to the server. If the operation fails, we only
 // only warn because the write did succeed and anti-entropy will sync later.
 func (s *HTTPServer) syncChanges() {
-	if err := s.agent.state.SyncChanges(); err != nil {
+	if err := s.agent.state.syncChanges(); err != nil {
 		s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
 	}
 }
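`syncChanges` failures are only logged because the anti-entropy loop in `local.go` below repairs any drift on its next pass. That loop is fed by two one-slot channels (`consulCh` and `triggerCh`) written with non-blocking sends, so a burst of triggers collapses into a single pending sync. A standalone sketch of that coalescing pattern, with illustrative names:

```go
package main

import "fmt"

type notifier struct{ ch chan struct{} }

func newNotifier() *notifier { return &notifier{ch: make(chan struct{}, 1)} }

// notify never blocks: if an event is already queued, the extra
// notification is dropped, which is all the consumer needs to know.
func (n *notifier) notify() {
	select {
	case n.ch <- struct{}{}:
	default:
	}
}

func main() {
	n := newNotifier()
	for i := 0; i < 100; i++ {
		n.notify() // 100 triggers...
	}
	<-n.ch // ...consume the one pending event
	fmt.Println("still pending:", len(n.ch)) // prints 0
}
```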
diff --git a/agent/local.go b/agent/local.go
index 58da5f7f3..2a29a9c6c 100644
--- a/agent/local.go
+++ b/agent/local.go
@@ -18,6 +18,11 @@ import (
 	"github.com/hashicorp/consul/types"
 )
 
+const (
+	syncStaggerIntv = 3 * time.Second
+	syncRetryIntv   = 15 * time.Second
+)
+
 // syncStatus is used to represent the difference between
 // the local and remote state, and if action needs to be taken
 type syncStatus struct {
@@ -28,6 +33,7 @@
 // populated during NewLocalAgent from the agent configuration to avoid
 // race conditions with the agent configuration.
 type localStateConfig struct {
+	AEInterval          time.Duration
 	AdvertiseAddr       string
 	CheckUpdateInterval time.Duration
 	Datacenter          string
@@ -41,6 +47,10 @@ type localStateConfig struct {
 // and checks. We used it to perform anti-entropy with the
 // catalog representation
 type localState struct {
+	// paused is used to check if we are paused. Must be the first
+	// element due to a go bug.
+	paused int32
+
 	sync.RWMutex
 	logger *log.Logger
 
@@ -71,6 +81,10 @@ type localState struct {
 	// metadata tracks the local metadata fields
 	metadata map[string]string
 
+	// consulCh is used to inform of a change to the known
+	// consul nodes. This may be used to retry a sync run
+	consulCh chan struct{}
+
 	// triggerCh is used to inform of a change to local state
 	// that requires anti-entropy with the server
 	triggerCh chan struct{}
@@ -81,8 +95,9 @@
 }
 
 // NewLocalState creates a is used to initialize the local state
-func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *localState {
+func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store) *localState {
 	lc := localStateConfig{
+		AEInterval:          c.AEInterval,
 		AdvertiseAddr:       c.AdvertiseAddrLAN.String(),
 		CheckUpdateInterval: c.CheckUpdateInterval,
 		Datacenter:          c.Datacenter,
@@ -107,7 +122,8 @@ func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store,
 		checkCriticalTime: make(map[types.CheckID]time.Time),
 		deferCheck:        make(map[types.CheckID]*time.Timer),
 		metadata:          make(map[string]string),
-		triggerCh:         triggerCh,
+		consulCh:          make(chan struct{}, 1),
+		triggerCh:         make(chan struct{}, 1),
 	}
 	l.discardCheckOutput.Store(c.DiscardCheckOutput)
 	return l
@@ -115,13 +131,42 @@ func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store,
 
 // changeMade is used to trigger an anti-entropy run
 func (l *localState) changeMade() {
-	// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
 	select {
 	case l.triggerCh <- struct{}{}:
 	default:
 	}
 }
 
+// ConsulServerUp is used to inform that a new consul server is now
+// up. This can be used to speed up the sync process if we are blocking
+// waiting to discover a consul server
+func (l *localState) ConsulServerUp() {
+	select {
+	case l.consulCh <- struct{}{}:
+	default:
+	}
+}
+
+// Pause is used to pause state synchronization; this can be
+// used to make batch changes
+func (l *localState) Pause() {
+	atomic.AddInt32(&l.paused, 1)
+}
+
+// Resume is used to resume state synchronization
+func (l *localState) Resume() {
+	paused := atomic.AddInt32(&l.paused, -1)
+	if paused < 0 {
+		panic("unbalanced localState.Resume() detected")
+	}
+	l.changeMade()
+}
+
+// isPaused is used to check if we are paused
+func (l *localState) isPaused() bool {
+	return atomic.LoadInt32(&l.paused) > 0
+}
+
 func (l *localState) SetDiscardCheckOutput(b bool) {
 	l.discardCheckOutput.Store(b)
 }
@@ -367,12 +412,61 @@ func (l *localState) Metadata() map[string]string {
 	return metadata
 }
 
-// UpdateSyncState does a read of the server state, and updates
-// the local sync status as appropriate
-func (l *localState) UpdateSyncState() error {
-	if l == nil {
-		panic("config == nil")
+// antiEntropy is a long running method used to perform anti-entropy
+// between local and remote state.
+func (l *localState) antiEntropy(shutdownCh chan struct{}) {
+SYNC:
+	// Sync our state with the servers
+	for {
+		err := l.setSyncState()
+		if err == nil {
+			break
+		}
+		l.logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
+		select {
+		case <-l.consulCh:
+			// Stagger the retry on leader election, avoid a thundering herd
+			select {
+			case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, len(l.delegate.LANMembers())))):
+			case <-shutdownCh:
+				return
+			}
+		case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, len(l.delegate.LANMembers())))):
+		case <-shutdownCh:
+			return
+		}
 	}
+
+	// Force-trigger AE to pickup any changes
+	l.changeMade()
+
+	// Schedule the next full sync, with a random stagger
+	aeIntv := aeScale(l.config.AEInterval, len(l.delegate.LANMembers()))
+	aeIntv = aeIntv + lib.RandomStagger(aeIntv)
+	aeTimer := time.After(aeIntv)
+
+	// Wait for sync events
+	for {
+		select {
+		case <-aeTimer:
+			goto SYNC
+		case <-l.triggerCh:
+			// Skip the sync if we are paused
+			if l.isPaused() {
+				continue
+			}
+			if err := l.syncChanges(); err != nil {
+				l.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
+			}
+		case <-shutdownCh:
+			return
+		}
+	}
+}
+
+// setSyncState does a read of the server state, and updates
+// the local syncStatus as appropriate
+func (l *localState) setSyncState() error {
 	req := structs.NodeSpecificRequest{
 		Datacenter: l.config.Datacenter,
 		Node:       l.config.NodeName,
@@ -496,9 +590,9 @@
 	return nil
 }
 
-// SyncChanges is used to scan the status our local services and checks
+// syncChanges is used to scan the status of our local services and checks
 // and update any that are out of sync with the server
-func (l *localState) SyncChanges() error {
+func (l *localState) syncChanges() error {
 	l.Lock()
 	defer l.Unlock()
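To make the full-sync schedule above concrete: with a 1m `AEInterval` and a 1000-node LAN, `aeScale` returns 4m, and adding `lib.RandomStagger(4m)` lands the next sync uniformly in the [4m, 8m) window, spreading load across the cluster. A sketch of that computation, assuming `lib.RandomStagger` returns a uniform duration in [0, d) — `randomStagger` below is a stand-in, not the real helper:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomStagger stands in for lib.RandomStagger: a uniformly random
// slice of d, used to keep agents from syncing in lockstep.
func randomStagger(d time.Duration) time.Duration {
	return time.Duration(rand.Int63n(int64(d)))
}

func main() {
	scaled := 4 * time.Minute // aeScale(time.Minute, 1000)
	next := scaled + randomStagger(scaled)
	fmt.Println("next full sync in", next) // somewhere in [4m, 8m)
}
```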
"dummy"`), nil, tokens) l.AddService(&structs.NodeService{ ID: "redis", @@ -1511,7 +1511,7 @@ func TestAgent_checkTokens(t *testing.T) { tokens := new(token.Store) tokens.UpdateUserToken("default") - l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1)) + l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens) // Returns default when no token is set if token := l.CheckToken("mem"); token != "default" { @@ -1533,7 +1533,7 @@ func TestAgent_checkTokens(t *testing.T) { func TestAgent_checkCriticalTime(t *testing.T) { t.Parallel() - l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1)) + l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store)) svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000} l.AddService(svc, "") @@ -1595,7 +1595,7 @@ func TestAgent_checkCriticalTime(t *testing.T) { func TestAgent_AddCheckFailure(t *testing.T) { t.Parallel() - l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1)) + l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store)) // Add a check for a service that does not exist and verify that it fails checkID := types.CheckID("redis:1") @@ -1613,6 +1613,38 @@ func TestAgent_AddCheckFailure(t *testing.T) { } +func TestAgent_nestedPauseResume(t *testing.T) { + t.Parallel() + l := new(localState) + if l.isPaused() != false { + t.Fatal("localState should be unPaused after init") + } + l.Pause() + if l.isPaused() != true { + t.Fatal("localState should be Paused after first call to Pause()") + } + l.Pause() + if l.isPaused() != true { + t.Fatal("localState should STILL be Paused after second call to Pause()") + } + l.Resume() + if l.isPaused() != true { + t.Fatal("localState should STILL be Paused after FIRST call to Resume()") + } + l.Resume() + if l.isPaused() != false { + t.Fatal("localState should NOT be Paused after SECOND call to Resume()") + } + + defer func() { + err := recover() + if err == nil { + t.Fatal("unbalanced Resume() should cause a panic()") + } + }() + l.Resume() +} + func TestAgent_sendCoordinate(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), ` diff --git a/agent/util.go b/agent/util.go index ae7c3c154..3a13c2172 100644 --- a/agent/util.go +++ b/agent/util.go @@ -4,16 +4,28 @@ import ( "bytes" "crypto/md5" "fmt" + "math" "os" "os/exec" "os/signal" osuser "os/user" "strconv" + "time" "github.com/hashicorp/consul/types" "github.com/hashicorp/go-msgpack/codec" ) +const ( + // This scale factor means we will add a minute after we cross 128 nodes, + // another at 256, another at 512, etc. By 8192 nodes, we will scale up + // by a factor of 8. + // + // If you update this, you may need to adjust the tuning of + // CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize. + aeScaleThreshold = 128 +) + // msgpackHandle is a shared handle for encoding/decoding of // messages var msgpackHandle = &codec.MsgpackHandle{ @@ -21,6 +33,18 @@ var msgpackHandle = &codec.MsgpackHandle{ WriteExt: true, } +// aeScale is used to scale the time interval at which anti-entropy updates take +// place. It is used to prevent saturation as the cluster size grows. 
diff --git a/agent/util.go b/agent/util.go
index ae7c3c154..3a13c2172 100644
--- a/agent/util.go
+++ b/agent/util.go
@@ -4,16 +4,28 @@ import (
 	"bytes"
 	"crypto/md5"
 	"fmt"
+	"math"
 	"os"
 	"os/exec"
 	"os/signal"
 	osuser "os/user"
 	"strconv"
+	"time"
 
 	"github.com/hashicorp/consul/types"
 	"github.com/hashicorp/go-msgpack/codec"
 )
 
+const (
+	// This scale factor means we will add a minute after we cross 128 nodes,
+	// another at 256, another at 512, etc. By 8192 nodes, we will scale up
+	// by a factor of 8.
+	//
+	// If you update this, you may need to adjust the tuning of
+	// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
+	aeScaleThreshold = 128
+)
+
 // msgpackHandle is a shared handle for encoding/decoding of
 // messages
 var msgpackHandle = &codec.MsgpackHandle{
@@ -21,6 +33,18 @@ var msgpackHandle = &codec.MsgpackHandle{
 	WriteExt: true,
 }
 
+// aeScale is used to scale the time interval at which anti-entropy updates take
+// place. It is used to prevent saturation as the cluster size grows.
+func aeScale(interval time.Duration, n int) time.Duration {
+	// Don't scale until we cross the threshold
+	if n <= aeScaleThreshold {
+		return interval
+	}
+
+	multiplier := math.Ceil(math.Log2(float64(n))-math.Log2(aeScaleThreshold)) + 1.0
+	return time.Duration(multiplier) * interval
+}
+
 // decodeMsgPack is used to decode a MsgPack encoded object
 func decodeMsgPack(buf []byte, out interface{}) error {
 	return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)
diff --git a/agent/util_test.go b/agent/util_test.go
index b10f274c0..ea12c708d 100644
--- a/agent/util_test.go
+++ b/agent/util_test.go
@@ -4,10 +4,28 @@ import (
 	"os"
 	"runtime"
 	"testing"
+	"time"
 
 	"github.com/hashicorp/consul/testutil"
 )
 
+func TestAEScale(t *testing.T) {
+	t.Parallel()
+	intv := time.Minute
+	if v := aeScale(intv, 100); v != intv {
+		t.Fatalf("Bad: %v", v)
+	}
+	if v := aeScale(intv, 200); v != 2*intv {
+		t.Fatalf("Bad: %v", v)
+	}
+	if v := aeScale(intv, 1000); v != 4*intv {
+		t.Fatalf("Bad: %v", v)
+	}
+	if v := aeScale(intv, 10000); v != 8*intv {
+		t.Fatalf("Bad: %v", v)
+	}
+}
+
 func TestStringHash(t *testing.T) {
 	t.Parallel()
 	in := "hello world"
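A standalone check of the growth curve that `TestAEScale` pins down: past the 128-node threshold the multiplier is ceil(log2(n) - log2(128)) + 1, so each doubling of the cluster adds one more multiple of the interval (200 nodes -> 2x, 1000 -> 4x, 10000 -> 8x). A runnable sketch using the same formula as the `aeScale` added to agent/util.go above:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

const aeScaleThreshold = 128

// aeScale mirrors the function added to agent/util.go in this diff.
func aeScale(interval time.Duration, n int) time.Duration {
	if n <= aeScaleThreshold {
		return interval
	}
	multiplier := math.Ceil(math.Log2(float64(n))-math.Log2(aeScaleThreshold)) + 1.0
	return time.Duration(multiplier) * interval
}

func main() {
	for _, n := range []int{100, 200, 1000, 10000} {
		fmt.Printf("n=%5d -> %v\n", n, aeScale(time.Minute, n))
	}
	// Output:
	// n=  100 -> 1m0s
	// n=  200 -> 2m0s
	// n= 1000 -> 4m0s
	// n=10000 -> 8m0s
}
```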