added new policy capabilities for recommendations API

state store: call-out to generic update of job recommendations from job update method
recommendations API work, and http endpoint errors for OSS
support for scaling polices in task block of job spec
add query filters for ScalingPolicy list endpoint
command: nomad scaling policy list: added -job and -type
This commit is contained in:
Chris Baker 2020-09-09 22:30:40 +00:00
parent be3f54d296
commit 719077a26d
38 changed files with 1282 additions and 301 deletions

View file

@ -44,6 +44,7 @@ const (
NamespaceCapabilityReadScalingPolicy = "read-scaling-policy" NamespaceCapabilityReadScalingPolicy = "read-scaling-policy"
NamespaceCapabilityReadJobScaling = "read-job-scaling" NamespaceCapabilityReadJobScaling = "read-job-scaling"
NamespaceCapabilityScaleJob = "scale-job" NamespaceCapabilityScaleJob = "scale-job"
NamespaceCapabilitySubmitRecommendation = "submit-recommendation"
) )
var ( var (
@ -153,7 +154,7 @@ func isNamespaceCapabilityValid(cap string) bool {
NamespaceCapabilityListScalingPolicies, NamespaceCapabilityReadScalingPolicy, NamespaceCapabilityReadJobScaling, NamespaceCapabilityScaleJob: NamespaceCapabilityListScalingPolicies, NamespaceCapabilityReadScalingPolicy, NamespaceCapabilityReadJobScaling, NamespaceCapabilityScaleJob:
return true return true
// Separate the enterprise-only capabilities // Separate the enterprise-only capabilities
case NamespaceCapabilitySentinelOverride: case NamespaceCapabilitySentinelOverride, NamespaceCapabilitySubmitRecommendation:
return true return true
default: default:
return false return false
@ -183,6 +184,7 @@ func expandNamespacePolicy(policy string) []string {
NamespaceCapabilityAllocLifecycle, NamespaceCapabilityAllocLifecycle,
NamespaceCapabilityCSIMountVolume, NamespaceCapabilityCSIMountVolume,
NamespaceCapabilityCSIWriteVolume, NamespaceCapabilityCSIWriteVolume,
NamespaceCapabilitySubmitRecommendation,
}...) }...)
switch policy { switch policy {

View file

@ -106,6 +106,7 @@ func TestParse(t *testing.T) {
NamespaceCapabilityAllocLifecycle, NamespaceCapabilityAllocLifecycle,
NamespaceCapabilityCSIMountVolume, NamespaceCapabilityCSIMountVolume,
NamespaceCapabilityCSIWriteVolume, NamespaceCapabilityCSIWriteVolume,
NamespaceCapabilitySubmitRecommendation,
}, },
}, },
{ {

123
api/recommendations.go Normal file
View file

@ -0,0 +1,123 @@
package api
// Recommendations is used to query the recommendations endpoints.
type Recommendations struct {
	// client is the API client used to issue HTTP requests.
	client *Client
}
// Recommendations returns a new handle on the recommendations endpoints.
func (c *Client) Recommendations() *Recommendations {
	return &Recommendations{client: c}
}
// List is used to dump all of the recommendations in the cluster.
func (r *Recommendations) List(q *QueryOptions) ([]*Recommendation, *QueryMeta, error) {
	var resp []*Recommendation
	qm, err := r.client.query("/v1/recommendations", &resp, q)
	if err != nil {
		// Return nil query metadata on error, consistent with Info,
		// Upsert, and Apply (previously returned the partial qm).
		return nil, nil, err
	}
	return resp, qm, nil
}
// Info is used to return information on a single recommendation.
func (r *Recommendations) Info(id string, q *QueryOptions) (*Recommendation, *QueryMeta, error) {
	var rec Recommendation
	meta, err := r.client.query("/v1/recommendation/"+id, &rec, q)
	if err != nil {
		return nil, nil, err
	}
	return &rec, meta, nil
}
// Upsert is used to create or update a recommendation.
func (r *Recommendations) Upsert(rec *Recommendation, q *WriteOptions) (*Recommendation, *WriteMeta, error) {
	var out Recommendation
	meta, err := r.client.write("/v1/recommendation", rec, &out, q)
	if err != nil {
		return nil, nil, err
	}
	return &out, meta, nil
}
// Delete is used to delete a list of recommendations.
func (r *Recommendations) Delete(ids []string, q *WriteOptions) (*WriteMeta, error) {
	// Deletion is expressed through the apply endpoint: the given IDs
	// are dismissed and nothing is applied.
	req := &RecommendationApplyRequest{
		Apply:   []string{},
		Dismiss: ids,
	}
	meta, err := r.client.write("/v1/recommendations/apply", req, nil, q)
	if err != nil {
		return nil, err
	}
	return meta, nil
}
// Apply is used to apply a set of recommendations.
func (r *Recommendations) Apply(ids []string, policyOverride bool) (
	*RecommendationApplyResponse, *WriteMeta, error) {
	req := &RecommendationApplyRequest{
		Apply:          ids,
		PolicyOverride: policyOverride,
	}
	// NOTE(review): unlike Delete/Upsert, this method takes no
	// WriteOptions and issues the request with nil options — confirm
	// that is intentional.
	var out RecommendationApplyResponse
	meta, err := r.client.write("/v1/recommendations/apply", req, &out, nil)
	if err != nil {
		return nil, nil, err
	}
	out.WriteMeta = *meta
	return &out, meta, nil
}
// Recommendation is used to serialize a recommendation.
type Recommendation struct {
	// ID is the server-assigned identifier (empty on submission).
	ID string
	Region string
	Namespace string
	// JobID, Group, and Task identify the task the recommendation targets.
	JobID string
	JobVersion uint64
	Group string
	Task string
	// Resource names the targeted resource, e.g. "CPU".
	Resource string
	// Value is the recommended amount for the resource.
	Value int
	// Current is presumably the resource's value when the recommendation
	// was made — confirm against server-side usage.
	Current int
	// Meta carries arbitrary metadata attached by the submitter.
	Meta map[string]interface{}
	// Stats carries numeric statistics backing the recommendation.
	Stats map[string]float64
	// EnforceVersion presumably restricts application to JobVersion —
	// TODO confirm server semantics.
	EnforceVersion bool
	SubmitTime int64
	// CreateIndex/ModifyIndex are Raft indexes set by the server.
	CreateIndex uint64
	ModifyIndex uint64
}
// RecommendationApplyRequest is used to apply and/or dismiss a set of recommendations
type RecommendationApplyRequest struct {
	// Apply lists recommendation IDs to apply to their jobs.
	Apply []string
	// Dismiss lists recommendation IDs to discard without applying.
	Dismiss []string
	// PolicyOverride presumably mirrors the job-register policy
	// override flag — confirm against the server implementation.
	PolicyOverride bool
}
// RecommendationApplyResponse is used to apply a set of recommendations
type RecommendationApplyResponse struct {
	// UpdatedJobs holds one result per job successfully updated.
	UpdatedJobs []*SingleRecommendationApplyResult
	// Errors holds one entry per job that failed to update.
	Errors []*SingleRecommendationApplyError
	WriteMeta
}
// SingleRecommendationApplyResult reports the outcome of applying
// recommendations to a single job.
type SingleRecommendationApplyResult struct {
	Namespace string
	JobID string
	JobModifyIndex uint64
	// EvalID/EvalCreateIndex identify the evaluation created by the update.
	EvalID string
	EvalCreateIndex uint64
	// Warnings carries any non-fatal messages from the job update.
	Warnings string
	// Recommendations lists the recommendation IDs applied to this job.
	Recommendations []string
}
// SingleRecommendationApplyError reports a failure to apply
// recommendations to a single job.
type SingleRecommendationApplyError struct {
	Namespace string
	JobID string
	// Recommendations lists the recommendation IDs that could not be applied.
	Recommendations []string
	// Error is the failure message for this job.
	Error string
}

View file

@ -66,12 +66,12 @@ type ScalingPolicy struct {
Max *int64 `hcl:"max,optional"` Max *int64 `hcl:"max,optional"`
Policy map[string]interface{} `hcl:"policy,block"` Policy map[string]interface{} `hcl:"policy,block"`
Enabled *bool `hcl:"enabled,optional"` Enabled *bool `hcl:"enabled,optional"`
Type string `hcl:"type,optional"`
/* fields set by server */ /* fields set by server */
ID string ID string
Namespace string Namespace string
Type string
Target map[string]string Target map[string]string
CreateIndex uint64 CreateIndex uint64
ModifyIndex uint64 ModifyIndex uint64

View file

@ -671,6 +671,7 @@ type Task struct {
ShutdownDelay time.Duration `mapstructure:"shutdown_delay" hcl:"shutdown_delay,optional"` ShutdownDelay time.Duration `mapstructure:"shutdown_delay" hcl:"shutdown_delay,optional"`
KillSignal string `mapstructure:"kill_signal" hcl:"kill_signal,optional"` KillSignal string `mapstructure:"kill_signal" hcl:"kill_signal,optional"`
Kind string `hcl:"kind,optional"` Kind string `hcl:"kind,optional"`
ScalingPolicies []*ScalingPolicy `hcl:"scaling,block"`
} }
func (t *Task) Canonicalize(tg *TaskGroup, job *Job) { func (t *Task) Canonicalize(tg *TaskGroup, job *Job) {

View file

@ -68,6 +68,32 @@ func testPeriodicJob() *Job {
return job return job
} }
// testRecommendation builds a Recommendation for the first task of the
// first task group of the given job, for use in tests. The CPU value is
// double the task's current CPU resource.
func testRecommendation(job *Job) *Recommendation {
	group := job.TaskGroups[0]
	task := group.Tasks[0]
	return &Recommendation{
		Region:    *job.Region,
		Namespace: *job.Namespace,
		JobID:     *job.ID,
		Group:     *group.Name,
		Task:      task.Name,
		Resource:  "CPU",
		Value:     *task.Resources.CPU * 2,
		Meta: map[string]interface{}{
			"testing": true,
			"mocked":  "also true",
		},
		Stats: map[string]float64{
			"median": 50.0,
			"mean":   51.0,
			"max":    75.5,
			"99":     73.0,
			"min":    0.0,
		},
	}
}
func testNamespace() *Namespace { func testNamespace() *Namespace {
return &Namespace{ return &Namespace{
Name: "test-namespace", Name: "test-namespace",

View file

@ -17,6 +17,11 @@ func (s *HTTPServer) registerEnterpriseHandlers() {
s.mux.HandleFunc("/v1/quota", s.wrap(s.entOnly)) s.mux.HandleFunc("/v1/quota", s.wrap(s.entOnly))
s.mux.HandleFunc("/v1/operator/license", s.wrap(s.entOnly)) s.mux.HandleFunc("/v1/operator/license", s.wrap(s.entOnly))
s.mux.HandleFunc("/v1/recommendation", s.wrap(s.entOnly))
s.mux.HandleFunc("/v1/recommendations", s.wrap(s.entOnly))
s.mux.HandleFunc("/v1/recommendations/apply", s.wrap(s.entOnly))
s.mux.HandleFunc("/v1/recommendation/", s.wrap(s.entOnly))
} }
func (s *HTTPServer) entOnly(resp http.ResponseWriter, req *http.Request) (interface{}, error) { func (s *HTTPServer) entOnly(resp http.ResponseWriter, req *http.Request) (interface{}, error) {

View file

@ -982,7 +982,7 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
tg.Tasks = make([]*structs.Task, l) tg.Tasks = make([]*structs.Task, l)
for l, task := range taskGroup.Tasks { for l, task := range taskGroup.Tasks {
t := &structs.Task{} t := &structs.Task{}
ApiTaskToStructsTask(task, t) ApiTaskToStructsTask(job, tg, task, t)
// Set the tasks vault namespace from Job if it was not // Set the tasks vault namespace from Job if it was not
// specified by the task or group // specified by the task or group
@ -996,7 +996,9 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
// ApiTaskToStructsTask is a copy and type conversion between the API // ApiTaskToStructsTask is a copy and type conversion between the API
// representation of a task from a struct representation of a task. // representation of a task from a struct representation of a task.
func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) { func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup,
apiTask *api.Task, structsTask *structs.Task) {
structsTask.Name = apiTask.Name structsTask.Name = apiTask.Name
structsTask.Driver = apiTask.Driver structsTask.Driver = apiTask.Driver
structsTask.User = apiTask.User structsTask.User = apiTask.User
@ -1033,6 +1035,13 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
} }
} }
if l := len(apiTask.ScalingPolicies); l != 0 {
structsTask.ScalingPolicies = make([]*structs.ScalingPolicy, l)
for i, policy := range apiTask.ScalingPolicies {
structsTask.ScalingPolicies[i] = ApiScalingPolicyToStructs(0, policy).TargetTask(job, group, structsTask)
}
}
if l := len(apiTask.Services); l != 0 { if l := len(apiTask.Services); l != 0 {
structsTask.Services = make([]*structs.Service, l) structsTask.Services = make([]*structs.Service, l)
for i, service := range apiTask.Services { for i, service := range apiTask.Services {

View file

@ -22,6 +22,12 @@ func (s *HTTPServer) scalingPoliciesListRequest(resp http.ResponseWriter, req *h
if s.parse(resp, req, &args.Region, &args.QueryOptions) { if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil return nil, nil
} }
if job := req.URL.Query().Get("job"); job != "" {
args.Job = job
}
if tpe := req.URL.Query().Get("type"); tpe != "" {
args.Type = tpe
}
var out structs.ScalingPolicyListResponse var out structs.ScalingPolicyListResponse
if err := s.agent.RPC("Scaling.ListPolicies", &args, &out); err != nil { if err := s.agent.RPC("Scaling.ListPolicies", &args, &out); err != nil {
@ -77,10 +83,15 @@ func (s *HTTPServer) scalingPolicyQuery(resp http.ResponseWriter, req *http.Requ
func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.ScalingPolicy { func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.ScalingPolicy {
p := structs.ScalingPolicy{ p := structs.ScalingPolicy{
Enabled: *ap.Enabled, Type: ap.Type,
Policy: ap.Policy, Policy: ap.Policy,
Target: map[string]string{}, Target: map[string]string{},
} }
if ap.Enabled != nil {
p.Enabled = *ap.Enabled
} else {
p.Enabled = true
}
if ap.Max != nil { if ap.Max != nil {
p.Max = *ap.Max p.Max = *ap.Max
} else { } else {

View file

@ -12,6 +12,7 @@ import (
) )
func TestHTTP_ScalingPoliciesList(t *testing.T) { func TestHTTP_ScalingPoliciesList(t *testing.T) {
require := require.New(t)
t.Parallel() t.Parallel()
httpTest(t, nil, func(s *TestAgent) { httpTest(t, nil, func(s *TestAgent) {
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
@ -26,40 +27,75 @@ func TestHTTP_ScalingPoliciesList(t *testing.T) {
}, },
} }
var resp structs.JobRegisterResponse var resp structs.JobRegisterResponse
if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil { require.NoError(s.Agent.RPC("Job.Register", &args, &resp))
t.Fatalf("err: %v", err)
}
} }
// Make the HTTP request // Make the HTTP request
req, err := http.NewRequest("GET", "/v1/scaling/policies", nil) req, err := http.NewRequest("GET", "/v1/scaling/policies", nil)
if err != nil { require.NoError(err)
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder() respW := httptest.NewRecorder()
// Make the request // Make the request
obj, err := s.Server.ScalingPoliciesRequest(respW, req) obj, err := s.Server.ScalingPoliciesRequest(respW, req)
if err != nil { require.NoError(err)
t.Fatalf("err: %v", err)
}
// Check for the index // Check for the index
if respW.Header().Get("X-Nomad-Index") == "" { require.NotEmpty(respW.Header().Get("X-Nomad-Index"), "missing index")
t.Fatalf("missing index") require.NotEmpty(respW.Header().Get("X-Nomad-KnownLeader"), "missing known leader")
} require.NotEmpty(respW.Header().Get("X-Nomad-LastContact"), "missing last contact")
if respW.Header().Get("X-Nomad-KnownLeader") != "true" {
t.Fatalf("missing known leader")
}
if respW.Header().Get("X-Nomad-LastContact") == "" {
t.Fatalf("missing last contact")
}
// Check the list // Check the list
l := obj.([]*structs.ScalingPolicyListStub) l := obj.([]*structs.ScalingPolicyListStub)
if len(l) != 3 { require.Len(l, 3)
t.Fatalf("bad: %#v", l) })
} }
func TestHTTP_ScalingPoliciesList_Filter(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
var job *structs.Job
for i := 0; i < 3; i++ {
// Create the job
job, _ = mock.JobWithScalingPolicy()
args := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var resp structs.JobRegisterResponse
require.NoError(s.Agent.RPC("Job.Register", &args, &resp))
}
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/scaling/policies?job="+job.ID, nil)
require.NoError(err)
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.ScalingPoliciesRequest(respW, req)
require.NoError(err)
// Check the list
l := obj.([]*structs.ScalingPolicyListStub)
require.Len(l, 1)
// Request again, with policy type filter
req, err = http.NewRequest("GET", "/v1/scaling/policies?type=cluster", nil)
require.NoError(err)
respW = httptest.NewRecorder()
// Make the request
obj, err = s.Server.ScalingPoliciesRequest(respW, req)
require.NoError(err)
// Check the list
l = obj.([]*structs.ScalingPolicyListStub)
require.Len(l, 0)
}) })
} }
@ -90,15 +126,9 @@ func TestHTTP_ScalingPolicyGet(t *testing.T) {
require.NoError(err) require.NoError(err)
// Check for the index // Check for the index
if respW.Header().Get("X-Nomad-Index") == "" { require.NotEmpty(respW.Header().Get("X-Nomad-Index"), "missing index")
t.Fatalf("missing index") require.NotEmpty(respW.Header().Get("X-Nomad-KnownLeader"), "missing known leader")
} require.NotEmpty(respW.Header().Get("X-Nomad-LastContact"), "missing last contact")
if respW.Header().Get("X-Nomad-KnownLeader") != "true" {
t.Fatalf("missing known leader")
}
if respW.Header().Get("X-Nomad-LastContact") == "" {
t.Fatalf("missing last contact")
}
// Check the policy // Check the policy
require.Equal(p.ID, obj.(*structs.ScalingPolicy).ID) require.Equal(p.ID, obj.(*structs.ScalingPolicy).ID)

View file

@ -31,6 +31,9 @@ General Options:
Policy Info Options: Policy Info Options:
-type
Filter scaling policies by type.
-json -json
Output the scaling policy in its JSON format. Output the scaling policy in its JSON format.
@ -48,6 +51,8 @@ func (s *ScalingPolicyListCommand) Synopsis() string {
func (s *ScalingPolicyListCommand) AutocompleteFlags() complete.Flags { func (s *ScalingPolicyListCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(s.Meta.AutocompleteFlags(FlagSetClient), return mergeAutocompleteFlags(s.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{ complete.Flags{
"-job": complete.PredictNothing,
"-type": complete.PredictNothing,
"-json": complete.PredictNothing, "-json": complete.PredictNothing,
"-t": complete.PredictAnything, "-t": complete.PredictAnything,
}) })
@ -59,12 +64,14 @@ func (s *ScalingPolicyListCommand) Name() string { return "scaling policy list"
// Run satisfies the cli.Command Run function. // Run satisfies the cli.Command Run function.
func (s *ScalingPolicyListCommand) Run(args []string) int { func (s *ScalingPolicyListCommand) Run(args []string) int {
var json bool var json bool
var tmpl string var tmpl, policyType, job string
flags := s.Meta.FlagSet(s.Name(), FlagSetClient) flags := s.Meta.FlagSet(s.Name(), FlagSetClient)
flags.Usage = func() { s.Ui.Output(s.Help()) } flags.Usage = func() { s.Ui.Output(s.Help()) }
flags.BoolVar(&json, "json", false, "") flags.BoolVar(&json, "json", false, "")
flags.StringVar(&tmpl, "t", "", "") flags.StringVar(&tmpl, "t", "", "")
flags.StringVar(&policyType, "type", "", "")
flags.StringVar(&job, "job", "", "")
if err := flags.Parse(args); err != nil { if err := flags.Parse(args); err != nil {
return 1 return 1
} }
@ -81,7 +88,16 @@ func (s *ScalingPolicyListCommand) Run(args []string) int {
return 1 return 1
} }
policies, _, err := client.Scaling().ListPolicies(nil) q := &api.QueryOptions{
Params: map[string]string{},
}
if policyType != "" {
q.Params["type"] = policyType
}
if job != "" {
q.Params["job"] = job
}
policies, _, err := client.Scaling().ListPolicies(q)
if err != nil { if err != nil {
s.Ui.Error(fmt.Sprintf("Error listing scaling policies: %s", err)) s.Ui.Error(fmt.Sprintf("Error listing scaling policies: %s", err))
return 1 return 1
@ -103,7 +119,7 @@ func (s *ScalingPolicyListCommand) Run(args []string) int {
} }
// Create the output table header. // Create the output table header.
output := []string{"ID|Enabled|Target"} output := []string{"ID|Enabled|Type|Target"}
// Sort the list of policies based on their target. // Sort the list of policies based on their target.
sortedPolicies := scalingPolicyStubList{policies: policies} sortedPolicies := scalingPolicyStubList{policies: policies}
@ -112,8 +128,8 @@ func (s *ScalingPolicyListCommand) Run(args []string) int {
// Iterate the policies and add to the output. // Iterate the policies and add to the output.
for _, policy := range sortedPolicies.policies { for _, policy := range sortedPolicies.policies {
output = append(output, fmt.Sprintf( output = append(output, fmt.Sprintf(
"%s|%v|%s", "%s|%v|%s|%s",
policy.ID, policy.Enabled, formatScalingPolicyTarget(policy.Target))) policy.ID, policy.Enabled, policy.Type, formatScalingPolicyTarget(policy.Target)))
} }
// Output. // Output.

View file

@ -1,52 +1,36 @@
package command package command
import ( import (
"fmt"
"strings"
"testing" "testing"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/cli"
) )
func TestScalingPolicyListCommand_Run(t *testing.T) { func TestScalingPolicyListCommand_Run(t *testing.T) {
require := require.New(t)
t.Parallel() t.Parallel()
srv, client, url := testServer(t, true, nil) srv, client, url := testServer(t, false, nil)
defer srv.Shutdown() defer srv.Shutdown()
testutil.WaitForResult(func() (bool, error) {
nodes, _, err := client.Nodes().List(nil)
if err != nil {
return false, err
}
if len(nodes) == 0 {
return false, fmt.Errorf("missing node")
}
if _, ok := nodes[0].Drivers["mock_driver"]; !ok {
return false, fmt.Errorf("mock_driver not ready")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
ui := cli.NewMockUi() ui := cli.NewMockUi()
cmd := &ScalingPolicyListCommand{Meta: Meta{Ui: ui}} cmd := &ScalingPolicyListCommand{Meta: Meta{Ui: ui}}
// Perform an initial list, which should return zero results. // Perform an initial list, which should return zero results.
if code := cmd.Run([]string{"-address=" + url}); code != 0 { code := cmd.Run([]string{"-address=" + url})
t.Fatalf("expected cmd run exit code 0, got: %d", code) require.Equal(0, code)
} out := ui.OutputWriter.String()
if out := ui.OutputWriter.String(); !strings.Contains(out, "No policies found") { require.Contains(out, "No policies found")
t.Fatalf("expected no policies found within output: %v", out)
}
// Generate two test jobs. // Generate two test jobs.
jobs := []*api.Job{testJob("scaling_policy_list_1"), testJob("scaling_policy_list_2")} jobs := []*api.Job{testJob("scaling_policy_list_1"), testJob("scaling_policy_list_2")}
// Generate an example scaling policy. // Generate an example scaling policy.
scalingPolicy := api.ScalingPolicy{ scalingPolicy := api.ScalingPolicy{
Type: api.ScalingPolicyTypeHorizontal,
Enabled: helper.BoolToPtr(true), Enabled: helper.BoolToPtr(true),
Min: helper.Int64ToPtr(1), Min: helper.Int64ToPtr(1),
Max: helper.Int64ToPtr(1), Max: helper.Int64ToPtr(1),
@ -55,29 +39,18 @@ func TestScalingPolicyListCommand_Run(t *testing.T) {
// Iterate the jobs, add the scaling policy and register. // Iterate the jobs, add the scaling policy and register.
for _, job := range jobs { for _, job := range jobs {
job.TaskGroups[0].Scaling = &scalingPolicy job.TaskGroups[0].Scaling = &scalingPolicy
resp, _, err := client.Jobs().Register(job, nil) _, _, err := client.Jobs().Register(job, nil)
if err != nil { require.NoError(err)
t.Fatalf("err: %s", err)
}
if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 {
t.Fatalf("expected waitForSuccess exit code 0, got: %d", code)
}
} }
// Perform a new list which should yield results.. // Perform a new list which should yield results..
if code := cmd.Run([]string{"-address=" + url}); code != 0 { code = cmd.Run([]string{"-address=" + url})
t.Fatalf("expected cmd run exit code 0, got: %d", code) require.Equal(0, code)
} out = ui.OutputWriter.String()
out := ui.OutputWriter.String() require.Contains(out, "ID")
if !strings.Contains(out, "ID") || require.Contains(out, "Enabled")
!strings.Contains(out, "Enabled") || require.Contains(out, "Type")
!strings.Contains(out, "Target") { require.Contains(out, "Target")
t.Fatalf("expected table headers within output: %v", out) require.Contains(out, "scaling_policy_list_1")
} require.Contains(out, "scaling_policy_list_2")
if !strings.Contains(out, "scaling_policy_list_1") {
t.Fatalf("expected job scaling_policy_list_1 within output: %v", out)
}
if !strings.Contains(out, "scaling_policy_list_2") {
t.Fatalf("expected job scaling_policy_list_2 within output: %v", out)
}
} }

View file

@ -185,7 +185,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
// Parse scaling policy // Parse scaling policy
if o := listVal.Filter("scaling"); len(o.Items) > 0 { if o := listVal.Filter("scaling"); len(o.Items) > 0 {
if err := parseScalingPolicy(&g.Scaling, o); err != nil { if err := parseGroupScalingPolicy(&g.Scaling, o); err != nil {
return multierror.Prefix(err, "scaling ->") return multierror.Prefix(err, "scaling ->")
} }
} }
@ -319,21 +319,39 @@ func parseVolumes(out *map[string]*api.VolumeRequest, list *ast.ObjectList) erro
return nil return nil
} }
func parseScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error { func parseGroupScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 { if len(list.Items) > 1 {
return fmt.Errorf("only one 'scaling' block allowed") return fmt.Errorf("only one 'scaling' block allowed")
} }
item := list.Items[0]
if len(item.Keys) != 0 {
return fmt.Errorf("task group scaling policy should not have a name")
}
p, err := parseScalingPolicy(item)
if err != nil {
return err
}
// Get our resource object // group-specific validation
o := list.Items[0] if p.Max == nil {
return fmt.Errorf("missing 'max'")
}
if p.Type == "" {
p.Type = "horizontal"
} else if p.Type != "horizontal" {
return fmt.Errorf("task group scaling policy had invalid type: %q", p.Type)
}
*out = p
return nil
}
func parseScalingPolicy(item *ast.ObjectItem) (*api.ScalingPolicy, error) {
// We need this later // We need this later
var listVal *ast.ObjectList var listVal *ast.ObjectList
if ot, ok := o.Val.(*ast.ObjectType); ok { if ot, ok := item.Val.(*ast.ObjectType); ok {
listVal = ot.List listVal = ot.List
} else { } else {
return fmt.Errorf("should be an object") return nil, fmt.Errorf("should be an object")
} }
valid := []string{ valid := []string{
@ -341,14 +359,15 @@ func parseScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error {
"max", "max",
"policy", "policy",
"enabled", "enabled",
"type",
} }
if err := checkHCLKeys(o.Val, valid); err != nil { if err := checkHCLKeys(item.Val, valid); err != nil {
return err return nil, err
} }
var m map[string]interface{} var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil { if err := hcl.DecodeObject(&m, item.Val); err != nil {
return err return nil, err
} }
delete(m, "policy") delete(m, "policy")
@ -358,30 +377,26 @@ func parseScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error {
Result: &result, Result: &result,
}) })
if err != nil { if err != nil {
return err return nil, err
} }
if err := dec.Decode(m); err != nil { if err := dec.Decode(m); err != nil {
return err return nil, err
}
if result.Max == nil {
return fmt.Errorf("missing 'max'")
} }
// If we have policy, then parse that // If we have policy, then parse that
if o := listVal.Filter("policy"); len(o.Items) > 0 { if o := listVal.Filter("policy"); len(o.Items) > 0 {
if len(o.Elem().Items) > 1 { if len(o.Elem().Items) > 1 {
return fmt.Errorf("only one 'policy' block allowed per 'scaling' block") return nil, fmt.Errorf("only one 'policy' block allowed per 'scaling' block")
} }
p := o.Elem().Items[0] p := o.Elem().Items[0]
var m map[string]interface{} var m map[string]interface{}
if err := hcl.DecodeObject(&m, p.Val); err != nil { if err := hcl.DecodeObject(&m, p.Val); err != nil {
return err return nil, err
} }
if err := mapstructure.WeakDecode(m, &result.Policy); err != nil { if err := mapstructure.WeakDecode(m, &result.Policy); err != nil {
return err return nil, err
} }
} }
*out = &result return &result, nil
return nil
} }

View file

@ -2,6 +2,7 @@ package jobspec
import ( import (
"fmt" "fmt"
"strings"
"time" "time"
multierror "github.com/hashicorp/go-multierror" multierror "github.com/hashicorp/go-multierror"
@ -23,6 +24,7 @@ var (
"kill_timeout", "kill_timeout",
"shutdown_delay", "shutdown_delay",
"kill_signal", "kill_signal",
"scaling",
} }
normalTaskKeys = append(commonTaskKeys, normalTaskKeys = append(commonTaskKeys,
@ -110,6 +112,7 @@ func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) {
delete(m, "vault") delete(m, "vault")
delete(m, "volume_mount") delete(m, "volume_mount")
delete(m, "csi_plugin") delete(m, "csi_plugin")
delete(m, "scaling")
// Build the task // Build the task
var t api.Task var t api.Task
@ -276,6 +279,13 @@ func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) {
} }
} }
// Parse scaling policies
if o := listVal.Filter("scaling"); len(o.Items) > 0 {
if err := parseTaskScalingPolicies(&t.ScalingPolicies, o); err != nil {
return nil, err
}
}
// If we have a vault block, then parse that // If we have a vault block, then parse that
if o := listVal.Filter("vault"); len(o.Items) > 0 { if o := listVal.Filter("vault"); len(o.Items) > 0 {
v := &api.Vault{ v := &api.Vault{
@ -462,6 +472,56 @@ func parseTemplates(result *[]*api.Template, list *ast.ObjectList) error {
return nil return nil
} }
// parseTaskScalingPolicies parses the "scaling" blocks of a task stanza
// into API scaling policies. Each block must carry exactly one label,
// "cpu" or "mem" (case-insensitive), which determines the policy type
// (vertical_cpu / vertical_mem); each label may appear at most once.
func parseTaskScalingPolicies(result *[]*api.ScalingPolicy, list *ast.ObjectList) error {
	if len(list.Items) == 0 {
		return nil
	}

	// Go through each object and turn it into an actual result.
	seen := make(map[string]bool)
	for _, item := range list.Items {
		// Reset the prefix each iteration: the original code reused the
		// previous block's "scaling[name] ->" prefix for the name-count
		// errors below, producing misleading error context.
		errPrefix := "scaling ->"
		if l := len(item.Keys); l == 0 {
			return multierror.Prefix(fmt.Errorf("task scaling policy missing name"), errPrefix)
		} else if l > 1 {
			return multierror.Prefix(fmt.Errorf("task scaling policy should only have one name"), errPrefix)
		}
		n := item.Keys[0].Token.Value().(string)
		errPrefix = fmt.Sprintf("scaling[%v] ->", n)

		// The label selects the policy type.
		key := strings.ToLower(n)
		var policyType string
		switch key {
		case "cpu":
			policyType = "vertical_cpu"
		case "mem":
			policyType = "vertical_mem"
		default:
			return multierror.Prefix(fmt.Errorf(`scaling policy name must be "cpu" or "mem"`), errPrefix)
		}

		// Make sure we haven't already found this policy. Key on the
		// lowercased name so "cpu" and "CPU" cannot both be defined,
		// since they map to the same policy type.
		if seen[key] {
			return multierror.Prefix(fmt.Errorf("scaling policy cannot be defined more than once"), errPrefix)
		}
		seen[key] = true

		p, err := parseScalingPolicy(item)
		if err != nil {
			return multierror.Prefix(err, errPrefix)
		}

		// The label implies the type; an explicit type must agree.
		if p.Type == "" {
			p.Type = policyType
		} else if p.Type != policyType {
			return multierror.Prefix(fmt.Errorf("policy had invalid 'type': %q", p.Type), errPrefix)
		}

		*result = append(*result, p)
	}

	return nil
}
func parseResources(result *api.Resources, list *ast.ObjectList) error { func parseResources(result *api.Resources, list *ast.ObjectList) error {
list = list.Elem() list = list.Elem()
if len(list.Items) == 0 { if len(list.Items) == 0 {

View file

@ -1153,7 +1153,6 @@ func TestParse(t *testing.T) {
}, },
false, false,
}, },
{ {
"tg-service-check.hcl", "tg-service-check.hcl",
&api.Job{ &api.Job{
@ -1383,7 +1382,6 @@ func TestParse(t *testing.T) {
}, },
false, false,
}, },
{ {
"tg-scaling-policy.hcl", "tg-scaling-policy.hcl",
&api.Job{ &api.Job{
@ -1393,6 +1391,7 @@ func TestParse(t *testing.T) {
{ {
Name: stringToPtr("group"), Name: stringToPtr("group"),
Scaling: &api.ScalingPolicy{ Scaling: &api.ScalingPolicy{
Type: "horizontal",
Min: int64ToPtr(5), Min: int64ToPtr(5),
Max: int64ToPtr(100), Max: int64ToPtr(100),
Policy: map[string]interface{}{ Policy: map[string]interface{}{
@ -1414,6 +1413,47 @@ func TestParse(t *testing.T) {
}, },
false, false,
}, },
{
"task-scaling-policy.hcl",
&api.Job{
ID: stringToPtr("foo"),
Name: stringToPtr("foo"),
TaskGroups: []*api.TaskGroup{
{
Name: stringToPtr("bar"),
Tasks: []*api.Task{
{
Name: "bar",
Driver: "docker",
ScalingPolicies: []*api.ScalingPolicy{
{
Type: "vertical_cpu",
Target: nil,
Min: int64ToPtr(50),
Max: int64ToPtr(1000),
Policy: map[string]interface{}{
"test": "cpu",
},
Enabled: boolToPtr(true),
},
{
Type: "vertical_mem",
Target: nil,
Min: int64ToPtr(128),
Max: int64ToPtr(1024),
Policy: map[string]interface{}{
"test": "mem",
},
Enabled: boolToPtr(false),
},
},
},
},
},
},
},
false,
},
{ {
"tg-service-connect-gateway-ingress.hcl", "tg-service-connect-gateway-ingress.hcl",
&api.Job{ &api.Job{
@ -1480,6 +1520,7 @@ func TestParse(t *testing.T) {
{ {
Name: stringToPtr("group"), Name: stringToPtr("group"),
Scaling: &api.ScalingPolicy{ Scaling: &api.ScalingPolicy{
Type: "horizontal",
Min: nil, Min: nil,
Max: int64ToPtr(10), Max: int64ToPtr(10),
Policy: nil, Policy: nil,
@ -1490,19 +1531,51 @@ func TestParse(t *testing.T) {
}, },
false, false,
}, },
{ {
"tg-scaling-policy-missing-max.hcl", "tg-scaling-policy-missing-max.hcl",
nil, nil,
true, true,
}, },
{ {
"tg-scaling-policy-multi-policy.hcl", "tg-scaling-policy-multi-policy.hcl",
nil, nil,
true, true,
}, },
{
"tg-scaling-policy-with-label.hcl",
nil,
true,
},
{
"tg-scaling-policy-invalid-type.hcl",
nil,
true,
},
{
"task-scaling-policy-missing-name.hcl",
nil,
true,
},
{
"task-scaling-policy-multi-name.hcl",
nil,
true,
},
{
"task-scaling-policy-multi-cpu.hcl",
nil,
true,
},
{
"task-scaling-policy-invalid-type.hcl",
nil,
true,
},
{
"task-scaling-policy-invalid-resource.hcl",
nil,
true,
},
{ {
"multiregion.hcl", "multiregion.hcl",
&api.Job{ &api.Job{

View file

@ -0,0 +1,17 @@
# Invalid fixture: a task scaling policy label must be "cpu" or "mem";
# the label "wrong" must be rejected by the jobspec parser.
job "foo" {
task "bar" {
driver = "docker"
scaling "wrong" {
enabled = true
min = 50
max = 1000
policy {
test = "cpu"
}
}
}
}

View file

@ -0,0 +1,18 @@
job "foo" {
task "bar" {
driver = "docker"
scaling "cpu" {
type = "vertical_mem"
enabled = true
min = 50
max = 1000
policy {
test = "cpu"
}
}
}
}

View file

@ -0,0 +1,17 @@
job "foo" {
task "bar" {
driver = "docker"
scaling {
enabled = true
min = 50
max = 1000
policy {
test = "cpu"
}
}
}
}

View file

@ -0,0 +1,27 @@
job "foo" {
task "bar" {
driver = "docker"
scaling "cpu" {
enabled = true
min = 50
max = 1000
policy {
test = "cpu"
}
}
scaling "cpu" {
enabled = true
min = 50
max = 1000
policy {
test = "cpu"
}
}
}
}

View file

@ -0,0 +1,17 @@
job "foo" {
task "bar" {
driver = "docker"
scaling "cpu" "mem" {
enabled = true
min = 50
max = 1000
policy {
test = "cpu"
}
}
}
}

View file

@ -0,0 +1,27 @@
job "foo" {
task "bar" {
driver = "docker"
scaling "cpu" {
enabled = true
min = 50
max = 1000
policy {
test = "cpu"
}
}
scaling "mem" {
enabled = false
min = 128
max = 1024
policy {
test = "mem"
}
}
}
}

View file

@ -0,0 +1,17 @@
job "elastic" {
group "group" {
scaling {
type = "vertical_cpu"
enabled = false
min = 5
max = 100
policy {
foo = "bar"
b = true
val = 5
f = 0.1
}
}
}
}

View file

@ -0,0 +1,16 @@
job "elastic" {
group "group" {
scaling "no-label-allowed" {
enabled = false
min = 5
max = 100
policy {
foo = "bar"
b = true
val = 5
f = 0.1
}
}
}
}

View file

@ -3,6 +3,7 @@ package jobspec2
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"strings"
"time" "time"
"github.com/hashicorp/hcl/v2" "github.com/hashicorp/hcl/v2"
@ -18,6 +19,7 @@ var hclDecoder *gohcl.Decoder
func init() { func init() {
hclDecoder = newHCLDecoder() hclDecoder = newHCLDecoder()
hclDecoder.RegisterBlockDecoder(reflect.TypeOf(api.TaskGroup{}), decodeTaskGroup) hclDecoder.RegisterBlockDecoder(reflect.TypeOf(api.TaskGroup{}), decodeTaskGroup)
hclDecoder.RegisterBlockDecoder(reflect.TypeOf(api.Task{}), decodeTask)
} }
func newHCLDecoder() *gohcl.Decoder { func newHCLDecoder() *gohcl.Decoder {
@ -266,6 +268,7 @@ func decodeTaskGroup(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.D
} }
d := newHCLDecoder() d := newHCLDecoder()
d.RegisterBlockDecoder(reflect.TypeOf(api.Task{}), decodeTask)
diags = d.DecodeBody(tgBody, ctx, tg) diags = d.DecodeBody(tgBody, ctx, tg)
if tgExtra.Vault != nil { if tgExtra.Vault != nil {
@ -276,6 +279,128 @@ func decodeTaskGroup(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.D
} }
} }
if tg.Scaling != nil {
if tg.Scaling.Type == "" {
tg.Scaling.Type = "horizontal"
}
diags = append(diags, validateGroupScalingPolicy(tg.Scaling, tgBody)...)
}
return diags return diags
} }
// decodeTask decodes a task block, handling the scaling policy blocks
// separately before delegating the remainder to the generic HCL decoder.
func decodeTask(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.Diagnostics {
	task := val.(*api.Task)

	// Pull the scaling blocks out of the body; they get special handling,
	// while everything else goes through the standard decoder.
	schema := &hcl.BodySchema{
		Blocks: []hcl.BlockHeaderSchema{
			{Type: "scaling", LabelNames: []string{"name"}},
		},
	}
	content, remain, diags := body.PartialContent(schema)

	diags = append(diags, decodeTaskScalingPolicies(content.Blocks, ctx, task)...)
	diags = append(diags, newHCLDecoder().DecodeBody(remain, ctx, val)...)

	return diags
}
// decodeTaskScalingPolicies decodes the scaling blocks attached to a task.
// Task scaling blocks must be labeled "cpu" or "mem" (case-insensitive),
// which map to the "vertical_cpu" and "vertical_mem" policy types
// respectively; at most one block per label is allowed. Any explicit
// `type` attribute must agree with the type implied by the label.
func decodeTaskScalingPolicies(blocks hcl.Blocks, ctx *hcl.EvalContext, task *api.Task) hcl.Diagnostics {
	if len(blocks) == 0 {
		return nil
	}

	var diags hcl.Diagnostics
	seen := map[string]*hcl.Block{}
	for _, b := range blocks {
		label := strings.ToLower(b.Labels[0])
		var policyType string
		switch label {
		case "cpu":
			policyType = "vertical_cpu"
		case "mem":
			policyType = "vertical_mem"
		default:
			diags = append(diags, &hcl.Diagnostic{
				Severity: hcl.DiagError,
				Summary:  "Invalid scaling policy name",
				Detail:   `scaling policy name must be "cpu" or "mem"`,
				Subject:  &b.LabelRanges[0],
			})
			continue
		}

		// Reject a second block with the same label, pointing at the first.
		if prev, ok := seen[label]; ok {
			diags = append(diags, &hcl.Diagnostic{
				Severity: hcl.DiagError,
				Summary:  fmt.Sprintf("Duplicate scaling %q block", label),
				Detail: fmt.Sprintf(
					"Only one scaling %s block is allowed. Another was defined at %s.",
					label, prev.DefRange.String(),
				),
				Subject: &b.DefRange,
			})
			continue
		}

		seen[label] = b

		var p api.ScalingPolicy
		diags = append(diags, hclDecoder.DecodeBody(b.Body, ctx, &p)...)

		if p.Type == "" {
			p.Type = policyType
		} else if p.Type != policyType {
			// BUG FIX: the original swapped the format arguments, reporting
			// the user's value as "expected" and the canonical type as
			// "found". The expected value is the label-implied policyType.
			diags = append(diags, &hcl.Diagnostic{
				Severity: hcl.DiagError,
				Summary:  "Invalid scaling policy type",
				Detail: fmt.Sprintf(
					"Invalid policy type, expected %q but found %q",
					policyType, p.Type),
				Subject: &b.DefRange,
			})
			continue
		}

		task.ScalingPolicies = append(task.ScalingPolicies, &p)
	}

	return diags
}
// validateGroupScalingPolicy performs extra validation of a task group
// scaling policy that decoding alone cannot express: the "max" attribute
// is required, and the only valid type for a group policy is "horizontal"
// (the implicit default).
func validateGroupScalingPolicy(p *api.ScalingPolicy, body hcl.Body) hcl.Diagnostics {
	// fast path: do nothing
	if p.Max != nil && p.Type == "horizontal" {
		return nil
	}

	content, _, diags := body.PartialContent(&hcl.BodySchema{
		Blocks: []hcl.BlockHeaderSchema{{Type: "scaling"}},
	})

	if len(content.Blocks) == 0 {
		// unexpected, given that we have a scaling policy
		return diags
	}

	// Re-examine the scaling block so diagnostics are anchored to the
	// offending source ranges; a missing "max" is reported here because
	// the schema marks it required.
	pc, _, moreDiags := content.Blocks[0].Body.PartialContent(&hcl.BodySchema{
		Attributes: []hcl.AttributeSchema{
			{Name: "max", Required: true},
			{Name: "type", Required: false},
		},
	})
	// BUG FIX: the original reassigned diags with := here, silently
	// discarding any diagnostics collected from the outer body above.
	diags = append(diags, moreDiags...)

	if p.Type != "horizontal" {
		if attr, ok := pc.Attributes["type"]; ok {
			diags = append(diags, &hcl.Diagnostic{
				Severity: hcl.DiagError,
				Summary:  "Invalid group scaling type",
				Detail: fmt.Sprintf(
					"task group scaling policy had invalid type: %q",
					p.Type),
				Subject: attr.Expr.Range().Ptr(),
			})
		}
	}
	return diags
}

View file

@ -145,3 +145,163 @@ dynamic "group" {
require.Equal(t, "groupB", *out.TaskGroups[1].Name) require.Equal(t, "groupB", *out.TaskGroups[1].Name)
require.Equal(t, "groupC", *out.TaskGroups[2].Name) require.Equal(t, "groupC", *out.TaskGroups[2].Name)
} }
// TestParse_InvalidScalingSyntax exercises the extra validation applied to
// group- and task-level scaling blocks: the required "max" attribute, the
// allowed policy types, the allowed task-scaling labels ("cpu"/"mem"), and
// duplicate-block detection.
func TestParse_InvalidScalingSyntax(t *testing.T) {
	cases := []struct {
		name        string // subtest name
		expectedErr string // substring the parse error must contain; "" means parse must succeed
		hcl         string // jobspec under test
	}{
		{
			"valid",
			"",
			`
job "example" {
  group "g1" {
    scaling {
      max  = 40
      type = "horizontal"
    }
    task "t1" {
      scaling "cpu" {
        max = 20
      }
      scaling "mem" {
        max = 15
      }
    }
  }
}
`,
		},
		{
			"group missing max",
			`argument "max" is required`,
			`
job "example" {
  group "g1" {
    scaling {
      #max  = 40
      type = "horizontal"
    }
    task "t1" {
      scaling "cpu" {
        max = 20
      }
      scaling "mem" {
        max = 15
      }
    }
  }
}
`,
		},
		{
			"group invalid type",
			`task group scaling policy had invalid type`,
			`
job "example" {
  group "g1" {
    scaling {
      max  = 40
      type = "invalid_type"
    }
    task "t1" {
      scaling "cpu" {
        max = 20
      }
      scaling "mem" {
        max = 15
      }
    }
  }
}
`,
		},
		{
			"task invalid label",
			`scaling policy name must be "cpu" or "mem"`,
			`
job "example" {
  group "g1" {
    scaling {
      max  = 40
      type = "horizontal"
    }
    task "t1" {
      scaling "not_cpu" {
        max = 20
      }
      scaling "mem" {
        max = 15
      }
    }
  }
}
`,
		},
		{
			"task duplicate blocks",
			`Duplicate scaling "cpu" block`,
			`
job "example" {
  group "g1" {
    scaling {
      max  = 40
      type = "horizontal"
    }
    task "t1" {
      scaling "cpu" {
        max = 20
      }
      scaling "cpu" {
        max = 15
      }
    }
  }
}
`,
		},
		{
			"task invalid type",
			`Invalid scaling policy type`,
			`
job "example" {
  group "g1" {
    scaling {
      max  = 40
      type = "horizontal"
    }
    task "t1" {
      scaling "cpu" {
        max  = 20
        type = "invalid"
      }
      scaling "mem" {
        max = 15
      }
    }
  }
}
`,
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			_, err := ParseWithArgs(c.name+".hcl", strings.NewReader(c.hcl), nil, true)
			if c.expectedErr == "" {
				require.NoError(t, err)
			} else {
				require.Error(t, err)
				require.Contains(t, err.Error(), c.expectedErr)
			}
		})
	}
}

View file

@ -37,6 +37,10 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
return structs.ErrPermissionDenied return structs.ErrPermissionDenied
} }
allow := func(ns string) bool {
return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs)
}
// Setup the blocking query // Setup the blocking query
opts := blockingOptions{ opts := blockingOptions{
queryOpts: &args.QueryOptions, queryOpts: &args.QueryOptions,
@ -48,7 +52,7 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
prefix := args.QueryOptions.Prefix prefix := args.QueryOptions.Prefix
if args.RequestNamespace() == structs.AllNamespacesSentinel { if args.RequestNamespace() == structs.AllNamespacesSentinel {
allowedNSes, err := allowedNSes(aclObj, state) allowedNSes, err := allowedNSes(aclObj, state, allow)
if err != nil { if err != nil {
return err return err
} }

View file

@ -451,16 +451,16 @@ func propagateScalingPolicyIDs(old, new *structs.Job) error {
oldIDs := make(map[string]string) oldIDs := make(map[string]string)
if old != nil { if old != nil {
// jobs currently only have scaling policies on task groups, so we can // use the job-scoped key (includes type, group, and task) to uniquely
// find correspondences using task group names // identify policies in a job
for _, p := range old.GetScalingPolicies() { for _, p := range old.GetScalingPolicies() {
oldIDs[p.Target[structs.ScalingTargetGroup]] = p.ID oldIDs[p.JobKey()] = p.ID
} }
} }
// ignore any existing ID in the policy, they should be empty // ignore any existing ID in the policy, they should be empty
for _, p := range new.GetScalingPolicies() { for _, p := range new.GetScalingPolicies() {
if id, ok := oldIDs[p.Target[structs.ScalingTargetGroup]]; ok { if id, ok := oldIDs[p.JobKey()]; ok {
p.ID = id p.ID = id
} else { } else {
p.ID = uuid.Generate() p.ID = uuid.Generate()
@ -1265,7 +1265,7 @@ func (j *Job) GetJobVersions(args *structs.JobVersionsRequest,
// allowedNSes returns a set (as map of ns->true) of the namespaces a token has access to. // allowedNSes returns a set (as map of ns->true) of the namespaces a token has access to.
// Returns `nil` set if the token has access to all namespaces // Returns `nil` set if the token has access to all namespaces
// and ErrPermissionDenied if the token has no capabilities on any namespace. // and ErrPermissionDenied if the token has no capabilities on any namespace.
func allowedNSes(aclObj *acl.ACL, state *state.StateStore) (map[string]bool, error) { func allowedNSes(aclObj *acl.ACL, state *state.StateStore, allow func(ns string) bool) (map[string]bool, error) {
if aclObj == nil || aclObj.IsManagement() { if aclObj == nil || aclObj.IsManagement() {
return nil, nil return nil, nil
} }
@ -1279,7 +1279,7 @@ func allowedNSes(aclObj *acl.ACL, state *state.StateStore) (map[string]bool, err
r := make(map[string]bool, len(nses)) r := make(map[string]bool, len(nses))
for _, ns := range nses { for _, ns := range nses {
if aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) { if allow(ns) {
r[ns] = true r[ns] = true
} }
} }
@ -1367,6 +1367,9 @@ func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.Job
return err return err
} }
prefix := args.QueryOptions.Prefix prefix := args.QueryOptions.Prefix
allow := func(ns string) bool {
return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs)
}
// Setup the blocking query // Setup the blocking query
opts := blockingOptions{ opts := blockingOptions{
@ -1374,7 +1377,7 @@ func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.Job
queryMeta: &reply.QueryMeta, queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error { run: func(ws memdb.WatchSet, state *state.StateStore) error {
// check if user has permission to all namespaces // check if user has permission to all namespaces
allowedNSes, err := allowedNSes(aclObj, state) allowedNSes, err := allowedNSes(aclObj, state, allow)
if err == structs.ErrPermissionDenied { if err == structs.ErrPermissionDenied {
// return empty jobs if token isn't authorized for any // return empty jobs if token isn't authorized for any
// namespace, matching other endpoints // namespace, matching other endpoints

View file

@ -1381,8 +1381,6 @@ func ScalingPolicy() *structs.ScalingPolicy {
"a": "b", "a": "b",
}, },
Enabled: true, Enabled: true,
CreateIndex: 10,
ModifyIndex: 20,
} }
} }

View file

@ -1,6 +1,7 @@
package nomad package nomad
import ( import (
"strings"
"time" "time"
metrics "github.com/armon/go-metrics" metrics "github.com/armon/go-metrics"
@ -8,6 +9,7 @@ import (
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
) )
@ -19,16 +21,18 @@ type Scaling struct {
} }
// ListPolicies is used to list the policies // ListPolicies is used to list the policies
func (a *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, func (p *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, reply *structs.ScalingPolicyListResponse) error {
reply *structs.ScalingPolicyListResponse) error {
if done, err := a.srv.forward("Scaling.ListPolicies", args, args, reply); done { if done, err := p.srv.forward("Scaling.ListPolicies", args, args, reply); done {
return err return err
} }
defer metrics.MeasureSince([]string{"nomad", "scaling", "list_policies"}, time.Now()) defer metrics.MeasureSince([]string{"nomad", "scaling", "list_policies"}, time.Now())
// Check for list-job permissions if args.RequestNamespace() == structs.AllNamespacesSentinel {
if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil { return p.listAllNamespaces(args, reply)
}
if aclObj, err := p.srv.ResolveToken(args.AuthToken); err != nil {
return err return err
} else if aclObj != nil { } else if aclObj != nil {
hasListScalingPolicies := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListScalingPolicies) hasListScalingPolicies := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListScalingPolicies)
@ -45,22 +49,24 @@ func (a *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest,
queryMeta: &reply.QueryMeta, queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error { run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Iterate over all the policies // Iterate over all the policies
iter, err := state.ScalingPoliciesByNamespace(ws, args.Namespace) var err error
var iter memdb.ResultIterator
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = state.ScalingPoliciesByIDPrefix(ws, args.RequestNamespace(), prefix)
} else if job := args.Job; job != "" {
iter, err = state.ScalingPoliciesByJob(ws, args.RequestNamespace(), job)
} else {
iter, err = state.ScalingPoliciesByNamespace(ws, args.Namespace, args.Type)
}
if err != nil { if err != nil {
return err return err
} }
// Convert all the policies to a list stub // Convert all the policies to a list stub
reply.Policies = nil reply.Policies = nil
for { for raw := iter.Next(); raw != nil; raw = iter.Next() {
raw := iter.Next()
if raw == nil {
break
}
policy := raw.(*structs.ScalingPolicy) policy := raw.(*structs.ScalingPolicy)
// if _, ok := policies[policy.Target]; ok || mgt {
// reply.Policies = append(reply.Policies, policy.Stub())
// }
reply.Policies = append(reply.Policies, policy.Stub()) reply.Policies = append(reply.Policies, policy.Stub())
} }
@ -70,28 +76,27 @@ func (a *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest,
return err return err
} }
// Ensure we never set the index to zero, otherwise a blocking query cannot be used. // Don't return index zero, otherwise a blocking query cannot be used.
// We floor the index at one, since realistically the first write must have a higher index.
if index == 0 { if index == 0 {
index = 1 index = 1
} }
reply.Index = index reply.Index = index
return nil return nil
}} }}
return a.srv.blockingRPC(&opts) return p.srv.blockingRPC(&opts)
} }
// GetPolicy is used to get a specific policy // GetPolicy is used to get a specific policy
func (a *Scaling) GetPolicy(args *structs.ScalingPolicySpecificRequest, func (p *Scaling) GetPolicy(args *structs.ScalingPolicySpecificRequest,
reply *structs.SingleScalingPolicyResponse) error { reply *structs.SingleScalingPolicyResponse) error {
if done, err := a.srv.forward("Scaling.GetPolicy", args, args, reply); done { if done, err := p.srv.forward("Scaling.GetPolicy", args, args, reply); done {
return err return err
} }
defer metrics.MeasureSince([]string{"nomad", "scaling", "get_policy"}, time.Now()) defer metrics.MeasureSince([]string{"nomad", "scaling", "get_policy"}, time.Now())
// Check for list-job permissions // Check for list-job permissions
if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil { if aclObj, err := p.srv.ResolveToken(args.AuthToken); err != nil {
return err return err
} else if aclObj != nil { } else if aclObj != nil {
hasReadScalingPolicy := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadScalingPolicy) hasReadScalingPolicy := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadScalingPolicy)
@ -129,5 +134,71 @@ func (a *Scaling) GetPolicy(args *structs.ScalingPolicySpecificRequest,
reply.Index = index reply.Index = index
return nil return nil
}} }}
return a.srv.blockingRPC(&opts) return p.srv.blockingRPC(&opts)
}
func (j *Scaling) listAllNamespaces(args *structs.ScalingPolicyListRequest, reply *structs.ScalingPolicyListResponse) error {
// Check for list-job permissions
aclObj, err := j.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}
prefix := args.QueryOptions.Prefix
allow := func(ns string) bool {
return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListScalingPolicies) ||
(aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) && aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob))
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// check if user has permission to all namespaces
allowedNSes, err := allowedNSes(aclObj, state, allow)
if err == structs.ErrPermissionDenied {
// return empty if token isn't authorized for any namespace
reply.Policies = []*structs.ScalingPolicyListStub{}
return nil
} else if err != nil {
return err
}
// Capture all the policies
var iter memdb.ResultIterator
if args.Type != "" {
iter, err = state.ScalingPoliciesByTypePrefix(ws, args.Type)
} else {
iter, err = state.ScalingPolicies(ws)
}
if err != nil {
return err
}
var policies []*structs.ScalingPolicyListStub
for raw := iter.Next(); raw != nil; raw = iter.Next() {
policy := raw.(*structs.ScalingPolicy)
if allowedNSes != nil && !allowedNSes[policy.Target[structs.ScalingTargetNamespace]] {
// not permitted to this name namespace
continue
}
if prefix != "" && !strings.HasPrefix(policy.ID, prefix) {
continue
}
policies = append(policies, policy.Stub())
}
reply.Policies = policies
// Use the last index that affected the policies table or summary
index, err := state.Index("scaling_policy")
if err != nil {
return err
}
reply.Index = helper.Uint64Max(1, index)
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
} }

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"sort" "sort"
"strings"
"time" "time"
log "github.com/hashicorp/go-hclog" log "github.com/hashicorp/go-hclog"
@ -1527,6 +1528,10 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b
return fmt.Errorf("unable to update job scaling policies: %v", err) return fmt.Errorf("unable to update job scaling policies: %v", err)
} }
if err := s.updateJobRecommendations(index, txn, existingJob, job); err != nil {
return fmt.Errorf("unable to update job recommendations: %v", err)
}
if err := s.updateJobCSIPlugins(index, job, existingJob, txn); err != nil { if err := s.updateJobCSIPlugins(index, job, existingJob, txn); err != nil {
return fmt.Errorf("unable to update job scaling policies: %v", err) return fmt.Errorf("unable to update job scaling policies: %v", err)
} }
@ -1644,6 +1649,11 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
return fmt.Errorf("deleting job scaling policies failed: %v", err) return fmt.Errorf("deleting job scaling policies failed: %v", err)
} }
// Delete any job recommendations
if err := s.deleteRecommendationsByJob(index, txn, job); err != nil {
return fmt.Errorf("deleting job recommendatons failed: %v", err)
}
// Delete the scaling events // Delete the scaling events
if _, err = txn.DeleteAll("scaling_event", "id", namespace, jobID); err != nil { if _, err = txn.DeleteAll("scaling_event", "id", namespace, jobID); err != nil {
return fmt.Errorf("deleting job scaling events failed: %v", err) return fmt.Errorf("deleting job scaling events failed: %v", err)
@ -4567,17 +4577,10 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
if job.Stop {
if err := s.deleteJobScalingPolicies(index, job, txn); err != nil {
return fmt.Errorf("deleting job scaling policies failed: %v", err)
}
return nil
}
scalingPolicies := job.GetScalingPolicies() scalingPolicies := job.GetScalingPolicies()
newTargets := map[string]struct{}{} newTargets := map[string]bool{}
for _, p := range scalingPolicies { for _, p := range scalingPolicies {
newTargets[p.Target[structs.ScalingTargetGroup]] = struct{}{} newTargets[p.JobKey()] = true
} }
// find existing policies that need to be deleted // find existing policies that need to be deleted
deletedPolicies := []string{} deletedPolicies := []string{}
@ -4585,13 +4588,9 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx
if err != nil { if err != nil {
return fmt.Errorf("ScalingPoliciesByJob lookup failed: %v", err) return fmt.Errorf("ScalingPoliciesByJob lookup failed: %v", err)
} }
for { for raw := iter.Next(); raw != nil; raw = iter.Next() {
raw := iter.Next()
if raw == nil {
break
}
oldPolicy := raw.(*structs.ScalingPolicy) oldPolicy := raw.(*structs.ScalingPolicy)
if _, ok := newTargets[oldPolicy.Target[structs.ScalingTargetGroup]]; !ok { if !newTargets[oldPolicy.JobKey()] {
deletedPolicies = append(deletedPolicies, oldPolicy.ID) deletedPolicies = append(deletedPolicies, oldPolicy.ID)
} }
} }
@ -5460,12 +5459,11 @@ func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*s
} }
policy.ID = existing.ID policy.ID = existing.ID
policy.CreateIndex = existing.CreateIndex policy.CreateIndex = existing.CreateIndex
policy.ModifyIndex = index
} else { } else {
// policy.ID must have been set already in Job.Register before log apply // policy.ID must have been set already in Job.Register before log apply
policy.CreateIndex = index policy.CreateIndex = index
policy.ModifyIndex = index
} }
policy.ModifyIndex = index
// Insert the scaling policy // Insert the scaling policy
hadUpdates = true hadUpdates = true
@ -5474,7 +5472,7 @@ func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*s
} }
} }
// Update the indexes table for scaling policy // Update the indexes table for scaling policy if we updated any policies
if hadUpdates { if hadUpdates {
if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil { if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
return fmt.Errorf("index update failed: %v", err) return fmt.Errorf("index update failed: %v", err)
@ -5740,19 +5738,6 @@ func (s *StateStore) ScalingPolicies(ws memdb.WatchSet) (memdb.ResultIterator, e
return iter, nil return iter, nil
} }
// ScalingPoliciesByType returns an iterator over scaling policies of a certain type.
func (s *StateStore) ScalingPoliciesByType(ws memdb.WatchSet, t string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
iter, err := txn.Get("scaling_policy", "type", t)
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
return iter, nil
}
// ScalingPoliciesByTypePrefix returns an iterator over scaling policies with a certain type prefix. // ScalingPoliciesByTypePrefix returns an iterator over scaling policies with a certain type prefix.
func (s *StateStore) ScalingPoliciesByTypePrefix(ws memdb.WatchSet, t string) (memdb.ResultIterator, error) { func (s *StateStore) ScalingPoliciesByTypePrefix(ws memdb.WatchSet, t string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn() txn := s.db.ReadTxn()
@ -5766,7 +5751,7 @@ func (s *StateStore) ScalingPoliciesByTypePrefix(ws memdb.WatchSet, t string) (m
return iter, nil return iter, nil
} }
func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace, typ string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn() txn := s.db.ReadTxn()
iter, err := txn.Get("scaling_policy", "target_prefix", namespace) iter, err := txn.Get("scaling_policy", "target_prefix", namespace)
@ -5775,18 +5760,22 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace str
} }
ws.Add(iter.WatchCh()) ws.Add(iter.WatchCh())
filter := func(raw interface{}) bool {
d, ok := raw.(*structs.ScalingPolicy) // Wrap the iterator in a filter to exact match the namespace
iter = memdb.NewFilterIterator(iter, scalingPolicyNamespaceFilter(namespace))
// If policy type is specified as well, wrap again
if typ != "" {
iter = memdb.NewFilterIterator(iter, func(raw interface{}) bool {
p, ok := raw.(*structs.ScalingPolicy)
if !ok { if !ok {
return true return true
} }
return !strings.HasPrefix(p.Type, typ)
return d.Target[structs.ScalingTargetNamespace] != namespace })
} }
// Wrap the iterator in a filter return iter, nil
wrap := memdb.NewFilterIterator(iter, filter)
return wrap, nil
} }
func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) { func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) {
@ -5834,6 +5823,8 @@ func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.S
return nil, nil return nil, nil
} }
// ScalingPolicyByTargetAndType returns a fully-qualified policy against a target and policy type,
// or nil if it does not exist. This method does not honor the watchset on the policy type, just the target.
func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[string]string, typ string) (*structs.ScalingPolicy, func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[string]string, typ string) (*structs.ScalingPolicy,
error) { error) {
txn := s.db.ReadTxn() txn := s.db.ReadTxn()
@ -5847,6 +5838,7 @@ func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[
if err != nil { if err != nil {
return nil, fmt.Errorf("scaling_policy lookup failed: %v", err) return nil, fmt.Errorf("scaling_policy lookup failed: %v", err)
} }
ws.Add(it.WatchCh()) ws.Add(it.WatchCh())
// Check for type // Check for type
@ -5866,6 +5858,34 @@ func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[
return nil, nil return nil, nil
} }
// ScalingPoliciesByIDPrefix returns an iterator over all scaling policies
// whose ID begins with the given prefix, restricted to policies targeting
// the given namespace.
func (s *StateStore) ScalingPoliciesByIDPrefix(ws memdb.WatchSet, namespace string, prefix string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("scaling_policy", "id_prefix", prefix)
	if err != nil {
		return nil, fmt.Errorf("scaling policy lookup failed: %v", err)
	}
	ws.Add(iter.WatchCh())

	// The id_prefix index spans namespaces; filter to the requested one.
	iter = memdb.NewFilterIterator(iter, scalingPolicyNamespaceFilter(namespace))

	return iter, nil
}
// scalingPolicyNamespaceFilter returns a filter function that filters all
// scaling policies not targeting the given namespace.
func scalingPolicyNamespaceFilter(namespace string) func(interface{}) bool {
	return func(raw interface{}) bool {
		switch policy := raw.(type) {
		case *structs.ScalingPolicy:
			// keep (return false) only when the policy targets namespace
			return policy.Target[structs.ScalingTargetNamespace] != namespace
		default:
			// filter out anything that is not a scaling policy
			return true
		}
	}
}
func (s *StateStore) EventSinks(ws memdb.WatchSet) (memdb.ResultIterator, error) { func (s *StateStore) EventSinks(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn() txn := s.db.ReadTxn()
@ -6216,6 +6236,7 @@ func (r *StateRestore) CSIVolumeRestore(volume *structs.CSIVolume) error {
return nil return nil
} }
// ScalingEventsRestore is used to restore scaling events for a job
func (r *StateRestore) ScalingEventsRestore(jobEvents *structs.JobScalingEvents) error { func (r *StateRestore) ScalingEventsRestore(jobEvents *structs.JobScalingEvents) error {
if err := r.txn.Insert("scaling_event", jobEvents); err != nil { if err := r.txn.Insert("scaling_event", jobEvents); err != nil {
return fmt.Errorf("scaling event insert failed: %v", err) return fmt.Errorf("scaling event insert failed: %v", err)

View file

@ -20,3 +20,13 @@ func (s *StateStore) quotaReconcile(index uint64, txn *txn, newQuota, oldQuota s
func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *txn) error { func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *txn) error {
return nil return nil
} }
// deleteRecommendationsByJob deletes all recommendations for the specified job.
// No-op in this build — recommendations appear to be implemented elsewhere
// (presumably the enterprise state store; confirm against that implementation).
func (s *StateStore) deleteRecommendationsByJob(index uint64, txn Txn, job *structs.Job) error {
	return nil
}
// updateJobRecommendations updates/deletes job recommendations as necessary for a job update.
// No-op in this build — recommendations appear to be implemented elsewhere
// (presumably the enterprise state store; confirm against that implementation).
func (s *StateStore) updateJobRecommendations(index uint64, txn Txn, prevJob, newJob *structs.Job) error {
	return nil
}

View file

@ -8632,12 +8632,12 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) {
policy2.Target[structs.ScalingTargetNamespace] = otherNamespace policy2.Target[structs.ScalingTargetNamespace] = otherNamespace
ws1 := memdb.NewWatchSet() ws1 := memdb.NewWatchSet()
iter, err := state.ScalingPoliciesByNamespace(ws1, structs.DefaultNamespace) iter, err := state.ScalingPoliciesByNamespace(ws1, structs.DefaultNamespace, "")
require.NoError(err) require.NoError(err)
require.Nil(iter.Next()) require.Nil(iter.Next())
ws2 := memdb.NewWatchSet() ws2 := memdb.NewWatchSet()
iter, err = state.ScalingPoliciesByNamespace(ws2, otherNamespace) iter, err = state.ScalingPoliciesByNamespace(ws2, otherNamespace, "")
require.NoError(err) require.NoError(err)
require.Nil(iter.Next()) require.Nil(iter.Next())
@ -8646,7 +8646,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) {
require.True(watchFired(ws1)) require.True(watchFired(ws1))
require.True(watchFired(ws2)) require.True(watchFired(ws2))
iter, err = state.ScalingPoliciesByNamespace(nil, structs.DefaultNamespace) iter, err = state.ScalingPoliciesByNamespace(nil, structs.DefaultNamespace, "")
require.NoError(err) require.NoError(err)
policiesInDefaultNamespace := []string{} policiesInDefaultNamespace := []string{}
for { for {
@ -8658,7 +8658,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) {
} }
require.ElementsMatch([]string{policy.ID}, policiesInDefaultNamespace) require.ElementsMatch([]string{policy.ID}, policiesInDefaultNamespace)
iter, err = state.ScalingPoliciesByNamespace(nil, otherNamespace) iter, err = state.ScalingPoliciesByNamespace(nil, otherNamespace, "")
require.NoError(err) require.NoError(err)
policiesInOtherNamespace := []string{} policiesInOtherNamespace := []string{}
for { for {
@ -8684,12 +8684,12 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) {
policy2.Target[structs.ScalingTargetNamespace] = ns2 policy2.Target[structs.ScalingTargetNamespace] = ns2
ws1 := memdb.NewWatchSet() ws1 := memdb.NewWatchSet()
iter, err := state.ScalingPoliciesByNamespace(ws1, ns1) iter, err := state.ScalingPoliciesByNamespace(ws1, ns1, "")
require.NoError(err) require.NoError(err)
require.Nil(iter.Next()) require.Nil(iter.Next())
ws2 := memdb.NewWatchSet() ws2 := memdb.NewWatchSet()
iter, err = state.ScalingPoliciesByNamespace(ws2, ns2) iter, err = state.ScalingPoliciesByNamespace(ws2, ns2, "")
require.NoError(err) require.NoError(err)
require.Nil(iter.Next()) require.Nil(iter.Next())
@ -8698,7 +8698,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) {
require.True(watchFired(ws1)) require.True(watchFired(ws1))
require.True(watchFired(ws2)) require.True(watchFired(ws2))
iter, err = state.ScalingPoliciesByNamespace(nil, ns1) iter, err = state.ScalingPoliciesByNamespace(nil, ns1, "")
require.NoError(err) require.NoError(err)
policiesInNS1 := []string{} policiesInNS1 := []string{}
for { for {
@ -8710,7 +8710,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) {
} }
require.ElementsMatch([]string{policy1.ID}, policiesInNS1) require.ElementsMatch([]string{policy1.ID}, policiesInNS1)
iter, err = state.ScalingPoliciesByNamespace(nil, ns2) iter, err = state.ScalingPoliciesByNamespace(nil, ns2, "")
require.NoError(err) require.NoError(err)
policiesInNS2 := []string{} policiesInNS2 := []string{}
for { for {
@ -8723,37 +8723,6 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) {
require.ElementsMatch([]string{policy2.ID}, policiesInNS2) require.ElementsMatch([]string{policy2.ID}, policiesInNS2)
} }
// TestStateStore_UpsertJob_UpsertScalingPolicies verifies that registering a
// job carrying a scaling policy creates that policy in the state store, fires
// the relevant watch, and advances the scaling_policy table index.
func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	state := testStateStore(t)
	job, policy := mock.JobWithScalingPolicy()

	// Register a watch before the upsert so we can assert that it fires.
	ws := memdb.NewWatchSet()
	existing, err := state.ScalingPolicyByTargetAndType(ws, policy.Target, policy.Type)
	require.NoError(err)
	require.Nil(existing)

	const newIndex = uint64(1000)
	require.NoError(state.UpsertJob(structs.MsgTypeTestSetup, newIndex, job))
	require.True(watchFired(ws), "watch did not fire")

	// The policy should now exist with both indexes at the upsert index.
	ws = memdb.NewWatchSet()
	created, err := state.ScalingPolicyByTargetAndType(ws, policy.Target, policy.Type)
	require.NoError(err)
	require.NotNil(created)
	require.Equal(newIndex, created.CreateIndex)
	require.Equal(newIndex, created.ModifyIndex)

	// The table index should have advanced to the upsert index as well.
	index, err := state.Index("scaling_policy")
	require.NoError(err)
	require.Equal(newIndex, index)
}
// Scaling Policy IDs are generated randomly during Job.Register // Scaling Policy IDs are generated randomly during Job.Register
// Subsequent updates of the job should preserve the ID for the scaling policy // Subsequent updates of the job should preserve the ID for the scaling policy
// associated with a given target. // associated with a given target.
@ -8883,7 +8852,7 @@ func TestStateStore_DeleteScalingPolicies(t *testing.T) {
require.Nil(out) require.Nil(out)
// Ensure we see both policies // Ensure we see both policies
iter, err := state.ScalingPoliciesByNamespace(ws, policy.Target[structs.ScalingTargetNamespace]) iter, err := state.ScalingPoliciesByNamespace(ws, policy.Target[structs.ScalingTargetNamespace], "")
require.NoError(err) require.NoError(err)
count := 0 count := 0
for { for {
@ -8967,13 +8936,12 @@ func TestStateStore_UnstopJob_UpsertScalingPolicies(t *testing.T) {
// upsert a stopped job, verify that we don't fire the watcher or add any scaling policies // upsert a stopped job, verify that we don't fire the watcher or add any scaling policies
err = state.UpsertJob(structs.MsgTypeTestSetup, 1000, job) err = state.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
require.NoError(err) require.NoError(err)
require.False(watchFired(ws)) require.True(watchFired(ws))
// stopped job should have no scaling policies, watcher doesn't fire
list, err = state.ScalingPolicies(ws) list, err = state.ScalingPolicies(ws)
require.NoError(err) require.NoError(err)
require.Nil(list.Next()) require.NotNil(list.Next())
// Establish a new watcher // Establish a new watchset
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
_, err = state.ScalingPolicies(ws) _, err = state.ScalingPolicies(ws)
require.NoError(err) require.NoError(err)
@ -8982,14 +8950,14 @@ func TestStateStore_UnstopJob_UpsertScalingPolicies(t *testing.T) {
err = state.UpsertJob(structs.MsgTypeTestSetup, 1100, job) err = state.UpsertJob(structs.MsgTypeTestSetup, 1100, job)
require.NoError(err) require.NoError(err)
// Ensure the scaling policy was added, watch was fired, index was advanced // Ensure the scaling policy still exists, watch was not fired, index was not advanced
require.True(watchFired(ws))
out, err := state.ScalingPolicyByTargetAndType(nil, policy.Target, policy.Type) out, err := state.ScalingPolicyByTargetAndType(nil, policy.Target, policy.Type)
require.NoError(err) require.NoError(err)
require.NotNil(out) require.NotNil(out)
index, err := state.Index("scaling_policy") index, err := state.Index("scaling_policy")
require.NoError(err) require.NoError(err)
require.GreaterOrEqual(index, uint64(1100)) require.EqualValues(index, 1000)
require.False(watchFired(ws))
} }
func TestStateStore_DeleteJob_DeleteScalingPolicies(t *testing.T) { func TestStateStore_DeleteJob_DeleteScalingPolicies(t *testing.T) {
@ -9105,15 +9073,12 @@ func TestStateStore_ScalingPoliciesByType(t *testing.T) {
pOther2.Type = "other-type-2" pOther2.Type = "other-type-2"
// Create search routine // Create search routine
search := func(t string) (count int, found []string, err error) { search := func(t string) (found []string) {
found = []string{} found = []string{}
iter, err := state.ScalingPoliciesByType(nil, t) iter, err := state.ScalingPoliciesByTypePrefix(nil, t)
if err != nil { require.NoError(err)
return
}
for raw := iter.Next(); raw != nil; raw = iter.Next() { for raw := iter.Next(); raw != nil; raw = iter.Next() {
count++
found = append(found, raw.(*structs.ScalingPolicy).Type) found = append(found, raw.(*structs.ScalingPolicy).Type)
} }
return return
@ -9126,47 +9091,23 @@ func TestStateStore_ScalingPoliciesByType(t *testing.T) {
// Check if we can read horizontal policies // Check if we can read horizontal policies
expect := []string{pHorzA.Type, pHorzB.Type} expect := []string{pHorzA.Type, pHorzB.Type}
count, found, err := search(structs.ScalingPolicyTypeHorizontal) actual := search(structs.ScalingPolicyTypeHorizontal)
require.ElementsMatch(expect, actual)
sort.Strings(found)
sort.Strings(expect)
require.NoError(err)
require.Equal(expect, found)
require.Equal(2, count)
// Check if we can read policies of other types // Check if we can read policies of other types
expect = []string{pOther1.Type} expect = []string{pOther1.Type}
count, found, err = search("other-type-1") actual = search("other-type-1")
require.ElementsMatch(expect, actual)
sort.Strings(found) // Check that we can read policies by prefix
sort.Strings(expect) expect = []string{"other-type-1", "other-type-2"}
actual = search("other-type")
require.NoError(err) require.Equal(expect, actual)
require.Equal(expect, found)
require.Equal(1, count)
// Check if we can't read policies by prefix
expect = []string{}
count, found, err = search("other-type")
sort.Strings(found)
sort.Strings(expect)
require.NoError(err)
require.Equal(expect, found)
require.Equal(0, count)
// Check for empty result // Check for empty result
expect = []string{} expect = []string{}
count, found, err = search("non-existing") actual = search("non-existing")
require.ElementsMatch(expect, actual)
sort.Strings(found)
sort.Strings(expect)
require.NoError(err)
require.Equal(expect, found)
require.Equal(0, count)
} }
func TestStateStore_ScalingPoliciesByTypePrefix(t *testing.T) { func TestStateStore_ScalingPoliciesByTypePrefix(t *testing.T) {

View file

@ -1181,6 +1181,8 @@ type SingleScalingPolicyResponse struct {
// ScalingPolicyListRequest is used to parameterize a scaling policy list request // ScalingPolicyListRequest is used to parameterize a scaling policy list request
type ScalingPolicyListRequest struct { type ScalingPolicyListRequest struct {
Job string
Type string
QueryOptions QueryOptions
} }
@ -5273,6 +5275,14 @@ type ScalingPolicy struct {
ModifyIndex uint64 ModifyIndex uint64
} }
// JobKey returns a key that is unique to a job-scoped target, useful as a map
// key. This uses the policy type, plus target (group and task), joined with a
// NUL separator so that the components cannot collide.
func (p *ScalingPolicy) JobKey() string {
	const sep = "\000"
	return p.Type + sep + p.Target[ScalingTargetGroup] + sep + p.Target[ScalingTargetTask]
}
const ( const (
ScalingTargetNamespace = "Namespace" ScalingTargetNamespace = "Namespace"
ScalingTargetJob = "Job" ScalingTargetJob = "Job"
@ -5376,6 +5386,7 @@ func (p *ScalingPolicy) Diff(p2 *ScalingPolicy) bool {
return !reflect.DeepEqual(*p, copy) return !reflect.DeepEqual(*p, copy)
} }
// TargetTaskGroup updates a ScalingPolicy target to specify a given task group
func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy { func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy {
p.Target = map[string]string{ p.Target = map[string]string{
ScalingTargetNamespace: job.Namespace, ScalingTargetNamespace: job.Namespace,
@ -5385,6 +5396,13 @@ func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy
return p return p
} }
// TargetTask updates a ScalingPolicy target to specify a given task.
// It first scopes the target to the task's group via TargetTaskGroup, then
// adds the task name. The policy is returned to allow call chaining.
func (p *ScalingPolicy) TargetTask(job *Job, tg *TaskGroup, task *Task) *ScalingPolicy {
	p.TargetTaskGroup(job, tg)
	p.Target[ScalingTargetTask] = task.Name
	return p
}
func (p *ScalingPolicy) Stub() *ScalingPolicyListStub { func (p *ScalingPolicy) Stub() *ScalingPolicyListStub {
stub := &ScalingPolicyListStub{ stub := &ScalingPolicyListStub{
ID: p.ID, ID: p.ID,
@ -5410,6 +5428,8 @@ func (j *Job) GetScalingPolicies() []*ScalingPolicy {
} }
} }
ret = append(ret, j.GetEntScalingPolicies()...)
return ret return ret
} }
@ -6541,7 +6561,8 @@ type Task struct {
// attached to this task. // attached to this task.
VolumeMounts []*VolumeMount VolumeMounts []*VolumeMount
// The kill signal to use for the task. This is an optional specification, // ScalingPolicies is a list of scaling policies scoped to this task
ScalingPolicies []*ScalingPolicy
// KillSignal is the kill signal to use for the task. This is an optional // KillSignal is the kill signal to use for the task. This is an optional
// specification and defaults to SIGINT // specification and defaults to SIGINT

View file

@ -31,3 +31,7 @@ func (p *ScalingPolicy) validateType() multierror.Error {
return mErr return mErr
} }
// GetEntScalingPolicies returns any enterprise-only scaling policies for the
// job. This implementation always returns nil — presumably the OSS stub, with
// the enterprise build providing a real implementation; confirm against the
// ent source tree.
func (j *Job) GetEntScalingPolicies() []*ScalingPolicy {
	return nil
}

View file

@ -5,8 +5,6 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQee8Us= github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQee8Us=
github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM= github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/a8m/tree v0.0.0-20201019170308-9f4249a434f8 h1:CkFIJJAKEbZbM2tKmCqt/v9ivgpikjPu5lnDsk8huLE=
github.com/a8m/tree v0.0.0-20201019170308-9f4249a434f8/go.mod h1:FSdwKX97koS5efgm8WevNf7XS3PqtyFkKDDXrz778cg=
github.com/a8m/tree v0.0.0-20201026183218-fce18e2a750e h1:8YO27VG7yrHhATTepO2FYLbGUB1wfYbkqoKiVwaAQ+Q= github.com/a8m/tree v0.0.0-20201026183218-fce18e2a750e h1:8YO27VG7yrHhATTepO2FYLbGUB1wfYbkqoKiVwaAQ+Q=
github.com/a8m/tree v0.0.0-20201026183218-fce18e2a750e/go.mod h1:FSdwKX97koS5efgm8WevNf7XS3PqtyFkKDDXrz778cg= github.com/a8m/tree v0.0.0-20201026183218-fce18e2a750e/go.mod h1:FSdwKX97koS5efgm8WevNf7XS3PqtyFkKDDXrz778cg=
github.com/agext/levenshtein v1.2.1 h1:QmvMAjj2aEICytGiWzmxoE0x2KZvE0fvmqMOfy2tjT8= github.com/agext/levenshtein v1.2.1 h1:QmvMAjj2aEICytGiWzmxoE0x2KZvE0fvmqMOfy2tjT8=

View file

@ -0,0 +1,123 @@
package api
// Recommendations is used to query the recommendations endpoints.
type Recommendations struct {
	// client is the API client used to issue the HTTP requests.
	client *Client
}
// Recommendations returns a new handle on the recommendations endpoints.
func (c *Client) Recommendations() *Recommendations {
	handle := Recommendations{client: c}
	return &handle
}
// List is used to dump all of the recommendations in the cluster.
func (r *Recommendations) List(q *QueryOptions) ([]*Recommendation, *QueryMeta, error) {
	var resp []*Recommendation
	qm, err := r.client.query("/v1/recommendations", &resp, q)
	if err != nil {
		// Return nil metadata on error, consistent with Info/Upsert/Apply,
		// rather than whatever partial QueryMeta the client returned.
		return nil, nil, err
	}
	return resp, qm, nil
}
// Info is used to return information on a single recommendation.
func (r *Recommendations) Info(id string, q *QueryOptions) (*Recommendation, *QueryMeta, error) {
	rec := new(Recommendation)
	qm, err := r.client.query("/v1/recommendation/"+id, rec, q)
	if err != nil {
		return nil, nil, err
	}
	return rec, qm, nil
}
// Upsert is used to create or update a recommendation.
func (r *Recommendations) Upsert(rec *Recommendation, q *WriteOptions) (*Recommendation, *WriteMeta, error) {
	out := new(Recommendation)
	wm, err := r.client.write("/v1/recommendation", rec, out, q)
	if err != nil {
		return nil, nil, err
	}
	return out, wm, nil
}
// Delete is used to delete a list of recommendations. It is implemented as an
// apply request that dismisses the given IDs and applies none.
func (r *Recommendations) Delete(ids []string, q *WriteOptions) (*WriteMeta, error) {
	req := RecommendationApplyRequest{
		Apply:   []string{},
		Dismiss: ids,
	}
	wm, err := r.client.write("/v1/recommendations/apply", &req, nil, q)
	if err != nil {
		return nil, err
	}
	return wm, nil
}
// Apply is used to apply a set of recommendations, optionally overriding the
// job's policy checks. The write metadata is copied onto the response as well
// as returned separately.
func (r *Recommendations) Apply(ids []string, policyOverride bool) (
	*RecommendationApplyResponse, *WriteMeta, error) {
	req := RecommendationApplyRequest{
		Apply:          ids,
		PolicyOverride: policyOverride,
	}
	resp := new(RecommendationApplyResponse)
	wm, err := r.client.write("/v1/recommendations/apply", &req, resp, nil)
	if err != nil {
		return nil, nil, err
	}
	resp.WriteMeta = *wm
	return resp, wm, nil
}
// Recommendation is used to serialize a recommendation.
type Recommendation struct {
	// ID is the identifier of the recommendation.
	ID        string
	Region    string
	Namespace string
	JobID     string
	JobVersion uint64
	// Group and Task scope the recommendation within the job.
	Group    string
	Task     string
	Resource string
	// Value is the recommended value; Current is the existing value.
	// NOTE(review): semantics inferred from field names — confirm against the
	// server-side structs.
	Value   int
	Current int
	Meta    map[string]interface{}
	Stats   map[string]float64
	EnforceVersion bool
	// SubmitTime, CreateIndex, and ModifyIndex appear to be server-populated —
	// confirm; callers should not need to set them.
	SubmitTime  int64
	CreateIndex uint64
	ModifyIndex uint64
}
// RecommendationApplyRequest is used to apply and/or dismiss a set of recommendations
type RecommendationApplyRequest struct {
	// Apply lists the IDs of recommendations to apply.
	Apply []string
	// Dismiss lists the IDs of recommendations to dismiss.
	Dismiss []string
	// PolicyOverride is forwarded with the apply request; presumably it
	// overrides soft-mandatory policy checks on the resulting job update —
	// confirm against the server handler.
	PolicyOverride bool
}
// RecommendationApplyResponse is used to apply a set of recommendations
type RecommendationApplyResponse struct {
	// UpdatedJobs holds the per-job results for successful applications.
	UpdatedJobs []*SingleRecommendationApplyResult
	// Errors holds the per-job failures, if any.
	Errors []*SingleRecommendationApplyError
	WriteMeta
}
// SingleRecommendationApplyResult describes the outcome of applying the
// recommendations for a single job.
type SingleRecommendationApplyResult struct {
	Namespace       string
	JobID           string
	JobModifyIndex  uint64
	EvalID          string
	EvalCreateIndex uint64
	Warnings        string
	// Recommendations lists the IDs that were applied to this job.
	Recommendations []string
}
// SingleRecommendationApplyError describes a failure to apply the
// recommendations for a single job.
type SingleRecommendationApplyError struct {
	Namespace string
	JobID     string
	// Recommendations lists the IDs whose application failed for this job.
	Recommendations []string
	Error           string
}

View file

@ -66,12 +66,12 @@ type ScalingPolicy struct {
Max *int64 `hcl:"max,optional"` Max *int64 `hcl:"max,optional"`
Policy map[string]interface{} `hcl:"policy,block"` Policy map[string]interface{} `hcl:"policy,block"`
Enabled *bool `hcl:"enabled,optional"` Enabled *bool `hcl:"enabled,optional"`
Type string `hcl:"type,optional"`
/* fields set by server */ /* fields set by server */
ID string ID string
Namespace string Namespace string
Type string
Target map[string]string Target map[string]string
CreateIndex uint64 CreateIndex uint64
ModifyIndex uint64 ModifyIndex uint64

View file

@ -671,6 +671,7 @@ type Task struct {
ShutdownDelay time.Duration `mapstructure:"shutdown_delay" hcl:"shutdown_delay,optional"` ShutdownDelay time.Duration `mapstructure:"shutdown_delay" hcl:"shutdown_delay,optional"`
KillSignal string `mapstructure:"kill_signal" hcl:"kill_signal,optional"` KillSignal string `mapstructure:"kill_signal" hcl:"kill_signal,optional"`
Kind string `hcl:"kind,optional"` Kind string `hcl:"kind,optional"`
ScalingPolicies []*ScalingPolicy `hcl:"scaling,block"`
} }
func (t *Task) Canonicalize(tg *TaskGroup, job *Job) { func (t *Task) Canonicalize(tg *TaskGroup, job *Job) {