Merge pull request #10145 from hashicorp/b-periodic-init-status

periodic: always reset periodic children status
This commit is contained in:
Mahmood Ali 2021-03-26 09:19:08 -04:00 committed by GitHub
commit dbc3850358
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 318 additions and 6 deletions

View File

@ -26,6 +26,7 @@ import (
_ "github.com/hashicorp/nomad/e2e/nodedrain"
_ "github.com/hashicorp/nomad/e2e/nomad09upgrade"
_ "github.com/hashicorp/nomad/e2e/nomadexec"
_ "github.com/hashicorp/nomad/e2e/parameterized"
_ "github.com/hashicorp/nomad/e2e/periodic"
_ "github.com/hashicorp/nomad/e2e/podman"
_ "github.com/hashicorp/nomad/e2e/quotas"

View File

@ -40,8 +40,7 @@ func Register(jobID, jobFilePath string) error {
return nil
}
// PeriodicForce forces a periodic job to dispatch, returning the child job ID
// or an error
// PeriodicForce forces a periodic job to dispatch
func PeriodicForce(jobID string) error {
// nomad job periodic force
cmd := exec.Command("nomad", "job", "periodic", "force", jobID)
@ -54,6 +53,29 @@ func PeriodicForce(jobID string) error {
return nil
}
// Dispatch dispatches a parameterized job
func Dispatch(jobID string, meta map[string]string, payload string) error {
// nomad job periodic force
args := []string{"job", "dispatch"}
for k, v := range meta {
args = append(args, "-meta", fmt.Sprintf("%v=%v", k, v))
}
args = append(args, jobID)
if payload != "" {
args = append(args, "-")
}
cmd := exec.Command("nomad", args...)
cmd.Stdin = strings.NewReader(payload)
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("could not dispatch job: %w\n%v", err, string(out))
}
return nil
}
// JobInspectTemplate runs nomad job inspect and formats the output
// using the specified go template
func JobInspectTemplate(jobID, template string) (string, error) {
@ -102,7 +124,10 @@ func ChildrenJobSummary(jobID string) ([]map[string]string, error) {
section, err := GetSection(out, "Children Job Summary")
if err != nil {
return nil, fmt.Errorf("could not find children job summary section: %w", err)
section, err = GetSection(out, "Parameterized Job Summary")
if err != nil {
return nil, fmt.Errorf("could not find children job summary section: %w", err)
}
}
summary, err := ParseColumns(section)
@ -131,3 +156,22 @@ func PreviouslyLaunched(jobID string) ([]map[string]string, error) {
return summary, nil
}
func DispatchedJobs(jobID string) ([]map[string]string, error) {
out, err := Command("nomad", "job", "status", jobID)
if err != nil {
return nil, fmt.Errorf("nomad job status failed: %w", err)
}
section, err := GetSection(out, "Dispatched Jobs")
if err != nil {
return nil, fmt.Errorf("could not find previously launched jobs section: %w", err)
}
summary, err := ParseColumns(section)
if err != nil {
return nil, fmt.Errorf("could not parse previously launched jobs section: %w", err)
}
return summary, nil
}

View File

@ -0,0 +1,26 @@
job "periodic" {
datacenters = ["dc1"]
type = "batch"
constraint {
attribute = "${attr.kernel.name}"
operator = "set_contains_any"
value = "darwin,linux"
}
parameterized {
meta_optional = ["i"]
}
group "group" {
task "task" {
driver = "docker"
config {
image = "busybox:1"
command = "/bin/sh"
args = ["-c", "sleep 5"]
}
}
}
}

View File

@ -0,0 +1,90 @@
package parameterized
import (
"fmt"
"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
type ParameterizedTest struct {
framework.TC
jobIDs []string
}
func init() {
framework.AddSuites(&framework.TestSuite{
Component: "Parameterized",
CanRunLocal: true,
Cases: []framework.TestCase{
new(ParameterizedTest),
},
})
}
func (tc *ParameterizedTest) BeforeAll(f *framework.F) {
e2eutil.WaitForLeader(f.T(), tc.Nomad())
}
func (tc *ParameterizedTest) AfterEach(f *framework.F) {
nomadClient := tc.Nomad()
j := nomadClient.Jobs()
for _, id := range tc.jobIDs {
j.Deregister(id, true, nil)
}
_, err := e2eutil.Command("nomad", "system", "gc")
f.NoError(err)
}
func (tc *ParameterizedTest) TestParameterizedDispatch_Basic(f *framework.F) {
t := f.T()
uuid := uuid.Generate()
jobID := fmt.Sprintf("dispatch-%s", uuid[0:8])
tc.jobIDs = append(tc.jobIDs, jobID)
// register job
require.NoError(t, e2eutil.Register(jobID, "parameterized/input/simple.nomad"))
// force dispatch
dispatched := 4
for i := 0; i < dispatched; i++ {
require.NoError(t, e2eutil.Dispatch(jobID, map[string]string{"i": fmt.Sprintf("%v", i)}, ""))
}
testutil.WaitForResult(func() (bool, error) {
children, err := e2eutil.DispatchedJobs(jobID)
if err != nil {
return false, err
}
dead := 0
for _, c := range children {
if c["Status"] != "dead" {
return false, fmt.Errorf("expected periodic job to be dead")
}
dead++
}
if dead != dispatched {
return false, fmt.Errorf("expected %d but found %d children", dispatched, dead)
}
return true, nil
}, func(err error) {
require.NoError(t, err)
})
// Assert there are no pending children
summary, err := e2eutil.ChildrenJobSummary(jobID)
require.NoError(t, err)
require.Len(t, summary, 1)
require.Equal(t, summary[0]["Pending"], "0")
require.Equal(t, summary[0]["Running"], "0")
require.Equal(t, summary[0]["Dead"], fmt.Sprintf("%v", dispatched))
}

View File

@ -4,10 +4,12 @@ job "periodic" {
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
operator = "set_contains_any"
value = "darwin,linux"
}
periodic {
cron = "* * * * *"
prohibit_overlap = true

View File

@ -74,5 +74,6 @@ func (tc *PeriodicTest) TestPeriodicDispatch_Basic(f *framework.F) {
require.NoError(t, err)
require.Len(t, summary, 1)
require.Equal(t, summary[0]["Pending"], "0")
require.Equal(t, summary[0]["Running"], "0")
require.Equal(t, summary[0]["Dead"], "1")
}

View File

@ -1887,13 +1887,15 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
return err
}
// Derive the child job and commit it via Raft
// Derive the child job and commit it via Raft - with initial status
dispatchJob := parameterizedJob.Copy()
dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now())
dispatchJob.ParentID = parameterizedJob.ID
dispatchJob.Name = dispatchJob.ID
dispatchJob.SetSubmitTime()
dispatchJob.Dispatched = true
dispatchJob.Status = ""
dispatchJob.StatusDescription = ""
// Merge in the meta data
for k, v := range args.Meta {

View File

@ -6401,6 +6401,117 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
}
}
// TestJobEndpoint_Dispatch_JobChildrenSummary asserts that the job summary is updated
// appropriately as its dispatched/children jobs status are updated.
func TestJobEndpoint_Dispatch_JobChildrenSummary(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer cleanupS1()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
node := mock.Node()
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1, node))
parameterizedJob := mock.BatchJob()
parameterizedJob.ParameterizedJob = &structs.ParameterizedJobConfig{}
// Create the register request
regReq := &structs.JobRegisterRequest{
Job: parameterizedJob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: parameterizedJob.Namespace,
},
}
var regResp structs.JobRegisterResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", regReq, &regResp))
jobChildren := func() *structs.JobChildrenSummary {
summary, err := state.JobSummaryByID(nil, parameterizedJob.Namespace, parameterizedJob.ID)
require.NoError(t, err)
return summary.Children
}
require.Equal(t, &structs.JobChildrenSummary{}, jobChildren())
// dispatch a child job
dispatchReq := &structs.JobDispatchRequest{
JobID: parameterizedJob.ID,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: parameterizedJob.Namespace,
},
}
var dispatchResp structs.JobDispatchResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", dispatchReq, &dispatchResp)
require.NoError(t, err)
nextIdx := dispatchResp.Index + 1
require.Equal(t, &structs.JobChildrenSummary{Pending: 1}, jobChildren())
dispatchedJob, err := state.JobByID(nil, parameterizedJob.Namespace, dispatchResp.DispatchedJobID)
require.NoError(t, err)
require.NotNil(t, dispatchedJob)
dispatchedStatus := func() string {
job, err := state.JobByID(nil, dispatchedJob.Namespace, dispatchedJob.ID)
require.NoError(t, err)
require.NotNil(t, job)
return job.Status
}
// Let's start a alloc for the dispatch job and walk through states
// Note that job summary reports 1 running even when alloc is pending!
nextIdx++
alloc := mock.Alloc()
alloc.Job = dispatchedJob
alloc.JobID = dispatchedJob.ID
alloc.TaskGroup = dispatchedJob.TaskGroups[0].Name
alloc.Namespace = dispatchedJob.Namespace
alloc.ClientStatus = structs.AllocClientStatusPending
err = s1.State().UpsertAllocs(structs.MsgTypeTestSetup, nextIdx, []*structs.Allocation{alloc})
require.NoError(t, err)
require.Equal(t, &structs.JobChildrenSummary{Running: 1}, jobChildren())
require.Equal(t, structs.JobStatusRunning, dispatchedStatus())
// mark the creation eval as completed
nextIdx++
eval, err := state.EvalByID(nil, dispatchResp.EvalID)
require.NoError(t, err)
eval = eval.Copy()
eval.Status = structs.EvalStatusComplete
require.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, nextIdx, []*structs.Evaluation{eval}))
updateAllocStatus := func(status string) {
nextIdx++
nalloc, err := state.AllocByID(nil, alloc.ID)
require.NoError(t, err)
nalloc = nalloc.Copy()
nalloc.ClientStatus = status
err = s1.State().UpdateAllocsFromClient(structs.MsgTypeTestSetup, nextIdx, []*structs.Allocation{nalloc})
require.NoError(t, err)
}
// job should remain remaining when alloc runs
updateAllocStatus(structs.AllocClientStatusRunning)
require.Equal(t, &structs.JobChildrenSummary{Running: 1}, jobChildren())
require.Equal(t, structs.JobStatusRunning, dispatchedStatus())
// job should be dead after alloc completes
updateAllocStatus(structs.AllocClientStatusComplete)
require.Equal(t, &structs.JobChildrenSummary{Dead: 1}, jobChildren())
require.Equal(t, structs.JobStatusDead, dispatchedStatus())
}
func TestJobEndpoint_Scale(t *testing.T) {
t.Parallel()
require := require.New(t)

View File

@ -444,12 +444,14 @@ func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
}()
// Create a copy of the periodic job, give it a derived ID/Name and make it
// non-periodic.
// non-periodic in initial status
derived = periodicJob.Copy()
derived.ParentID = periodicJob.ID
derived.ID = p.derivedJobID(periodicJob, time)
derived.Name = derived.ID
derived.Periodic = nil
derived.Status = ""
derived.StatusDescription = ""
return
}

