allocs: Add nomad alloc stop

This adds a `nomad alloc stop` command that can be used to stop and
force migrate an allocation to a different node.

This is built on top of the AllocUpdateDesiredTransitionRequest and
explicitly limits the scope of access to that transition to expose it
under the alloc-lifecycle ACL.

The API returns the follow up eval that can be used as part of
monitoring in the CLI or parsed and used in an external tool.
This commit is contained in:
Danielle Lancashire 2019-04-01 16:21:03 +02:00
parent 8b601fc515
commit 832f607433
11 changed files with 479 additions and 3 deletions

View File

@ -89,6 +89,20 @@ func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOption
return err
}
func (a *Allocations) Stop(alloc *Allocation, q *QueryOptions) (*AllocStopResponse, error) {
var resp AllocStopResponse
_, err := a.client.putQuery("/v1/allocation/"+alloc.ID+"/stop", nil, &resp, q)
return &resp, err
}
// AllocStopResponse is the response to an `AllocStopRequest`
type AllocStopResponse struct {
// EvalID is the id of the follow up evalution for the rescheduled alloc.
EvalID string
WriteMeta
}
// Allocation is used for serialization of allocations.
type Allocation struct {
ID string

View File

@ -42,7 +42,29 @@ func (s *HTTPServer) AllocsRequest(resp http.ResponseWriter, req *http.Request)
}
func (s *HTTPServer) AllocSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
allocID := strings.TrimPrefix(req.URL.Path, "/v1/allocation/")
reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/allocation/")
// tokenize the suffix of the path to get the alloc id and find the action
// invoked on the alloc id
tokens := strings.Split(reqSuffix, "/")
if len(tokens) > 2 || len(tokens) < 1 {
return nil, CodedError(404, resourceNotFoundErr)
}
allocID := tokens[0]
if len(tokens) == 1 {
return s.allocGet(allocID, resp, req)
}
switch tokens[1] {
case "stop":
return s.allocStop(allocID, resp, req)
}
return nil, CodedError(404, resourceNotFoundErr)
}
func (s *HTTPServer) allocGet(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
return nil, CodedError(405, ErrInvalidMethod)
}
@ -79,8 +101,22 @@ func (s *HTTPServer) AllocSpecificRequest(resp http.ResponseWriter, req *http.Re
return alloc, nil
}
func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
func (s *HTTPServer) allocStop(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if !(req.Method == "POST" || req.Method == "PUT") {
return nil, CodedError(405, ErrInvalidMethod)
}
sr := &structs.AllocStopRequest{
AllocID: allocID,
}
s.parseWriteRequest(req, &sr.WriteRequest)
var out structs.AllocStopResponse
err := s.agent.RPC("Alloc.Stop", &sr, &out)
return &out, err
}
func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/client/allocation/")
// tokenize the suffix of the path to get the alloc id and find the action

View File

