Extract the heartbeat and saveState into their own go routines

This commit is contained in:
Alex Dadgar 2016-02-17 11:32:17 -08:00
parent 9301581bb4
commit 5473b6ae63
1 changed files with 53 additions and 24 deletions

View File

@ -152,16 +152,23 @@ func NewClient(cfg *config.Config) (*Client, error) {
// initialized.
c.configCopy = c.config.Copy()
// Start the consul service
go c.consulService.SyncWithConsul()
// Restore the state
if err := c.restoreState(); err != nil {
return nil, fmt.Errorf("failed to restore state: %v", err)
}
// Register and then start heartbeating to the servers.
go c.registerAndHeartbeat()
// Begin periodic snapshotting of state.
go c.periodicSnapshot()
// Start the client!
go c.run()
// Start the consul service
go c.consulService.SyncWithConsul()
return c, nil
}
@ -625,13 +632,12 @@ func (c *Client) retryIntv(base time.Duration) time.Duration {
return base + randomStagger(base)
}
// run is a long lived goroutine used to run the client
func (c *Client) run() {
// registerAndHeartbeat is a long lived goroutine used to register the client
// and then start heartbeatng to the server.
func (c *Client) registerAndHeartbeat() {
// Register the node
c.retryRegisterNode()
// Watch for node changes
go c.watchNodeUpdates()
// Setup the heartbeat timer, for the initial registration
// we want to do this quickly. We want to do it extra quickly
// in development mode.
@ -642,25 +648,8 @@ func (c *Client) run() {
heartbeat = time.After(randomStagger(initialHeartbeatStagger))
}
// Watch for changes in allocations
allocUpdates := make(chan *allocUpdates, 1)
go c.watchAllocations(allocUpdates)
// Create a snapshot timer
snapshot := time.After(stateSnapshotIntv)
// Periodically update our status and wait for termination
for {
select {
case <-snapshot:
snapshot = time.After(stateSnapshotIntv)
if err := c.saveState(); err != nil {
c.logger.Printf("[ERR] client: failed to save state: %v", err)
}
case update := <-allocUpdates:
c.runAllocs(update)
case <-heartbeat:
if err := c.updateNodeStatus(); err != nil {
heartbeat = time.After(c.retryIntv(registerRetryIntv))
@ -676,6 +665,46 @@ func (c *Client) run() {
}
}
// periodicSnapshot is a long lived goroutine used to periodically snapshot the
// state of the client
func (c *Client) periodicSnapshot() {
// Create a snapshot timer
snapshot := time.After(stateSnapshotIntv)
for {
select {
case <-snapshot:
snapshot = time.After(stateSnapshotIntv)
if err := c.saveState(); err != nil {
c.logger.Printf("[ERR] client: failed to save state: %v", err)
}
case <-c.shutdownCh:
return
}
}
}
// run is a long lived goroutine used to run the client
func (c *Client) run() {
// Watch for node changes
go c.watchNodeUpdates()
// Watch for changes in allocations
allocUpdates := make(chan *allocUpdates, 1)
go c.watchAllocations(allocUpdates)
for {
select {
case update := <-allocUpdates:
c.runAllocs(update)
case <-c.shutdownCh:
return
}
}
}
// hasNodeChanged calculates a hash for the node attributes- and meta map.
// The new hash values are compared against the old (passed-in) hash values to
// determine if the node properties have changed. It returns the new hash values