diff --git a/agent/ae/ae.go b/agent/ae/ae.go
index b055f7aa1..6572d815b 100644
--- a/agent/ae/ae.go
+++ b/agent/ae/ae.go
@@ -36,6 +36,11 @@ func scaleFactor(nodes int) int {
 	return int(math.Ceil(math.Log2(float64(nodes))-math.Log2(float64(scaleThreshold))) + 1.0)
 }
 
+type State interface {
+	SyncChanges() error
+	SyncFull() error
+}
+
 // StateSyncer manages background synchronization of the given state.
 //
 // The state is synchronized on a regular basis or on demand when either
@@ -44,34 +49,45 @@ func scaleFactor(nodes int) int {
 // The regular state synchronization provides a self-healing mechanism
 // for the cluster which is also called anti-entropy.
 type StateSyncer struct {
-	// paused flags whether sync runs are temporarily disabled.
-	// Must be the first element due to a go bug.
-	// todo(fs): which bug? Is this still relevant?
-	paused int32
-
 	// State contains the data that needs to be synchronized.
-	State interface {
-		SyncChanges() error
-		SyncFull() error
-	}
+	State State
 
 	// Interval is the time between two regular sync runs.
 	Interval time.Duration
 
-	// ClusterSize returns the number of members in the cluster to
-	// allow staggering the sync runs based on cluster size.
-	ClusterSize func() int
-
 	// ShutdownCh is closed when the application is shutting down.
 	ShutdownCh chan struct{}
 
-	// ServerUpCh contains data when a new consul server has been added to the cluster.
-	ServerUpCh chan struct{}
-
-	// TriggerCh contains data when a sync should run immediately.
-	TriggerCh chan struct{}
-
+	// Logger is the logger.
 	Logger *log.Logger
+
+	// ClusterSize returns the number of members in the cluster to
+	// allow staggering the sync runs based on cluster size.
+	// This needs to be set before Run() is called.
+	ClusterSize func() int
+
+	// SyncFull allows triggering an immediate but staggered full sync
+	// in a non-blocking way.
+	SyncFull *Trigger
+
+	// SyncChanges allows triggering an immediate partial sync
+	// in a non-blocking way.
+	SyncChanges *Trigger
+
+	// paused stores whether sync runs are temporarily disabled.
+	paused *toggle
+}
+
+func NewStateSyncer(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer {
+	return &StateSyncer{
+		State:       state,
+		Interval:    intv,
+		ShutdownCh:  shutdownCh,
+		Logger:      logger,
+		SyncFull:    NewTrigger(),
+		SyncChanges: NewTrigger(),
+		paused:      new(toggle),
+	}
 }
 
 const (
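
For a sense of how the staggering behaves at scale, here is a standalone sketch of the math above. The small-cluster guard and scaleThreshold = 128 are assumptions, since both sit outside this hunk's context lines, and rand.Int63n stands in for what lib.RandomStagger roughly does:

    package main

    import (
        "fmt"
        "math"
        "math/rand"
        "time"
    )

    const scaleThreshold = 128 // assumed value; the constant is outside this diff

    func scaleFactor(nodes int) int {
        if nodes <= scaleThreshold { // assumed guard for small clusters
            return 1
        }
        return int(math.Ceil(math.Log2(float64(nodes))-math.Log2(float64(scaleThreshold))) + 1.0)
    }

    func main() {
        intv := time.Minute
        for _, nodes := range []int{16, 128, 256, 1024, 8192} {
            f := scaleFactor(nodes)
            limit := time.Duration(f) * intv
            // lib.RandomStagger picks a random duration below the limit
            stagger := time.Duration(rand.Int63n(int64(limit)))
            fmt.Printf("nodes=%5d factor=%d stagger in [0,%v), e.g. %v\n", nodes, f, limit, stagger)
        }
    }

The factor grows with the logarithm of the cluster size, so doubling the cluster adds one interval to the stagger window rather than doubling it.
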
@@ -86,6 +102,10 @@ const (
 // Run is the long running method to perform state synchronization
 // between local and remote servers.
 func (s *StateSyncer) Run() {
+	if s.ClusterSize == nil {
+		panic("ClusterSize not set")
+	}
+
 	stagger := func(d time.Duration) time.Duration {
 		f := scaleFactor(s.ClusterSize())
 		return lib.RandomStagger(time.Duration(f) * d)
@@ -93,20 +113,18 @@ func (s *StateSyncer) Run() {
 
 FullSync:
 	for {
-		switch err := s.State.SyncFull(); {
-
-		// full sync failed
-		case err != nil:
+		// attempt a full sync
+		if err := s.State.SyncFull(); err != nil {
 			s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
 
 			// retry full sync after some time or when a consul
 			// server was added.
 			select {
 
-			// consul server added to cluster.
-			// retry sooner than retryFailIntv to converge cluster sooner
-			// but stagger delay to avoid thundering herd
-			case <-s.ServerUpCh:
+			// trigger a full sync immediately.
+			// this is usually called when a consul server was added to the cluster.
+			// stagger the delay to avoid a thundering herd.
+			case <-s.SyncFull.Notif():
 				select {
 				case <-time.After(stagger(serverUpIntv)):
 				case <-s.ShutdownCh:
@@ -121,36 +139,38 @@ FullSync:
 					return
 				}
 
-		// full sync OK
-		default:
+			continue
+		}
 
-			// do partial syncs until it is time for a full sync again
-			for {
+		// do partial syncs until it is time for a full sync again
+		for {
+			select {
+			// trigger a full sync immediately.
+			// this is usually called when a consul server was added to the cluster.
+			// stagger the delay to avoid a thundering herd.
+			case <-s.SyncFull.Notif():
 				select {
-				// todo(fs): why don't we honor the ServerUpCh here as well?
-				// todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv)
-				// case <-s.ServerUpCh:
-				// 	select {
-				// 	case <-time.After(stagger(serverUpIntv)):
-				// 		continue Sync
-				// 	case <-s.ShutdownCh:
-				// 		return
-				// 	}
-
-				case <-time.After(s.Interval + stagger(s.Interval)):
+				case <-time.After(stagger(serverUpIntv)):
 					continue FullSync
-
-				case <-s.TriggerCh:
-					if s.Paused() {
-						continue
-					}
-					if err := s.State.SyncChanges(); err != nil {
-						s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
-					}
-
 				case <-s.ShutdownCh:
 					return
 				}
+
+			// time for a full sync again
+			case <-time.After(s.Interval + stagger(s.Interval)):
+				continue FullSync
+
+			// do partial syncs on demand
+			case <-s.SyncChanges.Notif():
+				if s.Paused() {
+					continue
+				}
+				if err := s.State.SyncChanges(); err != nil {
+					s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
+				}
+
+			case <-s.ShutdownCh:
+				return
 			}
 		}
 	}
 }
@@ -158,27 +178,38 @@ FullSync:
 
 // Pause temporarily disables sync runs.
 func (s *StateSyncer) Pause() {
-	atomic.AddInt32(&s.paused, 1)
+	s.paused.On()
 }
 
 // Paused returns whether sync runs are temporarily disabled.
 func (s *StateSyncer) Paused() bool {
-	return atomic.LoadInt32(&s.paused) > 0
+	return s.paused.IsOn()
 }
 
 // Resume re-enables sync runs.
 func (s *StateSyncer) Resume() {
-	paused := atomic.AddInt32(&s.paused, -1)
-	if paused < 0 {
-		panic("unbalanced StateSyncer.Resume() detected")
-	}
-	s.triggerSync()
+	s.paused.Off()
+	s.SyncChanges.Trigger()
 }
 
-// triggerSync queues a sync run if one has not been triggered already.
-func (s *StateSyncer) triggerSync() {
-	select {
-	case s.TriggerCh <- struct{}{}:
-	default:
+// toggle implements an on/off switch using methods from the atomic
+// package. Since fields in structs that are accessed via
+// atomic.Load/Add methods need to be aligned properly on some platforms
+// we move that code into a separate struct.
+//
+// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for details
+type toggle int32
+
+func (p *toggle) On() {
+	atomic.AddInt32((*int32)(p), 1)
+}
+
+func (p *toggle) Off() {
+	if atomic.AddInt32((*int32)(p), -1) < 0 {
+		panic("toggle not on")
 	}
 }
+
+func (p *toggle) IsOn() bool {
+	return atomic.LoadInt32((*int32)(p)) > 0
+}
diff --git a/agent/ae/ae_test.go b/agent/ae/ae_test.go
index 7246e7e24..3d3449d8b 100644
--- a/agent/ae/ae_test.go
+++ b/agent/ae/ae_test.go
@@ -27,7 +27,7 @@ func TestAE_scaleFactor(t *testing.T) {
 
 func TestAE_nestedPauseResume(t *testing.T) {
 	t.Parallel()
-	l := new(StateSyncer)
+	l := NewStateSyncer(nil, 0, nil, nil)
 	if l.Paused() != false {
 		t.Fatal("syncer should be unpaused after init")
 	}
diff --git a/agent/ae/trigger.go b/agent/ae/trigger.go
new file mode 100644
index 000000000..1aa5f4586
--- /dev/null
+++ b/agent/ae/trigger.go
@@ -0,0 +1,23 @@
+package ae
+
+// Trigger implements a non-blocking event notifier. Events can be
+// triggered without blocking and notifications happen only when the
+// previous event was consumed.
+type Trigger struct {
+	ch chan struct{}
+}
+
+func NewTrigger() *Trigger {
+	return &Trigger{make(chan struct{}, 1)}
+}
+
+func (t Trigger) Trigger() {
+	select {
+	case t.ch <- struct{}{}:
+	default:
+	}
+}
+
+func (t Trigger) Notif() <-chan struct{} {
+	return t.ch
+}
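
The new Trigger type coalesces events: triggering twice before the consumer reads yields a single notification. A standalone sketch using a copy of the type above:

    package main

    import "fmt"

    // Trigger is copied from agent/ae/trigger.go above.
    type Trigger struct{ ch chan struct{} }

    func NewTrigger() *Trigger { return &Trigger{make(chan struct{}, 1)} }

    func (t Trigger) Trigger() {
        select {
        case t.ch <- struct{}{}: // buffer one pending event
        default: // an event is already pending: coalesce
        }
    }

    func (t Trigger) Notif() <-chan struct{} { return t.ch }

    func main() {
        tr := NewTrigger()
        tr.Trigger() // queues one notification
        tr.Trigger() // coalesced: the first one was not consumed yet
        <-tr.Notif() // consumes the single pending notification
        select {
        case <-tr.Notif():
            fmt.Println("unexpected second notification")
        default:
            fmt.Println("both triggers coalesced into one notification")
        }
    }

This is the same buffered-channel idiom the old triggerSync/ServerUpCh code used inline, now packaged so callers never see the channel.
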
diff --git a/agent/agent.go b/agent/agent.go
index d800002e9..d08307609 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -263,27 +263,12 @@ func (a *Agent) Start() error {
 		return fmt.Errorf("Failed to setup node ID: %v", err)
 	}
 
-	// create a notif channel to trigger state sychronizations
-	// when a consul server was added to the cluster.
-	serverUpCh := make(chan struct{}, 1)
-
-	// create a notif channel to trigger state synchronizations
-	// when the state has changed.
-	triggerCh := make(chan struct{}, 1)
-
 	// create the local state
-	a.State = local.NewState(LocalConfig(c), a.logger, a.tokens, triggerCh)
+	a.State = local.NewState(LocalConfig(c), a.logger, a.tokens)
 
 	// create the state synchronization manager which performs
 	// regular and on-demand state synchronizations (anti-entropy).
-	a.sync = &ae.StateSyncer{
-		State:      a.State,
-		Interval:   c.AEInterval,
-		ShutdownCh: a.shutdownCh,
-		ServerUpCh: serverUpCh,
-		TriggerCh:  triggerCh,
-		Logger:     a.logger,
-	}
+	a.sync = ae.NewStateSyncer(a.State, c.AEInterval, a.shutdownCh, a.logger)
 
 	// create the config for the rpc server/client
 	consulCfg, err := a.consulConfig()
@@ -294,13 +279,7 @@ func (a *Agent) Start() error {
 	// ServerUp is used to inform that a new consul server is now
 	// up. This can be used to speed up the sync process if we are blocking
 	// waiting to discover a consul server
-	// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
-	consulCfg.ServerUp = func() {
-		select {
-		case serverUpCh <- struct{}{}:
-		default:
-		}
-	}
+	consulCfg.ServerUp = a.sync.SyncFull.Trigger
 
 	// Setup either the client or the server.
 	if c.ServerMode {
@@ -308,21 +287,25 @@ func (a *Agent) Start() error {
 		if err != nil {
 			return fmt.Errorf("Failed to start Consul server: %v", err)
 		}
 		a.delegate = server
-		a.State.SetDelegate(server)
-		a.sync.ClusterSize = func() int { return len(server.LANMembers()) }
 	} else {
 		client, err := consul.NewClientLogger(consulCfg, a.logger)
 		if err != nil {
 			return fmt.Errorf("Failed to start Consul client: %v", err)
 		}
 		a.delegate = client
-		a.State.SetDelegate(client)
-		a.sync.ClusterSize = func() int { return len(client.LANMembers()) }
 	}
 
+	// the staggering of the state syncing depends on the cluster size.
+	a.sync.ClusterSize = func() int { return len(a.delegate.LANMembers()) }
+
+	// link the state with the consul server/client and the state syncer
+	// via callbacks. After several attempts this was easier than using
+	// channels since the event notification needs to be non-blocking
+	// and that should be hidden in the state syncer implementation.
+	a.State.Delegate = a.delegate
+	a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger
+
 	// Load checks/services/metadata.
 	if err := a.loadServices(c); err != nil {
 		return err
 	}
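
The callback wiring above amounts to passing method values around. A minimal sketch of the pattern with invented stand-in types (syncer, state mirror ae.StateSyncer and local.State from this diff), showing why the non-blocking behavior stays hidden from the caller:

    package main

    import "fmt"

    type syncer struct{ syncChanges chan struct{} }

    // Trigger never blocks, so callers don't need to know about the channel.
    func (s *syncer) Trigger() {
        select {
        case s.syncChanges <- struct{}{}:
        default:
        }
    }

    type state struct {
        // set by the composition root, mirroring
        // a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger above
        TriggerSyncChanges func()
    }

    func main() {
        s := &syncer{syncChanges: make(chan struct{}, 1)}
        st := &state{}
        st.TriggerSyncChanges = s.Trigger // the method value carries its receiver

        st.TriggerSyncChanges() // the state neither blocks nor imports the syncer
        fmt.Println("pending syncs:", len(s.syncChanges))
    }

Compared to the removed shared channels, only the composition root (agent.Start) knows both sides, which is what the comment in the diff argues for.
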
@@ -1316,7 +1299,7 @@ func (a *Agent) WANMembers() []serf.Member {
 // This is called to prevent a race between clients and the anti-entropy routines
 func (a *Agent) StartSync() {
 	go a.sync.Run()
-	a.logger.Printf("[INFO] agent: starting state syncer")
+	a.logger.Printf("[INFO] agent: started state syncer")
 }
 
 // PauseSync is used to pause anti-entropy while bulk changes are made
@@ -2173,8 +2156,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
 	return nil
 }
 
-// unloadServices will deregister all services other than the 'consul' service
-// known to the local agent.
+// unloadServices will deregister all services.
 func (a *Agent) unloadServices() error {
 	for id := range a.State.Services() {
 		if err := a.RemoveService(id, false); err != nil {
diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go
index d735d3b51..55fd2536c 100644
--- a/agent/agent_endpoint_test.go
+++ b/agent/agent_endpoint_test.go
@@ -1271,7 +1271,7 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) {
 		EnableTagOverride: true,
 	}
 
-	if got, want := a.state.Services()["test"], svc; !verify.Values(t, "", got, want) {
+	if got, want := a.State.Service("test"), svc; !verify.Values(t, "", got, want) {
 		t.Fail()
 	}
 }
diff --git a/agent/catalog_endpoint_test.go b/agent/catalog_endpoint_test.go
index fa73dbeee..845929117 100644
--- a/agent/catalog_endpoint_test.go
+++ b/agent/catalog_endpoint_test.go
@@ -12,40 +12,6 @@ import (
 	"github.com/hashicorp/serf/coordinate"
 )
 
-func TestCatalogRegister(t *testing.T) {
-	t.Skip("skipping since it is not clear what this test is supposed to verify")
-
-	t.Parallel()
-	a := NewTestAgent(t.Name(), "")
-	defer a.Shutdown()
-
-	// Register node
-	args := &structs.RegisterRequest{
-		Node:    "foo",
-		Address: "127.0.0.1",
-	}
-	req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args))
-	obj, err := a.srv.CatalogRegister(nil, req)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	res := obj.(bool)
-	if res != true {
-		t.Fatalf("bad: %v", res)
-	}
-
-	if err := a.State.SyncChanges(); err != nil {
-		t.Fatal("sync failed: ", err)
-	}
-	s := a.State.ServiceState("foo")
-	if s == nil {
-		t.Fatal("service 'foo' missing")
-	}
-	if !s.InSync {
-		t.Fatalf("service 'foo' should be in sync")
-	}
-}
-
 func TestCatalogRegister_Service_InvalidAddress(t *testing.T) {
 	t.Parallel()
 	a := NewTestAgent(t.Name(), "")
diff --git a/agent/local/state.go b/agent/local/state.go
index b980c569f..91bfc032d 100644
--- a/agent/local/state.go
+++ b/agent/local/state.go
@@ -18,9 +18,6 @@ import (
 	"github.com/hashicorp/consul/types"
 )
 
-// permissionDenied is returned when an ACL based rejection happens.
-const permissionDenied = "Permission denied"
-
 // Config is the configuration for the State. It is
 // populated during NewLocalAgent from the agent configuration to avoid
 // race conditions with the agent configuration.
@@ -39,7 +36,8 @@ type ServiceState struct {
 	// Service is the local copy of the service record.
 	Service *structs.NodeService
 
-	// Token is the ACL to update the service record on the server.
+	// Token is the ACL to update or delete the service record on the
+	// server.
 	Token string
 
 	// InSync contains whether the local state of the service record
@@ -64,8 +62,8 @@ type CheckState struct {
 	// Check is the local copy of the health check record.
 	Check *structs.HealthCheck
 
-	// Token is the ACL record to update the health check record
-	// on the server.
+	// Token is the ACL record to update or delete the health check
+	// record on the server.
 	Token string
 
 	// CriticalTime is the last time the health check status went
@@ -74,8 +72,8 @@ type CheckState struct {
 	CriticalTime time.Time
 
 	// DeferCheck is used to delay the sync of a health check when
-	// only the status has changed.
-	// todo(fs): ^^ this needs double checking...
+	// only the output has changed. This rate limits changes which
+	// do not affect the state of the node and/or service.
 	DeferCheck *time.Timer
 
 	// InSync contains whether the local state of the health check
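
The clarified DeferCheck comment describes a rate limiter: output-only updates are batched behind a one-shot timer. A standalone sketch of that idea with invented names (check, push); the real code keeps the timer in the DeferCheck field and fires it from UpdateCheck:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type check struct {
        mu     sync.Mutex
        output string
        timer  *time.Timer         // plays the role of DeferCheck
        push   func(output string) // plays the role of the partial sync
    }

    func (c *check) update(output string, wait time.Duration) {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.output = output
        if c.timer != nil {
            return // a sync is already scheduled; coalesce this update
        }
        c.timer = time.AfterFunc(wait, func() {
            c.mu.Lock()
            out := c.output
            c.timer = nil
            c.mu.Unlock()
            c.push(out)
        })
    }

    func main() {
        c := &check{push: func(out string) { fmt.Println("synced:", out) }}
        for i := 1; i <= 5; i++ {
            c.update(fmt.Sprintf("output %d", i), 50*time.Millisecond)
        }
        time.Sleep(100 * time.Millisecond) // prints "synced: output 5" exactly once
    }

Status changes bypass this path in the real code, because they do affect the health of the node or service and must sync promptly.
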
@@ -107,7 +105,7 @@ func (c *CheckState) CriticalFor() time.Duration {
 	return time.Since(c.CriticalTime)
 }
 
-type delegate interface {
+type rpc interface {
 	RPC(method string, args interface{}, reply interface{}) error
 }
 
@@ -116,14 +114,25 @@ type delegate interface {
 // catalog representation
 type State struct {
 	sync.RWMutex
+
+	// Delegate is the RPC interface to the consul server or agent.
+	//
+	// It is set after both the state and the consul server/agent have
+	// been created.
+	Delegate rpc
+
+	// TriggerSyncChanges is used to notify the state syncer that a
+	// partial sync should be performed.
+	//
+	// It is set after both the state and the state syncer have been
+	// created.
+	TriggerSyncChanges func()
+
 	logger *log.Logger
 
 	// Config is the agent config
 	config Config
 
-	// delegate is the consul interface to use for keeping in sync
-	delegate delegate
-
 	// nodeInfoInSync tracks whether the server has our correct top-level
 	// node information in sync
 	nodeInfoInSync bool
@@ -134,13 +143,9 @@ type State struct {
 	// Checks tracks the local checks
 	checks map[types.CheckID]*CheckState
 
-	// metadata tracks the local metadata fields
+	// metadata tracks the node metadata fields
 	metadata map[string]string
 
-	// triggerCh is used to inform of a change to local state
-	// that requires anti-entropy with the server
-	triggerCh chan struct{}
-
 	// discardCheckOutput stores whether the output of health checks
 	// is stored in the raft log.
 	discardCheckOutput atomic.Value // bool
@@ -150,33 +155,19 @@ type State struct {
 }
 
 // NewState creates a new local state.
-func NewState(c Config, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *State {
+func NewState(c Config, lg *log.Logger, tokens *token.Store) *State {
 	l := &State{
-		config:    c,
-		logger:    lg,
-		services:  make(map[string]*ServiceState),
-		checks:    make(map[types.CheckID]*CheckState),
-		metadata:  make(map[string]string),
-		triggerCh: triggerCh,
-		tokens:    tokens,
+		config:   c,
+		logger:   lg,
+		services: make(map[string]*ServiceState),
+		checks:   make(map[types.CheckID]*CheckState),
+		metadata: make(map[string]string),
+		tokens:   tokens,
 	}
-	l.discardCheckOutput.Store(c.DiscardCheckOutput)
+	l.SetDiscardCheckOutput(c.DiscardCheckOutput)
 	return l
 }
 
-func (l *State) SetDelegate(d delegate) {
-	l.delegate = d
-}
-
-// changeMade is used to trigger an anti-entropy run
-func (l *State) changeMade() {
-	// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
-	select {
-	case l.triggerCh <- struct{}{}:
-	default:
-	}
-}
-
 func (l *State) SetDiscardCheckOutput(b bool) {
 	l.discardCheckOutput.Store(b)
 }
@@ -204,14 +195,12 @@ func (l *State) serviceToken(id string) string {
 
 // AddService is used to add a service entry to the local state.
 // This entry is persistent and the agent will make a best effort to
 // ensure it is registered
-// todo(fs): where is the persistence happening?
 func (l *State) AddService(service *structs.NodeService, token string) error {
 	if service == nil {
 		return fmt.Errorf("no service")
 	}
 
 	// use the service name as id if the id was omitted
-	// todo(fs): is this for backwards compatibility?
 	if service.ID == "" {
 		service.ID = service.Service
 	}
@@ -228,7 +217,7 @@ func (l *State) AddServiceState(s *ServiceState) {
 	defer l.Unlock()
 
 	l.services[s.Service.ID] = s
-	l.changeMade()
+	l.TriggerSyncChanges()
 }
 
 // RemoveService is used to remove a service entry from the local state.
@@ -247,7 +236,7 @@ func (l *State) RemoveService(id string) error {
 	// entry around until it is actually removed.
 	s.InSync = false
 	s.Deleted = true
-	l.changeMade()
+	l.TriggerSyncChanges()
 
 	return nil
 }
@@ -366,7 +355,7 @@ func (l *State) AddCheckState(c *CheckState) {
 	defer l.Unlock()
 
 	l.checks[c.Check.CheckID] = c
-	l.changeMade()
+	l.TriggerSyncChanges()
 }
 
 // RemoveCheck is used to remove a health check from the local state.
@@ -387,7 +376,7 @@ func (l *State) RemoveCheck(id types.CheckID) error {
 	// entry around until it is actually removed.
 	c.InSync = false
 	c.Deleted = true
-	l.changeMade()
+	l.TriggerSyncChanges()
 
 	return nil
 }
@@ -443,7 +432,7 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) {
 				return
 			}
 			c.InSync = false
-			l.changeMade()
+			l.TriggerSyncChanges()
 		})
 	}
 	return
@@ -453,7 +442,7 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) {
 	c.Check.Status = status
 	c.Check.Output = output
 	c.InSync = false
-	l.changeMade()
+	l.TriggerSyncChanges()
 }
 
 // Check returns the locally registered check that the
@@ -549,12 +538,12 @@ func (l *State) updateSyncState() error {
 	}
 
 	var out1 structs.IndexedNodeServices
-	if err := l.delegate.RPC("Catalog.NodeServices", &req, &out1); err != nil {
+	if err := l.Delegate.RPC("Catalog.NodeServices", &req, &out1); err != nil {
 		return err
 	}
 
 	var out2 structs.IndexedHealthChecks
-	if err := l.delegate.RPC("Health.NodeChecks", &req, &out2); err != nil {
+	if err := l.Delegate.RPC("Health.NodeChecks", &req, &out2); err != nil {
 		return err
 	}
 
@@ -605,8 +594,7 @@ func (l *State) updateSyncState() error {
 			continue
 		}
 
-		// If the service is scheduled for removal skip it.
-		// todo(fs): is this correct?
+		// If the service is already scheduled for removal skip it.
 		if ls.Deleted {
 			continue
 		}
@@ -646,8 +634,7 @@ func (l *State) updateSyncState() error {
 			continue
 		}
 
-		// If the check is scheduled for removal skip it.
-		// todo(fs): is this correct?
+		// If the check is already scheduled for removal skip it.
 		if lc.Deleted {
 			continue
 		}
@@ -687,10 +674,13 @@ func (l *State) updateSyncState() error {
 func (l *State) SyncFull() error {
 	// note that we do not acquire the lock here since the methods
 	// we are calling will do that themselves.
+	//
+	// Also note that we don't hold the lock for the entire operation
+	// but release it between the two calls. This is not an issue since
+	// the algorithm is best-effort to achieve eventual consistency.
+	// SyncChanges will sync whatever updateSyncState() has determined
+	// needs updating.
 
-	// todo(fs): is it an issue that we do not hold the lock for the entire time?
-	// todo(fs): IMO, this doesn't matter since SyncChanges will sync whatever
-	// todo(fs): was determined in the update step.
 	if err := l.updateSyncState(); err != nil {
 		return err
 	}
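
For reference, the flow the new comment describes, as a sketch. The actual body of SyncFull beyond these context lines is not part of the diff, so this is an assumed shape, written against a stand-in interface:

    package local

    // fullSyncer stands in for *State; both methods lock internally.
    type fullSyncer interface {
        updateSyncState() error // phase 1: mark records in or out of sync
        SyncChanges() error     // phase 2: push out-of-sync records
    }

    func syncFull(l fullSyncer) error {
        if err := l.updateSyncState(); err != nil {
            return err
        }
        // A write that lands between the two phases only marks a record
        // out of sync again; the next partial or full sync picks it up,
        // which is why holding the lock across both phases buys nothing.
        return l.SyncChanges()
    }
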
@@ -764,7 +754,7 @@ func (l *State) LoadMetadata(data map[string]string) error {
 	for k, v := range data {
 		l.metadata[k] = v
 	}
-	l.changeMade()
+	l.TriggerSyncChanges()
 	return nil
 }
 
@@ -815,19 +805,24 @@ func (l *State) deleteService(id string) error {
 		WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
 	}
 	var out struct{}
-	err := l.delegate.RPC("Catalog.Deregister", &req, &out)
-	if err == nil || strings.Contains(err.Error(), "Unknown service") {
+	err := l.Delegate.RPC("Catalog.Deregister", &req, &out)
+	switch {
+	case err == nil || strings.Contains(err.Error(), "Unknown service"):
 		delete(l.services, id)
-		l.logger.Printf("[INFO] agent: Deregistered service '%s'", id)
+		l.logger.Printf("[INFO] agent: Deregistered service %q", id)
 		return nil
-	}
-	if acl.IsErrPermissionDenied(err) {
-		// todo(fs): why is the service in sync here?
+
+	case acl.IsErrPermissionDenied(err):
+		// todo(fs): mark the service to be in sync to prevent excessive retrying before next full sync
+		// todo(fs): some backoff strategy might be a better solution
 		l.services[id].InSync = true
-		l.logger.Printf("[WARN] agent: Service '%s' deregistration blocked by ACLs", id)
+		l.logger.Printf("[WARN] agent: Service %q deregistration blocked by ACLs", id)
 		return nil
+
+	default:
+		l.logger.Printf("[WARN] agent: Deregistering service %q failed. %s", id, err)
+		return err
 	}
-	return err
 }
 
 // deleteCheck is used to delete a check from the server
@@ -843,20 +838,28 @@ func (l *State) deleteCheck(id types.CheckID) error {
 		WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
 	}
 	var out struct{}
-	err := l.delegate.RPC("Catalog.Deregister", &req, &out)
-	if err == nil || strings.Contains(err.Error(), "Unknown check") {
-		// todo(fs): do we need to stop the deferCheck timer here?
+	err := l.Delegate.RPC("Catalog.Deregister", &req, &out)
+	switch {
+	case err == nil || strings.Contains(err.Error(), "Unknown check"):
+		c := l.checks[id]
+		if c != nil && c.DeferCheck != nil {
+			c.DeferCheck.Stop()
+		}
 		delete(l.checks, id)
-		l.logger.Printf("[INFO] agent: Deregistered check '%s'", id)
+		l.logger.Printf("[INFO] agent: Deregistered check %q", id)
 		return nil
-	}
-	if acl.IsErrPermissionDenied(err) {
-		// todo(fs): why is the check in sync here?
+
+	case acl.IsErrPermissionDenied(err):
+		// todo(fs): mark the check to be in sync to prevent excessive retrying before next full sync
+		// todo(fs): some backoff strategy might be a better solution
 		l.checks[id].InSync = true
-		l.logger.Printf("[WARN] agent: Check '%s' deregistration blocked by ACLs", id)
+		l.logger.Printf("[WARN] agent: Check %q deregistration blocked by ACLs", id)
 		return nil
+
+	default:
+		l.logger.Printf("[WARN] agent: Deregistering check %q failed. %s", id, err)
+		return err
 	}
-	return err
 }
 
 // syncService is used to sync a service to the server
@@ -900,8 +903,9 @@ func (l *State) syncService(id string) error {
 	}
 
 	var out struct{}
-	err := l.delegate.RPC("Catalog.Register", &req, &out)
-	if err == nil {
+	err := l.Delegate.RPC("Catalog.Register", &req, &out)
+	switch {
+	case err == nil:
 		l.services[id].InSync = true
 		// Given how the register API works, this info is also updated
 		// every time we sync a service.
@@ -909,20 +913,23 @@ func (l *State) syncService(id string) error {
 		for _, check := range checks {
 			l.checks[check.CheckID].InSync = true
 		}
-		l.logger.Printf("[INFO] agent: Synced service '%s'", id)
+		l.logger.Printf("[INFO] agent: Synced service %q", id)
 		return nil
-	}
-	if acl.IsErrPermissionDenied(err) {
-		// todo(fs): why are the service and the checks in sync here?
-		// todo(fs): why is the node info not in sync here?
+
+	case acl.IsErrPermissionDenied(err):
+		// todo(fs): mark the service and the checks to be in sync to prevent excessive retrying before next full sync
+		// todo(fs): some backoff strategy might be a better solution
 		l.services[id].InSync = true
 		for _, check := range checks {
 			l.checks[check.CheckID].InSync = true
 		}
-		l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id)
+		l.logger.Printf("[WARN] agent: Service %q registration blocked by ACLs", id)
 		return nil
+
+	default:
+		l.logger.Printf("[WARN] agent: Syncing service %q failed. %s", id, err)
+		return err
 	}
-	return err
 }
 
 // syncCheck is used to sync a check to the server
@@ -947,22 +954,27 @@ func (l *State) syncCheck(id types.CheckID) error {
 	}
 
 	var out struct{}
-	err := l.delegate.RPC("Catalog.Register", &req, &out)
-	if err == nil {
+	err := l.Delegate.RPC("Catalog.Register", &req, &out)
+	switch {
+	case err == nil:
 		l.checks[id].InSync = true
 		// Given how the register API works, this info is also updated
 		// every time we sync a check.
 		l.nodeInfoInSync = true
-		l.logger.Printf("[INFO] agent: Synced check '%s'", id)
+		l.logger.Printf("[INFO] agent: Synced check %q", id)
 		return nil
-	}
-	if acl.IsErrPermissionDenied(err) {
-		// todo(fs): why is the check in sync here?
+
+	case acl.IsErrPermissionDenied(err):
+		// todo(fs): mark the check to be in sync to prevent excessive retrying before next full sync
+		// todo(fs): some backoff strategy might be a better solution
 		l.checks[id].InSync = true
-		l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id)
+		l.logger.Printf("[WARN] agent: Check %q registration blocked by ACLs", id)
 		return nil
+
+	default:
+		l.logger.Printf("[WARN] agent: Syncing check %q failed. %s", id, err)
+		return err
 	}
-	return err
 }
 
 func (l *State) syncNodeInfo() error {
@@ -976,17 +988,22 @@ func (l *State) syncNodeInfo() error {
 		WriteRequest: structs.WriteRequest{Token: l.tokens.AgentToken()},
 	}
 	var out struct{}
-	err := l.delegate.RPC("Catalog.Register", &req, &out)
-	if err == nil {
+	err := l.Delegate.RPC("Catalog.Register", &req, &out)
+	switch {
+	case err == nil:
 		l.nodeInfoInSync = true
 		l.logger.Printf("[INFO] agent: Synced node info")
 		return nil
-	}
-	if acl.IsErrPermissionDenied(err) {
-		// todo(fs): why is the node info in sync here?
+
+	case acl.IsErrPermissionDenied(err):
+		// todo(fs): mark the node info to be in sync to prevent excessive retrying before next full sync
+		// todo(fs): some backoff strategy might be a better solution
 		l.nodeInfoInSync = true
 		l.logger.Printf("[WARN] agent: Node info update blocked by ACLs")
 		return nil
+
+	default:
+		l.logger.Printf("[WARN] agent: Syncing node info failed. %s", err)
%s", err) + return err } - return err } diff --git a/agent/local/state_test.go b/agent/local/state_test.go index 8ea69698a..b092998aa 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -1011,16 +1011,18 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) { t.Parallel() - a := NewTestAgent(t.Name(), ` + a := agent.NewTestAgent(t.Name(), ` discard_check_output = true check_update_interval = "0s" # set to "0s" since otherwise output checks are deferred `) defer a.Shutdown() inSync := func(id string) bool { - a.state.Lock() - defer a.state.Unlock() - return a.state.checkStatus[types.CheckID(id)].inSync + s := a.State.CheckState(types.CheckID(id)) + if s == nil { + return false + } + return s.InSync } // register a check @@ -1031,7 +1033,7 @@ func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) { Status: api.HealthPassing, Output: "first output", } - if err := a.state.AddCheck(check, ""); err != nil { + if err := a.State.AddCheck(check, ""); err != nil { t.Fatalf("bad: %s", err) } @@ -1045,15 +1047,15 @@ func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) { // update the check with the same status but different output // and the check should still be in sync. - a.state.UpdateCheck(check.CheckID, api.HealthPassing, "second output") + a.State.UpdateCheck(check.CheckID, api.HealthPassing, "second output") if !inSync("web") { t.Fatal("check should be in sync") } // disable discarding of check output and update the check again with different // output. Then the check should be out of sync. - a.state.SetDiscardCheckOutput(false) - a.state.UpdateCheck(check.CheckID, api.HealthPassing, "third output") + a.State.SetDiscardCheckOutput(false) + a.State.UpdateCheck(check.CheckID, api.HealthPassing, "third output") if inSync("web") { t.Fatal("check should be out of sync") } @@ -1316,8 +1318,9 @@ func TestAgent_ServiceTokens(t *testing.T) { tokens := new(token.Store) tokens.UpdateUserToken("default") - lcfg := agent.LocalConfig(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`)) - l := local.NewState(lcfg, nil, tokens, make(chan struct{}, 1)) + cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`) + l := local.NewState(agent.LocalConfig(cfg), nil, tokens) + l.TriggerSyncChanges = func() {} l.AddService(&structs.NodeService{ID: "redis"}, "") @@ -1344,8 +1347,9 @@ func TestAgent_CheckTokens(t *testing.T) { tokens := new(token.Store) tokens.UpdateUserToken("default") - lcfg := agent.LocalConfig(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`)) - l := local.NewState(lcfg, nil, tokens, make(chan struct{}, 1)) + cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`) + l := local.NewState(agent.LocalConfig(cfg), nil, tokens) + l.TriggerSyncChanges = func() {} // Returns default when no token is set l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("mem")}, "") @@ -1368,8 +1372,9 @@ func TestAgent_CheckTokens(t *testing.T) { func TestAgent_CheckCriticalTime(t *testing.T) { t.Parallel() - lcfg := agent.LocalConfig(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`)) - l := local.NewState(lcfg, nil, new(token.Store), make(chan struct{}, 1)) + cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`) + l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store)) + l.TriggerSyncChanges = func() {} svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000} 
 	l.AddService(svc, "")
@@ -1431,8 +1436,9 @@ func TestAgent_CheckCriticalTime(t *testing.T) {
 
 func TestAgent_AddCheckFailure(t *testing.T) {
 	t.Parallel()
-	lcfg := agent.LocalConfig(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`))
-	l := local.NewState(lcfg, nil, new(token.Store), make(chan struct{}, 1))
+	cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`)
+	l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store))
+	l.TriggerSyncChanges = func() {}
 
 	// Add a check for a service that does not exist and verify that it fails
 	checkID := types.CheckID("redis:1")
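
One behavioral detail that spans these files: Pause and Resume now nest through the toggle counter, which TestAE_nestedPauseResume relies on. A standalone sketch using a copy of toggle from agent/ae/ae.go above:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // toggle is copied from agent/ae/ae.go above.
    type toggle int32

    func (p *toggle) On() { atomic.AddInt32((*int32)(p), 1) }

    func (p *toggle) Off() {
        if atomic.AddInt32((*int32)(p), -1) < 0 {
            panic("toggle not on")
        }
    }

    func (p *toggle) IsOn() bool { return atomic.LoadInt32((*int32)(p)) > 0 }

    func main() {
        var paused toggle
        paused.On()                // outer Pause(), e.g. while bulk-loading services
        paused.On()                // nested Pause() from a second bulk change
        paused.Off()               // inner Resume()
        fmt.Println(paused.IsOn()) // true: still paused until the outer Resume
        paused.Off()               // outer Resume()
        fmt.Println(paused.IsOn()) // false: sync runs are re-enabled
        // a further Off() would panic("toggle not on"), surfacing unbalanced calls
    }

This preserves the old "unbalanced Resume" panic while moving the atomic counter behind a type whose alignment requirements are handled in one place.
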