Bring up-to-date with master

This commit is contained in:
Ivo Verberk 2015-12-24 21:16:32 +01:00
commit fd177f4c6f
33 changed files with 2384 additions and 89 deletions

View file

@ -1,3 +1,17 @@
## 0.3.0 (UNRELEASED)
IMPROVEMENTS:
* core: Periodic specification for jobs [GH-540]
* core: Improved restart policy with more user configuration [GH-594]
* core: Batch jobs are garbage collected from the Nomad Servers [GH-586]
* driver/rkt: Add support for CPU/Memory isolation [GH-610]
* cli: Output of agent-info is sorted [GH-617]
BUG FIXES:
* cli: Handle parsing of un-named ports [GH-604]
* client: Handle non-200 codes when parsing AWS metadata [GH-614]
* cli: Enforce absolute paths for data directories [GH-622]
## 0.2.3 (December 17, 2015) ## 0.2.3 (December 17, 2015)
BUG FIXES: BUG FIXES:

View file

@ -106,12 +106,19 @@ func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta,
return resp.EvalID, wm, nil return resp.EvalID, wm, nil
} }
//UpdateStrategy is for serializing update strategy for a job. // UpdateStrategy is for serializing update strategy for a job.
type UpdateStrategy struct { type UpdateStrategy struct {
Stagger time.Duration Stagger time.Duration
MaxParallel int MaxParallel int
} }
// PeriodicConfig is for serializing periodic config for a job.
type PeriodicConfig struct {
Enabled bool
Spec string
SpecType string
}
// Job is used to serialize a job. // Job is used to serialize a job.
type Job struct { type Job struct {
Region string Region string
@ -124,6 +131,7 @@ type Job struct {
Constraints []*Constraint Constraints []*Constraint
TaskGroups []*TaskGroup TaskGroups []*TaskGroup
Update *UpdateStrategy Update *UpdateStrategy
Periodic *PeriodicConfig
Meta map[string]string Meta map[string]string
Status string Status string
StatusDescription string StatusDescription string

View file

@ -230,12 +230,10 @@ func TestJobs_Deregister(t *testing.T) {
} }
assertWriteMeta(t, wm) assertWriteMeta(t, wm)
// Attempting delete on non-existing job does not error // Attempting delete on non-existing job returns an error
_, wm2, err := jobs.Deregister("nope", nil) if _, _, err = jobs.Deregister("nope", nil); err == nil {
if err != nil { t.Fatalf("expected error deregistering job")
t.Fatalf("err: %s", err)
} }
assertWriteMeta(t, wm2)
// Deleting an existing job works // Deleting an existing job works
evalID, wm3, err := jobs.Deregister("job1", nil) evalID, wm3, err := jobs.Deregister("job1", nil)

View file

@ -14,6 +14,7 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/driver/structs" cstructs "github.com/hashicorp/nomad/client/driver/structs"
@ -28,6 +29,16 @@ var (
reAppcVersion = regexp.MustCompile(`appc version (\d[.\d]+)`) reAppcVersion = regexp.MustCompile(`appc version (\d[.\d]+)`)
) )
const (
// minRktVersion is the earliest supported version of rkt. rkt added support
// for CPU and memory isolators in 0.14.0. We cannot support an earlier
// version to maintain an uniform interface across all drivers
minRktVersion = "0.14.0"
// bytesToMB is the conversion from bytes to megabytes.
bytesToMB = 1024 * 1024
)
// RktDriver is a driver for running images via Rkt // RktDriver is a driver for running images via Rkt
// We attempt to chose sane defaults for now, with more configuration available // We attempt to chose sane defaults for now, with more configuration available
// planned in the future // planned in the future
@ -85,6 +96,13 @@ func (d *RktDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e
node.Attributes["driver.rkt.version"] = rktMatches[1] node.Attributes["driver.rkt.version"] = rktMatches[1]
node.Attributes["driver.rkt.appc.version"] = appcMatches[1] node.Attributes["driver.rkt.appc.version"] = appcMatches[1]
minVersion, _ := version.NewVersion(minRktVersion)
currentVersion, _ := version.NewVersion(node.Attributes["driver.rkt.version"])
if currentVersion.LessThan(minVersion) {
// Do not allow rkt < 0.14.0
d.logger.Printf("[WARN] driver.rkt: please upgrade rkt to a version >= %s", minVersion)
node.Attributes["driver.rkt"] = "0"
}
return true, nil return true, nil
} }
@ -108,23 +126,26 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
} }
taskLocal := filepath.Join(taskDir, allocdir.TaskLocal) taskLocal := filepath.Join(taskDir, allocdir.TaskLocal)
// Build the command.
var cmdArgs []string
// Add the given trust prefix // Add the given trust prefix
trust_prefix, trust_cmd := task.Config["trust_prefix"] trustPrefix, trustCmd := task.Config["trust_prefix"]
if trust_cmd { if trustCmd {
var outBuf, errBuf bytes.Buffer var outBuf, errBuf bytes.Buffer
cmd := exec.Command("rkt", "trust", fmt.Sprintf("--prefix=%s", trust_prefix)) cmd := exec.Command("rkt", "trust", fmt.Sprintf("--prefix=%s", trustPrefix))
cmd.Stdout = &outBuf cmd.Stdout = &outBuf
cmd.Stderr = &errBuf cmd.Stderr = &errBuf
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("Error running rkt trust: %s\n\nOutput: %s\n\nError: %s", return nil, fmt.Errorf("Error running rkt trust: %s\n\nOutput: %s\n\nError: %s",
err, outBuf.String(), errBuf.String()) err, outBuf.String(), errBuf.String())
} }
d.logger.Printf("[DEBUG] driver.rkt: added trust prefix: %q", trust_prefix) d.logger.Printf("[DEBUG] driver.rkt: added trust prefix: %q", trustPrefix)
} else {
// Disble signature verification if the trust command was not run.
cmdArgs = append(cmdArgs, "--insecure-skip-verify")
} }
// Build the command.
var cmd_args []string
// Inject the environment variables. // Inject the environment variables.
envVars := TaskEnvironmentVariables(ctx, task) envVars := TaskEnvironmentVariables(ctx, task)
@ -133,33 +154,41 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
envVars.ClearAllocDir() envVars.ClearAllocDir()
for k, v := range envVars.Map() { for k, v := range envVars.Map() {
cmd_args = append(cmd_args, fmt.Sprintf("--set-env=%v=%v", k, v)) cmdArgs = append(cmdArgs, fmt.Sprintf("--set-env=%v=%v", k, v))
}
// Disble signature verification if the trust command was not run.
if !trust_cmd {
cmd_args = append(cmd_args, "--insecure-skip-verify")
} }
// Append the run command. // Append the run command.
cmd_args = append(cmd_args, "run", "--mds-register=false", img) cmdArgs = append(cmdArgs, "run", "--mds-register=false", img)
// Check if the user has overriden the exec command. // Check if the user has overriden the exec command.
if exec_cmd, ok := task.Config["command"]; ok { if execCmd, ok := task.Config["command"]; ok {
cmd_args = append(cmd_args, fmt.Sprintf("--exec=%v", exec_cmd)) cmdArgs = append(cmdArgs, fmt.Sprintf("--exec=%v", execCmd))
} }
if task.Resources.MemoryMB == 0 {
return nil, fmt.Errorf("Memory limit cannot be zero")
}
if task.Resources.CPU == 0 {
return nil, fmt.Errorf("CPU limit cannot be zero")
}
// Add memory isolator
cmdArgs = append(cmdArgs, fmt.Sprintf("--memory=%vM", int64(task.Resources.MemoryMB)*bytesToMB))
// Add CPU isolator
cmdArgs = append(cmdArgs, fmt.Sprintf("--cpu=%vm", int64(task.Resources.CPU)))
// Add user passed arguments. // Add user passed arguments.
if len(driverConfig.Args) != 0 { if len(driverConfig.Args) != 0 {
parsed := args.ParseAndReplace(driverConfig.Args, envVars.Map()) parsed := args.ParseAndReplace(driverConfig.Args, envVars.Map())
// Need to start arguments with "--" // Need to start arguments with "--"
if len(parsed) > 0 { if len(parsed) > 0 {
cmd_args = append(cmd_args, "--") cmdArgs = append(cmdArgs, "--")
} }
for _, arg := range parsed { for _, arg := range parsed {
cmd_args = append(cmd_args, fmt.Sprintf("%v", arg)) cmdArgs = append(cmdArgs, fmt.Sprintf("%v", arg))
} }
} }
@ -177,7 +206,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
return nil, fmt.Errorf("Error opening file to redirect stderr: %v", err) return nil, fmt.Errorf("Error opening file to redirect stderr: %v", err)
} }
cmd := exec.Command("rkt", cmd_args...) cmd := exec.Command("rkt", cmdArgs...)
cmd.Stdout = stdo cmd.Stdout = stdo
cmd.Stderr = stde cmd.Stderr = stde

View file

@ -81,6 +81,10 @@ func TestRktDriver_Start(t *testing.T) {
"image": "coreos.com/etcd:v2.0.4", "image": "coreos.com/etcd:v2.0.4",
"command": "/etcd", "command": "/etcd",
}, },
Resources: &structs.Resources{
MemoryMB: 256,
CPU: 512,
},
} }
driverCtx := testDriverContext(task.Name) driverCtx := testDriverContext(task.Name)
@ -121,6 +125,10 @@ func TestRktDriver_Start_Wait(t *testing.T) {
"command": "/etcd", "command": "/etcd",
"args": []string{"--version"}, "args": []string{"--version"},
}, },
Resources: &structs.Resources{
MemoryMB: 256,
CPU: 512,
},
} }
driverCtx := testDriverContext(task.Name) driverCtx := testDriverContext(task.Name)
@ -162,6 +170,10 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) {
"command": "/etcd", "command": "/etcd",
"args": []string{"--version"}, "args": []string{"--version"},
}, },
Resources: &structs.Resources{
MemoryMB: 256,
CPU: 512,
},
} }
driverCtx := testDriverContext(task.Name) driverCtx := testDriverContext(task.Name)
@ -204,6 +216,10 @@ func TestRktDriver_Start_Wait_Logs(t *testing.T) {
"command": "/etcd", "command": "/etcd",
"args": []string{"--version"}, "args": []string{"--version"},
}, },
Resources: &structs.Resources{
MemoryMB: 256,
CPU: 512,
},
} }
driverCtx := testDriverContext(task.Name) driverCtx := testDriverContext(task.Name)

View file

