diff --git a/CHANGELOG.md b/CHANGELOG.md
index e5c689af3..7e9fbc7ea 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,17 @@
+## 0.3.0 (UNRELEASED)
+
+IMPROVEMENTS:
+ * core: Periodic specification for jobs [GH-540]
+ * core: Improved restart policy with more user configuration [GH-594]
+ * core: Batch jobs are garbage collected from the Nomad Servers [GH-586]
+ * driver/rkt: Add support for CPU/Memory isolation [GH-610]
+ * cli: Output of agent-info is sorted [GH-617]
+
+BUG FIXES:
+ * cli: Handle parsing of un-named ports [GH-604]
+ * client: Handle non-200 codes when parsing AWS metadata [GH-614]
+ * cli: Enforce absolute paths for data directories [GH-622]
+
## 0.2.3 (December 17, 2015)
BUG FIXES:
diff --git a/api/jobs.go b/api/jobs.go
index 055b976eb..185539277 100644
--- a/api/jobs.go
+++ b/api/jobs.go
@@ -106,12 +106,19 @@ func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta,
return resp.EvalID, wm, nil
}
-//UpdateStrategy is for serializing update strategy for a job.
+// UpdateStrategy is for serializing update strategy for a job.
type UpdateStrategy struct {
Stagger time.Duration
MaxParallel int
}
+// PeriodicConfig is for serializing periodic config for a job.
+type PeriodicConfig struct {
+ Enabled bool
+ Spec string
+ SpecType string
+}
+
// Job is used to serialize a job.
type Job struct {
Region string
@@ -124,6 +131,7 @@ type Job struct {
Constraints []*Constraint
TaskGroups []*TaskGroup
Update *UpdateStrategy
+ Periodic *PeriodicConfig
Meta map[string]string
Status string
StatusDescription string
diff --git a/api/jobs_test.go b/api/jobs_test.go
index ba12d7fa3..3ab49a185 100644
--- a/api/jobs_test.go
+++ b/api/jobs_test.go
@@ -230,12 +230,10 @@ func TestJobs_Deregister(t *testing.T) {
}
assertWriteMeta(t, wm)
- // Attempting delete on non-existing job does not error
- _, wm2, err := jobs.Deregister("nope", nil)
- if err != nil {
- t.Fatalf("err: %s", err)
+ // Attempting delete on non-existing job returns an error
+ if _, _, err = jobs.Deregister("nope", nil); err == nil {
+ t.Fatalf("expected error deregistering job")
}
- assertWriteMeta(t, wm2)
// Deleting an existing job works
evalID, wm3, err := jobs.Deregister("job1", nil)
diff --git a/client/driver/rkt.go b/client/driver/rkt.go
index 1d90ba71e..99084b5f8 100644
--- a/client/driver/rkt.go
+++ b/client/driver/rkt.go
@@ -14,6 +14,7 @@ import (
"syscall"
"time"
+ "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
@@ -28,6 +29,16 @@ var (
reAppcVersion = regexp.MustCompile(`appc version (\d[.\d]+)`)
)
+const (
+ // minRktVersion is the earliest supported version of rkt. rkt added support
+ // for CPU and memory isolators in 0.14.0. We cannot support an earlier
+ // version to maintain an uniform interface across all drivers
+ minRktVersion = "0.14.0"
+
+ // bytesToMB is the conversion from bytes to megabytes.
+ bytesToMB = 1024 * 1024
+)
+
// RktDriver is a driver for running images via Rkt
// We attempt to chose sane defaults for now, with more configuration available
// planned in the future
@@ -85,6 +96,13 @@ func (d *RktDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e
node.Attributes["driver.rkt.version"] = rktMatches[1]
node.Attributes["driver.rkt.appc.version"] = appcMatches[1]
+ minVersion, _ := version.NewVersion(minRktVersion)
+ currentVersion, _ := version.NewVersion(node.Attributes["driver.rkt.version"])
+ if currentVersion.LessThan(minVersion) {
+ // Do not allow rkt < 0.14.0
+ d.logger.Printf("[WARN] driver.rkt: please upgrade rkt to a version >= %s", minVersion)
+ node.Attributes["driver.rkt"] = "0"
+ }
return true, nil
}
@@ -108,23 +126,26 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
}
taskLocal := filepath.Join(taskDir, allocdir.TaskLocal)
+ // Build the command.
+ var cmdArgs []string
+
// Add the given trust prefix
- trust_prefix, trust_cmd := task.Config["trust_prefix"]
- if trust_cmd {
+ trustPrefix, trustCmd := task.Config["trust_prefix"]
+ if trustCmd {
var outBuf, errBuf bytes.Buffer
- cmd := exec.Command("rkt", "trust", fmt.Sprintf("--prefix=%s", trust_prefix))
+ cmd := exec.Command("rkt", "trust", fmt.Sprintf("--prefix=%s", trustPrefix))
cmd.Stdout = &outBuf
cmd.Stderr = &errBuf
if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("Error running rkt trust: %s\n\nOutput: %s\n\nError: %s",
err, outBuf.String(), errBuf.String())
}
- d.logger.Printf("[DEBUG] driver.rkt: added trust prefix: %q", trust_prefix)
+ d.logger.Printf("[DEBUG] driver.rkt: added trust prefix: %q", trustPrefix)
+ } else {
+ // Disble signature verification if the trust command was not run.
+ cmdArgs = append(cmdArgs, "--insecure-skip-verify")
}
- // Build the command.
- var cmd_args []string
-
// Inject the environment variables.
envVars := TaskEnvironmentVariables(ctx, task)
@@ -133,33 +154,41 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
envVars.ClearAllocDir()
for k, v := range envVars.Map() {
- cmd_args = append(cmd_args, fmt.Sprintf("--set-env=%v=%v", k, v))
- }
-
- // Disble signature verification if the trust command was not run.
- if !trust_cmd {
- cmd_args = append(cmd_args, "--insecure-skip-verify")
+ cmdArgs = append(cmdArgs, fmt.Sprintf("--set-env=%v=%v", k, v))
}
// Append the run command.
- cmd_args = append(cmd_args, "run", "--mds-register=false", img)
+ cmdArgs = append(cmdArgs, "run", "--mds-register=false", img)
// Check if the user has overriden the exec command.
- if exec_cmd, ok := task.Config["command"]; ok {
- cmd_args = append(cmd_args, fmt.Sprintf("--exec=%v", exec_cmd))
+ if execCmd, ok := task.Config["command"]; ok {
+ cmdArgs = append(cmdArgs, fmt.Sprintf("--exec=%v", execCmd))
}
+ if task.Resources.MemoryMB == 0 {
+ return nil, fmt.Errorf("Memory limit cannot be zero")
+ }
+ if task.Resources.CPU == 0 {
+ return nil, fmt.Errorf("CPU limit cannot be zero")
+ }
+
+ // Add memory isolator
+ cmdArgs = append(cmdArgs, fmt.Sprintf("--memory=%vM", int64(task.Resources.MemoryMB)*bytesToMB))
+
+ // Add CPU isolator
+ cmdArgs = append(cmdArgs, fmt.Sprintf("--cpu=%vm", int64(task.Resources.CPU)))
+
// Add user passed arguments.
if len(driverConfig.Args) != 0 {
parsed := args.ParseAndReplace(driverConfig.Args, envVars.Map())
// Need to start arguments with "--"
if len(parsed) > 0 {
- cmd_args = append(cmd_args, "--")
+ cmdArgs = append(cmdArgs, "--")
}
for _, arg := range parsed {
- cmd_args = append(cmd_args, fmt.Sprintf("%v", arg))
+ cmdArgs = append(cmdArgs, fmt.Sprintf("%v", arg))
}
}
@@ -177,7 +206,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
return nil, fmt.Errorf("Error opening file to redirect stderr: %v", err)
}
- cmd := exec.Command("rkt", cmd_args...)
+ cmd := exec.Command("rkt", cmdArgs...)
cmd.Stdout = stdo
cmd.Stderr = stde
diff --git a/client/driver/rkt_test.go b/client/driver/rkt_test.go
index 15db27b27..8ee4425c2 100644
--- a/client/driver/rkt_test.go
+++ b/client/driver/rkt_test.go
@@ -81,6 +81,10 @@ func TestRktDriver_Start(t *testing.T) {
"image": "coreos.com/etcd:v2.0.4",
"command": "/etcd",
},
+ Resources: &structs.Resources{
+ MemoryMB: 256,
+ CPU: 512,
+ },
}
driverCtx := testDriverContext(task.Name)
@@ -121,6 +125,10 @@ func TestRktDriver_Start_Wait(t *testing.T) {
"command": "/etcd",
"args": []string{"--version"},
},
+ Resources: &structs.Resources{
+ MemoryMB: 256,
+ CPU: 512,
+ },
}
driverCtx := testDriverContext(task.Name)
@@ -162,6 +170,10 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) {
"command": "/etcd",
"args": []string{"--version"},
},
+ Resources: &structs.Resources{
+ MemoryMB: 256,
+ CPU: 512,
+ },
}
driverCtx := testDriverContext(task.Name)
@@ -204,6 +216,10 @@ func TestRktDriver_Start_Wait_Logs(t *testing.T) {
"command": "/etcd",
"args": []string{"--version"},
},
+ Resources: &structs.Resources{
+ MemoryMB: 256,
+ CPU: 512,
+ },
}
driverCtx := testDriverContext(task.Name)
diff --git a/client/fingerprint/env_aws.go b/client/fingerprint/env_aws.go
index 1963f6c50..3a55aff50 100644
--- a/client/fingerprint/env_aws.go
+++ b/client/fingerprint/env_aws.go
@@ -116,6 +116,10 @@ func (f *EnvAWSFingerprint) Fingerprint(cfg *config.Config, node *structs.Node)
}
for _, k := range keys {
res, err := client.Get(metadataURL + k)
+ if res.StatusCode != http.StatusOK {
+ f.logger.Printf("[WARN]: fingerprint.env_aws: Could not read value for attribute %q", k)
+ continue
+ }
if err != nil {
// if it's a URL error, assume we're not in an AWS environment
// TODO: better way to detect AWS? Check xen virtualization?
diff --git a/client/fingerprint/env_gce.go b/client/fingerprint/env_gce.go
index 357148c5f..0353dbdb1 100644
--- a/client/fingerprint/env_gce.go
+++ b/client/fingerprint/env_gce.go
@@ -94,7 +94,8 @@ func (f *EnvGCEFingerprint) Get(attribute string, recursive bool) (string, error
}
res, err := f.client.Do(req)
- if err != nil {
+ if err != nil || res.StatusCode != http.StatusOK {
+ f.logger.Printf("[WARN]: fingerprint.env_gce: Could not read value for attribute %q", attribute)
return "", err
}
diff --git a/command/agent/command.go b/command/agent/command.go
index 6976b9d76..7c3d5ef34 100644
--- a/command/agent/command.go
+++ b/command/agent/command.go
@@ -193,6 +193,23 @@ func (c *Command) readConfig() *Config {
return nil
}
+ // Verify the paths are absolute.
+ dirs := map[string]string{
+ "data-dir": config.DataDir,
+ "alloc-dir": config.Client.AllocDir,
+ "state-dir": config.Client.StateDir,
+ }
+ for k, dir := range dirs {
+ if dir == "" {
+ continue
+ }
+
+ if !filepath.IsAbs(dir) {
+ c.Ui.Error(fmt.Sprintf("%s must be given as an absolute path: got %v", k, dir))
+ return nil
+ }
+ }
+
// Ensure that we have the directories we neet to run.
if config.Server.Enabled && config.DataDir == "" {
c.Ui.Error("Must specify data directory")
diff --git a/command/run.go b/command/run.go
index d25692135..f54f96298 100644
--- a/command/run.go
+++ b/command/run.go
@@ -5,6 +5,7 @@ import (
"encoding/gob"
"fmt"
"strings"
+ "time"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/jobspec"
@@ -89,8 +90,11 @@ func (c *RunCommand) Run(args []string) int {
return 1
}
+ // Check if the job is periodic.
+ periodic := job.IsPeriodic()
+
// Convert it to something we can use
- apiJob, err := convertJob(job)
+ apiJob, err := convertStructJob(job)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error converting job: %s", err))
return 1
@@ -111,9 +115,14 @@ func (c *RunCommand) Run(args []string) int {
}
// Check if we should enter monitor mode
- if detach {
+ if detach || periodic {
c.Ui.Output("Job registration successful")
- c.Ui.Output("Evaluation ID: " + evalID)
+ if periodic {
+ c.Ui.Output(fmt.Sprintf("Approximate next launch time: %v", job.Periodic.Next(time.Now())))
+ } else {
+ c.Ui.Output("Evaluation ID: " + evalID)
+ }
+
return 0
}
@@ -123,9 +132,9 @@ func (c *RunCommand) Run(args []string) int {
}
-// convertJob is used to take a *structs.Job and convert it to an *api.Job.
+// convertStructJob is used to take a *structs.Job and convert it to an *api.Job.
// This function is just a hammer and probably needs to be revisited.
-func convertJob(in *structs.Job) (*api.Job, error) {
+func convertStructJob(in *structs.Job) (*api.Job, error) {
gob.Register([]map[string]interface{}{})
gob.Register([]interface{}{})
var apiJob *api.Job
diff --git a/command/status.go b/command/status.go
index d4b39473b..d1359cbcb 100644
--- a/command/status.go
+++ b/command/status.go
@@ -1,8 +1,14 @@
package command
import (
+ "bytes"
+ "encoding/gob"
"fmt"
"strings"
+ "time"
+
+ "github.com/hashicorp/nomad/api"
+ "github.com/hashicorp/nomad/nomad/structs"
)
type StatusCommand struct {
@@ -118,6 +124,14 @@ func (c *StatusCommand) Run(args []string) int {
}
}
+ // Check if it is periodic
+ sJob, err := convertApiJob(job)
+ if err != nil {
+ c.Ui.Error(fmt.Sprintf("Error converting job: %s", err))
+ return 1
+ }
+ periodic := sJob.IsPeriodic()
+
// Format the job info
basic := []string{
fmt.Sprintf("ID|%s", job.ID),
@@ -126,10 +140,19 @@ func (c *StatusCommand) Run(args []string) int {
fmt.Sprintf("Priority|%d", job.Priority),
fmt.Sprintf("Datacenters|%s", strings.Join(job.Datacenters, ",")),
fmt.Sprintf("Status|%s", job.Status),
+ fmt.Sprintf("Periodic|%v", periodic),
}
- var evals, allocs []string
- if !short {
+ if periodic {
+ basic = append(basic, fmt.Sprintf("Next Periodic Launch|%v",
+ sJob.Periodic.Next(time.Now())))
+ }
+
+ c.Ui.Output(formatKV(basic))
+
+ if !periodic && !short {
+ var evals, allocs []string
+
// Query the evaluations
jobEvals, _, err := client.Jobs().Evaluations(job.ID, nil)
if err != nil {
@@ -167,15 +190,28 @@ func (c *StatusCommand) Run(args []string) int {
alloc.DesiredStatus,
alloc.ClientStatus)
}
- }
- // Dump the output
- c.Ui.Output(formatKV(basic))
- if !short {
c.Ui.Output("\n==> Evaluations")
c.Ui.Output(formatList(evals))
c.Ui.Output("\n==> Allocations")
c.Ui.Output(formatList(allocs))
}
+
return 0
}
+
+// convertApiJob is used to take a *api.Job and convert it to an *struct.Job.
+// This function is just a hammer and probably needs to be revisited.
+func convertApiJob(in *api.Job) (*structs.Job, error) {
+ gob.Register(map[string]interface{}{})
+ gob.Register([]interface{}{})
+ var structJob *structs.Job
+ buf := new(bytes.Buffer)
+ if err := gob.NewEncoder(buf).Encode(in); err != nil {
+ return nil, err
+ }
+ if err := gob.NewDecoder(buf).Decode(&structJob); err != nil {
+ return nil, err
+ }
+ return structJob, nil
+}
diff --git a/jobspec/parse.go b/jobspec/parse.go
index 963c0b3e4..3c61fdd3f 100644
--- a/jobspec/parse.go
+++ b/jobspec/parse.go
@@ -698,8 +698,8 @@ func parsePeriodic(result **structs.PeriodicConfig, list *ast.ObjectList) error
m["Enabled"] = enabled
}
- // If "cron_spec" is provided, set the type to "cron" and store the spec.
- if cron, ok := m["cron_spec"]; ok {
+ // If "cron" is provided, set the type to "cron" and store the spec.
+ if cron, ok := m["cron"]; ok {
m["SpecType"] = structs.PeriodicSpecCron
m["Spec"] = cron
}
diff --git a/jobspec/test-fixtures/periodic-cron.hcl b/jobspec/test-fixtures/periodic-cron.hcl
index 2b1bd2b39..c463cc4f1 100644
--- a/jobspec/test-fixtures/periodic-cron.hcl
+++ b/jobspec/test-fixtures/periodic-cron.hcl
@@ -1,5 +1,5 @@
job "foo" {
periodic {
- cron_spec = "*/5 * * *"
+ cron = "*/5 * * *"
}
}
diff --git a/nomad/fsm.go b/nomad/fsm.go
index 71c40d68f..5fe4009a2 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -32,17 +32,19 @@ const (
EvalSnapshot
AllocSnapshot
TimeTableSnapshot
+ PeriodicLaunchSnapshot
)
// nomadFSM implements a finite state machine that is used
// along with Raft to provide strong consistency. We implement
// this outside the Server to avoid exposing this outside the package.
type nomadFSM struct {
- evalBroker *EvalBroker
- logOutput io.Writer
- logger *log.Logger
- state *state.StateStore
- timetable *TimeTable
+ evalBroker *EvalBroker
+ periodicDispatcher *PeriodicDispatch
+ logOutput io.Writer
+ logger *log.Logger
+ state *state.StateStore
+ timetable *TimeTable
}
// nomadSnapshot is used to provide a snapshot of the current
@@ -58,7 +60,7 @@ type snapshotHeader struct {
}
// NewFSMPath is used to construct a new FSM with a blank state
-func NewFSM(evalBroker *EvalBroker, logOutput io.Writer) (*nomadFSM, error) {
+func NewFSM(evalBroker *EvalBroker, periodic *PeriodicDispatch, logOutput io.Writer) (*nomadFSM, error) {
// Create a state store
state, err := state.NewStateStore(logOutput)
if err != nil {
@@ -66,11 +68,12 @@ func NewFSM(evalBroker *EvalBroker, logOutput io.Writer) (*nomadFSM, error) {
}
fsm := &nomadFSM{
- evalBroker: evalBroker,
- logOutput: logOutput,
- logger: log.New(logOutput, "", log.LstdFlags),
- state: state,
- timetable: NewTimeTable(timeTableGranularity, timeTableLimit),
+ evalBroker: evalBroker,
+ periodicDispatcher: periodic,
+ logOutput: logOutput,
+ logger: log.New(logOutput, "", log.LstdFlags),
+ state: state,
+ timetable: NewTimeTable(timeTableGranularity, timeTableLimit),
}
return fsm, nil
}
@@ -204,6 +207,65 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
n.logger.Printf("[ERR] nomad.fsm: UpsertJob failed: %v", err)
return err
}
+
+ // We always add the job to the periodic dispatcher because there is the
+ // possibility that the periodic spec was removed and then we should stop
+ // tracking it.
+ if err := n.periodicDispatcher.Add(req.Job); err != nil {
+ n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err)
+ return err
+ }
+
+ // If it is periodic, record the time it was inserted. This is necessary for
+ // recovering during leader election. It is possible that from the time it
+ // is added to when it was suppose to launch, leader election occurs and the
+ // job was not launched. In this case, we use the insertion time to
+ // determine if a launch was missed.
+ if req.Job.IsPeriodic() {
+ prevLaunch, err := n.state.PeriodicLaunchByID(req.Job.ID)
+ if err != nil {
+ n.logger.Printf("[ERR] nomad.fsm: PeriodicLaunchByID failed: %v", err)
+ return err
+ }
+
+ // Record the insertion time as a launch. We overload the launch table
+ // such that the first entry is the insertion time.
+ if prevLaunch == nil {
+ launch := &structs.PeriodicLaunch{ID: req.Job.ID, Launch: time.Now()}
+ if err := n.state.UpsertPeriodicLaunch(index, launch); err != nil {
+ n.logger.Printf("[ERR] nomad.fsm: UpsertPeriodicLaunch failed: %v", err)
+ return err
+ }
+ }
+ }
+
+ // Check if the parent job is periodic and mark the launch time.
+ parentID := req.Job.ParentID
+ if parentID != "" {
+ parent, err := n.state.JobByID(parentID)
+ if err != nil {
+ n.logger.Printf("[ERR] nomad.fsm: JobByID(%v) lookup for parent failed: %v", parentID, err)
+ return err
+ } else if parent == nil {
+ // The parent has been deregistered.
+ return nil
+ }
+
+ if parent.IsPeriodic() {
+ t, err := n.periodicDispatcher.LaunchTime(req.Job.ID)
+ if err != nil {
+ n.logger.Printf("[ERR] nomad.fsm: LaunchTime(%v) failed: %v", req.Job.ID, err)
+ return err
+ }
+
+ launch := &structs.PeriodicLaunch{ID: parentID, Launch: t}
+ if err := n.state.UpsertPeriodicLaunch(index, launch); err != nil {
+ n.logger.Printf("[ERR] nomad.fsm: UpsertPeriodicLaunch failed: %v", err)
+ return err
+ }
+ }
+ }
+
return nil
}
@@ -218,6 +280,17 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err)
return err
}
+
+ if err := n.periodicDispatcher.Remove(req.JobID); err != nil {
+ n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Remove failed: %v", err)
+ return err
+ }
+
+ // We always delete from the periodic launch table because it is possible that
+ // the job was updated to be non-perioidic, thus checking if it is periodic
+ // doesn't ensure we clean it up properly.
+ n.state.DeletePeriodicLaunch(index, req.JobID)
+
return nil
}
@@ -392,6 +465,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err
}
+ case PeriodicLaunchSnapshot:
+ launch := new(structs.PeriodicLaunch)
+ if err := dec.Decode(launch); err != nil {
+ return err
+ }
+ if err := restore.PeriodicLaunchRestore(launch); err != nil {
+ return err
+ }
+
default:
return fmt.Errorf("Unrecognized snapshot type: %v", msgType)
}
@@ -442,6 +524,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
+ if err := s.persistPeriodicLaunches(sink, encoder); err != nil {
+ sink.Cancel()
+ return err
+ }
return nil
}
@@ -580,6 +666,33 @@ func (s *nomadSnapshot) persistAllocs(sink raft.SnapshotSink,
return nil
}
+func (s *nomadSnapshot) persistPeriodicLaunches(sink raft.SnapshotSink,
+ encoder *codec.Encoder) error {
+ // Get all the jobs
+ launches, err := s.snap.PeriodicLaunches()
+ if err != nil {
+ return err
+ }
+
+ for {
+ // Get the next item
+ raw := launches.Next()
+ if raw == nil {
+ break
+ }
+
+ // Prepare the request struct
+ launch := raw.(*structs.PeriodicLaunch)
+
+ // Write out a job registration
+ sink.Write([]byte{byte(PeriodicLaunchSnapshot)})
+ if err := encoder.Encode(launch); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.
diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go
index 4ca75158b..ef76486a8 100644
--- a/nomad/fsm_test.go
+++ b/nomad/fsm_test.go
@@ -43,7 +43,8 @@ func testStateStore(t *testing.T) *state.StateStore {
}
func testFSM(t *testing.T) *nomadFSM {
- fsm, err := NewFSM(testBroker(t, 0), os.Stderr)
+ p, _ := testPeriodicDispatcher()
+ fsm, err := NewFSM(testBroker(t, 0), p, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -222,8 +223,9 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
func TestFSM_RegisterJob(t *testing.T) {
fsm := testFSM(t)
+ job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
- Job: mock.Job(),
+ Job: job,
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
if err != nil {
@@ -236,22 +238,39 @@ func TestFSM_RegisterJob(t *testing.T) {
}
// Verify we are registered
- job, err := fsm.State().JobByID(req.Job.ID)
+ jobOut, err := fsm.State().JobByID(req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
- if job == nil {
+ if jobOut == nil {
t.Fatalf("not found!")
}
- if job.CreateIndex != 1 {
- t.Fatalf("bad index: %d", job.CreateIndex)
+ if jobOut.CreateIndex != 1 {
+ t.Fatalf("bad index: %d", jobOut.CreateIndex)
+ }
+
+ // Verify it was added to the periodic runner.
+ if _, ok := fsm.periodicDispatcher.tracked[job.ID]; !ok {
+ t.Fatal("job not added to periodic runner")
+ }
+
+ // Verify the launch time was tracked.
+ launchOut, err := fsm.State().PeriodicLaunchByID(req.Job.ID)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if launchOut == nil {
+ t.Fatalf("not found!")
+ }
+ if launchOut.Launch.IsZero() {
+ t.Fatalf("bad launch time: %v", launchOut.Launch)
}
}
func TestFSM_DeregisterJob(t *testing.T) {
fsm := testFSM(t)
- job := mock.Job()
+ job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
Job: job,
}
@@ -279,13 +298,27 @@ func TestFSM_DeregisterJob(t *testing.T) {
}
// Verify we are NOT registered
- job, err = fsm.State().JobByID(req.Job.ID)
+ jobOut, err := fsm.State().JobByID(req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
- if job != nil {
+ if jobOut != nil {
t.Fatalf("job found!")
}
+
+ // Verify it was removed from the periodic runner.
+ if _, ok := fsm.periodicDispatcher.tracked[job.ID]; ok {
+ t.Fatal("job not removed from periodic runner")
+ }
+
+ // Verify it was removed from the periodic launch table.
+ launchOut, err := fsm.State().PeriodicLaunchByID(req.Job.ID)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if launchOut != nil {
+ t.Fatalf("launch found!")
+ }
}
func TestFSM_UpdateEval(t *testing.T) {
@@ -607,3 +640,27 @@ func TestFSM_SnapshotRestore_TimeTable(t *testing.T) {
t.Fatalf("bad")
}
}
+
+func TestFSM_SnapshotRestore_PeriodicLaunches(t *testing.T) {
+ // Add some state
+ fsm := testFSM(t)
+ state := fsm.State()
+ job1 := mock.Job()
+ launch1 := &structs.PeriodicLaunch{ID: job1.ID, Launch: time.Now()}
+ state.UpsertPeriodicLaunch(1000, launch1)
+ job2 := mock.Job()
+ launch2 := &structs.PeriodicLaunch{ID: job2.ID, Launch: time.Now()}
+ state.UpsertPeriodicLaunch(1001, launch2)
+
+ // Verify the contents
+ fsm2 := testSnapshotRestore(t, fsm)
+ state2 := fsm2.State()
+ out1, _ := state2.PeriodicLaunchByID(launch1.ID)
+ out2, _ := state2.PeriodicLaunchByID(launch2.ID)
+ if !reflect.DeepEqual(launch1, out1) {
+ t.Fatalf("bad: \n%#v\n%#v", out1, job1)
+ }
+ if !reflect.DeepEqual(launch2, out2) {
+ t.Fatalf("bad: \n%#v\n%#v", out2, job2)
+ }
+}
diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index 04d4a6dcf..eb6205e06 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -50,6 +50,14 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}
+ // Populate the reply with job information
+ reply.JobModifyIndex = index
+
+ // If the job is periodic, we don't create an eval.
+ if args.Job.IsPeriodic() {
+ return nil
+ }
+
// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
@@ -74,10 +82,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}
- // Setup the reply
+ // Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
- reply.JobModifyIndex = index
reply.Index = evalIndex
return nil
}
@@ -117,6 +124,10 @@ func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegis
return fmt.Errorf("job not found")
}
+ if job.IsPeriodic() {
+ return fmt.Errorf("can't evaluate periodic job")
+ }
+
// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
@@ -154,6 +165,24 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
}
defer metrics.MeasureSince([]string{"nomad", "job", "deregister"}, time.Now())
+ // Validate the arguments
+ if args.JobID == "" {
+ return fmt.Errorf("missing job ID for evaluation")
+ }
+
+ // Lookup the job
+ snap, err := j.srv.fsm.State().Snapshot()
+ if err != nil {
+ return err
+ }
+ job, err := snap.JobByID(args.JobID)
+ if err != nil {
+ return err
+ }
+ if job == nil {
+ return fmt.Errorf("job not found")
+ }
+
// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
if err != nil {
@@ -161,6 +190,14 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return err
}
+ // Populate the reply with job information
+ reply.JobModifyIndex = index
+
+ // If the job is periodic, we don't create an eval.
+ if job.IsPeriodic() {
+ return nil
+ }
+
// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was. The scheduler itself also doesn't matter,
@@ -186,10 +223,9 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return err
}
- // Setup the reply
+ // Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
- reply.JobModifyIndex = index
reply.Index = evalIndex
return nil
}
diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go
index a9c69dfb9..236028c4b 100644
--- a/nomad/job_endpoint_test.go
+++ b/nomad/job_endpoint_test.go
@@ -233,6 +233,53 @@ func TestJobEndpoint_Register_GC_Set(t *testing.T) {
}
}
+func TestJobEndpoint_Register_Periodic(t *testing.T) {
+ s1 := testServer(t, func(c *Config) {
+ c.NumSchedulers = 0 // Prevent automatic dequeue
+ })
+ defer s1.Shutdown()
+ codec := rpcClient(t, s1)
+ testutil.WaitForLeader(t, s1.RPC)
+
+ // Create the register request for a periodic job.
+ job := mock.PeriodicJob()
+ req := &structs.JobRegisterRequest{
+ Job: job,
+ WriteRequest: structs.WriteRequest{Region: "global"},
+ }
+
+ // Fetch the response
+ var resp structs.JobRegisterResponse
+ if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if resp.JobModifyIndex == 0 {
+ t.Fatalf("bad index: %d", resp.Index)
+ }
+
+ // Check for the node in the FSM
+ state := s1.fsm.State()
+ out, err := state.JobByID(job.ID)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if out == nil {
+ t.Fatalf("expected job")
+ }
+ if out.CreateIndex != resp.JobModifyIndex {
+ t.Fatalf("index mis-match")
+ }
+ serviceName := out.TaskGroups[0].Tasks[0].Services[0].Name
+ expectedServiceName := "web-frontend"
+ if serviceName != expectedServiceName {
+ t.Fatalf("Expected Service Name: %s, Actual: %s", expectedServiceName, serviceName)
+ }
+
+ if resp.EvalID != "" {
+ t.Fatalf("Register created an eval for a periodic job")
+ }
+}
+
func TestJobEndpoint_Evaluate(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
@@ -304,6 +351,42 @@ func TestJobEndpoint_Evaluate(t *testing.T) {
}
}
+func TestJobEndpoint_Evaluate_Periodic(t *testing.T) {
+ s1 := testServer(t, func(c *Config) {
+ c.NumSchedulers = 0 // Prevent automatic dequeue
+ })
+ defer s1.Shutdown()
+ codec := rpcClient(t, s1)
+ testutil.WaitForLeader(t, s1.RPC)
+
+ // Create the register request
+ job := mock.PeriodicJob()
+ req := &structs.JobRegisterRequest{
+ Job: job,
+ WriteRequest: structs.WriteRequest{Region: "global"},
+ }
+
+ // Fetch the response
+ var resp structs.JobRegisterResponse
+ if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if resp.JobModifyIndex == 0 {
+ t.Fatalf("bad index: %d", resp.Index)
+ }
+
+ // Force a re-evaluation
+ reEval := &structs.JobEvaluateRequest{
+ JobID: job.ID,
+ WriteRequest: structs.WriteRequest{Region: "global"},
+ }
+
+ // Fetch the response
+ if err := msgpackrpc.CallWithCodec(codec, "Job.Evaluate", reEval, &resp); err == nil {
+ t.Fatal("expect an err")
+ }
+}
+
func TestJobEndpoint_Deregister(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
@@ -380,6 +463,55 @@ func TestJobEndpoint_Deregister(t *testing.T) {
}
}
+func TestJobEndpoint_Deregister_Periodic(t *testing.T) {
+ s1 := testServer(t, func(c *Config) {
+ c.NumSchedulers = 0 // Prevent automatic dequeue
+ })
+ defer s1.Shutdown()
+ codec := rpcClient(t, s1)
+ testutil.WaitForLeader(t, s1.RPC)
+
+ // Create the register request
+ job := mock.PeriodicJob()
+ reg := &structs.JobRegisterRequest{
+ Job: job,
+ WriteRequest: structs.WriteRequest{Region: "global"},
+ }
+
+ // Fetch the response
+ var resp structs.JobRegisterResponse
+ if err := msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ // Deregister
+ dereg := &structs.JobDeregisterRequest{
+ JobID: job.ID,
+ WriteRequest: structs.WriteRequest{Region: "global"},
+ }
+ var resp2 structs.JobDeregisterResponse
+ if err := msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if resp2.JobModifyIndex == 0 {
+ t.Fatalf("bad index: %d", resp2.Index)
+ }
+
+ // Check for the node in the FSM
+ state := s1.fsm.State()
+ out, err := state.JobByID(job.ID)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if out != nil {
+ t.Fatalf("unexpected job")
+ }
+
+ if resp.EvalID != "" {
+ t.Fatalf("Deregister created an eval for a periodic job")
+ }
+}
+
func TestJobEndpoint_GetJob(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
diff --git a/nomad/leader.go b/nomad/leader.go
index 0e267a7dc..5969a9239 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -1,6 +1,7 @@
package nomad
import (
+ "errors"
"fmt"
"time"
@@ -117,6 +118,15 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
return err
}
+ // Enable the periodic dispatcher, since we are now the leader.
+ s.periodicDispatcher.SetEnabled(true)
+ s.periodicDispatcher.Start()
+
+ // Restore the periodic dispatcher state
+ if err := s.restorePeriodicDispatcher(); err != nil {
+ return err
+ }
+
// Scheduler periodic jobs
go s.schedulePeriodic(stopCh)
@@ -167,6 +177,52 @@ func (s *Server) restoreEvalBroker() error {
return nil
}
+// restorePeriodicDispatcher is used to restore all periodic jobs into the
+// periodic dispatcher. It also determines if a periodic job should have been
+// created during the leadership transition and force runs them. The periodic
+// dispatcher is maintained only by the leader, so it must be restored anytime a
+// leadership transition takes place.
+func (s *Server) restorePeriodicDispatcher() error {
+ iter, err := s.fsm.State().JobsByPeriodic(true)
+ if err != nil {
+ return fmt.Errorf("failed to get periodic jobs: %v", err)
+ }
+
+ now := time.Now()
+ for i := iter.Next(); i != nil; i = iter.Next() {
+ job := i.(*structs.Job)
+ s.periodicDispatcher.Add(job)
+
+ // If the periodic job has never been launched before, launch will hold
+ // the time the periodic job was added. Otherwise it has the last launch
+ // time of the periodic job.
+ launch, err := s.fsm.State().PeriodicLaunchByID(job.ID)
+ if err != nil || launch == nil {
+ return fmt.Errorf("failed to get periodic launch time: %v", err)
+ }
+
+ // nextLaunch is the next launch that should occur.
+ nextLaunch := job.Periodic.Next(launch.Launch)
+
+ // We skip force launching the job if there should be no next launch
+ // (the zero case) or if the next launch time is in the future. If it is
+ // in the future, it will be handled by the periodic dispatcher.
+ if nextLaunch.IsZero() || !nextLaunch.Before(now) {
+ continue
+ }
+
+ if err := s.periodicDispatcher.ForceRun(job.ID); err != nil {
+ msg := fmt.Sprintf("force run of periodic job %q failed: %v", job.ID, err)
+ s.logger.Printf("[ERR] nomad.periodic: %s", msg)
+ return errors.New(msg)
+ }
+ s.logger.Printf("[DEBUG] nomad.periodic: periodic job %q force"+
+ " run during leadership establishment", job.ID)
+ }
+
+ return nil
+}
+
// schedulePeriodic is used to do periodic job dispatch while we are leader
func (s *Server) schedulePeriodic(stopCh chan struct{}) {
evalGC := time.NewTicker(s.config.EvalGCInterval)
@@ -250,6 +306,9 @@ func (s *Server) revokeLeadership() error {
// Disable the eval broker, since it is only useful as a leader
s.evalBroker.SetEnabled(false)
+ // Disable the periodic dispatcher, since it is only useful as a leader
+ s.periodicDispatcher.SetEnabled(false)
+
// Clear the heartbeat timers on either shutdown or step down,
// since we are no longer responsible for TTL expirations.
if err := s.clearAllHeartbeatTimers(); err != nil {
diff --git a/nomad/leader_test.go b/nomad/leader_test.go
index b753b41f4..f3029815b 100644
--- a/nomad/leader_test.go
+++ b/nomad/leader_test.go
@@ -286,6 +286,186 @@ func TestLeader_EvalBroker_Reset(t *testing.T) {
})
}
+func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) {
+ s1 := testServer(t, func(c *Config) {
+ c.NumSchedulers = 0
+ })
+ defer s1.Shutdown()
+
+ s2 := testServer(t, func(c *Config) {
+ c.NumSchedulers = 0
+ c.DevDisableBootstrap = true
+ })
+ defer s2.Shutdown()
+
+ s3 := testServer(t, func(c *Config) {
+ c.NumSchedulers = 0
+ c.DevDisableBootstrap = true
+ })
+ defer s3.Shutdown()
+ servers := []*Server{s1, s2, s3}
+ testJoin(t, s1, s2, s3)
+ testutil.WaitForLeader(t, s1.RPC)
+
+ for _, s := range servers {
+ testutil.WaitForResult(func() (bool, error) {
+ peers, _ := s.raftPeers.Peers()
+ return len(peers) == 3, nil
+ }, func(err error) {
+ t.Fatalf("should have 3 peers")
+ })
+ }
+
+ var leader *Server
+ for _, s := range servers {
+ if s.IsLeader() {
+ leader = s
+ break
+ }
+ }
+ if leader == nil {
+ t.Fatalf("Should have a leader")
+ }
+
+ // Inject a periodic job and non-periodic job
+ periodic := mock.PeriodicJob()
+ nonPeriodic := mock.Job()
+ for _, job := range []*structs.Job{nonPeriodic, periodic} {
+ req := structs.JobRegisterRequest{
+ Job: job,
+ }
+ _, _, err := leader.raftApply(structs.JobRegisterRequestType, req)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ }
+
+ // Kill the leader
+ leader.Shutdown()
+ time.Sleep(100 * time.Millisecond)
+
+ // Wait for a new leader
+ leader = nil
+ testutil.WaitForResult(func() (bool, error) {
+ for _, s := range servers {
+ if s.IsLeader() {
+ leader = s
+ return true, nil
+ }
+ }
+ return false, nil
+ }, func(err error) {
+ t.Fatalf("should have leader")
+ })
+
+ // Check that the new leader is tracking the periodic job.
+ testutil.WaitForResult(func() (bool, error) {
+ _, tracked := leader.periodicDispatcher.tracked[periodic.ID]
+ return tracked, nil
+ }, func(err error) {
+ t.Fatalf("periodic job not tracked")
+ })
+}
+
+func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
+ s1 := testServer(t, func(c *Config) {
+ c.NumSchedulers = 0
+ })
+ defer s1.Shutdown()
+ testutil.WaitForLeader(t, s1.RPC)
+
+ // Inject a periodic job that will be triggered soon.
+ launch := time.Now().Add(1 * time.Second)
+ job := testPeriodicJob(launch)
+ req := structs.JobRegisterRequest{
+ Job: job,
+ }
+ _, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ // Flush the periodic dispatcher, ensuring that no evals will be created.
+ s1.periodicDispatcher.SetEnabled(false)
+
+ // Get the current time to ensure the launch time is after this once we
+ // restore.
+ now := time.Now()
+
+ // Sleep till after the job should have been launched.
+ time.Sleep(3 * time.Second)
+
+ // Restore the periodic dispatcher.
+ s1.periodicDispatcher.SetEnabled(true)
+ s1.periodicDispatcher.Start()
+ s1.restorePeriodicDispatcher()
+
+ // Ensure the job is tracked.
+ if _, tracked := s1.periodicDispatcher.tracked[job.ID]; !tracked {
+ t.Fatalf("periodic job not restored")
+ }
+
+ // Check that an eval was made.
+ last, err := s1.fsm.State().PeriodicLaunchByID(job.ID)
+ if err != nil || last == nil {
+ t.Fatalf("failed to get periodic launch time: %v", err)
+ }
+
+ if last.Launch.Before(now) {
+ t.Fatalf("restorePeriodicDispatcher did not force launch: last %v; want after %v", last.Launch, now)
+ }
+}
+
+func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
+ s1 := testServer(t, func(c *Config) {
+ c.NumSchedulers = 0
+ })
+ defer s1.Shutdown()
+ testutil.WaitForLeader(t, s1.RPC)
+
+ // Inject a periodic job that triggered once in the past, should trigger now
+ // and once in the future.
+ now := time.Now()
+ past := now.Add(-1 * time.Second)
+ future := now.Add(10 * time.Second)
+ job := testPeriodicJob(past, now, future)
+ req := structs.JobRegisterRequest{
+ Job: job,
+ }
+ _, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ // Create an eval for the past launch.
+ s1.periodicDispatcher.createEval(job, past)
+
+ // Flush the periodic dispatcher, ensuring that no evals will be created.
+ s1.periodicDispatcher.SetEnabled(false)
+
+ // Sleep till after the job should have been launched.
+ time.Sleep(3 * time.Second)
+
+ // Restore the periodic dispatcher.
+ s1.periodicDispatcher.SetEnabled(true)
+ s1.periodicDispatcher.Start()
+ s1.restorePeriodicDispatcher()
+
+ // Ensure the job is tracked.
+ if _, tracked := s1.periodicDispatcher.tracked[job.ID]; !tracked {
+ t.Fatalf("periodic job not restored")
+ }
+
+ // Check that an eval was made.
+ last, err := s1.fsm.State().PeriodicLaunchByID(job.ID)
+ if err != nil || last == nil {
+ t.Fatalf("failed to get periodic launch time: %v", err)
+ }
+ if last.Launch == past {
+ t.Fatalf("restorePeriodicDispatcher did not force launch")
+ }
+}
+
func TestLeader_PeriodicDispatch(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0
diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go
index 3c2bb4f5e..3408e0f97 100644
--- a/nomad/mock/mock.go
+++ b/nomad/mock/mock.go
@@ -190,6 +190,17 @@ func SystemJob() *structs.Job {
return job
}
+func PeriodicJob() *structs.Job {
+ job := Job()
+ job.Type = structs.JobTypeBatch
+ job.Periodic = &structs.PeriodicConfig{
+ Enabled: true,
+ SpecType: structs.PeriodicSpecCron,
+ Spec: "*/30 * * * *",
+ }
+ return job
+}
+
func Eval() *structs.Evaluation {
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
diff --git a/nomad/periodic.go b/nomad/periodic.go
new file mode 100644
index 000000000..2377da226
--- /dev/null
+++ b/nomad/periodic.go
@@ -0,0 +1,503 @@
+package nomad
+
+import (
+ "container/heap"
+ "fmt"
+ "log"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/hashicorp/nomad/nomad/structs"
+)
+
+const (
+ // The string appended to the periodic jobs ID when launching derived
+ // instances of it.
+ JobLaunchSuffix = "/periodic-"
+)
+
+// PeriodicDispatch is used to track and launch periodic jobs. It maintains the
+// set of periodic jobs and creates derived jobs and evaluations per
+// instantiation which is determined by the periodic spec.
+type PeriodicDispatch struct {
+ dispatcher JobEvalDispatcher
+ enabled bool
+ running bool
+
+ tracked map[string]*structs.Job
+ heap *periodicHeap
+
+ updateCh chan struct{}
+ stopCh chan struct{}
+ waitCh chan struct{}
+ logger *log.Logger
+ l sync.RWMutex
+}
+
+// JobEvalDispatcher is an interface to submit jobs and have evaluations created
+// for them.
+type JobEvalDispatcher interface {
+ // DispatchJob takes a job a new, untracked job and creates an evaluation
+ // for it.
+ DispatchJob(job *structs.Job) error
+}
+
+// DispatchJob creates an evaluation for the passed job and commits both the
+// evaluation and the job to the raft log.
+func (s *Server) DispatchJob(job *structs.Job) error {
+ // Commit this update via Raft
+ req := structs.JobRegisterRequest{Job: job}
+ _, index, err := s.raftApply(structs.JobRegisterRequestType, req)
+ if err != nil {
+ return err
+ }
+
+ // Create a new evaluation
+ eval := &structs.Evaluation{
+ ID: structs.GenerateUUID(),
+ Priority: job.Priority,
+ Type: job.Type,
+ TriggeredBy: structs.EvalTriggerJobRegister,
+ JobID: job.ID,
+ JobModifyIndex: index,
+ Status: structs.EvalStatusPending,
+ }
+ update := &structs.EvalUpdateRequest{
+ Evals: []*structs.Evaluation{eval},
+ }
+
+ // Commit this evaluation via Raft
+ // XXX: There is a risk of partial failure where the JobRegister succeeds
+ // but that the EvalUpdate does not.
+ _, _, err = s.raftApply(structs.EvalUpdateRequestType, update)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// NewPeriodicDispatch returns a periodic dispatcher that is used to track and
+// launch periodic jobs.
+func NewPeriodicDispatch(logger *log.Logger, dispatcher JobEvalDispatcher) *PeriodicDispatch {
+ return &PeriodicDispatch{
+ dispatcher: dispatcher,
+ tracked: make(map[string]*structs.Job),
+ heap: NewPeriodicHeap(),
+ updateCh: make(chan struct{}, 1),
+ stopCh: make(chan struct{}),
+ waitCh: make(chan struct{}),
+ logger: logger,
+ }
+}
+
+// SetEnabled is used to control if the periodic dispatcher is enabled. It
+// should only be enabled on the active leader. Disabling an active dispatcher
+// will stop any launched go routine and flush the dispatcher.
+func (p *PeriodicDispatch) SetEnabled(enabled bool) {
+ p.l.Lock()
+ p.enabled = enabled
+ p.l.Unlock()
+ if !enabled {
+ if p.running {
+ close(p.stopCh)
+ <-p.waitCh
+ p.running = false
+ }
+ p.Flush()
+ }
+}
+
+// Start begins the goroutine that creates derived jobs and evals.
+func (p *PeriodicDispatch) Start() {
+ p.l.Lock()
+ p.running = true
+ p.l.Unlock()
+ go p.run()
+}
+
+// Tracked returns the set of tracked job IDs.
+func (p *PeriodicDispatch) Tracked() []*structs.Job {
+ p.l.RLock()
+ defer p.l.RUnlock()
+ tracked := make([]*structs.Job, len(p.tracked))
+ i := 0
+ for _, job := range p.tracked {
+ tracked[i] = job
+ i++
+ }
+ return tracked
+}
+
+// Add begins tracking of a periodic job. If it is already tracked, it acts as
+// an update to the jobs periodic spec.
+func (p *PeriodicDispatch) Add(job *structs.Job) error {
+ p.l.Lock()
+ defer p.l.Unlock()
+
+ // Do nothing if not enabled
+ if !p.enabled {
+ return nil
+ }
+
+ // If we were tracking a job and it has been disabled or made non-periodic remove it.
+ disabled := !job.IsPeriodic() || !job.Periodic.Enabled
+ _, tracked := p.tracked[job.ID]
+ if disabled {
+ if tracked {
+ p.removeLocked(job.ID)
+ }
+
+ // If the job is disabled and we aren't tracking it, do nothing.
+ return nil
+ }
+
+ // Add or update the job.
+ p.tracked[job.ID] = job
+ next := job.Periodic.Next(time.Now())
+ if tracked {
+ if err := p.heap.Update(job, next); err != nil {
+ return fmt.Errorf("failed to update job %v launch time: %v", job.ID, err)
+ }
+ p.logger.Printf("[DEBUG] nomad.periodic: updated periodic job %q", job.ID)
+ } else {
+ if err := p.heap.Push(job, next); err != nil {
+ return fmt.Errorf("failed to add job %v: %v", job.ID, err)
+ }
+ p.logger.Printf("[DEBUG] nomad.periodic: registered periodic job %q", job.ID)
+ }
+
+ // Signal an update.
+ if p.running {
+ select {
+ case p.updateCh <- struct{}{}:
+ default:
+ }
+ }
+
+ return nil
+}
+
+// Remove stops tracking the passed job. If the job is not tracked, it is a
+// no-op.
+func (p *PeriodicDispatch) Remove(jobID string) error {
+ p.l.Lock()
+ defer p.l.Unlock()
+ return p.removeLocked(jobID)
+}
+
+// Remove stops tracking the passed job. If the job is not tracked, it is a
+// no-op. It assumes this is called while a lock is held.
+func (p *PeriodicDispatch) removeLocked(jobID string) error {
+ // Do nothing if not enabled
+ if !p.enabled {
+ return nil
+ }
+
+ job, tracked := p.tracked[jobID]
+ if !tracked {
+ return nil
+ }
+
+ delete(p.tracked, jobID)
+ if err := p.heap.Remove(job); err != nil {
+ return fmt.Errorf("failed to remove tracked job %v: %v", jobID, err)
+ }
+
+ // Signal an update.
+ if p.running {
+ select {
+ case p.updateCh <- struct{}{}:
+ default:
+ }
+ }
+
+ p.logger.Printf("[DEBUG] nomad.periodic: deregistered periodic job %q", jobID)
+ return nil
+}
+
+// ForceRun causes the periodic job to be evaluated immediately.
+func (p *PeriodicDispatch) ForceRun(jobID string) error {
+ p.l.Lock()
+
+ // Do nothing if not enabled
+ if !p.enabled {
+ return fmt.Errorf("periodic dispatch disabled")
+ }
+
+ job, tracked := p.tracked[jobID]
+ if !tracked {
+ return fmt.Errorf("can't force run non-tracked job %v", jobID)
+ }
+
+ p.l.Unlock()
+ return p.createEval(job, time.Now())
+}
+
+// shouldRun returns whether the long lived run function should run.
+func (p *PeriodicDispatch) shouldRun() bool {
+ p.l.RLock()
+ defer p.l.RUnlock()
+ return p.enabled && p.running
+}
+
+// run is a long-lived function that waits till a job's periodic spec is met and
+// then creates an evaluation to run the job.
+func (p *PeriodicDispatch) run() {
+ defer close(p.waitCh)
+ var launchCh <-chan time.Time
+ for p.shouldRun() {
+ job, launch := p.nextLaunch()
+ if launch.IsZero() {
+ launchCh = nil
+ } else {
+ launchDur := launch.Sub(time.Now())
+ launchCh = time.After(launchDur)
+ p.logger.Printf("[DEBUG] nomad.periodic: launching job %q in %s", job.ID, launchDur)
+ }
+
+ select {
+ case <-p.stopCh:
+ return
+ case <-p.updateCh:
+ continue
+ case <-launchCh:
+ p.dispatch(job, launch)
+ }
+ }
+}
+
+// dispatch creates an evaluation for the job and updates its next launchtime
+// based on the passed launch time.
+func (p *PeriodicDispatch) dispatch(job *structs.Job, launchTime time.Time) {
+ p.l.Lock()
+ defer p.l.Unlock()
+
+ nextLaunch := job.Periodic.Next(launchTime)
+ if err := p.heap.Update(job, nextLaunch); err != nil {
+ p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %q: %v", job.ID, err)
+ }
+
+ p.logger.Printf("[DEBUG] nomad.periodic: launching job %v at %v", job.ID, launchTime)
+ p.createEval(job, launchTime)
+}
+
+// nextLaunch returns the next job to launch and when it should be launched. If
+// the next job can't be determined, an error is returned. If the dispatcher is
+// stopped, a nil job will be returned.
+func (p *PeriodicDispatch) nextLaunch() (*structs.Job, time.Time) {
+ // If there is nothing wait for an update.
+ p.l.RLock()
+ defer p.l.RUnlock()
+ if p.heap.Length() == 0 {
+ return nil, time.Time{}
+ }
+
+ nextJob := p.heap.Peek()
+ if nextJob == nil {
+ return nil, time.Time{}
+ }
+
+ return nextJob.job, nextJob.next
+}
+
+// createEval instantiates a job based on the passed periodic job and submits an
+// evaluation for it.
+func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time) error {
+ derived, err := p.deriveJob(periodicJob, time)
+ if err != nil {
+ return err
+ }
+
+ if err := p.dispatcher.DispatchJob(derived); err != nil {
+ p.logger.Printf("[ERR] nomad.periodic: failed to dispatch job %q: %v", periodicJob.ID, err)
+ return err
+ }
+
+ return nil
+}
+
+// deriveJob instantiates a new job based on the passed periodic job and the
+// launch time.
+func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
+ derived *structs.Job, err error) {
+
+ // Have to recover in case the job copy panics.
+ defer func() {
+ if r := recover(); r != nil {
+ p.logger.Printf("[ERR] nomad.periodic: deriving job from"+
+ " periodic job %v failed; deregistering from periodic runner: %v",
+ periodicJob.ID, r)
+ p.Remove(periodicJob.ID)
+ derived = nil
+ err = fmt.Errorf("Failed to create a copy of the periodic job %v: %v", periodicJob.ID, r)
+ }
+ }()
+
+ // Create a copy of the periodic job, give it a derived ID/Name and make it
+ // non-periodic.
+ derived = periodicJob.Copy()
+ derived.ParentID = periodicJob.ID
+ derived.ID = p.derivedJobID(periodicJob, time)
+ derived.Name = derived.ID
+ derived.Periodic = nil
+ derived.GC = true
+ return
+}
+
+// deriveJobID returns a job ID based on the parent periodic job and the launch
+// time.
+func (p *PeriodicDispatch) derivedJobID(periodicJob *structs.Job, time time.Time) string {
+ return fmt.Sprintf("%s%s%d", periodicJob.ID, JobLaunchSuffix, time.Unix())
+}
+
+// LaunchTime returns the launch time of the job. This is only valid for
+// jobs created by PeriodicDispatch and will otherwise return an error.
+func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) {
+ index := strings.LastIndex(jobID, JobLaunchSuffix)
+ if index == -1 {
+ return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
+ }
+
+ launch, err := strconv.Atoi(jobID[index+len(JobLaunchSuffix):])
+ if err != nil {
+ return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
+ }
+
+ return time.Unix(int64(launch), 0), nil
+}
+
+// Flush clears the state of the PeriodicDispatcher
+func (p *PeriodicDispatch) Flush() {
+ p.l.Lock()
+ defer p.l.Unlock()
+ p.stopCh = make(chan struct{})
+ p.updateCh = make(chan struct{}, 1)
+ p.waitCh = make(chan struct{})
+ p.tracked = make(map[string]*structs.Job)
+ p.heap = NewPeriodicHeap()
+}
+
+// periodicHeap wraps a heap and gives operations other than Push/Pop.
+type periodicHeap struct {
+ index map[string]*periodicJob
+ heap periodicHeapImp
+}
+
+type periodicJob struct {
+ job *structs.Job
+ next time.Time
+ index int
+}
+
+func NewPeriodicHeap() *periodicHeap {
+ return &periodicHeap{
+ index: make(map[string]*periodicJob),
+ heap: make(periodicHeapImp, 0),
+ }
+}
+
+func (p *periodicHeap) Push(job *structs.Job, next time.Time) error {
+ if _, ok := p.index[job.ID]; ok {
+ return fmt.Errorf("job %v already exists", job.ID)
+ }
+
+ pJob := &periodicJob{job, next, 0}
+ p.index[job.ID] = pJob
+ heap.Push(&p.heap, pJob)
+ return nil
+}
+
+func (p *periodicHeap) Pop() *periodicJob {
+ if len(p.heap) == 0 {
+ return nil
+ }
+
+ pJob := heap.Pop(&p.heap).(*periodicJob)
+ delete(p.index, pJob.job.ID)
+ return pJob
+}
+
+func (p *periodicHeap) Peek() *periodicJob {
+ if len(p.heap) == 0 {
+ return nil
+ }
+
+ return p.heap[0]
+}
+
+func (p *periodicHeap) Contains(job *structs.Job) bool {
+ _, ok := p.index[job.ID]
+ return ok
+}
+
+func (p *periodicHeap) Update(job *structs.Job, next time.Time) error {
+ if pJob, ok := p.index[job.ID]; ok {
+ // Need to update the job as well because its spec can change.
+ pJob.job = job
+ pJob.next = next
+ heap.Fix(&p.heap, pJob.index)
+ return nil
+ }
+
+ return fmt.Errorf("heap doesn't contain job %v", job.ID)
+}
+
+func (p *periodicHeap) Remove(job *structs.Job) error {
+ if pJob, ok := p.index[job.ID]; ok {
+ heap.Remove(&p.heap, pJob.index)
+ delete(p.index, job.ID)
+ return nil
+ }
+
+ return fmt.Errorf("heap doesn't contain job %v", job.ID)
+}
+
+func (p *periodicHeap) Length() int {
+ return len(p.heap)
+}
+
+type periodicHeapImp []*periodicJob
+
+func (h periodicHeapImp) Len() int { return len(h) }
+
+func (h periodicHeapImp) Less(i, j int) bool {
+ // Two zero times should return false.
+ // Otherwise, zero is "greater" than any other time.
+ // (To sort it at the end of the list.)
+ // Sort such that zero times are at the end of the list.
+ iZero, jZero := h[i].next.IsZero(), h[j].next.IsZero()
+ if iZero && jZero {
+ return false
+ } else if iZero {
+ return false
+ } else if jZero {
+ return true
+ }
+
+ return h[i].next.Before(h[j].next)
+}
+
+func (h periodicHeapImp) Swap(i, j int) {
+ h[i], h[j] = h[j], h[i]
+ h[i].index = i
+ h[j].index = j
+}
+
+func (h *periodicHeapImp) Push(x interface{}) {
+ n := len(*h)
+ job := x.(*periodicJob)
+ job.index = n
+ *h = append(*h, job)
+}
+
+func (h *periodicHeapImp) Pop() interface{} {
+ old := *h
+ n := len(old)
+ job := old[n-1]
+ job.index = -1 // for safety
+ *h = old[0 : n-1]
+ return job
+}
diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go
new file mode 100644
index 000000000..0e7002265
--- /dev/null
+++ b/nomad/periodic_test.go
@@ -0,0 +1,471 @@
+package nomad
+
+import (
+ "fmt"
+ "log"
+ "math/rand"
+ "os"
+ "reflect"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/hashicorp/nomad/nomad/mock"
+ "github.com/hashicorp/nomad/nomad/structs"
+)
+
+type MockJobEvalDispatcher struct {
+ Jobs map[string]*structs.Job
+ lock sync.Mutex
+}
+
+func NewMockJobEvalDispatcher() *MockJobEvalDispatcher {
+ return &MockJobEvalDispatcher{Jobs: make(map[string]*structs.Job)}
+}
+
+func (m *MockJobEvalDispatcher) DispatchJob(job *structs.Job) error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ m.Jobs[job.ID] = job
+ return nil
+}
+
+// LaunchTimes returns the launch times of child jobs in sorted order.
+func (m *MockJobEvalDispatcher) LaunchTimes(p *PeriodicDispatch, parentID string) ([]time.Time, error) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ var launches []time.Time
+ for _, job := range m.Jobs {
+ if job.ParentID != parentID {
+ continue
+ }
+
+ t, err := p.LaunchTime(job.ID)
+ if err != nil {
+ return nil, err
+ }
+ launches = append(launches, t)
+ }
+ sort.Sort(times(launches))
+ return launches, nil
+}
+
+type times []time.Time
+
+func (t times) Len() int { return len(t) }
+func (t times) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
+func (t times) Less(i, j int) bool { return t[i].Before(t[j]) }
+
+// testPeriodicDispatcher returns an enabled PeriodicDispatcher which uses the
+// MockJobEvalDispatcher.
+func testPeriodicDispatcher() (*PeriodicDispatch, *MockJobEvalDispatcher) {
+ logger := log.New(os.Stderr, "", log.LstdFlags)
+ m := NewMockJobEvalDispatcher()
+ d := NewPeriodicDispatch(logger, m)
+ d.SetEnabled(true)
+ d.Start()
+ return d, m
+}
+
+// testPeriodicJob is a helper that creates a periodic job that launches at the
+// passed times.
+func testPeriodicJob(times ...time.Time) *structs.Job {
+ job := mock.PeriodicJob()
+ job.Periodic.SpecType = structs.PeriodicSpecTest
+
+ l := make([]string, len(times))
+ for i, t := range times {
+ l[i] = strconv.Itoa(int(t.Round(1 * time.Second).Unix()))
+ }
+
+ job.Periodic.Spec = strings.Join(l, ",")
+ return job
+}
+
+func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
+ t.Parallel()
+ p, _ := testPeriodicDispatcher()
+ job := mock.Job()
+ if err := p.Add(job); err != nil {
+ t.Fatalf("Add of non-periodic job failed: %v; expect no-op", err)
+ }
+
+ tracked := p.Tracked()
+ if len(tracked) != 0 {
+ t.Fatalf("Add of non-periodic job should be no-op: %v", tracked)
+ }
+}
+
+func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
+ t.Parallel()
+ p, _ := testPeriodicDispatcher()
+ job := mock.PeriodicJob()
+ if err := p.Add(job); err != nil {
+ t.Fatalf("Add failed %v", err)
+ }
+
+ tracked := p.Tracked()
+ if len(tracked) != 1 {
+ t.Fatalf("Add didn't track the job: %v", tracked)
+ }
+
+ // Update the job and add it again.
+ job.Periodic.Spec = "foo"
+ if err := p.Add(job); err != nil {
+ t.Fatalf("Add failed %v", err)
+ }
+
+ tracked = p.Tracked()
+ if len(tracked) != 1 {
+ t.Fatalf("Add didn't update: %v", tracked)
+ }
+
+ if !reflect.DeepEqual(job, tracked[0]) {
+ t.Fatalf("Add didn't properly update: got %v; want %v", tracked[0], job)
+ }
+}
+
+func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) {
+ t.Parallel()
+ p, _ := testPeriodicDispatcher()
+ job := mock.PeriodicJob()
+ if err := p.Add(job); err != nil {
+ t.Fatalf("Add failed %v", err)
+ }
+
+ tracked := p.Tracked()
+ if len(tracked) != 1 {
+ t.Fatalf("Add didn't track the job: %v", tracked)
+ }
+
+ // Update the job to be non-periodic and add it again.
+ job.Periodic = nil
+ if err := p.Add(job); err != nil {
+ t.Fatalf("Add failed %v", err)
+ }
+
+ tracked = p.Tracked()
+ if len(tracked) != 0 {
+ t.Fatalf("Add didn't remove: %v", tracked)
+ }
+}
+
+func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) {
+ t.Parallel()
+ p, m := testPeriodicDispatcher()
+
+ // Create a job that won't be evalauted for a while.
+ job := testPeriodicJob(time.Now().Add(10 * time.Second))
+
+ // Add it.
+ if err := p.Add(job); err != nil {
+ t.Fatalf("Add failed %v", err)
+ }
+
+ // Update it to be sooner and re-add.
+ expected := time.Now().Round(1 * time.Second).Add(1 * time.Second)
+ job.Periodic.Spec = fmt.Sprintf("%d", expected.Unix())
+ if err := p.Add(job); err != nil {
+ t.Fatalf("Add failed %v", err)
+ }
+
+ // Check that nothing is created.
+ if _, ok := m.Jobs[job.ID]; ok {
+ t.Fatalf("periodic dispatcher created eval at the wrong time")
+ }
+
+ time.Sleep(2 * time.Second)
+
+ // Check that job was launched correctly.
+ times, err := m.LaunchTimes(p, job.ID)
+ if err != nil {
+ t.Fatalf("failed to get launch times for job %q", job.ID)
+ }
+ if len(times) != 1 {
+ t.Fatalf("incorrect number of launch times for job %q", job.ID)
+ }
+ if times[0] != expected {
+ t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[0], expected)
+ }
+}
+
+func TestPeriodicDispatch_Remove_Untracked(t *testing.T) {
+ t.Parallel()
+ p, _ := testPeriodicDispatcher()
+ if err := p.Remove("foo"); err != nil {
+ t.Fatalf("Remove failed %v; expected a no-op", err)
+ }
+}
+
+func TestPeriodicDispatch_Remove_Tracked(t *testing.T) {
+ t.Parallel()
+ p, _ := testPeriodicDispatcher()
+
+ job := mock.PeriodicJob()
+ if err := p.Add(job); err != nil {
+ t.Fatalf("Add failed %v", err)
+ }
+
+ tracked := p.Tracked()
+ if len(tracked) != 1 {
+ t.Fatalf("Add didn't track the job: %v", tracked)
+ }
+
+ if err := p.Remove(job.ID); err != nil {
+ t.Fatalf("Remove failed %v", err)
+ }
+
+ tracked = p.Tracked()
+ if len(tracked) != 0 {
+ t.Fatalf("Remove didn't untrack the job: %v", tracked)
+ }
+}
+
+func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) {
+ t.Parallel()
+ p, _ := testPeriodicDispatcher()
+
+ // Create a job that will be evaluated soon.
+ job := testPeriodicJob(time.Now().Add(1 * time.Second))
+
+ // Add it.
+ if err := p.Add(job); err != nil {
+ t.Fatalf("Add failed %v", err)
+ }
+
+ // Remove the job.
+ if err := p.Remove(job.ID); err != nil {
+ t.Fatalf("Add failed %v", err)
+ }
+
+ time.Sleep(2 * time.Second)
+
+ // Check that an eval wasn't created.
+ d := p.dispatcher.(*MockJobEvalDispatcher)
+ if _, ok := d.Jobs[job.ID]; ok {
+ t.Fatalf("Remove didn't cancel creation of an eval")
+ }
+}
+
+func TestPeriodicDispatch_ForceRun_Untracked(t *testing.T) {
+ t.Parallel()
+ p, _ := testPeriodicDispatcher()
+
+ if err := p.ForceRun("foo"); err == nil {
+ t.Fatal("ForceRun of untracked job should fail")
+ }
+}
+
+func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) {
+ t.Parallel()
+ p, m := testPeriodicDispatcher()
+
+ // Create a job that won't be evalauted for a while.
+ job := testPeriodicJob(time.Now().Add(10 * time.Second))
+
+ // Add it.
+ if err := p.Add(job); err != nil {
+ t.Fatalf("Add failed %v", err)
+ }
+
+ // ForceRun the job
+ if err := p.ForceRun(job.ID); err != nil {
+ t.Fatalf("ForceRun failed %v", err)
+ }
+
+ // Check that job was launched correctly.
+ launches, err := m.LaunchTimes(p, job.ID)
+ if err != nil {
+ t.Fatalf("failed to get launch times for job %q: %v", job.ID, err)
+ }
+ l := len(launches)
+ if l != 1 {
+ t.Fatalf("restorePeriodicDispatcher() created an unexpected"+
+ " number of evals; got %d; want 1", l)
+ }
+}
+
+func TestPeriodicDispatch_Run_Multiple(t *testing.T) {
+ t.Parallel()
+ p, m := testPeriodicDispatcher()
+
+ // Create a job that will be launched twice.
+ launch1 := time.Now().Round(1 * time.Second).Add(1 * time.Second)
+ launch2 := time.Now().Round(1 * time.Second).Add(2 * time.Second)
+ job := testPeriodicJob(launch1, launch2)
+
+ // Add it.
+ if err := p.Add(job); err != nil {
+ t.Fatalf("Add failed %v", err)
+ }
+
+ time.Sleep(3 * time.Second)
+
+ // Check that job was launched correctly.
+ times, err := m.LaunchTimes(p, job.ID)
+ if err != nil {
+ t.Fatalf("failed to get launch times for job %q", job.ID)
+ }
+ if len(times) != 2 {
+ t.Fatalf("incorrect number of launch times for job %q", job.ID)
+ }
+ if times[0] != launch1 {
+ t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[0], launch1)
+ }
+ if times[1] != launch2 {
+ t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[1], launch2)
+ }
+}
+
+func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
+ t.Parallel()
+ p, m := testPeriodicDispatcher()
+
+ // Create two job that will be launched at the same time.
+ launch := time.Now().Round(1 * time.Second).Add(1 * time.Second)
+ job := testPeriodicJob(launch)
+ job2 := testPeriodicJob(launch)
+
+ // Add them.
+ if err := p.Add(job); err != nil {
+ t.Fatalf("Add failed %v", err)
+ }
+ if err := p.Add(job2); err != nil {
+ t.Fatalf("Add failed %v", err)
+ }
+
+ time.Sleep(2 * time.Second)
+
+ // Check that the jobs were launched correctly.
+ for _, job := range []*structs.Job{job, job2} {
+ times, err := m.LaunchTimes(p, job.ID)
+ if err != nil {
+ t.Fatalf("failed to get launch times for job %q", job.ID)
+ }
+ if len(times) != 1 {
+ t.Fatalf("incorrect number of launch times for job %q; got %d; want 1", job.ID, len(times))
+ }
+ if times[0] != launch {
+ t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[0], launch)
+ }
+ }
+}
+
+// This test adds and removes a bunch of jobs, some launching at the same time,
+// some after each other and some invalid times, and ensures the correct
+// behavior.
+func TestPeriodicDispatch_Complex(t *testing.T) {
+ t.Parallel()
+ p, m := testPeriodicDispatcher()
+
+ // Create some jobs launching at different times.
+ now := time.Now().Round(1 * time.Second)
+ same := now.Add(1 * time.Second)
+ launch1 := same.Add(1 * time.Second)
+ launch2 := same.Add(2 * time.Second)
+ launch3 := same.Add(3 * time.Second)
+ invalid := now.Add(-200 * time.Second)
+
+ // Create two jobs launching at the same time.
+ job1 := testPeriodicJob(same)
+ job2 := testPeriodicJob(same)
+
+ // Create a job that will never launch.
+ job3 := testPeriodicJob(invalid)
+
+ // Create a job that launches twice.
+ job4 := testPeriodicJob(launch1, launch3)
+
+ // Create a job that launches once.
+ job5 := testPeriodicJob(launch2)
+
+ // Create 3 jobs we will delete.
+ job6 := testPeriodicJob(same)
+ job7 := testPeriodicJob(launch1, launch3)
+ job8 := testPeriodicJob(launch2)
+
+ // Create a map of expected eval job ids.
+ expected := map[string][]time.Time{
+ job1.ID: []time.Time{same},
+ job2.ID: []time.Time{same},
+ job3.ID: nil,
+ job4.ID: []time.Time{launch1, launch3},
+ job5.ID: []time.Time{launch2},
+ job6.ID: nil,
+ job7.ID: nil,
+ job8.ID: nil,
+ }
+
+ // Shuffle the jobs so they can be added randomly
+ jobs := []*structs.Job{job1, job2, job3, job4, job5, job6, job7, job8}
+ toDelete := []*structs.Job{job6, job7, job8}
+ shuffle(jobs)
+ shuffle(toDelete)
+
+ for _, job := range jobs {
+ if err := p.Add(job); err != nil {
+ t.Fatalf("Add failed %v", err)
+ }
+ }
+
+ for _, job := range toDelete {
+ if err := p.Remove(job.ID); err != nil {
+ t.Fatalf("Remove failed %v", err)
+ }
+ }
+
+ time.Sleep(5 * time.Second)
+ actual := make(map[string][]time.Time, len(expected))
+ for _, job := range jobs {
+ launches, err := m.LaunchTimes(p, job.ID)
+ if err != nil {
+ t.Fatalf("LaunchTimes(%v) failed %v", job.ID, err)
+ }
+
+ actual[job.ID] = launches
+ }
+
+ if !reflect.DeepEqual(actual, expected) {
+ t.Fatalf("Unexpected launches; got %#v; want %#v", actual, expected)
+ }
+}
+
+func shuffle(jobs []*structs.Job) {
+ rand.Seed(time.Now().Unix())
+ for i := range jobs {
+ j := rand.Intn(len(jobs))
+ jobs[i], jobs[j] = jobs[j], jobs[i]
+ }
+}
+
+func TestPeriodicHeap_Order(t *testing.T) {
+ t.Parallel()
+ h := NewPeriodicHeap()
+ j1 := mock.PeriodicJob()
+ j2 := mock.PeriodicJob()
+ j3 := mock.PeriodicJob()
+
+ lookup := map[*structs.Job]string{
+ j1: "j1",
+ j2: "j2",
+ j3: "j3",
+ }
+
+ h.Push(j1, time.Time{})
+ h.Push(j2, time.Unix(10, 0))
+ h.Push(j3, time.Unix(11, 0))
+
+ exp := []string{"j2", "j3", "j1"}
+ var act []string
+ for i := 0; i < 3; i++ {
+ pJob := h.Pop()
+ act = append(act, lookup[pJob.job])
+ }
+
+ if !reflect.DeepEqual(act, exp) {
+ t.Fatalf("Wrong ordering; got %v; want %v", act, exp)
+ }
+}
diff --git a/nomad/server.go b/nomad/server.go
index a7236e004..3a17c8224 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -113,6 +113,9 @@ type Server struct {
// plans that are waiting to be assessed by the leader
planQueue *PlanQueue
+ // periodicDispatcher is used to track and create evaluations for periodic jobs.
+ periodicDispatcher *PeriodicDispatch
+
// heartbeatTimers track the expiration time of each heartbeat that has
// a TTL. On expiration, the node status is updated to be 'down'.
heartbeatTimers map[string]*time.Timer
@@ -181,6 +184,9 @@ func NewServer(config *Config) (*Server, error) {
shutdownCh: make(chan struct{}),
}
+ // Create the periodic dispatcher for launching periodic jobs.
+ s.periodicDispatcher = NewPeriodicDispatch(s.logger, s)
+
// Initialize the RPC layer
// TODO: TLS...
if err := s.setupRPC(nil); err != nil {
@@ -406,7 +412,7 @@ func (s *Server) setupRaft() error {
// Create the FSM
var err error
- s.fsm, err = NewFSM(s.evalBroker, s.config.LogOutput)
+ s.fsm, err = NewFSM(s.evalBroker, s.periodicDispatcher, s.config.LogOutput)
if err != nil {
return err
}
diff --git a/nomad/state/schema.go b/nomad/state/schema.go
index 961cb67a7..5f68d319f 100644
--- a/nomad/state/schema.go
+++ b/nomad/state/schema.go
@@ -19,6 +19,7 @@ func stateStoreSchema() *memdb.DBSchema {
indexTableSchema,
nodeTableSchema,
jobTableSchema,
+ periodicLaunchTableSchema,
evalTableSchema,
allocTableSchema,
}
@@ -109,6 +110,14 @@ func jobTableSchema() *memdb.TableSchema {
Conditional: jobIsGCable,
},
},
+ "periodic": &memdb.IndexSchema{
+ Name: "periodic",
+ AllowMissing: false,
+ Unique: false,
+ Indexer: &memdb.ConditionalIndex{
+ Conditional: jobIsPeriodic,
+ },
+ },
},
}
}
@@ -124,6 +133,43 @@ func jobIsGCable(obj interface{}) (bool, error) {
return j.GC, nil
}
+// jobIsPeriodic satisfies the ConditionalIndexFunc interface and creates an index
+// on whether a job is periodic.
+func jobIsPeriodic(obj interface{}) (bool, error) {
+ j, ok := obj.(*structs.Job)
+ if !ok {
+ return false, fmt.Errorf("Unexpected type: %v", obj)
+ }
+
+ if j.Periodic != nil && j.Periodic.Enabled == true {
+ return true, nil
+ }
+
+ return false, nil
+}
+
+// periodicLaunchTableSchema returns the MemDB schema tracking the most recent
+// launch time for a perioidic job.
+func periodicLaunchTableSchema() *memdb.TableSchema {
+ return &memdb.TableSchema{
+ Name: "periodic_launch",
+ Indexes: map[string]*memdb.IndexSchema{
+ // Primary index is used for job management
+ // and simple direct lookup. ID is required to be
+ // unique.
+ "id": &memdb.IndexSchema{
+ Name: "id",
+ AllowMissing: false,
+ Unique: true,
+ Indexer: &memdb.StringFieldIndex{
+ Field: "ID",
+ Lowercase: true,
+ },
+ },
+ },
+ }
+}
+
// evalTableSchema returns the MemDB schema for the eval table.
// This table is used to store all the evaluations that are pending
// or recently completed.
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index d27cba9b9..651e152dd 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -131,10 +131,6 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
txn := s.db.Txn(true)
defer txn.Abort()
- watcher := watch.NewItems()
- watcher.Add(watch.Item{Table: "nodes"})
- watcher.Add(watch.Item{Node: nodeID})
-
// Lookup the node
existing, err := txn.First("nodes", "id", nodeID)
if err != nil {
@@ -144,6 +140,10 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
return fmt.Errorf("node not found")
}
+ watcher := watch.NewItems()
+ watcher.Add(watch.Item{Table: "nodes"})
+ watcher.Add(watch.Item{Node: nodeID})
+
// Delete the node
if err := txn.Delete("nodes", existing); err != nil {
return fmt.Errorf("node delete failed: %v", err)
@@ -318,10 +318,6 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
txn := s.db.Txn(true)
defer txn.Abort()
- watcher := watch.NewItems()
- watcher.Add(watch.Item{Table: "jobs"})
- watcher.Add(watch.Item{Job: jobID})
-
// Lookup the node
existing, err := txn.First("jobs", "id", jobID)
if err != nil {
@@ -331,6 +327,10 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
return fmt.Errorf("job not found")
}
+ watcher := watch.NewItems()
+ watcher.Add(watch.Item{Table: "jobs"})
+ watcher.Add(watch.Item{Job: jobID})
+
// Delete the node
if err := txn.Delete("jobs", existing); err != nil {
return fmt.Errorf("job delete failed: %v", err)
@@ -383,6 +383,17 @@ func (s *StateStore) Jobs() (memdb.ResultIterator, error) {
return iter, nil
}
+// JobsByPeriodic returns an iterator over all the periodic or non-periodic jobs.
+func (s *StateStore) JobsByPeriodic(periodic bool) (memdb.ResultIterator, error) {
+ txn := s.db.Txn(false)
+
+ iter, err := txn.Get("jobs", "periodic", periodic)
+ if err != nil {
+ return nil, err
+ }
+ return iter, nil
+}
+
// JobsByScheduler returns an iterator over all the jobs with the specific
// scheduler type.
func (s *StateStore) JobsByScheduler(schedulerType string) (memdb.ResultIterator, error) {
@@ -408,6 +419,102 @@ func (s *StateStore) JobsByGC(gc bool) (memdb.ResultIterator, error) {
return iter, nil
}
+// UpsertPeriodicLaunch is used to register a launch or update it.
+func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.PeriodicLaunch) error {
+ txn := s.db.Txn(true)
+ defer txn.Abort()
+
+ watcher := watch.NewItems()
+ watcher.Add(watch.Item{Table: "periodic_launch"})
+ watcher.Add(watch.Item{Job: launch.ID})
+
+ // Check if the job already exists
+ existing, err := txn.First("periodic_launch", "id", launch.ID)
+ if err != nil {
+ return fmt.Errorf("periodic launch lookup failed: %v", err)
+ }
+
+ // Setup the indexes correctly
+ if existing != nil {
+ launch.CreateIndex = existing.(*structs.PeriodicLaunch).CreateIndex
+ launch.ModifyIndex = index
+ } else {
+ launch.CreateIndex = index
+ launch.ModifyIndex = index
+ }
+
+ // Insert the job
+ if err := txn.Insert("periodic_launch", launch); err != nil {
+ return fmt.Errorf("launch insert failed: %v", err)
+ }
+ if err := txn.Insert("index", &IndexEntry{"periodic_launch", index}); err != nil {
+ return fmt.Errorf("index update failed: %v", err)
+ }
+
+ txn.Defer(func() { s.watch.notify(watcher) })
+ txn.Commit()
+ return nil
+}
+
+// DeletePeriodicLaunch is used to delete the periodic launch
+func (s *StateStore) DeletePeriodicLaunch(index uint64, jobID string) error {
+ txn := s.db.Txn(true)
+ defer txn.Abort()
+
+ // Lookup the launch
+ existing, err := txn.First("periodic_launch", "id", jobID)
+ if err != nil {
+ return fmt.Errorf("launch lookup failed: %v", err)
+ }
+ if existing == nil {
+ return fmt.Errorf("launch not found")
+ }
+
+ watcher := watch.NewItems()
+ watcher.Add(watch.Item{Table: "periodic_launch"})
+ watcher.Add(watch.Item{Job: jobID})
+
+ // Delete the launch
+ if err := txn.Delete("periodic_launch", existing); err != nil {
+ return fmt.Errorf("launch delete failed: %v", err)
+ }
+ if err := txn.Insert("index", &IndexEntry{"periodic_launch", index}); err != nil {
+ return fmt.Errorf("index update failed: %v", err)
+ }
+
+ txn.Defer(func() { s.watch.notify(watcher) })
+ txn.Commit()
+ return nil
+}
+
+// PeriodicLaunchByID is used to lookup a periodic launch by the periodic job
+// ID.
+func (s *StateStore) PeriodicLaunchByID(id string) (*structs.PeriodicLaunch, error) {
+ txn := s.db.Txn(false)
+
+ existing, err := txn.First("periodic_launch", "id", id)
+ if err != nil {
+ return nil, fmt.Errorf("periodic launch lookup failed: %v", err)
+ }
+
+ if existing != nil {
+ return existing.(*structs.PeriodicLaunch), nil
+ }
+ return nil, nil
+}
+
+// PeriodicLaunches returns an iterator over all the periodic launches
+func (s *StateStore) PeriodicLaunches() (memdb.ResultIterator, error) {
+ txn := s.db.Txn(false)
+
+ // Walk the entire table
+ iter, err := txn.Get("periodic_launch", "id")
+ if err != nil {
+ return nil, err
+ }
+ return iter, nil
+}
+
// UpsertEvaluation is used to upsert an evaluation
func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error {
txn := s.db.Txn(true)
@@ -875,6 +982,16 @@ func (r *StateRestore) IndexRestore(idx *IndexEntry) error {
return nil
}
+// PeriodicLaunchRestore is used to restore a periodic launch.
+func (r *StateRestore) PeriodicLaunchRestore(launch *structs.PeriodicLaunch) error {
+ r.items.Add(watch.Item{Table: "periodic_launch"})
+ r.items.Add(watch.Item{Job: launch.ID})
+ if err := r.txn.Insert("periodic_launch", launch); err != nil {
+ return fmt.Errorf("periodic launch insert failed: %v", err)
+ }
+ return nil
+}
+
// stateWatch holds shared state for watching updates. This is
// outside of StateStore so it can be shared with snapshots.
type stateWatch struct {
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index 5ff4d110d..dda6ae293 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -5,6 +5,7 @@ import (
"reflect"
"sort"
"testing"
+ "time"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/mock"
@@ -546,6 +547,64 @@ func TestStateStore_JobsByIDPrefix(t *testing.T) {
}
}
+func TestStateStore_JobsByPeriodic(t *testing.T) {
+ state := testStateStore(t)
+ var periodic, nonPeriodic []*structs.Job
+
+ for i := 0; i < 10; i++ {
+ job := mock.Job()
+ nonPeriodic = append(nonPeriodic, job)
+
+ err := state.UpsertJob(1000+uint64(i), job)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ }
+
+ for i := 0; i < 10; i++ {
+ job := mock.PeriodicJob()
+ periodic = append(periodic, job)
+
+ err := state.UpsertJob(2000+uint64(i), job)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ }
+
+ iter, err := state.JobsByPeriodic(true)
+ var outPeriodic []*structs.Job
+ for {
+ raw := iter.Next()
+ if raw == nil {
+ break
+ }
+ outPeriodic = append(outPeriodic, raw.(*structs.Job))
+ }
+
+ iter, err = state.JobsByPeriodic(false)
+ var outNonPeriodic []*structs.Job
+ for {
+ raw := iter.Next()
+ if raw == nil {
+ break
+ }
+ outNonPeriodic = append(outNonPeriodic, raw.(*structs.Job))
+ }
+
+ sort.Sort(JobIDSort(periodic))
+ sort.Sort(JobIDSort(nonPeriodic))
+ sort.Sort(JobIDSort(outPeriodic))
+ sort.Sort(JobIDSort(outNonPeriodic))
+
+ if !reflect.DeepEqual(periodic, outPeriodic) {
+ t.Fatalf("bad: %#v %#v", periodic, outPeriodic)
+ }
+
+ if !reflect.DeepEqual(nonPeriodic, outNonPeriodic) {
+ t.Fatalf("bad: %#v %#v", nonPeriodic, outNonPeriodic)
+ }
+}
+
func TestStateStore_JobsByScheduler(t *testing.T) {
state := testStateStore(t)
var serviceJobs []*structs.Job
@@ -702,6 +761,222 @@ func TestStateStore_RestoreJob(t *testing.T) {
notify.verify(t)
}
+func TestStateStore_UpsertPeriodicLaunch(t *testing.T) {
+ state := testStateStore(t)
+ job := mock.Job()
+ launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}
+
+ notify := setupNotifyTest(
+ state,
+ watch.Item{Table: "periodic_launch"},
+ watch.Item{Job: job.ID})
+
+ err := state.UpsertPeriodicLaunch(1000, launch)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ out, err := state.PeriodicLaunchByID(job.ID)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if out.CreateIndex != 1000 {
+ t.Fatalf("bad: %#v", out)
+ }
+ if out.ModifyIndex != 1000 {
+ t.Fatalf("bad: %#v", out)
+ }
+
+ if !reflect.DeepEqual(launch, out) {
+ t.Fatalf("bad: %#v %#v", job, out)
+ }
+
+ index, err := state.Index("periodic_launch")
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if index != 1000 {
+ t.Fatalf("bad: %d", index)
+ }
+
+ notify.verify(t)
+}
+
+func TestStateStore_UpdateUpsertPeriodicLaunch(t *testing.T) {
+ state := testStateStore(t)
+ job := mock.Job()
+ launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}
+
+ notify := setupNotifyTest(
+ state,
+ watch.Item{Table: "periodic_launch"},
+ watch.Item{Job: job.ID})
+
+ err := state.UpsertPeriodicLaunch(1000, launch)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ launch2 := &structs.PeriodicLaunch{
+ ID: job.ID,
+ Launch: launch.Launch.Add(1 * time.Second),
+ }
+ err = state.UpsertPeriodicLaunch(1001, launch2)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ out, err := state.PeriodicLaunchByID(job.ID)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if out.CreateIndex != 1000 {
+ t.Fatalf("bad: %#v", out)
+ }
+ if out.ModifyIndex != 1001 {
+ t.Fatalf("bad: %#v", out)
+ }
+
+ if !reflect.DeepEqual(launch2, out) {
+ t.Fatalf("bad: %#v %#v", launch2, out)
+ }
+
+ index, err := state.Index("periodic_launch")
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if index != 1001 {
+ t.Fatalf("bad: %d", index)
+ }
+
+ notify.verify(t)
+}
+
+func TestStateStore_DeletePeriodicLaunch(t *testing.T) {
+ state := testStateStore(t)
+ job := mock.Job()
+ launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}
+
+ notify := setupNotifyTest(
+ state,
+ watch.Item{Table: "periodic_launch"},
+ watch.Item{Job: job.ID})
+
+ err := state.UpsertPeriodicLaunch(1000, launch)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ err = state.DeletePeriodicLaunch(1001, job.ID)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ out, err := state.PeriodicLaunchByID(job.ID)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ if out != nil {
+ t.Fatalf("bad: %#v %#v", job, out)
+ }
+
+ index, err := state.Index("periodic_launch")
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if index != 1001 {
+ t.Fatalf("bad: %d", index)
+ }
+
+ notify.verify(t)
+}
+
+func TestStateStore_PeriodicLaunches(t *testing.T) {
+ state := testStateStore(t)
+ var launches []*structs.PeriodicLaunch
+
+ for i := 0; i < 10; i++ {
+ job := mock.Job()
+ launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}
+ launches = append(launches, launch)
+
+ err := state.UpsertPeriodicLaunch(1000+uint64(i), launch)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ }
+
+ iter, err := state.PeriodicLaunches()
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ out := make(map[string]*structs.PeriodicLaunch, 10)
+ for {
+ raw := iter.Next()
+ if raw == nil {
+ break
+ }
+ launch := raw.(*structs.PeriodicLaunch)
+ if _, ok := out[launch.ID]; ok {
+ t.Fatalf("duplicate: %v", launch.ID)
+ }
+
+ out[launch.ID] = launch
+ }
+
+ for _, launch := range launches {
+ l, ok := out[launch.ID]
+ if !ok {
+ t.Fatalf("bad %v", launch.ID)
+ }
+
+ if !reflect.DeepEqual(launch, l) {
+ t.Fatalf("bad: %#v %#v", launch, l)
+ }
+
+ delete(out, launch.ID)
+ }
+
+ if len(out) != 0 {
+ t.Fatalf("leftover: %#v", out)
+ }
+}
+
+func TestStateStore_RestorePeriodicLaunch(t *testing.T) {
+ state := testStateStore(t)
+ job := mock.Job()
+ launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}
+
+ notify := setupNotifyTest(
+ state,
+ watch.Item{Table: "periodic_launch"},
+ watch.Item{Job: job.ID})
+
+ restore, err := state.Restore()
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ err = restore.PeriodicLaunchRestore(launch)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ restore.Commit()
+
+ out, err := state.PeriodicLaunchByID(job.ID)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ if !reflect.DeepEqual(out, launch) {
+ t.Fatalf("Bad: %#v %#v", out, job)
+ }
+
+ notify.verify(t)
+}
+
func TestStateStore_Indexes(t *testing.T) {
state := testStateStore(t)
node := mock.Node()
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 5b443c4cb..1205141f1 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -8,6 +8,7 @@ import (
"io"
"reflect"
"regexp"
+ "strconv"
"strings"
"time"
@@ -16,6 +17,7 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/args"
+ "github.com/mitchellh/copystructure"
)
var (
@@ -722,6 +724,9 @@ type Job struct {
// specified hierarchically like LineOfBiz/OrgName/Team/Project
ID string
+ // ParentID is the unique identifier of the job that spawned this job.
+ ParentID string
+
// Name is the logical name of the job used to refer to it. This is unique
// per region, but not unique globally.
Name string
@@ -790,6 +795,17 @@ func (j *Job) InitFields() {
}
}
+// Copy returns a deep copy of the Job. It is expected that callers use recover.
+// This job can panic if the deep copy failed as it uses reflection.
+func (j *Job) Copy() *Job {
+ i, err := copystructure.Copy(j)
+ if err != nil {
+ panic(err)
+ }
+
+ return i.(*Job)
+}
+
// Validate is used to sanity check a job input
func (j *Job) Validate() error {
var mErr multierror.Error
@@ -850,9 +866,15 @@ func (j *Job) Validate() error {
}
// Validate periodic is only used with batch jobs.
- if j.Periodic != nil && j.Periodic.Enabled && j.Type != JobTypeBatch {
- mErr.Errors = append(mErr.Errors,
- fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch))
+ if j.IsPeriodic() {
+ if j.Type != JobTypeBatch {
+ mErr.Errors = append(mErr.Errors,
+ fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch))
+ }
+
+ if err := j.Periodic.Validate(); err != nil {
+ mErr.Errors = append(mErr.Errors, err)
+ }
}
return mErr.ErrorOrNil()
@@ -917,6 +939,10 @@ func (u *UpdateStrategy) Rolling() bool {
const (
// PeriodicSpecCron is used for a cron spec.
PeriodicSpecCron = "cron"
+
+ // PeriodicSpecTest is only used by unit tests. It is a sorted, comma
+ // seperated list of unix timestamps at which to launch.
+ PeriodicSpecTest = "_internal_test"
)
// Periodic defines the interval a job should be run at.
@@ -947,8 +973,10 @@ func (p *PeriodicConfig) Validate() error {
if _, err := cronexpr.Parse(p.Spec); err != nil {
return fmt.Errorf("Invalid cron spec %q: %v", p.Spec, err)
}
+ case PeriodicSpecTest:
+ // No-op
default:
- return fmt.Errorf("Unknown specification type %q", p.SpecType)
+ return fmt.Errorf("Unknown periodic specification type %q", p.SpecType)
}
return nil
@@ -964,11 +992,44 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
if e, err := cronexpr.Parse(p.Spec); err == nil {
return e.Next(fromTime)
}
+ case PeriodicSpecTest:
+ split := strings.Split(p.Spec, ",")
+ if len(split) == 1 && split[0] == "" {
+ return time.Time{}
+ }
+
+ // Parse the times
+ times := make([]time.Time, len(split))
+ for i, s := range split {
+ unix, err := strconv.Atoi(s)
+ if err != nil {
+ return time.Time{}
+ }
+
+ times[i] = time.Unix(int64(unix), 0)
+ }
+
+ // Find the next match
+ for _, next := range times {
+ if fromTime.Before(next) {
+ return next
+ }
+ }
}
return time.Time{}
}
+// PeriodicLaunch tracks the last launch time of a periodic job.
+type PeriodicLaunch struct {
+ ID string // ID of the periodic job.
+ Launch time.Time // The last launch time.
+
+ // Raft Indexes
+ CreateIndex uint64
+ ModifyIndex uint64
+}
+
var (
defaultServiceJobRestartPolicy = RestartPolicy{
Delay: 15 * time.Second,
diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go
index 2b74c3f1a..22e5dfea9 100644
--- a/nomad/structs/structs_test.go
+++ b/nomad/structs/structs_test.go
@@ -93,6 +93,80 @@ func TestJob_Validate(t *testing.T) {
}
}
+func TestJob_Copy(t *testing.T) {
+ j := &Job{
+ Region: "global",
+ ID: GenerateUUID(),
+ Name: "my-job",
+ Type: JobTypeService,
+ Priority: 50,
+ AllAtOnce: false,
+ Datacenters: []string{"dc1"},
+ Constraints: []*Constraint{
+ &Constraint{
+ LTarget: "$attr.kernel.name",
+ RTarget: "linux",
+ Operand: "=",
+ },
+ },
+ Periodic: &PeriodicConfig{
+ Enabled: false,
+ },
+ TaskGroups: []*TaskGroup{
+ &TaskGroup{
+ Name: "web",
+ Count: 10,
+ RestartPolicy: &RestartPolicy{
+ Attempts: 3,
+ Interval: 10 * time.Minute,
+ Delay: 1 * time.Minute,
+ },
+ Tasks: []*Task{
+ &Task{
+ Name: "web",
+ Driver: "exec",
+ Config: map[string]interface{}{
+ "command": "/bin/date",
+ },
+ Env: map[string]string{
+ "FOO": "bar",
+ },
+ Services: []*Service{
+ {
+ Name: "${TASK}-frontend",
+ PortLabel: "http",
+ },
+ },
+ Resources: &Resources{
+ CPU: 500,
+ MemoryMB: 256,
+ Networks: []*NetworkResource{
+ &NetworkResource{
+ MBits: 50,
+ DynamicPorts: []Port{{Label: "http"}},
+ },
+ },
+ },
+ },
+ },
+ Meta: map[string]string{
+ "elb_check_type": "http",
+ "elb_check_interval": "30s",
+ "elb_check_min": "3",
+ },
+ },
+ },
+ Meta: map[string]string{
+ "owner": "armon",
+ },
+ }
+
+ c := j.Copy()
+ if !reflect.DeepEqual(j, c) {
+ t.Fatalf("Copy() returned an unequal Job; got %v; want %v", c, j)
+ }
+}
+
func TestJob_IsPeriodic(t *testing.T) {
j := &Job{
Type: JobTypeService,
diff --git a/nomad/timetable.go b/nomad/timetable.go
index 38344f79a..36076ce4a 100644
--- a/nomad/timetable.go
+++ b/nomad/timetable.go
@@ -64,7 +64,7 @@ func (t *TimeTable) Deserialize(dec *codec.Decoder) error {
return nil
}
-// Witness is used to witness a new inde and time.
+// Witness is used to witness a new index and time.
func (t *TimeTable) Witness(index uint64, when time.Time) {
t.l.Lock()
defer t.l.Unlock()
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index 33abef8a4..7af4c4fb2 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -405,6 +405,9 @@ func TestServiceSched_JobModify_InPlace(t *testing.T) {
// Ensure all allocations placed
if len(out) != 10 {
+ for _, alloc := range out {
+ t.Logf("%#v", alloc)
+ }
t.Fatalf("bad: %#v", out)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
diff --git a/version.go b/version.go
index f7908dc06..c2fba56b0 100644
--- a/version.go
+++ b/version.go
@@ -5,9 +5,9 @@ var GitCommit string
var GitDescribe string
// The main version number that is being run at the moment.
-const Version = "0.2.3"
+const Version = "0.3.0"
// A pre-release marker for the version. If this is "" (empty string)
// then it means that it is a final release. Otherwise, this is a pre-release
// such as "dev" (in development), "beta", "rc1", etc.
-const VersionPrerelease = ""
+const VersionPrerelease = "dev"
diff --git a/website/source/docs/agent/config.html.md b/website/source/docs/agent/config.html.md
index c35ac0a63..386ba64ed 100644
--- a/website/source/docs/agent/config.html.md
+++ b/website/source/docs/agent/config.html.md
@@ -94,7 +94,7 @@ nodes, unless otherwise specified:
directory by default to store temporary allocation data as well as cluster
information. Server nodes use this directory to store cluster state, including
the replicated log and snapshot data. This option is required to start the
- Nomad agent.
+ Nomad agent and must be specified as an absolute path.
* `log_level`: Controls the verbosity of logs the Nomad agent will output. Valid
log levels include `WARN`, `INFO`, or `DEBUG` in increasing order of
@@ -253,14 +253,14 @@ configured on server nodes.
configuration options depend on this value. Defaults to `false`.
* `state_dir`: This is the state dir used to store
client state. By default, it lives inside of the [data_dir](#data_dir), in
- the "client" sub-path.
+ the "client" sub-path. It must be specified as an absolute path.
* `alloc_dir`: A directory used to store allocation data.
Depending on the workload, the size of this directory can grow arbitrarily
large as it is used to store downloaded artifacts for drivers (QEMU images,
JAR files, etc.). It is therefore important to ensure this directory is
placed some place on the filesystem with adequate storage capacity. By
default, this directory lives under the [data_dir](#data_dir) at the
- "alloc" sub-path.
+ "alloc" sub-path. It must be specified as an absolute path.
* `servers`: An array of server addresses. This list is
used to register the client with the server nodes and advertise the
available resources so that the agent can receive work.
diff --git a/website/source/docs/drivers/rkt.html.md b/website/source/docs/drivers/rkt.html.md
index d37e16b5c..5efab860b 100644
--- a/website/source/docs/drivers/rkt.html.md
+++ b/website/source/docs/drivers/rkt.html.md
@@ -11,10 +11,9 @@ description: |-
Name: `rkt`
The `rkt` driver provides an interface for using CoreOS rkt for running
-application containers. Currently, the driver supports launching
-containers but does not support resource isolation or dynamic ports. This can
-lead to resource over commitment and port conflicts and as such, this driver is
-being marked as experimental and should be used with care.
+application containers. Currently, the driver supports launching containers but
+does not support dynamic ports. This can lead to port conflicts and as such,
+this driver is being marked as experimental and should be used with care.
## Task Configuration
@@ -49,9 +48,11 @@ The `rkt` driver will set the following client attributes:
* `driver.rkt` - Set to `1` if rkt is found on the host node. Nomad determines
this by executing `rkt version` on the host and parsing the output
-* `driver.rkt.version` - Version of `rkt` eg: `0.8.1`
+* `driver.rkt.version` - Version of `rkt` eg: `0.8.1`. Note that the minimum required
+version is `0.14.0`
* `driver.rkt.appc.version` - Version of `appc` that `rkt` is using eg: `0.8.1`
## Resource Isolation
-This driver does not support any resource isolation as of now.
+This driver supports CPU and memory isolation by delegating to `rkt`. Network isolation
+is not supported as of now.
diff --git a/website/source/docs/jobspec/index.html.md b/website/source/docs/jobspec/index.html.md
index f55251c2b..f3c2a4435 100644
--- a/website/source/docs/jobspec/index.html.md
+++ b/website/source/docs/jobspec/index.html.md
@@ -156,6 +156,24 @@ The `job` object supports the following keys:
and "h" suffix can be used, such as "30s". Both values default to zero,
which disables rolling updates.
+* `periodic` - `periodic` allows the job to be scheduled at fixed times, dates
+ or intervals. The `periodic` block has the following configuration:
+
+ ```
+ periodic {
+ // Enabled is defaulted to true if the block is included. It can be set
+ // to false to pause the periodic job from running.
+ enabled = true
+
+ // A cron expression configuring the interval the job is launched at.
+ // Supports predefined expressions such as "@daily" and "@weekly"
+ cron = "*/15 * * * * *"
+ }
+ ```
+
+ `cron`: See [here](https://github.com/gorhill/cronexpr#implementation)
+ for full documentation of supported cron specs and the predefined expressions.
+
### Task Group
The `group` object supports the following keys: