agent: decouple anti-entropy from local state

The anti-entropy code manages background synchronizations of the local
state on a regular basis or on demand when either the state has changed
or a new consul server has been added.

This patch moves the anti-entropy code into its own package and
decouples it from the local state code since they are performing
two different functions.

To simplify code-review this revision does not make any optimizations,
renames or refactorings. This will happen in subsequent commits.
This commit is contained in:
Frank Schroeder 2017-08-28 14:17:09 +02:00 committed by Frank Schröder
parent 7a002aa047
commit 034ee43cef
8 changed files with 257 additions and 190 deletions

146
agent/ae/ae.go Normal file
View File

@ -0,0 +1,146 @@
// Package ae provides an anti-entropy mechanism for the local state.
package ae
import (
"log"
"math"
"sync/atomic"
"time"
"github.com/hashicorp/consul/lib"
)
const (
// This scale factor means we will add a minute after we cross 128 nodes,
// another at 256, another at 512, etc. By 8192 nodes, we will scale up
// by a factor of 8.
//
// If you update this, you may need to adjust the tuning of
// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
aeScaleThreshold = 128
syncStaggerIntv = 3 * time.Second
syncRetryIntv = 15 * time.Second
)
// aeScale is used to scale the time interval at which anti-entropy updates take
// place. It is used to prevent saturation as the cluster size grows.
func aeScale(d time.Duration, n int) time.Duration {
// Don't scale until we cross the threshold
if n <= aeScaleThreshold {
return d
}
mult := math.Ceil(math.Log2(float64(n))-math.Log2(aeScaleThreshold)) + 1.0
return time.Duration(mult) * d
}
type StateSyncer struct {
// paused is used to check if we are paused. Must be the first
// element due to a go bug.
// todo(fs): which bug? still relevant?
paused int32
// State contains the data that needs to be synchronized.
State interface {
UpdateSyncState() error
SyncChanges() error
}
// Interval is the time between two sync runs.
Interval time.Duration
// ClusterSize returns the number of members in the cluster.
// todo(fs): we use this for staggering but what about a random number?
ClusterSize func() int
// ShutdownCh is closed when the application is shutting down.
ShutdownCh chan struct{}
// ConsulCh contains data when a new consul server has been added to the cluster.
ConsulCh chan struct{}
// TriggerCh contains data when a sync should run immediately.
TriggerCh chan struct{}
Logger *log.Logger
}
// Pause is used to pause state synchronization, this can be
// used to make batch changes
func (ae *StateSyncer) Pause() {
atomic.AddInt32(&ae.paused, 1)
}
// Resume is used to resume state synchronization
func (ae *StateSyncer) Resume() {
paused := atomic.AddInt32(&ae.paused, -1)
if paused < 0 {
panic("unbalanced State.Resume() detected")
}
ae.changeMade()
}
// Paused is used to check if we are paused
func (ae *StateSyncer) Paused() bool {
return atomic.LoadInt32(&ae.paused) > 0
}
func (ae *StateSyncer) changeMade() {
select {
case ae.TriggerCh <- struct{}{}:
default:
}
}
// antiEntropy is a long running method used to perform anti-entropy
// between local and remote state.
func (ae *StateSyncer) Run() {
SYNC:
// Sync our state with the servers
for {
err := ae.State.UpdateSyncState()
if err == nil {
break
}
ae.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
select {
case <-ae.ConsulCh:
// Stagger the retry on leader election, avoid a thundering heard
select {
case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, ae.ClusterSize()))):
case <-ae.ShutdownCh:
return
}
case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, ae.ClusterSize()))):
case <-ae.ShutdownCh:
return
}
}
// Force-trigger AE to pickup any changes
ae.changeMade()
// Schedule the next full sync, with a random stagger
aeIntv := aeScale(ae.Interval, ae.ClusterSize())
aeIntv = aeIntv + lib.RandomStagger(aeIntv)
aeTimer := time.After(aeIntv)
// Wait for sync events
for {
select {
case <-aeTimer:
goto SYNC
case <-ae.TriggerCh:
// Skip the sync if we are paused
if ae.Paused() {
continue
}
if err := ae.State.SyncChanges(); err != nil {
ae.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
}
case <-ae.ShutdownCh:
return
}
}
}

55
agent/ae/ae_test.go Normal file
View File

