Merge pull request #659 from hashicorp/f-periodic-status

Improve nomad status on periodic jobs
This commit is contained in:
Alex Dadgar 2016-01-07 16:12:00 -08:00
commit 1ab849049c
10 changed files with 127 additions and 59 deletions

View file

@ -144,6 +144,7 @@ type Job struct {
// jobs during list operations.
type JobListStub struct {
ID string
ParentID string
Name string
Type string
Priority int

View file

@ -61,5 +61,5 @@ func (c *EvalMonitorCommand) Run(args []string) int {
// Start monitoring
mon := newMonitor(c.Ui, client)
return mon.monitor(evalID)
return mon.monitor(evalID, true)
}

View file

@ -161,14 +161,15 @@ func (m *monitor) update(update *evalState) {
// monitor is used to start monitoring the given evaluation ID. It
// writes output directly to the monitor's ui, and returns the
// exit code for the command.
// exit code for the command. If allowPrefix is false, monitor will only accept
// exact matching evalIDs.
//
// The return code will be 0 on successful evaluation. If there are
// problems scheduling the job (impossible constraints, resources
// exhausted, etc), then the return code will be 2. For any other
// failures (API connectivity, internal errors, etc), the return code
// will be 1.
func (m *monitor) monitor(evalID string) int {
func (m *monitor) monitor(evalID string, allowPrefix bool) int {
// Track if we encounter a scheduling failure. This can only be
// detected while querying allocations, so we use this bool to
// carry that status into the return code.
@ -182,6 +183,11 @@ func (m *monitor) monitor(evalID string) int {
// Query the evaluation
eval, _, err := m.client.Evaluations().Info(evalID, nil)
if err != nil {
if !allowPrefix {
m.ui.Error(fmt.Sprintf("No evaluation with id %q found", evalID))
return 1
}
evals, _, err := m.client.Evaluations().PrefixList(evalID)
if err != nil {
m.ui.Error(fmt.Sprintf("Error reading evaluation: %s", err))
@ -279,7 +285,7 @@ func (m *monitor) monitor(evalID string) int {
// Reset the state and monitor the new eval
m.state = newEvalState()
return m.monitor(eval.NextEval)
return m.monitor(eval.NextEval, allowPrefix)
}
break
}

View file

@ -250,7 +250,7 @@ func TestMonitor_Monitor(t *testing.T) {
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
code = mon.monitor(evalID)
code = mon.monitor(evalID, false)
}()
// Wait for completion
@ -296,7 +296,7 @@ func TestMonitor_MonitorWithPrefix(t *testing.T) {
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
code = mon.monitor(evalID[:4])
code = mon.monitor(evalID[:4], true)
}()
// Wait for completion

View file

@ -128,7 +128,7 @@ func (c *RunCommand) Run(args []string) int {
// Detach was not specified, so start monitoring
mon := newMonitor(c.Ui, client)
return mon.monitor(evalID)
return mon.monitor(evalID, false)
}

View file

@ -151,56 +151,110 @@ func (c *StatusCommand) Run(args []string) int {
c.Ui.Output(formatKV(basic))
if !periodic && !short {
var evals, allocs []string
// Exit early
if short {
return 0
}
// Query the evaluations
jobEvals, _, err := client.Jobs().Evaluations(job.ID, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying job evaluations: %s", err))
// Print periodic job information
if periodic {
if err := c.outputPeriodicInfo(client, job); err != nil {
c.Ui.Error(err.Error())
return 1
}
// Query the allocations
jobAllocs, _, err := client.Jobs().Allocations(job.ID, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying job allocations: %s", err))
return 1
}
return 0
}
// Format the evals
evals = make([]string, len(jobEvals)+1)
evals[0] = "ID|Priority|TriggeredBy|Status"
for i, eval := range jobEvals {
evals[i+1] = fmt.Sprintf("%s|%d|%s|%s",
eval.ID,
eval.Priority,
eval.TriggeredBy,
eval.Status)
}
// Format the allocs
allocs = make([]string, len(jobAllocs)+1)
allocs[0] = "ID|EvalID|NodeID|TaskGroup|Desired|Status"
for i, alloc := range jobAllocs {
allocs[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%s",
alloc.ID,
alloc.EvalID,
alloc.NodeID,
alloc.TaskGroup,
alloc.DesiredStatus,
alloc.ClientStatus)
}
c.Ui.Output("\n==> Evaluations")
c.Ui.Output(formatList(evals))
c.Ui.Output("\n==> Allocations")
c.Ui.Output(formatList(allocs))
if err := c.outputJobInfo(client, job); err != nil {
c.Ui.Error(err.Error())
return 1
}
return 0
}
// outputPeriodicInfo prints information about the passed periodic job. If a
// request fails, an error is returned.
//
// Launched instances are found by listing every job whose ID starts with the
// periodic job's ID followed by structs.PeriodicLaunchSuffix, and then keeping
// only those whose ParentID is the periodic job itself. If no job survives
// that filter, a "No previously launched jobs" message is printed instead of
// a table.
func (c *StatusCommand) outputPeriodicInfo(client *api.Client, job *api.Job) error {
	// Generate the prefix that matches launched jobs from the periodic job.
	prefix := fmt.Sprintf("%s%s", job.ID, structs.PeriodicLaunchSuffix)
	children, _, err := client.Jobs().PrefixList(prefix)
	if err != nil {
		return fmt.Errorf("Error querying job: %s", err)
	}

	// The first row is the table header; at most one row is added per child.
	out := make([]string, 1, len(children)+1)
	out[0] = "ID|Status"
	for _, child := range children {
		// Ensure that we are only showing jobs whose parent is the requested
		// job; a prefix match alone does not establish that relationship.
		if child.ParentID != job.ID {
			continue
		}

		out = append(out, fmt.Sprintf("%s|%s", child.ID, child.Status))
	}

	// Only the header is present: either nothing matched the prefix or every
	// match belonged to a different parent. Checking after the filter avoids
	// printing a heading followed by an empty table.
	if len(out) == 1 {
		c.Ui.Output("\nNo previously launched jobs")
		return nil
	}

	c.Ui.Output(fmt.Sprintf("\nPreviously launched jobs:\n%s", formatList(out)))
	return nil
}
// outputJobInfo prints the evaluations and allocations of the passed
// non-periodic job as two tables. If either request fails, an error is
// returned and this function prints nothing.
func (c *StatusCommand) outputJobInfo(client *api.Client, job *api.Job) error {
	// Run both queries before formatting so that a failed request never
	// leaves partial output behind.
	jobEvals, _, err := client.Jobs().Evaluations(job.ID, nil)
	if err != nil {
		return fmt.Errorf("Error querying job evaluations: %s", err)
	}

	jobAllocs, _, err := client.Jobs().Allocations(job.ID, nil)
	if err != nil {
		return fmt.Errorf("Error querying job allocations: %s", err)
	}

	// Evaluation table: header row followed by one row per evaluation.
	evalRows := make([]string, 0, len(jobEvals)+1)
	evalRows = append(evalRows, "ID|Priority|TriggeredBy|Status")
	for _, e := range jobEvals {
		row := fmt.Sprintf("%s|%d|%s|%s", e.ID, e.Priority, e.TriggeredBy, e.Status)
		evalRows = append(evalRows, row)
	}

	// Allocation table: header row followed by one row per allocation.
	allocRows := make([]string, 0, len(jobAllocs)+1)
	allocRows = append(allocRows, "ID|EvalID|NodeID|TaskGroup|Desired|Status")
	for _, a := range jobAllocs {
		row := fmt.Sprintf("%s|%s|%s|%s|%s|%s",
			a.ID, a.EvalID, a.NodeID, a.TaskGroup, a.DesiredStatus, a.ClientStatus)
		allocRows = append(allocRows, row)
	}

	c.Ui.Output("\n==> Evaluations")
	c.Ui.Output(formatList(evalRows))

	c.Ui.Output("\n==> Allocations")
	c.Ui.Output(formatList(allocRows))
	return nil
}
// convertApiJob is used to take a *api.Job and convert it to an *struct.Job.
// This function is just a hammer and probably needs to be revisited.
func convertApiJob(in *api.Job) (*structs.Job, error) {

View file

@ -104,6 +104,11 @@ func (c *StopCommand) Run(args []string) int {
return 1
}
// If we are stopping a periodic job there won't be an evalID.
if evalID == "" {
return 0
}
if detach {
c.Ui.Output(evalID)
return 0
@ -111,5 +116,5 @@ func (c *StopCommand) Run(args []string) int {
// Start monitoring the stop eval
mon := newMonitor(c.Ui, client)
return mon.monitor(evalID)
return mon.monitor(evalID, false)
}

View file

@ -12,12 +12,6 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// JobLaunchSuffix is the string appended to a periodic job's ID when
// launching derived instances of it.
JobLaunchSuffix = "/periodic-"
)
// PeriodicDispatch is used to track and launch periodic jobs. It maintains the
// set of periodic jobs and creates derived jobs and evaluations per
// instantiation which is determined by the periodic spec.
@ -85,7 +79,7 @@ func (s *Server) DispatchJob(job *structs.Job) error {
// RunningChildren checks whether the passed job has any running children.
func (s *Server) RunningChildren(job *structs.Job) (bool, error) {
state := s.fsm.State()
prefix := fmt.Sprintf("%s%s", job.ID, JobLaunchSuffix)
prefix := fmt.Sprintf("%s%s", job.ID, structs.PeriodicLaunchSuffix)
iter, err := state.JobsByIDPrefix(prefix)
if err != nil {
return false, err
@ -418,18 +412,18 @@ func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
// derivedJobID returns a job ID based on the parent periodic job and the launch
// time. The ID is the parent's ID, followed by the periodic launch suffix,
// followed by the launch time as a Unix timestamp.
func (p *PeriodicDispatch) derivedJobID(periodicJob *structs.Job, time time.Time) string {
return fmt.Sprintf("%s%s%d", periodicJob.ID, JobLaunchSuffix, time.Unix())
return fmt.Sprintf("%s%s%d", periodicJob.ID, structs.PeriodicLaunchSuffix, time.Unix())
}
// LaunchTime returns the launch time of the job. This is only valid for
// jobs created by PeriodicDispatch and will otherwise return an error.
func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) {
index := strings.LastIndex(jobID, JobLaunchSuffix)
index := strings.LastIndex(jobID, structs.PeriodicLaunchSuffix)
if index == -1 {
return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
}
launch, err := strconv.Atoi(jobID[index+len(JobLaunchSuffix):])
launch, err := strconv.Atoi(jobID[index+len(structs.PeriodicLaunchSuffix):])
if err != nil {
return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
}

View file

@ -517,7 +517,7 @@ func TestPeriodicHeap_Order(t *testing.T) {
// deriveChildJob returns a mock job set up as a child of the passed parent:
// its ParentID is the parent's ID, and its ID is the parent's ID followed by
// the periodic launch suffix and the current Unix timestamp.
func deriveChildJob(parent *structs.Job) *structs.Job {
childjob := mock.Job()
childjob.ParentID = parent.ID
childjob.ID = fmt.Sprintf("%s%s%v", parent.ID, JobLaunchSuffix, time.Now().Unix())
childjob.ID = fmt.Sprintf("%s%s%v", parent.ID, structs.PeriodicLaunchSuffix, time.Now().Unix())
return childjob
}

View file

@ -894,6 +894,7 @@ func (j *Job) LookupTaskGroup(name string) *TaskGroup {
func (j *Job) Stub() *JobListStub {
return &JobListStub{
ID: j.ID,
ParentID: j.ParentID,
Name: j.Name,
Type: j.Type,
Priority: j.Priority,
@ -913,6 +914,7 @@ func (j *Job) IsPeriodic() bool {
// for the job list
type JobListStub struct {
ID string
ParentID string
Name string
Type string
Priority int
@ -1023,6 +1025,12 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
return time.Time{}
}
const (
// PeriodicLaunchSuffix is the string appended to a periodic job's ID
// when launching derived instances of it.
PeriodicLaunchSuffix = "/periodic-"
)
// PeriodicLaunch tracks the last launch time of a periodic job.
type PeriodicLaunch struct {
ID string // ID of the periodic job.