Merge pull request #9160 from hashicorp/f-sysbatch

core: implement system batch scheduler
Mahmood Ali 2021-08-16 09:30:24 -04:00 committed by GitHub
commit c37339a8c8
36 changed files with 2763 additions and 609 deletions

View File

@ -164,9 +164,10 @@ const (
// PreemptionConfig specifies whether preemption is enabled based on scheduler type
type PreemptionConfig struct {
SystemSchedulerEnabled bool
BatchSchedulerEnabled bool
ServiceSchedulerEnabled bool
SystemSchedulerEnabled bool
SysBatchSchedulerEnabled bool
BatchSchedulerEnabled bool
ServiceSchedulerEnabled bool
}
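For context, here is a minimal sketch (not part of this diff) of flipping the new field through the operator API with the Go client; it assumes an agent reachable at the default address and omits retries:

package main

import (
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Read the current scheduler configuration, enable preemption for
	// sysbatch jobs, and write the configuration back.
	resp, _, err := client.Operator().SchedulerGetConfiguration(nil)
	if err != nil {
		log.Fatal(err)
	}
	conf := resp.SchedulerConfig
	conf.PreemptionConfig.SysBatchSchedulerEnabled = true

	if _, _, err := client.Operator().SchedulerSetConfiguration(conf, nil); err != nil {
		log.Fatal(err)
	}
}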
// SchedulerGetConfiguration is used to query the current Scheduler configuration.

View File

@ -14,15 +14,19 @@ const (
// jitter is the percent of jitter added to restart delays.
jitter = 0.25
ReasonNoRestartsAllowed = "Policy allows no restarts"
ReasonUnrecoverableErrror = "Error was unrecoverable"
ReasonWithinPolicy = "Restart within policy"
ReasonDelay = "Exceeded allowed attempts, applying a delay"
ReasonNoRestartsAllowed = "Policy allows no restarts"
ReasonUnrecoverableError = "Error was unrecoverable"
ReasonWithinPolicy = "Restart within policy"
ReasonDelay = "Exceeded allowed attempts, applying a delay"
)
func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *structs.TaskLifecycleConfig) *RestartTracker {
// Batch jobs should not restart if they exit successfully
onSuccess := jobType != structs.JobTypeBatch
onSuccess := true
// Batch & SysBatch jobs should not restart if they exit successfully
if jobType == structs.JobTypeBatch || jobType == structs.JobTypeSysBatch {
onSuccess = false
}
// Prestart sidecars should get restarted on success
if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPrestart {
@ -201,7 +205,7 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
if r.startErr != nil {
// If the error is not recoverable, do not restart.
if !structs.IsRecoverable(r.startErr) {
r.reason = ReasonUnrecoverableErrror
r.reason = ReasonUnrecoverableError
return structs.TaskNotRestarting, 0
}
} else if r.exitRes != nil {

View File

@ -262,9 +262,10 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R
SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm),
MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled,
PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
BatchSchedulerEnabled: conf.PreemptionConfig.BatchSchedulerEnabled,
ServiceSchedulerEnabled: conf.PreemptionConfig.ServiceSchedulerEnabled},
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled,
BatchSchedulerEnabled: conf.PreemptionConfig.BatchSchedulerEnabled,
ServiceSchedulerEnabled: conf.PreemptionConfig.ServiceSchedulerEnabled},
}
if err := args.Config.Validate(); err != nil {

View File

@ -282,6 +282,7 @@ func TestOperator_SchedulerGetConfiguration(t *testing.T) {
// Only system jobs can preempt other jobs by default.
require.True(out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.False(out.SchedulerConfig.MemoryOversubscriptionEnabled)
@ -319,6 +320,8 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {
err = s.RPC("Operator.SchedulerGetConfiguration", &args, &reply)
require.Nil(err)
require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.True(reply.SchedulerConfig.MemoryOversubscriptionEnabled)
})
@ -330,6 +333,7 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
require := require.New(t)
body := bytes.NewBuffer([]byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": true,
"SysBatchSchedulerEnabled":true,
"BatchSchedulerEnabled":true
}}`))
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/configuration", body)
@ -352,7 +356,9 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
t.Fatalf("err: %v", err)
}
require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
// Create a CAS request, bad index
{
@ -393,7 +399,9 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
t.Fatalf("err: %v", err)
}
require.False(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
})
}

View File

@ -35,8 +35,9 @@ import (
_ "github.com/hashicorp/nomad/e2e/rescheduling"
_ "github.com/hashicorp/nomad/e2e/scaling"
_ "github.com/hashicorp/nomad/e2e/scalingpolicies"
_ "github.com/hashicorp/nomad/e2e/scheduler_sysbatch"
_ "github.com/hashicorp/nomad/e2e/scheduler_system"
_ "github.com/hashicorp/nomad/e2e/spread"
_ "github.com/hashicorp/nomad/e2e/systemsched"
_ "github.com/hashicorp/nomad/e2e/taskevents"
_ "github.com/hashicorp/nomad/e2e/vaultsecrets"
_ "github.com/hashicorp/nomad/e2e/volumes"

View File

@ -236,6 +236,30 @@ func WaitForAllocStopped(t *testing.T, nomadClient *api.Client, allocID string)
})
}
func WaitForAllocStatus(t *testing.T, nomadClient *api.Client, allocID string, status string) {
testutil.WaitForResultRetries(retries, func() (bool, error) {
time.Sleep(time.Millisecond * 100)
alloc, _, err := nomadClient.Allocations().Info(allocID, nil)
if err != nil {
return false, err
}
switch alloc.ClientStatus {
case status:
return true, nil
default:
return false, fmt.Errorf("expected %s alloc, but was: %s", status, alloc.ClientStatus)
}
}, func(err error) {
t.Fatalf("failed to wait on alloc: %v", err)
})
}
func WaitForAllocsStatus(t *testing.T, nomadClient *api.Client, allocIDs []string, status string) {
for _, allocID := range allocIDs {
WaitForAllocStatus(t, nomadClient, allocID, status)
}
}
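For illustration only (caller side, not part of this file), the new helpers are intended to compose like this inside an e2e test; the jobID and surrounding imports are assumed from the test suite:

func waitForJobComplete(t *testing.T, nomadClient *api.Client, jobID string) {
	// List every allocation for the job, including stopped ones.
	allocs, _, err := nomadClient.Jobs().Allocations(jobID, true, nil)
	require.NoError(t, err)

	// Block until each allocation reports the "complete" client status.
	allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs)
	e2eutil.WaitForAllocsStatus(t, nomadClient, allocIDs, structs.AllocClientStatusComplete)
}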
func AllocIDsFromAllocationListStubs(allocs []*api.AllocationListStub) []string {
allocIDs := make([]string, 0, len(allocs))
for _, alloc := range allocs {

View File

@ -0,0 +1,30 @@
job "sysbatchjob" {
datacenters = ["dc1"]
type = "sysbatch"
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
parameterized {
payload = "forbidden"
meta_required = ["KEY"]
}
group "sysbatch_job_group" {
count = 1
task "sysbatch_task" {
driver = "docker"
config {
image = "busybox:1"
command = "/bin/sh"
args = ["-c", "echo hi; sleep 1"]
}
}
}
}

View File

@ -0,0 +1,25 @@
job "sysbatchjob" {
datacenters = ["dc1"]
type = "sysbatch"
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
group "sysbatch_job_group" {
count = 1
task "sysbatch_task" {
driver = "docker"
config {
image = "busybox:1"
command = "/bin/sh"
args = ["-c", "echo hi; sleep 1"]
}
}
}
}

View File

@ -0,0 +1,25 @@
job "sysbatchjob" {
datacenters = ["dc1"]
type = "sysbatch"
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
group "sysbatch_job_group" {
count = 1
task "sysbatch_task" {
driver = "docker"
config {
image = "busybox:1"
command = "/bin/sh"
args = ["-c", "echo hi; sleep 1000000"]
}
}
}
}

View File

@ -0,0 +1,30 @@
job "sysbatchjob" {
datacenters = ["dc1"]
type = "sysbatch"
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
periodic {
cron = "*/15 * * * * *"
prohibit_overlap = true
}
group "sysbatch_job_group" {
count = 1
task "sysbatch_task" {
driver = "docker"
config {
image = "busybox:1"
command = "/bin/sh"
args = ["-c", "echo hi; sleep 1"]
}
}
}
}

View File

@ -0,0 +1,265 @@
package scheduler_sysbatch
import (
"strings"
"time"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type SysBatchSchedulerTest struct {
framework.TC
jobIDs []string
}
func init() {
framework.AddSuites(&framework.TestSuite{
Component: "SysBatchScheduler",
CanRunLocal: true,
Cases: []framework.TestCase{
new(SysBatchSchedulerTest),
},
})
}
func (tc *SysBatchSchedulerTest) BeforeAll(f *framework.F) {
// Ensure cluster has leader before running tests
e2eutil.WaitForLeader(f.T(), tc.Nomad())
e2eutil.WaitForNodesReady(f.T(), tc.Nomad(), 4)
}
func (tc *SysBatchSchedulerTest) TestJobRunBasic(f *framework.F) {
t := f.T()
nomadClient := tc.Nomad()
// submit a fast sysbatch job
jobID := "sysbatch_run_basic"
tc.jobIDs = append(tc.jobIDs, jobID)
e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "scheduler_sysbatch/input/sysbatch_job_fast.nomad", jobID, "")
// get our allocations for this sysbatch job
jobs := nomadClient.Jobs()
allocs, _, err := jobs.Allocations(jobID, true, nil)
require.NoError(t, err)
// make sure this job is being run on "all" the linux clients
require.True(t, len(allocs) >= 3)
// wait for every alloc to reach completion
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs)
e2eutil.WaitForAllocsStatus(t, nomadClient, allocIDs, structs.AllocClientStatusComplete)
}
func (tc *SysBatchSchedulerTest) TestJobStopEarly(f *framework.F) {
t := f.T()
nomadClient := tc.Nomad()
// submit a slow sysbatch job
jobID := "sysbatch_stop_early"
tc.jobIDs = append(tc.jobIDs, jobID)
e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "scheduler_sysbatch/input/sysbatch_job_slow.nomad", jobID, "")
// get our allocations for this sysbatch job
jobs := nomadClient.Jobs()
allocs, _, err := jobs.Allocations(jobID, true, nil)
require.NoError(t, err)
// make sure this job is being run on "all" the linux clients
require.True(t, len(allocs) >= 3)
// wait for every alloc to reach running status
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs)
e2eutil.WaitForAllocsStatus(t, nomadClient, allocIDs, structs.AllocClientStatusRunning)
// stop the job before allocs reach completion
_, _, err = jobs.Deregister(jobID, false, nil)
require.NoError(t, err)
}
func (tc *SysBatchSchedulerTest) TestJobReplaceRunning(f *framework.F) {
t := f.T()
nomadClient := tc.Nomad()
// submit a slow sysbatch job
jobID := "sysbatch_replace_running"
tc.jobIDs = append(tc.jobIDs, jobID)
e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "scheduler_sysbatch/input/sysbatch_job_slow.nomad", jobID, "")
// get our allocations for this sysbatch job
jobs := nomadClient.Jobs()
allocs, _, err := jobs.Allocations(jobID, true, nil)
require.NoError(t, err)
// make sure this job is being run on "all" the linux clients
require.True(t, len(allocs) >= 3)
// wait for every alloc to reach running status
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs)
e2eutil.WaitForAllocsStatus(t, nomadClient, allocIDs, structs.AllocClientStatusRunning)
// replace the slow job with the fast job
intermediate := e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "scheduler_sysbatch/input/sysbatch_job_fast.nomad", jobID, "")
// get the allocs for the new updated job
var updated []*api.AllocationListStub
for _, alloc := range intermediate {
if alloc.JobVersion == 1 {
updated = append(updated, alloc)
}
}
// should be equal number of old and new allocs
newAllocIDs := e2eutil.AllocIDsFromAllocationListStubs(updated)
// make sure this new job is being run on "all" the linux clients
require.True(t, len(updated) >= 3)
// wait for the allocs of the fast job to complete
e2eutil.WaitForAllocsStatus(t, nomadClient, newAllocIDs, structs.AllocClientStatusComplete)
}
func (tc *SysBatchSchedulerTest) TestJobReplaceDead(f *framework.F) {
t := f.T()
nomadClient := tc.Nomad()
// submit a fast sysbatch job
jobID := "sysbatch_replace_dead"
tc.jobIDs = append(tc.jobIDs, jobID)
e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "scheduler_sysbatch/input/sysbatch_job_fast.nomad", jobID, "")
// get the allocations for this sysbatch job
jobs := nomadClient.Jobs()
allocs, _, err := jobs.Allocations(jobID, true, nil)
require.NoError(t, err)
// make sure this job is being run on "all" the linux clients
require.True(t, len(allocs) >= 3)
// wait for every alloc to reach complete status
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs)
e2eutil.WaitForAllocsStatus(t, nomadClient, allocIDs, structs.AllocClientStatusComplete)
// replace the fast job with the slow job
intermediate := e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "scheduler_sysbatch/input/sysbatch_job_slow.nomad", jobID, "")
// get the allocs for the new updated job
var updated []*api.AllocationListStub
for _, alloc := range intermediate {
if alloc.JobVersion == 1 {
updated = append(updated, alloc)
}
}
// should be equal number of old and new allocs
upAllocIDs := e2eutil.AllocIDsFromAllocationListStubs(updated)
// make sure this new job is being run on "all" the linux clients
require.True(t, len(updated) >= 3)
// wait for the allocs of the slow job to be running
e2eutil.WaitForAllocsStatus(t, nomadClient, upAllocIDs, structs.AllocClientStatusRunning)
}
func (tc *SysBatchSchedulerTest) TestJobRunPeriodic(f *framework.F) {
t := f.T()
nomadClient := tc.Nomad()
// submit a fast sysbatch job
jobID := "sysbatch_job_periodic"
tc.jobIDs = append(tc.jobIDs, jobID)
err := e2eutil.Register(jobID, "scheduler_sysbatch/input/sysbatch_periodic.nomad")
require.NoError(t, err)
// force the cron job to run
jobs := nomadClient.Jobs()
_, _, err = jobs.PeriodicForce(jobID, nil)
require.NoError(t, err)
// find the cron job that got launched
jobsList, _, err := jobs.List(nil)
require.NoError(t, err)
cronJobID := ""
for _, job := range jobsList {
if strings.HasPrefix(job.Name, "sysbatch_job_periodic/periodic-") {
cronJobID = job.Name
break
}
}
require.NotEmpty(t, cronJobID)
tc.jobIDs = append(tc.jobIDs, cronJobID)
// wait for allocs of the cron job
var allocs []*api.AllocationListStub
require.True(t, assert.Eventually(t, func() bool {
var err error
allocs, _, err = jobs.Allocations(cronJobID, false, nil)
require.NoError(t, err)
return len(allocs) >= 3
}, 30*time.Second, time.Second))
// wait for every cron job alloc to reach completion
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs)
e2eutil.WaitForAllocsStatus(t, nomadClient, allocIDs, structs.AllocClientStatusComplete)
}
func (tc *SysBatchSchedulerTest) TestJobRunDispatch(f *framework.F) {
t := f.T()
nomadClient := tc.Nomad()
// submit a fast sysbatch dispatch job
jobID := "sysbatch_job_dispatch"
tc.jobIDs = append(tc.jobIDs, jobID)
err := e2eutil.Register(jobID, "scheduler_sysbatch/input/sysbatch_dispatch.nomad")
require.NoError(t, err)
// dispatch the sysbatch job
jobs := nomadClient.Jobs()
result, _, err := jobs.Dispatch(jobID, map[string]string{
"KEY": "value",
}, nil, nil)
require.NoError(t, err)
// grab the new dispatched jobID
dispatchID := result.DispatchedJobID
tc.jobIDs = append(tc.jobIDs, dispatchID)
// wait for allocs of the dispatched job
var allocs []*api.AllocationListStub
require.True(t, assert.Eventually(t, func() bool {
var err error
allocs, _, err = jobs.Allocations(dispatchID, false, nil)
require.NoError(t, err)
return len(allocs) >= 3
}, 30*time.Second, time.Second))
// wait for every dispatch alloc to reach completion
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs)
e2eutil.WaitForAllocsStatus(t, nomadClient, allocIDs, structs.AllocClientStatusComplete)
}
func (tc *SysBatchSchedulerTest) AfterEach(f *framework.F) {
nomadClient := tc.Nomad()
// Mark all nodes eligible
nodesAPI := tc.Nomad().Nodes()
nodes, _, _ := nodesAPI.List(nil)
for _, node := range nodes {
_, _ = nodesAPI.ToggleEligibility(node.ID, true, nil)
}
jobs := nomadClient.Jobs()
// Stop all jobs in test
for _, id := range tc.jobIDs {
_, _, _ = jobs.Deregister(id, true, nil)
}
tc.jobIDs = []string{}
// Garbage collect
_ = nomadClient.System().GarbageCollect()
}

View File

@ -23,9 +23,9 @@ job "system_job" {
driver = "docker"
config {
image = "bash:latest"
image = "busybox:1"
command = "bash"
command = "/bin/sh"
args = ["-c", "sleep 15000"]
}

View File

@ -23,9 +23,9 @@ job "system_job" {
driver = "docker"
config {
image = "bash:latest"
image = "busybox:1"
command = "bash"
command = "/bin/sh"
args = ["-c", "sleep 15000"]
}

View File

@ -1,4 +1,4 @@
package systemsched
package scheduler_system
import (
"github.com/hashicorp/nomad/api"
@ -35,16 +35,14 @@ func (tc *SystemSchedTest) TestJobUpdateOnIneligbleNode(f *framework.F) {
jobID := "system_deployment"
tc.jobIDs = append(tc.jobIDs, jobID)
e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "systemsched/input/system_job0.nomad", jobID, "")
e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "scheduler_system/input/system_job0.nomad", jobID, "")
jobs := nomadClient.Jobs()
allocs, _, err := jobs.Allocations(jobID, true, nil)
require.NoError(t, err)
require.True(t, len(allocs) >= 3)
var allocIDs []string
for _, alloc := range allocs {
allocIDs = append(allocIDs, alloc.ID)
}
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs)
// Wait for allocations to get past initial pending state
e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs)
@ -58,13 +56,9 @@ func (tc *SystemSchedTest) TestJobUpdateOnIneligbleNode(f *framework.F) {
// Assert all jobs still running
jobs = nomadClient.Jobs()
allocs, _, err = jobs.Allocations(jobID, true, nil)
allocIDs = nil
for _, alloc := range allocs {
allocIDs = append(allocIDs, alloc.ID)
}
require.NoError(t, err)
allocIDs = e2eutil.AllocIDsFromAllocationListStubs(allocs)
allocForDisabledNode := make(map[string]*api.AllocationListStub)
// Wait for allocs to run and collect allocs on ineligible node
@ -89,19 +83,15 @@ func (tc *SystemSchedTest) TestJobUpdateOnIneligbleNode(f *framework.F) {
require.Len(t, allocForDisabledNode, 1)
// Update job
e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "systemsched/input/system_job1.nomad", jobID, "")
e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "scheduler_system/input/system_job1.nomad", jobID, "")
// Get updated allocations
jobs = nomadClient.Jobs()
allocs, _, err = jobs.Allocations(jobID, false, nil)
require.NoError(t, err)
allocIDs = nil
for _, alloc := range allocs {
allocIDs = append(allocIDs, alloc.ID)
}
// Wait for allocs to start
allocIDs = e2eutil.AllocIDsFromAllocationListStubs(allocs)
e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs)
// Get latest alloc status now that they are no longer pending

View File

@ -328,8 +328,8 @@ type Config struct {
AutopilotInterval time.Duration
// DefaultSchedulerConfig configures the initial scheduler config to be persisted in Raft.
// Once the cluster is bootstrapped, and Raft persists the config (from here or through API),
// This value is ignored.
// Once the cluster is bootstrapped, and Raft persists the config (from here or through API)
// and this value is ignored.
DefaultSchedulerConfig structs.SchedulerConfiguration `hcl:"default_scheduler_config"`
// PluginLoader is used to load plugins.
@ -448,9 +448,10 @@ func DefaultConfig() *Config {
DefaultSchedulerConfig: structs.SchedulerConfiguration{
SchedulerAlgorithm: structs.SchedulerAlgorithmBinpack,
PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: true,
BatchSchedulerEnabled: false,
ServiceSchedulerEnabled: false,
SystemSchedulerEnabled: true,
SysBatchSchedulerEnabled: false,
BatchSchedulerEnabled: false,
ServiceSchedulerEnabled: false,
},
},
DeploymentQueryRateLimit: deploymentwatcher.LimitStateQueriesPerSecond,

View File

@ -140,9 +140,7 @@ OUTER:
gc, allocs, err := c.gcEval(eval, oldThreshold, true)
if err != nil {
continue OUTER
}
if gc {
} else if gc {
jobEval = append(jobEval, eval.ID)
jobAlloc = append(jobAlloc, allocs...)
} else {
@ -164,6 +162,7 @@ OUTER:
if len(gcEval) == 0 && len(gcAlloc) == 0 && len(gcJob) == 0 {
return nil
}
c.logger.Debug("job GC found eligible objects",
"jobs", len(gcJob), "evals", len(gcEval), "allocs", len(gcAlloc))

View File

@ -189,6 +189,46 @@ func HCL() string {
`
}
func SystemBatchJob() *structs.Job {
job := &structs.Job{
Region: "global",
ID: fmt.Sprintf("mock-sysbatch-%s", uuid.Short()),
Name: "my-sysbatch",
Namespace: structs.DefaultNamespace,
Type: structs.JobTypeSysBatch,
Priority: 10,
Datacenters: []string{"dc1"},
Constraints: []*structs.Constraint{
{
LTarget: "${attr.kernel.name}",
RTarget: "linux",
Operand: "=",
},
},
TaskGroups: []*structs.TaskGroup{{
Count: 1,
Name: "pinger",
Tasks: []*structs.Task{{
Name: "ping-example",
Driver: "exec",
Config: map[string]interface{}{
"command": "/usr/bin/ping",
"args": []string{"-c", "5", "example.com"},
},
LogConfig: structs.DefaultLogConfig(),
}},
}},
Status: structs.JobStatusPending,
Version: 0,
CreateIndex: 42,
ModifyIndex: 99,
JobModifyIndex: 99,
}
job.Canonicalize()
return job
}
func Job() *structs.Job {
job := &structs.Job{
Region: "global",
@ -1209,7 +1249,7 @@ func BlockedEval() *structs.Evaluation {
}
func JobSummary(jobID string) *structs.JobSummary {
js := &structs.JobSummary{
return &structs.JobSummary{
JobID: jobID,
Namespace: structs.DefaultNamespace,
Summary: map[string]structs.TaskGroupSummary{
@ -1219,7 +1259,19 @@ func JobSummary(jobID string) *structs.JobSummary {
},
},
}
return js
}
func JobSysBatchSummary(jobID string) *structs.JobSummary {
return &structs.JobSummary{
JobID: jobID,
Namespace: structs.DefaultNamespace,
Summary: map[string]structs.TaskGroupSummary{
"pinger": {
Queued: 0,
Starting: 0,
},
},
}
}
func Alloc() *structs.Allocation {
@ -1504,6 +1556,34 @@ func BatchAlloc() *structs.Allocation {
return alloc
}
func SysBatchAlloc() *structs.Allocation {
job := SystemBatchJob()
return &structs.Allocation{
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: "12345678-abcd-efab-cdef-123456789abc",
Namespace: structs.DefaultNamespace,
TaskGroup: "pinger",
AllocatedResources: &structs.AllocatedResources{
Tasks: map[string]*structs.AllocatedTaskResources{
"ping-example": {
Cpu: structs.AllocatedCpuResources{CpuShares: 500},
Memory: structs.AllocatedMemoryResources{MemoryMB: 256},
Networks: []*structs.NetworkResource{{
Device: "eth0",
IP: "192.168.0.100",
}},
},
},
Shared: structs.AllocatedSharedResources{DiskMB: 150},
},
Job: job,
JobID: job.ID,
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
}
}
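A rough sketch (not in this diff) of how the new mocks could seed a sysbatch scheduler unit test; the harness helpers (NewHarness, h.State, h.Process) and the exact state-store signatures are assumptions based on the existing scheduler tests:

h := NewHarness(t)

// Seed a sysbatch job and one of its allocations into the test state store.
job := mock.SystemBatchJob()
alloc := mock.SysBatchAlloc()
alloc.JobID = job.ID

require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, 1000, job))
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc}))

// Evaluate the job with the new sysbatch scheduler.
eval := &structs.Evaluation{
	Namespace:   structs.DefaultNamespace,
	ID:          uuid.Generate(),
	Priority:    job.Priority,
	TriggeredBy: structs.EvalTriggerJobRegister,
	JobID:       job.ID,
	Status:      structs.EvalStatusPending,
}
require.NoError(t, h.Process(NewSysBatchScheduler, eval))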
func SystemAlloc() *structs.Allocation {
alloc := &structs.Allocation{
ID: uuid.Generate(),

View File

@ -271,13 +271,16 @@ func jobIsGCable(obj interface{}) (bool, error) {
return true, nil
}
// Otherwise, only batch jobs are eligible because they complete on their
// own without a user stopping them.
if j.Type != structs.JobTypeBatch {
switch j.Type {
// Otherwise, batch and sysbatch jobs are eligible because they complete on
// their own without a user stopping them.
case structs.JobTypeBatch, structs.JobTypeSysBatch:
return true, nil
default:
// other job types may not be GC until stopped
return false, nil
}
return true, nil
}
// jobIsPeriodic satisfies the ConditionalIndexFunc interface and creates an index

View File

@ -2036,7 +2036,7 @@ func (s *StateStore) JobsByScheduler(ws memdb.WatchSet, schedulerType string) (m
return iter, nil
}
// JobsByGC returns an iterator over all jobs eligible or uneligible for garbage
// JobsByGC returns an iterator over all jobs eligible or ineligible for garbage
// collection.
func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
@ -4605,12 +4605,13 @@ func (s *StateStore) setJobSummary(txn *txn, updated *structs.Job, index uint64,
func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) (string, error) {
// System, Periodic and Parameterized jobs are running until explicitly
// stopped
if job.Type == structs.JobTypeSystem || job.IsParameterized() || job.IsPeriodic() {
// stopped.
if job.Type == structs.JobTypeSystem ||
job.IsParameterized() ||
job.IsPeriodic() {
if job.Stop {
return structs.JobStatusDead, nil
}
return structs.JobStatusRunning, nil
}

View File

@ -65,10 +65,11 @@ func RemoveAllocs(alloc []*Allocation, remove []*Allocation) []*Allocation {
}
// FilterTerminalAllocs filters out all allocations in a terminal state and
// returns the latest terminal allocations
// returns the latest terminal allocations.
func FilterTerminalAllocs(allocs []*Allocation) ([]*Allocation, map[string]*Allocation) {
terminalAllocsByName := make(map[string]*Allocation)
n := len(allocs)
for i := 0; i < n; i++ {
if allocs[i].TerminalStatus() {
@ -86,9 +87,59 @@ func FilterTerminalAllocs(allocs []*Allocation) ([]*Allocation, map[string]*Allo
n--
}
}
return allocs[:n], terminalAllocsByName
}
// SplitTerminalAllocs splits allocs into non-terminal and terminal allocs, with
// the terminal allocs indexed by node->alloc.name.
func SplitTerminalAllocs(allocs []*Allocation) ([]*Allocation, TerminalByNodeByName) {
var alive []*Allocation
var terminal = make(TerminalByNodeByName)
for _, alloc := range allocs {
if alloc.TerminalStatus() {
terminal.Set(alloc)
} else {
alive = append(alive, alloc)
}
}
return alive, terminal
}
// TerminalByNodeByName is a map of NodeID->Allocation.Name->Allocation used by
// the sysbatch scheduler for locating the most up-to-date terminal allocations.
type TerminalByNodeByName map[string]map[string]*Allocation
func (a TerminalByNodeByName) Set(allocation *Allocation) {
node := allocation.NodeID
name := allocation.Name
if _, exists := a[node]; !exists {
a[node] = make(map[string]*Allocation)
}
if previous, exists := a[node][name]; !exists {
a[node][name] = allocation
} else if previous.CreateIndex < allocation.CreateIndex {
// keep the newest version of the terminal alloc for the coordinate
a[node][name] = allocation
}
}
func (a TerminalByNodeByName) Get(nodeID, name string) (*Allocation, bool) {
if _, exists := a[nodeID]; !exists {
return nil, false
}
if _, exists := a[nodeID][name]; !exists {
return nil, false
}
return a[nodeID][name], true
}
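As a small sketch (not part of the diff) of the intended semantics: Set keeps only the newest terminal allocation per node and allocation name, and Get looks one up. The node ID, name, and indexes below are made up:

terminal := make(structs.TerminalByNodeByName)

older := &structs.Allocation{NodeID: "node-1", Name: "job.task[0]", CreateIndex: 10}
newer := &structs.Allocation{NodeID: "node-1", Name: "job.task[0]", CreateIndex: 20}

terminal.Set(older)
terminal.Set(newer) // same node and name, higher CreateIndex, so this one is kept

if alloc, ok := terminal.Get("node-1", "job.task[0]"); ok {
	fmt.Println(alloc.CreateIndex) // prints 20
}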
// AllocsFit checks if a given set of allocations will fit on a node.
// The netIdx can optionally be provided if its already been computed.
// If the netIdx is provided, it is assumed that the client has already

View File

@ -207,6 +207,9 @@ type PreemptionConfig struct {
// SystemSchedulerEnabled specifies if preemption is enabled for system jobs
SystemSchedulerEnabled bool `hcl:"system_scheduler_enabled"`
// SysBatchSchedulerEnabled specifies if preemption is enabled for sysbatch jobs
SysBatchSchedulerEnabled bool `hcl:"sysbatch_scheduler_enabled"`
// BatchSchedulerEnabled specifies if preemption is enabled for batch jobs
BatchSchedulerEnabled bool `hcl:"batch_scheduler_enabled"`

View File

@ -3903,10 +3903,11 @@ func (c *ComparableResources) NetIndex(n *NetworkResource) int {
const (
// JobTypeNomad is reserved for internal system tasks and is
// always handled by the CoreScheduler.
JobTypeCore = "_core"
JobTypeService = "service"
JobTypeBatch = "batch"
JobTypeSystem = "system"
JobTypeCore = "_core"
JobTypeService = "service"
JobTypeBatch = "batch"
JobTypeSystem = "system"
JobTypeSysBatch = "sysbatch"
)
const (
@ -4172,7 +4173,7 @@ func (j *Job) Validate() error {
mErr.Errors = append(mErr.Errors, errors.New("Job must be in a namespace"))
}
switch j.Type {
case JobTypeCore, JobTypeService, JobTypeBatch, JobTypeSystem:
case JobTypeCore, JobTypeService, JobTypeBatch, JobTypeSystem, JobTypeSysBatch:
case "":
mErr.Errors = append(mErr.Errors, errors.New("Missing job type"))
default:
@ -4264,11 +4265,12 @@ func (j *Job) Validate() error {
}
}
// Validate periodic is only used with batch jobs.
// Validate periodic is only used with batch or sysbatch jobs.
if j.IsPeriodic() && j.Periodic.Enabled {
if j.Type != JobTypeBatch {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch))
if j.Type != JobTypeBatch && j.Type != JobTypeSysBatch {
mErr.Errors = append(mErr.Errors, fmt.Errorf(
"Periodic can only be used with %q or %q scheduler", JobTypeBatch, JobTypeSysBatch,
))
}
if err := j.Periodic.Validate(); err != nil {
@ -4277,9 +4279,10 @@ func (j *Job) Validate() error {
}
if j.IsParameterized() {
if j.Type != JobTypeBatch {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Parameterized job can only be used with %q scheduler", JobTypeBatch))
if j.Type != JobTypeBatch && j.Type != JobTypeSysBatch {
mErr.Errors = append(mErr.Errors, fmt.Errorf(
"Parameterized job can only be used with %q or %q scheduler", JobTypeBatch, JobTypeSysBatch,
))
}
if err := j.ParameterizedJob.Validate(); err != nil {

View File

@ -36,7 +36,7 @@ const (
// allocInPlace is the status used when speculating on an in-place update
allocInPlace = "alloc updating in-place"
// allocNodeTainted is the status used when stopping an alloc because it's
// allocNodeTainted is the status used when stopping an alloc because its
// node is tainted.
allocNodeTainted = "alloc not needed as node is tainted"

View File

@ -26,7 +26,7 @@ type RankedNode struct {
TaskLifecycles map[string]*structs.TaskLifecycleConfig
AllocResources *structs.AllocatedSharedResources
// Allocs is used to cache the proposed allocations on the
// Proposed is used to cache the proposed allocations on the
// node. This can be shared between iterators that require it.
Proposed []*structs.Allocation
@ -62,7 +62,7 @@ func (r *RankedNode) SetTaskResources(task *structs.Task,
r.TaskLifecycles[task.Name] = task.Lifecycle
}
// RankFeasibleIterator is used to iteratively yield nodes along
// RankIterator is used to iteratively yield nodes along
// with ranking metadata. The iterators may manage some state for
// performance optimizations.
type RankIterator interface {

View File

@ -21,9 +21,10 @@ const (
// BuiltinSchedulers contains the built in registered schedulers
// which are available
var BuiltinSchedulers = map[string]Factory{
"service": NewServiceScheduler,
"batch": NewBatchScheduler,
"system": NewSystemScheduler,
"service": NewServiceScheduler,
"batch": NewBatchScheduler,
"system": NewSystemScheduler,
"sysbatch": NewSysBatchScheduler,
}
// NewScheduler is used to instantiate and return a new scheduler

File diff suppressed because it is too large

View File

@ -14,15 +14,21 @@ const (
// we will attempt to schedule if we continue to hit conflicts for system
// jobs.
maxSystemScheduleAttempts = 5
// maxSysBatchScheduleAttempts is used to limit the number of times we will
// attempt to schedule if we continue to hit conflicts for sysbatch jobs.
maxSysBatchScheduleAttempts = 2
)
// SystemScheduler is used for 'system' jobs. This scheduler is
// designed for services that should be run on every client.
// One for each job, containing an allocation for each node
// SystemScheduler is used for 'system' and 'sysbatch' jobs. This scheduler is
// designed for jobs that should be run on every client. The 'system' mode
// will ensure those jobs continuously run regardless of successful task exits,
// whereas 'sysbatch' considers the task complete on success.
type SystemScheduler struct {
logger log.Logger
state State
planner Planner
logger log.Logger
state State
planner Planner
sysbatch bool
eval *structs.Evaluation
job *structs.Job
@ -30,8 +36,9 @@ type SystemScheduler struct {
planResult *structs.PlanResult
ctx *EvalContext
stack *SystemStack
nodes []*structs.Node
nodesByDC map[string]int
nodes []*structs.Node
nodesByDC map[string]int
limitReached bool
nextEval *structs.Evaluation
@ -44,14 +51,25 @@ type SystemScheduler struct {
// scheduler.
func NewSystemScheduler(logger log.Logger, state State, planner Planner) Scheduler {
return &SystemScheduler{
logger: logger.Named("system_sched"),
state: state,
planner: planner,
logger: logger.Named("system_sched"),
state: state,
planner: planner,
sysbatch: false,
}
}
func NewSysBatchScheduler(logger log.Logger, state State, planner Planner) Scheduler {
return &SystemScheduler{
logger: logger.Named("sysbatch_sched"),
state: state,
planner: planner,
sysbatch: true,
}
}
// Process is used to handle a single evaluation.
func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
// Store the evaluation
s.eval = eval
@ -59,21 +77,20 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
s.logger = s.logger.With("eval_id", eval.ID, "job_id", eval.JobID, "namespace", eval.Namespace)
// Verify the evaluation trigger reason is understood
switch eval.TriggeredBy {
case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, structs.EvalTriggerFailedFollowUp,
structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate, structs.EvalTriggerPreemption,
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain, structs.EvalTriggerAllocStop,
structs.EvalTriggerQueuedAllocs, structs.EvalTriggerScaling:
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)
if !s.canHandle(eval.TriggeredBy) {
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy)
return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, structs.EvalStatusFailed, desc,
s.queuedAllocs, "")
}
limit := maxSystemScheduleAttempts
if s.sysbatch {
limit = maxSysBatchScheduleAttempts
}
// Retry up to the maxSystemScheduleAttempts and reset if progress is made.
progress := func() bool { return progressMade(s.planResult) }
if err := retryMax(maxSystemScheduleAttempts, s.process, progress); err != nil {
if err := retryMax(limit, s.process, progress); err != nil {
if statusErr, ok := err.(*SetStatusError); ok {
return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, statusErr.EvalStatus, err.Error(),
s.queuedAllocs, "")
@ -94,9 +111,9 @@ func (s *SystemScheduler) process() (bool, error) {
ws := memdb.NewWatchSet()
s.job, err = s.state.JobByID(ws, s.eval.Namespace, s.eval.JobID)
if err != nil {
return false, fmt.Errorf("failed to get job '%s': %v",
s.eval.JobID, err)
return false, fmt.Errorf("failed to get job '%s': %v", s.eval.JobID, err)
}
numTaskGroups := 0
if !s.job.Stopped() {
numTaskGroups = len(s.job.TaskGroups)
@ -121,7 +138,7 @@ func (s *SystemScheduler) process() (bool, error) {
s.ctx = NewEvalContext(s.state, s.plan, s.logger)
// Construct the placement stack
s.stack = NewSystemStack(s.ctx)
s.stack = NewSystemStack(s.sysbatch, s.ctx)
if !s.job.Stopped() {
s.stack.SetJob(s.job)
}
@ -185,26 +202,24 @@ func (s *SystemScheduler) computeJobAllocs() error {
ws := memdb.NewWatchSet()
allocs, err := s.state.AllocsByJob(ws, s.eval.Namespace, s.eval.JobID, true)
if err != nil {
return fmt.Errorf("failed to get allocs for job '%s': %v",
s.eval.JobID, err)
return fmt.Errorf("failed to get allocs for job '%s': %v", s.eval.JobID, err)
}
// Determine the tainted nodes containing job allocs
tainted, err := taintedNodes(s.state, allocs)
if err != nil {
return fmt.Errorf("failed to get tainted nodes for job '%s': %v",
s.eval.JobID, err)
return fmt.Errorf("failed to get tainted nodes for job '%s': %v", s.eval.JobID, err)
}
// Update the allocations which are in pending/running state on tainted
// nodes to lost
// nodes to lost.
updateNonTerminalAllocsToLost(s.plan, tainted, allocs)
// Filter out the allocations in a terminal state
allocs, terminalAllocs := structs.FilterTerminalAllocs(allocs)
// Split out terminal allocations
live, term := structs.SplitTerminalAllocs(allocs)
// Diff the required and existing allocations
diff := diffSystemAllocs(s.job, s.nodes, tainted, allocs, terminalAllocs)
diff := diffSystemAllocs(s.job, s.nodes, tainted, live, term)
s.logger.Debug("reconciled current state with desired state",
"place", len(diff.place), "update", len(diff.update),
"migrate", len(diff.migrate), "stop", len(diff.stop),
@ -427,3 +442,27 @@ func (s *SystemScheduler) addBlocked(node *structs.Node) error {
return s.planner.CreateEval(blocked)
}
func (s *SystemScheduler) canHandle(trigger string) bool {
switch trigger {
case structs.EvalTriggerJobRegister:
case structs.EvalTriggerNodeUpdate:
case structs.EvalTriggerFailedFollowUp:
case structs.EvalTriggerJobDeregister:
case structs.EvalTriggerRollingUpdate:
case structs.EvalTriggerPreemption:
case structs.EvalTriggerDeploymentWatcher:
case structs.EvalTriggerNodeDrain:
case structs.EvalTriggerAllocStop:
case structs.EvalTriggerQueuedAllocs:
case structs.EvalTriggerScaling:
default:
switch s.sysbatch {
case true:
return trigger == structs.EvalTriggerPeriodicJob
case false:
return false
}
}
return true
}
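To make the fallthrough in canHandle above concrete, a tiny sketch (not part of the diff) of the one behavioral difference: only the sysbatch variant accepts periodic-job evaluations:

sys := &SystemScheduler{sysbatch: false}
sysbatch := &SystemScheduler{sysbatch: true}

fmt.Println(sys.canHandle(structs.EvalTriggerJobRegister))      // true: listed trigger, both variants handle it
fmt.Println(sys.canHandle(structs.EvalTriggerPeriodicJob))      // false: system jobs are never periodic
fmt.Println(sysbatch.canHandle(structs.EvalTriggerPeriodicJob)) // true: sysbatch jobs may be periodic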

View File

@ -199,8 +199,12 @@ type SystemStack struct {
scoreNorm *ScoreNormalizationIterator
}
// NewSystemStack constructs a stack used for selecting system job placements.
func NewSystemStack(ctx Context) *SystemStack {
// NewSystemStack constructs a stack used for selecting system and sysbatch
// job placements.
//
// sysbatch is used to determine which scheduler config option is used to
// control the use of preemption.
func NewSystemStack(sysbatch bool, ctx Context) *SystemStack {
// Create a new stack
s := &SystemStack{ctx: ctx}
@ -234,10 +238,13 @@ func NewSystemStack(ctx Context) *SystemStack {
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint,
tgs := []FeasibilityChecker{
s.taskGroupDrivers,
s.taskGroupConstraint,
s.taskGroupHostVolumes,
s.taskGroupDevices,
s.taskGroupNetwork}
s.taskGroupNetwork,
}
avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.source, jobs, tgs, avail)
@ -260,9 +267,14 @@ func NewSystemStack(ctx Context) *SystemStack {
_, schedConfig, _ := s.ctx.State().SchedulerConfig()
enablePreemption := true
if schedConfig != nil {
enablePreemption = schedConfig.PreemptionConfig.SystemSchedulerEnabled
if sysbatch {
enablePreemption = schedConfig.PreemptionConfig.SysBatchSchedulerEnabled
} else {
enablePreemption = schedConfig.PreemptionConfig.SystemSchedulerEnabled
}
}
// Create binpack iterator
s.binPack = NewBinPackIterator(ctx, rankSource, enablePreemption, 0, schedConfig)
// Apply score normalization
@ -359,11 +371,13 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
tgs := []FeasibilityChecker{s.taskGroupDrivers,
tgs := []FeasibilityChecker{
s.taskGroupDrivers,
s.taskGroupConstraint,
s.taskGroupHostVolumes,
s.taskGroupDevices,
s.taskGroupNetwork}
s.taskGroupNetwork,
}
avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.source, jobs, tgs, avail)

View File

@ -392,7 +392,7 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) {
func TestSystemStack_SetNodes(t *testing.T) {
_, ctx := testContext(t)
stack := NewSystemStack(ctx)
stack := NewSystemStack(false, ctx)
nodes := []*structs.Node{
mock.Node(),
@ -414,7 +414,7 @@ func TestSystemStack_SetNodes(t *testing.T) {
func TestSystemStack_SetJob(t *testing.T) {
_, ctx := testContext(t)
stack := NewSystemStack(ctx)
stack := NewSystemStack(false, ctx)
job := mock.Job()
stack.SetJob(job)
@ -430,7 +430,7 @@ func TestSystemStack_SetJob(t *testing.T) {
func TestSystemStack_Select_Size(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{mock.Node()}
stack := NewSystemStack(ctx)
stack := NewSystemStack(false, ctx)
stack.SetNodes(nodes)
job := mock.Job()
@ -458,7 +458,7 @@ func TestSystemStack_Select_MetricsReset(t *testing.T) {
mock.Node(),
mock.Node(),
}
stack := NewSystemStack(ctx)
stack := NewSystemStack(false, ctx)
stack.SetNodes(nodes)
job := mock.Job()
@ -494,7 +494,7 @@ func TestSystemStack_Select_DriverFilter(t *testing.T) {
zero := nodes[0]
zero.Attributes["driver.foo"] = "1"
stack := NewSystemStack(ctx)
stack := NewSystemStack(false, ctx)
stack.SetNodes(nodes)
job := mock.Job()
@ -516,7 +516,7 @@ func TestSystemStack_Select_DriverFilter(t *testing.T) {
t.Fatalf("ComputedClass() failed: %v", err)
}
stack = NewSystemStack(ctx)
stack = NewSystemStack(false, ctx)
stack.SetNodes(nodes)
stack.SetJob(job)
node = stack.Select(job.TaskGroups[0], selectOptions)
@ -537,7 +537,7 @@ func TestSystemStack_Select_ConstraintFilter(t *testing.T) {
t.Fatalf("ComputedClass() failed: %v", err)
}
stack := NewSystemStack(ctx)
stack := NewSystemStack(false, ctx)
stack.SetNodes(nodes)
job := mock.Job()
@ -580,7 +580,7 @@ func TestSystemStack_Select_BinPack_Overflow(t *testing.T) {
}
one := nodes[1]
stack := NewSystemStack(ctx)
stack := NewSystemStack(false, ctx)
stack.SetNodes(nodes)
job := mock.Job()

View File

@ -61,21 +61,19 @@ func (d *diffResult) Append(other *diffResult) {
// need to be migrated (node is draining), the allocs that need to be evicted
// (no longer required), those that should be ignored and those that are lost
// that need to be replaced (running on a lost node).
//
// job is the job whose allocs is going to be diff-ed.
// taintedNodes is an index of the nodes which are either down or in drain mode
// by name.
// required is a set of allocations that must exist.
// allocs is a list of non terminal allocations.
// terminalAllocs is an index of the latest terminal allocations by name.
func diffSystemAllocsForNode(job *structs.Job, nodeID string,
eligibleNodes, taintedNodes map[string]*structs.Node,
required map[string]*structs.TaskGroup, allocs []*structs.Allocation,
terminalAllocs map[string]*structs.Allocation) *diffResult {
result := &diffResult{}
func diffSystemAllocsForNode(
job *structs.Job, // job whose allocs are going to be diff-ed
nodeID string,
eligibleNodes map[string]*structs.Node,
taintedNodes map[string]*structs.Node, // nodes which are down or in drain (by node name)
required map[string]*structs.TaskGroup, // set of allocations that must exist
allocs []*structs.Allocation, // non-terminal allocations that exist
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, name)
) *diffResult {
result := new(diffResult)
// Scan the existing updates
existing := make(map[string]struct{})
existing := make(map[string]struct{}) // set of alloc names
for _, exist := range allocs {
// Index the existing node
name := exist.Name
@ -103,6 +101,17 @@ func diffSystemAllocsForNode(job *structs.Job, nodeID string,
})
continue
}
// If we are a sysbatch job and terminal, ignore (or stop?) the alloc
if job.Type == structs.JobTypeSysBatch && exist.TerminalStatus() {
result.ignore = append(result.ignore, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
continue
}
// If we are on a tainted node, we must migrate if we are a service or
// if the batch allocation did not finish
if node, ok := taintedNodes[exist.NodeID]; ok {
@ -155,14 +164,38 @@ func diffSystemAllocsForNode(job *structs.Job, nodeID string,
// Scan the required groups
for name, tg := range required {
// Check for an existing allocation
_, ok := existing[name]
// Require a placement if no existing allocation. If there
// is an existing allocation, we would have checked for a potential
// update or ignore above. Ignore placements for tainted or
// ineligible nodes
if !ok {
// Check for an existing allocation
if _, ok := existing[name]; !ok {
// Check for a terminal sysbatch allocation, which should not be placed
// again unless the job has been updated.
if job.Type == structs.JobTypeSysBatch {
if alloc, termExists := terminal.Get(nodeID, name); termExists {
// the alloc is terminal, but now the job has been updated
if job.JobModifyIndex != alloc.Job.JobModifyIndex {
result.update = append(result.update, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: alloc,
})
} else {
// alloc is terminal and job unchanged, leave it alone
result.ignore = append(result.ignore, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: alloc,
})
}
continue
}
}
// Require a placement if no existing allocation. If there
// is an existing allocation, we would have checked for a potential
// update or ignore above. Ignore placements for tainted or
// ineligible nodes
// Tainted and ineligible nodes for a non existing alloc
// should be filtered out and not count towards ignore or place
if _, tainted := taintedNodes[nodeID]; tainted {
@ -172,10 +205,11 @@ func diffSystemAllocsForNode(job *structs.Job, nodeID string,
continue
}
termOnNode, _ := terminal.Get(nodeID, name)
allocTuple := allocTuple{
Name: name,
TaskGroup: tg,
Alloc: terminalAllocs[name],
Alloc: termOnNode,
}
// If the new allocation isn't annotated with a previous allocation
@ -184,6 +218,7 @@ func diffSystemAllocsForNode(job *structs.Job, nodeID string,
if allocTuple.Alloc == nil || allocTuple.Alloc.NodeID != nodeID {
allocTuple.Alloc = &structs.Allocation{NodeID: nodeID}
}
result.place = append(result.place, allocTuple)
}
}
@ -192,15 +227,13 @@ func diffSystemAllocsForNode(job *structs.Job, nodeID string,
// diffSystemAllocs is like diffSystemAllocsForNode however, the allocations in the
// diffResult contain the specific nodeID they should be allocated on.
//
// job is the job whose allocs is going to be diff-ed.
// nodes is a list of nodes in ready state.
// taintedNodes is an index of the nodes which are either down or in drain mode
// by name.
// allocs is a list of non terminal allocations.
// terminalAllocs is an index of the latest terminal allocations by name.
func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[string]*structs.Node,
allocs []*structs.Allocation, terminalAllocs map[string]*structs.Allocation) *diffResult {
func diffSystemAllocs(
job *structs.Job, // jobs whose allocations are going to be diff-ed
nodes []*structs.Node, // list of nodes in the ready state
taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by name)
allocs []*structs.Allocation, // non-terminal allocations
terminal structs.TerminalByNodeByName, // latest terminal allocations (by name)
) *diffResult {
// Build a mapping of nodes to all their allocs.
nodeAllocs := make(map[string][]*structs.Allocation, len(allocs))
@ -220,9 +253,9 @@ func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[
// Create the required task groups.
required := materializeTaskGroups(job)
result := &diffResult{}
result := new(diffResult)
for nodeID, allocs := range nodeAllocs {
diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminalAllocs)
diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminal)
result.Append(diff)
}

View File

@ -27,6 +27,76 @@ func TestMaterializeTaskGroups(t *testing.T) {
}
}
func newNode(name string) *structs.Node {
n := mock.Node()
n.Name = name
return n
}
func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) {
// For a sysbatch job, the scheduler should not re-place an allocation
// that has become terminal, unless the job has been updated.
job := mock.SystemBatchJob()
required := materializeTaskGroups(job)
eligible := map[string]*structs.Node{
"node1": newNode("node1"),
}
var live []*structs.Allocation // empty
tainted := map[string]*structs.Node(nil)
t.Run("current job", func(t *testing.T) {
terminal := structs.TerminalByNodeByName{
"node1": map[string]*structs.Allocation{
"my-sysbatch.pinger[0]": &structs.Allocation{
ID: uuid.Generate(),
NodeID: "node1",
Name: "my-sysbatch.pinger[0]",
Job: job,
ClientStatus: structs.AllocClientStatusComplete,
},
},
}
diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal)
require.Empty(t, diff.place)
require.Empty(t, diff.update)
require.Empty(t, diff.stop)
require.Empty(t, diff.migrate)
require.Empty(t, diff.lost)
require.True(t, len(diff.ignore) == 1 && diff.ignore[0].Alloc == terminal["node1"]["my-sysbatch.pinger[0]"])
})
t.Run("outdated job", func(t *testing.T) {
previousJob := job.Copy()
previousJob.JobModifyIndex -= 1
terminal := structs.TerminalByNodeByName{
"node1": map[string]*structs.Allocation{
"my-sysbatch.pinger[0]": &structs.Allocation{
ID: uuid.Generate(),
NodeID: "node1",
Name: "my-sysbatch.pinger[0]",
Job: previousJob,
},
},
}
expAlloc := terminal["node1"]["my-sysbatch.pinger[0]"]
expAlloc.NodeID = "node1"
diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal)
require.Empty(t, diff.place)
require.Equal(t, 1, len(diff.update))
require.Empty(t, diff.stop)
require.Empty(t, diff.migrate)
require.Empty(t, diff.lost)
require.Empty(t, diff.ignore)
})
}
func TestDiffSystemAllocsForNode(t *testing.T) {
job := mock.Job()
required := materializeTaskGroups(job)
@ -98,28 +168,30 @@ func TestDiffSystemAllocsForNode(t *testing.T) {
}
// Have three terminal allocs
terminalAllocs := map[string]*structs.Allocation{
"my-job.web[4]": {
ID: uuid.Generate(),
NodeID: "zip",
Name: "my-job.web[4]",
Job: job,
},
"my-job.web[5]": {
ID: uuid.Generate(),
NodeID: "zip",
Name: "my-job.web[5]",
Job: job,
},
"my-job.web[6]": {
ID: uuid.Generate(),
NodeID: "zip",
Name: "my-job.web[6]",
Job: job,
terminal := structs.TerminalByNodeByName{
"zip": map[string]*structs.Allocation{
"my-job.web[4]": {
ID: uuid.Generate(),
NodeID: "zip",
Name: "my-job.web[4]",
Job: job,
},
"my-job.web[5]": {
ID: uuid.Generate(),
NodeID: "zip",
Name: "my-job.web[5]",
Job: job,
},
"my-job.web[6]": {
ID: uuid.Generate(),
NodeID: "zip",
Name: "my-job.web[6]",
Job: job,
},
},
}
diff := diffSystemAllocsForNode(job, "zip", eligible, tainted, required, allocs, terminalAllocs)
diff := diffSystemAllocsForNode(job, "zip", eligible, tainted, required, allocs, terminal)
place := diff.place
update := diff.update
migrate := diff.migrate
@ -146,12 +218,14 @@ func TestDiffSystemAllocsForNode(t *testing.T) {
require.Equal(t, 6, len(place))
// Ensure that the allocations which are replacements of terminal allocs are
// annotated
for name, alloc := range terminalAllocs {
for _, allocTuple := range diff.place {
if name == allocTuple.Name {
require.True(t, reflect.DeepEqual(alloc, allocTuple.Alloc),
"expected: %#v, actual: %#v", alloc, allocTuple.Alloc)
// annotated.
for _, m := range terminal {
for _, alloc := range m {
for _, tuple := range diff.place {
if alloc.Name == tuple.Name {
require.True(t, reflect.DeepEqual(alloc, tuple.Alloc),
"expected: %#v, actual: %#v", alloc, tuple.Alloc)
}
}
}
}
@ -198,9 +272,9 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) {
}
// No terminal allocs
terminalAllocs := map[string]*structs.Allocation{}
terminal := make(structs.TerminalByNodeByName)
diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, tainted, required, allocs, terminalAllocs)
diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, tainted, required, allocs, terminal)
place := diff.place
update := diff.update
migrate := diff.migrate
@ -274,17 +348,19 @@ func TestDiffSystemAllocs(t *testing.T) {
},
}
// Have three terminal allocs
terminalAllocs := map[string]*structs.Allocation{
"my-job.web[0]": {
ID: uuid.Generate(),
NodeID: "pipe",
Name: "my-job.web[0]",
Job: job,
// Have one terminal alloc
terminal := structs.TerminalByNodeByName{
"pipe": map[string]*structs.Allocation{
"my-job.web[0]": {
ID: uuid.Generate(),
NodeID: "pipe",
Name: "my-job.web[0]",
Job: job,
},
},
}
diff := diffSystemAllocs(job, nodes, tainted, allocs, terminalAllocs)
diff := diffSystemAllocs(job, nodes, tainted, allocs, terminal)
place := diff.place
update := diff.update
migrate := diff.migrate
@ -311,12 +387,14 @@ func TestDiffSystemAllocs(t *testing.T) {
require.Equal(t, 2, len(place))
// Ensure that the allocations which are replacements of terminal allocs are
// annotated
for _, alloc := range terminalAllocs {
for _, allocTuple := range diff.place {
if alloc.NodeID == allocTuple.Alloc.NodeID {
require.True(t, reflect.DeepEqual(alloc, allocTuple.Alloc),
"expected: %#v, actual: %#v", alloc, allocTuple.Alloc)
// annotated.
for _, m := range terminal {
for _, alloc := range m {
for _, tuple := range diff.place {
if alloc.NodeID == tuple.Alloc.NodeID {
require.True(t, reflect.DeepEqual(alloc, tuple.Alloc),
"expected: %#v, actual: %#v", alloc, tuple.Alloc)
}
}
}
}

View File

@ -303,9 +303,10 @@ server {
memory_oversubscription_enabled = true
preemption_config {
batch_scheduler_enabled = true
system_scheduler_enabled = true
service_scheduler_enabled = true
batch_scheduler_enabled = true
system_scheduler_enabled = true
service_scheduler_enabled = true
sysbatch_scheduler_enabled = true
}
}
}

View File

@ -113,7 +113,7 @@ job "docs" {
node if any of its allocation statuses become "failed".
- `type` `(string: "service")` - Specifies the [Nomad scheduler][scheduler] to
use. Nomad provides the `service`, `system` and `batch` schedulers.
use. Nomad provides the `service`, `system`, `batch`, and `sysbatch` schedulers.
- `update` <code>([Update][update]: nil)</code> - Specifies the task's update
strategy. When omitted, a default update strategy is applied.

View File

@ -46,8 +46,8 @@ job "docs" {
}
```
~> The reschedule stanza does not apply to `system` jobs because they run on
every node.
~> The reschedule stanza does not apply to `system` or `sysbatch` jobs because
they run on every node.
## `reschedule` Parameters

View File

@ -6,9 +6,9 @@ description: Learn about Nomad's various schedulers.
# Schedulers
Nomad has three scheduler types that can be used when creating your job:
`service`, `batch` and `system`. Here we will describe the differences between
each of these schedulers.
Nomad has four scheduler types that can be used when creating your job:
`service`, `batch`, `system` and `sysbatch`. Here we will describe the differences
between each of these schedulers.
## Service
@ -60,8 +60,30 @@ Systems jobs are intended to run until explicitly stopped either by an operator
or [preemption]. If a system task exits it is considered a failure and handled
according to the job's [restart] stanza; system jobs do not have rescheduling.
## System Batch
The `sysbatch` scheduler is used to register jobs that should be run to completion
on all clients that meet the job's constraints. The `sysbatch` scheduler will
schedule jobs similarly to the `system` scheduler, but like a `batch` job, once a
task exits successfully it is not restarted on that client.
This scheduler type is useful for issuing "one off" commands to be run on every
node in the cluster. Sysbatch jobs can also be created as [periodic] and [parameterized]
jobs. Since these tasks are managed by Nomad, they can take advantage of job
updating, service discovery, monitoring, and more.
The `sysbatch` scheduler will preempt lower-priority tasks running on a node if there
is not enough capacity to place the job. See [preemption] for details on how
preempted tasks are chosen.
Sysbatch jobs are intended to run until successful completion, explicitly stopped
by an operator, or evicted through [preemption]. Sysbatch tasks that exit with an
error are handled according to the job's [restart] stanza.
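Complete HCL examples appear in the e2e input files added in this PR; as a further illustration (not part of the documentation change), a sysbatch job could also be registered programmatically with the Nomad Go API. The job name, image, and command below are hypothetical:

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Build a job and mark it as a sysbatch job so it runs to completion on
	// every eligible client.
	job := api.NewBatchJob("one-off-cleanup", "one-off-cleanup", "global", 50)
	sysbatch := "sysbatch"
	job.Type = &sysbatch
	job.AddDatacenter("dc1")

	task := api.NewTask("cleanup", "docker").
		SetConfig("image", "busybox:1").
		SetConfig("command", "/bin/sh").
		SetConfig("args", []string{"-c", "echo cleaning; sleep 1"})

	job.AddTaskGroup(api.NewTaskGroup("cleanup", 1).AddTask(task))

	if _, _, err := client.Jobs().Register(job, nil); err != nil {
		log.Fatal(err)
	}
	fmt.Println("sysbatch job registered")
}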
[borg]: https://research.google.com/pubs/pub43438.html
[sparrow]: https://cs.stanford.edu/~matei/papers/2013/sosp_sparrow.pdf
[parameterized]: /docs/job-specification/parameterized
[periodic]: /docs/job-specification/periodic
[preemption]: /docs/internals/scheduling/preemption
[restart]: /docs/job-specification/restart
[reschedule]: /docs/job-specification/reschedule
[sparrow]: https://cs.stanford.edu/~matei/papers/2013/sosp_sparrow.pdf