@ -116,6 +116,10 @@ func (f *EnvAWSFingerprint) Fingerprint(cfg *config.Config, node *structs.Node)
} }
for _, k := range keys { for _, k := range keys {
res, err := client.Get(metadataURL + k) res, err := client.Get(metadataURL + k)
if res.StatusCode != http.StatusOK {
f.logger.Printf("[WARN]: fingerprint.env_aws: Could not read value for attribute %q", k)
continue
}
if err != nil { if err != nil {
// if it's a URL error, assume we're not in an AWS environment // if it's a URL error, assume we're not in an AWS environment
// TODO: better way to detect AWS? Check xen virtualization? // TODO: better way to detect AWS? Check xen virtualization?

View file

@ -94,7 +94,8 @@ func (f *EnvGCEFingerprint) Get(attribute string, recursive bool) (string, error
} }
res, err := f.client.Do(req) res, err := f.client.Do(req)
if err != nil { if err != nil || res.StatusCode != http.StatusOK {
f.logger.Printf("[WARN]: fingerprint.env_gce: Could not read value for attribute %q", attribute)
return "", err return "", err
} }

View file

@ -193,6 +193,23 @@ func (c *Command) readConfig() *Config {
return nil return nil
} }
// Verify the paths are absolute.
dirs := map[string]string{
"data-dir": config.DataDir,
"alloc-dir": config.Client.AllocDir,
"state-dir": config.Client.StateDir,
}
for k, dir := range dirs {
if dir == "" {
continue
}
if !filepath.IsAbs(dir) {
c.Ui.Error(fmt.Sprintf("%s must be given as an absolute path: got %v", k, dir))
return nil
}
}
// Ensure that we have the directories we neet to run. // Ensure that we have the directories we neet to run.
if config.Server.Enabled && config.DataDir == "" { if config.Server.Enabled && config.DataDir == "" {
c.Ui.Error("Must specify data directory") c.Ui.Error("Must specify data directory")

View file

@ -5,6 +5,7 @@ import (
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"strings" "strings"
"time"
"github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/jobspec" "github.com/hashicorp/nomad/jobspec"
@ -89,8 +90,11 @@ func (c *RunCommand) Run(args []string) int {
return 1 return 1
} }
// Check if the job is periodic.
periodic := job.IsPeriodic()
// Convert it to something we can use // Convert it to something we can use
apiJob, err := convertJob(job) apiJob, err := convertStructJob(job)
if err != nil { if err != nil {
c.Ui.Error(fmt.Sprintf("Error converting job: %s", err)) c.Ui.Error(fmt.Sprintf("Error converting job: %s", err))
return 1 return 1
@ -111,9 +115,14 @@ func (c *RunCommand) Run(args []string) int {
} }
// Check if we should enter monitor mode // Check if we should enter monitor mode
if detach { if detach || periodic {
c.Ui.Output("Job registration successful") c.Ui.Output("Job registration successful")
c.Ui.Output("Evaluation ID: " + evalID) if periodic {
c.Ui.Output(fmt.Sprintf("Approximate next launch time: %v", job.Periodic.Next(time.Now())))
} else {
c.Ui.Output("Evaluation ID: " + evalID)
}
return 0 return 0
} }
@ -123,9 +132,9 @@ func (c *RunCommand) Run(args []string) int {
} }
// convertJob is used to take a *structs.Job and convert it to an *api.Job. // convertStructJob is used to take a *structs.Job and convert it to an *api.Job.
// This function is just a hammer and probably needs to be revisited. // This function is just a hammer and probably needs to be revisited.
func convertJob(in *structs.Job) (*api.Job, error) { func convertStructJob(in *structs.Job) (*api.Job, error) {
gob.Register([]map[string]interface{}{}) gob.Register([]map[string]interface{}{})
gob.Register([]interface{}{}) gob.Register([]interface{}{})
var apiJob *api.Job var apiJob *api.Job

View file

@ -1,8 +1,14 @@
package command package command
import ( import (
"bytes"
"encoding/gob"
"fmt" "fmt"
"strings" "strings"
"time"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/structs"
) )
type StatusCommand struct { type StatusCommand struct {
@ -118,6 +124,14 @@ func (c *StatusCommand) Run(args []string) int {
} }
} }
// Check if it is periodic
sJob, err := convertApiJob(job)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error converting job: %s", err))
return 1
}
periodic := sJob.IsPeriodic()
// Format the job info // Format the job info
basic := []string{ basic := []string{
fmt.Sprintf("ID|%s", job.ID), fmt.Sprintf("ID|%s", job.ID),
@ -126,10 +140,19 @@ func (c *StatusCommand) Run(args []string) int {
fmt.Sprintf("Priority|%d", job.Priority), fmt.Sprintf("Priority|%d", job.Priority),
fmt.Sprintf("Datacenters|%s", strings.Join(job.Datacenters, ",")), fmt.Sprintf("Datacenters|%s", strings.Join(job.Datacenters, ",")),
fmt.Sprintf("Status|%s", job.Status), fmt.Sprintf("Status|%s", job.Status),
fmt.Sprintf("Periodic|%v", periodic),
} }
var evals, allocs []string if periodic {
if !short { basic = append(basic, fmt.Sprintf("Next Periodic Launch|%v",
sJob.Periodic.Next(time.Now())))
}
c.Ui.Output(formatKV(basic))
if !periodic && !short {
var evals, allocs []string
// Query the evaluations // Query the evaluations
jobEvals, _, err := client.Jobs().Evaluations(job.ID, nil) jobEvals, _, err := client.Jobs().Evaluations(job.ID, nil)
if err != nil { if err != nil {
@ -167,15 +190,28 @@ func (c *StatusCommand) Run(args []string) int {
alloc.DesiredStatus, alloc.DesiredStatus,
alloc.ClientStatus) alloc.ClientStatus)
} }
}
// Dump the output
c.Ui.Output(formatKV(basic))
if !short {
c.Ui.Output("\n==> Evaluations") c.Ui.Output("\n==> Evaluations")
c.Ui.Output(formatList(evals)) c.Ui.Output(formatList(evals))
c.Ui.Output("\n==> Allocations") c.Ui.Output("\n==> Allocations")
c.Ui.Output(formatList(allocs)) c.Ui.Output(formatList(allocs))
} }
return 0 return 0
} }
// convertApiJob is used to take a *api.Job and convert it to an *struct.Job.
// This function is just a hammer and probably needs to be revisited.
func convertApiJob(in *api.Job) (*structs.Job, error) {
gob.Register(map[string]interface{}{})
gob.Register([]interface{}{})
var structJob *structs.Job
buf := new(bytes.Buffer)
if err := gob.NewEncoder(buf).Encode(in); err != nil {
return nil, err
}
if err := gob.NewDecoder(buf).Decode(&structJob); err != nil {
return nil, err
}
return structJob, nil
}

View file

@ -698,8 +698,8 @@ func parsePeriodic(result **structs.PeriodicConfig, list *ast.ObjectList) error
m["Enabled"] = enabled m["Enabled"] = enabled
} }
// If "cron_spec" is provided, set the type to "cron" and store the spec. // If "cron" is provided, set the type to "cron" and store the spec.
if cron, ok := m["cron_spec"]; ok { if cron, ok := m["cron"]; ok {
m["SpecType"] = structs.PeriodicSpecCron m["SpecType"] = structs.PeriodicSpecCron
m["Spec"] = cron m["Spec"] = cron
} }

View file

@ -1,5 +1,5 @@
job "foo" { job "foo" {
periodic { periodic {
cron_spec = "*/5 * * *" cron = "*/5 * * *"
} }
} }

View file

@ -32,17 +32,19 @@ const (
EvalSnapshot EvalSnapshot
AllocSnapshot AllocSnapshot
TimeTableSnapshot TimeTableSnapshot
PeriodicLaunchSnapshot
) )
// nomadFSM implements a finite state machine that is used // nomadFSM implements a finite state machine that is used
// along with Raft to provide strong consistency. We implement // along with Raft to provide strong consistency. We implement
// this outside the Server to avoid exposing this outside the package. // this outside the Server to avoid exposing this outside the package.
type nomadFSM struct { type nomadFSM struct {
evalBroker *EvalBroker evalBroker *EvalBroker
logOutput io.Writer periodicDispatcher *PeriodicDispatch
logger *log.Logger logOutput io.Writer
state *state.StateStore logger *log.Logger
timetable *TimeTable state *state.StateStore
timetable *TimeTable
} }
// nomadSnapshot is used to provide a snapshot of the current // nomadSnapshot is used to provide a snapshot of the current
@ -58,7 +60,7 @@ type snapshotHeader struct {
} }
// NewFSMPath is used to construct a new FSM with a blank state // NewFSMPath is used to construct a new FSM with a blank state
func NewFSM(evalBroker *EvalBroker, logOutput io.Writer) (*nomadFSM, error) { func NewFSM(evalBroker *EvalBroker, periodic *PeriodicDispatch, logOutput io.Writer) (*nomadFSM, error) {
// Create a state store // Create a state store
state, err := state.NewStateStore(logOutput) state, err := state.NewStateStore(logOutput)
if err != nil { if err != nil {
@ -66,11 +68,12 @@ func NewFSM(evalBroker *EvalBroker, logOutput io.Writer) (*nomadFSM, error) {
} }
fsm := &nomadFSM{ fsm := &nomadFSM{
evalBroker: evalBroker, evalBroker: evalBroker,
logOutput: logOutput, periodicDispatcher: periodic,
logger: log.New(logOutput, "", log.LstdFlags), logOutput: logOutput,
state: state, logger: log.New(logOutput, "", log.LstdFlags),
timetable: NewTimeTable(timeTableGranularity, timeTableLimit), state: state,
timetable: NewTimeTable(timeTableGranularity, timeTableLimit),
} }
return fsm, nil return fsm, nil
} }
@ -204,6 +207,65 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
n.logger.Printf("[ERR] nomad.fsm: UpsertJob failed: %v", err) n.logger.Printf("[ERR] nomad.fsm: UpsertJob failed: %v", err)
return err return err
} }
// We always add the job to the periodic dispatcher because there is the
// possibility that the periodic spec was removed and then we should stop
// tracking it.
if err := n.periodicDispatcher.Add(req.Job); err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err)
return err
}
// If it is periodic, record the time it was inserted. This is necessary for
// recovering during leader election. It is possible that from the time it
// is added to when it was suppose to launch, leader election occurs and the
// job was not launched. In this case, we use the insertion time to
// determine if a launch was missed.
if req.Job.IsPeriodic() {
prevLaunch, err := n.state.PeriodicLaunchByID(req.Job.ID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: PeriodicLaunchByID failed: %v", err)
return err
}
// Record the insertion time as a launch. We overload the launch table
// such that the first entry is the insertion time.
if prevLaunch == nil {
launch := &structs.PeriodicLaunch{ID: req.Job.ID, Launch: time.Now()}
if err := n.state.UpsertPeriodicLaunch(index, launch); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpsertPeriodicLaunch failed: %v", err)
return err
}
}
}
// Check if the parent job is periodic and mark the launch time.
parentID := req.Job.ParentID
if parentID != "" {
parent, err := n.state.JobByID(parentID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: JobByID(%v) lookup for parent failed: %v", parentID, err)
return err
} else if parent == nil {
// The parent has been deregistered.
return nil
}
if parent.IsPeriodic() {
t, err := n.periodicDispatcher.LaunchTime(req.Job.ID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: LaunchTime(%v) failed: %v", req.Job.ID, err)
return err
}
launch := &structs.PeriodicLaunch{ID: parentID, Launch: t}
if err := n.state.UpsertPeriodicLaunch(index, launch); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpsertPeriodicLaunch failed: %v", err)
return err
}
}
}
return nil return nil
} }
@ -218,6 +280,17 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err) n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err)
return err return err
} }
if err := n.periodicDispatcher.Remove(req.JobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Remove failed: %v", err)
return err
}
// We always delete from the periodic launch table because it is possible that
// the job was updated to be non-perioidic, thus checking if it is periodic
// doesn't ensure we clean it up properly.
n.state.DeletePeriodicLaunch(index, req.JobID)
return nil return nil
} }
@ -392,6 +465,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err return err
} }
case PeriodicLaunchSnapshot:
launch := new(structs.PeriodicLaunch)
if err := dec.Decode(launch); err != nil {
return err
}
if err := restore.PeriodicLaunchRestore(launch); err != nil {
return err
}
default: default:
return fmt.Errorf("Unrecognized snapshot type: %v", msgType) return fmt.Errorf("Unrecognized snapshot type: %v", msgType)
} }
@ -442,6 +524,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel() sink.Cancel()
return err return err
} }
if err := s.persistPeriodicLaunches(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil return nil
} }
@ -580,6 +666,33 @@ func (s *nomadSnapshot) persistAllocs(sink raft.SnapshotSink,
return nil return nil
} }
func (s *nomadSnapshot) persistPeriodicLaunches(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the jobs
launches, err := s.snap.PeriodicLaunches()
if err != nil {
return err
}
for {
// Get the next item
raw := launches.Next()
if raw == nil {
break
}
// Prepare the request struct
launch := raw.(*structs.PeriodicLaunch)
// Write out a job registration
sink.Write([]byte{byte(PeriodicLaunchSnapshot)})
if err := encoder.Encode(launch); err != nil {
return err
}
}
return nil
}
// Release is a no-op, as we just need to GC the pointer // Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly // to the state store snapshot. There is nothing to explicitly
// cleanup. // cleanup.