@ -394,6 +394,49 @@ func TestHTTP_AllocRestart_ACL(t *testing.T) {
})
}
func TestHTTP_AllocStop(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
// Directly manipulate the state
state := s.Agent.server.State()
alloc := mock.Alloc()
require := require.New(t)
require.NoError(state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)))
require.NoError(state.UpsertAllocs(1000, []*structs.Allocation{alloc}))
{
// Make the HTTP request
req, err := http.NewRequest("POST", "/v1/allocation/"+alloc.ID+"/stop", nil)
require.NoError(err)
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.AllocSpecificRequest(respW, req)
require.NoError(err)
a := obj.(*structs.AllocStopResponse)
require.NotEmpty(a.EvalID, "missing eval")
require.NotEmpty(a.Index, "missing index")
}
{
// Make the HTTP request
req, err := http.NewRequest("POST", "/v1/allocation/"+alloc.ID+"/stop", nil)
require.NoError(err)
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.AllocSpecificRequest(respW, req)
require.NoError(err)
a := obj.(*structs.AllocStopResponse)
require.NotEmpty(a.EvalID, "missing eval")
require.NotEmpty(a.Index, "missing index")
}
})
}
func TestHTTP_AllocStats(t *testing.T) {
t.Parallel()
require := require.New(t)

128
command/alloc_stop.go Normal file
View File

@ -0,0 +1,128 @@
package command
import (
"fmt"
"strings"
)
type AllocStopCommand struct {
Meta
}
func (a *AllocStopCommand) Help() string {
helpText := `
Usage: nomad alloc stop [options] <allocation>
Alias: nomad stop
stop an existing allocation. This command is used to signal a specific alloc
to shut down. When the allocation has been shut down, it will then be
rescheduled. An interactive monitoring session will display log lines as the
allocation completes shutting down. It is safe to exit the monitor early with
ctrl-c.
General Options:
` + generalOptionsUsage() + `
Stop Specific Options:
-detach
Return immediately instead of entering monitor mode. After the
stop command is submitted, a new evaluation ID is printed to the
screen, which can be used to examine the rescheduling evaluation using the
eval-status command.
-verbose
Show full information.
`
return strings.TrimSpace(helpText)
}
func (c *AllocStopCommand) Name() string { return "alloc stop" }
func (c *AllocStopCommand) Run(args []string) int {
var detach, verbose bool
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.BoolVar(&detach, "detach", false, "")
flags.BoolVar(&verbose, "verbose", false, "")
if err := flags.Parse(args); err != nil {
return 1
}
// Check that we got exactly one alloc
args = flags.Args()
if len(args) != 1 {
c.Ui.Error("This command takes one argument: <alloc-id>")
c.Ui.Error(commandErrorText(c))
return 1
}
allocID := args[0]
// Truncate the id unless full length is requested
length := shortId
if verbose {
length = fullId
}
// Query the allocation info
if len(allocID) == 1 {
c.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters."))
return 1
}
allocID = sanitizeUUIDPrefix(allocID)
// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
allocs, _, err := client.Allocations().PrefixList(allocID)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err))
return 1
}
if len(allocs) == 0 {
c.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID))
return 1
}
if len(allocs) > 1 {
// Format the allocs
out := formatAllocListStubs(allocs, verbose, length)
c.Ui.Error(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", out))
return 1
}
// Prefix lookup matched a single allocation
alloc, _, err := client.Allocations().Info(allocs[0].ID, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err))
return 1
}
resp, err := client.Allocations().Stop(alloc, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error stopping allocation: %s", err))
return 1
}
if detach {
c.Ui.Output(resp.EvalID)
return 0
}
mon := newMonitor(c.Ui, client, length)
return mon.monitor(resp.EvalID, false)
}
func (a *AllocStopCommand) Synopsis() string {
return "Stop and reschedule a running allocation"
}

112
command/alloc_stop_test.go Normal file
View File

@ -0,0 +1,112 @@
package command
import (
"fmt"
"testing"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
)
func TestAllocStopCommand_Implements(t *testing.T) {
t.Parallel()
var _ cli.Command = &AllocStopCommand{}
}
func TestAllocStop_Fails(t *testing.T) {
srv, _, url := testServer(t, false, nil)
defer srv.Shutdown()
require := require.New(t)
ui := new(cli.MockUi)
cmd := &AllocStopCommand{Meta: Meta{Ui: ui}}
// Fails on misuse
require.Equal(cmd.Run([]string{"some", "garbage", "args"}), 1, "Expected failure")
require.Contains(ui.ErrorWriter.String(), commandErrorText(cmd), "Expected help output")
ui.ErrorWriter.Reset()
// Fails on connection failure
require.Equal(cmd.Run([]string{"-address=nope", "foobar"}), 1, "expected failure")
require.Contains(ui.ErrorWriter.String(), "Error querying allocation")
ui.ErrorWriter.Reset()
// Fails on missing alloc
require.Equal(cmd.Run([]string{"-address=" + url, "26470238-5CF2-438F-8772-DC67CFB0705C"}), 1)
require.Contains(ui.ErrorWriter.String(), "No allocation(s) with prefix or id")
ui.ErrorWriter.Reset()
// Fail on identifier with too few characters
require.Equal(cmd.Run([]string{"-address=" + url, "2"}), 1)
require.Contains(ui.ErrorWriter.String(), "must contain at least two characters")
ui.ErrorWriter.Reset()
// Identifiers with uneven length should produce a query result
require.Equal(cmd.Run([]string{"-address=" + url, "123"}), 1)
require.Contains(ui.ErrorWriter.String(), "No allocation(s) with prefix or id")
ui.ErrorWriter.Reset()
}
func TestAllocStop_Run(t *testing.T) {
srv, client, url := testServer(t, true, nil)
defer srv.Shutdown()
require := require.New(t)
// Wait for a node to be ready
testutil.WaitForResult(func() (bool, error) {
nodes, _, err := client.Nodes().List(nil)
if err != nil {
return false, err
}
for _, node := range nodes {
if _, ok := node.Drivers["mock_driver"]; ok &&
node.Status == structs.NodeStatusReady {
return true, nil
}
}
return false, fmt.Errorf("no ready nodes")
}, func(err error) {
t.Fatalf("err: %v", err)
})
ui := new(cli.MockUi)
cmd := &AllocStopCommand{Meta: Meta{Ui: ui}}
jobID := "job1_sfx"
job1 := testJob(jobID)
resp, _, err := client.Jobs().Register(job1, nil)
require.NoError(err)
if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 {
t.Fatalf("status code non zero saw %d", code)
}
// get an alloc id
allocId1 := ""
if allocs, _, err := client.Jobs().Allocations(jobID, false, nil); err == nil {
if len(allocs) > 0 {
allocId1 = allocs[0].ID
}
}
require.NotEmpty(allocId1, "unable to find allocation")
// Wait for alloc to be running
testutil.WaitForResult(func() (bool, error) {
alloc, _, err := client.Allocations().Info(allocId1, nil)
if err != nil {
return false, err
}
if alloc.ClientStatus == api.AllocClientStatusRunning {
return true, nil
}
return false, fmt.Errorf("alloc is not running, is: %s", alloc.ClientStatus)
}, func(err error) {
t.Fatalf("err: %v", err)
})
require.Equal(cmd.Run([]string{"-address=" + url, allocId1}), 0, "expected successful exit code")
ui.OutputWriter.Reset()
}

View File

