Merge pull request #7572 from hashicorp/f-7422-scaling-events

finalizing scaling API work
Chris Baker 2020-04-01 13:49:22 -05:00, committed by GitHub
commit 277d29c6e7
12 changed files with 709 additions and 111 deletions

View File

@@ -155,8 +155,9 @@ func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) {
// Scale is used to perform a scaling action against a particular
// job given its unique ID.
func (j *Jobs) Scale(jobID, group string, count *int,
reason, error *string, meta map[string]interface{}, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
func (j *Jobs) Scale(jobID, group string, count *int, message string, error bool, meta map[string]interface{},
q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
var count64 *int64
if count != nil {
count64 = int64ToPtr(int64(*count))
@@ -167,9 +168,9 @@ func (j *Jobs) Scale(jobID, group string, count *int,
"Job": jobID,
"Group": group,
},
Error: error,
Reason: reason,
Meta: meta,
Error: error,
Message: message,
Meta: meta,
}
var resp JobRegisterResponse
qm, err := j.client.write(fmt.Sprintf("/v1/job/%s/scale", url.PathEscape(jobID)), req, &resp, q)
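
For orientation, here is a minimal sketch of calling the revised client method under its new signature. The job name, group name, and meta payload are illustrative, and the snippet assumes a Nomad agent reachable at the default address:

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Scale the hypothetical "web" group of job "example" to 3 instances.
	// The (message string, error bool) pair replaces the old
	// (reason, error *string) arguments.
	count := 3
	resp, _, err := client.Jobs().Scale("example", "web", &count,
		"scaling up for load", false,
		map[string]interface{}{"source": "manual"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("evaluation created:", resp.EvalID)
}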

View File

@@ -1119,7 +1119,7 @@ func TestJobs_ScaleInvalidAction(t *testing.T) {
{"i-dont-exist", "me-neither", 1, "404"},
}
for _, test := range tests {
_, _, err := jobs.Scale(test.jobID, test.group, &test.value, stringToPtr("reason"), nil, nil, nil)
_, _, err := jobs.Scale(test.jobID, test.group, &test.value, "reason", false, nil, nil)
require.Errorf(err, "expected jobs.Scale(%s, %s) to fail", test.jobID, test.group)
require.Containsf(err.Error(), test.want, "jobs.Scale(%s, %s) error doesn't contain %s, got: %s", test.jobID, test.group, test.want, err)
}
@@ -1133,7 +1133,7 @@ func TestJobs_ScaleInvalidAction(t *testing.T) {
// Perform a scaling action with bad group name, verify error
_, _, err = jobs.Scale(*job.ID, "incorrect-group-name", intToPtr(2),
stringToPtr("because"), nil, nil, nil)
"because", false, nil, nil)
require.Error(err)
require.Contains(err.Error(), "does not exist")
}
@@ -1779,7 +1779,8 @@ func TestJobs_ScaleAction(t *testing.T) {
groupCount := *job.TaskGroups[0].Count
// Trying to scale against a target before it exists returns an error
_, _, err := jobs.Scale(id, "missing", intToPtr(groupCount+1), stringToPtr("this won't work"), nil, nil, nil)
_, _, err := jobs.Scale(id, "missing", intToPtr(groupCount+1), "this won't work",
false, nil, nil)
require.Error(err)
require.Contains(err.Error(), "not found")
@@ -1791,7 +1792,10 @@ func TestJobs_ScaleAction(t *testing.T) {
// Perform scaling action
newCount := groupCount + 1
scalingResp, wm, err := jobs.Scale(id, groupName,
intToPtr(newCount), stringToPtr("need more instances"), nil, nil, nil)
intToPtr(newCount), "need more instances", false,
map[string]interface{}{
"meta": "data",
}, nil)
require.NoError(err)
require.NotNil(scalingResp)
@@ -1805,7 +1809,71 @@ func TestJobs_ScaleAction(t *testing.T) {
require.NoError(err)
require.Equal(*resp.TaskGroups[0].Count, newCount)
// TODO: check that scaling event was persisted
// Check for the scaling event
status, _, err := jobs.ScaleStatus(*job.ID, nil)
require.NoError(err)
require.Len(status.TaskGroups[groupName].Events, 1)
scalingEvent := status.TaskGroups[groupName].Events[0]
require.False(scalingEvent.Error)
require.Equal("need more instances", scalingEvent.Message)
require.Equal(map[string]interface{}{
"meta": "data",
}, scalingEvent.Meta)
require.Greater(scalingEvent.Time, uint64(0))
require.NotNil(scalingEvent.EvalID)
require.Equal(scalingResp.EvalID, *scalingEvent.EvalID)
}
func TestJobs_ScaleAction_Error(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()
id := "job-id/with\\troublesome:characters\n?&字\000"
job := testJobWithScalingPolicy()
job.ID = &id
groupName := *job.TaskGroups[0].Name
prevCount := *job.TaskGroups[0].Count
// Register the job
regResp, wm, err := jobs.Register(job, nil)
require.NoError(err)
assertWriteMeta(t, wm)
// Perform scaling action
scaleResp, wm, err := jobs.Scale(id, groupName, nil, "something bad happened", true,
map[string]interface{}{
"meta": "data",
}, nil)
require.NoError(err)
require.NotNil(scaleResp)
require.Empty(scaleResp.EvalID)
require.Empty(scaleResp.EvalCreateIndex)
assertWriteMeta(t, wm)
// Query the job again
resp, _, err := jobs.Info(*job.ID, nil)
require.NoError(err)
require.Equal(*resp.TaskGroups[0].Count, prevCount)
require.Equal(regResp.JobModifyIndex, scaleResp.JobModifyIndex)
require.Empty(scaleResp.EvalCreateIndex)
require.Empty(scaleResp.EvalID)
status, _, err := jobs.ScaleStatus(*job.ID, nil)
require.NoError(err)
require.Len(status.TaskGroups[groupName].Events, 1)
errEvent := status.TaskGroups[groupName].Events[0]
require.True(errEvent.Error)
require.Equal("something bad happened", errEvent.Message)
require.Equal(map[string]interface{}{
"meta": "data",
}, errEvent.Meta)
require.Greater(errEvent.Time, uint64(0))
require.Nil(errEvent.EvalID)
}
func TestJobs_ScaleAction_Noop(t *testing.T) {
@@ -1828,8 +1896,10 @@ func TestJobs_ScaleAction_Noop(t *testing.T) {
assertWriteMeta(t, wm)
// Perform scaling action
scaleResp, wm, err := jobs.Scale(id, groupName,
nil, stringToPtr("no count, just informative"), nil, nil, nil)
scaleResp, wm, err := jobs.Scale(id, groupName, nil, "no count, just informative",
false, map[string]interface{}{
"meta": "data",
}, nil)
require.NoError(err)
require.NotNil(scaleResp)
@@ -1845,7 +1915,17 @@ func TestJobs_ScaleAction_Noop(t *testing.T) {
require.Empty(scaleResp.EvalCreateIndex)
require.Empty(scaleResp.EvalID)
// TODO: check that scaling event was persisted
status, _, err := jobs.ScaleStatus(*job.ID, nil)
require.NoError(err)
require.Len(status.TaskGroups[groupName].Events, 1)
noopEvent := status.TaskGroups[groupName].Events[0]
require.False(noopEvent.Error)
require.Equal("no count, just informative", noopEvent.Message)
require.Equal(map[string]interface{}{
"meta": "data",
}, noopEvent.Meta)
require.Greater(noopEvent.Time, uint64(0))
require.Nil(noopEvent.EvalID)
}
// TestJobs_ScaleStatus tests the /scale status endpoint for task group count
@@ -1865,7 +1945,7 @@ func TestJobs_ScaleStatus(t *testing.T) {
require.Contains(err.Error(), "not found")
// Register the job
job := testJobWithScalingPolicy()
job := testJob()
job.ID = &id
groupName := *job.TaskGroups[0].Name
groupCount := *job.TaskGroups[0].Count

View File

@@ -40,11 +40,11 @@ func (p *ScalingPolicy) Canonicalize(taskGroupCount int) {
// ScalingRequest is the payload for a generic scaling action
type ScalingRequest struct {
Count *int64
Target map[string]string
Reason *string
Error *string
Meta map[string]interface{}
Count *int64
Target map[string]string
Message string
Error bool
Meta map[string]interface{}
WriteRequest
// this is effectively a job update, so we need the ability to override policy.
PolicyOverride bool
@@ -92,9 +92,10 @@ type TaskGroupScaleStatus struct {
}
type ScalingEvent struct {
Reason *string
Error *string
Meta map[string]interface{}
Time uint64
EvalID *string
Error bool
Message string
Meta map[string]interface{}
EvalID *string
Time uint64
CreateIndex uint64
}
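
As a sketch, constructing the renamed request directly (for example when driving PUT /v1/job/<job>/scale by hand) now looks like the following. Values are illustrative, the api package import is assumed, and int64Ptr is a trivial local helper rather than part of the package:

// int64Ptr is a local helper for illustration only.
func int64Ptr(i int64) *int64 { return &i }

func exampleScalingRequest() *api.ScalingRequest {
	return &api.ScalingRequest{
		Count:   int64Ptr(3),
		Target:  map[string]string{"Job": "example", "Group": "web"},
		Message: "scaling up for load", // replaces Reason *string
		Error:   false,                 // now a bool rather than *string
		Meta:    map[string]interface{}{"source": "manual"},
	}
}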

View File

@@ -517,7 +517,7 @@ func (s *HTTPServer) jobScaleAction(resp http.ResponseWriter, req *http.Request,
Target: args.Target,
Count: args.Count,
PolicyOverride: args.PolicyOverride,
Reason: args.Reason,
Message: args.Message,
Error: args.Error,
Meta: args.Meta,
}

View File

@@ -686,8 +686,8 @@ func TestHTTP_Job_ScaleTaskGroup(t *testing.T) {
newCount := job.TaskGroups[0].Count + 1
scaleReq := &api.ScalingRequest{
Count: helper.Int64ToPtr(int64(newCount)),
Reason: helper.StringToPtr("testing"),
Count: helper.Int64ToPtr(int64(newCount)),
Message: "testing",
Target: map[string]string{
"Job": job.ID,
"Group": job.TaskGroups[0].Name,

View File

@@ -51,6 +51,7 @@ const (
ScalingPolicySnapshot
CSIPluginSnapshot
CSIVolumeSnapshot
ScalingEventsSnapshot
)
// LogApplier is the definition of a function that can apply a Raft log
@@ -269,6 +270,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyCSIVolumeDeregister(buf[1:], log.Index)
case structs.CSIVolumeClaimRequestType:
return n.applyCSIVolumeClaim(buf[1:], log.Index)
case structs.ScalingEventRegisterRequestType:
return n.applyUpsertScalingEvent(buf[1:], log.Index)
}
// Check enterprise only message types.
@@ -1403,6 +1406,16 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err
}
case ScalingEventsSnapshot:
jobScalingEvents := new(structs.JobScalingEvents)
if err := dec.Decode(jobScalingEvents); err != nil {
return err
}
if err := restore.ScalingEventsRestore(jobScalingEvents); err != nil {
return err
}
case ScalingPolicySnapshot:
scalingPolicy := new(structs.ScalingPolicy)
if err := dec.Decode(scalingPolicy); err != nil {
@@ -1629,6 +1642,21 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
return nil
}
func (n *nomadFSM) applyUpsertScalingEvent(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_scaling_event"}, time.Now())
var req structs.ScalingEventRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertScalingEvent(index, &req); err != nil {
n.logger.Error("UpsertScalingEvent failed", "error", err)
return err
}
return nil
}
func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now())
// Register the nodes
@@ -1697,6 +1725,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
if err := s.persistScalingEvents(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistCSIPlugins(sink, encoder); err != nil {
sink.Cancel()
return err
@@ -2136,6 +2168,33 @@ func (s *nomadSnapshot) persistScalingPolicies(sink raft.SnapshotSink,
return nil
}
func (s *nomadSnapshot) persistScalingEvents(sink raft.SnapshotSink, encoder *codec.Encoder) error {
// Get all the scaling events
ws := memdb.NewWatchSet()
iter, err := s.snap.ScalingEvents(ws)
if err != nil {
return err
}
for {
// Get the next item
raw := iter.Next()
if raw == nil {
break
}
// Prepare the request struct
events := raw.(*structs.JobScalingEvents)
// Write out a scaling events snapshot
sink.Write([]byte{byte(ScalingEventsSnapshot)})
if err := encoder.Encode(events); err != nil {
return err
}
}
return nil
}
func (s *nomadSnapshot) persistCSIPlugins(sink raft.SnapshotSink,
encoder *codec.Encoder) error {

View File

@@ -864,11 +864,8 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
if groupName == "" {
return structs.NewErrRPCCoded(400, "missing task group name for scaling action")
}
if args.Error != nil && args.Reason != nil {
return structs.NewErrRPCCoded(400, "scaling action should not contain error and scaling reason")
}
if args.Error != nil && args.Count != nil {
return structs.NewErrRPCCoded(400, "scaling action should not contain error and count")
if args.Error && args.Count != nil {
return structs.NewErrRPCCoded(400, "scaling action should not contain count if error is true")
}
// Check for submit-job permissions
@@ -896,20 +893,30 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
return structs.NewErrRPCCoded(404, fmt.Sprintf("job %q not found", args.JobID))
}
jobModifyIndex := job.ModifyIndex
var found *structs.TaskGroup
for _, tg := range job.TaskGroups {
if groupName == tg.Name {
found = tg
break
}
}
if found == nil {
return structs.NewErrRPCCoded(400,
fmt.Sprintf("task group %q specified for scaling does not exist in job", groupName))
}
now := time.Now().UTC().UnixNano()
// If the count is present, commit the job update via Raft
// for now, we'll do this even if count didn't change
if args.Count != nil {
found := false
for _, tg := range job.TaskGroups {
if groupName == tg.Name {
tg.Count = int(*args.Count) // TODO: not safe, check this above
found = true
break
}
}
if !found {
truncCount := int(*args.Count)
if int64(truncCount) != *args.Count {
return structs.NewErrRPCCoded(400,
fmt.Sprintf("task group %q specified for scaling does not exist in job", groupName))
fmt.Sprintf("new scaling count is too large for TaskGroup.Count (int): %v", args.Count))
}
found.Count = truncCount
registerReq := structs.JobRegisterRequest{
Job: job,
EnforceIndex: true,
@@ -917,58 +924,73 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
PolicyOverride: args.PolicyOverride,
WriteRequest: args.WriteRequest,
}
// Commit this update via Raft
_, jobModifyIndex, err = j.srv.raftApply(structs.JobRegisterRequestType, registerReq)
_, jobModifyIndex, err := j.srv.raftApply(structs.JobRegisterRequestType, registerReq)
if err != nil {
j.logger.Error("job register for scale failed", "error", err)
return err
}
reply.JobModifyIndex = jobModifyIndex
} else {
reply.JobModifyIndex = job.ModifyIndex
}
// Populate the reply with job information
reply.JobModifyIndex = jobModifyIndex
reply.Index = reply.JobModifyIndex
// Only create an eval for non-dispatch jobs and if the count was provided
// for now, we'll do this even if count didn't change
if !job.IsPeriodic() && !job.IsParameterized() && args.Count != nil {
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerScaling,
JobID: args.JobID,
JobModifyIndex: reply.JobModifyIndex,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
// FINISH:
// register the scaling event to the scaling_event table, once that exists
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "scale")
return err
}
// If the job is periodic or parameterized, we don't create an eval.
if job != nil && (job.IsPeriodic() || job.IsParameterized()) || args.Count == nil {
return nil
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
} else {
reply.EvalID = ""
reply.EvalCreateIndex = 0
}
// Create a new evaluation
// FINISH: only do this if args.Error == nil || ""
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerScaling,
JobID: args.JobID,
JobModifyIndex: jobModifyIndex,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
event := &structs.ScalingEventRequest{
Namespace: job.Namespace,
JobID: job.ID,
TaskGroup: groupName,
ScalingEvent: &structs.ScalingEvent{
Time: now,
Count: args.Count,
Message: args.Message,
Error: args.Error,
Meta: args.Meta,
},
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
if reply.EvalID != "" {
event.ScalingEvent.EvalID = &reply.EvalID
}
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
_, eventIndex, err := j.srv.raftApply(structs.ScalingEventRegisterRequestType, event)
if err != nil {
j.logger.Error("eval create failed", "error", err, "method", "deregister")
j.logger.Error("scaling event create failed", "error", err)
return err
}
// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = reply.EvalCreateIndex
reply.Index = eventIndex
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}
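
The truncation guard added above is a small but easy-to-miss pattern: TaskGroup.Count is an int, which is 32 bits wide on some platforms, so converting the request's int64 can silently wrap. The same check in isolation, as a sketch (safeIntFromInt64 is a hypothetical helper, not part of this change; fmt import assumed):

// safeIntFromInt64 converts an int64 to int, returning an error instead
// of silently truncating on platforms where int is 32 bits wide.
func safeIntFromInt64(v int64) (int, error) {
	truncated := int(v)
	if int64(truncated) != v {
		return 0, fmt.Errorf("value %d overflows int", v)
	}
	return truncated, nil
}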
@@ -1745,6 +1767,14 @@ func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest,
return err
}
events, eventsIndex, err := state.ScalingEventsByJob(ws, args.RequestNamespace(), args.JobID)
if err != nil {
return err
}
if events == nil {
events = make(map[string][]*structs.ScalingEvent)
}
// Setup the output
reply.JobScaleStatus = &structs.JobScaleStatus{
JobID: job.ID,
@@ -1765,14 +1795,18 @@ func (j *Job) ScaleStatus(args *structs.JobScaleStatusRequest,
tgScale.Unhealthy = ds.UnhealthyAllocs
}
}
tgScale.Events = events[tg.Name]
reply.JobScaleStatus.TaskGroups[tg.Name] = tgScale
}
if deployment != nil && deployment.ModifyIndex > job.ModifyIndex {
reply.Index = deployment.ModifyIndex
} else {
reply.Index = job.ModifyIndex
maxIndex := job.ModifyIndex
if deployment != nil && deployment.ModifyIndex > maxIndex {
maxIndex = deployment.ModifyIndex
}
if eventsIndex > maxIndex {
maxIndex = eventsIndex
}
reply.Index = maxIndex
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)

View File

@@ -5396,8 +5396,8 @@ func TestJobEndpoint_Scale(t *testing.T) {
Target: map[string]string{
structs.ScalingTargetGroup: job.TaskGroups[0].Name,
},
Count: helper.Int64ToPtr(int64(count + 1)),
Reason: helper.StringToPtr("this should fail"),
Count: helper.Int64ToPtr(int64(count + 1)),
Message: "because of the load",
Meta: map[string]interface{}{
"metrics": map[string]string{
"1": "a",
@@ -5415,6 +5415,7 @@ func TestJobEndpoint_Scale(t *testing.T) {
err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
require.NoError(err)
require.NotEmpty(resp.EvalID)
require.Greater(resp.EvalCreateIndex, resp.JobModifyIndex)
}
func TestJobEndpoint_Scale_ACL(t *testing.T) {
@@ -5436,7 +5437,7 @@ func TestJobEndpoint_Scale_ACL(t *testing.T) {
Target: map[string]string{
structs.ScalingTargetGroup: job.TaskGroups[0].Name,
},
Reason: helper.StringToPtr("this should fail"),
Message: "because of the load",
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
@@ -5521,8 +5522,8 @@ func TestJobEndpoint_Scale_Invalid(t *testing.T) {
Target: map[string]string{
structs.ScalingTargetGroup: job.TaskGroups[0].Name,
},
Count: helper.Int64ToPtr(int64(count) + 1),
Reason: helper.StringToPtr("this should fail"),
Count: helper.Int64ToPtr(int64(count) + 1),
Message: "this should fail",
Meta: map[string]interface{}{
"metrics": map[string]string{
"1": "a",
@@ -5545,19 +5546,72 @@ func TestJobEndpoint_Scale_Invalid(t *testing.T) {
err = state.UpsertJob(1000, job)
require.Nil(err)
scale.Count = nil
scale.Error = helper.StringToPtr("error and reason")
scale.Reason = helper.StringToPtr("is not allowed")
err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
require.Error(err)
require.Contains(err.Error(), "should not contain error and scaling reason")
scale.Count = helper.Int64ToPtr(10)
scale.Reason = nil
scale.Error = helper.StringToPtr("error and count is not allowed")
scale.Message = "error message"
scale.Error = true
err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
require.Error(err)
require.Contains(err.Error(), "should not contain error and count")
require.Contains(err.Error(), "should not contain count if error is true")
}
func TestJobEndpoint_Scale_NoEval(t *testing.T) {
t.Parallel()
require := require.New(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
state := s1.fsm.State()
job := mock.Job()
groupName := job.TaskGroups[0].Name
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}, &resp)
jobCreateIndex := resp.Index
require.NoError(err)
scale := &structs.JobScaleRequest{
JobID: job.ID,
Target: map[string]string{
structs.ScalingTargetGroup: groupName,
},
Count: nil, // no count => no eval
Message: "something informative",
Meta: map[string]interface{}{
"metrics": map[string]string{
"1": "a",
"2": "b",
},
"other": "value",
},
PolicyOverride: false,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
require.NoError(err)
require.Empty(resp.EvalID)
require.Empty(resp.EvalCreateIndex)
jobEvents, eventsIndex, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID)
require.NoError(err)
require.NotNil(jobEvents)
require.Len(jobEvents, 1)
require.Contains(jobEvents, groupName)
groupEvents := jobEvents[groupName]
require.Len(groupEvents, 1)
event := groupEvents[0]
require.Nil(event.EvalID)
require.Greater(eventsIndex, jobCreateIndex)
}
func TestJobEndpoint_GetScaleStatus(t *testing.T) {

View File

@@ -51,6 +51,7 @@ func init() {
csiVolumeTableSchema,
csiPluginTableSchema,
scalingPolicyTableSchema,
scalingEventTableSchema,
}...)
}
@@ -789,7 +790,6 @@ func (s *ScalingPolicyTargetFieldIndex) PrefixFromArgs(args ...interface{}) ([]b
}
// scalingPolicyTableSchema returns the MemDB schema for the policy table.
// This table is used to store the policies which are referenced by tokens
func scalingPolicyTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "scaling_policy",
@@ -842,3 +842,41 @@ func scalingPolicyTableSchema() *memdb.TableSchema {
},
}
}
// scalingEventTableSchema returns the memdb schema for job scaling events
func scalingEventTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "scaling_event",
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
AllowMissing: false,
Unique: true,
// Use a compound index so the tuple of (Namespace, JobID) is
// uniquely identifying
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Namespace",
},
&memdb.StringFieldIndex{
Field: "JobID",
},
},
},
},
// TODO: need to figure out whether we want to index these or the jobs or ...
// "error": {
// Name: "error",
// AllowMissing: false,
// Unique: false,
// Indexer: &memdb.FieldSetIndex{
// Field: "Error",
// },
// },
},
}
}
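
With the compound "id" index, the (Namespace, JobID) tuple is passed as separate arguments at lookup time. A sketch against an open read transaction (txn, namespace, and jobID assumed from context; illustrative only):

raw, err := txn.First("scaling_event", "id", namespace, jobID)
if err != nil {
	return err
}
if raw != nil {
	events := raw.(*structs.JobScalingEvents)
	fmt.Println("groups with scaling events:", len(events.ScalingEvents))
}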

View File

@@ -627,6 +627,89 @@ func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) erro
return nil
}
// UpsertScalingEvent is used to insert a new scaling event.
// Only the most recent JobTrackedScalingEvents will be kept.
func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventRequest) error {
txn := s.db.Txn(true)
defer txn.Abort()
// Get the existing events
existing, err := txn.First("scaling_event", "id", req.Namespace, req.JobID)
if err != nil {
return fmt.Errorf("scaling event lookup failed: %v", err)
}
var jobEvents *structs.JobScalingEvents
if existing != nil {
jobEvents = existing.(*structs.JobScalingEvents)
} else {
jobEvents = &structs.JobScalingEvents{
Namespace: req.Namespace,
JobID: req.JobID,
ScalingEvents: make(map[string][]*structs.ScalingEvent),
}
}
jobEvents.ModifyIndex = index
req.ScalingEvent.CreateIndex = index
events := jobEvents.ScalingEvents[req.TaskGroup]
// Prepend this latest event
events = append(
[]*structs.ScalingEvent{req.ScalingEvent},
events...,
)
// Truncate older events
if len(events) > structs.JobTrackedScalingEvents {
events = events[0:structs.JobTrackedScalingEvents]
}
jobEvents.ScalingEvents[req.TaskGroup] = events
// Insert the new event
if err := txn.Insert("scaling_event", jobEvents); err != nil {
return fmt.Errorf("scaling event insert failed: %v", err)
}
// Update the indexes table for scaling_event
if err := txn.Insert("index", &IndexEntry{"scaling_event", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
txn.Commit()
return nil
}
// ScalingEvents returns an iterator over all the job scaling events
func (s *StateStore) ScalingEvents(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
// Walk the entire scaling_event table
iter, err := txn.Get("scaling_event", "id")
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
return iter, nil
}
func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID string) (map[string][]*structs.ScalingEvent, uint64, error) {
txn := s.db.Txn(false)
watchCh, existing, err := txn.FirstWatch("scaling_event", "id", namespace, jobID)
if err != nil {
return nil, 0, fmt.Errorf("job scaling events lookup failed: %v", err)
}
ws.Add(watchCh)
if existing != nil {
events := existing.(*structs.JobScalingEvents)
return events.ScalingEvents, events.ModifyIndex, nil
}
return nil, 0, nil
}
// UpsertNode is used to register a node or update a node definition
// This is assumed to be triggered by the client, so we retain the value
// of drain/eligibility which is set by the scheduler.
@@ -1407,6 +1490,14 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
}
}
// Delete the scaling events
if _, err = txn.DeleteAll("scaling_event", "id", namespace, jobID); err != nil {
return fmt.Errorf("deleting job scaling events failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"scaling_event", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
// Cleanup plugins registered by this job
err = s.deleteJobFromPlugin(index, txn, job)
if err != nil {
@@ -5278,3 +5369,10 @@ func (r *StateRestore) CSIVolumeRestore(volume *structs.CSIVolume) error {
}
return nil
}
func (r *StateRestore) ScalingEventsRestore(jobEvents *structs.JobScalingEvents) error {
if err := r.txn.Insert("scaling_event", jobEvents); err != nil {
return fmt.Errorf("scaling event insert failed: %v", err)
}
return nil
}
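
The index returned by ScalingEventsByJob above, together with the populated WatchSet, is what powers blocking queries over this table. A sketch of a caller waiting for new events, assuming go-memdb, nomad/state, fmt, and time imports and a populated *state.StateStore; illustrative only:

// waitForScalingEvents blocks until the job's scaling events change or a
// one-minute timeout fires.
func waitForScalingEvents(s *state.StateStore, ns, jobID string) error {
	ws := memdb.NewWatchSet()
	_, index, err := s.ScalingEventsByJob(ws, ns, jobID)
	if err != nil {
		return err
	}
	// Watch returns true if the timeout fired before anything changed.
	if timedOut := ws.Watch(time.After(time.Minute)); timedOut {
		return fmt.Errorf("no new scaling events after index %d", index)
	}
	return nil // re-query here; the new result index will exceed index
}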

View File

@@ -8552,6 +8552,172 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) {
require.Equal(expect, found)
}
func TestStateStore_UpsertScalingEvent(t *testing.T) {
t.Parallel()
require := require.New(t)
state := testStateStore(t)
job := mock.Job()
groupName := job.TaskGroups[0].Name
newEvent := structs.NewScalingEvent("message 1").SetMeta(map[string]interface{}{
"a": 1,
})
wsAll := memdb.NewWatchSet()
all, err := state.ScalingEvents(wsAll)
require.NoError(err)
require.Nil(all.Next())
ws := memdb.NewWatchSet()
out, _, err := state.ScalingEventsByJob(ws, job.Namespace, job.ID)
require.NoError(err)
require.Nil(out)
err = state.UpsertScalingEvent(1000, &structs.ScalingEventRequest{
Namespace: job.Namespace,
JobID: job.ID,
TaskGroup: groupName,
ScalingEvent: newEvent,
})
require.NoError(err)
require.True(watchFired(ws))
require.True(watchFired(wsAll))
ws = memdb.NewWatchSet()
out, eventsIndex, err := state.ScalingEventsByJob(ws, job.Namespace, job.ID)
require.NoError(err)
require.Equal(map[string][]*structs.ScalingEvent{
groupName: {newEvent},
}, out)
require.EqualValues(eventsIndex, 1000)
iter, err := state.ScalingEvents(ws)
require.NoError(err)
count := 0
jobsReturned := []string{}
var jobEvents *structs.JobScalingEvents
for {
raw := iter.Next()
if raw == nil {
break
}
jobEvents = raw.(*structs.JobScalingEvents)
jobsReturned = append(jobsReturned, jobEvents.JobID)
count++
}
require.Equal(1, count)
require.EqualValues(jobEvents.ModifyIndex, 1000)
require.EqualValues(jobEvents.ScalingEvents[groupName][0].CreateIndex, 1000)
index, err := state.Index("scaling_event")
require.NoError(err)
require.ElementsMatch([]string{job.ID}, jobsReturned)
require.Equal(map[string][]*structs.ScalingEvent{
groupName: {newEvent},
}, jobEvents.ScalingEvents)
require.EqualValues(1000, index)
require.False(watchFired(ws))
}
func TestStateStore_UpsertScalingEvent_LimitAndOrder(t *testing.T) {
t.Parallel()
require := require.New(t)
state := testStateStore(t)
namespace := uuid.Generate()
jobID := uuid.Generate()
group1 := uuid.Generate()
group2 := uuid.Generate()
index := uint64(1000)
for i := 1; i <= structs.JobTrackedScalingEvents+10; i++ {
newEvent := structs.NewScalingEvent("").SetMeta(map[string]interface{}{
"i": i,
"group": group1,
})
err := state.UpsertScalingEvent(index, &structs.ScalingEventRequest{
Namespace: namespace,
JobID: jobID,
TaskGroup: group1,
ScalingEvent: newEvent,
})
index++
require.NoError(err)
newEvent = structs.NewScalingEvent("").SetMeta(map[string]interface{}{
"i": i,
"group": group2,
})
err = state.UpsertScalingEvent(index, &structs.ScalingEventRequest{
Namespace: namespace,
JobID: jobID,
TaskGroup: group2,
ScalingEvent: newEvent,
})
index++
require.NoError(err)
}
out, _, err := state.ScalingEventsByJob(nil, namespace, jobID)
require.NoError(err)
require.Len(out, 2)
expectedEvents := []int{}
for i := structs.JobTrackedScalingEvents; i > 0; i-- {
expectedEvents = append(expectedEvents, i+10)
}
// checking order and content
require.Len(out[group1], structs.JobTrackedScalingEvents)
actualEvents := []int{}
for _, event := range out[group1] {
require.Equal(group1, event.Meta["group"])
actualEvents = append(actualEvents, event.Meta["i"].(int))
}
require.Equal(expectedEvents, actualEvents)
// checking order and content
require.Len(out[group2], structs.JobTrackedScalingEvents)
actualEvents = []int{}
for _, event := range out[group2] {
require.Equal(group2, event.Meta["group"])
actualEvents = append(actualEvents, event.Meta["i"].(int))
}
require.Equal(expectedEvents, actualEvents)
}
func TestStateStore_RestoreScalingEvents(t *testing.T) {
t.Parallel()
require := require.New(t)
state := testStateStore(t)
jobScalingEvents := &structs.JobScalingEvents{
Namespace: uuid.Generate(),
JobID: uuid.Generate(),
ScalingEvents: map[string][]*structs.ScalingEvent{
uuid.Generate(): {
structs.NewScalingEvent(uuid.Generate()),
},
},
}
restore, err := state.Restore()
require.NoError(err)
err = restore.ScalingEventsRestore(jobScalingEvents)
require.NoError(err)
restore.Commit()
ws := memdb.NewWatchSet()
out, _, err := state.ScalingEventsByJob(ws, jobScalingEvents.Namespace,
jobScalingEvents.JobID)
require.NoError(err)
require.NotNil(out)
require.EqualValues(jobScalingEvents.ScalingEvents, out)
}
func TestStateStore_Abandon(t *testing.T) {
t.Parallel()

View File

@@ -90,6 +90,7 @@ const (
CSIVolumeRegisterRequestType
CSIVolumeDeregisterRequestType
CSIVolumeClaimRequestType
ScalingEventRegisterRequestType
)
const (
@@ -625,8 +626,8 @@ type JobScaleRequest struct {
JobID string
Target map[string]string
Count *int64
Reason *string
Error *string
Message string
Error bool
Meta map[string]interface{}
// PolicyOverride is set when the user is attempting to override any policies
PolicyOverride bool
@@ -1255,16 +1256,7 @@ type TaskGroupScaleStatus struct {
Running int
Healthy int
Unhealthy int
Events []ScalingEvent
}
// ScalingEvent represents a specific scaling event
type ScalingEvent struct {
Reason *string
Error *string
Meta map[string]interface{}
Time uint64
EvalID *string
Events []*ScalingEvent
}
type JobDispatchResponse struct {
@@ -3517,6 +3509,10 @@ const (
// JobTrackedVersions is the number of historic job versions that are
// kept.
JobTrackedVersions = 6
// JobTrackedScalingEvents is the number of scaling events that are
// kept for a single task group.
JobTrackedScalingEvents = 20
)
// Job is the scope of a scheduling request to Nomad. It is the largest
@@ -4659,6 +4655,77 @@ const (
ReasonWithinPolicy = "Restart within policy"
)
// JobScalingEvents contains the scaling events for a given job
type JobScalingEvents struct {
Namespace string
JobID string
// This map is indexed by target; currently, this is just task group.
// The indexed array is sorted from newest to oldest event.
// The array should have at most JobTrackedScalingEvents entries.
ScalingEvents map[string][]*ScalingEvent
// Raft index
ModifyIndex uint64
}
// Factory method for ScalingEvent objects
func NewScalingEvent(message string) *ScalingEvent {
return &ScalingEvent{
Time: time.Now().UnixNano(),
Message: message,
}
}
// ScalingEvent describes a scaling event against a Job
type ScalingEvent struct {
// Unix Nanosecond timestamp for the scaling event
Time int64
// Count is the new scaling count, if provided
Count *int64
// Message is the message describing a scaling event
Message string
// Error indicates an error state for this scaling event
Error bool
// Meta is a map of metadata returned during a scaling event
Meta map[string]interface{}
// EvalID is the ID for an evaluation if one was created as part of a scaling event
EvalID *string
// Raft index
CreateIndex uint64
}
func (e *ScalingEvent) SetError(error bool) *ScalingEvent {
e.Error = error
return e
}
func (e *ScalingEvent) SetMeta(meta map[string]interface{}) *ScalingEvent {
e.Meta = meta
return e
}
func (e *ScalingEvent) SetEvalID(evalID string) *ScalingEvent {
e.EvalID = &evalID
return e
}
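
The setters return the event, so construction chains fluently. A sketch (the evalID variable is hypothetical):

event := structs.NewScalingEvent("scaling up for load").
	SetMeta(map[string]interface{}{"source": "manual"}).
	SetEvalID(evalID) // only when an evaluation was created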
// ScalingEventRequest is used by the Job.Scale endpoint
// to register scaling events
type ScalingEventRequest struct {
Namespace string
JobID string
TaskGroup string
ScalingEvent *ScalingEvent
}
// ScalingPolicy specifies the scaling policy for a scaling target
type ScalingPolicy struct {
// ID is a generated UUID used for looking up the scaling policy