View file

@ -43,7 +43,8 @@ func testStateStore(t *testing.T) *state.StateStore {
} }
func testFSM(t *testing.T) *nomadFSM { func testFSM(t *testing.T) *nomadFSM {
fsm, err := NewFSM(testBroker(t, 0), os.Stderr) p, _ := testPeriodicDispatcher()
fsm, err := NewFSM(testBroker(t, 0), p, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -222,8 +223,9 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
func TestFSM_RegisterJob(t *testing.T) { func TestFSM_RegisterJob(t *testing.T) {
fsm := testFSM(t) fsm := testFSM(t)
job := mock.PeriodicJob()
req := structs.JobRegisterRequest{ req := structs.JobRegisterRequest{
Job: mock.Job(), Job: job,
} }
buf, err := structs.Encode(structs.JobRegisterRequestType, req) buf, err := structs.Encode(structs.JobRegisterRequestType, req)
if err != nil { if err != nil {
@ -236,22 +238,39 @@ func TestFSM_RegisterJob(t *testing.T) {
} }
// Verify we are registered // Verify we are registered
job, err := fsm.State().JobByID(req.Job.ID) jobOut, err := fsm.State().JobByID(req.Job.ID)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if job == nil { if jobOut == nil {
t.Fatalf("not found!") t.Fatalf("not found!")
} }
if job.CreateIndex != 1 { if jobOut.CreateIndex != 1 {
t.Fatalf("bad index: %d", job.CreateIndex) t.Fatalf("bad index: %d", jobOut.CreateIndex)
}
// Verify it was added to the periodic runner.
if _, ok := fsm.periodicDispatcher.tracked[job.ID]; !ok {
t.Fatal("job not added to periodic runner")
}
// Verify the launch time was tracked.
launchOut, err := fsm.State().PeriodicLaunchByID(req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if launchOut == nil {
t.Fatalf("not found!")
}
if launchOut.Launch.IsZero() {
t.Fatalf("bad launch time: %v", launchOut.Launch)
} }
} }
func TestFSM_DeregisterJob(t *testing.T) { func TestFSM_DeregisterJob(t *testing.T) {
fsm := testFSM(t) fsm := testFSM(t)
job := mock.Job() job := mock.PeriodicJob()
req := structs.JobRegisterRequest{ req := structs.JobRegisterRequest{
Job: job, Job: job,
} }
@ -279,13 +298,27 @@ func TestFSM_DeregisterJob(t *testing.T) {
} }
// Verify we are NOT registered // Verify we are NOT registered
job, err = fsm.State().JobByID(req.Job.ID) jobOut, err := fsm.State().JobByID(req.Job.ID)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if job != nil { if jobOut != nil {
t.Fatalf("job found!") t.Fatalf("job found!")
} }
// Verify it was removed from the periodic runner.
if _, ok := fsm.periodicDispatcher.tracked[job.ID]; ok {
t.Fatal("job not removed from periodic runner")
}
// Verify it was removed from the periodic launch table.
launchOut, err := fsm.State().PeriodicLaunchByID(req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if launchOut != nil {
t.Fatalf("launch found!")
}
} }
func TestFSM_UpdateEval(t *testing.T) { func TestFSM_UpdateEval(t *testing.T) {
@ -607,3 +640,27 @@ func TestFSM_SnapshotRestore_TimeTable(t *testing.T) {
t.Fatalf("bad") t.Fatalf("bad")
} }
} }
func TestFSM_SnapshotRestore_PeriodicLaunches(t *testing.T) {
// Add some state
fsm := testFSM(t)
state := fsm.State()
job1 := mock.Job()
launch1 := &structs.PeriodicLaunch{ID: job1.ID, Launch: time.Now()}
state.UpsertPeriodicLaunch(1000, launch1)
job2 := mock.Job()
launch2 := &structs.PeriodicLaunch{ID: job2.ID, Launch: time.Now()}
state.UpsertPeriodicLaunch(1001, launch2)
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out1, _ := state2.PeriodicLaunchByID(launch1.ID)
out2, _ := state2.PeriodicLaunchByID(launch2.ID)
if !reflect.DeepEqual(launch1, out1) {
t.Fatalf("bad: \n%#v\n%#v", out1, job1)
}
if !reflect.DeepEqual(launch2, out2) {
t.Fatalf("bad: \n%#v\n%#v", out2, job2)
}
}

View file

@ -50,6 +50,14 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err return err
} }
// Populate the reply with job information
reply.JobModifyIndex = index
// If the job is periodic, we don't create an eval.
if args.Job.IsPeriodic() {
return nil
}
// Create a new evaluation // Create a new evaluation
eval := &structs.Evaluation{ eval := &structs.Evaluation{
ID: structs.GenerateUUID(), ID: structs.GenerateUUID(),
@ -74,10 +82,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err return err
} }
// Setup the reply // Populate the reply with eval information
reply.EvalID = eval.ID reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex reply.EvalCreateIndex = evalIndex
reply.JobModifyIndex = index
reply.Index = evalIndex reply.Index = evalIndex
return nil return nil
} }
@ -117,6 +124,10 @@ func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegis
return fmt.Errorf("job not found") return fmt.Errorf("job not found")
} }
if job.IsPeriodic() {
return fmt.Errorf("can't evaluate periodic job")
}
// Create a new evaluation // Create a new evaluation
eval := &structs.Evaluation{ eval := &structs.Evaluation{
ID: structs.GenerateUUID(), ID: structs.GenerateUUID(),
@ -154,6 +165,24 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
} }
defer metrics.MeasureSince([]string{"nomad", "job", "deregister"}, time.Now()) defer metrics.MeasureSince([]string{"nomad", "job", "deregister"}, time.Now())
// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("missing job ID for evaluation")
}
// Lookup the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
job, err := snap.JobByID(args.JobID)
if err != nil {
return err
}
if job == nil {
return fmt.Errorf("job not found")
}
// Commit this update via Raft // Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args) _, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
if err != nil { if err != nil {
@ -161,6 +190,14 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return err return err
} }
// Populate the reply with job information
reply.JobModifyIndex = index
// If the job is periodic, we don't create an eval.
if job.IsPeriodic() {
return nil
}
// Create a new evaluation // Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high // XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was. The scheduler itself also doesn't matter, // priority even if the job was. The scheduler itself also doesn't matter,
@ -186,10 +223,9 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return err return err
} }
// Setup the reply // Populate the reply with eval information
reply.EvalID = eval.ID reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex reply.EvalCreateIndex = evalIndex
reply.JobModifyIndex = index
reply.Index = evalIndex reply.Index = evalIndex
return nil return nil
} }

View file

@ -233,6 +233,53 @@ func TestJobEndpoint_Register_GC_Set(t *testing.T) {
} }
} }
func TestJobEndpoint_Register_Periodic(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request for a periodic job.
job := mock.PeriodicJob()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.JobModifyIndex == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Check for the node in the FSM
state := s1.fsm.State()
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if out.CreateIndex != resp.JobModifyIndex {
t.Fatalf("index mis-match")
}
serviceName := out.TaskGroups[0].Tasks[0].Services[0].Name
expectedServiceName := "web-frontend"
if serviceName != expectedServiceName {
t.Fatalf("Expected Service Name: %s, Actual: %s", expectedServiceName, serviceName)
}
if resp.EvalID != "" {
t.Fatalf("Register created an eval for a periodic job")
}
}
func TestJobEndpoint_Evaluate(t *testing.T) { func TestJobEndpoint_Evaluate(t *testing.T) {
s1 := testServer(t, func(c *Config) { s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue c.NumSchedulers = 0 // Prevent automatic dequeue
@ -304,6 +351,42 @@ func TestJobEndpoint_Evaluate(t *testing.T) {
} }
} }
func TestJobEndpoint_Evaluate_Periodic(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.PeriodicJob()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.JobModifyIndex == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Force a re-evaluation
reEval := &structs.JobEvaluateRequest{
JobID: job.ID,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Job.Evaluate", reEval, &resp); err == nil {
t.Fatal("expect an err")
}
}
func TestJobEndpoint_Deregister(t *testing.T) { func TestJobEndpoint_Deregister(t *testing.T) {
s1 := testServer(t, func(c *Config) { s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue c.NumSchedulers = 0 // Prevent automatic dequeue
@ -380,6 +463,55 @@ func TestJobEndpoint_Deregister(t *testing.T) {
} }
} }
func TestJobEndpoint_Deregister_Periodic(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.PeriodicJob()
reg := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}
// Deregister
dereg := &structs.JobDeregisterRequest{
JobID: job.ID,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.JobDeregisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.JobModifyIndex == 0 {
t.Fatalf("bad index: %d", resp2.Index)
}
// Check for the node in the FSM
state := s1.fsm.State()
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("unexpected job")
}
if resp.EvalID != "" {
t.Fatalf("Deregister created an eval for a periodic job")
}
}
func TestJobEndpoint_GetJob(t *testing.T) { func TestJobEndpoint_GetJob(t *testing.T) {
s1 := testServer(t, nil) s1 := testServer(t, nil)
defer s1.Shutdown() defer s1.Shutdown()

View file

@ -1,6 +1,7 @@
package nomad package nomad
import ( import (
"errors"
"fmt" "fmt"
"time" "time"
@ -117,6 +118,15 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
return err return err
} }
// Enable the periodic dispatcher, since we are now the leader.
s.periodicDispatcher.SetEnabled(true)
s.periodicDispatcher.Start()
// Restore the periodic dispatcher state
if err := s.restorePeriodicDispatcher(); err != nil {
return err
}
// Scheduler periodic jobs // Scheduler periodic jobs
go s.schedulePeriodic(stopCh) go s.schedulePeriodic(stopCh)
@ -167,6 +177,52 @@ func (s *Server) restoreEvalBroker() error {
return nil return nil
} }
// restorePeriodicDispatcher is used to restore all periodic jobs into the
// periodic dispatcher. It also determines if a periodic job should have been
// created during the leadership transition and force runs them. The periodic
// dispatcher is maintained only by the leader, so it must be restored anytime a
// leadership transition takes place.
func (s *Server) restorePeriodicDispatcher() error {
iter, err := s.fsm.State().JobsByPeriodic(true)
if err != nil {
return fmt.Errorf("failed to get periodic jobs: %v", err)
}
now := time.Now()
for i := iter.Next(); i != nil; i = iter.Next() {
job := i.(*structs.Job)
s.periodicDispatcher.Add(job)
// If the periodic job has never been launched before, launch will hold
// the time the periodic job was added. Otherwise it has the last launch
// time of the periodic job.
launch, err := s.fsm.State().PeriodicLaunchByID(job.ID)
if err != nil || launch == nil {
return fmt.Errorf("failed to get periodic launch time: %v", err)
}
// nextLaunch is the next launch that should occur.
nextLaunch := job.Periodic.Next(launch.Launch)
// We skip force launching the job if there should be no next launch
// (the zero case) or if the next launch time is in the future. If it is
// in the future, it will be handled by the periodic dispatcher.
if nextLaunch.IsZero() || !nextLaunch.Before(now) {
continue
}
if err := s.periodicDispatcher.ForceRun(job.ID); err != nil {
msg := fmt.Sprintf("force run of periodic job %q failed: %v", job.ID, err)
s.logger.Printf("[ERR] nomad.periodic: %s", msg)
return errors.New(msg)
}
s.logger.Printf("[DEBUG] nomad.periodic: periodic job %q force"+
" run during leadership establishment", job.ID)
}
return nil
}
// schedulePeriodic is used to do periodic job dispatch while we are leader // schedulePeriodic is used to do periodic job dispatch while we are leader
func (s *Server) schedulePeriodic(stopCh chan struct{}) { func (s *Server) schedulePeriodic(stopCh chan struct{}) {
evalGC := time.NewTicker(s.config.EvalGCInterval) evalGC := time.NewTicker(s.config.EvalGCInterval)
@ -250,6 +306,9 @@ func (s *Server) revokeLeadership() error {
// Disable the eval broker, since it is only useful as a leader // Disable the eval broker, since it is only useful as a leader
s.evalBroker.SetEnabled(false) s.evalBroker.SetEnabled(false)
// Disable the periodic dispatcher, since it is only useful as a leader
s.periodicDispatcher.SetEnabled(false)
// Clear the heartbeat timers on either shutdown or step down, // Clear the heartbeat timers on either shutdown or step down,
// since we are no longer responsible for TTL expirations. // since we are no longer responsible for TTL expirations.
if err := s.clearAllHeartbeatTimers(); err != nil { if err := s.clearAllHeartbeatTimers(); err != nil {

View file

@ -286,6 +286,186 @@ func TestLeader_EvalBroker_Reset(t *testing.T) {
}) })
} }
func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer s1.Shutdown()
s2 := testServer(t, func(c *Config) {
c.NumSchedulers = 0
c.DevDisableBootstrap = true
})
defer s2.Shutdown()
s3 := testServer(t, func(c *Config) {
c.NumSchedulers = 0
c.DevDisableBootstrap = true
})
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
testJoin(t, s1, s2, s3)
testutil.WaitForLeader(t, s1.RPC)
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.raftPeers.Peers()
return len(peers) == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
var leader *Server
for _, s := range servers {
if s.IsLeader() {
leader = s
break
}
}
if leader == nil {
t.Fatalf("Should have a leader")
}
// Inject a periodic job and non-periodic job
periodic := mock.PeriodicJob()
nonPeriodic := mock.Job()
for _, job := range []*structs.Job{nonPeriodic, periodic} {
req := structs.JobRegisterRequest{
Job: job,
}
_, _, err := leader.raftApply(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
}
// Kill the leader
leader.Shutdown()
time.Sleep(100 * time.Millisecond)
// Wait for a new leader
leader = nil
testutil.WaitForResult(func() (bool, error) {
for _, s := range servers {
if s.IsLeader() {
leader = s
return true, nil
}
}
return false, nil
}, func(err error) {
t.Fatalf("should have leader")
})
// Check that the new leader is tracking the periodic job.
testutil.WaitForResult(func() (bool, error) {
_, tracked := leader.periodicDispatcher.tracked[periodic.ID]
return tracked, nil
}, func(err error) {
t.Fatalf("periodic job not tracked")
})
}
func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Inject a periodic job that will be triggered soon.
launch := time.Now().Add(1 * time.Second)
job := testPeriodicJob(launch)
req := structs.JobRegisterRequest{
Job: job,
}
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.SetEnabled(false)
// Get the current time to ensure the launch time is after this once we
// restore.
now := time.Now()
// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)
// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
s1.periodicDispatcher.Start()
s1.restorePeriodicDispatcher()
// Ensure the job is tracked.
if _, tracked := s1.periodicDispatcher.tracked[job.ID]; !tracked {
t.Fatalf("periodic job not restored")
}
// Check that an eval was made.
last, err := s1.fsm.State().PeriodicLaunchByID(job.ID)
if err != nil || last == nil {
t.Fatalf("failed to get periodic launch time: %v", err)
}
if last.Launch.Before(now) {
t.Fatalf("restorePeriodicDispatcher did not force launch: last %v; want after %v", last.Launch, now)
}
}
func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Inject a periodic job that triggered once in the past, should trigger now
// and once in the future.
now := time.Now()
past := now.Add(-1 * time.Second)
future := now.Add(10 * time.Second)
job := testPeriodicJob(past, now, future)
req := structs.JobRegisterRequest{
Job: job,
}
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create an eval for the past launch.
s1.periodicDispatcher.createEval(job, past)
// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.SetEnabled(false)
// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)
// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
s1.periodicDispatcher.Start()
s1.restorePeriodicDispatcher()
// Ensure the job is tracked.
if _, tracked := s1.periodicDispatcher.tracked[job.ID]; !tracked {
t.Fatalf("periodic job not restored")
}
// Check that an eval was made.
last, err := s1.fsm.State().PeriodicLaunchByID(job.ID)
if err != nil || last == nil {
t.Fatalf("failed to get periodic launch time: %v", err)
}
if last.Launch == past {
t.Fatalf("restorePeriodicDispatcher did not force launch")
}
}
func TestLeader_PeriodicDispatch(t *testing.T) { func TestLeader_PeriodicDispatch(t *testing.T) {
s1 := testServer(t, func(c *Config) { s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 c.NumSchedulers = 0

View file

@ -190,6 +190,17 @@ func SystemJob() *structs.Job {
return job return job
} }
func PeriodicJob() *structs.Job {
job := Job()
job.Type = structs.JobTypeBatch
job.Periodic = &structs.PeriodicConfig{
Enabled: true,
SpecType: structs.PeriodicSpecCron,
Spec: "*/30 * * * *",
}
return job
}
func Eval() *structs.Evaluation { func Eval() *structs.Evaluation {
eval := &structs.Evaluation{ eval := &structs.Evaluation{
ID: structs.GenerateUUID(), ID: structs.GenerateUUID(),

503
nomad/periodic.go Normal file
View file

@ -0,0 +1,503 @@
package nomad
import (
"container/heap"
"fmt"
"log"
"strconv"
"strings"
"sync"
"time"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// The string appended to the periodic jobs ID when launching derived
// instances of it.
JobLaunchSuffix = "/periodic-"
)
// PeriodicDispatch is used to track and launch periodic jobs. It maintains the
// set of periodic jobs and creates derived jobs and evaluations per
// instantiation which is determined by the periodic spec.
type PeriodicDispatch struct {
dispatcher JobEvalDispatcher
enabled bool
running bool
tracked map[string]*structs.Job
heap *periodicHeap
updateCh chan struct{}
stopCh chan struct{}
waitCh chan struct{}
logger *log.Logger
l sync.RWMutex
}
// JobEvalDispatcher is an interface to submit jobs and have evaluations created
// for them.
type JobEvalDispatcher interface {
// DispatchJob takes a job a new, untracked job and creates an evaluation
// for it.
DispatchJob(job *structs.Job) error
}
// DispatchJob creates an evaluation for the passed job and commits both the
// evaluation and the job to the raft log.
func (s *Server) DispatchJob(job *structs.Job) error {
// Commit this update via Raft
req := structs.JobRegisterRequest{Job: job}
_, index, err := s.raftApply(structs.JobRegisterRequestType, req)
if err != nil {
return err
}
// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
Type: job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
// Commit this evaluation via Raft
// XXX: There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not.
_, _, err = s.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
return err
}
return nil
}
// NewPeriodicDispatch returns a periodic dispatcher that is used to track and
// launch periodic jobs.
func NewPeriodicDispatch(logger *log.Logger, dispatcher JobEvalDispatcher) *PeriodicDispatch {
return &PeriodicDispatch{
dispatcher: dispatcher,
tracked: make(map[string]*structs.Job),
heap: NewPeriodicHeap(),
updateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
waitCh: make(chan struct{}),
logger: logger,
}
}
// SetEnabled is used to control if the periodic dispatcher is enabled. It
// should only be enabled on the active leader. Disabling an active dispatcher
// will stop any launched go routine and flush the dispatcher.
func (p *PeriodicDispatch) SetEnabled(enabled bool) {
p.l.Lock()
p.enabled = enabled
p.l.Unlock()
if !enabled {
if p.running {
close(p.stopCh)
<-p.waitCh
p.running = false
}
p.Flush()
}
}
// Start begins the goroutine that creates derived jobs and evals.
func (p *PeriodicDispatch) Start() {
p.l.Lock()
p.running = true
p.l.Unlock()
go p.run()
}
// Tracked returns the set of tracked job IDs.
func (p *PeriodicDispatch) Tracked() []*structs.Job {
p.l.RLock()
defer p.l.RUnlock()
tracked := make([]*structs.Job, len(p.tracked))
i := 0
for _, job := range p.tracked {
tracked[i] = job
i++
}
return tracked
}
// Add begins tracking of a periodic job. If it is already tracked, it acts as
// an update to the jobs periodic spec.
func (p *PeriodicDispatch) Add(job *structs.Job) error {
p.l.Lock()
defer p.l.Unlock()
// Do nothing if not enabled
if !p.enabled {
return nil
}
// If we were tracking a job and it has been disabled or made non-periodic remove it.
disabled := !job.IsPeriodic() || !job.Periodic.Enabled
_, tracked := p.tracked[job.ID]
if disabled {
if tracked {
p.removeLocked(job.ID)
}
// If the job is disabled and we aren't tracking it, do nothing.
return nil
}
// Add or update the job.
p.tracked[job.ID] = job
next := job.Periodic.Next(time.Now())
if tracked {
if err := p.heap.Update(job, next); err != nil {
return fmt.Errorf("failed to update job %v launch time: %v", job.ID, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: updated periodic job %q", job.ID)
} else {
if err := p.heap.Push(job, next); err != nil {
return fmt.Errorf("failed to add job %v: %v", job.ID, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: registered periodic job %q", job.ID)
}
// Signal an update.
if p.running {
select {
case p.updateCh <- struct{}{}:
default:
}
}
return nil
}
// Remove stops tracking the passed job. If the job is not tracked, it is a
// no-op.
func (p *PeriodicDispatch) Remove(jobID string) error {
p.l.Lock()
defer p.l.Unlock()
return p.removeLocked(jobID)
}
// Remove stops tracking the passed job. If the job is not tracked, it is a
// no-op. It assumes this is called while a lock is held.
func (p *PeriodicDispatch) removeLocked(jobID string) error {
// Do nothing if not enabled
if !p.enabled {
return nil
}
job, tracked := p.tracked[jobID]
if !tracked {
return nil
}
delete(p.tracked, jobID)
if err := p.heap.Remove(job); err != nil {
return fmt.Errorf("failed to remove tracked job %v: %v", jobID, err)
}
// Signal an update.
if p.running {
select {
case p.updateCh <- struct{}{}:
default:
}
}
p.logger.Printf("[DEBUG] nomad.periodic: deregistered periodic job %q", jobID)
return nil
}
// ForceRun causes the periodic job to be evaluated immediately.
func (p *PeriodicDispatch) ForceRun(jobID string) error {
p.l.Lock()
// Do nothing if not enabled
if !p.enabled {
return fmt.Errorf("periodic dispatch disabled")
}
job, tracked := p.tracked[jobID]
if !tracked {
return fmt.Errorf("can't force run non-tracked job %v", jobID)
}
p.l.Unlock()
return p.createEval(job, time.Now())
}
// shouldRun returns whether the long lived run function should run.
func (p *PeriodicDispatch) shouldRun() bool {
p.l.RLock()
defer p.l.RUnlock()
return p.enabled && p.running
}
// run is a long-lived function that waits till a job's periodic spec is met and
// then creates an evaluation to run the job.
func (p *PeriodicDispatch) run() {
defer close(p.waitCh)
var launchCh <-chan time.Time
for p.shouldRun() {
job, launch := p.nextLaunch()
if launch.IsZero() {
launchCh = nil
} else {
launchDur := launch.Sub(time.Now())
launchCh = time.After(launchDur)
p.logger.Printf("[DEBUG] nomad.periodic: launching job %q in %s", job.ID, launchDur)
}
select {
case <-p.stopCh:
return
case <-p.updateCh:
continue
case <-launchCh:
p.dispatch(job, launch)
}
}
}
// dispatch creates an evaluation for the job and updates its next launchtime
// based on the passed launch time.
func (p *PeriodicDispatch) dispatch(job *structs.Job, launchTime time.Time) {
p.l.Lock()
defer p.l.Unlock()
nextLaunch := job.Periodic.Next(launchTime)
if err := p.heap.Update(job, nextLaunch); err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %q: %v", job.ID, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: launching job %v at %v", job.ID, launchTime)
p.createEval(job, launchTime)
}
// nextLaunch returns the next job to launch and when it should be launched. If
// the next job can't be determined, an error is returned. If the dispatcher is
// stopped, a nil job will be returned.
func (p *PeriodicDispatch) nextLaunch() (*structs.Job, time.Time) {
// If there is nothing wait for an update.
p.l.RLock()
defer p.l.RUnlock()
if p.heap.Length() == 0 {
return nil, time.Time{}
}
nextJob := p.heap.Peek()
if nextJob == nil {
return nil, time.Time{}
}
return nextJob.job, nextJob.next
}
// createEval instantiates a job based on the passed periodic job and submits an
// evaluation for it.
func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time) error {
derived, err := p.deriveJob(periodicJob, time)
if err != nil {
return err
}
if err := p.dispatcher.DispatchJob(derived); err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to dispatch job %q: %v", periodicJob.ID, err)
return err
}
return nil
}
// deriveJob instantiates a new job based on the passed periodic job and the
// launch time.
func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
derived *structs.Job, err error) {
// Have to recover in case the job copy panics.
defer func() {
if r := recover(); r != nil {
p.logger.Printf("[ERR] nomad.periodic: deriving job from"+
" periodic job %v failed; deregistering from periodic runner: %v",
periodicJob.ID, r)
p.Remove(periodicJob.ID)
derived = nil
err = fmt.Errorf("Failed to create a copy of the periodic job %v: %v", periodicJob.ID, r)
}
}()
// Create a copy of the periodic job, give it a derived ID/Name and make it
// non-periodic.
derived = periodicJob.Copy()
derived.ParentID = periodicJob.ID
derived.ID = p.derivedJobID(periodicJob, time)
derived.Name = derived.ID
derived.Periodic = nil
derived.GC = true
return
}
// deriveJobID returns a job ID based on the parent periodic job and the launch
// time.
func (p *PeriodicDispatch) derivedJobID(periodicJob *structs.Job, time time.Time) string {
return fmt.Sprintf("%s%s%d", periodicJob.ID, JobLaunchSuffix, time.Unix())
}
// LaunchTime returns the launch time of the job. This is only valid for
// jobs created by PeriodicDispatch and will otherwise return an error.
func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) {
index := strings.LastIndex(jobID, JobLaunchSuffix)
if index == -1 {
return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
}
launch, err := strconv.Atoi(jobID[index+len(JobLaunchSuffix):])
if err != nil {
return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
}
return time.Unix(int64(launch), 0), nil
}
// Flush clears the state of the PeriodicDispatcher
func (p *PeriodicDispatch) Flush() {
p.l.Lock()
defer p.l.Unlock()
p.stopCh = make(chan struct{})
p.updateCh = make(chan struct{}, 1)
p.waitCh = make(chan struct{})
p.tracked = make(map[string]*structs.Job)
p.heap = NewPeriodicHeap()
}
// periodicHeap wraps a heap and gives operations other than Push/Pop.
type periodicHeap struct {
index map[string]*periodicJob
heap periodicHeapImp
}
type periodicJob struct {
job *structs.Job
next time.Time
index int
}
func NewPeriodicHeap() *periodicHeap {
return &periodicHeap{
index: make(map[string]*periodicJob),
heap: make(periodicHeapImp, 0),
}
}
func (p *periodicHeap) Push(job *structs.Job, next time.Time) error {
if _, ok := p.index[job.ID]; ok {
return fmt.Errorf("job %v already exists", job.ID)
}
pJob := &periodicJob{job, next, 0}
p.index[job.ID] = pJob
heap.Push(&p.heap, pJob)
return nil
}
func (p *periodicHeap) Pop() *periodicJob {
if len(p.heap) == 0 {
return nil
}
pJob := heap.Pop(&p.heap).(*periodicJob)
delete(p.index, pJob.job.ID)
return pJob
}
func (p *periodicHeap) Peek() *periodicJob {
if len(p.heap) == 0 {
return nil
}
return p.heap[0]
}
func (p *periodicHeap) Contains(job *structs.Job) bool {
_, ok := p.index[job.ID]
return ok
}
func (p *periodicHeap) Update(job *structs.Job, next time.Time) error {
if pJob, ok := p.index[job.ID]; ok {
// Need to update the job as well because its spec can change.
pJob.job = job
pJob.next = next
heap.Fix(&p.heap, pJob.index)
return nil
}
return fmt.Errorf("heap doesn't contain job %v", job.ID)
}
func (p *periodicHeap) Remove(job *structs.Job) error {
if pJob, ok := p.index[job.ID]; ok {
heap.Remove(&p.heap, pJob.index)
delete(p.index, job.ID)
return nil
}
return fmt.Errorf("heap doesn't contain job %v", job.ID)
}
func (p *periodicHeap) Length() int {
return len(p.heap)
}
type periodicHeapImp []*periodicJob
func (h periodicHeapImp) Len() int { return len(h) }
func (h periodicHeapImp) Less(i, j int) bool {
// Two zero times should return false.
// Otherwise, zero is "greater" than any other time.
// (To sort it at the end of the list.)
// Sort such that zero times are at the end of the list.
iZero, jZero := h[i].next.IsZero(), h[j].next.IsZero()
if iZero && jZero {
return false
} else if iZero {
return false
} else if jZero {
return true
}
return h[i].next.Before(h[j].next)
}
func (h periodicHeapImp) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
func (h *periodicHeapImp) Push(x interface{}) {
n := len(*h)
job := x.(*periodicJob)
job.index = n
*h = append(*h, job)
}
func (h *periodicHeapImp) Pop() interface{} {
old := *h
n := len(old)
job := old[n-1]
job.index = -1 // for safety
*h = old[0 : n-1]
return job
}

471
nomad/periodic_test.go Normal file
View file

@ -0,0 +1,471 @@
package nomad
import (
"fmt"
"log"
"math/rand"
"os"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
)
type MockJobEvalDispatcher struct {
Jobs map[string]*structs.Job
lock sync.Mutex
}
func NewMockJobEvalDispatcher() *MockJobEvalDispatcher {
return &MockJobEvalDispatcher{Jobs: make(map[string]*structs.Job)}
}
func (m *MockJobEvalDispatcher) DispatchJob(job *structs.Job) error {
m.lock.Lock()
defer m.lock.Unlock()
m.Jobs[job.ID] = job
return nil
}
// LaunchTimes returns the launch times of child jobs in sorted order.
func (m *MockJobEvalDispatcher) LaunchTimes(p *PeriodicDispatch, parentID string) ([]time.Time, error) {
m.lock.Lock()
defer m.lock.Unlock()
var launches []time.Time
for _, job := range m.Jobs {
if job.ParentID != parentID {
continue
}
t, err := p.LaunchTime(job.ID)
if err != nil {
return nil, err
}
launches = append(launches, t)
}
sort.Sort(times(launches))
return launches, nil
}
type times []time.Time
func (t times) Len() int { return len(t) }
func (t times) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t times) Less(i, j int) bool { return t[i].Before(t[j]) }
// testPeriodicDispatcher returns an enabled PeriodicDispatcher which uses the
// MockJobEvalDispatcher.
func testPeriodicDispatcher() (*PeriodicDispatch, *MockJobEvalDispatcher) {
logger := log.New(os.Stderr, "", log.LstdFlags)
m := NewMockJobEvalDispatcher()
d := NewPeriodicDispatch(logger, m)
d.SetEnabled(true)
d.Start()
return d, m
}
// testPeriodicJob is a helper that creates a periodic job that launches at the
// passed times.
func testPeriodicJob(times ...time.Time) *structs.Job {
job := mock.PeriodicJob()
job.Periodic.SpecType = structs.PeriodicSpecTest
l := make([]string, len(times))
for i, t := range times {
l[i] = strconv.Itoa(int(t.Round(1 * time.Second).Unix()))
}
job.Periodic.Spec = strings.Join(l, ",")
return job
}
func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.Job()
if err := p.Add(job); err != nil {
t.Fatalf("Add of non-periodic job failed: %v; expect no-op", err)
}
tracked := p.Tracked()
if len(tracked) != 0 {
t.Fatalf("Add of non-periodic job should be no-op: %v", tracked)
}
}
func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked := p.Tracked()
if len(tracked) != 1 {
t.Fatalf("Add didn't track the job: %v", tracked)
}
// Update the job and add it again.
job.Periodic.Spec = "foo"
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked = p.Tracked()
if len(tracked) != 1 {
t.Fatalf("Add didn't update: %v", tracked)
}
if !reflect.DeepEqual(job, tracked[0]) {
t.Fatalf("Add didn't properly update: got %v; want %v", tracked[0], job)
}
}
func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked := p.Tracked()
if len(tracked) != 1 {
t.Fatalf("Add didn't track the job: %v", tracked)
}
// Update the job to be non-periodic and add it again.
job.Periodic = nil
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked = p.Tracked()
if len(tracked) != 0 {
t.Fatalf("Add didn't remove: %v", tracked)
}
}
func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) {
t.Parallel()
p, m := testPeriodicDispatcher()
// Create a job that won't be evalauted for a while.
job := testPeriodicJob(time.Now().Add(10 * time.Second))
// Add it.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// Update it to be sooner and re-add.
expected := time.Now().Round(1 * time.Second).Add(1 * time.Second)
job.Periodic.Spec = fmt.Sprintf("%d", expected.Unix())
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// Check that nothing is created.
if _, ok := m.Jobs[job.ID]; ok {
t.Fatalf("periodic dispatcher created eval at the wrong time")
}
time.Sleep(2 * time.Second)
// Check that job was launched correctly.
times, err := m.LaunchTimes(p, job.ID)
if err != nil {
t.Fatalf("failed to get launch times for job %q", job.ID)
}
if len(times) != 1 {
t.Fatalf("incorrect number of launch times for job %q", job.ID)
}
if times[0] != expected {
t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[0], expected)
}
}
func TestPeriodicDispatch_Remove_Untracked(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
if err := p.Remove("foo"); err != nil {
t.Fatalf("Remove failed %v; expected a no-op", err)
}
}
func TestPeriodicDispatch_Remove_Tracked(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked := p.Tracked()
if len(tracked) != 1 {
t.Fatalf("Add didn't track the job: %v", tracked)
}
if err := p.Remove(job.ID); err != nil {
t.Fatalf("Remove failed %v", err)
}
tracked = p.Tracked()
if len(tracked) != 0 {
t.Fatalf("Remove didn't untrack the job: %v", tracked)
}
}
func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
// Create a job that will be evaluated soon.
job := testPeriodicJob(time.Now().Add(1 * time.Second))
// Add it.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// Remove the job.
if err := p.Remove(job.ID); err != nil {
t.Fatalf("Add failed %v", err)
}
time.Sleep(2 * time.Second)
// Check that an eval wasn't created.
d := p.dispatcher.(*MockJobEvalDispatcher)
if _, ok := d.Jobs[job.ID]; ok {
t.Fatalf("Remove didn't cancel creation of an eval")
}
}
func TestPeriodicDispatch_ForceRun_Untracked(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
if err := p.ForceRun("foo"); err == nil {
t.Fatal("ForceRun of untracked job should fail")
}
}
func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) {
t.Parallel()
p, m := testPeriodicDispatcher()
// Create a job that won't be evalauted for a while.
job := testPeriodicJob(time.Now().Add(10 * time.Second))
// Add it.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// ForceRun the job
if err := p.ForceRun(job.ID); err != nil {
t.Fatalf("ForceRun failed %v", err)
}
// Check that job was launched correctly.
launches, err := m.LaunchTimes(p, job.ID)
if err != nil {
t.Fatalf("failed to get launch times for job %q: %v", job.ID, err)
}
l := len(launches)
if l != 1 {
t.Fatalf("restorePeriodicDispatcher() created an unexpected"+
" number of evals; got %d; want 1", l)
}
}
func TestPeriodicDispatch_Run_Multiple(t *testing.T) {
t.Parallel()
p, m := testPeriodicDispatcher()
// Create a job that will be launched twice.
launch1 := time.Now().Round(1 * time.Second).Add(1 * time.Second)
launch2 := time.Now().Round(1 * time.Second).Add(2 * time.Second)
job := testPeriodicJob(launch1, launch2)
// Add it.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
time.Sleep(3 * time.Second)
// Check that job was launched correctly.
times, err := m.LaunchTimes(p, job.ID)
if err != nil {
t.Fatalf("failed to get launch times for job %q", job.ID)
}
if len(times) != 2 {
t.Fatalf("incorrect number of launch times for job %q", job.ID)
}
if times[0] != launch1 {
t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[0], launch1)
}
if times[1] != launch2 {
t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[1], launch2)
}
}
func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
t.Parallel()
p, m := testPeriodicDispatcher()
// Create two job that will be launched at the same time.
launch := time.Now().Round(1 * time.Second).Add(1 * time.Second)
job := testPeriodicJob(launch)
job2 := testPeriodicJob(launch)
// Add them.
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
if err := p.Add(job2); err != nil {
t.Fatalf("Add failed %v", err)
}
time.Sleep(2 * time.Second)
// Check that the jobs were launched correctly.
for _, job := range []*structs.Job{job, job2} {
times, err := m.LaunchTimes(p, job.ID)
if err != nil {
t.Fatalf("failed to get launch times for job %q", job.ID)
}
if len(times) != 1 {
t.Fatalf("incorrect number of launch times for job %q; got %d; want 1", job.ID, len(times))
}
if times[0] != launch {
t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[0], launch)
}
}
}
// This test adds and removes a bunch of jobs, some launching at the same time,
// some after each other and some invalid times, and ensures the correct
// behavior.
func TestPeriodicDispatch_Complex(t *testing.T) {
t.Parallel()
p, m := testPeriodicDispatcher()
// Create some jobs launching at different times.
now := time.Now().Round(1 * time.Second)
same := now.Add(1 * time.Second)
launch1 := same.Add(1 * time.Second)
launch2 := same.Add(2 * time.Second)
launch3 := same.Add(3 * time.Second)
invalid := now.Add(-200 * time.Second)
// Create two jobs launching at the same time.
job1 := testPeriodicJob(same)
job2 := testPeriodicJob(same)
// Create a job that will never launch.
job3 := testPeriodicJob(invalid)
// Create a job that launches twice.
job4 := testPeriodicJob(launch1, launch3)
// Create a job that launches once.
job5 := testPeriodicJob(launch2)
// Create 3 jobs we will delete.
job6 := testPeriodicJob(same)
job7 := testPeriodicJob(launch1, launch3)
job8 := testPeriodicJob(launch2)
// Create a map of expected eval job ids.
expected := map[string][]time.Time{
job1.ID: []time.Time{same},
job2.ID: []time.Time{same},
job3.ID: nil,
job4.ID: []time.Time{launch1, launch3},
job5.ID: []time.Time{launch2},
job6.ID: nil,
job7.ID: nil,
job8.ID: nil,
}
// Shuffle the jobs so they can be added randomly
jobs := []*structs.Job{job1, job2, job3, job4, job5, job6, job7, job8}
toDelete := []*structs.Job{job6, job7, job8}
shuffle(jobs)
shuffle(toDelete)
for _, job := range jobs {
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
}
for _, job := range toDelete {
if err := p.Remove(job.ID); err != nil {
t.Fatalf("Remove failed %v", err)
}
}
time.Sleep(5 * time.Second)
actual := make(map[string][]time.Time, len(expected))
for _, job := range jobs {
launches, err := m.LaunchTimes(p, job.ID)
if err != nil {
t.Fatalf("LaunchTimes(%v) failed %v", job.ID, err)
}
actual[job.ID] = launches
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("Unexpected launches; got %#v; want %#v", actual, expected)
}
}
func shuffle(jobs []*structs.Job) {
rand.Seed(time.Now().Unix())
for i := range jobs {
j := rand.Intn(len(jobs))
jobs[i], jobs[j] = jobs[j], jobs[i]
}
}
func TestPeriodicHeap_Order(t *testing.T) {
t.Parallel()
h := NewPeriodicHeap()
j1 := mock.PeriodicJob()
j2 := mock.PeriodicJob()
j3 := mock.PeriodicJob()
lookup := map[*structs.Job]string{
j1: "j1",
j2: "j2",
j3: "j3",
}
h.Push(j1, time.Time{})
h.Push(j2, time.Unix(10, 0))
h.Push(j3, time.Unix(11, 0))
exp := []string{"j2", "j3", "j1"}
var act []string
for i := 0; i < 3; i++ {
pJob := h.Pop()
act = append(act, lookup[pJob.job])
}
if !reflect.DeepEqual(act, exp) {
t.Fatalf("Wrong ordering; got %v; want %v", act, exp)
}
}

