Remove commits return value
...and still protect against leaking agent entries in Consul on shutdown.
This commit is contained in:
parent
16ac08ac8c
commit
6e0fd86361
|
@ -187,13 +187,11 @@ func (c *ServiceClient) Run() {
|
|||
}
|
||||
}
|
||||
|
||||
// commit operations and returns false if shutdown signalled before committing.
|
||||
func (c *ServiceClient) commit(ops *operations) bool {
|
||||
// commit operations unless already shutting down.
|
||||
func (c *ServiceClient) commit(ops *operations) {
|
||||
select {
|
||||
case c.opCh <- ops:
|
||||
return true
|
||||
case <-c.shutdownCh:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -365,21 +363,25 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service)
|
|||
}
|
||||
}
|
||||
|
||||
// Now add them to the registration queue
|
||||
if ok := c.commit(&ops); !ok {
|
||||
// shutting down, exit
|
||||
// Don't bother committing agent checks if we're already shutting down
|
||||
c.agentLock.Lock()
|
||||
defer c.agentLock.Unlock()
|
||||
select {
|
||||
case <-c.shutdownCh:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
// Now add them to the registration queue
|
||||
c.commit(&ops)
|
||||
|
||||
// Record IDs for deregistering on shutdown
|
||||
c.agentLock.Lock()
|
||||
for _, id := range ops.regServices {
|
||||
c.agentServices[id.ID] = struct{}{}
|
||||
}
|
||||
for _, id := range ops.regChecks {
|
||||
c.agentChecks[id.ID] = struct{}{}
|
||||
}
|
||||
c.agentLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -544,30 +546,35 @@ func (c *ServiceClient) RemoveTask(allocID string, task *structs.Task) {
|
|||
}
|
||||
|
||||
// Shutdown the Consul client. Update running task registations and deregister
|
||||
// agent from Consul. Blocks up to shutdownWait before giving up on syncing
|
||||
// operations.
|
||||
// agent from Consul. On first call blocks up to shutdownWait before giving up
|
||||
// on syncing operations.
|
||||
func (c *ServiceClient) Shutdown() error {
|
||||
// Serialize Shutdown calls with RegisterAgent to prevent leaking agent
|
||||
// entries.
|
||||
c.agentLock.Lock()
|
||||
select {
|
||||
case <-c.shutdownCh:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
// First deregister Nomad agent Consul entries
|
||||
// Deregister Nomad agent Consul entries before closing shutdown.
|
||||
ops := operations{}
|
||||
c.agentLock.Lock()
|
||||
for id := range c.agentServices {
|
||||
ops.deregServices = append(ops.deregServices, id)
|
||||
}
|
||||
for id := range c.agentChecks {
|
||||
ops.deregChecks = append(ops.deregChecks, id)
|
||||
}
|
||||
c.agentLock.Unlock()
|
||||
c.commit(&ops)
|
||||
|
||||
// Then signal shutdown
|
||||
close(c.shutdownCh)
|
||||
|
||||
// Safe to unlock after shutdownCh closed as RegisterAgent will check
|
||||
// shutdownCh before committing.
|
||||
c.agentLock.Unlock()
|
||||
|
||||
// Give run loop time to sync, but don't block indefinitely
|
||||
deadline := time.After(c.shutdownWait)
|
||||
|
||||
|
|
Loading…
Reference in New Issue