View File

@ -70,6 +70,20 @@ func (m *MockJobEvalDispatcher) LaunchTimes(p *PeriodicDispatch, namespace, pare
return launches, nil
}
func (m *MockJobEvalDispatcher) dispatchedJobs(parent *structs.Job) []*structs.Job {
m.lock.Lock()
defer m.lock.Unlock()
jobs := []*structs.Job{}
for _, job := range m.Jobs {
if job.ParentID == parent.ID && job.Namespace == parent.Namespace {
jobs = append(jobs, job)
}
}
return jobs
}
type times []time.Time
func (t times) Len() int { return len(t) }
@ -754,3 +768,22 @@ func TestPeriodicDispatch_RunningChildren_ActiveAllocs(t *testing.T) {
t.Fatalf("RunningChildren should return true")
}
}
// TestPeriodicDispatch_JobEmptyStatus asserts that dispatched
// job will always has an empty status
func TestPeriodicDispatch_JobEmptyStatus(t *testing.T) {
t.Parallel()
p, m := testPeriodicDispatcher(t)
job := testPeriodicJob(time.Now().Add(1 * time.Second))
job.Status = structs.JobStatusRunning
err := p.Add(job)
require.NoError(t, err)
time.Sleep(2 * time.Second)
dispatched := m.dispatchedJobs(job)
require.NotEmpty(t, dispatched)
require.Empty(t, dispatched[0].Status)
}