job gc uses batch endpoint
This commit is contained in:
parent
586ae36d13
commit
7545c0053e
|
@ -151,20 +151,17 @@ OUTER:
|
|||
return err
|
||||
}
|
||||
|
||||
// Call to the leader to deregister the jobs.
|
||||
for _, job := range gcJob {
|
||||
req := structs.JobDeregisterRequest{
|
||||
JobID: job.ID,
|
||||
Purge: true,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: c.srv.config.Region,
|
||||
Namespace: job.Namespace,
|
||||
AuthToken: eval.LeaderACL,
|
||||
},
|
||||
}
|
||||
var resp structs.JobDeregisterResponse
|
||||
if err := c.srv.RPC("Job.Deregister", &req, &resp); err != nil {
|
||||
c.srv.logger.Printf("[ERR] sched.core: job deregister failed: %v", err)
|
||||
// Reap the jobs
|
||||
return c.jobReap(gcJob, eval.LeaderACL)
|
||||
}
|
||||
|
||||
// jobReap contacts the leader and issues a reap on the passed jobs
|
||||
func (c *CoreScheduler) jobReap(jobs []*structs.Job, leaderACL string) error {
|
||||
// Call to the leader to issue the reap
|
||||
for _, req := range c.partitionJobReap(jobs, leaderACL) {
|
||||
var resp structs.JobBatchDeregisterResponse
|
||||
if err := c.srv.RPC("Job.BatchDeregister", req, &resp); err != nil {
|
||||
c.srv.logger.Printf("[ERR] sched.core: batch job reap failed: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -172,6 +169,44 @@ OUTER:
|
|||
return nil
|
||||
}
|
||||
|
||||
// partitionJobReap returns a list of JobBatchDeregisterRequests to make,
|
||||
// ensuring a single request does not contain too many jobs. This is necessary
|
||||
// to ensure that the Raft transaction does not become too large.
|
||||
func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string) []*structs.JobBatchDeregisterRequest {
|
||||
option := &structs.JobDeregisterOptions{Purge: true}
|
||||
var requests []*structs.JobBatchDeregisterRequest
|
||||
submittedJobs := 0
|
||||
for submittedJobs != len(jobs) {
|
||||
req := &structs.JobBatchDeregisterRequest{
|
||||
Jobs: make(map[structs.NamespacedID]*structs.JobDeregisterOptions),
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: c.srv.config.Region,
|
||||
AuthToken: leaderACL,
|
||||
},
|
||||
}
|
||||
requests = append(requests, req)
|
||||
available := maxIdsPerReap
|
||||
|
||||
if remaining := len(jobs) - submittedJobs; remaining > 0 {
|
||||
if remaining <= available {
|
||||
for _, job := range jobs[submittedJobs:] {
|
||||
jns := structs.NamespacedID{ID: job.ID, Namespace: job.Namespace}
|
||||
req.Jobs[jns] = option
|
||||
}
|
||||
submittedJobs += remaining
|
||||
} else {
|
||||
for _, job := range jobs[submittedJobs : submittedJobs+available] {
|
||||
jns := structs.NamespacedID{ID: job.ID, Namespace: job.Namespace}
|
||||
req.Jobs[jns] = option
|
||||
}
|
||||
submittedJobs += available
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return requests
|
||||
}
|
||||
|
||||
// evalGC is used to garbage collect old evaluations
|
||||
func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
|
||||
// Iterate over the evaluations
|
||||
|
|
|
@ -1767,6 +1767,33 @@ func TestCoreScheduler_PartitionDeploymentReap(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCoreScheduler_PartitionJobReap(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
s1 := TestServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create a core scheduler
|
||||
snap, err := s1.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
core := NewCoreScheduler(s1, snap)
|
||||
|
||||
// Set the max ids per reap to something lower.
|
||||
maxIdsPerReap = 2
|
||||
|
||||
jobs := []*structs.Job{mock.Job(), mock.Job(), mock.Job()}
|
||||
requests := core.(*CoreScheduler).partitionJobReap(jobs, "")
|
||||
require.Len(requests, 2)
|
||||
|
||||
first := requests[0]
|
||||
second := requests[1]
|
||||
require.Len(first.Jobs, 2)
|
||||
require.Len(second.Jobs, 1)
|
||||
}
|
||||
|
||||
// Tests various scenarios when allocations are eligible to be GCed
|
||||
func TestAllocation_GCEligible(t *testing.T) {
|
||||
type testCase struct {
|
||||
|
|
Loading…
Reference in New Issue