local state: address review comments
* move non-blocking notification mechanism into ae.Trigger
* move Pause/Resume into separate type
parent cfc8bd092f
commit 40e17f9f01

agent/ae/ae.go (157 lines changed)
@@ -36,6 +36,11 @@ func scaleFactor(nodes int) int {
 	return int(math.Ceil(math.Log2(float64(nodes))-math.Log2(float64(scaleThreshold))) + 1.0)
 }
 
+type State interface {
+	SyncChanges() error
+	SyncFull() error
+}
+
 // StateSyncer manages background synchronization of the given state.
 //
 // The state is synchronized on a regular basis or on demand when either
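For context: the scale factor above grows logarithmically with cluster size. A standalone sketch, assuming the scaleThreshold constant of 100 and the small-cluster guard that live earlier in agent/ae/ae.go but are outside this hunk:

package main

import (
	"fmt"
	"math"
)

// Illustrative copy of scaleFactor, not part of this commit. The constant
// and the guard below are assumptions taken from the surrounding file.
const scaleThreshold = 100

func scaleFactor(nodes int) int {
	if nodes <= scaleThreshold {
		return 1
	}
	return int(math.Ceil(math.Log2(float64(nodes))-math.Log2(float64(scaleThreshold))) + 1.0)
}

func main() {
	for _, n := range []int{10, 100, 200, 1000, 10000} {
		fmt.Printf("scaleFactor(%d) = %d\n", n, scaleFactor(n))
	}
	// prints 1, 1, 2, 5, 8
}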
@@ -44,34 +49,45 @@ func scaleFactor(nodes int) int {
 // The regular state sychronization provides a self-healing mechanism
 // for the cluster which is also called anti-entropy.
 type StateSyncer struct {
-	// paused flags whether sync runs are temporarily disabled.
-	// Must be the first element due to a go bug.
-	// todo(fs): which bug? Is this still relevant?
-	paused int32
-
 	// State contains the data that needs to be synchronized.
-	State interface {
-		SyncChanges() error
-		SyncFull() error
-	}
+	State State
 
 	// Interval is the time between two regular sync runs.
 	Interval time.Duration
 
-	// ClusterSize returns the number of members in the cluster to
-	// allow staggering the sync runs based on cluster size.
-	ClusterSize func() int
-
 	// ShutdownCh is closed when the application is shutting down.
 	ShutdownCh chan struct{}
 
-	// ServerUpCh contains data when a new consul server has been added to the cluster.
-	ServerUpCh chan struct{}
-
-	// TriggerCh contains data when a sync should run immediately.
-	TriggerCh chan struct{}
-
 	// Logger is the logger.
 	Logger *log.Logger
+
+	// ClusterSize returns the number of members in the cluster to
+	// allow staggering the sync runs based on cluster size.
+	// This needs to be set before Run() is called.
+	ClusterSize func() int
+
+	// SyncFull allows triggering an immediate but staggered full sync
+	// in a non-blocking way.
+	SyncFull *Trigger
+
+	// SyncChanges allows triggering an immediate partial sync
+	// in a non-blocking way.
+	SyncChanges *Trigger
+
+	// paused stores whether sync runs are temporarily disabled.
+	paused *toggle
 }
 
+func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer {
+	return &StateSyncer{
+		State:       state,
+		Interval:    intv,
+		ShutdownCh:  shutdownCh,
+		Logger:      logger,
+		SyncFull:    NewTrigger(),
+		SyncChanges: NewTrigger(),
+		paused:      new(toggle),
+	}
+}
+
 const (
@@ -86,6 +102,10 @@ const (
 // Run is the long running method to perform state synchronization
 // between local and remote servers.
 func (s *StateSyncer) Run() {
+	if s.ClusterSize == nil {
+		panic("ClusterSize not set")
+	}
+
 	stagger := func(d time.Duration) time.Duration {
 		f := scaleFactor(s.ClusterSize())
 		return lib.RandomStagger(time.Duration(f) * d)
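The stagger closure multiplies a base interval by the cluster scale factor and then randomizes it; lib.RandomStagger picks a uniformly random duration below its argument. A fragment illustrating the effect, assuming scaleThreshold = 100 so that a 1000-node cluster has scale factor 5:

// Illustration only, inside Run: with 1000 nodes, a retry after
// serverUpIntv (3s per the comments later in this diff) sleeps for a
// random duration in [0s, 15s), spreading out the thundering herd.
delay := stagger(3 * time.Second)
time.Sleep(delay)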
@@ -93,20 +113,18 @@ func (s *StateSyncer) Run() {
 
 FullSync:
 	for {
-		switch err := s.State.SyncFull(); {
-
-		// full sync failed
-		case err != nil:
+		// attempt a full sync
+		if err := s.State.SyncFull(); err != nil {
 			s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
 
 			// retry full sync after some time or when a consul
 			// server was added.
 			select {
 
-			// consul server added to cluster.
-			// retry sooner than retryFailIntv to converge cluster sooner
-			// but stagger delay to avoid thundering herd
-			case <-s.ServerUpCh:
+			// trigger a full sync immediately.
+			// this is usually called when a consul server was added to the cluster.
+			// stagger the delay to avoid a thundering herd.
+			case <-s.SyncFull.Notif():
 				select {
 				case <-time.After(stagger(serverUpIntv)):
 				case <-s.ShutdownCh:
@@ -121,36 +139,38 @@ FullSync:
 				return
 			}
 
-		// full sync OK
-		default:
-			// do partial syncs until it is time for a full sync again
-			for {
-				select {
-				// todo(fs): why don't we honor the ServerUpCh here as well?
-				// todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv)
-				// case <-s.ServerUpCh:
-				// 	select {
-				// 	case <-time.After(stagger(serverUpIntv)):
-				// 		continue Sync
-				// 	case <-s.ShutdownCh:
-				// 		return
-				// 	}
-
-				case <-time.After(s.Interval + stagger(s.Interval)):
-					continue FullSync
-
-				case <-s.TriggerCh:
-					if s.Paused() {
-						continue
-					}
-					if err := s.State.SyncChanges(); err != nil {
-						s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
-					}
-
-				case <-s.ShutdownCh:
-					return
-				}
-			}
+			continue
 		}
+
+		// do partial syncs until it is time for a full sync again
+		for {
+			select {
+			// trigger a full sync immediately
+			// this is usually called when a consul server was added to the cluster.
+			// stagger the delay to avoid a thundering herd.
+			case <-s.SyncFull.Notif():
+				select {
+				case <-time.After(stagger(serverUpIntv)):
+					continue FullSync
+				case <-s.ShutdownCh:
+					return
+				}
+
+			// time for a full sync again
+			case <-time.After(s.Interval + stagger(s.Interval)):
+				continue FullSync
+
+			// do partial syncs on demand
+			case <-s.SyncChanges.Notif():
+				if s.Paused() {
+					continue
+				}
+				if err := s.State.SyncChanges(); err != nil {
+					s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
+				}
+
+			case <-s.ShutdownCh:
+				return
+			}
+		}
 	}
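Taken together, the loop above does a full sync, then partial syncs, until a trigger or the interval forces another full sync. A minimal sketch of driving it, assuming only the API visible in this diff (mockState is a made-up stand-in):

package main

import (
	"log"
	"os"
	"time"

	"github.com/hashicorp/consul/agent/ae"
)

// mockState is a hypothetical ae.State implementation for illustration.
type mockState struct{}

func (mockState) SyncFull() error    { log.Println("full sync"); return nil }
func (mockState) SyncChanges() error { log.Println("partial sync"); return nil }

func main() {
	shutdownCh := make(chan struct{})
	logger := log.New(os.Stderr, "", log.LstdFlags)

	s := ae.NewStateSyner(mockState{}, time.Minute, shutdownCh, logger)
	s.ClusterSize = func() int { return 1 } // must be set before Run()
	go s.Run()

	s.SyncChanges.Trigger() // request a partial sync, never blocks
	s.SyncFull.Trigger()    // request a staggered full sync, never blocks

	time.Sleep(100 * time.Millisecond)
	close(shutdownCh) // stops the Run loop
}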
@@ -158,27 +178,38 @@ FullSync:
 
 // Pause temporarily disables sync runs.
 func (s *StateSyncer) Pause() {
-	atomic.AddInt32(&s.paused, 1)
+	s.paused.On()
 }
 
 // Paused returns whether sync runs are temporarily disabled.
 func (s *StateSyncer) Paused() bool {
-	return atomic.LoadInt32(&s.paused) > 0
+	return s.paused.IsOn()
 }
 
 // Resume re-enables sync runs.
 func (s *StateSyncer) Resume() {
-	paused := atomic.AddInt32(&s.paused, -1)
-	if paused < 0 {
-		panic("unbalanced StateSyncer.Resume() detected")
-	}
-	s.triggerSync()
+	s.paused.Off()
+	s.SyncChanges.Trigger()
 }
 
-// triggerSync queues a sync run if one has not been triggered already.
-func (s *StateSyncer) triggerSync() {
-	select {
-	case s.TriggerCh <- struct{}{}:
-	default:
-	}
-}
+// toggle implements an on/off switch using methods from the atomic
+// package. Since fields in structs that are accessed via
+// atomic.Load/Add methods need to be aligned properly on some platforms
+// we move that code into a separate struct.
+//
+// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for details
+type toggle int32
+
+func (p *toggle) On() {
+	atomic.AddInt32((*int32)(p), 1)
+}
+
+func (p *toggle) Off() {
+	if atomic.AddInt32((*int32)(p), -1) < 0 {
+		panic("toggle not on")
+	}
+}
+
+func (p *toggle) IsOn() bool {
+	return atomic.LoadInt32((*int32)(p)) > 0
+}
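Because toggle is a counter rather than a boolean, Pause and Resume nest: the syncer stays paused until every Pause has been matched by a Resume, and an unbalanced Resume panics. A fragment showing the intended semantics (mirroring TestAE_nestedPauseResume below):

l := NewStateSyner(nil, 0, nil, nil)

l.Pause()               // counter 1 -> paused
l.Pause()               // counter 2 -> still paused
l.Resume()              // counter 1 -> still paused
fmt.Println(l.Paused()) // true

l.Resume()              // counter 0 -> unpaused, fires SyncChanges.Trigger()
fmt.Println(l.Paused()) // false

// One more l.Resume() would panic: "toggle not on".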
@@ -27,7 +27,7 @@ func TestAE_scaleFactor(t *testing.T) {
 
 func TestAE_nestedPauseResume(t *testing.T) {
 	t.Parallel()
-	l := new(StateSyncer)
+	l := NewStateSyner(nil, 0, nil, nil)
 	if l.Paused() != false {
 		t.Fatal("syncer should be unPaused after init")
 	}
@@ -0,0 +1,23 @@
+package ae
+
+// Trigger implements a non-blocking event notifier. Events can be
+// triggered without blocking and notifications happen only when the
+// previous event was consumed.
+type Trigger struct {
+	ch chan struct{}
+}
+
+func NewTrigger() *Trigger {
+	return &Trigger{make(chan struct{}, 1)}
+}
+
+func (t Trigger) Trigger() {
+	select {
+	case t.ch <- struct{}{}:
+	default:
+	}
+}
+
+func (t Trigger) Notif() <-chan struct{} {
+	return t.ch
+}
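The buffered channel of size one is what makes Trigger both non-blocking and coalescing: however many times Trigger() fires between two reads, the consumer sees a single pending notification. A fragment:

t := NewTrigger()

// Producers never block, no matter how often they fire.
t.Trigger()
t.Trigger() // coalesced; the channel already holds the one pending event

<-t.Notif() // consumer observes exactly one notification
t.Trigger() // only now can a new event be queued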
@@ -263,27 +263,12 @@ func (a *Agent) Start() error {
 		return fmt.Errorf("Failed to setup node ID: %v", err)
 	}
 
-	// create a notif channel to trigger state sychronizations
-	// when a consul server was added to the cluster.
-	serverUpCh := make(chan struct{}, 1)
-
-	// create a notif channel to trigger state synchronizations
-	// when the state has changed.
-	triggerCh := make(chan struct{}, 1)
-
 	// create the local state
-	a.State = local.NewState(LocalConfig(c), a.logger, a.tokens, triggerCh)
+	a.State = local.NewState(LocalConfig(c), a.logger, a.tokens)
 
 	// create the state synchronization manager which performs
 	// regular and on-demand state synchronizations (anti-entropy).
-	a.sync = &ae.StateSyncer{
-		State:      a.State,
-		Interval:   c.AEInterval,
-		ShutdownCh: a.shutdownCh,
-		ServerUpCh: serverUpCh,
-		TriggerCh:  triggerCh,
-		Logger:     a.logger,
-	}
+	a.sync = ae.NewStateSyner(a.State, c.AEInterval, a.shutdownCh, a.logger)
 
 	// create the config for the rpc server/client
 	consulCfg, err := a.consulConfig()
@@ -294,13 +279,7 @@ func (a *Agent) Start() error {
 	// ServerUp is used to inform that a new consul server is now
 	// up. This can be used to speed up the sync process if we are blocking
 	// waiting to discover a consul server
-	// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
-	consulCfg.ServerUp = func() {
-		select {
-		case serverUpCh <- struct{}{}:
-		default:
-		}
-	}
+	consulCfg.ServerUp = a.sync.SyncFull.Trigger
 
 	// Setup either the client or the server.
 	if c.ServerMode {
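The one-line replacement works because a.sync.SyncFull.Trigger is a bound method value: it already satisfies func() and captures the Trigger receiver, so no wrapper closure is needed. The same pattern in isolation, with illustrative names:

// serverConfig stands in for the real consul server config wired above.
type serverConfig struct {
	ServerUp func() // invoked when a new consul server joins
}

func wire(cfg *serverConfig, full *ae.Trigger) {
	// Bound method value: equivalent to func() { full.Trigger() }.
	cfg.ServerUp = full.Trigger
}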
@@ -308,21 +287,25 @@
 		if err != nil {
 			return fmt.Errorf("Failed to start Consul server: %v", err)
 		}
 
 		a.delegate = server
-		a.State.SetDelegate(server)
-		a.sync.ClusterSize = func() int { return len(server.LANMembers()) }
 	} else {
 		client, err := consul.NewClientLogger(consulCfg, a.logger)
 		if err != nil {
 			return fmt.Errorf("Failed to start Consul client: %v", err)
 		}
 
 		a.delegate = client
-		a.State.SetDelegate(client)
-		a.sync.ClusterSize = func() int { return len(client.LANMembers()) }
 	}
 
+	// the staggering of the state syncing depends on the cluster size.
+	a.sync.ClusterSize = func() int { return len(a.delegate.LANMembers()) }
+
+	// link the state with the consul server/client and the state syncer
+	// via callbacks. After several attempts this was easier than using
+	// channels since the event notification needs to be non-blocking
+	// and that should be hidden in the state syncer implementation.
+	a.State.Delegate = a.delegate
+	a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger
+
 	// Load checks/services/metadata.
 	if err := a.loadServices(c); err != nil {
 		return err
@@ -1316,7 +1299,7 @@ func (a *Agent) WANMembers() []serf.Member {
 // This is called to prevent a race between clients and the anti-entropy routines
 func (a *Agent) StartSync() {
 	go a.sync.Run()
-	a.logger.Printf("[INFO] agent: starting state syncer")
+	a.logger.Printf("[INFO] agent: started state syncer")
 }
 
 // PauseSync is used to pause anti-entropy while bulk changes are make
@@ -2173,8 +2156,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
 	return nil
 }
 
-// unloadServices will deregister all services other than the 'consul' service
-// known to the local agent.
+// unloadServices will deregister all services.
 func (a *Agent) unloadServices() error {
 	for id := range a.State.Services() {
 		if err := a.RemoveService(id, false); err != nil {
@@ -1271,7 +1271,7 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) {
 		EnableTagOverride: true,
 	}
 
-	if got, want := a.state.Services()["test"], svc; !verify.Values(t, "", got, want) {
+	if got, want := a.State.Service("test"), svc; !verify.Values(t, "", got, want) {
 		t.Fail()
 	}
 }
@@ -12,40 +12,6 @@ import (
 	"github.com/hashicorp/serf/coordinate"
 )
 
-func TestCatalogRegister(t *testing.T) {
-	t.Skip("skipping since it is not clear what this test is supposed to verify")
-
-	t.Parallel()
-	a := NewTestAgent(t.Name(), "")
-	defer a.Shutdown()
-
-	// Register node
-	args := &structs.RegisterRequest{
-		Node:    "foo",
-		Address: "127.0.0.1",
-	}
-	req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args))
-	obj, err := a.srv.CatalogRegister(nil, req)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	res := obj.(bool)
-	if res != true {
-		t.Fatalf("bad: %v", res)
-	}
-
-	if err := a.State.SyncChanges(); err != nil {
-		t.Fatal("sync failed: ", err)
-	}
-	s := a.State.ServiceState("foo")
-	if s == nil {
-		t.Fatal("service 'foo' missing")
-	}
-	if !s.InSync {
-		t.Fatalf("service 'foo' should be in sync")
-	}
-}
-
 func TestCatalogRegister_Service_InvalidAddress(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
@@ -18,9 +18,6 @@ import (
 	"github.com/hashicorp/consul/types"
 )
 
-// permissionDenied is returned when an ACL based rejection happens.
-const permissionDenied = "Permission denied"
-
 // Config is the configuration for the State. It is
 // populated during NewLocalAgent from the agent configuration to avoid
 // race conditions with the agent configuration.
@@ -39,7 +36,8 @@ type ServiceState struct {
 	// Service is the local copy of the service record.
 	Service *structs.NodeService
 
-	// Token is the ACL to update the service record on the server.
+	// Token is the ACL to update or delete the service record on the
+	// server.
 	Token string
 
 	// InSync contains whether the local state of the service record
@@ -64,8 +62,8 @@ type CheckState struct {
 	// Check is the local copy of the health check record.
 	Check *structs.HealthCheck
 
-	// Token is the ACL record to update the health check record
-	// on the server.
+	// Token is the ACL record to update or delete the health check
+	// record on the server.
 	Token string
 
 	// CriticalTime is the last time the health check status went
@@ -74,8 +72,8 @@ type CheckState struct {
 	CriticalTime time.Time
 
 	// DeferCheck is used to delay the sync of a health check when
-	// only the status has changed.
-	// todo(fs): ^^ this needs double checking...
+	// only the output has changed. This rate limits changes which
+	// do not affect the state of the node and/or service.
 	DeferCheck *time.Timer
 
 	// InSync contains whether the local state of the health check
@@ -107,7 +105,7 @@ func (c *CheckState) CriticalFor() time.Duration {
 	return time.Since(c.CriticalTime)
 }
 
-type delegate interface {
+type rpc interface {
 	RPC(method string, args interface{}, reply interface{}) error
 }
 
@@ -116,14 +114,25 @@ type delegate interface {
 // catalog representation
 type State struct {
 	sync.RWMutex
 
+	// Delegate the RPC interface to the consul server or agent.
+	//
+	// It is set after both the state and the consul server/agent have
+	// been created.
+	Delegate rpc
+
+	// TriggerSyncChanges is used to notify the state syncer that a
+	// partial sync should be performed.
+	//
+	// It is set after both the state and the state syncer have been
+	// created.
+	TriggerSyncChanges func()
+
 	logger *log.Logger
 
 	// Config is the agent config
 	config Config
 
-	// delegate is the consul interface to use for keeping in sync
-	delegate delegate
-
 	// nodeInfoInSync tracks whether the server has our correct top-level
 	// node information in sync
 	nodeInfoInSync bool
|
|||
// Checks tracks the local checks
|
||||
checks map[types.CheckID]*CheckState
|
||||
|
||||
// metadata tracks the local metadata fields
|
||||
// metadata tracks the node metadata fields
|
||||
metadata map[string]string
|
||||
|
||||
// triggerCh is used to inform of a change to local state
|
||||
// that requires anti-entropy with the server
|
||||
triggerCh chan struct{}
|
||||
|
||||
// discardCheckOutput stores whether the output of health checks
|
||||
// is stored in the raft log.
|
||||
discardCheckOutput atomic.Value // bool
|
||||
|
@ -150,33 +155,19 @@ type State struct {
|
|||
}
|
||||
|
||||
// NewLocalState creates a is used to initialize the local state
|
||||
func NewState(c Config, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *State {
|
||||
func NewState(c Config, lg *log.Logger, tokens *token.Store) *State {
|
||||
l := &State{
|
||||
config: c,
|
||||
logger: lg,
|
||||
services: make(map[string]*ServiceState),
|
||||
checks: make(map[types.CheckID]*CheckState),
|
||||
metadata: make(map[string]string),
|
||||
triggerCh: triggerCh,
|
||||
tokens: tokens,
|
||||
config: c,
|
||||
logger: lg,
|
||||
services: make(map[string]*ServiceState),
|
||||
checks: make(map[types.CheckID]*CheckState),
|
||||
metadata: make(map[string]string),
|
||||
tokens: tokens,
|
||||
}
|
||||
l.discardCheckOutput.Store(c.DiscardCheckOutput)
|
||||
l.SetDiscardCheckOutput(c.DiscardCheckOutput)
|
||||
return l
|
||||
}
|
||||
|
||||
func (l *State) SetDelegate(d delegate) {
|
||||
l.delegate = d
|
||||
}
|
||||
|
||||
// changeMade is used to trigger an anti-entropy run
|
||||
func (l *State) changeMade() {
|
||||
// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
|
||||
select {
|
||||
case l.triggerCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (l *State) SetDiscardCheckOutput(b bool) {
|
||||
l.discardCheckOutput.Store(b)
|
||||
}
|
||||
|
@@ -204,14 +195,12 @@ func (l *State) serviceToken(id string) string {
 // AddService is used to add a service entry to the local state.
 // This entry is persistent and the agent will make a best effort to
 // ensure it is registered
-// todo(fs): where is the persistence happening?
 func (l *State) AddService(service *structs.NodeService, token string) error {
 	if service == nil {
 		return fmt.Errorf("no service")
 	}
 
 	// use the service name as id if the id was omitted
-	// todo(fs): is this for backwards compatibility?
 	if service.ID == "" {
 		service.ID = service.Service
 	}
|
|||
defer l.Unlock()
|
||||
|
||||
l.services[s.Service.ID] = s
|
||||
l.changeMade()
|
||||
l.TriggerSyncChanges()
|
||||
}
|
||||
|
||||
// RemoveService is used to remove a service entry from the local state.
|
||||
|
@@ -247,7 +236,7 @@ func (l *State) RemoveService(id string) error {
 	// entry around until it is actually removed.
 	s.InSync = false
 	s.Deleted = true
-	l.changeMade()
+	l.TriggerSyncChanges()
 
 	return nil
 }
@@ -366,7 +355,7 @@ func (l *State) AddCheckState(c *CheckState) {
 	defer l.Unlock()
 
 	l.checks[c.Check.CheckID] = c
-	l.changeMade()
+	l.TriggerSyncChanges()
 }
 
 // RemoveCheck is used to remove a health check from the local state.
@@ -387,7 +376,7 @@ func (l *State) RemoveCheck(id types.CheckID) error {
 	// entry around until it is actually removed.
 	c.InSync = false
 	c.Deleted = true
-	l.changeMade()
+	l.TriggerSyncChanges()
 
 	return nil
 }
@@ -443,7 +432,7 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) {
 				return
 			}
 			c.InSync = false
-			l.changeMade()
+			l.TriggerSyncChanges()
 		})
 	}
 	return
@@ -453,7 +442,7 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) {
 	c.Check.Status = status
 	c.Check.Output = output
 	c.InSync = false
-	l.changeMade()
+	l.TriggerSyncChanges()
 }
 
 // Check returns the locally registered check that the
@@ -549,12 +538,12 @@ func (l *State) updateSyncState() error {
 	}
 
 	var out1 structs.IndexedNodeServices
-	if err := l.delegate.RPC("Catalog.NodeServices", &req, &out1); err != nil {
+	if err := l.Delegate.RPC("Catalog.NodeServices", &req, &out1); err != nil {
 		return err
 	}
 
 	var out2 structs.IndexedHealthChecks
-	if err := l.delegate.RPC("Health.NodeChecks", &req, &out2); err != nil {
+	if err := l.Delegate.RPC("Health.NodeChecks", &req, &out2); err != nil {
 		return err
 	}
 
@@ -605,8 +594,7 @@ func (l *State) updateSyncState() error {
 			continue
 		}
 
-		// If the service is scheduled for removal skip it.
-		// todo(fs): is this correct?
+		// If the service is already scheduled for removal skip it
 		if ls.Deleted {
 			continue
 		}
@@ -646,8 +634,7 @@ func (l *State) updateSyncState() error {
 			continue
 		}
 
-		// If the check is scheduled for removal skip it.
-		// todo(fs): is this correct?
+		// If the check is already scheduled for removal skip it.
 		if lc.Deleted {
 			continue
 		}
@@ -687,10 +674,13 @@ func (l *State) updateSyncState() error {
 func (l *State) SyncFull() error {
 	// note that we do not acquire the lock here since the methods
 	// we are calling will do that themself.
+	//
+	// Also note that we don't hold the lock for the entire operation
+	// but release it between the two calls. This is not an issue since
+	// the algorithm is best-effort to achieve eventual consistency.
+	// SyncChanges will sync whatever updateSyncState() has determined
+	// needs updating.
 
-	// todo(fs): is it an issue that we do not hold the lock for the entire time?
-	// todo(fs): IMO, this doesn't matter since SyncChanges will sync whatever
-	// todo(fs): was determined in the update step.
 	if err := l.updateSyncState(); err != nil {
 		return err
 	}
@@ -764,7 +754,7 @@ func (l *State) LoadMetadata(data map[string]string) error {
 	for k, v := range data {
 		l.metadata[k] = v
 	}
-	l.changeMade()
+	l.TriggerSyncChanges()
 	return nil
 }
 
@@ -815,19 +805,24 @@ func (l *State) deleteService(id string) error {
 		WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
 	}
 	var out struct{}
-	err := l.delegate.RPC("Catalog.Deregister", &req, &out)
-	if err == nil || strings.Contains(err.Error(), "Unknown service") {
+	err := l.Delegate.RPC("Catalog.Deregister", &req, &out)
+	switch {
+	case err == nil || strings.Contains(err.Error(), "Unknown service"):
 		delete(l.services, id)
-		l.logger.Printf("[INFO] agent: Deregistered service '%s'", id)
+		l.logger.Printf("[INFO] agent: Deregistered service %q", id)
 		return nil
-	}
-	if acl.IsErrPermissionDenied(err) {
-		// todo(fs): why is the service in sync here?
+
+	case acl.IsErrPermissionDenied(err):
+		// todo(fs): mark the service to be in sync to prevent excessive retrying before next full sync
+		// todo(fs): some backoff strategy might be a better solution
 		l.services[id].InSync = true
-		l.logger.Printf("[WARN] agent: Service '%s' deregistration blocked by ACLs", id)
+		l.logger.Printf("[WARN] agent: Service %q deregistration blocked by ACLs", id)
 		return nil
+
+	default:
+		l.logger.Printf("[WARN] agent: Deregistering service %q failed. %s", id, err)
+		return err
 	}
-	return err
 }
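The rewrite above folds the success, ACL-denied, and failure paths into one switch over the RPC error, so every outcome is logged and returned from a single place. The bare idiom, with hypothetical helper names:

func deregister() error {
	err := callRPC() // callRPC and isPermissionDenied are hypothetical
	switch {
	case err == nil:
		return nil // success: local bookkeeping cleaned up
	case isPermissionDenied(err):
		return nil // handled: retrying cannot succeed until ACLs change
	default:
		return err // real failure: surface it to the caller
	}
}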
// deleteCheck is used to delete a check from the server
|
||||
|
@ -843,20 +838,28 @@ func (l *State) deleteCheck(id types.CheckID) error {
|
|||
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
|
||||
}
|
||||
var out struct{}
|
||||
err := l.delegate.RPC("Catalog.Deregister", &req, &out)
|
||||
if err == nil || strings.Contains(err.Error(), "Unknown check") {
|
||||
// todo(fs): do we need to stop the deferCheck timer here?
|
||||
err := l.Delegate.RPC("Catalog.Deregister", &req, &out)
|
||||
switch {
|
||||
case err == nil || strings.Contains(err.Error(), "Unknown check"):
|
||||
c := l.checks[id]
|
||||
if c != nil && c.DeferCheck != nil {
|
||||
c.DeferCheck.Stop()
|
||||
}
|
||||
delete(l.checks, id)
|
||||
l.logger.Printf("[INFO] agent: Deregistered check '%s'", id)
|
||||
l.logger.Printf("[INFO] agent: Deregistered check %q", id)
|
||||
return nil
|
||||
}
|
||||
if acl.IsErrPermissionDenied(err) {
|
||||
// todo(fs): why is the check in sync here?
|
||||
|
||||
case acl.IsErrPermissionDenied(err):
|
||||
// todo(fs): mark the check to be in sync to prevent excessive retrying before next full sync
|
||||
// todo(fs): some backoff strategy might be a better solution
|
||||
l.checks[id].InSync = true
|
||||
l.logger.Printf("[WARN] agent: Check '%s' deregistration blocked by ACLs", id)
|
||||
l.logger.Printf("[WARN] agent: Check %q deregistration blocked by ACLs", id)
|
||||
return nil
|
||||
|
||||
default:
|
||||
l.logger.Printf("[WARN] agent: Deregistering check %q failed. %s", id, err)
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// syncService is used to sync a service to the server
|
||||
|
@ -900,8 +903,9 @@ func (l *State) syncService(id string) error {
|
|||
}
|
||||
|
||||
var out struct{}
|
||||
err := l.delegate.RPC("Catalog.Register", &req, &out)
|
||||
if err == nil {
|
||||
err := l.Delegate.RPC("Catalog.Register", &req, &out)
|
||||
switch {
|
||||
case err == nil:
|
||||
l.services[id].InSync = true
|
||||
// Given how the register API works, this info is also updated
|
||||
// every time we sync a service.
|
||||
|
@@ -909,20 +913,23 @@ func (l *State) syncService(id string) error {
 		for _, check := range checks {
 			l.checks[check.CheckID].InSync = true
 		}
-		l.logger.Printf("[INFO] agent: Synced service '%s'", id)
+		l.logger.Printf("[INFO] agent: Synced service %q", id)
 		return nil
-	}
-	if acl.IsErrPermissionDenied(err) {
-		// todo(fs): why are the service and the checks in sync here?
-		// todo(fs): why is the node info not in sync here?
+
+	case acl.IsErrPermissionDenied(err):
+		// todo(fs): mark the service and the checks to be in sync to prevent excessive retrying before next full sync
+		// todo(fs): some backoff strategy might be a better solution
 		l.services[id].InSync = true
 		for _, check := range checks {
 			l.checks[check.CheckID].InSync = true
 		}
-		l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id)
+		l.logger.Printf("[WARN] agent: Service %q registration blocked by ACLs", id)
 		return nil
+
+	default:
+		l.logger.Printf("[WARN] agent: Syncing service %q failed. %s", id, err)
+		return err
 	}
-	return err
 }
// syncCheck is used to sync a check to the server
|
||||
|
@ -947,22 +954,27 @@ func (l *State) syncCheck(id types.CheckID) error {
|
|||
}
|
||||
|
||||
var out struct{}
|
||||
err := l.delegate.RPC("Catalog.Register", &req, &out)
|
||||
if err == nil {
|
||||
err := l.Delegate.RPC("Catalog.Register", &req, &out)
|
||||
switch {
|
||||
case err == nil:
|
||||
l.checks[id].InSync = true
|
||||
// Given how the register API works, this info is also updated
|
||||
// every time we sync a check.
|
||||
l.nodeInfoInSync = true
|
||||
l.logger.Printf("[INFO] agent: Synced check '%s'", id)
|
||||
l.logger.Printf("[INFO] agent: Synced check %q", id)
|
||||
return nil
|
||||
}
|
||||
if acl.IsErrPermissionDenied(err) {
|
||||
// todo(fs): why is the check in sync here?
|
||||
|
||||
case acl.IsErrPermissionDenied(err):
|
||||
// todo(fs): mark the check to be in sync to prevent excessive retrying before next full sync
|
||||
// todo(fs): some backoff strategy might be a better solution
|
||||
l.checks[id].InSync = true
|
||||
l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id)
|
||||
l.logger.Printf("[WARN] agent: Check %q registration blocked by ACLs", id)
|
||||
return nil
|
||||
|
||||
default:
|
||||
l.logger.Printf("[WARN] agent: Syncing check %q failed. %s", id, err)
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (l *State) syncNodeInfo() error {
|
||||
|
@@ -976,17 +988,22 @@ func (l *State) syncNodeInfo() error {
 		WriteRequest: structs.WriteRequest{Token: l.tokens.AgentToken()},
 	}
 	var out struct{}
-	err := l.delegate.RPC("Catalog.Register", &req, &out)
-	if err == nil {
+	err := l.Delegate.RPC("Catalog.Register", &req, &out)
+	switch {
+	case err == nil:
 		l.nodeInfoInSync = true
 		l.logger.Printf("[INFO] agent: Synced node info")
 		return nil
-	}
-	if acl.IsErrPermissionDenied(err) {
-		// todo(fs): why is the node info in sync here?
+
+	case acl.IsErrPermissionDenied(err):
+		// todo(fs): mark the node info to be in sync to prevent excessive retrying before next full sync
+		// todo(fs): some backoff strategy might be a better solution
 		l.nodeInfoInSync = true
 		l.logger.Printf("[WARN] agent: Node info update blocked by ACLs")
 		return nil
+
+	default:
+		l.logger.Printf("[WARN] agent: Syncing node info failed. %s", err)
+		return err
 	}
-	return err
 }
@@ -1011,16 +1011,18 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
 
 func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) {
 	t.Parallel()
-	a := NewTestAgent(t.Name(), `
+	a := agent.NewTestAgent(t.Name(), `
 		discard_check_output = true
 		check_update_interval = "0s" # set to "0s" since otherwise output checks are deferred
 	`)
 	defer a.Shutdown()
 
 	inSync := func(id string) bool {
-		a.state.Lock()
-		defer a.state.Unlock()
-		return a.state.checkStatus[types.CheckID(id)].inSync
+		s := a.State.CheckState(types.CheckID(id))
+		if s == nil {
+			return false
+		}
+		return s.InSync
 	}
 
 	// register a check
@@ -1031,7 +1033,7 @@ func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) {
 		Status:  api.HealthPassing,
 		Output:  "first output",
 	}
-	if err := a.state.AddCheck(check, ""); err != nil {
+	if err := a.State.AddCheck(check, ""); err != nil {
 		t.Fatalf("bad: %s", err)
 	}
 
@@ -1045,15 +1047,15 @@ func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) {
 
 	// update the check with the same status but different output
 	// and the check should still be in sync.
-	a.state.UpdateCheck(check.CheckID, api.HealthPassing, "second output")
+	a.State.UpdateCheck(check.CheckID, api.HealthPassing, "second output")
 	if !inSync("web") {
 		t.Fatal("check should be in sync")
 	}
 
 	// disable discarding of check output and update the check again with different
 	// output. Then the check should be out of sync.
-	a.state.SetDiscardCheckOutput(false)
-	a.state.UpdateCheck(check.CheckID, api.HealthPassing, "third output")
+	a.State.SetDiscardCheckOutput(false)
+	a.State.UpdateCheck(check.CheckID, api.HealthPassing, "third output")
 	if inSync("web") {
 		t.Fatal("check should be out of sync")
 	}
@@ -1316,8 +1318,9 @@ func TestAgent_ServiceTokens(t *testing.T) {
 
 	tokens := new(token.Store)
 	tokens.UpdateUserToken("default")
-	lcfg := agent.LocalConfig(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`))
-	l := local.NewState(lcfg, nil, tokens, make(chan struct{}, 1))
+	cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`)
+	l := local.NewState(agent.LocalConfig(cfg), nil, tokens)
+	l.TriggerSyncChanges = func() {}
 
 	l.AddService(&structs.NodeService{ID: "redis"}, "")
 
@@ -1344,8 +1347,9 @@ func TestAgent_CheckTokens(t *testing.T) {
 
 	tokens := new(token.Store)
 	tokens.UpdateUserToken("default")
-	lcfg := agent.LocalConfig(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`))
-	l := local.NewState(lcfg, nil, tokens, make(chan struct{}, 1))
+	cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`)
+	l := local.NewState(agent.LocalConfig(cfg), nil, tokens)
+	l.TriggerSyncChanges = func() {}
 
 	// Returns default when no token is set
 	l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("mem")}, "")
|
|||
|
||||
func TestAgent_CheckCriticalTime(t *testing.T) {
|
||||
t.Parallel()
|
||||
lcfg := agent.LocalConfig(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`))
|
||||
l := local.NewState(lcfg, nil, new(token.Store), make(chan struct{}, 1))
|
||||
cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`)
|
||||
l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store))
|
||||
l.TriggerSyncChanges = func() {}
|
||||
|
||||
svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000}
|
||||
l.AddService(svc, "")
|
||||
|
@@ -1431,8 +1436,9 @@
 
 func TestAgent_AddCheckFailure(t *testing.T) {
 	t.Parallel()
-	lcfg := agent.LocalConfig(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`))
-	l := local.NewState(lcfg, nil, new(token.Store), make(chan struct{}, 1))
+	cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`)
+	l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store))
+	l.TriggerSyncChanges = func() {}
 
 	// Add a check for a service that does not exist and verify that it fails
 	checkID := types.CheckID("redis:1")