agent: refactor sync loop to linear flow of control
This commit is contained in:
parent
8cc4ee886d
commit
218cd4d501
|
@ -93,66 +93,67 @@ func (s *StateSyncer) Run() {
|
|||
|
||||
Sync:
|
||||
for {
|
||||
// update the sync status
|
||||
err := s.State.UpdateSyncState()
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
switch err := s.State.UpdateSyncState(); {
|
||||
|
||||
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
|
||||
// update sync status failed
|
||||
case err != nil:
|
||||
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
|
||||
|
||||
// retry updating sync status after some time or when a consul
|
||||
// server was added.
|
||||
select {
|
||||
|
||||
// consul server added to cluster.
|
||||
// retry sooner than retryFailIntv to converge cluster quicker
|
||||
// but stagger delay to avoid thundering herd
|
||||
case <-s.ServerUpCh:
|
||||
// retry updating sync status after some time or when a consul
|
||||
// server was added.
|
||||
select {
|
||||
case <-time.After(stagger(serverUpIntv)):
|
||||
|
||||
// consul server added to cluster.
|
||||
// retry sooner than retryFailIntv to converge cluster sooner
|
||||
// but stagger delay to avoid thundering herd
|
||||
case <-s.ServerUpCh:
|
||||
select {
|
||||
case <-time.After(stagger(serverUpIntv)):
|
||||
case <-s.ShutdownCh:
|
||||
return
|
||||
}
|
||||
|
||||
// retry full sync after some time
|
||||
// todo(fs): why don't we use s.Interval here?
|
||||
case <-time.After(retryFailIntv + stagger(retryFailIntv)):
|
||||
|
||||
case <-s.ShutdownCh:
|
||||
return
|
||||
}
|
||||
|
||||
// retry full sync after some time
|
||||
// todo(fs): why don't we use s.Interval here?
|
||||
case <-time.After(retryFailIntv + stagger(retryFailIntv)):
|
||||
// update sync status OK
|
||||
default:
|
||||
// force-trigger sync to pickup any changes
|
||||
s.triggerSync()
|
||||
|
||||
case <-s.ShutdownCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
// do partial syncs until it is time for a full sync again
|
||||
for {
|
||||
select {
|
||||
// todo(fs): why don't we honor the ServerUpCh here as well?
|
||||
// todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv)
|
||||
// case <-s.ServerUpCh:
|
||||
// select {
|
||||
// case <-time.After(stagger(serverUpIntv)):
|
||||
// continue Sync
|
||||
// case <-s.ShutdownCh:
|
||||
// return
|
||||
// }
|
||||
|
||||
// Force-trigger sync to pickup any changes
|
||||
s.triggerSync()
|
||||
case <-time.After(s.Interval + stagger(s.Interval)):
|
||||
continue Sync
|
||||
|
||||
// Wait for sync events
|
||||
for {
|
||||
select {
|
||||
// todo(fs): why don't we honor the ServerUpCh here as well?
|
||||
// todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv)
|
||||
// case <-s.ServerUpCh:
|
||||
// select {
|
||||
// case <-time.After(stagger(serverUpIntv)):
|
||||
// continue Sync
|
||||
// case <-s.ShutdownCh:
|
||||
// return
|
||||
// }
|
||||
case <-s.TriggerCh:
|
||||
if s.Paused() {
|
||||
continue
|
||||
}
|
||||
if err := s.State.SyncChanges(); err != nil {
|
||||
s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
|
||||
}
|
||||
|
||||
case <-time.After(s.Interval + stagger(s.Interval)):
|
||||
goto Sync
|
||||
|
||||
case <-s.TriggerCh:
|
||||
if s.Paused() {
|
||||
continue
|
||||
case <-s.ShutdownCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := s.State.SyncChanges(); err != nil {
|
||||
s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
|
||||
}
|
||||
|
||||
case <-s.ShutdownCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue