2017-06-26 21:23:52 +00:00
|
|
|
package deploymentwatcher
|
|
|
|
|
|
|
|
import (
|
2017-06-28 04:36:16 +00:00
|
|
|
"context"
|
2017-06-26 21:23:52 +00:00
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
"sync"
|
2017-06-28 21:25:20 +00:00
|
|
|
"time"
|
2017-06-26 21:23:52 +00:00
|
|
|
|
2017-06-28 04:36:16 +00:00
|
|
|
"golang.org/x/time/rate"
|
|
|
|
|
2017-06-26 21:23:52 +00:00
|
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
|
|
)
|
|
|
|
|
|
|
|
// DeploymentRaftEndpoints exposes the deployment watcher to a set of functions
|
|
|
|
// to apply data transforms via Raft.
|
|
|
|
type DeploymentRaftEndpoints interface {
|
|
|
|
// UpsertEvals is used to upsert a set of evaluations
|
|
|
|
UpsertEvals([]*structs.Evaluation) (uint64, error)
|
|
|
|
|
|
|
|
// UpsertJob is used to upsert a job
|
|
|
|
UpsertJob(job *structs.Job) (uint64, error)
|
|
|
|
|
|
|
|
// UpsertDeploymentStatusUpdate is used to upsert a deployment status update
|
|
|
|
// and potentially create an evaluation.
|
|
|
|
UpsertDeploymentStatusUpdate(u *structs.DeploymentStatusUpdateRequest) (uint64, error)
|
|
|
|
|
|
|
|
// UpsertDeploymentPromotion is used to promote canaries in a deployment
|
|
|
|
UpsertDeploymentPromotion(req *structs.ApplyDeploymentPromoteRequest) (uint64, error)
|
|
|
|
|
|
|
|
// UpsertDeploymentAllocHealth is used to set the health of allocations in a
|
|
|
|
// deployment
|
|
|
|
UpsertDeploymentAllocHealth(req *structs.ApplyDeploymentAllocHealthRequest) (uint64, error)
|
|
|
|
}
|
|
|
|
|
|
|
|
// DeploymentStateWatchers are the set of functions required to watch objects on
|
|
|
|
// behalf of a deployment
|
|
|
|
type DeploymentStateWatchers interface {
|
|
|
|
// Evaluations returns the set of evaluations for the given job
|
|
|
|
Evaluations(args *structs.JobSpecificRequest, reply *structs.JobEvaluationsResponse) error
|
|
|
|
|
|
|
|
// Allocations returns the set of allocations that are part of the
|
|
|
|
// deployment.
|
|
|
|
Allocations(args *structs.DeploymentSpecificRequest, reply *structs.AllocListResponse) error
|
|
|
|
|
2017-06-27 18:52:14 +00:00
|
|
|
// List is used to list all the deployments in the system
|
|
|
|
List(args *structs.DeploymentListRequest, reply *structs.DeploymentListResponse) error
|
|
|
|
|
2017-06-26 21:23:52 +00:00
|
|
|
// GetJobVersions is used to lookup the versions of a job. This is used when
|
|
|
|
// rolling back to find the latest stable job
|
|
|
|
GetJobVersions(args *structs.JobSpecificRequest, reply *structs.JobVersionsResponse) error
|
2017-06-27 18:52:14 +00:00
|
|
|
|
|
|
|
// GetJob is used to lookup a particular job.
|
|
|
|
GetJob(args *structs.JobSpecificRequest, reply *structs.SingleJobResponse) error
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
2017-06-28 04:36:16 +00:00
|
|
|
const (
	// LimitStateQueriesPerSecond is the number of state queries allowed per
	// second.
	LimitStateQueriesPerSecond = 15.0

	// EvalBatchDuration is the duration in which evaluations are batched before
	// committing to Raft.
	EvalBatchDuration = 250 * time.Millisecond
)
|
|
|
|
|
2017-06-26 21:23:52 +00:00
|
|
|
// Watcher is used to watch deployments and their allocations created
|
|
|
|
// by the scheduler and trigger the scheduler when allocation health
|
|
|
|
// transistions.
|
|
|
|
type Watcher struct {
|
|
|
|
enabled bool
|
|
|
|
logger *log.Logger
|
|
|
|
|
2017-06-28 04:36:16 +00:00
|
|
|
// queryLimiter is used to limit the rate of blocking queries
|
|
|
|
queryLimiter *rate.Limiter
|
|
|
|
|
2017-06-28 21:25:20 +00:00
|
|
|
// evalBatchDuration is the duration to batch eval creation across all
|
|
|
|
// deployment watchers
|
|
|
|
evalBatchDuration time.Duration
|
|
|
|
|
2017-06-26 21:23:52 +00:00
|
|
|
// raft contains the set of Raft endpoints that can be used by the
|
|
|
|
// deployments watcher
|
|
|
|
raft DeploymentRaftEndpoints
|
|
|
|
|
|
|
|
// stateWatchers is the set of functions required to watch a deployment for
|
|
|
|
// state changes
|
|
|
|
stateWatchers DeploymentStateWatchers
|
|
|
|
|
|
|
|
// watchers is the set of active watchers, one per deployment
|
|
|
|
watchers map[string]*deploymentWatcher
|
|
|
|
|
|
|
|
// evalBatcher is used to batch the creation of evaluations
|
|
|
|
evalBatcher *EvalBatcher
|
|
|
|
|
2017-06-28 04:36:16 +00:00
|
|
|
// ctx and exitFn are used to cancel the watcher
|
|
|
|
ctx context.Context
|
|
|
|
exitFn context.CancelFunc
|
2017-06-26 21:23:52 +00:00
|
|
|
|
|
|
|
l sync.RWMutex
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewDeploymentsWatcher returns a deployments watcher that is used to watch
|
|
|
|
// deployments and trigger the scheduler as needed.
|
2017-06-28 22:35:52 +00:00
|
|
|
func NewDeploymentsWatcher(logger *log.Logger, stateQueriesPerSecond float64,
|
2017-06-28 21:25:20 +00:00
|
|
|
evalBatchDuration time.Duration) *Watcher {
|
2017-06-28 22:35:52 +00:00
|
|
|
|
2017-06-26 21:23:52 +00:00
|
|
|
return &Watcher{
|
2017-06-28 21:25:20 +00:00
|
|
|
queryLimiter: rate.NewLimiter(rate.Limit(stateQueriesPerSecond), 100),
|
|
|
|
evalBatchDuration: evalBatchDuration,
|
|
|
|
logger: logger,
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-06-28 22:35:52 +00:00
|
|
|
// SetStateWatchers sets the interface for accessing state watchers
|
|
|
|
func (w *Watcher) SetStateWatchers(watchers DeploymentStateWatchers) {
|
|
|
|
w.l.Lock()
|
|
|
|
defer w.l.Unlock()
|
|
|
|
w.stateWatchers = watchers
|
|
|
|
}
|
|
|
|
|
|
|
|
// SetRaftEndpoints sets the interface for writing to Raft
|
|
|
|
func (w *Watcher) SetRaftEndpoints(raft DeploymentRaftEndpoints) {
|
|
|
|
w.l.Lock()
|
|
|
|
defer w.l.Unlock()
|
|
|
|
w.raft = raft
|
|
|
|
}
|
|
|
|
|
2017-06-26 21:23:52 +00:00
|
|
|
// SetEnabled is used to control if the watcher is enabled. The watcher
|
|
|
|
// should only be enabled on the active leader.
|
2017-06-28 22:35:52 +00:00
|
|
|
func (w *Watcher) SetEnabled(enabled bool) error {
|
2017-06-26 21:23:52 +00:00
|
|
|
w.l.Lock()
|
2017-06-28 22:35:52 +00:00
|
|
|
// Ensure our state is correct
|
|
|
|
if w.stateWatchers == nil || w.raft == nil {
|
|
|
|
return fmt.Errorf("State watchers and Raft endpoints must be set before starting")
|
|
|
|
}
|
|
|
|
|
2017-06-27 18:52:14 +00:00
|
|
|
wasEnabled := w.enabled
|
2017-06-26 21:23:52 +00:00
|
|
|
w.enabled = enabled
|
|
|
|
w.l.Unlock()
|
2017-06-28 22:35:52 +00:00
|
|
|
|
|
|
|
// Flush the state to create the necessary objects
|
|
|
|
w.Flush()
|
|
|
|
|
|
|
|
// If we are starting now, launch the watch daemon
|
|
|
|
if enabled && !wasEnabled {
|
2017-06-27 18:52:14 +00:00
|
|
|
go w.watchDeployments()
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
2017-06-28 22:35:52 +00:00
|
|
|
|
|
|
|
return nil
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Flush is used to clear the state of the watcher
|
|
|
|
func (w *Watcher) Flush() {
|
|
|
|
w.l.Lock()
|
|
|
|
defer w.l.Unlock()
|
|
|
|
|
|
|
|
// Stop all the watchers and clear it
|
|
|
|
for _, watcher := range w.watchers {
|
|
|
|
watcher.StopWatch()
|
|
|
|
}
|
|
|
|
|
2017-06-28 04:36:16 +00:00
|
|
|
// Kill everything associated with the watcher
|
2017-06-28 22:35:52 +00:00
|
|
|
if w.exitFn != nil {
|
|
|
|
w.exitFn()
|
|
|
|
}
|
2017-06-26 21:23:52 +00:00
|
|
|
|
|
|
|
w.watchers = make(map[string]*deploymentWatcher, 32)
|
2017-06-28 04:36:16 +00:00
|
|
|
w.ctx, w.exitFn = context.WithCancel(context.Background())
|
2017-06-28 21:25:20 +00:00
|
|
|
w.evalBatcher = NewEvalBatcher(w.evalBatchDuration, w.raft, w.ctx)
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
2017-06-27 18:52:14 +00:00
|
|
|
// watchDeployments is the long lived go-routine that watches for deployments to
|
|
|
|
// add and remove watchers on.
|
|
|
|
func (w *Watcher) watchDeployments() {
|
|
|
|
dindex := uint64(0)
|
|
|
|
for {
|
|
|
|
// Block getting all deployments using the last deployment index.
|
2017-06-28 04:36:16 +00:00
|
|
|
resp, err := w.getDeploys(dindex)
|
|
|
|
if err != nil {
|
|
|
|
if err == context.Canceled {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
w.logger.Printf("[ERR] nomad.deployments_watcher: failed to retrieve deploylements: %v", err)
|
2017-06-27 18:52:14 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Guard against npe
|
|
|
|
if resp == nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// Ensure we are tracking the things we should and not tracking what we
|
|
|
|
// shouldn't be
|
|
|
|
for _, d := range resp.Deployments {
|
|
|
|
if d.Active() {
|
|
|
|
if err := w.add(d); err != nil {
|
|
|
|
w.logger.Printf("[ERR] nomad.deployments_watcher: failed to track deployment %q: %v", d.ID, err)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
w.remove(d)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Update the latest index
|
|
|
|
dindex = resp.Index
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// getDeploys retrieves all deployments blocking at the given index.
|
2017-06-28 04:36:16 +00:00
|
|
|
func (w *Watcher) getDeploys(index uint64) (*structs.DeploymentListResponse, error) {
|
|
|
|
// Build the request
|
|
|
|
args := &structs.DeploymentListRequest{
|
|
|
|
QueryOptions: structs.QueryOptions{
|
|
|
|
MinQueryIndex: index,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
var resp structs.DeploymentListResponse
|
|
|
|
|
|
|
|
for resp.Index <= index {
|
|
|
|
if err := w.queryLimiter.Wait(w.ctx); err != nil {
|
|
|
|
return nil, err
|
2017-06-27 18:52:14 +00:00
|
|
|
}
|
|
|
|
|
2017-06-28 04:36:16 +00:00
|
|
|
if err := w.stateWatchers.List(args, &resp); err != nil {
|
|
|
|
return nil, err
|
2017-06-27 18:52:14 +00:00
|
|
|
}
|
2017-06-28 04:36:16 +00:00
|
|
|
}
|
2017-06-27 18:52:14 +00:00
|
|
|
|
2017-06-28 04:36:16 +00:00
|
|
|
return &resp, nil
|
2017-06-27 18:52:14 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// add adds a deployment to the watch list
|
|
|
|
func (w *Watcher) add(d *structs.Deployment) error {
|
2017-06-26 21:23:52 +00:00
|
|
|
w.l.Lock()
|
|
|
|
defer w.l.Unlock()
|
|
|
|
|
|
|
|
// Not enabled so no-op
|
|
|
|
if !w.enabled {
|
2017-06-27 18:52:14 +00:00
|
|
|
return nil
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Already watched so no-op
|
|
|
|
if _, ok := w.watchers[d.ID]; ok {
|
2017-06-27 18:52:14 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get the job the deployment is referencing
|
|
|
|
args := &structs.JobSpecificRequest{
|
|
|
|
JobID: d.JobID,
|
|
|
|
}
|
|
|
|
var resp structs.SingleJobResponse
|
|
|
|
if err := w.stateWatchers.GetJob(args, &resp); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if resp.Job == nil {
|
|
|
|
return fmt.Errorf("deployment %q references unknown job %q", d.ID, d.JobID)
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
2017-06-28 04:36:16 +00:00
|
|
|
w.watchers[d.ID] = newDeploymentWatcher(w.ctx, w.queryLimiter, w.logger, w.stateWatchers, d, resp.Job, w)
|
2017-06-27 18:52:14 +00:00
|
|
|
return nil
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
2017-06-27 18:52:14 +00:00
|
|
|
// remove stops watching a deployment. This can be because the deployment is
|
2017-06-26 21:23:52 +00:00
|
|
|
// complete or being deleted.
|
2017-06-27 18:52:14 +00:00
|
|
|
func (w *Watcher) remove(d *structs.Deployment) {
|
2017-06-26 21:23:52 +00:00
|
|
|
w.l.Lock()
|
|
|
|
defer w.l.Unlock()
|
|
|
|
|
|
|
|
// Not enabled so no-op
|
|
|
|
if !w.enabled {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if watcher, ok := w.watchers[d.ID]; ok {
|
|
|
|
watcher.StopWatch()
|
|
|
|
delete(w.watchers, d.ID)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// SetAllocHealth is used to set the health of allocations for a deployment. If
|
|
|
|
// there are any unhealthy allocations, the deployment is updated to be failed.
|
|
|
|
// Otherwise the allocations are updated and an evaluation is created.
|
2017-06-28 04:36:16 +00:00
|
|
|
func (w *Watcher) SetAllocHealth(req *structs.DeploymentAllocHealthRequest, resp *structs.DeploymentUpdateResponse) error {
|
2017-06-26 21:23:52 +00:00
|
|
|
w.l.Lock()
|
|
|
|
defer w.l.Unlock()
|
|
|
|
|
|
|
|
// Not enabled so no-op
|
|
|
|
if !w.enabled {
|
2017-06-28 04:36:16 +00:00
|
|
|
return nil
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
watcher, ok := w.watchers[req.DeploymentID]
|
|
|
|
if !ok {
|
2017-06-28 04:36:16 +00:00
|
|
|
return fmt.Errorf("deployment %q not being watched for updates", req.DeploymentID)
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
2017-06-28 04:36:16 +00:00
|
|
|
return watcher.SetAllocHealth(req, resp)
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// PromoteDeployment is used to promote a deployment. If promote is false,
|
|
|
|
// deployment is marked as failed. Otherwise the deployment is updated and an
|
|
|
|
// evaluation is created.
|
2017-06-28 04:36:16 +00:00
|
|
|
func (w *Watcher) PromoteDeployment(req *structs.DeploymentPromoteRequest, resp *structs.DeploymentUpdateResponse) error {
|
2017-06-26 21:23:52 +00:00
|
|
|
w.l.Lock()
|
|
|
|
defer w.l.Unlock()
|
|
|
|
|
|
|
|
// Not enabled so no-op
|
|
|
|
if !w.enabled {
|
2017-06-28 04:36:16 +00:00
|
|
|
return nil
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
watcher, ok := w.watchers[req.DeploymentID]
|
|
|
|
if !ok {
|
2017-06-28 04:36:16 +00:00
|
|
|
return fmt.Errorf("deployment %q not being watched for updates", req.DeploymentID)
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
2017-06-28 04:36:16 +00:00
|
|
|
return watcher.PromoteDeployment(req, resp)
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// PauseDeployment is used to toggle the pause state on a deployment. If the
|
|
|
|
// deployment is being unpaused, an evaluation is created.
|
2017-06-28 04:36:16 +00:00
|
|
|
func (w *Watcher) PauseDeployment(req *structs.DeploymentPauseRequest, resp *structs.DeploymentUpdateResponse) error {
|
2017-06-26 21:23:52 +00:00
|
|
|
w.l.Lock()
|
|
|
|
defer w.l.Unlock()
|
|
|
|
|
|
|
|
// Not enabled so no-op
|
|
|
|
if !w.enabled {
|
2017-06-28 04:36:16 +00:00
|
|
|
return nil
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
watcher, ok := w.watchers[req.DeploymentID]
|
|
|
|
if !ok {
|
2017-06-28 04:36:16 +00:00
|
|
|
return fmt.Errorf("deployment %q not being watched for updates", req.DeploymentID)
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
2017-06-28 04:36:16 +00:00
|
|
|
return watcher.PauseDeployment(req, resp)
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// createEvaluation commits the given evaluation to Raft but batches the commit
|
|
|
|
// with other calls.
|
|
|
|
func (w *Watcher) createEvaluation(eval *structs.Evaluation) (uint64, error) {
|
2017-06-28 21:25:20 +00:00
|
|
|
return w.evalBatcher.CreateEval(eval).Results()
|
2017-06-26 21:23:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// upsertJob commits the given job to Raft
|
|
|
|
func (w *Watcher) upsertJob(job *structs.Job) (uint64, error) {
|
|
|
|
return w.raft.UpsertJob(job)
|
|
|
|
}
|
|
|
|
|
|
|
|
// upsertDeploymentStatusUpdate commits the given deployment update and optional
|
|
|
|
// evaluation to Raft
|
|
|
|
func (w *Watcher) upsertDeploymentStatusUpdate(
|
|
|
|
u *structs.DeploymentStatusUpdate,
|
|
|
|
e *structs.Evaluation,
|
|
|
|
j *structs.Job) (uint64, error) {
|
|
|
|
return w.raft.UpsertDeploymentStatusUpdate(&structs.DeploymentStatusUpdateRequest{
|
|
|
|
DeploymentUpdate: u,
|
|
|
|
Eval: e,
|
|
|
|
Job: j,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// upsertDeploymentPromotion commits the given deployment promotion to Raft
|
|
|
|
func (w *Watcher) upsertDeploymentPromotion(req *structs.ApplyDeploymentPromoteRequest) (uint64, error) {
|
|
|
|
return w.raft.UpsertDeploymentPromotion(req)
|
|
|
|
}
|
|
|
|
|
|
|
|
// upsertDeploymentAllocHealth commits the given allocation health changes to
|
|
|
|
// Raft
|
|
|
|
func (w *Watcher) upsertDeploymentAllocHealth(req *structs.ApplyDeploymentAllocHealthRequest) (uint64, error) {
|
|
|
|
return w.raft.UpsertDeploymentAllocHealth(req)
|
|
|
|
}
|