From 41f67e3535fe8565cc459afcca9117ad242400dc Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 18 Jul 2017 11:05:38 -0700 Subject: [PATCH 1/2] Small fixes --- client/alloc_runner.go | 16 +++++++++-- client/alloc_runner_health_watcher.go | 38 +++++++++++++++++---------- client/structs/funcs.go | 12 ++++++--- 3 files changed, 46 insertions(+), 20 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 80898f715..bcea151c0 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -157,7 +157,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, logger: logger, alloc: alloc, allocID: alloc.ID, - allocBroadcast: cstructs.NewAllocBroadcaster(0), + allocBroadcast: cstructs.NewAllocBroadcaster(8), dirtyCh: make(chan struct{}, 1), allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)), tasks: make(map[string]*TaskRunner), @@ -573,7 +573,19 @@ func (r *AllocRunner) syncStatus() error { // Get a copy of our alloc, update status server side and sync to disk alloc := r.Alloc() r.updater(alloc) - r.allocBroadcast.Send(alloc) + + // Try to send the alloc up to three times with a delay to allow recovery. + sent := false + for i := 0; i < 3; i++ { + if sent = r.allocBroadcast.Send(alloc); sent { + break + } + time.Sleep(500 * time.Millisecond) + } + if !sent { + r.logger.Printf("[WARN] client: failed to broadcase update to allocation %q", r.allocID) + } + return r.saveAllocRunnerState() } diff --git a/client/alloc_runner_health_watcher.go b/client/alloc_runner_health_watcher.go index 62378a594..97bad06fe 100644 --- a/client/alloc_runner_health_watcher.go +++ b/client/alloc_runner_health_watcher.go @@ -26,6 +26,12 @@ func (r *AllocRunner) watchHealth(ctx context.Context) { return } + // TODO Add to persisted state + if alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() { + r.logger.Printf("[TRACE] client.alloc_watcher: exiting because alloc deployment health already determined") + return + } + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) if tg == nil { r.logger.Printf("[ERR] client.alloc_watcher: failed to lookup allocation's task group. Exiting watcher") @@ -67,9 +73,16 @@ func (r *AllocRunner) watchHealth(ctx context.Context) { latestTaskHealthy := time.Unix(0, 0) latestChecksHealthy := time.Unix(0, 0) healthyTimer := time.NewTimer(0) - if !healthyTimer.Stop() { - <-healthyTimer.C + healthyTime := time.Time{} + cancelHealthyTimer := func() { + if !healthyTimer.Stop() { + select { + case <-healthyTimer.C: + default: + } + } } + cancelHealthyTimer() // Cleanup function defer func() { @@ -166,6 +179,7 @@ OUTER: // If we should have checks and they aren't all healthy continue if len(checks) != desiredChecks { r.logger.Printf("[TRACE] client.alloc_watcher: continuing since all checks (want %d; got %d) haven't been registered for alloc %q", desiredChecks, len(checks), alloc.ID) + cancelHealthyTimer() continue OUTER } @@ -174,6 +188,7 @@ OUTER: if check.Status != api.HealthPassing { r.logger.Printf("[TRACE] client.alloc_watcher: continuing since check %q isn't passing for alloc %q", check.CheckID, alloc.ID) latestChecksHealthy = time.Time{} + cancelHealthyTimer() continue OUTER } } @@ -193,26 +208,21 @@ OUTER: } } - // Don't need to set the timer if we are healthy and have marked - // ourselves healthy. - if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Healthy != nil && *alloc.DeploymentStatus.Healthy { - continue OUTER - } - // Determine when we can mark ourselves as healthy. totalHealthy := latestTaskHealthy if totalHealthy.Before(latestChecksHealthy) { totalHealthy = latestChecksHealthy } - d := time.Until(totalHealthy.Add(u.MinHealthyTime)) - if !healthyTimer.Stop() { - select { - case <-healthyTimer.C: - default: - } + // Nothing to do since we are already waiting for the healthy timer to + // fire at the same time. + if totalHealthy.Equal(healthyTime) { + continue OUTER } + healthyTime = totalHealthy + cancelHealthyTimer() + d := time.Until(totalHealthy.Add(u.MinHealthyTime)) healthyTimer.Reset(d) r.logger.Printf("[TRACE] client.alloc_watcher: setting healthy timer to %v for alloc %q", d, alloc.ID) } diff --git a/client/structs/funcs.go b/client/structs/funcs.go index 833bd4e20..ec6d5421c 100644 --- a/client/structs/funcs.go +++ b/client/structs/funcs.go @@ -29,20 +29,24 @@ type AllocListener struct { id int } -// Send broadcasts a message to the channel. -// Sending on a closed channel causes a runtime panic. -func (b *AllocBroadcaster) Send(v *structs.Allocation) { +// Send broadcasts a message to the channel. Send returns whether the message +// was sent to all channels. +func (b *AllocBroadcaster) Send(v *structs.Allocation) bool { b.m.Lock() defer b.m.Unlock() if b.closed { - return + return false } + sent := true for _, l := range b.listeners { select { case l <- v: default: + sent = false } } + + return sent } // Close closes the channel, disabling the sending of further messages. From bd43bd509c02931c49a164b3695817d6751488b3 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 18 Jul 2017 12:31:05 -0700 Subject: [PATCH 2/2] Save deployment status --- client/alloc_runner.go | 5 ++++- client/alloc_runner_health_watcher.go | 5 +---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index bcea151c0..294479355 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -143,6 +143,7 @@ type allocRunnerMutableState struct { AllocClientStatus string AllocClientDescription string TaskStates map[string]*structs.TaskState + DeploymentStatus *structs.AllocDeploymentStatus } // NewAllocRunner is used to create a new allocation context @@ -255,6 +256,7 @@ func (r *AllocRunner) RestoreState() error { r.allocClientDescription = mutable.AllocClientDescription r.taskStates = mutable.TaskStates r.alloc.ClientStatus = getClientStatus(r.taskStates) + r.alloc.DeploymentStatus = mutable.DeploymentStatus return nil }) @@ -422,6 +424,7 @@ func (r *AllocRunner) saveAllocRunnerState() error { AllocClientStatus: allocClientStatus, AllocClientDescription: allocClientDescription, TaskStates: alloc.TaskStates, + DeploymentStatus: alloc.DeploymentStatus, } if err := putObject(allocBkt, allocRunnerStateMutableKey, &mutable); err != nil { @@ -583,7 +586,7 @@ func (r *AllocRunner) syncStatus() error { time.Sleep(500 * time.Millisecond) } if !sent { - r.logger.Printf("[WARN] client: failed to broadcase update to allocation %q", r.allocID) + r.logger.Printf("[WARN] client: failed to broadcast update to allocation %q", r.allocID) } return r.saveAllocRunnerState() diff --git a/client/alloc_runner_health_watcher.go b/client/alloc_runner_health_watcher.go index 97bad06fe..31c7f5954 100644 --- a/client/alloc_runner_health_watcher.go +++ b/client/alloc_runner_health_watcher.go @@ -24,10 +24,7 @@ func (r *AllocRunner) watchHealth(ctx context.Context) { if alloc.DeploymentID == "" { r.logger.Printf("[TRACE] client.alloc_watcher: exiting because alloc isn't part of a deployment") return - } - - // TODO Add to persisted state - if alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() { + } else if alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() { r.logger.Printf("[TRACE] client.alloc_watcher: exiting because alloc deployment health already determined") return }