View file

@ -113,6 +113,9 @@ type Server struct {
// plans that are waiting to be assessed by the leader // plans that are waiting to be assessed by the leader
planQueue *PlanQueue planQueue *PlanQueue
// periodicDispatcher is used to track and create evaluations for periodic jobs.
periodicDispatcher *PeriodicDispatch
// heartbeatTimers track the expiration time of each heartbeat that has // heartbeatTimers track the expiration time of each heartbeat that has
// a TTL. On expiration, the node status is updated to be 'down'. // a TTL. On expiration, the node status is updated to be 'down'.
heartbeatTimers map[string]*time.Timer heartbeatTimers map[string]*time.Timer
@ -181,6 +184,9 @@ func NewServer(config *Config) (*Server, error) {
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
} }
// Create the periodic dispatcher for launching periodic jobs.
s.periodicDispatcher = NewPeriodicDispatch(s.logger, s)
// Initialize the RPC layer // Initialize the RPC layer
// TODO: TLS... // TODO: TLS...
if err := s.setupRPC(nil); err != nil { if err := s.setupRPC(nil); err != nil {
@ -406,7 +412,7 @@ func (s *Server) setupRaft() error {
// Create the FSM // Create the FSM
var err error var err error
s.fsm, err = NewFSM(s.evalBroker, s.config.LogOutput) s.fsm, err = NewFSM(s.evalBroker, s.periodicDispatcher, s.config.LogOutput)
if err != nil { if err != nil {
return err return err
} }

View file

@ -19,6 +19,7 @@ func stateStoreSchema() *memdb.DBSchema {
indexTableSchema, indexTableSchema,
nodeTableSchema, nodeTableSchema,
jobTableSchema, jobTableSchema,
periodicLaunchTableSchema,
evalTableSchema, evalTableSchema,
allocTableSchema, allocTableSchema,
} }
@ -109,6 +110,14 @@ func jobTableSchema() *memdb.TableSchema {
Conditional: jobIsGCable, Conditional: jobIsGCable,
}, },
}, },
"periodic": &memdb.IndexSchema{
Name: "periodic",
AllowMissing: false,
Unique: false,
Indexer: &memdb.ConditionalIndex{
Conditional: jobIsPeriodic,
},
},
}, },
} }
} }
@ -124,6 +133,43 @@ func jobIsGCable(obj interface{}) (bool, error) {
return j.GC, nil return j.GC, nil
} }
// jobIsPeriodic satisfies the ConditionalIndexFunc interface and creates an index
// on whether a job is periodic.
func jobIsPeriodic(obj interface{}) (bool, error) {
j, ok := obj.(*structs.Job)
if !ok {
return false, fmt.Errorf("Unexpected type: %v", obj)
}
if j.Periodic != nil && j.Periodic.Enabled == true {
return true, nil
}
return false, nil
}
// periodicLaunchTableSchema returns the MemDB schema tracking the most recent
// launch time for a perioidic job.
func periodicLaunchTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "periodic_launch",
Indexes: map[string]*memdb.IndexSchema{
// Primary index is used for job management
// and simple direct lookup. ID is required to be
// unique.
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "ID",
Lowercase: true,
},
},
},
}
}
// evalTableSchema returns the MemDB schema for the eval table. // evalTableSchema returns the MemDB schema for the eval table.
// This table is used to store all the evaluations that are pending // This table is used to store all the evaluations that are pending
// or recently completed. // or recently completed.