@ -140,6 +140,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"alloc stop": func() (cli.Command, error) {
return &AllocStopCommand{
Meta: meta,
}, nil
},
"alloc fs": func() (cli.Command, error) {
return &AllocFSCommand{
Meta: meta,

View File

@ -10,6 +10,8 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -205,6 +207,63 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
return a.srv.blockingRPC(&opts)
}
// Stop is used to stop an allocation and migrate it to another node.
func (a *Alloc) Stop(args *structs.AllocStopRequest, reply *structs.AllocStopResponse) error {
if done, err := a.srv.forward("Alloc.Stop", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "stop"}, time.Now())
// Check that it is a management token.
if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityAllocLifecycle) {
return structs.ErrPermissionDenied
}
if args.AllocID == "" {
return fmt.Errorf("must provide an alloc id")
}
ws := memdb.NewWatchSet()
alloc, err := a.srv.State().AllocByID(ws, args.AllocID)
if err != nil {
return err
}
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
Priority: alloc.Job.Priority,
Type: alloc.Job.Type,
TriggeredBy: structs.EvalTriggerAllocStop,
JobID: alloc.Job.ID,
JobModifyIndex: alloc.Job.ModifyIndex,
Status: structs.EvalStatusPending,
}
transitionReq := &structs.AllocUpdateDesiredTransitionRequest{
Evals: []*structs.Evaluation{eval},
Allocs: map[string]*structs.DesiredTransition{
args.AllocID: {
Migrate: helper.BoolToPtr(true),
},
},
}
// Commit this update via Raft
_, index, err := a.srv.raftApply(structs.AllocUpdateDesiredTransitionRequestType, transitionReq)
if err != nil {
a.logger.Error("AllocUpdateDesiredTransitionRequest failed", "error", err)
return err
}
// Setup the response
reply.Index = index
reply.EvalID = eval.ID
return nil
}
// UpdateDesiredTransition is used to update the desired transitions of an
// allocation.
func (a *Alloc) UpdateDesiredTransition(args *structs.AllocUpdateDesiredTransitionRequest, reply *structs.GenericResponse) error {

View File

@ -567,3 +567,65 @@ func TestAllocEndpoint_UpdateDesiredTransition(t *testing.T) {
require.True(*out1.DesiredTransition.Migrate)
require.True(*out2.DesiredTransition.Migrate)
}
func TestAllocEndpoint_Stop_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
s1, _ := TestACLServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
alloc := mock.Alloc()
alloc2 := mock.Alloc()
state := s1.fsm.State()
require.Nil(state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)))
require.Nil(state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)))
require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc, alloc2}))
req := &structs.AllocStopRequest{
AllocID: alloc.ID,
}
req.Namespace = structs.DefaultNamespace
req.Region = alloc.Job.Region
// Try without permissions
var resp structs.AllocStopResponse
err := msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp)
require.True(structs.IsErrPermissionDenied(err), "expected permissions error, got: %v", err)
// Try with management permissions
req.WriteRequest.AuthToken = s1.getLeaderAcl()
var resp2 structs.AllocStopResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp2))
require.NotZero(resp2.Index)
// Try with alloc-lifecycle permissions
validToken := mock.CreatePolicyAndToken(t, state, 1002, "valid",
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityAllocLifecycle}))
req.WriteRequest.AuthToken = validToken.SecretID
req.AllocID = alloc2.ID
var resp3 structs.AllocStopResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp3))
require.NotZero(resp3.Index)
// Look up the allocations
out1, err := state.AllocByID(nil, alloc.ID)
require.Nil(err)
out2, err := state.AllocByID(nil, alloc2.ID)
require.Nil(err)
e1, err := state.EvalByID(nil, resp2.EvalID)
require.Nil(err)
e2, err := state.EvalByID(nil, resp3.EvalID)
require.Nil(err)
require.NotNil(out1.DesiredTransition.Migrate)
require.NotNil(out2.DesiredTransition.Migrate)
require.NotNil(e1)
require.NotNil(e2)
require.True(*out1.DesiredTransition.Migrate)
require.True(*out2.DesiredTransition.Migrate)
}

View File

@ -700,6 +700,21 @@ type AllocUpdateDesiredTransitionRequest struct {
WriteRequest
}
// AllocStopRequest is used to stop and reschedule a running Allocation.
type AllocStopRequest struct {
AllocID string
WriteRequest
}
// AllocStopResponse is the response to an `AllocStopRequest`
type AllocStopResponse struct {
// EvalID is the id of the follow up evalution for the rescheduled alloc.
EvalID string
WriteMeta
}
// AllocListRequest is used to request a list of allocations
type AllocListRequest struct {
QueryOptions
@ -7974,6 +7989,7 @@ const (
EvalTriggerPeriodicJob = "periodic-job"
EvalTriggerNodeDrain = "node-drain"
EvalTriggerNodeUpdate = "node-update"
EvalTriggerAllocStop = "alloc-stop"
EvalTriggerScheduled = "scheduled"
EvalTriggerRollingUpdate = "rolling-update"
EvalTriggerDeploymentWatcher = "deployment-watcher"

View File

@ -127,6 +127,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
switch eval.TriggeredBy {
case structs.EvalTriggerJobRegister, structs.EvalTriggerJobDeregister,
structs.EvalTriggerNodeDrain, structs.EvalTriggerNodeUpdate,
structs.EvalTriggerAllocStop,
structs.EvalTriggerRollingUpdate, structs.EvalTriggerQueuedAllocs,
structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans,
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc,

View File

@ -61,7 +61,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
switch eval.TriggeredBy {
case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, structs.EvalTriggerFailedFollowUp,
structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate, structs.EvalTriggerPreemption,
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain:
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain, structs.EvalTriggerAllocStop:
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)