From 40d6b3bbd1498802523b69afa8d7db6f493eb8d2 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Mon, 23 Mar 2020 13:38:18 +0000 Subject: [PATCH 1/7] adding raft and state_store support to track job scaling events updated ScalingEvent API to record "message string,error bool" instead of confusing "reason,error *string" --- api/jobs.go | 9 +- api/jobs_test.go | 14 +-- api/scaling.go | 20 ++-- command/agent/job_endpoint.go | 2 +- command/agent/job_endpoint_test.go | 4 +- nomad/fsm.go | 59 +++++++++++ nomad/job_endpoint.go | 136 ++++++++++++++---------- nomad/job_endpoint_test.go | 70 ++++++++++--- nomad/state/schema.go | 39 ++++++- nomad/state/state_store.go | 94 +++++++++++++++++ nomad/state/state_store_test.go | 163 +++++++++++++++++++++++++++++ nomad/structs/structs.go | 85 ++++++++++++--- 12 files changed, 591 insertions(+), 104 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index f706d887a..8668cc2bd 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -156,7 +156,8 @@ func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) { // Scale is used to retrieve information about a particular // job given its unique ID. func (j *Jobs) Scale(jobID, group string, count *int, - reason, error *string, meta map[string]interface{}, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) { + message string, error bool, meta map[string]interface{}, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, + error) { var count64 *int64 if count != nil { count64 = int64ToPtr(int64(*count)) @@ -167,9 +168,9 @@ func (j *Jobs) Scale(jobID, group string, count *int, "Job": jobID, "Group": group, }, - Error: error, - Reason: reason, - Meta: meta, + Error: error, + Message: message, + Meta: meta, } var resp JobRegisterResponse qm, err := j.client.write(fmt.Sprintf("/v1/job/%s/scale", url.PathEscape(jobID)), req, &resp, q) diff --git a/api/jobs_test.go b/api/jobs_test.go index 2a016dfad..afcab5812 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -1119,7 +1119,7 @@ func TestJobs_ScaleInvalidAction(t *testing.T) { {"i-dont-exist", "me-neither", 1, "404"}, } for _, test := range tests { - _, _, err := jobs.Scale(test.jobID, test.group, &test.value, stringToPtr("reason"), nil, nil, nil) + _, _, err := jobs.Scale(test.jobID, test.group, &test.value, "reason", false, nil, nil) require.Errorf(err, "expected jobs.Scale(%s, %s) to fail", test.jobID, test.group) require.Containsf(err.Error(), test.want, "jobs.Scale(%s, %s) error doesn't contain %s, got: %s", test.jobID, test.group, test.want, err) } @@ -1133,7 +1133,7 @@ func TestJobs_ScaleInvalidAction(t *testing.T) { // Perform a scaling action with bad group name, verify error _, _, err = jobs.Scale(*job.ID, "incorrect-group-name", intToPtr(2), - stringToPtr("because"), nil, nil, nil) + "because", false, nil, nil) require.Error(err) require.Contains(err.Error(), "does not exist") } @@ -1779,7 +1779,7 @@ func TestJobs_ScaleAction(t *testing.T) { groupCount := *job.TaskGroups[0].Count // Trying to scale against a target before it exists returns an error - _, _, err := jobs.Scale(id, "missing", intToPtr(groupCount+1), stringToPtr("this won't work"), nil, nil, nil) + _, _, err := jobs.Scale(id, "missing", intToPtr(groupCount+1), "this won't work", false, nil, nil) require.Error(err) require.Contains(err.Error(), "not found") @@ -1791,7 +1791,7 @@ func TestJobs_ScaleAction(t *testing.T) { // Perform scaling action newCount := groupCount + 1 scalingResp, wm, err := jobs.Scale(id, groupName, - intToPtr(newCount), stringToPtr("need more instances"), nil, nil, nil) + intToPtr(newCount), "need more instances", false, nil, nil) require.NoError(err) require.NotNil(scalingResp) @@ -1829,7 +1829,7 @@ func TestJobs_ScaleAction_Noop(t *testing.T) { // Perform scaling action scaleResp, wm, err := jobs.Scale(id, groupName, - nil, stringToPtr("no count, just informative"), nil, nil, nil) + nil, "no count, just informative", false, nil, nil) require.NoError(err) require.NotNil(scaleResp) @@ -1845,7 +1845,9 @@ func TestJobs_ScaleAction_Noop(t *testing.T) { require.Empty(scaleResp.EvalCreateIndex) require.Empty(scaleResp.EvalID) - // TODO: check that scaling event was persisted + status, _, err := jobs.ScaleStatus(*job.ID, nil) + require.NoError(err) + require.NotEmpty(status.TaskGroups[groupName].Events) } // TestJobs_ScaleStatus tests the /scale status endpoint for task group count diff --git a/api/scaling.go b/api/scaling.go index 9c92fc861..9b22d5705 100644 --- a/api/scaling.go +++ b/api/scaling.go @@ -40,11 +40,11 @@ func (p *ScalingPolicy) Canonicalize(taskGroupCount int) { // ScalingRequest is the payload for a generic scaling action type ScalingRequest struct { - Count *int64 - Target map[string]string - Reason *string - Error *string - Meta map[string]interface{} + Count *int64 + Target map[string]string + Message string + Error bool + Meta map[string]interface{} WriteRequest // this is effectively a job update, so we need the ability to override policy. PolicyOverride bool @@ -92,9 +92,9 @@ type TaskGroupScaleStatus struct { } type ScalingEvent struct { - Reason *string - Error *string - Meta map[string]interface{} - Time uint64 - EvalID *string + Error bool + Message string + Meta map[string]interface{} + EvalID *string + Time uint64 } diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 2b7f8ee05..09bac74cc 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -517,7 +517,7 @@ func (s *HTTPServer) jobScaleAction(resp http.ResponseWriter, req *http.Request, Target: args.Target, Count: args.Count, PolicyOverride: args.PolicyOverride, - Reason: args.Reason, + Message: args.Message, Error: args.Error, Meta: args.Meta, } diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 7c7c38e56..f740aa062 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -686,8 +686,8 @@ func TestHTTP_Job_ScaleTaskGroup(t *testing.T) { newCount := job.TaskGroups[0].Count + 1 scaleReq := &api.ScalingRequest{ - Count: helper.Int64ToPtr(int64(newCount)), - Reason: helper.StringToPtr("testing"), + Count: helper.Int64ToPtr(int64(newCount)), + Message: "testing", Target: map[string]string{ "Job": job.ID, "Group": job.TaskGroups[0].Name, diff --git a/nomad/fsm.go b/nomad/fsm.go index d6900a1c7..f2a9ee7e9 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -51,6 +51,7 @@ const ( ScalingPolicySnapshot CSIPluginSnapshot CSIVolumeSnapshot + ScalingEventsSnapshot ) // LogApplier is the definition of a function that can apply a Raft log @@ -269,6 +270,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyCSIVolumeDeregister(buf[1:], log.Index) case structs.CSIVolumeClaimRequestType: return n.applyCSIVolumeClaim(buf[1:], log.Index) + case structs.ScalingEventRegisterRequestType: + return n.applyUpsertScalingEvent(buf[1:], log.Index) } // Check enterprise only message types. @@ -1403,6 +1406,16 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } + case ScalingEventsSnapshot: + jobScalingEvents := new(structs.JobScalingEvents) + if err := dec.Decode(jobScalingEvents); err != nil { + return err + } + + if err := restore.ScalingEventsRestore(jobScalingEvents); err != nil { + return err + } + case ScalingPolicySnapshot: scalingPolicy := new(structs.ScalingPolicy) if err := dec.Decode(scalingPolicy); err != nil { @@ -1629,6 +1642,21 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { return nil } +func (n *nomadFSM) applyUpsertScalingEvent(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_scaling_event"}, time.Now()) + var req structs.ScalingEventRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.UpsertScalingEvent(index, &req); err != nil { + n.logger.Error("UpsertScalingEvent failed", "error", err) + return err + } + + return nil +} + func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now()) // Register the nodes @@ -1697,6 +1725,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { sink.Cancel() return err } + if err := s.persistScalingEvents(sink, encoder); err != nil { + sink.Cancel() + return err + } if err := s.persistCSIPlugins(sink, encoder); err != nil { sink.Cancel() return err @@ -2136,6 +2168,33 @@ func (s *nomadSnapshot) persistScalingPolicies(sink raft.SnapshotSink, return nil } +func (s *nomadSnapshot) persistScalingEvents(sink raft.SnapshotSink, encoder *codec.Encoder) error { + // Get all the scaling events + ws := memdb.NewWatchSet() + jobScalingEvents, err := s.snap.ScalingEvents(ws) + if err != nil { + return err + } + + for { + // Get the next item + raw := jobScalingEvents.Next() + if raw == nil { + break + } + + // Prepare the request struct + events := raw.(*structs.JobScalingEvents) + + // Write out a scaling events snapshot + sink.Write([]byte{byte(ScalingEventsSnapshot)}) + if err := encoder.Encode(events); err != nil { + return err + } + } + return nil +} + func (s *nomadSnapshot) persistCSIPlugins(sink raft.SnapshotSink, encoder *codec.Encoder) error { diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 9bab65d70..169a98fa8 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -864,11 +864,8 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes if groupName == "" { return structs.NewErrRPCCoded(400, "missing task group name for scaling action") } - if args.Error != nil && args.Reason != nil { - return structs.NewErrRPCCoded(400, "scaling action should not contain error and scaling reason") - } - if args.Error != nil && args.Count != nil { - return structs.NewErrRPCCoded(400, "scaling action should not contain error and count") + if args.Error && args.Count != nil { + return structs.NewErrRPCCoded(400, "scaling action should not contain count if error is true") } // Check for submit-job permissions @@ -896,20 +893,29 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes return structs.NewErrRPCCoded(404, fmt.Sprintf("job %q not found", args.JobID)) } - jobModifyIndex := job.ModifyIndex + var found *structs.TaskGroup + for _, tg := range job.TaskGroups { + if groupName == tg.Name { + found = tg + break + } + } + if found == nil { + return structs.NewErrRPCCoded(400, + fmt.Sprintf("task group %q specified for scaling does not exist in job", groupName)) + } + + now := time.Now().UTC().UnixNano() + + // If the count is present, commit the job update via Raft if args.Count != nil { - found := false - for _, tg := range job.TaskGroups { - if groupName == tg.Name { - tg.Count = int(*args.Count) // TODO: not safe, check this above - found = true - break - } - } - if !found { + truncCount := int(*args.Count) + if int64(truncCount) != *args.Count { return structs.NewErrRPCCoded(400, - fmt.Sprintf("task group %q specified for scaling does not exist in job", groupName)) + fmt.Sprintf("new scaling count is too large for TaskGroup.Count (int): %v", args.Count)) } + found.Count = truncCount + registerReq := structs.JobRegisterRequest{ Job: job, EnforceIndex: true, @@ -917,58 +923,73 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes PolicyOverride: args.PolicyOverride, WriteRequest: args.WriteRequest, } - - // Commit this update via Raft - _, jobModifyIndex, err = j.srv.raftApply(structs.JobRegisterRequestType, registerReq) + _, jobModifyIndex, err := j.srv.raftApply(structs.JobRegisterRequestType, registerReq) if err != nil { j.logger.Error("job register for scale failed", "error", err) return err } + reply.JobModifyIndex = jobModifyIndex + } else { + reply.JobModifyIndex = job.ModifyIndex } - // Populate the reply with job information - reply.JobModifyIndex = jobModifyIndex - reply.Index = reply.JobModifyIndex - - // FINISH: - // register the scaling event to the scaling_event table, once that exists - // If the job is periodic or parameterized, we don't create an eval. - if job != nil && (job.IsPeriodic() || job.IsParameterized()) || args.Count == nil { - return nil + var eval *structs.Evaluation + if !job.IsPeriodic() && !job.IsParameterized() && args.Count != nil { + eval = &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: args.RequestNamespace(), + Priority: structs.JobDefaultPriority, + Type: structs.JobTypeService, + TriggeredBy: structs.EvalTriggerScaling, + JobID: args.JobID, + JobModifyIndex: reply.JobModifyIndex, + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + update := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + WriteRequest: structs.WriteRequest{Region: args.Region}, + } + + // Commit this evaluation via Raft + _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + j.logger.Error("eval create failed", "error", err, "method", "scale") + return err + } + + reply.EvalID = eval.ID + reply.EvalCreateIndex = evalIndex + } else { + reply.EvalID = "" + reply.EvalCreateIndex = 0 } - // Create a new evaluation - // FINISH: only do this if args.Error == nil || "" - now := time.Now().UTC().UnixNano() - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: args.RequestNamespace(), - Priority: structs.JobDefaultPriority, - Type: structs.JobTypeService, - TriggeredBy: structs.EvalTriggerScaling, - JobID: args.JobID, - JobModifyIndex: jobModifyIndex, - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, + event := &structs.ScalingEventRequest{ + Namespace: job.Namespace, + JobID: job.ID, + TaskGroup: groupName, + ScalingEvent: &structs.ScalingEvent{ + Time: now, + Count: args.Count, + Message: args.Message, + Error: args.Error, + Meta: args.Meta, + }, } - update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{eval}, - WriteRequest: structs.WriteRequest{Region: args.Region}, + if eval != nil { + event.ScalingEvent.EvalID = &eval.ID } - - // Commit this evaluation via Raft - _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) + _, eventIndex, err := j.srv.raftApply(structs.ScalingEventRegisterRequestType, event) if err != nil { - j.logger.Error("eval create failed", "error", err, "method", "deregister") + j.logger.Error("scaling event create failed", "error", err) return err } - // Populate the reply with eval information - reply.EvalID = eval.ID - reply.EvalCreateIndex = evalIndex - reply.Index = reply.EvalCreateIndex + reply.Index = eventIndex + j.srv.setQueryMeta(&reply.QueryMeta) return nil } @@ -1745,6 +1766,14 @@ func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest, return err } + events, err := state.ScalingEventsByJob(ws, args.RequestNamespace(), args.JobID) + if err != nil { + return err + } + if events == nil { + events = make(map[string][]*structs.ScalingEvent) + } + // Setup the output reply.JobScaleStatus = &structs.JobScaleStatus{ JobID: job.ID, @@ -1765,6 +1794,7 @@ func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest, tgScale.Unhealthy = ds.UnhealthyAllocs } } + tgScale.Events = events[tg.Name] reply.JobScaleStatus.TaskGroups[tg.Name] = tgScale } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 368165a27..4dfb6204a 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -5396,8 +5396,8 @@ func TestJobEndpoint_Scale(t *testing.T) { Target: map[string]string{ structs.ScalingTargetGroup: job.TaskGroups[0].Name, }, - Count: helper.Int64ToPtr(int64(count + 1)), - Reason: helper.StringToPtr("this should fail"), + Count: helper.Int64ToPtr(int64(count + 1)), + Message: "because of the load", Meta: map[string]interface{}{ "metrics": map[string]string{ "1": "a", @@ -5436,7 +5436,7 @@ func TestJobEndpoint_Scale_ACL(t *testing.T) { Target: map[string]string{ structs.ScalingTargetGroup: job.TaskGroups[0].Name, }, - Reason: helper.StringToPtr("this should fail"), + Message: "because of the load", WriteRequest: structs.WriteRequest{ Region: "global", Namespace: job.Namespace, @@ -5521,8 +5521,8 @@ func TestJobEndpoint_Scale_Invalid(t *testing.T) { Target: map[string]string{ structs.ScalingTargetGroup: job.TaskGroups[0].Name, }, - Count: helper.Int64ToPtr(int64(count) + 1), - Reason: helper.StringToPtr("this should fail"), + Count: helper.Int64ToPtr(int64(count) + 1), + Message: "this should fail", Meta: map[string]interface{}{ "metrics": map[string]string{ "1": "a", @@ -5545,19 +5545,59 @@ func TestJobEndpoint_Scale_Invalid(t *testing.T) { err = state.UpsertJob(1000, job) require.Nil(err) - scale.Count = nil - scale.Error = helper.StringToPtr("error and reason") - scale.Reason = helper.StringToPtr("is not allowed") - err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp) - require.Error(err) - require.Contains(err.Error(), "should not contain error and scaling reason") - scale.Count = helper.Int64ToPtr(10) - scale.Reason = nil - scale.Error = helper.StringToPtr("error and count is not allowed") + scale.Message = "error message" + scale.Error = true err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp) require.Error(err) - require.Contains(err.Error(), "should not contain error and count") + require.Contains(err.Error(), "should not contain count if error is true") +} + +func TestJobEndpoint_Scale_NoEval(t *testing.T) { + t.Parallel() + require := require.New(t) + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + state := s1.fsm.State() + + job := mock.Job() + err := state.UpsertJob(1000, job) + require.Nil(err) + + scale := &structs.JobScaleRequest{ + JobID: job.ID, + Target: map[string]string{ + structs.ScalingTargetGroup: job.TaskGroups[0].Name, + }, + Count: nil, // no count => no eval + Message: "something informative", + Meta: map[string]interface{}{ + "metrics": map[string]string{ + "1": "a", + "2": "b", + }, + "other": "value", + }, + PolicyOverride: false, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp) + require.NoError(err) + require.Empty(resp.EvalID) + require.Empty(resp.EvalCreateIndex) + + events, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID) + require.NoError(err) + require.NotNil(events) + require.Contains(events, job.TaskGroups[0].Name) + require.NotEmpty(events[job.TaskGroups[0].Name]) } func TestJobEndpoint_GetScaleStatus(t *testing.T) { diff --git a/nomad/state/schema.go b/nomad/state/schema.go index a87738c03..ec9893f48 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -51,6 +51,7 @@ func init() { csiVolumeTableSchema, csiPluginTableSchema, scalingPolicyTableSchema, + scalingEventTableSchema, }...) } @@ -789,7 +790,6 @@ func (s *ScalingPolicyTargetFieldIndex) PrefixFromArgs(args ...interface{}) ([]b } // scalingPolicyTableSchema returns the MemDB schema for the policy table. -// This table is used to store the policies which are referenced by tokens func scalingPolicyTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ Name: "scaling_policy", @@ -842,3 +842,40 @@ func scalingPolicyTableSchema() *memdb.TableSchema { }, } } + +// scalingEventTableSchema returns the memdb schema for job scaling events +func scalingEventTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: "scaling_event", + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + AllowMissing: false, + Unique: true, + + // Use a compound index so the tuple of (Namespace, JobID) is + // uniquely identifying + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Namespace", + }, + + &memdb.StringFieldIndex{ + Field: "JobID", + }, + }, + }, + }, + + // "error": { + // Name: "error", + // AllowMissing: false, + // Unique: false, + // Indexer: &memdb.FieldSetIndex{ + // Field: "Error", + // }, + // }, + }, + } +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index f47749b8e..6972c28c8 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -627,6 +627,85 @@ func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) erro return nil } +// UpsertScalingEvent is used to insert a new scaling event. +// Only the most recent JobTrackedScalingEvents will be kept. +func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventRequest) error { + txn := s.db.Txn(true) + defer txn.Abort() + + // Get the existing events + existing, err := txn.First("scaling_event", "id", req.Namespace, req.JobID) + if err != nil { + return fmt.Errorf("scaling event lookup failed: %v", err) + } + + var jobEvents *structs.JobScalingEvents + if existing != nil { + jobEvents = existing.(*structs.JobScalingEvents) + } else { + jobEvents = &structs.JobScalingEvents{ + Namespace: req.Namespace, + JobID: req.JobID, + ScalingEvents: make(map[string][]*structs.ScalingEvent), + } + } + + events := jobEvents.ScalingEvents[req.TaskGroup] + // prepend this latest event + events = append( + []*structs.ScalingEvent{req.ScalingEvent}, + events..., + ) + // truncate older events + if len(events) > structs.JobTrackedScalingEvents { + events = events[0:structs.JobTrackedScalingEvents] + } + jobEvents.ScalingEvents[req.TaskGroup] = events + + // Insert the new event + if err := txn.Insert("scaling_event", jobEvents); err != nil { + return fmt.Errorf("scaling event insert failed: %v", err) + } + + // Update the indexes table for scaling_event + if err := txn.Insert("index", &IndexEntry{"scaling_event", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + txn.Commit() + return nil +} + +// ScalingEvents returns an iterator over all the job scaling events +func (s *StateStore) ScalingEvents(ws memdb.WatchSet) (memdb.ResultIterator, error) { + txn := s.db.Txn(false) + + // Walk the entire scaling_event table + iter, err := txn.Get("scaling_event", "id") + if err != nil { + return nil, err + } + + ws.Add(iter.WatchCh()) + + return iter, nil +} + +func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID string) (map[string][]*structs.ScalingEvent, error) { + txn := s.db.Txn(false) + + watchCh, existing, err := txn.FirstWatch("scaling_event", "id", namespace, jobID) + if err != nil { + return nil, fmt.Errorf("job scaling events lookup failed: %v", err) + } + ws.Add(watchCh) + + if existing != nil { + return existing.(*structs.JobScalingEvents).ScalingEvents, nil + } + return nil, nil +} + // UpsertNode is used to register a node or update a node definition // This is assumed to be triggered by the client, so we retain the value // of drain/eligibility which is set by the scheduler. @@ -1407,6 +1486,14 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn } } + // Delete the scaling events + if _, err = txn.DeleteAll("scaling_event", "id", namespace, jobID); err != nil { + return fmt.Errorf("deleting job scaling events failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"scaling_event", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + // Cleanup plugins registered by this job err = s.deleteJobFromPlugin(index, txn, job) if err != nil { @@ -5278,3 +5365,10 @@ func (r *StateRestore) CSIVolumeRestore(volume *structs.CSIVolume) error { } return nil } + +func (r *StateRestore) ScalingEventsRestore(jobEvents *structs.JobScalingEvents) error { + if err := r.txn.Insert("scaling_event", jobEvents); err != nil { + return fmt.Errorf("scaling event insert failed: %v", err) + } + return nil +} diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 31611ee53..acbb1cb65 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -8552,6 +8552,169 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) { require.Equal(expect, found) } +func TestStateStore_UpsertScalingEvent(t *testing.T) { + t.Parallel() + require := require.New(t) + + state := testStateStore(t) + job := mock.Job() + groupName := job.TaskGroups[0].Name + + newEvent := structs.NewScalingEvent("message 1").SetMeta(map[string]interface{}{ + "a": 1, + }) + + wsAll := memdb.NewWatchSet() + all, err := state.ScalingEvents(wsAll) + require.NoError(err) + require.Nil(all.Next()) + + ws := memdb.NewWatchSet() + out, err := state.ScalingEventsByJob(ws, job.Namespace, job.ID) + require.NoError(err) + require.Nil(out) + + err = state.UpsertScalingEvent(1000, &structs.ScalingEventRequest{ + Namespace: job.Namespace, + JobID: job.ID, + TaskGroup: groupName, + ScalingEvent: newEvent, + }) + require.NoError(err) + require.True(watchFired(ws)) + require.True(watchFired(wsAll)) + + ws = memdb.NewWatchSet() + out, err = state.ScalingEventsByJob(ws, job.Namespace, job.ID) + require.NoError(err) + require.Equal(map[string][]*structs.ScalingEvent{ + groupName: {newEvent}, + }, out) + + iter, err := state.ScalingEvents(ws) + require.NoError(err) + + count := 0 + jobsReturned := []string{} + var jobEvents *structs.JobScalingEvents + for { + raw := iter.Next() + if raw == nil { + break + } + jobEvents = raw.(*structs.JobScalingEvents) + jobsReturned = append(jobsReturned, jobEvents.JobID) + count++ + } + require.Equal(1, count) + + index, err := state.Index("scaling_event") + require.NoError(err) + require.ElementsMatch([]string{job.ID}, jobsReturned) + require.Equal(map[string][]*structs.ScalingEvent{ + groupName: {newEvent}, + }, jobEvents.ScalingEvents) + require.EqualValues(1000, index) + require.False(watchFired(ws)) +} + +func TestStateStore_UpsertScalingEvent_LimitAndOrder(t *testing.T) { + t.Parallel() + require := require.New(t) + + state := testStateStore(t) + namespace := uuid.Generate() + jobID := uuid.Generate() + group1 := uuid.Generate() + group2 := uuid.Generate() + + index := uint64(1000) + for i := 1; i <= structs.JobTrackedScalingEvents+10; i++ { + newEvent := structs.NewScalingEvent("").SetMeta(map[string]interface{}{ + "i": i, + "group": group1, + }) + err := state.UpsertScalingEvent(index, &structs.ScalingEventRequest{ + Namespace: namespace, + JobID: jobID, + TaskGroup: group1, + ScalingEvent: newEvent, + }) + index++ + require.NoError(err) + + newEvent = structs.NewScalingEvent("").SetMeta(map[string]interface{}{ + "i": i, + "group": group2, + }) + err = state.UpsertScalingEvent(index, &structs.ScalingEventRequest{ + Namespace: namespace, + JobID: jobID, + TaskGroup: group2, + ScalingEvent: newEvent, + }) + index++ + require.NoError(err) + } + + out, err := state.ScalingEventsByJob(nil, namespace, jobID) + require.NoError(err) + require.Len(out, 2) + + expectedEvents := []int{} + for i := structs.JobTrackedScalingEvents; i > 0; i-- { + expectedEvents = append(expectedEvents, i+10) + } + + // checking order and content + require.Len(out[group1], structs.JobTrackedScalingEvents) + actualEvents := []int{} + for _, event := range out[group1] { + require.Equal(group1, event.Meta["group"]) + actualEvents = append(actualEvents, event.Meta["i"].(int)) + } + require.Equal(expectedEvents, actualEvents) + + // checking order and content + require.Len(out[group2], structs.JobTrackedScalingEvents) + actualEvents = []int{} + for _, event := range out[group2] { + require.Equal(group2, event.Meta["group"]) + actualEvents = append(actualEvents, event.Meta["i"].(int)) + } + require.Equal(expectedEvents, actualEvents) +} + +func TestStateStore_RestoreScalingEvents(t *testing.T) { + t.Parallel() + require := require.New(t) + + state := testStateStore(t) + jobScalingEvents := &structs.JobScalingEvents{ + Namespace: uuid.Generate(), + JobID: uuid.Generate(), + ScalingEvents: map[string][]*structs.ScalingEvent{ + uuid.Generate(): { + structs.NewScalingEvent(uuid.Generate()), + }, + }, + } + + restore, err := state.Restore() + require.NoError(err) + + err = restore.ScalingEventsRestore(jobScalingEvents) + require.NoError(err) + restore.Commit() + + ws := memdb.NewWatchSet() + out, err := state.ScalingEventsByJob(ws, jobScalingEvents.Namespace, + jobScalingEvents.JobID) + require.NoError(err) + require.NotNil(out) + require.EqualValues(jobScalingEvents.ScalingEvents, out) +} + func TestStateStore_Abandon(t *testing.T) { t.Parallel() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 3866afa73..d4f9b02be 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -90,6 +90,7 @@ const ( CSIVolumeRegisterRequestType CSIVolumeDeregisterRequestType CSIVolumeClaimRequestType + ScalingEventRegisterRequestType ) const ( @@ -625,8 +626,8 @@ type JobScaleRequest struct { JobID string Target map[string]string Count *int64 - Reason *string - Error *string + Message string + Error bool Meta map[string]interface{} // PolicyOverride is set when the user is attempting to override any policies PolicyOverride bool @@ -1255,16 +1256,7 @@ type TaskGroupScaleStatus struct { Running int Healthy int Unhealthy int - Events []ScalingEvent -} - -// ScalingEvent represents a specific scaling event -type ScalingEvent struct { - Reason *string - Error *string - Meta map[string]interface{} - Time uint64 - EvalID *string + Events []*ScalingEvent } type JobDispatchResponse struct { @@ -3517,6 +3509,10 @@ const ( // JobTrackedVersions is the number of historic job versions that are // kept. JobTrackedVersions = 6 + + // JobTrackedScalingEvents is the number of scaling events that are + // kept for a single task group. + JobTrackedScalingEvents = 20 ) // Job is the scope of a scheduling request to Nomad. It is the largest @@ -4659,6 +4655,71 @@ const ( ReasonWithinPolicy = "Restart within policy" ) +// JobScalingEvents contains the scaling events for a given job +type JobScalingEvents struct { + Namespace string + JobID string + + // This map is indexed by target; currently, this is just task group + // the indexed array is sorted from newest to oldest event + // the array should have less than JobTrackedScalingEvents entries + ScalingEvents map[string][]*ScalingEvent +} + +// Factory method for ScalingEvent objects +func NewScalingEvent(message string) *ScalingEvent { + return &ScalingEvent{ + Time: time.Now().Unix(), + Message: message, + } +} + +// ScalingEvent describes a scaling event against a Job +type ScalingEvent struct { + // Unix Nanosecond timestamp for the scaling event + Time int64 + + // Count is the new scaling count, if provided + Count *int64 + + // Message is the message describing a scaling event + Message string + + // Error indicates an error state for this scaling event + Error bool + + // Meta is a map of metadata returned during a scaling event + Meta map[string]interface{} + + // EvalID is the ID for an evaluation if one was created as part of a scaling event + EvalID *string +} + +func (e *ScalingEvent) SetError(error bool) *ScalingEvent { + e.Error = error + return e +} + +func (e *ScalingEvent) SetMeta(meta map[string]interface{}) *ScalingEvent { + e.Meta = meta + return e +} + +func (e *ScalingEvent) SetEvalID(evalID string) *ScalingEvent { + e.EvalID = &evalID + return e +} + +// ScalingEventRequest is by for Job.Scale endpoint +// to register scaling events +type ScalingEventRequest struct { + Namespace string + JobID string + TaskGroup string + + ScalingEvent *ScalingEvent +} + // ScalingPolicy specifies the scaling policy for a scaling target type ScalingPolicy struct { // ID is a generated UUID used for looking up the scaling policy From b2ab42afbb50ba09a36d024eb1d706a00771f8d2 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Wed, 1 Apr 2020 16:14:54 +0000 Subject: [PATCH 2/7] scaling api: more testing around the scaling events api --- api/jobs.go | 6 +-- api/jobs_test.go | 92 +++++++++++++++++++++++++++++++++++--- nomad/fsm.go | 4 +- nomad/job_endpoint.go | 11 ++--- nomad/job_endpoint_test.go | 16 ++++--- nomad/state/schema.go | 1 + 6 files changed, 108 insertions(+), 22 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index 8668cc2bd..7514c608e 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -155,9 +155,9 @@ func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) { // Scale is used to retrieve information about a particular // job given its unique ID. -func (j *Jobs) Scale(jobID, group string, count *int, - message string, error bool, meta map[string]interface{}, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, - error) { +func (j *Jobs) Scale(jobID, group string, count *int, message string, error bool, meta map[string]interface{}, + q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) { + var count64 *int64 if count != nil { count64 = int64ToPtr(int64(*count)) diff --git a/api/jobs_test.go b/api/jobs_test.go index afcab5812..9d4cee430 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -1779,7 +1779,8 @@ func TestJobs_ScaleAction(t *testing.T) { groupCount := *job.TaskGroups[0].Count // Trying to scale against a target before it exists returns an error - _, _, err := jobs.Scale(id, "missing", intToPtr(groupCount+1), "this won't work", false, nil, nil) + _, _, err := jobs.Scale(id, "missing", intToPtr(groupCount+1), "this won't work", + false, nil, nil) require.Error(err) require.Contains(err.Error(), "not found") @@ -1791,7 +1792,10 @@ func TestJobs_ScaleAction(t *testing.T) { // Perform scaling action newCount := groupCount + 1 scalingResp, wm, err := jobs.Scale(id, groupName, - intToPtr(newCount), "need more instances", false, nil, nil) + intToPtr(newCount), "need more instances", false, + map[string]interface{}{ + "meta": "data", + }, nil) require.NoError(err) require.NotNil(scalingResp) @@ -1805,7 +1809,71 @@ func TestJobs_ScaleAction(t *testing.T) { require.NoError(err) require.Equal(*resp.TaskGroups[0].Count, newCount) - // TODO: check that scaling event was persisted + // Check for the scaling event + status, _, err := jobs.ScaleStatus(*job.ID, nil) + require.NoError(err) + require.Len(status.TaskGroups[groupName].Events, 1) + scalingEvent := status.TaskGroups[groupName].Events[0] + require.False(scalingEvent.Error) + require.Equal("need more instances", scalingEvent.Message) + require.Equal(map[string]interface{}{ + "meta": "data", + }, scalingEvent.Meta) + require.Greater(scalingEvent.Time, uint64(0)) + require.NotNil(scalingEvent.EvalID) + require.Equal(scalingResp.EvalID, *scalingEvent.EvalID) +} + +func TestJobs_ScaleAction_Error(t *testing.T) { + t.Parallel() + require := require.New(t) + + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + id := "job-id/with\\troublesome:characters\n?&字\000" + job := testJobWithScalingPolicy() + job.ID = &id + groupName := *job.TaskGroups[0].Name + prevCount := *job.TaskGroups[0].Count + + // Register the job + regResp, wm, err := jobs.Register(job, nil) + require.NoError(err) + assertWriteMeta(t, wm) + + // Perform scaling action + scaleResp, wm, err := jobs.Scale(id, groupName, nil, "something bad happened", true, + map[string]interface{}{ + "meta": "data", + }, nil) + + require.NoError(err) + require.NotNil(scaleResp) + require.Empty(scaleResp.EvalID) + require.Empty(scaleResp.EvalCreateIndex) + assertWriteMeta(t, wm) + + // Query the job again + resp, _, err := jobs.Info(*job.ID, nil) + require.NoError(err) + require.Equal(*resp.TaskGroups[0].Count, prevCount) + require.Equal(regResp.JobModifyIndex, scaleResp.JobModifyIndex) + require.Empty(scaleResp.EvalCreateIndex) + require.Empty(scaleResp.EvalID) + + status, _, err := jobs.ScaleStatus(*job.ID, nil) + require.NoError(err) + require.Len(status.TaskGroups[groupName].Events, 1) + errEvent := status.TaskGroups[groupName].Events[0] + require.True(errEvent.Error) + require.Equal("something bad happened", errEvent.Message) + require.Equal(map[string]interface{}{ + "meta": "data", + }, errEvent.Meta) + require.Greater(errEvent.Time, uint64(0)) + require.Nil(errEvent.EvalID) } func TestJobs_ScaleAction_Noop(t *testing.T) { @@ -1828,8 +1896,10 @@ func TestJobs_ScaleAction_Noop(t *testing.T) { assertWriteMeta(t, wm) // Perform scaling action - scaleResp, wm, err := jobs.Scale(id, groupName, - nil, "no count, just informative", false, nil, nil) + scaleResp, wm, err := jobs.Scale(id, groupName, nil, "no count, just informative", + false, map[string]interface{}{ + "meta": "data", + }, nil) require.NoError(err) require.NotNil(scaleResp) @@ -1847,7 +1917,15 @@ func TestJobs_ScaleAction_Noop(t *testing.T) { status, _, err := jobs.ScaleStatus(*job.ID, nil) require.NoError(err) - require.NotEmpty(status.TaskGroups[groupName].Events) + require.Len(status.TaskGroups[groupName].Events, 1) + noopEvent := status.TaskGroups[groupName].Events[0] + require.False(noopEvent.Error) + require.Equal("no count, just informative", noopEvent.Message) + require.Equal(map[string]interface{}{ + "meta": "data", + }, noopEvent.Meta) + require.Greater(noopEvent.Time, uint64(0)) + require.Nil(noopEvent.EvalID) } // TestJobs_ScaleStatus tests the /scale status endpoint for task group count @@ -1867,7 +1945,7 @@ func TestJobs_ScaleStatus(t *testing.T) { require.Contains(err.Error(), "not found") // Register the job - job := testJobWithScalingPolicy() + job := testJob() job.ID = &id groupName := *job.TaskGroups[0].Name groupCount := *job.TaskGroups[0].Count diff --git a/nomad/fsm.go b/nomad/fsm.go index f2a9ee7e9..97384a4a7 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -2171,14 +2171,14 @@ func (s *nomadSnapshot) persistScalingPolicies(sink raft.SnapshotSink, func (s *nomadSnapshot) persistScalingEvents(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get all the scaling events ws := memdb.NewWatchSet() - jobScalingEvents, err := s.snap.ScalingEvents(ws) + iter, err := s.snap.ScalingEvents(ws) if err != nil { return err } for { // Get the next item - raw := jobScalingEvents.Next() + raw := iter.Next() if raw == nil { break } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 169a98fa8..2761549f4 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -908,6 +908,7 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes now := time.Now().UTC().UnixNano() // If the count is present, commit the job update via Raft + // for now, we'll do this even if count didn't change if args.Count != nil { truncCount := int(*args.Count) if int64(truncCount) != *args.Count { @@ -933,10 +934,10 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes reply.JobModifyIndex = job.ModifyIndex } - // If the job is periodic or parameterized, we don't create an eval. - var eval *structs.Evaluation + // only create an eval for non-dispatch jobs and if the count was provided + // for now, we'll do this even if count didn't change if !job.IsPeriodic() && !job.IsParameterized() && args.Count != nil { - eval = &structs.Evaluation{ + eval := &structs.Evaluation{ ID: uuid.Generate(), Namespace: args.RequestNamespace(), Priority: structs.JobDefaultPriority, @@ -979,8 +980,8 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes Meta: args.Meta, }, } - if eval != nil { - event.ScalingEvent.EvalID = &eval.ID + if reply.EvalID != "" { + event.ScalingEvent.EvalID = &reply.EvalID } _, eventIndex, err := j.srv.raftApply(structs.ScalingEventRegisterRequestType, event) if err != nil { diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 4dfb6204a..4e82bbeb9 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -5415,6 +5415,7 @@ func TestJobEndpoint_Scale(t *testing.T) { err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp) require.NoError(err) require.NotEmpty(resp.EvalID) + require.Greater(resp.EvalCreateIndex, resp.JobModifyIndex) } func TestJobEndpoint_Scale_ACL(t *testing.T) { @@ -5564,13 +5565,14 @@ func TestJobEndpoint_Scale_NoEval(t *testing.T) { state := s1.fsm.State() job := mock.Job() + groupName := job.TaskGroups[0].Name err := state.UpsertJob(1000, job) require.Nil(err) scale := &structs.JobScaleRequest{ JobID: job.ID, Target: map[string]string{ - structs.ScalingTargetGroup: job.TaskGroups[0].Name, + structs.ScalingTargetGroup: groupName, }, Count: nil, // no count => no eval Message: "something informative", @@ -5593,11 +5595,15 @@ func TestJobEndpoint_Scale_NoEval(t *testing.T) { require.Empty(resp.EvalID) require.Empty(resp.EvalCreateIndex) - events, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID) + jobEvents, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID) require.NoError(err) - require.NotNil(events) - require.Contains(events, job.TaskGroups[0].Name) - require.NotEmpty(events[job.TaskGroups[0].Name]) + require.NotNil(jobEvents) + require.Len(jobEvents, 1) + require.Contains(jobEvents, groupName) + groupEvents := jobEvents[groupName] + require.Len(groupEvents, 1) + event := groupEvents[0] + require.Nil(event.EvalID) } func TestJobEndpoint_GetScaleStatus(t *testing.T) { diff --git a/nomad/state/schema.go b/nomad/state/schema.go index ec9893f48..3f926188e 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -868,6 +868,7 @@ func scalingEventTableSchema() *memdb.TableSchema { }, }, + // TODO: need to figure out whether we want to index these or the jobs or ... // "error": { // Name: "error", // AllowMissing: false, From 6dbfb36e14d603387de7f3c499391fe6a98191b6 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Wed, 1 Apr 2020 11:55:55 -0500 Subject: [PATCH 3/7] Update nomad/job_endpoint.go Co-Authored-By: Drew Bailey <2614075+drewbailey@users.noreply.github.com> --- nomad/job_endpoint.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 2761549f4..150b4e191 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -934,7 +934,7 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes reply.JobModifyIndex = job.ModifyIndex } - // only create an eval for non-dispatch jobs and if the count was provided + // Only create an eval for non-dispatch jobs and if the count was provided // for now, we'll do this even if count didn't change if !job.IsPeriodic() && !job.IsParameterized() && args.Count != nil { eval := &structs.Evaluation{ From eb19fe16d28c88d89d0c47a4eb8b9bc74bc9650f Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Wed, 1 Apr 2020 11:56:01 -0500 Subject: [PATCH 4/7] Update nomad/state/state_store.go Co-Authored-By: Drew Bailey <2614075+drewbailey@users.noreply.github.com> --- nomad/state/state_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 6972c28c8..5d5c84ee1 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -651,7 +651,7 @@ func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventR } events := jobEvents.ScalingEvents[req.TaskGroup] - // prepend this latest event + // Prepend this latest event events = append( []*structs.ScalingEvent{req.ScalingEvent}, events..., From 4ac36b7c89d7c2386adbaf07b8b81b26f8c85b93 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Wed, 1 Apr 2020 11:56:12 -0500 Subject: [PATCH 5/7] Update nomad/state/state_store.go Co-Authored-By: Drew Bailey <2614075+drewbailey@users.noreply.github.com> --- nomad/state/state_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 5d5c84ee1..27208c40c 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -656,7 +656,7 @@ func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventR []*structs.ScalingEvent{req.ScalingEvent}, events..., ) - // truncate older events + // Truncate older events if len(events) > structs.JobTrackedScalingEvents { events = events[0:structs.JobTrackedScalingEvents] } From 8ec252e6272751698c0afcbef27a7cb6577b880e Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Wed, 1 Apr 2020 17:28:19 +0000 Subject: [PATCH 6/7] added indices to the job scaling events, so we could properly do blocking queries on the job scaling status --- api/scaling.go | 11 ++++++----- nomad/job_endpoint.go | 13 ++++++++----- nomad/job_endpoint_test.go | 3 ++- nomad/state/state_store.go | 12 ++++++++---- nomad/state/state_store_test.go | 11 +++++++---- nomad/structs/structs.go | 6 ++++++ 6 files changed, 37 insertions(+), 19 deletions(-) diff --git a/api/scaling.go b/api/scaling.go index 9b22d5705..a854b43a5 100644 --- a/api/scaling.go +++ b/api/scaling.go @@ -92,9 +92,10 @@ type TaskGroupScaleStatus struct { } type ScalingEvent struct { - Error bool - Message string - Meta map[string]interface{} - EvalID *string - Time uint64 + Error bool + Message string + Meta map[string]interface{} + EvalID *string + Time uint64 + CreateIndex uint64 } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 2761549f4..cb58740c0 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1767,7 +1767,7 @@ func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest, return err } - events, err := state.ScalingEventsByJob(ws, args.RequestNamespace(), args.JobID) + events, eventsIndex, err := state.ScalingEventsByJob(ws, args.RequestNamespace(), args.JobID) if err != nil { return err } @@ -1799,11 +1799,14 @@ func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest, reply.JobScaleStatus.TaskGroups[tg.Name] = tgScale } - if deployment != nil && deployment.ModifyIndex > job.ModifyIndex { - reply.Index = deployment.ModifyIndex - } else { - reply.Index = job.ModifyIndex + maxIndex := job.ModifyIndex + if deployment != nil && deployment.ModifyIndex > maxIndex { + maxIndex = deployment.ModifyIndex } + if eventsIndex > maxIndex { + maxIndex = eventsIndex + } + reply.Index = maxIndex // Set the query response j.srv.setQueryMeta(&reply.QueryMeta) diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 4e82bbeb9..3aa49d8d6 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -5595,7 +5595,7 @@ func TestJobEndpoint_Scale_NoEval(t *testing.T) { require.Empty(resp.EvalID) require.Empty(resp.EvalCreateIndex) - jobEvents, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID) + jobEvents, eventsIndex, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID) require.NoError(err) require.NotNil(jobEvents) require.Len(jobEvents, 1) @@ -5604,6 +5604,7 @@ func TestJobEndpoint_Scale_NoEval(t *testing.T) { require.Len(groupEvents, 1) event := groupEvents[0] require.Nil(event.EvalID) + require.Greater(eventsIndex, job.CreateIndex) } func TestJobEndpoint_GetScaleStatus(t *testing.T) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 6972c28c8..1f6838965 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -650,6 +650,9 @@ func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventR } } + jobEvents.ModifyIndex = index + req.ScalingEvent.CreateIndex = index + events := jobEvents.ScalingEvents[req.TaskGroup] // prepend this latest event events = append( @@ -691,19 +694,20 @@ func (s *StateStore) ScalingEvents(ws memdb.WatchSet) (memdb.ResultIterator, err return iter, nil } -func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID string) (map[string][]*structs.ScalingEvent, error) { +func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID string) (map[string][]*structs.ScalingEvent, uint64, error) { txn := s.db.Txn(false) watchCh, existing, err := txn.FirstWatch("scaling_event", "id", namespace, jobID) if err != nil { - return nil, fmt.Errorf("job scaling events lookup failed: %v", err) + return nil, 0, fmt.Errorf("job scaling events lookup failed: %v", err) } ws.Add(watchCh) if existing != nil { - return existing.(*structs.JobScalingEvents).ScalingEvents, nil + events := existing.(*structs.JobScalingEvents) + return events.ScalingEvents, events.ModifyIndex, nil } - return nil, nil + return nil, 0, nil } // UpsertNode is used to register a node or update a node definition diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index acbb1cb65..af2406dbf 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -8570,7 +8570,7 @@ func TestStateStore_UpsertScalingEvent(t *testing.T) { require.Nil(all.Next()) ws := memdb.NewWatchSet() - out, err := state.ScalingEventsByJob(ws, job.Namespace, job.ID) + out, _, err := state.ScalingEventsByJob(ws, job.Namespace, job.ID) require.NoError(err) require.Nil(out) @@ -8585,11 +8585,12 @@ func TestStateStore_UpsertScalingEvent(t *testing.T) { require.True(watchFired(wsAll)) ws = memdb.NewWatchSet() - out, err = state.ScalingEventsByJob(ws, job.Namespace, job.ID) + out, eventsIndex, err := state.ScalingEventsByJob(ws, job.Namespace, job.ID) require.NoError(err) require.Equal(map[string][]*structs.ScalingEvent{ groupName: {newEvent}, }, out) + require.EqualValues(eventsIndex, 1000) iter, err := state.ScalingEvents(ws) require.NoError(err) @@ -8607,6 +8608,8 @@ func TestStateStore_UpsertScalingEvent(t *testing.T) { count++ } require.Equal(1, count) + require.EqualValues(jobEvents.ModifyIndex, 1000) + require.EqualValues(jobEvents.ScalingEvents[groupName][0].CreateIndex, 1000) index, err := state.Index("scaling_event") require.NoError(err) @@ -8657,7 +8660,7 @@ func TestStateStore_UpsertScalingEvent_LimitAndOrder(t *testing.T) { require.NoError(err) } - out, err := state.ScalingEventsByJob(nil, namespace, jobID) + out, _, err := state.ScalingEventsByJob(nil, namespace, jobID) require.NoError(err) require.Len(out, 2) @@ -8708,7 +8711,7 @@ func TestStateStore_RestoreScalingEvents(t *testing.T) { restore.Commit() ws := memdb.NewWatchSet() - out, err := state.ScalingEventsByJob(ws, jobScalingEvents.Namespace, + out, _, err := state.ScalingEventsByJob(ws, jobScalingEvents.Namespace, jobScalingEvents.JobID) require.NoError(err) require.NotNil(out) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index d4f9b02be..453431d61 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4664,6 +4664,9 @@ type JobScalingEvents struct { // the indexed array is sorted from newest to oldest event // the array should have less than JobTrackedScalingEvents entries ScalingEvents map[string][]*ScalingEvent + + // Raft index + ModifyIndex uint64 } // Factory method for ScalingEvent objects @@ -4693,6 +4696,9 @@ type ScalingEvent struct { // EvalID is the ID for an evaluation if one was created as part of a scaling event EvalID *string + + // Raft index + CreateIndex uint64 } func (e *ScalingEvent) SetError(error bool) *ScalingEvent { From c3ab837d9eb1e29070d173dfa3006cc3493dd2d9 Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Wed, 1 Apr 2020 18:11:58 +0000 Subject: [PATCH 7/7] job_endpoint: fixed bad test --- nomad/job_endpoint_test.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 3aa49d8d6..57cd96928 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -5566,8 +5566,16 @@ func TestJobEndpoint_Scale_NoEval(t *testing.T) { job := mock.Job() groupName := job.TaskGroups[0].Name - err := state.UpsertJob(1000, job) - require.Nil(err) + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + }, &resp) + jobCreateIndex := resp.Index + require.NoError(err) scale := &structs.JobScaleRequest{ JobID: job.ID, @@ -5589,7 +5597,6 @@ func TestJobEndpoint_Scale_NoEval(t *testing.T) { Namespace: job.Namespace, }, } - var resp structs.JobRegisterResponse err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp) require.NoError(err) require.Empty(resp.EvalID) @@ -5604,7 +5611,7 @@ func TestJobEndpoint_Scale_NoEval(t *testing.T) { require.Len(groupEvents, 1) event := groupEvents[0] require.Nil(event.EvalID) - require.Greater(eventsIndex, job.CreateIndex) + require.Greater(eventsIndex, jobCreateIndex) } func TestJobEndpoint_GetScaleStatus(t *testing.T) {