@ -0,0 +1,55 @@
package ae
import (
"testing"
"time"
)
func TestAE_scale(t *testing.T) {
t.Parallel()
intv := time.Minute
if v := aeScale(intv, 100); v != intv {
t.Fatalf("Bad: %v", v)
}
if v := aeScale(intv, 200); v != 2*intv {
t.Fatalf("Bad: %v", v)
}
if v := aeScale(intv, 1000); v != 4*intv {
t.Fatalf("Bad: %v", v)
}
if v := aeScale(intv, 10000); v != 8*intv {
t.Fatalf("Bad: %v", v)
}
}
func TestAE_nestedPauseResume(t *testing.T) {
t.Parallel()
l := new(StateSyncer)
if l.Paused() != false {
t.Fatal("syncer should be unPaused after init")
}
l.Pause()
if l.Paused() != true {
t.Fatal("syncer should be Paused after first call to Pause()")
}
l.Pause()
if l.Paused() != true {
t.Fatal("syncer should STILL be Paused after second call to Pause()")
}
l.Resume()
if l.Paused() != true {
t.Fatal("syncer should STILL be Paused after FIRST call to Resume()")
}
l.Resume()
if l.Paused() != false {
t.Fatal("syncer should NOT be Paused after SECOND call to Resume()")
}
defer func() {
err := recover()
if err == nil {
t.Fatal("unbalanced Resume() should cause a panic()")
}
}()
l.Resume()
}

View File

@ -20,6 +20,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/ae"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/structs"
@ -109,6 +110,10 @@ type Agent struct {
// services and checks. Used for anti-entropy.
state *localState
// sync manages the synchronization of the local
// and the remote state.
sync *ae.StateSyncer
// checkReapAfter maps the check ID to a timeout after which we should
// reap its associated service
checkReapAfter map[types.CheckID]time.Duration
@ -241,8 +246,27 @@ func (a *Agent) Start() error {
return fmt.Errorf("Failed to setup node ID: %v", err)
}
// create a notif channel to trigger state sychronizations
// when a consul server was added to the cluster.
consulCh := make(chan struct{}, 1)
// create a notif channel to trigger state synchronizations
// when the state has changed.
triggerCh := make(chan struct{}, 1)
// create the local state
a.state = NewLocalState(c, a.logger, a.tokens)
a.state = NewLocalState(c, a.logger, a.tokens, triggerCh)
// create the state synchronization manager which performs
// regular and on-demand state synchronizations (anti-entropy).
a.sync = &ae.StateSyncer{
State: a.state,
Interval: c.AEInterval,
ShutdownCh: a.shutdownCh,
ConsulCh: consulCh,
TriggerCh: triggerCh,
Logger: a.logger,
}
// create the config for the rpc server/client
consulCfg, err := a.consulConfig()
@ -250,8 +274,16 @@ func (a *Agent) Start() error {
return err
}
// link consul client/server with the state
consulCfg.ServerUp = a.state.ConsulServerUp
// ServerUp is used to inform that a new consul server is now
// up. This can be used to speed up the sync process if we are blocking
// waiting to discover a consul server
// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
consulCfg.ServerUp = func() {
select {
case consulCh <- struct{}{}:
default:
}
}
// Setup either the client or the server.
if c.ServerMode {
@ -262,6 +294,7 @@ func (a *Agent) Start() error {
a.delegate = server
a.state.delegate = server
a.sync.ClusterSize = func() int { return len(server.LANMembers()) }
} else {
client, err := consul.NewClientLogger(consulCfg, a.logger)
if err != nil {
@ -270,6 +303,7 @@ func (a *Agent) Start() error {
a.delegate = client
a.state.delegate = client
a.sync.ClusterSize = func() int { return len(client.LANMembers()) }
}
// Load checks/services/metadata.
@ -1264,18 +1298,18 @@ func (a *Agent) WANMembers() []serf.Member {
// StartSync is called once Services and Checks are registered.
// This is called to prevent a race between clients and the anti-entropy routines
func (a *Agent) StartSync() {
// Start the anti entropy routine
go a.state.antiEntropy(a.shutdownCh)
go a.sync.Run()
a.logger.Printf("[INFO] agent: starting state syncer")
}
// PauseSync is used to pause anti-entropy while bulk changes are make
func (a *Agent) PauseSync() {
a.state.Pause()
a.sync.Pause()
}
// ResumeSync is used to unpause anti-entropy after bulk changes are make
func (a *Agent) ResumeSync() {
a.state.Resume()
a.sync.Resume()
}
// GetLANCoordinate returns the coordinates of this node in the local pools

View File

@ -304,7 +304,7 @@ func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request
// services and checks to the server. If the operation fails, we only
// only warn because the write did succeed and anti-entropy will sync later.
func (s *HTTPServer) syncChanges() {
if err := s.agent.state.syncChanges(); err != nil {
if err := s.agent.state.SyncChanges(); err != nil {
s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
}
}

View File

@ -18,11 +18,6 @@ import (
"github.com/hashicorp/consul/types"
)
const (
syncStaggerIntv = 3 * time.Second
syncRetryIntv = 15 * time.Second
)
// syncStatus is used to represent the difference between
// the local and remote state, and if action needs to be taken
type syncStatus struct {
@ -33,7 +28,6 @@ type syncStatus struct {
// populated during NewLocalAgent from the agent configuration to avoid
// race conditions with the agent configuration.
type localStateConfig struct {
AEInterval time.Duration
AdvertiseAddr string
CheckUpdateInterval time.Duration
Datacenter string
@ -47,10 +41,6 @@ type localStateConfig struct {
// and checks. We used it to perform anti-entropy with the
// catalog representation
type localState struct {
// paused is used to check if we are paused. Must be the first
// element due to a go bug.
paused int32
sync.RWMutex
logger *log.Logger
@ -81,10 +71,6 @@ type localState struct {
// metadata tracks the local metadata fields
metadata map[string]string
// consulCh is used to inform of a change to the known
// consul nodes. This may be used to retry a sync run
consulCh chan struct{}
// triggerCh is used to inform of a change to local state
// that requires anti-entropy with the server
triggerCh chan struct{}
@ -95,9 +81,8 @@ type localState struct {
}
// NewLocalState creates a is used to initialize the local state
func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store) *localState {
func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *localState {
lc := localStateConfig{
AEInterval: c.AEInterval,
AdvertiseAddr: c.AdvertiseAddrLAN.String(),
CheckUpdateInterval: c.CheckUpdateInterval,
Datacenter: c.Datacenter,
@ -122,8 +107,7 @@ func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store)
checkCriticalTime: make(map[types.CheckID]time.Time),
deferCheck: make(map[types.CheckID]*time.Timer),
metadata: make(map[string]string),
consulCh: make(chan struct{}, 1),
triggerCh: make(chan struct{}, 1),
triggerCh: triggerCh,
}
l.discardCheckOutput.Store(c.DiscardCheckOutput)
return l
@ -131,42 +115,13 @@ func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store)
// changeMade is used to trigger an anti-entropy run
func (l *localState) changeMade() {
// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
select {
case l.triggerCh <- struct{}{}:
default:
}
}
// ConsulServerUp is used to inform that a new consul server is now
// up. This can be used to speed up the sync process if we are blocking
// waiting to discover a consul server
func (l *localState) ConsulServerUp() {
select {
case l.consulCh <- struct{}{}:
default:
}
}
// Pause is used to pause state synchronization, this can be
// used to make batch changes
func (l *localState) Pause() {
atomic.AddInt32(&l.paused, 1)
}
// Resume is used to resume state synchronization
func (l *localState) Resume() {
paused := atomic.AddInt32(&l.paused, -1)
if paused < 0 {
panic("unbalanced localState.Resume() detected")
}
l.changeMade()
}
// isPaused is used to check if we are paused
func (l *localState) isPaused() bool {
return atomic.LoadInt32(&l.paused) > 0
}
func (l *localState) SetDiscardCheckOutput(b bool) {
l.discardCheckOutput.Store(b)
}
@ -412,61 +367,12 @@ func (l *localState) Metadata() map[string]string {
return metadata
}
// antiEntropy is a long running method used to perform anti-entropy
// between local and remote state.
func (l *localState) antiEntropy(shutdownCh chan struct{}) {
SYNC:
// Sync our state with the servers
for {
err := l.setSyncState()
if err == nil {
break
// UpdateSyncState does a read of the server state, and updates
// the local sync status as appropriate
func (l *localState) UpdateSyncState() error {
if l == nil {
panic("config == nil")
}
l.logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
select {
case <-l.consulCh:
// Stagger the retry on leader election, avoid a thundering heard
select {
case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, len(l.delegate.LANMembers())))):
case <-shutdownCh:
return
}
case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, len(l.delegate.LANMembers())))):
case <-shutdownCh:
return
}
}
// Force-trigger AE to pickup any changes
l.changeMade()
// Schedule the next full sync, with a random stagger
aeIntv := aeScale(l.config.AEInterval, len(l.delegate.LANMembers()))
aeIntv = aeIntv + lib.RandomStagger(aeIntv)
aeTimer := time.After(aeIntv)
// Wait for sync events
for {
select {
case <-aeTimer:
goto SYNC
case <-l.triggerCh:
// Skip the sync if we are paused
if l.isPaused() {
continue
}
if err := l.syncChanges(); err != nil {
l.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
}
case <-shutdownCh:
return
}
}
}
// setSyncState does a read of the server state, and updates
// the local syncStatus as appropriate
func (l *localState) setSyncState() error {
req := structs.NodeSpecificRequest{
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
@ -590,9 +496,9 @@ func (l *localState) setSyncState() error {
return nil
}
// syncChanges is used to scan the status our local services and checks
// SyncChanges is used to scan the status our local services and checks
// and update any that are out of sync with the server
func (l *localState) syncChanges() error {
func (l *localState) SyncChanges() error {
l.Lock()
defer l.Unlock()

View File

@ -1482,7 +1482,7 @@ func TestAgent_serviceTokens(t *testing.T) {
tokens := new(token.Store)
tokens.UpdateUserToken("default")
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens)
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1))
l.AddService(&structs.NodeService{
ID: "redis",
@ -1511,7 +1511,7 @@ func TestAgent_checkTokens(t *testing.T) {
tokens := new(token.Store)
tokens.UpdateUserToken("default")
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens)
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1))
// Returns default when no token is set
if token := l.CheckToken("mem"); token != "default" {
@ -1533,7 +1533,7 @@ func TestAgent_checkTokens(t *testing.T) {
func TestAgent_checkCriticalTime(t *testing.T) {
t.Parallel()
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store))
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1))
svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000}
l.AddService(svc, "")
@ -1595,7 +1595,7 @@ func TestAgent_checkCriticalTime(t *testing.T) {
func TestAgent_AddCheckFailure(t *testing.T) {
t.Parallel()
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store))
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1))
// Add a check for a service that does not exist and verify that it fails
checkID := types.CheckID("redis:1")
@ -1613,38 +1613,6 @@ func TestAgent_AddCheckFailure(t *testing.T) {
}
func TestAgent_nestedPauseResume(t *testing.T) {
t.Parallel()
l := new(localState)
if l.isPaused() != false {
t.Fatal("localState should be unPaused after init")
}
l.Pause()
if l.isPaused() != true {
t.Fatal("localState should be Paused after first call to Pause()")
}
l.Pause()
if l.isPaused() != true {
t.Fatal("localState should STILL be Paused after second call to Pause()")
}
l.Resume()
if l.isPaused() != true {
t.Fatal("localState should STILL be Paused after FIRST call to Resume()")
}
l.Resume()
if l.isPaused() != false {
t.Fatal("localState should NOT be Paused after SECOND call to Resume()")
}
defer func() {
err := recover()
if err == nil {
t.Fatal("unbalanced Resume() should cause a panic()")
}
}()
l.Resume()
}
func TestAgent_sendCoordinate(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), `

View File

@ -4,28 +4,16 @@ import (
"bytes"
"crypto/md5"
"fmt"
"math"
"os"
"os/exec"
"os/signal"
osuser "os/user"
"strconv"
"time"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-msgpack/codec"
)
const (
// This scale factor means we will add a minute after we cross 128 nodes,
// another at 256, another at 512, etc. By 8192 nodes, we will scale up
// by a factor of 8.
//
// If you update this, you may need to adjust the tuning of
// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
aeScaleThreshold = 128
)
// msgpackHandle is a shared handle for encoding/decoding of
// messages
var msgpackHandle = &codec.MsgpackHandle{
@ -33,18 +21,6 @@ var msgpackHandle = &codec.MsgpackHandle{
WriteExt: true,
}
// aeScale is used to scale the time interval at which anti-entropy updates take
// place. It is used to prevent saturation as the cluster size grows.
func aeScale(interval time.Duration, n int) time.Duration {
// Don't scale until we cross the threshold
if n <= aeScaleThreshold {
return interval
}
multiplier := math.Ceil(math.Log2(float64(n))-math.Log2(aeScaleThreshold)) + 1.0
return time.Duration(multiplier) * interval
}
// decodeMsgPack is used to decode a MsgPack encoded object
func decodeMsgPack(buf []byte, out interface{}) error {
return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)

View File

@ -4,28 +4,10 @@ import (
"os"
"runtime"
"testing"
"time"
"github.com/hashicorp/consul/testutil"
)
func TestAEScale(t *testing.T) {
t.Parallel()
intv := time.Minute
if v := aeScale(intv, 100); v != intv {
t.Fatalf("Bad: %v", v)
}
if v := aeScale(intv, 200); v != 2*intv {
t.Fatalf("Bad: %v", v)
}
if v := aeScale(intv, 1000); v != 4*intv {
t.Fatalf("Bad: %v", v)
}
if v := aeScale(intv, 10000); v != 8*intv {
t.Fatalf("Bad: %v", v)
}
}
func TestStringHash(t *testing.T) {
t.Parallel()
in := "hello world"