diff --git a/agent/ae/ae.go b/agent/ae/ae.go index 78bf93e25..f3b674503 100644 --- a/agent/ae/ae.go +++ b/agent/ae/ae.go @@ -93,66 +93,67 @@ func (s *StateSyncer) Run() { Sync: for { - // update the sync status - err := s.State.UpdateSyncState() - if err == nil { - break - } + switch err := s.State.UpdateSyncState(); { - s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) + // update sync status failed + case err != nil: + s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) - // retry updating sync status after some time or when a consul - // server was added. - select { - - // consul server added to cluster. - // retry sooner than retryFailIntv to converge cluster quicker - // but stagger delay to avoid thundering herd - case <-s.ServerUpCh: + // retry updating sync status after some time or when a consul + // server was added. select { - case <-time.After(stagger(serverUpIntv)): + + // consul server added to cluster. + // retry sooner than retryFailIntv to converge cluster sooner + // but stagger delay to avoid thundering herd + case <-s.ServerUpCh: + select { + case <-time.After(stagger(serverUpIntv)): + case <-s.ShutdownCh: + return + } + + // retry full sync after some time + // todo(fs): why don't we use s.Interval here? + case <-time.After(retryFailIntv + stagger(retryFailIntv)): + case <-s.ShutdownCh: return } - // retry full sync after some time - // todo(fs): why don't we use s.Interval here? - case <-time.After(retryFailIntv + stagger(retryFailIntv)): + // update sync status OK + default: + // force-trigger sync to pickup any changes + s.triggerSync() - case <-s.ShutdownCh: - return - } - } + // do partial syncs until it is time for a full sync again + for { + select { + // todo(fs): why don't we honor the ServerUpCh here as well? + // todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv) + // case <-s.ServerUpCh: + // select { + // case <-time.After(stagger(serverUpIntv)): + // continue Sync + // case <-s.ShutdownCh: + // return + // } - // Force-trigger sync to pickup any changes - s.triggerSync() + case <-time.After(s.Interval + stagger(s.Interval)): + continue Sync - // Wait for sync events - for { - select { - // todo(fs): why don't we honor the ServerUpCh here as well? - // todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv) - // case <-s.ServerUpCh: - // select { - // case <-time.After(stagger(serverUpIntv)): - // continue Sync - // case <-s.ShutdownCh: - // return - // } + case <-s.TriggerCh: + if s.Paused() { + continue + } + if err := s.State.SyncChanges(); err != nil { + s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) + } - case <-time.After(s.Interval + stagger(s.Interval)): - goto Sync - - case <-s.TriggerCh: - if s.Paused() { - continue + case <-s.ShutdownCh: + return + } } - if err := s.State.SyncChanges(); err != nil { - s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) - } - - case <-s.ShutdownCh: - return } } }