Merge pull request #2853 from hashicorp/b-watcher

Improve alloc health watcher
Alex Dadgar, 2017-07-18 14:12:28 -07:00 (committed by GitHub)
commit d2381c9263
3 changed files with 46 additions and 20 deletions


@@ -143,6 +143,7 @@ type allocRunnerMutableState struct {
AllocClientStatus string
AllocClientDescription string
TaskStates map[string]*structs.TaskState
+ DeploymentStatus *structs.AllocDeploymentStatus
}
// NewAllocRunner is used to create a new allocation context
@@ -157,7 +158,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,
logger: logger,
alloc: alloc,
allocID: alloc.ID,
- allocBroadcast: cstructs.NewAllocBroadcaster(0),
+ allocBroadcast: cstructs.NewAllocBroadcaster(8),
dirtyCh: make(chan struct{}, 1),
allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)),
tasks: make(map[string]*TaskRunner),
@@ -255,6 +256,7 @@ func (r *AllocRunner) RestoreState() error {
r.allocClientDescription = mutable.AllocClientDescription
r.taskStates = mutable.TaskStates
r.alloc.ClientStatus = getClientStatus(r.taskStates)
+ r.alloc.DeploymentStatus = mutable.DeploymentStatus
return nil
})
@@ -422,6 +424,7 @@ func (r *AllocRunner) saveAllocRunnerState() error {
AllocClientStatus: allocClientStatus,
AllocClientDescription: allocClientDescription,
TaskStates: alloc.TaskStates,
+ DeploymentStatus: alloc.DeploymentStatus,
}
if err := putObject(allocBkt, allocRunnerStateMutableKey, &mutable); err != nil {
@@ -573,7 +576,19 @@ func (r *AllocRunner) syncStatus() error {
// Get a copy of our alloc, update status server side and sync to disk
alloc := r.Alloc()
r.updater(alloc)
- r.allocBroadcast.Send(alloc)
+ // Try to send the alloc up to three times with a delay to allow recovery.
+ sent := false
+ for i := 0; i < 3; i++ {
+ if sent = r.allocBroadcast.Send(alloc); sent {
+ break
+ }
+ time.Sleep(500 * time.Millisecond)
+ }
+ if !sent {
+ r.logger.Printf("[WARN] client: failed to broadcast update to allocation %q", r.allocID)
+ }
return r.saveAllocRunnerState()
}
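
The retry added to syncStatus above leans on Send becoming non-blocking and reporting delivery (see the broadcaster change further down). As a rough standalone sketch of the same bounded-retry pattern, with made-up names and a plain buffered channel standing in for the broadcaster:

package main

import (
	"fmt"
	"time"
)

// trySend makes a bounded number of non-blocking send attempts, sleeping
// between attempts so a slow receiver can drain its buffer. Names and types
// here are illustrative, not the Nomad API.
func trySend(ch chan<- string, msg string, attempts int, delay time.Duration) bool {
	for i := 0; i < attempts; i++ {
		select {
		case ch <- msg:
			return true
		default:
			time.Sleep(delay)
		}
	}
	return false
}

func main() {
	updates := make(chan string, 8) // buffered, in the spirit of NewAllocBroadcaster(8)
	if !trySend(updates, "alloc update", 3, 500*time.Millisecond) {
		fmt.Println("[WARN] failed to broadcast update")
	} else {
		fmt.Println("update delivered")
	}
}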


@@ -24,6 +24,9 @@ func (r *AllocRunner) watchHealth(ctx context.Context) {
if alloc.DeploymentID == "" {
r.logger.Printf("[TRACE] client.alloc_watcher: exiting because alloc isn't part of a deployment")
return
+ } else if alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() {
+ r.logger.Printf("[TRACE] client.alloc_watcher: exiting because alloc deployment health already determined")
+ return
}
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
@@ -67,9 +70,16 @@ func (r *AllocRunner) watchHealth(ctx context.Context) {
latestTaskHealthy := time.Unix(0, 0)
latestChecksHealthy := time.Unix(0, 0)
healthyTimer := time.NewTimer(0)
- if !healthyTimer.Stop() {
- <-healthyTimer.C
- }
+ healthyTime := time.Time{}
+ cancelHealthyTimer := func() {
+ if !healthyTimer.Stop() {
+ select {
+ case <-healthyTimer.C:
+ default:
+ }
+ }
+ }
+ cancelHealthyTimer()
// Cleanup function
defer func() {
@@ -166,6 +176,7 @@ OUTER:
// If we should have checks and they aren't all healthy continue
if len(checks) != desiredChecks {
r.logger.Printf("[TRACE] client.alloc_watcher: continuing since all checks (want %d; got %d) haven't been registered for alloc %q", desiredChecks, len(checks), alloc.ID)
+ cancelHealthyTimer()
continue OUTER
}
@@ -174,6 +185,7 @@ OUTER:
if check.Status != api.HealthPassing {
r.logger.Printf("[TRACE] client.alloc_watcher: continuing since check %q isn't passing for alloc %q", check.CheckID, alloc.ID)
latestChecksHealthy = time.Time{}
+ cancelHealthyTimer()
continue OUTER
}
}
@@ -193,26 +205,21 @@ OUTER:
}
}
- // Don't need to set the timer if we are healthy and have marked
- // ourselves healthy.
- if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Healthy != nil && *alloc.DeploymentStatus.Healthy {
- continue OUTER
- }
// Determine when we can mark ourselves as healthy.
totalHealthy := latestTaskHealthy
if totalHealthy.Before(latestChecksHealthy) {
totalHealthy = latestChecksHealthy
}
- d := time.Until(totalHealthy.Add(u.MinHealthyTime))
- if !healthyTimer.Stop() {
- select {
- case <-healthyTimer.C:
- default:
- }
- }
+ // Nothing to do since we are already waiting for the healthy timer to
+ // fire at the same time.
+ if totalHealthy.Equal(healthyTime) {
+ continue OUTER
+ }
+ healthyTime = totalHealthy
+ cancelHealthyTimer()
+ d := time.Until(totalHealthy.Add(u.MinHealthyTime))
healthyTimer.Reset(d)
r.logger.Printf("[TRACE] client.alloc_watcher: setting healthy timer to %v for alloc %q", d, alloc.ID)
}
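
The cancelHealthyTimer helper introduced above is the standard stop-and-drain idiom for time.Timer: stop the timer and, if it already fired, drain its channel with a non-blocking receive so a later Reset starts clean. A minimal self-contained sketch of that idiom, with illustrative names:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Start with a timer that is immediately cancelled, as watchHealth does.
	healthy := time.NewTimer(0)

	cancel := func() {
		if !healthy.Stop() {
			// Stop returned false: the timer fired, so drain the channel if
			// nobody has read from it yet. The default case prevents blocking.
			select {
			case <-healthy.C:
			default:
			}
		}
	}
	cancel()

	// Later, re-arm the timer for a new deadline and wait for it to fire.
	healthy.Reset(50 * time.Millisecond)
	<-healthy.C
	fmt.Println("healthy timer fired")
}

Re-arming only when the deadline actually changes (the healthyTime check in the diff) avoids resetting the timer on every pass through the watch loop.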


@@ -29,20 +29,24 @@ type AllocListener struct {
id int
}
- // Send broadcasts a message to the channel.
- // Sending on a closed channel causes a runtime panic.
- func (b *AllocBroadcaster) Send(v *structs.Allocation) {
+ // Send broadcasts a message to the channel. Send returns whether the message
+ // was sent to all channels.
+ func (b *AllocBroadcaster) Send(v *structs.Allocation) bool {
b.m.Lock()
defer b.m.Unlock()
if b.closed {
- return
+ return false
}
+ sent := true
for _, l := range b.listeners {
select {
case l <- v:
default:
+ sent = false
}
}
+ return sent
}
// Close closes the channel, disabling the sending of further messages.
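
Send now reports whether every listener accepted the update, which is what the retry loop in syncStatus checks before logging a warning. A toy sketch of this non-blocking fan-out pattern (not the real AllocBroadcaster; types and fields here are simplified):

package main

import (
	"fmt"
	"sync"
)

// broadcaster fans a value out to every listener without blocking; Send
// reports whether all listeners accepted it.
type broadcaster struct {
	m         sync.Mutex
	closed    bool
	listeners []chan string
}

func (b *broadcaster) Send(v string) bool {
	b.m.Lock()
	defer b.m.Unlock()
	if b.closed {
		return false
	}
	sent := true
	for _, l := range b.listeners {
		select {
		case l <- v:
		default:
			// Listener's buffer is full; skip it and report partial delivery.
			sent = false
		}
	}
	return sent
}

func main() {
	b := &broadcaster{listeners: []chan string{make(chan string, 8)}}
	fmt.Println("delivered to all listeners:", b.Send("alloc update"))
}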