open-nomad/nomad/deploymentwatcher/deployment_watcher.go

834 lines
24 KiB
Go
Raw Normal View History

2017-06-26 21:23:52 +00:00
package deploymentwatcher
import (
2017-06-28 04:36:16 +00:00
"context"
2018-03-23 17:56:00 +00:00
"fmt"
2017-06-26 21:23:52 +00:00
"log"
"sync"
"time"
2017-06-28 04:36:16 +00:00
"golang.org/x/time/rate"
2017-08-31 00:45:32 +00:00
memdb "github.com/hashicorp/go-memdb"
2017-07-06 22:03:27 +00:00
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
2017-08-31 00:45:32 +00:00
"github.com/hashicorp/nomad/nomad/state"
2017-06-26 21:23:52 +00:00
"github.com/hashicorp/nomad/nomad/structs"
)
// perJobEvalBatchPeriod is how long allocation health updates are batched
// before a single evaluation is created to trigger the scheduler.
const perJobEvalBatchPeriod = time.Second
2018-04-07 00:23:35 +00:00
var (
2018-04-10 22:02:52 +00:00
// allowRescheduleTransition is the transition that allows failed
2018-04-07 00:23:35 +00:00
// allocations part of a deployment to be rescheduled. We create a one off
// variable to avoid creating a new object for every request.
2018-04-10 22:02:52 +00:00
allowRescheduleTransition = &structs.DesiredTransition{
2018-04-07 00:23:35 +00:00
Reschedule: helper.BoolToPtr(true),
}
)
2017-06-26 21:23:52 +00:00
// deploymentTriggers are the set of functions required to trigger changes on
// behalf of a deployment
type deploymentTriggers interface {
2018-04-10 22:02:52 +00:00
// createUpdate is used to create allocation desired transition updates and
// an evaluation.
createUpdate(allocs map[string]*structs.DesiredTransition, eval *structs.Evaluation) (uint64, error)
2017-06-26 21:23:52 +00:00
// upsertJob is used to roll back a job when autoreverting for a deployment
upsertJob(job *structs.Job) (uint64, error)
// upsertDeploymentStatusUpdate is used to upsert a deployment status update
// and an optional evaluation and job to upsert
upsertDeploymentStatusUpdate(u *structs.DeploymentStatusUpdate, eval *structs.Evaluation, job *structs.Job) (uint64, error)
// upsertDeploymentPromotion is used to promote canaries in a deployment
upsertDeploymentPromotion(req *structs.ApplyDeploymentPromoteRequest) (uint64, error)
// upsertDeploymentAllocHealth is used to set the health of allocations in a
// deployment
upsertDeploymentAllocHealth(req *structs.ApplyDeploymentAllocHealthRequest) (uint64, error)
}
// deploymentWatcher is used to watch a single deployment and trigger the
2018-03-11 19:06:05 +00:00
// scheduler when allocation health transitions.
2017-06-26 21:23:52 +00:00
type deploymentWatcher struct {
2017-06-28 04:36:16 +00:00
// queryLimiter is used to limit the rate of blocking queries
queryLimiter *rate.Limiter
2017-06-26 21:23:52 +00:00
// deploymentTriggers holds the methods required to trigger changes on behalf of the
// deployment
deploymentTriggers
2017-08-31 00:45:32 +00:00
// state is the state that is watched for state changes.
state *state.StateStore
2017-06-26 21:23:52 +00:00
// deploymentID is the deployment's ID being watched
deploymentID string
// deploymentUpdateCh is triggered when there is an updated deployment
deploymentUpdateCh chan struct{}
2017-06-26 21:23:52 +00:00
// d is the deployment being watched
d *structs.Deployment
// j is the job the deployment is for
j *structs.Job
// outstandingBatch marks whether an outstanding function exists to create
2018-04-07 00:23:35 +00:00
// the evaluation. Access should be done through the lock.
2017-06-26 21:23:52 +00:00
outstandingBatch bool
2018-04-07 00:23:35 +00:00
// outstandingAllowReplacements is the map of allocations that will be
// marked as allowing a replacement. Access should be done through the lock.
outstandingAllowReplacements map[string]*structs.DesiredTransition
2017-06-28 21:32:11 +00:00
// latestEval is the latest eval for the job. It is updated by the watch
// loop and any time an evaluation is created. The field should be accessed
// by holding the lock or using the setter and getter methods.
2017-06-28 20:19:41 +00:00
latestEval uint64
2017-06-26 21:23:52 +00:00
logger *log.Logger
2017-06-28 04:36:16 +00:00
ctx context.Context
exitFn context.CancelFunc
2017-06-26 21:23:52 +00:00
l sync.RWMutex
}
// newDeploymentWatcher returns a deployment watcher that is used to watch
// deployments and trigger the scheduler as needed.
2017-06-28 21:32:11 +00:00
func newDeploymentWatcher(parent context.Context, queryLimiter *rate.Limiter,
2017-08-31 00:45:32 +00:00
logger *log.Logger, state *state.StateStore, d *structs.Deployment,
2017-06-28 21:32:11 +00:00
j *structs.Job, triggers deploymentTriggers) *deploymentWatcher {
2017-06-26 21:23:52 +00:00
2017-06-28 04:36:16 +00:00
ctx, exitFn := context.WithCancel(parent)
2017-06-26 21:23:52 +00:00
w := &deploymentWatcher{
2017-07-03 18:08:35 +00:00
queryLimiter: queryLimiter,
deploymentID: d.ID,
deploymentUpdateCh: make(chan struct{}, 1),
2017-07-03 18:08:35 +00:00
d: d,
j: j,
2017-08-31 00:45:32 +00:00
state: state,
2017-07-03 18:08:35 +00:00
deploymentTriggers: triggers,
logger: logger,
ctx: ctx,
exitFn: exitFn,
2017-06-26 21:23:52 +00:00
}
2017-06-28 21:32:11 +00:00
// Start the long lived watcher that scans for allocation updates
go w.watch()
2017-06-28 21:32:11 +00:00
2017-06-26 21:23:52 +00:00
return w
}
// updateDeployment replaces the tracked deployment with d and notifies the
// watch loop. The notification is a non-blocking send: if a signal is already
// queued on deploymentUpdateCh the new one is dropped.
func (w *deploymentWatcher) updateDeployment(d *structs.Deployment) {
	w.l.Lock()
	defer w.l.Unlock()

	w.d = d

	select {
	case w.deploymentUpdateCh <- struct{}{}:
		// Signal queued for the watch loop.
	default:
		// A signal is already pending; nothing more to do.
	}
}
// getDeployment returns the deployment currently tracked by the watcher,
// reading it under the read lock.
func (w *deploymentWatcher) getDeployment() *structs.Deployment {
	w.l.RLock()
	current := w.d
	w.l.RUnlock()
	return current
}
2017-06-28 04:36:16 +00:00
func (w *deploymentWatcher) SetAllocHealth(
req *structs.DeploymentAllocHealthRequest,
resp *structs.DeploymentUpdateResponse) error {
2017-06-26 21:23:52 +00:00
// If we are failing the deployment, update the status and potentially
// rollback
var j *structs.Job
var u *structs.DeploymentStatusUpdate
// If there are unhealthy allocations we need to mark the deployment as
// failed and check if we should roll back to a stable job.
if l := len(req.UnhealthyAllocationIDs); l != 0 {
unhealthy := make(map[string]struct{}, l)
for _, alloc := range req.UnhealthyAllocationIDs {
unhealthy[alloc] = struct{}{}
}
// Get the allocations for the deployment
2017-08-31 00:45:32 +00:00
snap, err := w.state.Snapshot()
if err != nil {
return err
}
allocs, err := snap.AllocsByDeployment(nil, req.DeploymentID)
if err != nil {
2017-06-28 04:36:16 +00:00
return err
2017-06-26 21:23:52 +00:00
}
2017-07-03 18:08:35 +00:00
// Determine if we should autorevert to an older job
2017-06-26 21:23:52 +00:00
desc := structs.DeploymentStatusDescriptionFailedAllocations
2017-08-31 00:45:32 +00:00
for _, alloc := range allocs {
2017-06-26 21:23:52 +00:00
// Check that the alloc has been marked unhealthy
if _, ok := unhealthy[alloc.ID]; !ok {
continue
}
// Check if the group has autorevert set
group, ok := w.getDeployment().TaskGroups[alloc.TaskGroup]
2017-06-30 19:35:59 +00:00
if !ok || !group.AutoRevert {
2017-06-26 21:23:52 +00:00
continue
}
var err error
j, err = w.latestStableJob()
if err != nil {
2017-06-28 04:36:16 +00:00
return err
2017-06-26 21:23:52 +00:00
}
2017-07-06 22:03:27 +00:00
if j != nil {
j, desc = w.handleRollbackValidity(j, desc)
2017-07-06 22:03:27 +00:00
}
2017-06-26 21:23:52 +00:00
break
}
u = w.getDeploymentStatusUpdate(structs.DeploymentStatusFailed, desc)
}
2017-09-07 23:56:15 +00:00
// Canonicalize the job in case it doesn't have namespace set
j.Canonicalize()
2017-06-26 21:23:52 +00:00
// Create the request
areq := &structs.ApplyDeploymentAllocHealthRequest{
DeploymentAllocHealthRequest: *req,
2018-04-06 20:11:58 +00:00
Timestamp: time.Now(),
Eval: w.getEval(),
DeploymentUpdate: u,
Job: j,
2017-06-26 21:23:52 +00:00
}
index, err := w.upsertDeploymentAllocHealth(areq)
if err != nil {
2017-06-28 04:36:16 +00:00
return err
2017-06-26 21:23:52 +00:00
}
2017-06-28 04:36:16 +00:00
// Build the response
resp.EvalID = areq.Eval.ID
resp.EvalCreateIndex = index
resp.DeploymentModifyIndex = index
2017-06-29 05:00:18 +00:00
resp.Index = index
2017-07-06 22:03:27 +00:00
if j != nil {
resp.RevertedJobVersion = helper.Uint64ToPtr(j.Version)
}
2018-04-06 20:19:20 +00:00
w.setLatestEval(index)
2017-06-28 04:36:16 +00:00
return nil
2017-06-26 21:23:52 +00:00
}
// handleRollbackValidity decides whether rolling back to rollbackJob makes
// sense by comparing its spec with the watcher's current job.
//
// If the specs differ, the rollback job is returned together with a
// description stating the rollback and its version. If they are identical the
// rollback would not succeed, and repeating it could cause an infinite revert
// cycle when a previously stable version fails to start during a rollback, so
// a nil job is returned along with a no-op rollback description.
func (w *deploymentWatcher) handleRollbackValidity(rollbackJob *structs.Job, desc string) (*structs.Job, string) {
	if !w.j.SpecChanged(rollbackJob) {
		// Identical spec: report the no-op and drop the rollback target.
		return nil, structs.DeploymentStatusDescriptionRollbackNoop(desc, rollbackJob.Version)
	}

	return rollbackJob, structs.DeploymentStatusDescriptionRollback(desc, rollbackJob.Version)
}
2017-06-26 21:23:52 +00:00
2017-06-28 04:36:16 +00:00
func (w *deploymentWatcher) PromoteDeployment(
req *structs.DeploymentPromoteRequest,
resp *structs.DeploymentUpdateResponse) error {
2017-06-26 21:23:52 +00:00
// Create the request
areq := &structs.ApplyDeploymentPromoteRequest{
DeploymentPromoteRequest: *req,
Eval: w.getEval(),
}
index, err := w.upsertDeploymentPromotion(areq)
if err != nil {
2017-06-28 04:36:16 +00:00
return err
2017-06-26 21:23:52 +00:00
}
2017-06-28 04:36:16 +00:00
// Build the response
resp.EvalID = areq.Eval.ID
resp.EvalCreateIndex = index
resp.DeploymentModifyIndex = index
2017-06-29 05:00:18 +00:00
resp.Index = index
2018-04-06 20:19:20 +00:00
w.setLatestEval(index)
2017-06-28 04:36:16 +00:00
return nil
2017-06-26 21:23:52 +00:00
}
2017-06-28 04:36:16 +00:00
func (w *deploymentWatcher) PauseDeployment(
req *structs.DeploymentPauseRequest,
resp *structs.DeploymentUpdateResponse) error {
// Determine the status we should transition to and if we need to create an
2017-06-26 21:23:52 +00:00
// evaluation
status, desc := structs.DeploymentStatusPaused, structs.DeploymentStatusDescriptionPaused
var eval *structs.Evaluation
evalID := ""
if !req.Pause {
status, desc = structs.DeploymentStatusRunning, structs.DeploymentStatusDescriptionRunning
2017-06-28 04:36:16 +00:00
eval = w.getEval()
2017-06-26 21:23:52 +00:00
evalID = eval.ID
}
update := w.getDeploymentStatusUpdate(status, desc)
// Commit the change
i, err := w.upsertDeploymentStatusUpdate(update, eval, nil)
if err != nil {
2017-06-28 04:36:16 +00:00
return err
2017-06-26 21:23:52 +00:00
}
2017-06-28 04:36:16 +00:00
// Build the response
2017-06-29 05:00:18 +00:00
if evalID != "" {
resp.EvalID = evalID
resp.EvalCreateIndex = i
}
2017-06-28 04:36:16 +00:00
resp.DeploymentModifyIndex = i
2017-06-29 05:00:18 +00:00
resp.Index = i
2018-04-06 20:19:20 +00:00
w.setLatestEval(i)
2017-06-28 04:36:16 +00:00
return nil
2017-06-26 21:23:52 +00:00
}
2017-06-28 23:29:48 +00:00
func (w *deploymentWatcher) FailDeployment(
2017-06-29 05:00:18 +00:00
req *structs.DeploymentFailRequest,
2017-06-28 23:29:48 +00:00
resp *structs.DeploymentUpdateResponse) error {
status, desc := structs.DeploymentStatusFailed, structs.DeploymentStatusDescriptionFailedByUser
2017-07-06 22:03:27 +00:00
// Determine if we should rollback
rollback := false
for _, state := range w.getDeployment().TaskGroups {
2017-07-06 22:03:27 +00:00
if state.AutoRevert {
rollback = true
break
}
}
var rollbackJob *structs.Job
if rollback {
var err error
rollbackJob, err = w.latestStableJob()
if err != nil {
return err
}
if rollbackJob != nil {
rollbackJob, desc = w.handleRollbackValidity(rollbackJob, desc)
} else {
desc = structs.DeploymentStatusDescriptionNoRollbackTarget(desc)
2017-07-06 22:03:27 +00:00
}
}
2017-06-28 23:29:48 +00:00
// Commit the change
2017-07-06 22:03:27 +00:00
update := w.getDeploymentStatusUpdate(status, desc)
eval := w.getEval()
i, err := w.upsertDeploymentStatusUpdate(update, eval, rollbackJob)
2017-06-28 23:29:48 +00:00
if err != nil {
return err
}
// Build the response
resp.EvalID = eval.ID
resp.EvalCreateIndex = i
resp.DeploymentModifyIndex = i
2017-06-29 05:00:18 +00:00
resp.Index = i
2017-07-06 22:03:27 +00:00
if rollbackJob != nil {
resp.RevertedJobVersion = helper.Uint64ToPtr(rollbackJob.Version)
}
2018-04-06 20:19:20 +00:00
w.setLatestEval(i)
2017-06-28 23:29:48 +00:00
return nil
}
2017-06-26 21:23:52 +00:00
// StopWatch stops watching the deployment. This should be called whenever a
// deployment is completed or the watcher is no longer needed.
func (w *deploymentWatcher) StopWatch() {
2017-06-28 04:36:16 +00:00
w.exitFn()
2017-06-26 21:23:52 +00:00
}
// watch is the long running watcher that watches for both allocation and
// deployment changes. Its function is to create evaluations to trigger the
// scheduler when more progress can be made, to fail the deployment if it has
2018-04-10 18:42:13 +00:00
// failed and potentially rolling back the job. Progress can be made when an
2018-04-10 22:02:52 +00:00
// allocation transitions to healthy, so we create an eval.
2017-06-26 21:23:52 +00:00
func (w *deploymentWatcher) watch() {
// Get the deadline. This is likely a zero time to begin with but we need to
// handle the case that the deployment has already progressed and we are now
2018-04-10 18:42:13 +00:00
// just starting to watch it. This must likely would occur if there was a
2018-04-10 22:02:52 +00:00
// leader transition and we are now starting our watcher.
currentDeadline := getDeploymentProgressCutoff(w.getDeployment())
var deadlineTimer *time.Timer
if currentDeadline.IsZero() {
deadlineTimer = time.NewTimer(0)
if !deadlineTimer.Stop() {
<-deadlineTimer.C
2018-04-04 20:54:53 +00:00
}
} else {
deadlineTimer = time.NewTimer(currentDeadline.Sub(time.Now()))
2018-03-23 17:56:00 +00:00
}
allocIndex := uint64(1)
var updates *allocUpdates
rollback, deadlineHit := false, false
FAIL:
for {
select {
case <-w.ctx.Done():
return
case <-deadlineTimer.C:
// We have hit the progress deadline so fail the deployment. We need
2018-04-10 18:42:13 +00:00
// to determine whether we should roll back the job by inspecting
2018-03-23 17:56:00 +00:00
// which allocs as part of the deployment are healthy and which
// aren't.
deadlineHit = true
fail, rback, err := w.shouldFail()
2018-03-23 17:56:00 +00:00
if err != nil {
w.logger.Printf("[ERR] nomad.deployment_watcher: failed to determine whether to rollback job for deployment %q: %v", w.deploymentID, err)
2018-03-23 17:56:00 +00:00
}
if !fail {
w.logger.Printf("[DEBUG] nomad.deployment_watcher: skipping deadline for deployment %q", w.deploymentID)
continue
}
w.logger.Printf("[DEBUG] nomad.deployment_watcher: deadline for deployment %q hit and rollback is %v", w.deploymentID, rback)
rollback = rback
2018-03-23 17:56:00 +00:00
break FAIL
case <-w.deploymentUpdateCh:
// Get the updated deployment and check if we should change the
// deadline timer
next := getDeploymentProgressCutoff(w.getDeployment())
if !next.Equal(currentDeadline) {
prevDeadlineZero := currentDeadline.IsZero()
currentDeadline = next
// The most recent deadline can be zero if no allocs were created for this deployment.
// The deadline timer would have already been stopped once in that case. To prevent
// deadlocking on the already stopped deadline timer, we only drain the channel if
// the previous deadline was not zero.
if !prevDeadlineZero && !deadlineTimer.Stop() {
select {
case <-deadlineTimer.C:
default:
}
}
deadlineTimer.Reset(next.Sub(time.Now()))
}
2018-03-23 17:56:00 +00:00
case updates = <-w.getAllocsCh(allocIndex):
if err := updates.err; err != nil {
if err == context.Canceled || w.ctx.Err() == context.Canceled {
return
}
w.logger.Printf("[ERR] nomad.deployment_watcher: failed to retrieve allocations for deployment %q: %v", w.deploymentID, err)
2018-03-23 17:56:00 +00:00
return
}
allocIndex = updates.index
// We have allocation changes for this deployment so determine the
// steps to take.
res, err := w.handleAllocUpdate(updates.allocs)
2018-03-23 17:56:00 +00:00
if err != nil {
if err == context.Canceled || w.ctx.Err() == context.Canceled {
return
}
w.logger.Printf("[ERR] nomad.deployment_watcher: failed handling allocation updates: %v", err)
return
}
// The deployment has failed, so break out of the watch loop and
// handle the failure
2018-03-23 17:56:00 +00:00
if res.failDeployment {
rollback = res.rollback
break FAIL
}
// Create an eval to push the deployment along
2018-04-07 00:23:35 +00:00
if res.createEval || len(res.allowReplacements) != 0 {
w.createBatchedUpdate(res.allowReplacements, allocIndex)
2018-03-23 17:56:00 +00:00
}
}
}
// Change the deployments status to failed
desc := structs.DeploymentStatusDescriptionFailedAllocations
if deadlineHit {
desc = structs.DeploymentStatusDescriptionProgressDeadline
}
// Rollback to the old job if necessary
var j *structs.Job
if rollback {
var err error
j, err = w.latestStableJob()
if err != nil {
w.logger.Printf("[ERR] nomad.deployment_watcher: failed to lookup latest stable job for %q: %v", w.j.ID, err)
2018-03-23 17:56:00 +00:00
}
// Description should include that the job is being rolled back to
// version N
if j != nil {
j, desc = w.handleRollbackValidity(j, desc)
} else {
desc = structs.DeploymentStatusDescriptionNoRollbackTarget(desc)
}
}
// Update the status of the deployment to failed and create an evaluation.
e := w.getEval()
u := w.getDeploymentStatusUpdate(structs.DeploymentStatusFailed, desc)
2018-04-06 20:19:20 +00:00
if index, err := w.upsertDeploymentStatusUpdate(u, e, j); err != nil {
w.logger.Printf("[ERR] nomad.deployment_watcher: failed to update deployment %q status: %v", w.deploymentID, err)
2018-04-06 20:19:20 +00:00
} else {
w.setLatestEval(index)
2018-03-23 17:56:00 +00:00
}
}
// allocUpdateResult describes the actions the watcher should take after
// inspecting the newest set of allocations for the deployment.
type allocUpdateResult struct {
	// createEval is set when an evaluation should be created so the
	// deployment can progress.
	createEval bool

	// failDeployment is set when the deployment should be failed.
	failDeployment bool

	// rollback is set when the failure should also roll the job back.
	rollback bool

	// allowReplacements lists the IDs of allocations that should be
	// marked as allowed to be rescheduled.
	allowReplacements []string
}
// handleAllocUpdate is used to compute the set of actions to take based on the
// updated allocations for the deployment.
func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) (allocUpdateResult, error) {
2018-03-23 17:56:00 +00:00
var res allocUpdateResult
// Get the latest evaluation index
latestEval, err := w.latestEvalIndex()
if err != nil {
if err == context.Canceled || w.ctx.Err() == context.Canceled {
return res, err
}
return res, fmt.Errorf("failed to determine last evaluation index for job %q: %v", w.j.ID, err)
2018-03-23 17:56:00 +00:00
}
deployment := w.getDeployment()
2018-03-23 17:56:00 +00:00
for _, alloc := range allocs {
dstate, ok := deployment.TaskGroups[alloc.TaskGroup]
if !ok {
2018-03-23 17:56:00 +00:00
continue
}
// Nothing to do for this allocation
if alloc.DeploymentStatus == nil || alloc.DeploymentStatus.ModifyIndex <= latestEval {
continue
}
2018-04-07 00:23:35 +00:00
// Determine if the update stanza for this group is progress based
progressBased := dstate.ProgressDeadline != 0
2018-03-23 17:56:00 +00:00
// We need to create an eval so the job can progress.
if alloc.DeploymentStatus.IsHealthy() {
2018-03-23 17:56:00 +00:00
res.createEval = true
2018-04-07 00:23:35 +00:00
} else if progressBased && alloc.DeploymentStatus.IsUnhealthy() && deployment.Active() && !alloc.DesiredTransition.ShouldReschedule() {
res.allowReplacements = append(res.allowReplacements, alloc.ID)
2018-03-23 17:56:00 +00:00
}
2018-04-07 00:23:35 +00:00
// If the group is using a progress deadline, we don't have to do anything.
if progressBased {
2018-03-23 17:56:00 +00:00
continue
}
// Fail on the first bad allocation
if alloc.DeploymentStatus.IsUnhealthy() {
// Check if the group has autorevert set
if dstate.AutoRevert {
2018-03-23 17:56:00 +00:00
res.rollback = true
}
// Since we have an unhealthy allocation, fail the deployment
res.failDeployment = true
}
// All conditions have been hit so we can break
if res.createEval && res.failDeployment && res.rollback {
break
}
}
return res, nil
}
// shouldFail returns whether the job should be failed and whether it should
// rolled back to an earlier stable version by examining the allocations in the
// deployment.
func (w *deploymentWatcher) shouldFail() (fail, rollback bool, err error) {
2018-03-23 17:56:00 +00:00
snap, err := w.state.Snapshot()
if err != nil {
return false, false, err
2018-03-23 17:56:00 +00:00
}
d, err := snap.DeploymentByID(nil, w.deploymentID)
2018-03-23 17:56:00 +00:00
if err != nil {
return false, false, err
2018-03-23 17:56:00 +00:00
}
if d == nil {
// The deployment wasn't in the state store, possibly due to a system gc
return false, false, fmt.Errorf("deployment id not found: %q", w.deploymentID)
}
2018-03-23 17:56:00 +00:00
fail = false
2018-03-23 17:56:00 +00:00
for tg, state := range d.TaskGroups {
// If we are in a canary state we fail if there aren't enough healthy
// allocs to satisfy DesiredCanaries
if state.DesiredCanaries > 0 && !state.Promoted {
if state.HealthyAllocs >= state.DesiredCanaries {
continue
}
} else if state.HealthyAllocs >= state.DesiredTotal {
2018-03-23 17:56:00 +00:00
continue
}
// We have failed this TG
fail = true
2018-03-23 17:56:00 +00:00
// We don't need to autorevert this group
upd := w.j.LookupTaskGroup(tg).Update
if upd == nil || !upd.AutoRevert {
continue
}
// Unhealthy allocs and we need to autorevert
return true, true, nil
2018-03-23 17:56:00 +00:00
}
return fail, false, nil
2018-03-23 17:56:00 +00:00
}
// getDeploymentProgressCutoff returns the earliest non-zero RequireProgressBy
// among the task groups of the given deployment. The zero time is returned
// when no task group has a deadline set.
func getDeploymentProgressCutoff(d *structs.Deployment) time.Time {
	var next time.Time
	for _, dstate := range d.TaskGroups {
		// A zero deadline means "not set". It must not take part in the
		// comparison: the zero time sorts before every real deadline and
		// would overwrite it, making the result depend on map iteration
		// order.
		if dstate.RequireProgressBy.IsZero() {
			continue
		}

		if next.IsZero() || dstate.RequireProgressBy.Before(next) {
			next = dstate.RequireProgressBy
		}
	}
	return next
}
2017-06-26 21:23:52 +00:00
// latestStableJob returns the latest stable job. It may be nil if none exist
func (w *deploymentWatcher) latestStableJob() (*structs.Job, error) {
2017-08-31 00:45:32 +00:00
snap, err := w.state.Snapshot()
if err != nil {
return nil, err
}
versions, err := snap.JobVersionsByID(nil, w.j.Namespace, w.j.ID)
2017-08-31 00:45:32 +00:00
if err != nil {
2017-06-26 21:23:52 +00:00
return nil, err
}
var stable *structs.Job
2017-08-31 00:45:32 +00:00
for _, job := range versions {
2017-06-26 21:23:52 +00:00
if job.Stable {
stable = job
break
}
}
return stable, nil
}
2018-04-07 00:23:35 +00:00
// createBatchedUpdate creates an eval for the given index as well as updating
// the given allocations to allow them to reschedule.
func (w *deploymentWatcher) createBatchedUpdate(allowReplacements []string, forIndex uint64) {
2017-06-26 21:23:52 +00:00
w.l.Lock()
defer w.l.Unlock()
2018-04-07 00:23:35 +00:00
// Store the allocations that can be replaced
for _, allocID := range allowReplacements {
if w.outstandingAllowReplacements == nil {
w.outstandingAllowReplacements = make(map[string]*structs.DesiredTransition, len(allowReplacements))
}
2018-04-10 22:02:52 +00:00
w.outstandingAllowReplacements[allocID] = allowRescheduleTransition
2018-04-07 00:23:35 +00:00
}
if w.outstandingBatch || (forIndex < w.latestEval && len(allowReplacements) == 0) {
2017-06-26 21:23:52 +00:00
return
}
2017-06-28 19:58:05 +00:00
w.outstandingBatch = true
2017-07-03 18:08:35 +00:00
time.AfterFunc(perJobEvalBatchPeriod, func() {
2018-02-20 20:47:43 +00:00
// If the timer has been created and then we shutdown, we need to no-op
// the evaluation creation.
select {
case <-w.ctx.Done():
return
default:
}
2018-04-07 00:23:35 +00:00
w.l.Lock()
replacements := w.outstandingAllowReplacements
w.outstandingAllowReplacements = nil
w.outstandingBatch = false
w.l.Unlock()
2017-06-28 21:32:11 +00:00
// Create the eval
2018-04-07 00:23:35 +00:00
if index, err := w.createUpdate(replacements, w.getEval()); err != nil {
w.logger.Printf("[ERR] nomad.deployment_watcher: failed to create evaluation for deployment %q: %v", w.deploymentID, err)
2018-04-06 20:19:20 +00:00
} else {
w.setLatestEval(index)
2017-06-28 20:19:41 +00:00
}
2017-07-03 18:08:35 +00:00
})
2017-06-26 21:23:52 +00:00
}
// getEval returns an evaluation suitable for the deployment
func (w *deploymentWatcher) getEval() *structs.Evaluation {
return &structs.Evaluation{
ID: uuid.Generate(),
2017-09-07 23:56:15 +00:00
Namespace: w.j.Namespace,
2017-06-26 21:23:52 +00:00
Priority: w.j.Priority,
Type: w.j.Type,
2017-07-03 18:08:35 +00:00
TriggeredBy: structs.EvalTriggerDeploymentWatcher,
2017-06-26 21:23:52 +00:00
JobID: w.j.ID,
DeploymentID: w.deploymentID,
2017-06-26 21:23:52 +00:00
Status: structs.EvalStatusPending,
}
}
// getDeploymentStatusUpdate returns a deployment status update
func (w *deploymentWatcher) getDeploymentStatusUpdate(status, desc string) *structs.DeploymentStatusUpdate {
return &structs.DeploymentStatusUpdate{
DeploymentID: w.deploymentID,
2017-06-26 21:23:52 +00:00
Status: status,
StatusDescription: desc,
}
}
2018-03-23 17:56:00 +00:00
// allocUpdates is the result of one blocking query for the allocations of the
// deployment, as delivered on the channel returned by getAllocsCh.
type allocUpdates struct {
	// allocs are the allocation stubs returned by the query.
	allocs []*structs.AllocListStub

	// index is the index returned by the query.
	index uint64

	// err is the error returned by the query, if any.
	err error
}
// getAllocsCh runs getAllocs for the given index in a new goroutine and
// returns a channel on which the single result is delivered. The channel is
// buffered by one, so the goroutine can finish even if nobody receives.
func (w *deploymentWatcher) getAllocsCh(index uint64) <-chan *allocUpdates {
	out := make(chan *allocUpdates, 1)

	go func() {
		result := new(allocUpdates)
		result.allocs, result.index, result.err = w.getAllocs(index)
		out <- result
	}()

	return out
}
2017-06-26 21:23:52 +00:00
// getAllocs retrieves the allocations that are part of the deployment blocking
// at the given index.
2017-08-31 00:45:32 +00:00
func (w *deploymentWatcher) getAllocs(index uint64) ([]*structs.AllocListStub, uint64, error) {
resp, index, err := w.state.BlockingQuery(w.getAllocsImpl, index, w.ctx)
if err != nil {
return nil, 0, err
}
if err := w.ctx.Err(); err != nil {
return nil, 0, err
2017-06-28 04:36:16 +00:00
}
2017-08-31 00:45:32 +00:00
return resp.([]*structs.AllocListStub), index, nil
}
2017-06-26 21:23:52 +00:00
2017-08-31 00:45:32 +00:00
// getDeploysImpl retrieves all deployments from the passed state store.
func (w *deploymentWatcher) getAllocsImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
if err := w.queryLimiter.Wait(w.ctx); err != nil {
return nil, 0, err
}
// Capture all the allocations
allocs, err := state.AllocsByDeployment(ws, w.deploymentID)
2017-08-31 00:45:32 +00:00
if err != nil {
return nil, 0, err
}
stubs := make([]*structs.AllocListStub, 0, len(allocs))
for _, alloc := range allocs {
stubs = append(stubs, alloc.Stub())
2017-06-28 04:36:16 +00:00
}
2017-06-26 21:23:52 +00:00
2017-08-31 00:45:32 +00:00
// Use the last index that affected the jobs table
index, err := state.Index("allocs")
if err != nil {
return nil, index, err
}
return stubs, index, nil
2017-06-26 21:23:52 +00:00
}
2017-06-28 04:36:16 +00:00
// latestEvalIndex returns the index of the last evaluation created for
2017-06-26 21:23:52 +00:00
// the job. The index is used to determine if an allocation update requires an
// evaluation to be triggered.
2017-06-28 04:36:16 +00:00
func (w *deploymentWatcher) latestEvalIndex() (uint64, error) {
if err := w.queryLimiter.Wait(w.ctx); err != nil {
return 0, err
}
2017-08-31 00:45:32 +00:00
snap, err := w.state.Snapshot()
if err != nil {
return 0, err
2017-06-26 21:23:52 +00:00
}
2017-08-31 00:45:32 +00:00
evals, err := snap.EvalsByJob(nil, w.j.Namespace, w.j.ID)
2017-06-26 21:23:52 +00:00
if err != nil {
2017-06-28 04:36:16 +00:00
return 0, err
2017-06-26 21:23:52 +00:00
}
2017-08-31 00:45:32 +00:00
if len(evals) == 0 {
idx, err := snap.Index("evals")
2018-04-06 20:19:20 +00:00
if err != nil {
w.setLatestEval(idx)
}
2017-08-31 00:45:32 +00:00
return idx, err
2017-06-28 04:36:16 +00:00
}
// Prefer using the snapshot index. Otherwise use the create index
2017-08-31 00:45:32 +00:00
e := evals[0]
2017-06-28 04:36:16 +00:00
if e.SnapshotIndex != 0 {
2018-04-06 20:19:20 +00:00
w.setLatestEval(e.SnapshotIndex)
2017-06-28 04:36:16 +00:00
return e.SnapshotIndex, nil
2017-06-26 21:23:52 +00:00
}
2018-04-06 20:19:20 +00:00
w.setLatestEval(e.CreateIndex)
2017-06-28 04:36:16 +00:00
return e.CreateIndex, nil
2017-06-26 21:23:52 +00:00
}
2018-04-06 20:19:20 +00:00
// setLatestEval records index as the latest evaluation index. The stored
// value only ever moves forward: an index that is not greater than the current
// one is ignored.
func (w *deploymentWatcher) setLatestEval(index uint64) {
	w.l.Lock()
	defer w.l.Unlock()

	if w.latestEval < index {
		w.latestEval = index
	}
}
// getLatestEval returns the latest evaluation index recorded by the watcher.
// It only reads the field, so it takes the read lock, as getDeployment does,
// instead of the exclusive lock.
func (w *deploymentWatcher) getLatestEval() uint64 {
	w.l.RLock()
	defer w.l.RUnlock()
	return w.latestEval
}