core: allow deleting of evaluations (#13492)

* core: add eval delete RPC and core functionality.

* agent: add eval delete HTTP endpoint.

* api: add eval delete API functionality.

* cli: add eval delete command.

* docs: add eval delete website documentation.
James Rasell 2022-07-06 16:30:11 +02:00 committed by GitHub
parent 488e65d960
commit 0c0b028a59
39 changed files with 1626 additions and 75 deletions

.changelog/13492.txt (new file)

@@ -0,0 +1,7 @@
```release-note:improvement
cli: Added `delete` command to the eval CLI
```
```release-note:improvement
agent: Added delete support to the eval HTTP API
```

api/acl.go

@@ -42,7 +42,7 @@ func (a *ACLPolicies) Delete(policyName string, q *WriteOptions) (*WriteMeta, er
 	if policyName == "" {
 		return nil, fmt.Errorf("missing policy name")
 	}
-	wm, err := a.client.delete("/v1/acl/policy/"+policyName, nil, q)
+	wm, err := a.client.delete("/v1/acl/policy/"+policyName, nil, nil, q)
 	if err != nil {
 		return nil, err
 	}
@@ -142,7 +142,7 @@ func (a *ACLTokens) Delete(accessorID string, q *WriteOptions) (*WriteMeta, erro
 	if accessorID == "" {
 		return nil, fmt.Errorf("missing accessor ID")
 	}
-	wm, err := a.client.delete("/v1/acl/token/"+accessorID, nil, q)
+	wm, err := a.client.delete("/v1/acl/token/"+accessorID, nil, nil, q)
 	if err != nil {
 		return nil, err
 	}

api/api.go

@@ -982,14 +982,15 @@ func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*
 	return wm, nil
 }

-// delete is used to do a DELETE request against an endpoint
-// and serialize/deserialized using the standard Nomad conventions.
-func (c *Client) delete(endpoint string, out interface{}, q *WriteOptions) (*WriteMeta, error) {
+// delete is used to do a DELETE request against an endpoint and
+// serialize/deserialized using the standard Nomad conventions.
+func (c *Client) delete(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) {
 	r, err := c.newRequest("DELETE", endpoint)
 	if err != nil {
 		return nil, err
 	}
 	r.setWriteOptions(q)
+	r.obj = in
 	rtt, resp, err := requireOK(c.doRequest(r))
 	if err != nil {
 		return nil, err

api/api_test.go

@@ -107,7 +107,7 @@ func TestRequestTime(t *testing.T) {
 		t.Errorf("bad request time: %d", wm.RequestTime)
 	}

-	wm, err = client.delete("/", &out, nil)
+	wm, err = client.delete("/", nil, &out, nil)
 	if err != nil {
 		t.Fatalf("delete err: %v", err)
 	}

api/csi.go

@@ -82,7 +82,7 @@ func (v *CSIVolumes) Register(vol *CSIVolume, w *WriteOptions) (*WriteMeta, erro
 // Deregister deregisters a single CSIVolume from Nomad. The volume will not be deleted from the external storage provider.
 func (v *CSIVolumes) Deregister(id string, force bool, w *WriteOptions) error {
-	_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v?force=%t", url.PathEscape(id), force), nil, w)
+	_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v?force=%t", url.PathEscape(id), force), nil, nil, w)
 	return err
 }
@@ -104,7 +104,7 @@ func (v *CSIVolumes) Create(vol *CSIVolume, w *WriteOptions) ([]*CSIVolume, *Wri
 // passed as an argument here is for the storage provider's ID, so a volume
 // that's already been deregistered can be deleted.
 func (v *CSIVolumes) Delete(externalVolID string, w *WriteOptions) error {
-	_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/delete", url.PathEscape(externalVolID)), nil, w)
+	_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/delete", url.PathEscape(externalVolID)), nil, nil, w)
 	return err
 }
@@ -117,7 +117,7 @@ func (v *CSIVolumes) DeleteOpts(req *CSIVolumeDeleteRequest, w *WriteOptions) er
 		w = &WriteOptions{}
 	}
 	w.SetHeadersFromCSISecrets(req.Secrets)
-	_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/delete", url.PathEscape(req.ExternalVolumeID)), nil, w)
+	_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/delete", url.PathEscape(req.ExternalVolumeID)), nil, nil, w)
 	return err
 }
@@ -125,7 +125,7 @@ func (v *CSIVolumes) DeleteOpts(req *CSIVolumeDeleteRequest, w *WriteOptions) er
 // node. This is used in the case that the node is temporarily lost and the
 // allocations are unable to drop their claims automatically.
 func (v *CSIVolumes) Detach(volID, nodeID string, w *WriteOptions) error {
-	_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/detach?node=%v", url.PathEscape(volID), nodeID), nil, w)
+	_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/detach?node=%v", url.PathEscape(volID), nodeID), nil, nil, w)
 	return err
 }
@@ -152,7 +152,7 @@ func (v *CSIVolumes) DeleteSnapshot(snap *CSISnapshot, w *WriteOptions) error {
 		w = &WriteOptions{}
 	}
 	w.SetHeadersFromCSISecrets(snap.Secrets)
-	_, err := v.client.delete("/v1/volumes/snapshot?"+qp.Encode(), nil, w)
+	_, err := v.client.delete("/v1/volumes/snapshot?"+qp.Encode(), nil, nil, w)
 	return err
 }

api/evaluations.go

@@ -40,6 +40,18 @@ func (e *Evaluations) Info(evalID string, q *QueryOptions) (*Evaluation, *QueryM
 	return &resp, qm, nil
 }

+// Delete is used to batch delete evaluations using their IDs.
+func (e *Evaluations) Delete(evalIDs []string, w *WriteOptions) (*WriteMeta, error) {
+	req := EvalDeleteRequest{
+		EvalIDs: evalIDs,
+	}
+	wm, err := e.client.delete("/v1/evaluations", &req, nil, w)
+	if err != nil {
+		return nil, err
+	}
+	return wm, nil
+}
+
 // Allocations is used to retrieve a set of allocations given
 // an evaluation ID.
 func (e *Evaluations) Allocations(evalID string, q *QueryOptions) ([]*AllocationListStub, *QueryMeta, error) {
@@ -108,6 +120,11 @@ type EvaluationStub struct {
 	ModifyTime int64
 }

+type EvalDeleteRequest struct {
+	EvalIDs []string
+	WriteRequest
+}
+
 // EvalIndexSort is a wrapper to sort evaluations by CreateIndex.
 // We reverse the test so that we get the highest index first.
 type EvalIndexSort []*Evaluation
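
A minimal usage sketch of the new `Evaluations().Delete` method (assumptions: the default agent address and an already-paused eval broker; the eval ID is a placeholder):

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Connect using the default config (NOMAD_ADDR or 127.0.0.1:4646).
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Batch-delete a single eval by ID. The server rejects the call if the
	// eval broker is not paused or the eval does not exist.
	wm, err := client.Evaluations().Delete([]string{"8E231CF4-CA48-43FF-B694-5801E69E22FA"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("deleted at index", wm.LastIndex)
}
```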

api/evaluations_test.go

@@ -147,6 +147,33 @@ func TestEvaluations_Info(t *testing.T) {
 	}
 }

+func TestEvaluations_Delete(t *testing.T) {
+	testutil.Parallel(t)
+
+	testClient, testServer := makeClient(t, nil, nil)
+	defer testServer.Stop()
+
+	// Attempting to delete an evaluation when the eval broker is not paused
+	// should return an error.
+	wm, err := testClient.Evaluations().Delete([]string{"8E231CF4-CA48-43FF-B694-5801E69E22FA"}, nil)
+	require.Nil(t, wm)
+	require.ErrorContains(t, err, "eval broker is enabled")
+
+	// Pause the eval broker, and try to delete an evaluation that does not
+	// exist.
+	schedulerConfig, _, err := testClient.Operator().SchedulerGetConfiguration(nil)
+	require.NoError(t, err)
+	require.NotNil(t, schedulerConfig)
+
+	schedulerConfig.SchedulerConfig.PauseEvalBroker = true
+	schedulerConfigUpdated, _, err := testClient.Operator().SchedulerCASConfiguration(schedulerConfig.SchedulerConfig, nil)
+	require.NoError(t, err)
+	require.True(t, schedulerConfigUpdated.Updated)
+
+	wm, err = testClient.Evaluations().Delete([]string{"8E231CF4-CA48-43FF-B694-5801E69E22FA"}, nil)
+	require.ErrorContains(t, err, "eval not found")
+}
+
 func TestEvaluations_Allocations(t *testing.T) {
 	testutil.Parallel(t)
 	c, s := makeClient(t, nil, nil)

api/jobs.go

@@ -288,7 +288,7 @@ func (j *Jobs) Evaluations(jobID string, q *QueryOptions) ([]*Evaluation, *Query
 // eventually GC'ed from the system. Most callers should not specify purge.
 func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *WriteMeta, error) {
 	var resp JobDeregisterResponse
-	wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t", url.PathEscape(jobID), purge), &resp, q)
+	wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t", url.PathEscape(jobID), purge), nil, &resp, q)
 	if err != nil {
 		return "", nil, err
 	}
@@ -334,7 +334,7 @@ func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOpt
 			opts.Purge, opts.Global, opts.EvalPriority, opts.NoShutdownDelay)
 	}

-	wm, err := j.client.delete(endpoint, &resp, q)
+	wm, err := j.client.delete(endpoint, nil, &resp, q)
 	if err != nil {
 		return "", nil, err
 	}

api/namespace.go

@@ -58,7 +58,7 @@ func (n *Namespaces) Register(namespace *Namespace, q *WriteOptions) (*WriteMeta
 // Delete is used to delete a namespace
 func (n *Namespaces) Delete(namespace string, q *WriteOptions) (*WriteMeta, error) {
-	wm, err := n.client.delete(fmt.Sprintf("/v1/namespace/%s", namespace), nil, q)
+	wm, err := n.client.delete(fmt.Sprintf("/v1/namespace/%s", namespace), nil, nil, q)
 	if err != nil {
 		return nil, err
 	}

api/quota.go

@@ -90,7 +90,7 @@ func (q *Quotas) Register(spec *QuotaSpec, qo *WriteOptions) (*WriteMeta, error)
 // Delete is used to delete a quota spec
 func (q *Quotas) Delete(quota string, qo *WriteOptions) (*WriteMeta, error) {
-	wm, err := q.client.delete(fmt.Sprintf("/v1/quota/%s", quota), nil, qo)
+	wm, err := q.client.delete(fmt.Sprintf("/v1/quota/%s", quota), nil, nil, qo)
 	if err != nil {
 		return nil, err
 	}

api/raw.go

@@ -34,5 +34,5 @@ func (raw *Raw) Write(endpoint string, in, out interface{}, q *WriteOptions) (*W
 // Delete is used to do a DELETE request against an endpoint
 // and serialize/deserialized using the standard Nomad conventions.
 func (raw *Raw) Delete(endpoint string, out interface{}, q *WriteOptions) (*WriteMeta, error) {
-	return raw.c.delete(endpoint, out, q)
+	return raw.c.delete(endpoint, nil, out, q)
 }

api/sentinel.go

@@ -39,7 +39,7 @@ func (a *SentinelPolicies) Delete(policyName string, q *WriteOptions) (*WriteMet
 	if policyName == "" {
 		return nil, fmt.Errorf("missing policy name")
 	}
-	wm, err := a.client.delete("/v1/sentinel/policy/"+policyName, nil, q)
+	wm, err := a.client.delete("/v1/sentinel/policy/"+policyName, nil, nil, q)
 	if err != nil {
 		return nil, err
 	}

api/services.go

@@ -122,7 +122,7 @@ func (s *Services) Get(serviceName string, q *QueryOptions) ([]*ServiceRegistrat
 // by its service name and service ID.
 func (s *Services) Delete(serviceName, serviceID string, q *WriteOptions) (*WriteMeta, error) {
 	path := fmt.Sprintf("/v1/service/%s/%s", url.PathEscape(serviceName), url.PathEscape(serviceID))
-	wm, err := s.client.delete(path, nil, q)
+	wm, err := s.client.delete(path, nil, nil, q)
 	if err != nil {
 		return nil, err
 	}

client/client_test.go

@@ -507,7 +507,7 @@ func TestClient_WatchAllocs(t *testing.T) {
 	})

 	// Delete one allocation
-	if err := state.DeleteEval(103, nil, []string{alloc1.ID}); err != nil {
+	if err := state.DeleteEval(103, nil, []string{alloc1.ID}, false); err != nil {
 		t.Fatalf("err: %v", err)
 	}

command/agent/eval_endpoint.go

@@ -1,16 +1,29 @@
 package agent

 import (
+	"fmt"
 	"net/http"
 	"strings"

 	"github.com/hashicorp/nomad/nomad/structs"
 )

+// EvalsRequest is the entry point for /v1/evaluations and is responsible for
+// handling both the listing of evaluations, and the bulk deletion of
+// evaluations. The latter is a dangerous operation and should use the
+// eval delete command to perform this.
 func (s *HTTPServer) EvalsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
-	if req.Method != "GET" {
-		return nil, CodedError(405, ErrInvalidMethod)
+	switch req.Method {
+	case http.MethodGet:
+		return s.evalsListRequest(resp, req)
+	case http.MethodDelete:
+		return s.evalsDeleteRequest(resp, req)
+	default:
+		return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
 	}
+}

+func (s *HTTPServer) evalsListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
 	args := structs.EvalListRequest{}

 	if s.parse(resp, req, &args.Region, &args.QueryOptions) {
@@ -33,6 +46,37 @@ func (s *HTTPServer) EvalsRequest(resp http.ResponseWriter, req *http.Request) (
 	return out.Evaluations, nil
 }

+func (s *HTTPServer) evalsDeleteRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
+
+	var args structs.EvalDeleteRequest
+
+	if err := decodeBody(req, &args); err != nil {
+		return nil, CodedError(http.StatusBadRequest, err.Error())
+	}
+
+	numIDs := len(args.EvalIDs)
+
+	// Ensure the number of evaluation IDs included in the request is within
+	// bounds.
+	if numIDs < 1 {
+		return nil, CodedError(http.StatusBadRequest, "request does not include any evaluation IDs")
+	} else if numIDs > structs.MaxUUIDsPerWriteRequest {
+		return nil, CodedError(http.StatusBadRequest, fmt.Sprintf(
+			"request includes %v evaluations IDs, must be %v or fewer",
+			numIDs, structs.MaxUUIDsPerWriteRequest))
+	}
+
+	// Pass the write request to populate all meta fields.
+	s.parseWriteRequest(req, &args.WriteRequest)
+
+	var reply structs.EvalDeleteResponse
+	if err := s.agent.RPC(structs.EvalDeleteRPCMethod, &args, &reply); err != nil {
+		return nil, err
+	}
+	setIndex(resp, reply.Index)
+	return nil, nil
+}
+
 func (s *HTTPServer) EvalSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
 	path := strings.TrimPrefix(req.URL.Path, "/v1/evaluation/")
 	switch {
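
The same route can be exercised directly over HTTP; a hedged sketch (the agent address and eval ID are placeholders, and the JSON body mirrors the request decoded by evalsDeleteRequest above):

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Build a DELETE /v1/evaluations request carrying the EvalIDs payload.
	body := bytes.NewBufferString(`{"EvalIDs": ["8E231CF4-CA48-43FF-B694-5801E69E22FA"]}`)
	req, err := http.NewRequest(http.MethodDelete, "http://127.0.0.1:4646/v1/evaluations", body)
	if err != nil {
		panic(err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Expect 200 on success, or an error status for bounds and broker failures.
	fmt.Println(resp.Status)
}
```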

command/agent/eval_endpoint_test.go

@@ -6,7 +6,9 @@ import (
 	"net/http/httptest"
 	"testing"

+	"github.com/hashicorp/nomad/api"
 	"github.com/hashicorp/nomad/ci"
+	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/stretchr/testify/require"
@@ -110,6 +112,135 @@ func TestHTTP_EvalPrefixList(t *testing.T) {
 	})
 }
func TestHTTP_EvalsDelete(t *testing.T) {
ci.Parallel(t)
testCases := []struct {
testFn func()
name string
}{
{
testFn: func() {
httpTest(t, nil, func(s *TestAgent) {
// Create an empty request object which doesn't contain any
// eval IDs.
deleteReq := api.EvalDeleteRequest{}
buf := encodeReq(&deleteReq)
// Generate the HTTP request.
req, err := http.NewRequest(http.MethodDelete, "/v1/evaluations", buf)
require.NoError(t, err)
respW := httptest.NewRecorder()
// Make the request and check the response.
obj, err := s.Server.EvalsRequest(respW, req)
require.Equal(t,
CodedError(http.StatusBadRequest, "request does not include any evaluation IDs"), err)
require.Nil(t, obj)
})
},
name: "too few eval IDs",
},
{
testFn: func() {
httpTest(t, nil, func(s *TestAgent) {
deleteReq := api.EvalDeleteRequest{EvalIDs: make([]string, 8000)}
// Generate a UUID and add it 8000 times to the eval ID
// request array.
evalID := uuid.Generate()
for i := 0; i < 8000; i++ {
deleteReq.EvalIDs[i] = evalID
}
buf := encodeReq(&deleteReq)
// Generate the HTTP request.
req, err := http.NewRequest(http.MethodDelete, "/v1/evaluations", buf)
require.NoError(t, err)
respW := httptest.NewRecorder()
// Make the request and check the response.
obj, err := s.Server.EvalsRequest(respW, req)
require.Equal(t,
CodedError(http.StatusBadRequest,
"request includes 8000 evaluations IDs, must be 7281 or fewer"), err)
require.Nil(t, obj)
})
},
name: "too many eval IDs",
},
{
testFn: func() {
httpTest(t, func(c *Config) {
c.NomadConfig.DefaultSchedulerConfig.PauseEvalBroker = true
}, func(s *TestAgent) {
// Generate a request with an eval ID that doesn't exist
// within state.
deleteReq := api.EvalDeleteRequest{EvalIDs: []string{uuid.Generate()}}
buf := encodeReq(&deleteReq)
// Generate the HTTP request.
req, err := http.NewRequest(http.MethodDelete, "/v1/evaluations", buf)
require.NoError(t, err)
respW := httptest.NewRecorder()
// Make the request and check the response.
obj, err := s.Server.EvalsRequest(respW, req)
require.Contains(t, err.Error(), "eval not found")
require.Nil(t, obj)
})
},
name: "eval doesn't exist",
},
{
testFn: func() {
httpTest(t, func(c *Config) {
c.NomadConfig.DefaultSchedulerConfig.PauseEvalBroker = true
}, func(s *TestAgent) {
// Upsert an eval into state.
mockEval := mock.Eval()
err := s.Agent.server.State().UpsertEvals(
structs.MsgTypeTestSetup, 10, []*structs.Evaluation{mockEval})
require.NoError(t, err)
// Generate a request with the ID of the eval previously upserted.
deleteReq := api.EvalDeleteRequest{EvalIDs: []string{mockEval.ID}}
buf := encodeReq(&deleteReq)
// Generate the HTTP request.
req, err := http.NewRequest(http.MethodDelete, "/v1/evaluations", buf)
require.NoError(t, err)
respW := httptest.NewRecorder()
// Make the request and check the response.
obj, err := s.Server.EvalsRequest(respW, req)
require.Nil(t, err)
require.Nil(t, obj)
// Ensure the eval is not found.
readEval, err := s.Agent.server.State().EvalByID(nil, mockEval.ID)
require.NoError(t, err)
require.Nil(t, readEval)
})
},
name: "successfully delete eval",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tc.testFn()
})
}
}
func TestHTTP_EvalAllocations(t *testing.T) {
	ci.Parallel(t)
	httpTest(t, nil, func(s *TestAgent) {

command/commands.go

@@ -265,6 +265,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
 				Meta: meta,
 			}, nil
 		},
+		"eval delete": func() (cli.Command, error) {
+			return &EvalDeleteCommand{
+				Meta: meta,
+			}, nil
+		},
 		"eval list": func() (cli.Command, error) {
 			return &EvalListCommand{
 				Meta: meta,

command/eval.go

@@ -1,8 +1,10 @@
 package command

 import (
+	"fmt"
 	"strings"

+	"github.com/hashicorp/nomad/api"
 	"github.com/mitchellh/cli"
 )

@@ -19,10 +21,18 @@ Usage: nomad eval <subcommand> [options] [args]
   detail but can be useful for debugging placement failures when the cluster
   does not have the resources to run a given job.

+  List evaluations:
+
+      $ nomad eval list
+
   Examine an evaluations status:

       $ nomad eval status <eval-id>

+  Delete evaluations:
+
+      $ nomad eval delete <eval-id>
+
   Please see the individual subcommand help for detailed usage information.
 `

@@ -35,6 +45,24 @@ func (f *EvalCommand) Synopsis() string {

 func (f *EvalCommand) Name() string { return "eval" }

-func (f *EvalCommand) Run(args []string) int {
-	return cli.RunResultHelp
-}
+func (f *EvalCommand) Run(_ []string) int { return cli.RunResultHelp }
+
+// outputEvalList is a helper which outputs an array of evaluations as a list
+// to the UI with key information such as ID and status.
+func outputEvalList(ui cli.Ui, evals []*api.Evaluation, length int) {
+	out := make([]string, len(evals)+1)
+	out[0] = "ID|Priority|Triggered By|Job ID|Status|Placement Failures"
+	for i, eval := range evals {
+		failures, _ := evalFailureStatus(eval)
+		out[i+1] = fmt.Sprintf("%s|%d|%s|%s|%s|%s",
+			limit(eval.ID, length),
+			eval.Priority,
+			eval.TriggeredBy,
+			eval.JobID,
+			eval.Status,
+			failures,
+		)
+	}
+	ui.Output(formatList(out))
+}

command/eval_delete.go (new file)

@@ -0,0 +1,406 @@
package command
import (
"errors"
"fmt"
"strings"
"time"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)
type EvalDeleteCommand struct {
Meta
filter string
yes bool
// deleteByArg is set when the command is deleting an evaluation that has
// been passed as an argument. This avoids the need for confirmation.
deleteByArg bool
// numDeleted tracks the total evaluations deleted in a single run of this
// command. It provides a way to output this information to the user at the
// command completion.
numDeleted int
// client is the lazy-loaded API client and is stored here, so we don't
// need to pass it to multiple functions.
client *api.Client
}
func (e *EvalDeleteCommand) Help() string {
helpText := `
Usage: nomad eval delete [options] <evaluation>
Delete an evaluation by ID. If the evaluation ID is omitted, this command
will use the filter flag to identify and delete a set of evaluations. If ACLs
are enabled, this command requires a management ACL token.
This command should be used cautiously and only in outage situations where
there is a large backlog of evaluations not being processed. During most
normal and outage scenarios, Nomad's reconciliation and state management will
handle evaluations as needed.
The eval broker is expected to be paused prior to running this command and
un-paused after. This can be done using the following two commands:
- nomad operator scheduler set-config -pause-eval-broker=true
- nomad operator scheduler set-config -pause-eval-broker=false
General Options:
` + generalOptionsUsage(usageOptsNoNamespace) + `
Eval Delete Options:
-filter
Specifies an expression used to filter evaluations for deletion. When
using this flag, it is advisable to ensure the syntax is correct using the
eval list command first.
-yes
Bypass the confirmation prompt if an evaluation ID was not provided.
`
return strings.TrimSpace(helpText)
}
func (e *EvalDeleteCommand) Synopsis() string {
return "Delete evaluations by ID or using a filter"
}
func (e *EvalDeleteCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(e.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-filter": complete.PredictAnything,
"-yes": complete.PredictNothing,
})
}
func (e *EvalDeleteCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := e.Meta.Client()
if err != nil {
return nil
}
resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Evals, nil)
if err != nil {
return []string{}
}
return resp.Matches[contexts.Evals]
})
}
func (e *EvalDeleteCommand) Name() string { return "eval delete" }
func (e *EvalDeleteCommand) Run(args []string) int {
flags := e.Meta.FlagSet(e.Name(), FlagSetClient)
flags.Usage = func() { e.Ui.Output(e.Help()) }
flags.StringVar(&e.filter, "filter", "", "")
flags.BoolVar(&e.yes, "yes", false, "")
if err := flags.Parse(args); err != nil {
return 1
}
args = flags.Args()
if err := e.verifyArgsAndFlags(args); err != nil {
e.Ui.Error(fmt.Sprintf("Error validating command args and flags: %v", err))
return 1
}
// Get the HTTP client and store this for use across multiple functions.
client, err := e.Meta.Client()
if err != nil {
e.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
e.client = client
// Ensure the eval broker is paused. This check happens multiple times on
// the leader, but performing it here means we can provide quick and actionable
// feedback.
schedulerConfig, _, err := e.client.Operator().SchedulerGetConfiguration(nil)
if err != nil {
e.Ui.Error(fmt.Sprintf("Error querying scheduler configuration: %s", err))
return 1
}
if !schedulerConfig.SchedulerConfig.PauseEvalBroker {
e.Ui.Error("Eval broker is not paused")
e.Ui.Output(`To delete evaluations you must first pause the eval broker by running "nomad operator scheduler set-config -pause-eval-broker=true"`)
e.Ui.Output(`After the deletion is complete, unpause the eval broker by running "nomad operator scheduler set-config -pause-eval-broker=false"`)
return 1
}
// Track the eventual exit code as there are a number of factors that
// influence this.
var exitCode int
// Call the correct function in order to handle the operator input
// correctly.
switch len(args) {
case 1:
e.deleteByArg = true
exitCode, err = e.handleEvalArgDelete(args[0])
default:
// Track the next token, so we can iterate all pages that match the
// passed filter.
var nextToken string
// It is possible the filter matches a large number of evaluations
// which means we need to run a number of batch deletes. Perform
// iteration here rather than recursion in later function, so we avoid
// any potential issues with stack size limits.
for {
exitCode, nextToken, err = e.handleFlagFilterDelete(nextToken)
// If there is another page of evaluations matching the filter,
// iterate the loop and delete the next batch of evals. We pause
// for 500ms rather than just running as fast as the code and machine
// possibly can. This means deleting 13 million evals will take
// roughly 13-15 mins, which seems reasonable. It is worth noting,
// we do not expect operators to delete this many evals in a single
// run and expect more careful filtering options to be used.
if nextToken != "" {
time.Sleep(500 * time.Millisecond)
continue
} else {
break
}
}
}
// Do not exit if we got an error as it's possible this was on the
// non-first iteration, and we have therefore deleted some evals.
if err != nil {
e.Ui.Error(fmt.Sprintf("Error deleting evaluations: %s", err))
}
// Depending on whether we deleted evaluations or not, output a message so
// this is clear.
if e.numDeleted > 0 {
e.Ui.Output(fmt.Sprintf("Successfully deleted %v %s",
e.numDeleted, correctGrammar("evaluation", e.numDeleted)))
} else if err == nil {
e.Ui.Output("No evaluations were deleted")
}
return exitCode
}
// verifyArgsAndFlags ensures the passed arguments and flags are valid for what
// this command accepts and can take action on.
func (e *EvalDeleteCommand) verifyArgsAndFlags(args []string) error {
numArgs := len(args)
// The command takes either an argument or filter, but not both.
if (e.filter == "" && numArgs < 1) || (e.filter != "" && numArgs > 0) {
return errors.New("evaluation ID or filter flag required")
}
// If an argument is supplied, we only accept a single eval ID.
if numArgs > 1 {
return fmt.Errorf("expected 1 argument, got %v", numArgs)
}
return nil
}
// handleEvalArgDelete handles deletion of an evaluation which was passed via
// its ID as a command argument. This is the simplest route to take and
// doesn't require filtering or batching.
func (e *EvalDeleteCommand) handleEvalArgDelete(evalID string) (int, error) {
evalInfo, _, err := e.client.Evaluations().Info(evalID, nil)
if err != nil {
return 1, err
}
// Supplying an eval to delete by its ID will always skip verification, so
// we don't need to understand the boolean response.
code, _, err := e.batchDelete([]*api.Evaluation{evalInfo})
return code, err
}
// handleFlagFilterDelete handles deletion of evaluations discovered using
// the filter. It is unknown how many will match the operator criteria so
// this function batches lookup and delete requests into sensible numbers.
func (e *EvalDeleteCommand) handleFlagFilterDelete(nt string) (int, string, error) {
evalsToDelete, nextToken, err := e.batchLookupEvals(nt)
if err != nil {
return 1, "", err
}
numEvalsToDelete := len(evalsToDelete)
// The filter flags are operator controlled, therefore ensure we
// actually found some evals to delete. Otherwise, inform the operator
// their flags are potentially incorrect.
if numEvalsToDelete == 0 {
if e.numDeleted > 0 {
return 0, "", nil
} else {
return 1, "", errors.New("failed to find any evals that matched filter criteria")
}
}
if code, actioned, err := e.batchDelete(evalsToDelete); err != nil {
return code, "", err
} else if !actioned {
return code, "", nil
}
e.Ui.Info(fmt.Sprintf("Successfully deleted batch of %v %s",
numEvalsToDelete, correctGrammar("evaluation", numEvalsToDelete)))
return 0, nextToken, nil
}
// batchLookupEvals handles batched lookup of evaluations using the operator
// provided filter. The lookup is performed a maximum of 3 times to
// ensure the response size is limited and the number of evals to delete doesn't exceed
// the total allowable in a single call.
//
// The JSON serialized evaluation API object is 350-380B in size.
// 2426 * 380B (3.8e-4 MB) = 0.92MB. We may want to make this configurable
// in the future, but this is counteracted by the CLI logic which will loop
// until the user tells it to exit, or all evals matching the filter are
// deleted. 2426 * 3 falls below the maximum limit for eval IDs in a single
// delete request (set by structs.MaxUUIDsPerWriteRequest).
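// For reference: (1024 * 256) / 36 = 7281 IDs per write request, and
// 3 * 2426 = 7278, which stays under that cap.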
func (e *EvalDeleteCommand) batchLookupEvals(nextToken string) ([]*api.Evaluation, string, error) {
var evalsToDelete []*api.Evaluation
currentNextToken := nextToken
// Call List 3 times to accumulate the maximum number of eval IDs supported
// in a single Delete request. See math above.
for i := 0; i < 3; i++ {
// Generate the query options using the passed next token and filter. The
// per page value is less than the total number we can include in a single
// delete request. This keeps the maximum size of the return object at a
// reasonable size.
opts := api.QueryOptions{
Filter: e.filter,
PerPage: 2426,
NextToken: currentNextToken,
}
evalList, meta, err := e.client.Evaluations().List(&opts)
if err != nil {
return nil, "", err
}
if len(evalList) > 0 {
evalsToDelete = append(evalsToDelete, evalList...)
}
// Store the next token no matter whether it is empty or populated.
currentNextToken = meta.NextToken
// If there is no next token, ensure we exit and avoid any new loops
// which will result in duplicate IDs.
if currentNextToken == "" {
break
}
}
return evalsToDelete, currentNextToken, nil
}
// batchDelete is responsible for deleting the passed evaluations and asking
// any confirmation questions along the way. It will ask whether the operator
// want to list the evals before deletion, and optionally ask for confirmation
// before deleting based on input criteria.
func (e *EvalDeleteCommand) batchDelete(evals []*api.Evaluation) (int, bool, error) {
// Ask whether the operator wants to see the list of evaluations before
// moving forward with deletion. This will only happen if filters are used
// and the confirmation step is not bypassed.
if !e.yes && !e.deleteByArg {
_, listEvals := e.askQuestion(fmt.Sprintf(
"Do you want to list evals (%v) before deletion? [y/N]",
len(evals)), "")
// List the evals for deletion if the user has requested this. It can
// be useful when the list is small and targeted, but is maybe best
// avoided when deleting large quantities of evals.
if listEvals {
e.Ui.Output("")
outputEvalList(e.Ui, evals, shortId)
e.Ui.Output("")
}
}
// Generate our list of eval IDs which is required for the API request.
ids := make([]string, len(evals))
for i, eval := range evals {
ids[i] = eval.ID
}
// If the user did not wish to bypass the confirmation step, ask this now
// and handle the response.
if !e.yes && !e.deleteByArg {
code, deleteEvals := e.askQuestion(fmt.Sprintf(
"Are you sure you want to delete %v evals? [y/N]",
len(evals)), "Cancelling eval deletion")
e.Ui.Output("")
if !deleteEvals {
return code, deleteEvals, nil
}
}
_, err := e.client.Evaluations().Delete(ids, nil)
if err != nil {
return 1, false, err
}
// Calculate how many total evaluations we have deleted, so we can output
// this at the end of the process.
curDeleted := e.numDeleted
e.numDeleted = curDeleted + len(ids)
return 0, true, nil
}
// askQuestion allows the command to ask the operator a question requiring a
// y/n response. The optional noResp is used when the operator responds no to
// a question.
func (e *EvalDeleteCommand) askQuestion(question, noResp string) (int, bool) {
answer, err := e.Ui.Ask(question)
if err != nil {
e.Ui.Error(fmt.Sprintf("Failed to parse answer: %v", err))
return 1, false
}
if answer == "" || strings.ToLower(answer)[0] == 'n' {
if noResp != "" {
e.Ui.Output(noResp)
}
return 0, false
} else if strings.ToLower(answer)[0] == 'y' && len(answer) > 1 {
e.Ui.Output("For confirmation, an exact y is required.")
return 0, false
} else if answer != "y" {
e.Ui.Output("No confirmation detected. For confirmation, an exact 'y' is required.")
return 1, false
}
return 0, true
}
func correctGrammar(word string, num int) string {
if num > 1 {
return word + "s"
}
return word
}
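
For reference, the paged lookup the command performs maps onto an ordinary filtered List call; a minimal sketch through the api package (assumptions: the default agent address; the filter expression is illustrative):

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Mirror of batchLookupEvals above: one page of evals matching a filter,
	// capped at 2426 per page so three pages stay under the delete limit.
	opts := &api.QueryOptions{Filter: `Status == "pending"`, PerPage: 2426}
	evals, meta, err := client.Evaluations().List(opts)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(len(evals), "matching evals; next token:", meta.NextToken)
}
```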

command/eval_delete_test.go (new file)

@@ -0,0 +1,184 @@
package command
import (
"errors"
"fmt"
"testing"
"github.com/hashicorp/nomad/ci"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
)
func TestEvalDeleteCommand_Run(t *testing.T) {
ci.Parallel(t)
testCases := []struct {
testFn func()
name string
}{
{
testFn: func() {
testServer, client, url := testServer(t, false, nil)
defer testServer.Shutdown()
// Create the UI and command.
ui := cli.NewMockUi()
cmd := &EvalDeleteCommand{
Meta: Meta{
Ui: ui,
flagAddress: url,
},
}
// Test basic command input validation.
require.Equal(t, 1, cmd.Run([]string{"-address=" + url}))
require.Contains(t, ui.ErrorWriter.String(), "Error validating command args and flags")
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()
// Try running the command when the eval broker is not paused.
require.Equal(t, 1, cmd.Run([]string{"-address=" + url, "fa3a8c37-eac3-00c7-3410-5ba3f7318fd8"}))
require.Contains(t, ui.ErrorWriter.String(), "Eval broker is not paused")
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()
// Pause the eval broker, then try deleting an eval that
// does not exist.
schedulerConfig, _, err := client.Operator().SchedulerGetConfiguration(nil)
require.NoError(t, err)
require.False(t, schedulerConfig.SchedulerConfig.PauseEvalBroker)
schedulerConfig.SchedulerConfig.PauseEvalBroker = true
_, _, err = client.Operator().SchedulerSetConfiguration(schedulerConfig.SchedulerConfig, nil)
require.NoError(t, err)
require.True(t, schedulerConfig.SchedulerConfig.PauseEvalBroker)
require.Equal(t, 1, cmd.Run([]string{"-address=" + url, "fa3a8c37-eac3-00c7-3410-5ba3f7318fd8"}))
require.Contains(t, ui.ErrorWriter.String(), "eval not found")
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()
},
name: "failure",
},
{
testFn: func() {
testServer, client, url := testServer(t, false, nil)
defer testServer.Shutdown()
// Create the UI and command.
ui := cli.NewMockUi()
cmd := &EvalDeleteCommand{
Meta: Meta{
Ui: ui,
flagAddress: url,
},
}
// Pause the eval broker.
schedulerConfig, _, err := client.Operator().SchedulerGetConfiguration(nil)
require.NoError(t, err)
require.False(t, schedulerConfig.SchedulerConfig.PauseEvalBroker)
schedulerConfig.SchedulerConfig.PauseEvalBroker = true
_, _, err = client.Operator().SchedulerSetConfiguration(schedulerConfig.SchedulerConfig, nil)
require.NoError(t, err)
require.True(t, schedulerConfig.SchedulerConfig.PauseEvalBroker)
// With the eval broker paused, run a job register several times
// to generate evals that will not be acted on.
testJob := testJob("eval-delete")
evalIDs := make([]string, 3)
for i := 0; i < 3; i++ {
regResp, _, err := client.Jobs().Register(testJob, nil)
require.NoError(t, err)
require.NotNil(t, regResp)
require.NotEmpty(t, regResp.EvalID)
evalIDs[i] = regResp.EvalID
}
// Ensure we have three evaluations in state.
evalList, _, err := client.Evaluations().List(nil)
require.NoError(t, err)
require.Len(t, evalList, 3)
// Attempt to delete one eval using the ID.
require.Equal(t, 0, cmd.Run([]string{"-address=" + url, evalIDs[0]}))
require.Contains(t, ui.OutputWriter.String(), "Successfully deleted 1 evaluation")
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()
// We modify the number deleted on each command run, so we
// need to reset this in order to check the next command
// output.
cmd.numDeleted = 0
// Attempt to delete the remaining two evals using a filter
// expression.
expr := fmt.Sprintf("JobID == %q and Status == \"pending\" ", *testJob.Name)
require.Equal(t, 0, cmd.Run([]string{"-address=" + url, "-filter=" + expr}))
require.Contains(t, ui.OutputWriter.String(), "Successfully deleted 2 evaluations")
ui.ErrorWriter.Reset()
ui.OutputWriter.Reset()
// Ensure we have zero evaluations in state.
evalList, _, err = client.Evaluations().List(nil)
require.NoError(t, err)
require.Len(t, evalList, 0)
},
name: "successful",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tc.testFn()
})
}
}
func TestEvalDeleteCommand_verifyArgsAndFlags(t *testing.T) {
ci.Parallel(t)
testCases := []struct {
inputEvalDeleteCommand *EvalDeleteCommand
inputArgs []string
expectedError error
name string
}{
{
inputEvalDeleteCommand: &EvalDeleteCommand{
filter: `Status == "Pending"`,
},
inputArgs: []string{"fa3a8c37-eac3-00c7-3410-5ba3f7318fd8"},
expectedError: errors.New("evaluation ID or filter flag required"),
name: "arg and flags",
},
{
inputEvalDeleteCommand: &EvalDeleteCommand{
filter: "",
},
inputArgs: []string{},
expectedError: errors.New("evaluation ID or filter flag required"),
name: "no arg or flags",
},
{
inputEvalDeleteCommand: &EvalDeleteCommand{
filter: "",
},
inputArgs: []string{"fa3a8c37-eac3-00c7-3410-5ba3f7318fd8", "fa3a8c37-eac3-00c7-3410-5ba3f7318fd9"},
expectedError: errors.New("expected 1 argument, got 2"),
name: "multiple args",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actualError := tc.inputEvalDeleteCommand.verifyArgsAndFlags(tc.inputArgs)
require.Equal(t, tc.expectedError, actualError)
})
}
}

command/eval_list.go

@@ -167,20 +167,7 @@ func (c *EvalListCommand) Run(args []string) int {
 		length = fullId
 	}

-	out := make([]string, len(evals)+1)
-	out[0] = "ID|Priority|Triggered By|Job ID|Status|Placement Failures"
-	for i, eval := range evals {
-		failures, _ := evalFailureStatus(eval)
-		out[i+1] = fmt.Sprintf("%s|%d|%s|%s|%s|%s",
-			limit(eval.ID, length),
-			eval.Priority,
-			eval.TriggeredBy,
-			eval.JobID,
-			eval.Status,
-			failures,
-		)
-	}
-	c.Ui.Output(formatList(out))
+	outputEvalList(c.Ui, evals, length)

 	if qm.NextToken != "" {
 		c.Ui.Output(fmt.Sprintf(`

nomad/core_sched.go

@@ -14,13 +14,6 @@ import (
 	"github.com/hashicorp/nomad/scheduler"
 )

-var (
-	// maxIdsPerReap is the maximum number of evals and allocations to reap in a
-	// single Raft transaction. This is to ensure that the Raft message does not
-	// become too large.
-	maxIdsPerReap = (1024 * 256) / 36 // 0.25 MB of ids.
-)
-
 // CoreScheduler is a special "scheduler" that is registered
 // as "_core". It is used to run various administrative work
 // across the cluster.
@@ -193,7 +186,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string)
 			},
 		}
 		requests = append(requests, req)
-		available := maxIdsPerReap
+		available := structs.MaxUUIDsPerWriteRequest

 		if remaining := len(jobs) - submittedJobs; remaining > 0 {
 			if remaining <= available {
@@ -359,20 +352,20 @@ func (c *CoreScheduler) evalReap(evals, allocs []string) error {
 	return nil
 }

-// partitionEvalReap returns a list of EvalDeleteRequest to make, ensuring a single
+// partitionEvalReap returns a list of EvalReapRequest to make, ensuring a single
 // request does not contain too many allocations and evaluations. This is
 // necessary to ensure that the Raft transaction does not become too large.
-func (c *CoreScheduler) partitionEvalReap(evals, allocs []string) []*structs.EvalDeleteRequest {
-	var requests []*structs.EvalDeleteRequest
+func (c *CoreScheduler) partitionEvalReap(evals, allocs []string) []*structs.EvalReapRequest {
+	var requests []*structs.EvalReapRequest
 	submittedEvals, submittedAllocs := 0, 0
 	for submittedEvals != len(evals) || submittedAllocs != len(allocs) {
-		req := &structs.EvalDeleteRequest{
+		req := &structs.EvalReapRequest{
 			WriteRequest: structs.WriteRequest{
 				Region: c.srv.config.Region,
 			},
 		}
 		requests = append(requests, req)
-		available := maxIdsPerReap
+		available := structs.MaxUUIDsPerWriteRequest

 		// Add the allocs first
 		if remaining := len(allocs) - submittedAllocs; remaining > 0 {
@@ -484,7 +477,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err
 	}

 	// Call to the leader to issue the reap
-	for _, ids := range partitionAll(maxIdsPerReap, nodeIDs) {
+	for _, ids := range partitionAll(structs.MaxUUIDsPerWriteRequest, nodeIDs) {
 		req := structs.NodeBatchDeregisterRequest{
 			NodeIDs: ids,
 			WriteRequest: structs.WriteRequest{
@@ -584,7 +577,7 @@ func (c *CoreScheduler) partitionDeploymentReap(deployments []string) []*structs
 			},
 		}
 		requests = append(requests, req)
-		available := maxIdsPerReap
+		available := structs.MaxUUIDsPerWriteRequest

 		if remaining := len(deployments) - submittedDeployments; remaining > 0 {
 			if remaining <= available {

nomad/core_sched_test.go

@@ -1852,7 +1852,7 @@ func TestCoreScheduler_PartitionEvalReap(t *testing.T) {
 	core := NewCoreScheduler(s1, snap)

 	// Set the max ids per reap to something lower.
-	maxIdsPerReap = 2
+	structs.MaxUUIDsPerWriteRequest = 2

 	evals := []string{"a", "b", "c"}
 	allocs := []string{"1", "2", "3"}
@@ -1895,7 +1895,7 @@ func TestCoreScheduler_PartitionDeploymentReap(t *testing.T) {
 	core := NewCoreScheduler(s1, snap)

 	// Set the max ids per reap to something lower.
-	maxIdsPerReap = 2
+	structs.MaxUUIDsPerWriteRequest = 2

 	deployments := []string{"a", "b", "c"}
 	requests := core.(*CoreScheduler).partitionDeploymentReap(deployments)
@@ -1915,7 +1915,6 @@ func TestCoreScheduler_PartitionDeploymentReap(t *testing.T) {
 }

 func TestCoreScheduler_PartitionJobReap(t *testing.T) {
-	ci.Parallel(t)
 	s1, cleanupS1 := TestServer(t, nil)
 	defer cleanupS1()
@@ -1929,7 +1928,11 @@ func TestCoreScheduler_PartitionJobReap(t *testing.T) {
 	core := NewCoreScheduler(s1, snap)

 	// Set the max ids per reap to something lower.
-	maxIdsPerReap = 2
+	originalMaxUUIDsPerWriteRequest := structs.MaxUUIDsPerWriteRequest
+	structs.MaxUUIDsPerWriteRequest = 2
+	defer func() {
+		structs.MaxUUIDsPerWriteRequest = originalMaxUUIDsPerWriteRequest
+	}()

 	jobs := []*structs.Job{mock.Job(), mock.Job(), mock.Job()}
 	requests := core.(*CoreScheduler).partitionJobReap(jobs, "")
@@ -2385,7 +2388,7 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
 	require.NoError(t, err)
 	index, _ = store.LatestIndex()
 	index++
-	err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID})
+	err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID}, false)
 	require.NoError(t, err)

 	// Create a core scheduler and attempt the volume claim GC

nomad/eval_endpoint.go

@@ -1,6 +1,7 @@
 package nomad

 import (
+	"errors"
 	"fmt"
 	"net/http"
 	"time"
@@ -391,7 +392,7 @@ func (e *Eval) Reblock(args *structs.EvalUpdateRequest, reply *structs.GenericRe
 }

 // Reap is used to cleanup dead evaluations and allocations
-func (e *Eval) Reap(args *structs.EvalDeleteRequest,
+func (e *Eval) Reap(args *structs.EvalReapRequest,
 	reply *structs.GenericResponse) error {

 	// Ensure the connection was initiated by another server if TLS is used.
@@ -416,6 +417,150 @@ func (e *Eval) Reap(args *structs.EvalDeleteRequest,
 	return nil
 }
// Delete is used by operators to delete evaluations during severe outages. It
// differs from Reap while duplicating some behavior to ensure we have the
// correct controls for user initiated deletions.
func (e *Eval) Delete(
args *structs.EvalDeleteRequest,
reply *structs.EvalDeleteResponse) error {
if done, err := e.srv.forward(structs.EvalDeleteRPCMethod, args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "eval", "delete"}, time.Now())
// This RPC endpoint is very destructive and alters Nomad's core state,
// meaning only those with management tokens can call it.
if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.IsManagement() {
return structs.ErrPermissionDenied
}
// The eval broker must be disabled otherwise Nomad's state will likely get
// wild in a very un-fun way.
if e.srv.evalBroker.Enabled() {
return errors.New("eval broker is enabled; eval broker must be paused to delete evals")
}
// Grab the state snapshot, so we can look up relevant eval information.
serverStateSnapshot, err := e.srv.State().Snapshot()
if err != nil {
return fmt.Errorf("failed to lookup state snapshot: %v", err)
}
ws := memdb.NewWatchSet()
// Iterate the evaluations and ensure they are safe to delete. It is
// possible passed evals are not safe to delete and would make Nomad's state
// a little wonky. The nature of the RPC return error means a single
// unsafe eval ID fails the whole call.
for _, evalID := range args.EvalIDs {
evalInfo, err := serverStateSnapshot.EvalByID(ws, evalID)
if err != nil {
return fmt.Errorf("failed to lookup eval: %v", err)
}
if evalInfo == nil {
return errors.New("eval not found")
}
jobInfo, err := serverStateSnapshot.JobByID(ws, evalInfo.Namespace, evalInfo.JobID)
if err != nil {
return fmt.Errorf("failed to lookup eval job: %v", err)
}
allocs, err := serverStateSnapshot.AllocsByEval(ws, evalInfo.ID)
if err != nil {
return fmt.Errorf("failed to lookup eval allocs: %v", err)
}
if !evalDeleteSafe(allocs, jobInfo) {
return fmt.Errorf("eval %s is not safe to delete", evalID)
}
}
// Generate the Raft request object using the reap request object. This
// avoids adding new Raft messages types and follows the existing reap
// flow.
raftReq := structs.EvalReapRequest{
Evals: args.EvalIDs,
UserInitiated: true,
WriteRequest: args.WriteRequest,
}
// Update via Raft.
_, index, err := e.srv.raftApply(structs.EvalDeleteRequestType, &raftReq)
if err != nil {
return err
}
// Update the index and return.
reply.Index = index
return nil
}
// evalDeleteSafe ensures an evaluation is safe to delete based on its related
// allocation and job information. This follows similar, but different rules to
// the eval reap checking, to ensure evaluations for running allocs or allocs
// which need the evaluation detail are not deleted.
func evalDeleteSafe(allocs []*structs.Allocation, job *structs.Job) bool {
// If the job is deleted, stopped, or dead, all allocs are terminal and
// the eval can be deleted.
if job == nil || job.Stop || job.Status == structs.JobStatusDead {
return true
}
// Iterate the allocations associated to the eval, if any, and check
// whether we can delete the eval.
for _, alloc := range allocs {
// If the allocation is still classed as running on the client, or
// might be, we can't delete.
switch alloc.ClientStatus {
case structs.AllocClientStatusRunning, structs.AllocClientStatusUnknown:
return false
}
// If the alloc hasn't failed then we don't need to consider it for
// rescheduling. Rescheduling needs to copy over information from the
// previous alloc so that it can enforce the reschedule policy.
if alloc.ClientStatus != structs.AllocClientStatusFailed {
continue
}
var reschedulePolicy *structs.ReschedulePolicy
tg := job.LookupTaskGroup(alloc.TaskGroup)
if tg != nil {
reschedulePolicy = tg.ReschedulePolicy
}
// No reschedule policy or rescheduling is disabled
if reschedulePolicy == nil || (!reschedulePolicy.Unlimited && reschedulePolicy.Attempts == 0) {
continue
}
// The restart tracking information has not been carried forward.
if alloc.NextAllocation == "" {
return false
}
// This task has unlimited rescheduling and the alloc has not been
// replaced, so we can't delete the eval yet.
if reschedulePolicy.Unlimited {
return false
}
// No restarts have been attempted yet.
if alloc.RescheduleTracker == nil || len(alloc.RescheduleTracker.Events) == 0 {
return false
}
}
return true
}
// List is used to get a list of the evaluations in the system
func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListResponse) error {
	if done, err := e.srv.forward("Eval.List", args, args, reply); done {

nomad/eval_endpoint_test.go

@@ -202,7 +202,7 @@ func TestEvalEndpoint_GetEval_Blocking(t *testing.T) {
 	// Eval delete triggers watches
 	time.AfterFunc(100*time.Millisecond, func() {
-		err := state.DeleteEval(300, []string{eval2.ID}, []string{})
+		err := state.DeleteEval(300, []string{eval2.ID}, []string{}, false)
 		if err != nil {
 			t.Fatalf("err: %v", err)
 		}
@@ -691,7 +691,7 @@ func TestEvalEndpoint_Reap(t *testing.T) {
 	s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1})

 	// Reap the eval
-	get := &structs.EvalDeleteRequest{
+	get := &structs.EvalReapRequest{
 		Evals:        []string{eval1.ID},
 		WriteRequest: structs.WriteRequest{Region: "global"},
 	}
@@ -714,6 +714,351 @@ func TestEvalEndpoint_Reap(t *testing.T) {
 	}
 }
func TestEvalEndpoint_Delete(t *testing.T) {
ci.Parallel(t)
testCases := []struct {
testFn func()
name string
}{
{
testFn: func() {
testServer, testServerCleanup := TestServer(t, nil)
defer testServerCleanup()
codec := rpcClient(t, testServer)
testutil.WaitForLeader(t, testServer.RPC)
// Create and upsert an evaluation.
mockEval := mock.Eval()
require.NoError(t, testServer.fsm.State().UpsertEvals(
structs.MsgTypeTestSetup, 10, []*structs.Evaluation{mockEval}))
// Attempt to delete the eval, which should fail because the
// eval broker is not paused.
get := &structs.EvalDeleteRequest{
EvalIDs: []string{mockEval.ID},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDeleteResponse
err := msgpackrpc.CallWithCodec(codec, structs.EvalDeleteRPCMethod, get, &resp)
require.Contains(t, err.Error(), "eval broker is enabled")
},
name: "unsuccessful delete broker enabled",
},
{
testFn: func() {
testServer, testServerCleanup := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer testServerCleanup()
codec := rpcClient(t, testServer)
testutil.WaitForLeader(t, testServer.RPC)
// Pause the eval broker and update the scheduler config.
testServer.evalBroker.SetEnabled(false)
_, schedulerConfig, err := testServer.fsm.State().SchedulerConfig()
require.NoError(t, err)
require.NotNil(t, schedulerConfig)
schedulerConfig.PauseEvalBroker = true
require.NoError(t, testServer.fsm.State().SchedulerSetConfig(10, schedulerConfig))
// Create and upsert an evaluation.
mockEval := mock.Eval()
require.NoError(t, testServer.fsm.State().UpsertEvals(
structs.MsgTypeTestSetup, 10, []*structs.Evaluation{mockEval}))
// Attempt to delete the eval, which should succeed as the eval
// broker is disabled.
get := &structs.EvalDeleteRequest{
EvalIDs: []string{mockEval.ID},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDeleteResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, structs.EvalDeleteRPCMethod, get, &resp))
// Attempt to read the eval from state; this should not be found.
ws := memdb.NewWatchSet()
respEval, err := testServer.fsm.State().EvalByID(ws, mockEval.ID)
require.Nil(t, err)
require.Nil(t, respEval)
},
name: "successful delete without ACLs",
},
{
testFn: func() {
testServer, rootToken, testServerCleanup := TestACLServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer testServerCleanup()
codec := rpcClient(t, testServer)
testutil.WaitForLeader(t, testServer.RPC)
// Pause the eval broker and update the scheduler config.
testServer.evalBroker.SetEnabled(false)
_, schedulerConfig, err := testServer.fsm.State().SchedulerConfig()
require.NoError(t, err)
require.NotNil(t, schedulerConfig)
schedulerConfig.PauseEvalBroker = true
require.NoError(t, testServer.fsm.State().SchedulerSetConfig(10, schedulerConfig))
// Create and upsert an evaluation.
mockEval := mock.Eval()
require.NoError(t, testServer.fsm.State().UpsertEvals(
structs.MsgTypeTestSetup, 20, []*structs.Evaluation{mockEval}))
// Attempt to delete the eval, which should succeed as the eval
// broker is disabled, and we are using a management token.
get := &structs.EvalDeleteRequest{
EvalIDs: []string{mockEval.ID},
WriteRequest: structs.WriteRequest{
AuthToken: rootToken.SecretID,
Region: "global",
},
}
var resp structs.EvalDeleteResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, structs.EvalDeleteRPCMethod, get, &resp))
// Attempt to read the eval from state; this should not be found.
ws := memdb.NewWatchSet()
respEval, err := testServer.fsm.State().EvalByID(ws, mockEval.ID)
require.Nil(t, err)
require.Nil(t, respEval)
},
name: "successful delete with ACLs",
},
{
testFn: func() {
testServer, _, testServerCleanup := TestACLServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer testServerCleanup()
codec := rpcClient(t, testServer)
testutil.WaitForLeader(t, testServer.RPC)
// Pause the eval broker.
testServer.evalBroker.SetEnabled(false)
// Create and upsert an evaluation.
mockEval := mock.Eval()
require.NoError(t, testServer.fsm.State().UpsertEvals(
structs.MsgTypeTestSetup, 10, []*structs.Evaluation{mockEval}))
nonMgntToken := mock.CreatePolicyAndToken(t, testServer.State(), 20, "test-valid",
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilitySubmitJob}))
// Attempt to delete the eval, which should not succeed as we
// are using a non-management token.
get := &structs.EvalDeleteRequest{
EvalIDs: []string{mockEval.ID},
WriteRequest: structs.WriteRequest{
AuthToken: nonMgntToken.SecretID,
Region: "global",
},
}
var resp structs.EvalDeleteResponse
err := msgpackrpc.CallWithCodec(codec, structs.EvalDeleteRPCMethod, get, &resp)
require.Contains(t, err.Error(), structs.ErrPermissionDenied.Error())
},
name: "unsuccessful delete with ACLs incorrect token permissions",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tc.testFn()
})
}
}
func Test_evalDeleteSafe(t *testing.T) {
ci.Parallel(t)
testCases := []struct {
inputAllocs []*structs.Allocation
inputJob *structs.Job
expectedResult bool
name string
}{
{
inputAllocs: nil,
inputJob: nil,
expectedResult: true,
name: "job not in state",
},
{
inputAllocs: nil,
inputJob: &structs.Job{Status: structs.JobStatusDead},
expectedResult: true,
name: "job stopped",
},
{
inputAllocs: nil,
inputJob: &structs.Job{Stop: true},
expectedResult: true,
name: "job dead",
},
{
inputAllocs: []*structs.Allocation{},
inputJob: &structs.Job{Status: structs.JobStatusRunning},
expectedResult: true,
name: "no allocs for eval",
},
{
inputAllocs: []*structs.Allocation{
{ClientStatus: structs.AllocClientStatusComplete},
{ClientStatus: structs.AllocClientStatusRunning},
},
inputJob: &structs.Job{Status: structs.JobStatusRunning},
expectedResult: false,
name: "running alloc for eval",
},
{
inputAllocs: []*structs.Allocation{
{ClientStatus: structs.AllocClientStatusComplete},
{ClientStatus: structs.AllocClientStatusUnknown},
},
inputJob: &structs.Job{Status: structs.JobStatusRunning},
expectedResult: false,
name: "unknown alloc for eval",
},
{
inputAllocs: []*structs.Allocation{
{ClientStatus: structs.AllocClientStatusComplete},
{ClientStatus: structs.AllocClientStatusLost},
},
inputJob: &structs.Job{Status: structs.JobStatusRunning},
expectedResult: true,
name: "complete and lost allocs for eval",
},
{
inputAllocs: []*structs.Allocation{
{
ClientStatus: structs.AllocClientStatusFailed,
TaskGroup: "test",
},
},
inputJob: &structs.Job{
Status: structs.JobStatusPending,
TaskGroups: []*structs.TaskGroup{
{
Name: "test",
ReschedulePolicy: nil,
},
},
},
expectedResult: true,
name: "failed alloc job without reschedule",
},
{
inputAllocs: []*structs.Allocation{
{
ClientStatus: structs.AllocClientStatusFailed,
TaskGroup: "test",
},
},
inputJob: &structs.Job{
Status: structs.JobStatusPending,
TaskGroups: []*structs.TaskGroup{
{
Name: "test",
ReschedulePolicy: &structs.ReschedulePolicy{
Unlimited: false,
Attempts: 0,
},
},
},
},
expectedResult: true,
name: "failed alloc job reschedule disabled",
},
{
inputAllocs: []*structs.Allocation{
{
ClientStatus: structs.AllocClientStatusFailed,
TaskGroup: "test",
},
},
inputJob: &structs.Job{
Status: structs.JobStatusPending,
TaskGroups: []*structs.TaskGroup{
{
Name: "test",
ReschedulePolicy: &structs.ReschedulePolicy{
Unlimited: false,
Attempts: 3,
},
},
},
},
expectedResult: false,
name: "failed alloc next alloc not set",
},
{
inputAllocs: []*structs.Allocation{
{
ClientStatus: structs.AllocClientStatusFailed,
TaskGroup: "test",
NextAllocation: "4aa4930a-8749-c95b-9c67-5ef29b0fc653",
},
},
inputJob: &structs.Job{
Status: structs.JobStatusPending,
TaskGroups: []*structs.TaskGroup{
{
Name: "test",
ReschedulePolicy: &structs.ReschedulePolicy{
Unlimited: false,
Attempts: 3,
},
},
},
},
expectedResult: false,
name: "failed alloc next alloc set",
},
{
inputAllocs: []*structs.Allocation{
{
ClientStatus: structs.AllocClientStatusFailed,
TaskGroup: "test",
},
},
inputJob: &structs.Job{
Status: structs.JobStatusPending,
TaskGroups: []*structs.TaskGroup{
{
Name: "test",
ReschedulePolicy: &structs.ReschedulePolicy{
Unlimited: true,
},
},
},
},
expectedResult: false,
name: "failed alloc job reschedule unlimited",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actualResult := evalDeleteSafe(tc.inputAllocs, tc.inputJob)
require.Equal(t, tc.expectedResult, actualResult)
})
}
}
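// For orientation, the following is a minimal sketch of the safety check the
// cases above exercise, reconstructed from the expected results in the table
// rather than copied from the implementation. Treat it as illustrative of the
// rules (terminal job, no running or unknown allocs, and no failed alloc that
// could still be rescheduled), not as the shipped function body.
func evalDeleteSafeSketch(allocs []*structs.Allocation, job *structs.Job) bool {
	// A deleted, stopped, or dead job means every alloc is terminal, so the
	// eval can always be removed.
	if job == nil || job.Stop || job.Status == structs.JobStatusDead {
		return true
	}
	for _, alloc := range allocs {
		// An alloc that is, or might still be, active on a client blocks
		// deletion outright.
		switch alloc.ClientStatus {
		case structs.AllocClientStatusRunning, structs.AllocClientStatusUnknown:
			return false
		}
		// Only failed allocs are candidates for rescheduling, which is the
		// remaining reason the eval might still be needed.
		if alloc.ClientStatus != structs.AllocClientStatusFailed {
			continue
		}
		var reschedule *structs.ReschedulePolicy
		if tg := job.LookupTaskGroup(alloc.TaskGroup); tg != nil {
			reschedule = tg.ReschedulePolicy
		}
		// Rescheduling absent or disabled: the failed alloc cannot produce
		// follow-up placements, so it does not block deletion.
		if reschedule == nil || (!reschedule.Unlimited && reschedule.Attempts == 0) {
			continue
		}
		// A reschedule could still reference this eval.
		return false
	}
	return true
}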
func TestEvalEndpoint_List(t *testing.T) {
ci.Parallel(t)
@@ -1002,7 +1347,7 @@ func TestEvalEndpoint_List_Blocking(t *testing.T) {
// Eval deletion triggers watches
time.AfterFunc(100*time.Millisecond, func() {
-if err := state.DeleteEval(3, []string{eval.ID}, nil); err != nil {
+if err := state.DeleteEval(3, []string{eval.ID}, nil, false); err != nil {
t.Fatalf("err: %v", err)
}
})
@@ -782,12 +782,12 @@ func (n *nomadFSM) handleUpsertedEval(eval *structs.Evaluation) {
func (n *nomadFSM) applyDeleteEval(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "delete_eval"}, time.Now())
-var req structs.EvalDeleteRequest
+var req structs.EvalReapRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
-if err := n.state.DeleteEval(index, req.Evals, req.Allocs); err != nil {
+if err := n.state.DeleteEval(index, req.Evals, req.Allocs, req.UserInitiated); err != nil {
n.logger.Error("DeleteEval failed", "error", err)
return err
}
@@ -1160,7 +1160,7 @@ func TestFSM_DeleteEval(t *testing.T) {
t.Fatalf("resp: %v", resp)
}
-req2 := structs.EvalDeleteRequest{
+req2 := structs.EvalReapRequest{
Evals: []string{eval.ID},
}
buf, err = structs.Encode(structs.EvalDeleteRequestType, req2)
@@ -2265,7 +2265,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) {
// Delete an allocation
time.AfterFunc(100*time.Millisecond, func() {
-assert.Nil(state.DeleteEval(200, nil, []string{alloc2.ID}))
+assert.Nil(state.DeleteEval(200, nil, []string{alloc2.ID}, false))
})
req.QueryOptions.MinQueryIndex = 150
@@ -1141,7 +1141,7 @@ func TestRPC_TLS_Enforcement_RPC(t *testing.T) {
"Eval.Reblock": &structs.EvalUpdateRequest{
WriteRequest: structs.WriteRequest{Region: "global"},
},
-"Eval.Reap": &structs.EvalDeleteRequest{
+"Eval.Reap": &structs.EvalReapRequest{
WriteRequest: structs.WriteRequest{Region: "global"},
},
"Plan.Submit": &structs.PlanRequest{
@@ -2,6 +2,7 @@ package state
import (
"context"
+"errors"
"fmt"
"reflect"
"sort"
@@ -3107,10 +3108,22 @@ func (s *StateStore) updateEvalModifyIndex(txn *txn, index uint64, evalID string
}
// DeleteEval is used to delete an evaluation
-func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error {
+func (s *StateStore) DeleteEval(index uint64, evals, allocs []string, userInitiated bool) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()
+// If this deletion has been initiated by an operator, ensure the eval
+// broker is paused.
+if userInitiated {
+_, schedConfig, err := s.schedulerConfigTxn(txn)
+if err != nil {
+return err
+}
+if schedConfig == nil || !schedConfig.PauseEvalBroker {
+return errors.New("eval broker is enabled; eval broker must be paused to delete evals")
+}
+}
jobs := make(map[structs.NamespacedID]string, len(evals))
// evalsTableUpdated and allocsTableUpdated allow us to track whether each
@@ -5890,9 +5903,13 @@ func expiredOneTimeTokenFilter(now time.Time) func(interface{}) bool {
func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
+return s.schedulerConfigTxn(tx)
+}
+
+func (s *StateStore) schedulerConfigTxn(txn *txn) (uint64, *structs.SchedulerConfiguration, error) {
// Get the scheduler config
-c, err := tx.First("scheduler_config", "id")
+c, err := txn.First("scheduler_config", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed scheduler config lookup: %s", err)
}
@@ -4233,7 +4233,7 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) {
t.Fatalf("err: %v", err)
}
-err = state.DeleteEval(1002, []string{eval1.ID, eval2.ID}, []string{alloc1.ID, alloc2.ID})
+err = state.DeleteEval(1002, []string{eval1.ID, eval2.ID}, []string{alloc1.ID, alloc2.ID}, false)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -4304,7 +4304,7 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) {
// Call the eval delete function with zero length eval and alloc ID arrays.
// This should result in the table indexes both staying the same, rather
// than updating without cause.
-require.NoError(t, state.DeleteEval(1010, []string{}, []string{}))
+require.NoError(t, state.DeleteEval(1010, []string{}, []string{}, false))
allocsIndex, err := state.Index("allocs")
require.NoError(t, err)
@@ -4354,7 +4354,7 @@ func TestStateStore_DeleteEval_ChildJob(t *testing.T) {
t.Fatalf("bad: %v", err)
}
-err = state.DeleteEval(1002, []string{eval1.ID}, []string{alloc1.ID})
+err = state.DeleteEval(1002, []string{eval1.ID}, []string{alloc1.ID}, false)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -4386,6 +4386,45 @@ func TestStateStore_DeleteEval_ChildJob(t *testing.T) {
}
}
func TestStateStore_DeleteEval_UserInitiated(t *testing.T) {
ci.Parallel(t)
testState := testStateStore(t)
// Upsert a scheduler config object, so we have something to check and
// modify.
schedulerConfig := structs.SchedulerConfiguration{PauseEvalBroker: false}
require.NoError(t, testState.SchedulerSetConfig(10, &schedulerConfig))
// Generate some mock evals and upsert these into state.
mockEval1 := mock.Eval()
mockEval2 := mock.Eval()
require.NoError(t, testState.UpsertEvals(
structs.MsgTypeTestSetup, 20, []*structs.Evaluation{mockEval1, mockEval2}))
mockEvalIDs := []string{mockEval1.ID, mockEval2.ID}
// Try and delete the evals without pausing the eval broker.
err := testState.DeleteEval(30, mockEvalIDs, []string{}, true)
require.ErrorContains(t, err, "eval broker is enabled")
// Pause the eval broker on the scheduler config, and try deleting the
// evals again.
schedulerConfig.PauseEvalBroker = true
require.NoError(t, testState.SchedulerSetConfig(30, &schedulerConfig))
require.NoError(t, testState.DeleteEval(40, mockEvalIDs, []string{}, true))
ws := memdb.NewWatchSet()
mockEval1Lookup, err := testState.EvalByID(ws, mockEval1.ID)
require.NoError(t, err)
require.Nil(t, mockEval1Lookup)
mockEval2Lookup, err := testState.EvalByID(ws, mockEval2.ID)
require.NoError(t, err)
require.Nil(t, mockEval2Lookup)
}
func TestStateStore_EvalsByJob(t *testing.T) {
ci.Parallel(t)
nomad/structs/eval.go Normal file
@@ -0,0 +1,24 @@
package structs
const (
// EvalDeleteRPCMethod is the RPC method for batch deleting evaluations
// using their IDs.
//
// Args: EvalDeleteRequest
// Reply: EvalDeleteResponse
EvalDeleteRPCMethod = "Eval.Delete"
)
// EvalDeleteRequest is the request object used when operators are manually
// deleting evaluations. The number of evaluation IDs within the request must
// not be greater than MaxUUIDsPerWriteRequest.
type EvalDeleteRequest struct {
EvalIDs []string
WriteRequest
}
// EvalDeleteResponse is the response object when one or more evaluations are
// deleted manually by an operator.
type EvalDeleteResponse struct {
WriteMeta
}
@@ -822,10 +822,20 @@ type EvalUpdateRequest struct {
WriteRequest
}
-// EvalDeleteRequest is used for deleting an evaluation.
-type EvalDeleteRequest struct {
+// EvalReapRequest is used for reaping evaluations and allocations. This
+// struct is used by the Eval.Reap RPC endpoint as a request argument, and
+// also when performing eval reaps or deletes via Raft. This is because
+// Eval.Reap and Eval.Delete use the same Raft message when performing
+// deletes, so we do not need more Raft message types.
+type EvalReapRequest struct {
Evals []string
Allocs []string
+// UserInitiated tracks whether this reap request is the result of an
+// operator request. If this is true, the FSM needs to ensure the eval
+// broker is paused as the request can include non-terminal allocations.
+UserInitiated bool
WriteRequest
}
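// To make the shared-message design concrete, here is a hedged sketch of how
// an operator-facing Eval.Delete handler might reuse this struct when
// applying the delete through Raft. The raftApply argument stands in for the
// server's real apply helper; the function and its shape are assumptions for
// illustration, not the shipped code.
func applyOperatorEvalDelete(
	raftApply func(t MessageType, msg interface{}) (interface{}, uint64, error),
	args *EvalDeleteRequest,
) error {
	reapReq := EvalReapRequest{
		Evals: args.EvalIDs,
		// UserInitiated forces the FSM to verify the eval broker is paused
		// before the state store performs the delete.
		UserInitiated: true,
		WriteRequest:  args.WriteRequest,
	}
	// Eval.Reap and Eval.Delete share EvalDeleteRequestType, so operator
	// deletes need no new Raft message type.
	_, _, err := raftApply(EvalDeleteRequestType, &reapReq)
	return err
}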
nomad/structs/uuid.go Normal file
@@ -0,0 +1,7 @@
package structs
// MaxUUIDsPerWriteRequest is the maximum number of UUIDs that can be included
// within a single write request. This is to ensure that the Raft message does
// not become too large. The resulting value corresponds to 0.25MB of IDs or
// 7281 UUID strings.
var MaxUUIDsPerWriteRequest = (1024 * 256) / 36
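// Sanity-checking the arithmetic: 256 KiB is 262,144 bytes and a canonical
// UUID string is 36 bytes, so the integer division above yields 7,281. Below
// is a hedged sketch of the kind of guard a write path might apply before
// submitting a batch to Raft; the function is illustrative, not part of this
// commit, and assumes "fmt" is imported.
func checkWriteRequestIDCount(ids []string) error {
	if n := len(ids); n > MaxUUIDsPerWriteRequest {
		return fmt.Errorf("request contains %d IDs; maximum allowed is %d",
			n, MaxUUIDsPerWriteRequest)
	}
	return nil
}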
@@ -108,7 +108,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
// allocation is now invalid
index++
-err = srv.State().DeleteEval(index, []string{}, []string{alloc.ID})
+err = srv.State().DeleteEval(index, []string{}, []string{alloc.ID}, false)
require.NoError(t, err)
// emit a GC so that we have a volume change that's dropped
@@ -204,6 +204,55 @@ $ curl \
}
```
## Delete Evaluations
This endpoint deletes evaluations. In order to utilise this endpoint, the
eval broker should first be paused via the
[update scheduler configuration][update_scheduler_configuration] API
endpoint.
This API endpoint should be used cautiously and only in outage situations where
there is a large backlog of evaluations not being processed. During most normal
and outage scenarios, Nomad's reconciliation and state management will handle
evaluations as needed.
| Method | Path | Produces |
| --------- | ----------------- | ------------------ |
| `DELETE` | `/v1/evaluations` | `application/json` |
The table below shows this endpoint's support for
[blocking queries](/api-docs#blocking-queries) and
[required ACLs](/api-docs#acls).
| Blocking Queries | ACL Required |
| ---------------- | ------------ |
| `NO` | `management` |
### Parameters
- `EvalIDs` `(array<string>: <required>)` - An array of evaluation UUIDs to
  delete. These must be full-length UUIDs and not prefixes.
### Sample Payload
```javascript
{
"EvalIDs": [
"167ec27d-2e36-979a-280a-a6b920d382db",
"6c193955-ac66-42e2-f4c7-f1fc707f1f5e"
]
}
```
### Sample Request
```shell-session
$ curl \
--request DELETE \
--data @payload.json \
https://localhost:4646/v1/evaluations
```
## List Allocations for Evaluation

This endpoint lists the allocations created or modified for the given
@@ -332,3 +381,5 @@ $ curl \
}
]
```
+
+[update_scheduler_configuration]: /api-docs/operator/scheduler#update-scheduler-configuration
@@ -0,0 +1,75 @@
---
layout: docs
page_title: 'Commands: eval delete'
description: |
The eval delete command is used to delete evaluations.
---
# Command: eval delete
The `eval delete` command is used to delete evaluations. It should be used
cautiously and only in outage situations where there is a large backlog of
evaluations not being processed. During most normal and outage scenarios,
Nomad's reconciliation and state management will handle evaluations as needed.
The eval broker is expected to be paused prior to running this command and
un-paused afterwards. The current broker state can be checked using the
[`operator scheduler get-config`][scheduler_get_config] command and updated
using the [`operator scheduler set-config`][scheduler_set_config] command.
## Usage
```plaintext
nomad eval delete [options] [args]
```
It takes an optional argument which is the ID of the evaluation to delete. If
the evaluation ID is omitted, this command will use the `-filter` flag to
identify and delete a set of evaluations.
When ACLs are enabled, this command requires a `management` token.
## General Options
@include 'general_options.mdx'
## Delete Options
- `-filter`: Specifies an expression used to select the evaluations to
  delete.
- `-yes`: Bypass the confirmation prompt if an evaluation ID was not provided.
## Examples
Delete an evaluation using its ID:
```shell-session
$ nomad eval delete 9ecffbba-73be-d909-5d7e-ac2694c10e0c
Successfully deleted 1 evaluation
```
Delete all evaluations with status `pending` for the `example` job:
```shell-session
$ nomad eval delete -filter='Stauts == "pending" and JobID == "example"'
Do you want to list evals (3) before deletion? [y/N] y
ID Priority Triggered By Job ID Status Placement Failures
cef92121 50 job-register example pending false
1c905ca0 50 job-register example pending false
b9e77692 50 job-register example pending false
Are you sure you want to delete 3 evals? [y/N] y
Successfully deleted 3 evaluations
```
Delete all evaluations for the `system` and `service` schedulers whilst
skipping all prompts:
```shell-session
$ nomad eval delete -filter='Scheduler == "system" or Scheduler == "service"' -yes
Successfully deleted 23 evaluations
```
[scheduler_get_config]: /docs/commands/operator/scheduler-get-config
[scheduler_set_config]: /docs/commands/operator/scheduler-set-config
@@ -15,9 +15,10 @@ Usage: `nomad eval <subcommand> [options]`
Run `nomad eval <subcommand> -h` for help on that subcommand. The following
subcommands are available:

+- [`eval delete`][delete] - Delete evals
- [`eval list`][list] - List all evals
- [`eval status`][status] - Display the status of an eval

+[delete]: /docs/commands/eval/delete 'Delete evals'
[list]: /docs/commands/eval/list 'List all evals'
[status]: /docs/commands/eval/status 'Display the status of an eval'
@@ -372,6 +372,10 @@
"title": "Overview",
"path": "commands/eval"
},
+{
+"title": "delete",
+"path": "commands/eval/delete"
+},
{
"title": "list",
"path": "commands/eval/list"