Merge pull request #8187 from hashicorp/f-8143-block-scaling-during-deployment
modify Job.Scale RPC to return an error if there is an active deployment
This commit is contained in:
commit
fe9d654640
|
@ -8,6 +8,7 @@ FEATURES:
|
|||
IMPROVEMENTS:
|
||||
|
||||
* core: support for persisting previous task group counts when updating a job [[GH-8168](https://github.com/hashicorp/nomad/issues/8168)]
|
||||
* core: block Job.Scale actions when the job is under active deployment [[GH-8187](https://github.com/hashicorp/nomad/issues/8187)]
|
||||
* api: Persist previous count with scaling events [[GH-8167](https://github.com/hashicorp/nomad/issues/8167)]
|
||||
* build: Updated to Go 1.14.4 [[GH-8172](https://github.com/hashicorp/nomad/issues/8172)]
|
||||
|
||||
|
|
|
@ -931,6 +931,7 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
|
|||
ws := memdb.NewWatchSet()
|
||||
job, err := snap.JobByID(ws, namespace, args.JobID)
|
||||
if err != nil {
|
||||
j.logger.Error("unable to lookup job", "error", err)
|
||||
return err
|
||||
}
|
||||
if job == nil {
|
||||
|
@ -955,11 +956,46 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
|
|||
// for now, we'll do this even if count didn't change
|
||||
prevCount := found.Count
|
||||
if args.Count != nil {
|
||||
|
||||
// Lookup the latest deployment, to see whether this scaling event should be blocked
|
||||
d, err := snap.LatestDeploymentByJobID(ws, namespace, args.JobID)
|
||||
if err != nil {
|
||||
j.logger.Error("unable to lookup latest deployment", "error", err)
|
||||
return err
|
||||
}
|
||||
// explicitly filter deployment by JobCreateIndex to be safe, because LatestDeploymentByJobID doesn't
|
||||
if d != nil && d.JobCreateIndex == job.CreateIndex && d.Active() {
|
||||
// attempt to register the scaling event
|
||||
JobScalingBlockedByActiveDeployment := "job scaling blocked due to active deployment"
|
||||
event := &structs.ScalingEventRequest{
|
||||
Namespace: job.Namespace,
|
||||
JobID: job.ID,
|
||||
TaskGroup: groupName,
|
||||
ScalingEvent: &structs.ScalingEvent{
|
||||
Time: now,
|
||||
PreviousCount: int64(prevCount),
|
||||
Message: JobScalingBlockedByActiveDeployment,
|
||||
Error: true,
|
||||
Meta: map[string]interface{}{
|
||||
"OriginalMessage": args.Message,
|
||||
"OriginalCount": *args.Count,
|
||||
"OriginalMeta": args.Meta,
|
||||
},
|
||||
},
|
||||
}
|
||||
if _, _, err := j.srv.raftApply(structs.ScalingEventRegisterRequestType, event); err != nil {
|
||||
// just log the error, this was a best-effort attempt
|
||||
j.logger.Error("scaling event create failed during block scaling action", "error", err)
|
||||
}
|
||||
return structs.NewErrRPCCoded(400, JobScalingBlockedByActiveDeployment)
|
||||
}
|
||||
|
||||
truncCount := int(*args.Count)
|
||||
if int64(truncCount) != *args.Count {
|
||||
return structs.NewErrRPCCoded(400,
|
||||
fmt.Sprintf("new scaling count is too large for TaskGroup.Count (int): %v", args.Count))
|
||||
}
|
||||
// update the task group count
|
||||
found.Count = truncCount
|
||||
|
||||
registerReq := structs.JobRegisterRequest{
|
||||
|
|
|
@ -5555,6 +5555,178 @@ func TestJobEndpoint_Scale(t *testing.T) {
|
|||
require.Equal(int64(originalCount), events[groupName][0].PreviousCount)
|
||||
}
|
||||
|
||||
func TestJobEndpoint_Scale_DeploymentBlocking(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
s1, cleanupS1 := TestServer(t, nil)
|
||||
defer cleanupS1()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
state := s1.fsm.State()
|
||||
|
||||
type testCase struct {
|
||||
latestDeploymentStatus string
|
||||
}
|
||||
cases := []string{
|
||||
structs.DeploymentStatusSuccessful,
|
||||
structs.DeploymentStatusPaused,
|
||||
structs.DeploymentStatusRunning,
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
// create a job with a deployment history
|
||||
job := mock.Job()
|
||||
require.Nil(state.UpsertJob(1000, job), "UpsertJob")
|
||||
d1 := mock.Deployment()
|
||||
d1.Status = structs.DeploymentStatusCancelled
|
||||
d1.StatusDescription = structs.DeploymentStatusDescriptionNewerJob
|
||||
d1.JobID = job.ID
|
||||
d1.JobCreateIndex = job.CreateIndex
|
||||
require.Nil(state.UpsertDeployment(1001, d1), "UpsertDeployment")
|
||||
d2 := mock.Deployment()
|
||||
d2.Status = structs.DeploymentStatusSuccessful
|
||||
d2.StatusDescription = structs.DeploymentStatusDescriptionSuccessful
|
||||
d2.JobID = job.ID
|
||||
d2.JobCreateIndex = job.CreateIndex
|
||||
require.Nil(state.UpsertDeployment(1002, d2), "UpsertDeployment")
|
||||
|
||||
// add the latest deployment for the test case
|
||||
dLatest := mock.Deployment()
|
||||
dLatest.Status = tc
|
||||
dLatest.StatusDescription = "description does not matter for this test"
|
||||
dLatest.JobID = job.ID
|
||||
dLatest.JobCreateIndex = job.CreateIndex
|
||||
require.Nil(state.UpsertDeployment(1003, dLatest), "UpsertDeployment")
|
||||
|
||||
// attempt to scale
|
||||
originalCount := job.TaskGroups[0].Count
|
||||
newCount := int64(originalCount+1)
|
||||
groupName := job.TaskGroups[0].Name
|
||||
scalingMetadata := map[string]interface{}{
|
||||
"meta": "data",
|
||||
}
|
||||
scalingMessage := "original reason for scaling"
|
||||
scale := &structs.JobScaleRequest{
|
||||
JobID: job.ID,
|
||||
Target: map[string]string{
|
||||
structs.ScalingTargetGroup: groupName,
|
||||
},
|
||||
Meta: scalingMetadata,
|
||||
Message: scalingMessage,
|
||||
Count: helper.Int64ToPtr(newCount),
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: job.Namespace,
|
||||
},
|
||||
}
|
||||
var resp structs.JobRegisterResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
|
||||
if dLatest.Active() {
|
||||
// should fail
|
||||
require.Error(err, "test case %q", tc)
|
||||
require.Contains(err.Error(), "active deployment")
|
||||
} else {
|
||||
require.NoError(err, "test case %q", tc)
|
||||
require.NotEmpty(resp.EvalID)
|
||||
require.Greater(resp.EvalCreateIndex, resp.JobModifyIndex)
|
||||
}
|
||||
|
||||
events, _, _ := state.ScalingEventsByJob(nil, job.Namespace, job.ID)
|
||||
require.Equal(1, len(events[groupName]))
|
||||
latestEvent := events[groupName][0]
|
||||
if dLatest.Active() {
|
||||
require.True(latestEvent.Error)
|
||||
require.Nil(latestEvent.Count)
|
||||
require.Contains(latestEvent.Message, "blocked due to active deployment")
|
||||
require.Equal(latestEvent.Meta["OriginalCount"], newCount)
|
||||
require.Equal(latestEvent.Meta["OriginalMessage"], scalingMessage)
|
||||
require.Equal(latestEvent.Meta["OriginalMeta"], scalingMetadata)
|
||||
} else {
|
||||
require.False(latestEvent.Error)
|
||||
require.NotNil(latestEvent.Count)
|
||||
require.Equal(newCount, *latestEvent.Count)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestJobEndpoint_Scale_InformationalEventsShouldNotBeBlocked(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
s1, cleanupS1 := TestServer(t, nil)
|
||||
defer cleanupS1()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
state := s1.fsm.State()
|
||||
|
||||
type testCase struct {
|
||||
latestDeploymentStatus string
|
||||
}
|
||||
cases := []string{
|
||||
structs.DeploymentStatusSuccessful,
|
||||
structs.DeploymentStatusPaused,
|
||||
structs.DeploymentStatusRunning,
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
// create a job with a deployment history
|
||||
job := mock.Job()
|
||||
require.Nil(state.UpsertJob(1000, job), "UpsertJob")
|
||||
d1 := mock.Deployment()
|
||||
d1.Status = structs.DeploymentStatusCancelled
|
||||
d1.StatusDescription = structs.DeploymentStatusDescriptionNewerJob
|
||||
d1.JobID = job.ID
|
||||
d1.JobCreateIndex = job.CreateIndex
|
||||
require.Nil(state.UpsertDeployment(1001, d1), "UpsertDeployment")
|
||||
d2 := mock.Deployment()
|
||||
d2.Status = structs.DeploymentStatusSuccessful
|
||||
d2.StatusDescription = structs.DeploymentStatusDescriptionSuccessful
|
||||
d2.JobID = job.ID
|
||||
d2.JobCreateIndex = job.CreateIndex
|
||||
require.Nil(state.UpsertDeployment(1002, d2), "UpsertDeployment")
|
||||
|
||||
// add the latest deployment for the test case
|
||||
dLatest := mock.Deployment()
|
||||
dLatest.Status = tc
|
||||
dLatest.StatusDescription = "description does not matter for this test"
|
||||
dLatest.JobID = job.ID
|
||||
dLatest.JobCreateIndex = job.CreateIndex
|
||||
require.Nil(state.UpsertDeployment(1003, dLatest), "UpsertDeployment")
|
||||
|
||||
// register informational scaling event
|
||||
groupName := job.TaskGroups[0].Name
|
||||
scalingMetadata := map[string]interface{}{
|
||||
"meta": "data",
|
||||
}
|
||||
scalingMessage := "original reason for scaling"
|
||||
scale := &structs.JobScaleRequest{
|
||||
JobID: job.ID,
|
||||
Target: map[string]string{
|
||||
structs.ScalingTargetGroup: groupName,
|
||||
},
|
||||
Meta: scalingMetadata,
|
||||
Message: scalingMessage,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: job.Namespace,
|
||||
},
|
||||
}
|
||||
var resp structs.JobRegisterResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
|
||||
require.NoError(err, "test case %q", tc)
|
||||
require.Empty(resp.EvalID)
|
||||
|
||||
events, _, _ := state.ScalingEventsByJob(nil, job.Namespace, job.ID)
|
||||
require.Equal(1, len(events[groupName]))
|
||||
latestEvent := events[groupName][0]
|
||||
require.False(latestEvent.Error)
|
||||
require.Nil(latestEvent.Count)
|
||||
require.Equal(scalingMessage, latestEvent.Message)
|
||||
require.Equal(scalingMetadata, latestEvent.Meta)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJobEndpoint_Scale_ACL(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
|
|
@ -1783,6 +1783,7 @@ $ curl \
|
|||
|
||||
This endpoint performs a scaling action against a job.
|
||||
Currently, this endpoint supports scaling the count for a task group.
|
||||
This will return a 400 error if the job has an active deployment.
|
||||
|
||||
| Method | Path | Produces |
|
||||
| ------ | ----------------------- | ------------------ |
|
||||
|
|
Loading…
Reference in a new issue