View file

@ -131,10 +131,6 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
txn := s.db.Txn(true) txn := s.db.Txn(true)
defer txn.Abort() defer txn.Abort()
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "nodes"})
watcher.Add(watch.Item{Node: nodeID})
// Lookup the node // Lookup the node
existing, err := txn.First("nodes", "id", nodeID) existing, err := txn.First("nodes", "id", nodeID)
if err != nil { if err != nil {
@ -144,6 +140,10 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
return fmt.Errorf("node not found") return fmt.Errorf("node not found")
} }
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "nodes"})
watcher.Add(watch.Item{Node: nodeID})
// Delete the node // Delete the node
if err := txn.Delete("nodes", existing); err != nil { if err := txn.Delete("nodes", existing); err != nil {
return fmt.Errorf("node delete failed: %v", err) return fmt.Errorf("node delete failed: %v", err)
@ -318,10 +318,6 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
txn := s.db.Txn(true) txn := s.db.Txn(true)
defer txn.Abort() defer txn.Abort()
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "jobs"})
watcher.Add(watch.Item{Job: jobID})
// Lookup the node // Lookup the node
existing, err := txn.First("jobs", "id", jobID) existing, err := txn.First("jobs", "id", jobID)
if err != nil { if err != nil {
@ -331,6 +327,10 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
return fmt.Errorf("job not found") return fmt.Errorf("job not found")
} }
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "jobs"})
watcher.Add(watch.Item{Job: jobID})
// Delete the node // Delete the node
if err := txn.Delete("jobs", existing); err != nil { if err := txn.Delete("jobs", existing); err != nil {
return fmt.Errorf("job delete failed: %v", err) return fmt.Errorf("job delete failed: %v", err)
@ -383,6 +383,17 @@ func (s *StateStore) Jobs() (memdb.ResultIterator, error) {
return iter, nil return iter, nil
} }
// JobsByPeriodic returns an iterator over all the periodic or non-periodic jobs.
func (s *StateStore) JobsByPeriodic(periodic bool) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
iter, err := txn.Get("jobs", "periodic", periodic)
if err != nil {
return nil, err
}
return iter, nil
}
// JobsByScheduler returns an iterator over all the jobs with the specific // JobsByScheduler returns an iterator over all the jobs with the specific
// scheduler type. // scheduler type.
func (s *StateStore) JobsByScheduler(schedulerType string) (memdb.ResultIterator, error) { func (s *StateStore) JobsByScheduler(schedulerType string) (memdb.ResultIterator, error) {
@ -408,6 +419,102 @@ func (s *StateStore) JobsByGC(gc bool) (memdb.ResultIterator, error) {
return iter, nil return iter, nil
} }
// UpsertPeriodicLaunch is used to register a launch or update it.
func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.PeriodicLaunch) error {
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "periodic_launch"})
watcher.Add(watch.Item{Job: launch.ID})
// Check if the job already exists
existing, err := txn.First("periodic_launch", "id", launch.ID)
if err != nil {
return fmt.Errorf("periodic launch lookup failed: %v", err)
}
// Setup the indexes correctly
if existing != nil {
launch.CreateIndex = existing.(*structs.PeriodicLaunch).CreateIndex
launch.ModifyIndex = index
} else {
launch.CreateIndex = index
launch.ModifyIndex = index
}
// Insert the job
if err := txn.Insert("periodic_launch", launch); err != nil {
return fmt.Errorf("launch insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"periodic_launch", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}
// DeletePeriodicLaunch is used to delete the periodic launch
func (s *StateStore) DeletePeriodicLaunch(index uint64, jobID string) error {
txn := s.db.Txn(true)
defer txn.Abort()
// Lookup the launch
existing, err := txn.First("periodic_launch", "id", jobID)
if err != nil {
return fmt.Errorf("launch lookup failed: %v", err)
}
if existing == nil {
return fmt.Errorf("launch not found")
}
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "periodic_launch"})
watcher.Add(watch.Item{Job: jobID})
// Delete the launch
if err := txn.Delete("periodic_launch", existing); err != nil {
return fmt.Errorf("launch delete failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"periodic_launch", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
}
// PeriodicLaunchByID is used to lookup a periodic launch by the periodic job
// ID.
func (s *StateStore) PeriodicLaunchByID(id string) (*structs.PeriodicLaunch, error) {
txn := s.db.Txn(false)
existing, err := txn.First("periodic_launch", "id", id)
if err != nil {
return nil, fmt.Errorf("periodic launch lookup failed: %v", err)
}
if existing != nil {
return existing.(*structs.PeriodicLaunch), nil
}
return nil, nil
}
// PeriodicLaunches returns an iterator over all the periodic launches
func (s *StateStore) PeriodicLaunches() (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
// Walk the entire table
iter, err := txn.Get("periodic_launch", "id")
if err != nil {
return nil, err
}
return iter, nil
}
// UpsertEvaluation is used to upsert an evaluation // UpsertEvaluation is used to upsert an evaluation
func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error { func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error {
txn := s.db.Txn(true) txn := s.db.Txn(true)
@ -875,6 +982,16 @@ func (r *StateRestore) IndexRestore(idx *IndexEntry) error {
return nil return nil
} }
// PeriodicLaunchRestore is used to restore a periodic launch.
func (r *StateRestore) PeriodicLaunchRestore(launch *structs.PeriodicLaunch) error {
r.items.Add(watch.Item{Table: "periodic_launch"})
r.items.Add(watch.Item{Job: launch.ID})
if err := r.txn.Insert("periodic_launch", launch); err != nil {
return fmt.Errorf("periodic launch insert failed: %v", err)
}
return nil
}
// stateWatch holds shared state for watching updates. This is // stateWatch holds shared state for watching updates. This is
// outside of StateStore so it can be shared with snapshots. // outside of StateStore so it can be shared with snapshots.
type stateWatch struct { type stateWatch struct {

View file

@ -5,6 +5,7 @@ import (
"reflect" "reflect"
"sort" "sort"
"testing" "testing"
"time"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/mock"
@ -546,6 +547,64 @@ func TestStateStore_JobsByIDPrefix(t *testing.T) {
} }
} }
func TestStateStore_JobsByPeriodic(t *testing.T) {
state := testStateStore(t)
var periodic, nonPeriodic []*structs.Job
for i := 0; i < 10; i++ {
job := mock.Job()
nonPeriodic = append(nonPeriodic, job)
err := state.UpsertJob(1000+uint64(i), job)
if err != nil {
t.Fatalf("err: %v", err)
}
}
for i := 0; i < 10; i++ {
job := mock.PeriodicJob()
periodic = append(periodic, job)
err := state.UpsertJob(2000+uint64(i), job)
if err != nil {
t.Fatalf("err: %v", err)
}
}
iter, err := state.JobsByPeriodic(true)
var outPeriodic []*structs.Job
for {
raw := iter.Next()
if raw == nil {
break
}
outPeriodic = append(outPeriodic, raw.(*structs.Job))
}
iter, err = state.JobsByPeriodic(false)
var outNonPeriodic []*structs.Job
for {
raw := iter.Next()
if raw == nil {
break
}
outNonPeriodic = append(outNonPeriodic, raw.(*structs.Job))
}
sort.Sort(JobIDSort(periodic))
sort.Sort(JobIDSort(nonPeriodic))
sort.Sort(JobIDSort(outPeriodic))
sort.Sort(JobIDSort(outNonPeriodic))
if !reflect.DeepEqual(periodic, outPeriodic) {
t.Fatalf("bad: %#v %#v", periodic, outPeriodic)
}
if !reflect.DeepEqual(nonPeriodic, outNonPeriodic) {
t.Fatalf("bad: %#v %#v", nonPeriodic, outNonPeriodic)
}
}
func TestStateStore_JobsByScheduler(t *testing.T) { func TestStateStore_JobsByScheduler(t *testing.T) {
state := testStateStore(t) state := testStateStore(t)
var serviceJobs []*structs.Job var serviceJobs []*structs.Job
@ -702,6 +761,222 @@ func TestStateStore_RestoreJob(t *testing.T) {
notify.verify(t) notify.verify(t)
} }
func TestStateStore_UpsertPeriodicLaunch(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}
notify := setupNotifyTest(
state,
watch.Item{Table: "periodic_launch"},
watch.Item{Job: job.ID})
err := state.UpsertPeriodicLaunch(1000, launch)
if err != nil {
t.Fatalf("err: %v", err)
}
out, err := state.PeriodicLaunchByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out.CreateIndex != 1000 {
t.Fatalf("bad: %#v", out)
}
if out.ModifyIndex != 1000 {
t.Fatalf("bad: %#v", out)
}
if !reflect.DeepEqual(launch, out) {
t.Fatalf("bad: %#v %#v", job, out)
}
index, err := state.Index("periodic_launch")
if err != nil {
t.Fatalf("err: %v", err)
}
if index != 1000 {
t.Fatalf("bad: %d", index)
}
notify.verify(t)
}
func TestStateStore_UpdateUpsertPeriodicLaunch(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}
notify := setupNotifyTest(
state,
watch.Item{Table: "periodic_launch"},
watch.Item{Job: job.ID})
err := state.UpsertPeriodicLaunch(1000, launch)
if err != nil {
t.Fatalf("err: %v", err)
}
launch2 := &structs.PeriodicLaunch{
ID: job.ID,
Launch: launch.Launch.Add(1 * time.Second),
}
err = state.UpsertPeriodicLaunch(1001, launch2)
if err != nil {
t.Fatalf("err: %v", err)
}
out, err := state.PeriodicLaunchByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out.CreateIndex != 1000 {
t.Fatalf("bad: %#v", out)
}
if out.ModifyIndex != 1001 {
t.Fatalf("bad: %#v", out)
}
if !reflect.DeepEqual(launch2, out) {
t.Fatalf("bad: %#v %#v", launch2, out)
}
index, err := state.Index("periodic_launch")
if err != nil {
t.Fatalf("err: %v", err)
}
if index != 1001 {
t.Fatalf("bad: %d", index)
}
notify.verify(t)
}
func TestStateStore_DeletePeriodicLaunch(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}
notify := setupNotifyTest(
state,
watch.Item{Table: "periodic_launch"},
watch.Item{Job: job.ID})
err := state.UpsertPeriodicLaunch(1000, launch)
if err != nil {
t.Fatalf("err: %v", err)
}
err = state.DeletePeriodicLaunch(1001, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
out, err := state.PeriodicLaunchByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %#v %#v", job, out)
}
index, err := state.Index("periodic_launch")
if err != nil {
t.Fatalf("err: %v", err)
}
if index != 1001 {
t.Fatalf("bad: %d", index)
}
notify.verify(t)
}
func TestStateStore_PeriodicLaunches(t *testing.T) {
state := testStateStore(t)
var launches []*structs.PeriodicLaunch
for i := 0; i < 10; i++ {
job := mock.Job()
launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}
launches = append(launches, launch)
err := state.UpsertPeriodicLaunch(1000+uint64(i), launch)
if err != nil {
t.Fatalf("err: %v", err)
}
}
iter, err := state.PeriodicLaunches()
if err != nil {
t.Fatalf("err: %v", err)
}
out := make(map[string]*structs.PeriodicLaunch, 10)
for {
raw := iter.Next()
if raw == nil {
break
}
launch := raw.(*structs.PeriodicLaunch)
if _, ok := out[launch.ID]; ok {
t.Fatalf("duplicate: %v", launch.ID)
}
out[launch.ID] = launch
}
for _, launch := range launches {
l, ok := out[launch.ID]
if !ok {
t.Fatalf("bad %v", launch.ID)
}
if !reflect.DeepEqual(launch, l) {
t.Fatalf("bad: %#v %#v", launch, l)
}
delete(out, launch.ID)
}
if len(out) != 0 {
t.Fatalf("leftover: %#v", out)
}
}
func TestStateStore_RestorePeriodicLaunch(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}
notify := setupNotifyTest(
state,
watch.Item{Table: "periodic_launch"},
watch.Item{Job: job.ID})
restore, err := state.Restore()
if err != nil {
t.Fatalf("err: %v", err)
}
err = restore.PeriodicLaunchRestore(launch)
if err != nil {
t.Fatalf("err: %v", err)
}
restore.Commit()
out, err := state.PeriodicLaunchByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(out, launch) {
t.Fatalf("Bad: %#v %#v", out, job)
}
notify.verify(t)
}
func TestStateStore_Indexes(t *testing.T) { func TestStateStore_Indexes(t *testing.T) {
state := testStateStore(t) state := testStateStore(t)
node := mock.Node() node := mock.Node()

View file

@ -8,6 +8,7 @@ import (
"io" "io"
"reflect" "reflect"
"regexp" "regexp"
"strconv"
"strings" "strings"
"time" "time"
@ -16,6 +17,7 @@ import (
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version" "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/args" "github.com/hashicorp/nomad/helper/args"
"github.com/mitchellh/copystructure"
) )
var ( var (
@ -722,6 +724,9 @@ type Job struct {
// specified hierarchically like LineOfBiz/OrgName/Team/Project // specified hierarchically like LineOfBiz/OrgName/Team/Project
ID string ID string
// ParentID is the unique identifier of the job that spawned this job.
ParentID string
// Name is the logical name of the job used to refer to it. This is unique // Name is the logical name of the job used to refer to it. This is unique
// per region, but not unique globally. // per region, but not unique globally.
Name string Name string
@ -790,6 +795,17 @@ func (j *Job) InitFields() {
} }
} }
// Copy returns a deep copy of the Job. It is expected that callers use recover.
// This job can panic if the deep copy failed as it uses reflection.
func (j *Job) Copy() *Job {
i, err := copystructure.Copy(j)
if err != nil {
panic(err)
}
return i.(*Job)
}
// Validate is used to sanity check a job input // Validate is used to sanity check a job input
func (j *Job) Validate() error { func (j *Job) Validate() error {
var mErr multierror.Error var mErr multierror.Error
@ -850,9 +866,15 @@ func (j *Job) Validate() error {
} }
// Validate periodic is only used with batch jobs. // Validate periodic is only used with batch jobs.
if j.Periodic != nil && j.Periodic.Enabled && j.Type != JobTypeBatch { if j.IsPeriodic() {
mErr.Errors = append(mErr.Errors, if j.Type != JobTypeBatch {
fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch)) mErr.Errors = append(mErr.Errors,
fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch))
}
if err := j.Periodic.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
} }
return mErr.ErrorOrNil() return mErr.ErrorOrNil()
@ -917,6 +939,10 @@ func (u *UpdateStrategy) Rolling() bool {
const ( const (
// PeriodicSpecCron is used for a cron spec. // PeriodicSpecCron is used for a cron spec.
PeriodicSpecCron = "cron" PeriodicSpecCron = "cron"
// PeriodicSpecTest is only used by unit tests. It is a sorted, comma
// seperated list of unix timestamps at which to launch.
PeriodicSpecTest = "_internal_test"
) )
// Periodic defines the interval a job should be run at. // Periodic defines the interval a job should be run at.
@ -947,8 +973,10 @@ func (p *PeriodicConfig) Validate() error {
if _, err := cronexpr.Parse(p.Spec); err != nil { if _, err := cronexpr.Parse(p.Spec); err != nil {
return fmt.Errorf("Invalid cron spec %q: %v", p.Spec, err) return fmt.Errorf("Invalid cron spec %q: %v", p.Spec, err)
} }
case PeriodicSpecTest:
// No-op
default: default:
return fmt.Errorf("Unknown specification type %q", p.SpecType) return fmt.Errorf("Unknown periodic specification type %q", p.SpecType)
} }
return nil return nil
@ -964,11 +992,44 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
if e, err := cronexpr.Parse(p.Spec); err == nil { if e, err := cronexpr.Parse(p.Spec); err == nil {
return e.Next(fromTime) return e.Next(fromTime)
} }
case PeriodicSpecTest:
split := strings.Split(p.Spec, ",")
if len(split) == 1 && split[0] == "" {
return time.Time{}
}
// Parse the times
times := make([]time.Time, len(split))
for i, s := range split {
unix, err := strconv.Atoi(s)
if err != nil {
return time.Time{}
}
times[i] = time.Unix(int64(unix), 0)
}
// Find the next match
for _, next := range times {
if fromTime.Before(next) {
return next
}
}
} }
return time.Time{} return time.Time{}
} }
// PeriodicLaunch tracks the last launch time of a periodic job.
type PeriodicLaunch struct {
ID string // ID of the periodic job.
Launch time.Time // The last launch time.
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
}
var ( var (
defaultServiceJobRestartPolicy = RestartPolicy{ defaultServiceJobRestartPolicy = RestartPolicy{
Delay: 15 * time.Second, Delay: 15 * time.Second,

View file

@ -93,6 +93,80 @@ func TestJob_Validate(t *testing.T) {
} }
} }
func TestJob_Copy(t *testing.T) {
j := &Job{
Region: "global",
ID: GenerateUUID(),
Name: "my-job",
Type: JobTypeService,
Priority: 50,
AllAtOnce: false,
Datacenters: []string{"dc1"},
Constraints: []*Constraint{
&Constraint{
LTarget: "$attr.kernel.name",
RTarget: "linux",
Operand: "=",
},
},
Periodic: &PeriodicConfig{
Enabled: false,
},
TaskGroups: []*TaskGroup{
&TaskGroup{
Name: "web",
Count: 10,
RestartPolicy: &RestartPolicy{
Attempts: 3,
Interval: 10 * time.Minute,
Delay: 1 * time.Minute,
},
Tasks: []*Task{
&Task{
Name: "web",
Driver: "exec",
Config: map[string]interface{}{
"command": "/bin/date",
},
Env: map[string]string{
"FOO": "bar",
},
Services: []*Service{
{
Name: "${TASK}-frontend",
PortLabel: "http",
},
},
Resources: &Resources{
CPU: 500,
MemoryMB: 256,
Networks: []*NetworkResource{
&NetworkResource{
MBits: 50,
DynamicPorts: []Port{{Label: "http"}},
},
},
},
},
},
Meta: map[string]string{
"elb_check_type": "http",
"elb_check_interval": "30s",
"elb_check_min": "3",
},
},
},
Meta: map[string]string{
"owner": "armon",
},
}
c := j.Copy()
if !reflect.DeepEqual(j, c) {
t.Fatalf("Copy() returned an unequal Job; got %v; want %v", c, j)
}
}
func TestJob_IsPeriodic(t *testing.T) { func TestJob_IsPeriodic(t *testing.T) {
j := &Job{ j := &Job{
Type: JobTypeService, Type: JobTypeService,

View file

@ -64,7 +64,7 @@ func (t *TimeTable) Deserialize(dec *codec.Decoder) error {
return nil return nil
} }
// Witness is used to witness a new inde and time. // Witness is used to witness a new index and time.
func (t *TimeTable) Witness(index uint64, when time.Time) { func (t *TimeTable) Witness(index uint64, when time.Time) {
t.l.Lock() t.l.Lock()
defer t.l.Unlock() defer t.l.Unlock()

View file

@ -405,6 +405,9 @@ func TestServiceSched_JobModify_InPlace(t *testing.T) {
// Ensure all allocations placed // Ensure all allocations placed
if len(out) != 10 { if len(out) != 10 {
for _, alloc := range out {
t.Logf("%#v", alloc)
}
t.Fatalf("bad: %#v", out) t.Fatalf("bad: %#v", out)
} }
h.AssertEvalStatus(t, structs.EvalStatusComplete) h.AssertEvalStatus(t, structs.EvalStatusComplete)

View file

@ -5,9 +5,9 @@ var GitCommit string
var GitDescribe string var GitDescribe string
// The main version number that is being run at the moment. // The main version number that is being run at the moment.
const Version = "0.2.3" const Version = "0.3.0"
// A pre-release marker for the version. If this is "" (empty string) // A pre-release marker for the version. If this is "" (empty string)
// then it means that it is a final release. Otherwise, this is a pre-release // then it means that it is a final release. Otherwise, this is a pre-release
// such as "dev" (in development), "beta", "rc1", etc. // such as "dev" (in development), "beta", "rc1", etc.
const VersionPrerelease = "" const VersionPrerelease = "dev"

View file

@ -94,7 +94,7 @@ nodes, unless otherwise specified:
directory by default to store temporary allocation data as well as cluster directory by default to store temporary allocation data as well as cluster
information. Server nodes use this directory to store cluster state, including information. Server nodes use this directory to store cluster state, including
the replicated log and snapshot data. This option is required to start the the replicated log and snapshot data. This option is required to start the
Nomad agent. Nomad agent and must be specified as an absolute path.
* `log_level`: Controls the verbosity of logs the Nomad agent will output. Valid * `log_level`: Controls the verbosity of logs the Nomad agent will output. Valid
log levels include `WARN`, `INFO`, or `DEBUG` in increasing order of log levels include `WARN`, `INFO`, or `DEBUG` in increasing order of
@ -253,14 +253,14 @@ configured on server nodes.
configuration options depend on this value. Defaults to `false`. configuration options depend on this value. Defaults to `false`.
* <a id="state_dir">`state_dir`</a>: This is the state dir used to store * <a id="state_dir">`state_dir`</a>: This is the state dir used to store
client state. By default, it lives inside of the [data_dir](#data_dir), in client state. By default, it lives inside of the [data_dir](#data_dir), in
the "client" sub-path. the "client" sub-path. It must be specified as an absolute path.
* <a id="alloc_dir">`alloc_dir`</a>: A directory used to store allocation data. * <a id="alloc_dir">`alloc_dir`</a>: A directory used to store allocation data.
Depending on the workload, the size of this directory can grow arbitrarily Depending on the workload, the size of this directory can grow arbitrarily
large as it is used to store downloaded artifacts for drivers (QEMU images, large as it is used to store downloaded artifacts for drivers (QEMU images,
JAR files, etc.). It is therefore important to ensure this directory is JAR files, etc.). It is therefore important to ensure this directory is
placed some place on the filesystem with adequate storage capacity. By placed some place on the filesystem with adequate storage capacity. By
default, this directory lives under the [data_dir](#data_dir) at the default, this directory lives under the [data_dir](#data_dir) at the
"alloc" sub-path. "alloc" sub-path. It must be specified as an absolute path.
* <a id="servers">`servers`</a>: An array of server addresses. This list is * <a id="servers">`servers`</a>: An array of server addresses. This list is
used to register the client with the server nodes and advertise the used to register the client with the server nodes and advertise the
available resources so that the agent can receive work. available resources so that the agent can receive work.

View file

@ -11,10 +11,9 @@ description: |-
Name: `rkt` Name: `rkt`
The `rkt` driver provides an interface for using CoreOS rkt for running The `rkt` driver provides an interface for using CoreOS rkt for running
application containers. Currently, the driver supports launching application containers. Currently, the driver supports launching containers but
containers but does not support resource isolation or dynamic ports. This can does not support dynamic ports. This can lead to port conflicts and as such,
lead to resource over commitment and port conflicts and as such, this driver is this driver is being marked as experimental and should be used with care.
being marked as experimental and should be used with care.
## Task Configuration ## Task Configuration
@ -49,9 +48,11 @@ The `rkt` driver will set the following client attributes:
* `driver.rkt` - Set to `1` if rkt is found on the host node. Nomad determines * `driver.rkt` - Set to `1` if rkt is found on the host node. Nomad determines
this by executing `rkt version` on the host and parsing the output this by executing `rkt version` on the host and parsing the output
* `driver.rkt.version` - Version of `rkt` eg: `0.8.1` * `driver.rkt.version` - Version of `rkt` eg: `0.8.1`. Note that the minimum required
version is `0.14.0`
* `driver.rkt.appc.version` - Version of `appc` that `rkt` is using eg: `0.8.1` * `driver.rkt.appc.version` - Version of `appc` that `rkt` is using eg: `0.8.1`
## Resource Isolation ## Resource Isolation
This driver does not support any resource isolation as of now. This driver supports CPU and memory isolation by delegating to `rkt`. Network isolation
is not supported as of now.

View file

@ -156,6 +156,24 @@ The `job` object supports the following keys:
and "h" suffix can be used, such as "30s". Both values default to zero, and "h" suffix can be used, such as "30s". Both values default to zero,
which disables rolling updates. which disables rolling updates.
* `periodic` - `periodic` allows the job to be scheduled at fixed times, dates
or intervals. The `periodic` block has the following configuration:
```
periodic {
// Enabled is defaulted to true if the block is included. It can be set
// to false to pause the periodic job from running.
enabled = true
// A cron expression configuring the interval the job is launched at.
// Supports predefined expressions such as "@daily" and "@weekly"
cron = "*/15 * * * * *"
}
```
`cron`: See [here](https://github.com/gorhill/cronexpr#implementation)
for full documentation of supported cron specs and the predefined expressions.
### Task Group ### Task Group
The `group` object supports the following keys: The `group` object supports the following keys: