open-consul/agent/ae/ae.go
Paul Banks 979e1c9c94 Add -sidecar-for and new /agent/service/:service_id endpoint (#4691)
- A new endpoint `/v1/agent/service/:service_id` which is a generic way to look up the service for a single instance. The primary value here is that it:
   - **supports hash-based blocking** and so;
   - **replaces `/agent/connect/proxy/:proxy_id`** as the mechanism the built-in proxy uses to read its config.
   - It's not proxy specific and so works for any service.
   - It has a temporary shim to call through to the existing endpoint to preserve current managed proxy config defaulting behaviour until that is removed entirely (tested).
 - The built-in proxy now uses the new endpoint exclusively for it's config
 - The built-in proxy now has a `-sidecar-for` flag that allows the service ID of the _target_ service to be specified, on the condition that there is exactly one "sidecar" proxy (that is one that has `Proxy.DestinationServiceID` set) for the service registered.
 - Several fixes for edge cases for SidecarService
 - A fix for `Alias` checks - when running locally they didn't update their state until some external thing updated the target. If the target service has no checks registered as below, then the alias never made it past critical.
2018-10-10 16:55:34 +01:00

324 lines
9 KiB
Go

// Package ae provides tools to synchronize state between local and remote consul servers.
package ae
import (
"fmt"
"log"
"math"
"sync"
"time"
"github.com/hashicorp/consul/lib"
)
// scaleThreshold is the number of nodes after which regular sync runs are
// spread out farther apart. The value should be a power of 2 since the
// scale function uses log2.
//
// When set to 128 nodes the delay between regular runs is doubled when the
// cluster is larger than 128 nodes. It doubles again when it passes 256
// nodes, and again at 512 nodes and so forth. At 8192 nodes, the delay
// factor is 8.
//
// If you update this, you may need to adjust the tuning of
// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
const scaleThreshold = 128
// scaleFactor returns a factor by which the next sync run should be delayed to
// avoid saturation of the cluster. The larger the cluster grows the farther
// the sync runs should be spread apart.
//
// The current implementation uses a log2 scale which doubles the delay between
// runs every time the cluster doubles in size.
func scaleFactor(nodes int) int {
if nodes <= scaleThreshold {
return 1.0
}
return int(math.Ceil(math.Log2(float64(nodes))-math.Log2(float64(scaleThreshold))) + 1.0)
}
type SyncState interface {
SyncChanges() error
SyncFull() error
}
// StateSyncer manages background synchronization of the given state.
//
// The state is synchronized on a regular basis or on demand when either
// the state has changed or a new Consul server has joined the cluster.
//
// The regular state synchronization provides a self-healing mechanism
// for the cluster which is also called anti-entropy.
type StateSyncer struct {
// State contains the data that needs to be synchronized.
State SyncState
// Interval is the time between two full sync runs.
Interval time.Duration
// ShutdownCh is closed when the application is shutting down.
ShutdownCh chan struct{}
// Logger is the logger.
Logger *log.Logger
// ClusterSize returns the number of members in the cluster to
// allow staggering the sync runs based on cluster size.
// This needs to be set before Run() is called.
ClusterSize func() int
// SyncFull allows triggering an immediate but staggered full sync
// in a non-blocking way.
SyncFull *Trigger
// SyncChanges allows triggering an immediate partial sync
// in a non-blocking way.
SyncChanges *Trigger
// paused stores whether sync runs are temporarily disabled.
pauseLock sync.Mutex
paused int
// serverUpInterval is the max time after which a full sync is
// performed when a server has been added to the cluster.
serverUpInterval time.Duration
// retryFailInterval is the time after which a failed full sync is retried.
retryFailInterval time.Duration
// stagger randomly picks a duration between 0s and the given duration.
stagger func(time.Duration) time.Duration
// retrySyncFullEvent generates an event based on multiple conditions
// when the state machine is trying to retry a full state sync.
retrySyncFullEvent func() event
// syncChangesEvent generates an event based on multiple conditions
// when the state machine is performing partial state syncs.
syncChangesEvent func() event
}
const (
// serverUpIntv is the max time to wait before a sync is triggered
// when a consul server has been added to the cluster.
serverUpIntv = 3 * time.Second
// retryFailIntv is the min time to wait before a failed sync is retried.
retryFailIntv = 15 * time.Second
)
func NewStateSyncer(state SyncState, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer {
s := &StateSyncer{
State: state,
Interval: intv,
ShutdownCh: shutdownCh,
Logger: logger,
SyncFull: NewTrigger(),
SyncChanges: NewTrigger(),
serverUpInterval: serverUpIntv,
retryFailInterval: retryFailIntv,
}
// retain these methods as member variables so that
// we can mock them for testing.
s.retrySyncFullEvent = s.retrySyncFullEventFn
s.syncChangesEvent = s.syncChangesEventFn
s.stagger = s.staggerFn
return s
}
// fsmState defines states for the state machine.
type fsmState string
const (
doneState fsmState = "done"
fullSyncState fsmState = "fullSync"
partialSyncState fsmState = "partialSync"
retryFullSyncState fsmState = "retryFullSync"
)
// Run is the long running method to perform state synchronization
// between local and remote servers.
func (s *StateSyncer) Run() {
if s.ClusterSize == nil {
panic("ClusterSize not set")
}
s.runFSM(fullSyncState, s.nextFSMState)
}
// runFSM runs the state machine.
func (s *StateSyncer) runFSM(fs fsmState, next func(fsmState) fsmState) {
for {
if fs = next(fs); fs == doneState {
return
}
}
}
// nextFSMState determines the next state based on the current state.
func (s *StateSyncer) nextFSMState(fs fsmState) fsmState {
switch fs {
case fullSyncState:
if s.Paused() {
return retryFullSyncState
}
err := s.State.SyncFull()
if err != nil {
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
return retryFullSyncState
}
return partialSyncState
case retryFullSyncState:
e := s.retrySyncFullEvent()
switch e {
case syncFullNotifEvent, syncFullTimerEvent:
return fullSyncState
case shutdownEvent:
return doneState
default:
panic(fmt.Sprintf("invalid event: %s", e))
}
case partialSyncState:
e := s.syncChangesEvent()
switch e {
case syncFullNotifEvent, syncFullTimerEvent:
return fullSyncState
case syncChangesNotifEvent:
if s.Paused() {
return partialSyncState
}
err := s.State.SyncChanges()
if err != nil {
s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
}
return partialSyncState
case shutdownEvent:
return doneState
default:
panic(fmt.Sprintf("invalid event: %s", e))
}
default:
panic(fmt.Sprintf("invalid state: %s", fs))
}
}
// event defines a timing or notification event from multiple timers and
// channels.
type event string
const (
shutdownEvent event = "shutdown"
syncFullNotifEvent event = "syncFullNotif"
syncFullTimerEvent event = "syncFullTimer"
syncChangesNotifEvent event = "syncChangesNotif"
)
// retrySyncFullEventFn waits for an event which triggers a retry
// of a full sync or a termination signal. This function should not be
// called directly but through s.retryFullSyncState to allow mocking for
// testing.
func (s *StateSyncer) retrySyncFullEventFn() event {
select {
// trigger a full sync immediately.
// this is usually called when a consul server was added to the cluster.
// stagger the delay to avoid a thundering herd.
case <-s.SyncFull.Notif():
select {
case <-time.After(s.stagger(s.serverUpInterval)):
return syncFullNotifEvent
case <-s.ShutdownCh:
return shutdownEvent
}
// retry full sync after some time
// todo(fs): why don't we use s.Interval here?
case <-time.After(s.retryFailInterval + s.stagger(s.retryFailInterval)):
return syncFullTimerEvent
case <-s.ShutdownCh:
return shutdownEvent
}
}
// syncChangesEventFn waits for a event which either triggers a full
// or a partial sync or a termination signal. This function should not
// be called directly but through s.syncChangesEvent to allow mocking
// for testing.
func (s *StateSyncer) syncChangesEventFn() event {
select {
// trigger a full sync immediately
// this is usually called when a consul server was added to the cluster.
// stagger the delay to avoid a thundering herd.
case <-s.SyncFull.Notif():
select {
case <-time.After(s.stagger(s.serverUpInterval)):
return syncFullNotifEvent
case <-s.ShutdownCh:
return shutdownEvent
}
// time for a full sync again
case <-time.After(s.Interval + s.stagger(s.Interval)):
return syncFullTimerEvent
// do partial syncs on demand
case <-s.SyncChanges.Notif():
return syncChangesNotifEvent
case <-s.ShutdownCh:
return shutdownEvent
}
}
// stubbed out for testing
var libRandomStagger = lib.RandomStagger
// staggerFn returns a random duration which depends on the cluster size
// and a random factor which should provide some timely distribution of
// cluster wide events. This function should not be called directly
// but through s.stagger to allow mocking for testing.
func (s *StateSyncer) staggerFn(d time.Duration) time.Duration {
f := scaleFactor(s.ClusterSize())
return libRandomStagger(time.Duration(f) * d)
}
// Pause temporarily disables sync runs.
func (s *StateSyncer) Pause() {
s.pauseLock.Lock()
s.paused++
s.pauseLock.Unlock()
}
// Paused returns whether sync runs are temporarily disabled.
func (s *StateSyncer) Paused() bool {
s.pauseLock.Lock()
defer s.pauseLock.Unlock()
return s.paused != 0
}
// Resume re-enables sync runs. It returns true if it was the last pause/resume
// pair on the stack and so actually caused the state syncer to resume.
func (s *StateSyncer) Resume() bool {
s.pauseLock.Lock()
s.paused--
if s.paused < 0 {
panic("unbalanced pause/resume")
}
trigger := s.paused == 0
s.pauseLock.Unlock()
if trigger {
s.SyncChanges.Trigger()
}
return trigger
}