Merge pull request #2681 from hashicorp/b-deadlock
Fix a deadlock relating to blocked allocations
commit 044f1da5ff
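Every hunk below is a variation on one theme: the client was holding more than one of its allocation-map locks at a time (and, in updateAllocStatus, holding blockedAllocsLock while calling back into addAlloc, which needs other client locks), so two goroutines taking the same locks in opposite orders could wait on each other forever. The sketch below is not Nomad code; the type and field names are hypothetical. It shows the two-mutex ordering hazard and the take-one-lock-at-a-time style the patched NumAllocs adopts.

package main

import (
    "fmt"
    "sync"
)

type client struct {
    allocsLock  sync.Mutex
    blockedLock sync.Mutex
    allocs      map[string]struct{}
    blocked     map[string]struct{}
}

// Deadlock-prone: holds allocsLock while waiting on blockedLock. If another
// goroutine holds blockedLock and then tries to take allocsLock (the way the
// old updateAllocStatus held blockedAllocsLock while addAlloc needed other
// client locks), both goroutines wait forever.
func (c *client) countNested() int {
    c.allocsLock.Lock()
    defer c.allocsLock.Unlock()
    c.blockedLock.Lock()
    defer c.blockedLock.Unlock()
    return len(c.allocs) + len(c.blocked)
}

// Safe: each lock is held only long enough to read the map it guards and is
// released before the next one is taken, so no lock-ordering cycle can form.
func (c *client) countSequential() int {
    c.allocsLock.Lock()
    n := len(c.allocs)
    c.allocsLock.Unlock()

    c.blockedLock.Lock()
    n += len(c.blocked)
    c.blockedLock.Unlock()
    return n
}

func main() {
    c := &client{
        allocs:  map[string]struct{}{"a1": {}},
        blocked: map[string]struct{}{"b1": {}},
    }
    fmt.Println(c.countSequential()) // prints 2
}

The sequential version can observe the two maps at slightly different moments; presumably that skew is acceptable for a count used as a GC heuristic, and it is the price of never holding two locks at once.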
@@ -137,7 +137,7 @@ type Client struct {
     // migratingAllocs is the set of allocs whose data migration is in flight
     migratingAllocs     map[string]*migrateAllocCtrl
-    migratingAllocsLock sync.Mutex
+    migratingAllocsLock sync.RWMutex
 
     // allocUpdates stores allocations that need to be synced to the server.
     allocUpdates chan *structs.Allocation
@@ -724,14 +724,17 @@ func (c *Client) getAllocRunners() map[string]*AllocRunner {
 // fulfill the AllocCounter interface for the GC.
 func (c *Client) NumAllocs() int {
     c.allocLock.RLock()
-    c.blockedAllocsLock.Lock()
-    c.migratingAllocsLock.Lock()
     n := len(c.allocs)
-    n += len(c.blockedAllocations)
-    n += len(c.migratingAllocs)
-    c.migratingAllocsLock.Unlock()
-    c.blockedAllocsLock.Unlock()
     c.allocLock.RUnlock()
+
+    c.blockedAllocsLock.RLock()
+    n += len(c.blockedAllocations)
+    c.blockedAllocsLock.RUnlock()
+
+    c.migratingAllocsLock.RLock()
+    n += len(c.migratingAllocs)
+    c.migratingAllocsLock.RUnlock()
+
     return n
 }
 
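The struct change above makes migratingAllocsLock a sync.RWMutex, and the read-only paths in this commit (NumAllocs here, plus emitClientMetrics and allAllocs below) now take RLock on it and on blockedAllocsLock (also an RWMutex, judging by the RLock calls below), so concurrent readers no longer serialize against each other; only code that mutates the maps takes the exclusive Lock. A tiny standard-library illustration of that reader/writer split, with hypothetical names rather than the Nomad code:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.RWMutex // e.g. a lock like migratingAllocsLock after this change
    data := map[string]int{"alloc-1": 1}

    var wg sync.WaitGroup
    // Many readers (metrics, GC counting, allAllocs-style snapshots) may hold
    // the read lock at the same time.
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.RLock()
            _ = len(data)
            mu.RUnlock()
        }()
    }

    // A writer still takes the exclusive lock and briefly blocks readers.
    wg.Add(1)
    go func() {
        defer wg.Done()
        mu.Lock()
        data["alloc-2"] = 2
        mu.Unlock()
    }()

    wg.Wait()
    fmt.Println(len(data)) // prints 2
}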
@@ -1241,25 +1244,31 @@ func (c *Client) updateNodeStatus() error {
 func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
     // If this alloc was blocking another alloc and transitioned to a
     // terminal state then start the blocked allocation
-    c.blockedAllocsLock.Lock()
-    if blockedAlloc, ok := c.blockedAllocations[alloc.ID]; ok && alloc.Terminated() {
-        var prevAllocDir *allocdir.AllocDir
-        if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
-            tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
-            if tg != nil && tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky {
-                prevAllocDir = ar.GetAllocDir()
-            }
-        }
-        if err := c.addAlloc(blockedAlloc, prevAllocDir); err != nil {
-            c.logger.Printf("[ERR] client: failed to add alloc which was previously blocked %q: %v",
-                blockedAlloc.ID, err)
-        }
-        delete(c.blockedAllocations, blockedAlloc.PreviousAllocation)
-    }
-    c.blockedAllocsLock.Unlock()
-
-    // Mark the allocation for GC if it is in terminal state
     if alloc.Terminated() {
+        c.blockedAllocsLock.Lock()
+        blockedAlloc, ok := c.blockedAllocations[alloc.ID]
+        if ok {
+            var prevAllocDir *allocdir.AllocDir
+            if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
+                tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
+                if tg != nil && tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky {
+                    prevAllocDir = ar.GetAllocDir()
+                }
+            }
+
+            delete(c.blockedAllocations, blockedAlloc.PreviousAllocation)
+            c.blockedAllocsLock.Unlock()
+
+            // Need to call addAlloc without holding the lock
+            if err := c.addAlloc(blockedAlloc, prevAllocDir); err != nil {
+                c.logger.Printf("[ERR] client: failed to add alloc which was previously blocked %q: %v",
+                    blockedAlloc.ID, err)
+            }
+        } else {
+            c.blockedAllocsLock.Unlock()
+        }
+
+        // Mark the allocation for GC if it is in terminal state
         if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
             if err := c.garbageCollector.MarkForCollection(ar); err != nil {
                 c.logger.Printf("[DEBUG] client: couldn't add alloc %v for GC: %v", alloc.ID, err)
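The reworked updateAllocStatus above shows the other half of the fix: the blocked allocation is looked up and removed from the map while blockedAllocsLock is held, the lock is released, and only then is addAlloc called, since addAlloc acquires other client locks (the in-diff comment "Need to call addAlloc without holding the lock" spells this out). A condensed, self-contained sketch of that look-up, release, then call pattern, again with hypothetical names rather than the real client fields:

package main

import (
    "fmt"
    "sync"
)

type client struct {
    blockedLock sync.Mutex
    allocsLock  sync.Mutex
    blocked     map[string]string
    allocs      map[string]string
}

// addItem takes allocsLock, so callers must not be holding blockedLock here.
func (c *client) addItem(id, v string) {
    c.allocsLock.Lock()
    defer c.allocsLock.Unlock()
    c.allocs[id] = v
}

// startBlocked mirrors the new updateAllocStatus shape: read and delete the
// map entry under blockedLock, release the lock, then call addItem.
func (c *client) startBlocked(id string) {
    c.blockedLock.Lock()
    v, ok := c.blocked[id]
    if !ok {
        c.blockedLock.Unlock()
        return
    }
    delete(c.blocked, id)
    c.blockedLock.Unlock()

    // No client lock is held here, so addItem cannot deadlock against a
    // goroutine that takes the locks in the opposite order.
    c.addItem(id, v)
}

func main() {
    c := &client{
        blocked: map[string]string{"alloc-1": "payload"},
        allocs:  map[string]string{},
    }
    c.startBlocked("alloc-1")
    fmt.Println(len(c.allocs)) // prints 1
}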
@@ -1566,9 +1575,9 @@ func (c *Client) runAllocs(update *allocUpdates) {
     }
 
     // See if the updated alloc is getting migrated
-    c.migratingAllocsLock.Lock()
+    c.migratingAllocsLock.RLock()
     ch, ok := c.migratingAllocs[update.updated.ID]
-    c.migratingAllocsLock.Unlock()
+    c.migratingAllocsLock.RUnlock()
     if ok {
         // Stopping the migration if the allocation doesn't need any
         // migration
@@ -2327,13 +2336,13 @@ func (c *Client) emitClientMetrics() {
     nodeID := c.Node().ID
 
     // Emit allocation metrics
-    c.migratingAllocsLock.Lock()
-    migrating := len(c.migratingAllocs)
-    c.migratingAllocsLock.Unlock()
-
-    c.blockedAllocsLock.Lock()
+    c.blockedAllocsLock.RLock()
     blocked := len(c.blockedAllocations)
-    c.blockedAllocsLock.Unlock()
+    c.blockedAllocsLock.RUnlock()
 
+    c.migratingAllocsLock.RLock()
+    migrating := len(c.migratingAllocs)
+    c.migratingAllocsLock.RUnlock()
+
     pending, running, terminal := 0, 0, 0
     for _, ar := range c.getAllocRunners() {
@@ -2405,17 +2414,17 @@ func (c *Client) allAllocs() map[string]*structs.Allocation {
         a := ar.Alloc()
         allocs[a.ID] = a
     }
-    c.blockedAllocsLock.Lock()
+    c.blockedAllocsLock.RLock()
     for _, alloc := range c.blockedAllocations {
         allocs[alloc.ID] = alloc
     }
-    c.blockedAllocsLock.Unlock()
+    c.blockedAllocsLock.RUnlock()
 
-    c.migratingAllocsLock.Lock()
+    c.migratingAllocsLock.RLock()
     for _, ctrl := range c.migratingAllocs {
         allocs[ctrl.alloc.ID] = ctrl.alloc
     }
-    c.migratingAllocsLock.Unlock()
+    c.migratingAllocsLock.RUnlock()
     return allocs
 }