Merge pull request #9203 from hashicorp/f-das-oss-merge

merge of open-source components of DAS
This commit is contained in:
Chris Baker 2020-10-28 11:19:14 -05:00 committed by GitHub
commit 5ddb939681
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
38 changed files with 1282 additions and 301 deletions

View file

@ -24,26 +24,27 @@ const (
// combined we take the union of all capabilities. If the deny capability is present, it
// takes precedence and overwrites all other capabilities.
NamespaceCapabilityDeny = "deny"
NamespaceCapabilityListJobs = "list-jobs"
NamespaceCapabilityReadJob = "read-job"
NamespaceCapabilitySubmitJob = "submit-job"
NamespaceCapabilityDispatchJob = "dispatch-job"
NamespaceCapabilityReadLogs = "read-logs"
NamespaceCapabilityReadFS = "read-fs"
NamespaceCapabilityAllocExec = "alloc-exec"
NamespaceCapabilityAllocNodeExec = "alloc-node-exec"
NamespaceCapabilityAllocLifecycle = "alloc-lifecycle"
NamespaceCapabilitySentinelOverride = "sentinel-override"
NamespaceCapabilityCSIRegisterPlugin = "csi-register-plugin"
NamespaceCapabilityCSIWriteVolume = "csi-write-volume"
NamespaceCapabilityCSIReadVolume = "csi-read-volume"
NamespaceCapabilityCSIListVolume = "csi-list-volume"
NamespaceCapabilityCSIMountVolume = "csi-mount-volume"
NamespaceCapabilityListScalingPolicies = "list-scaling-policies"
NamespaceCapabilityReadScalingPolicy = "read-scaling-policy"
NamespaceCapabilityReadJobScaling = "read-job-scaling"
NamespaceCapabilityScaleJob = "scale-job"
NamespaceCapabilityDeny = "deny"
NamespaceCapabilityListJobs = "list-jobs"
NamespaceCapabilityReadJob = "read-job"
NamespaceCapabilitySubmitJob = "submit-job"
NamespaceCapabilityDispatchJob = "dispatch-job"
NamespaceCapabilityReadLogs = "read-logs"
NamespaceCapabilityReadFS = "read-fs"
NamespaceCapabilityAllocExec = "alloc-exec"
NamespaceCapabilityAllocNodeExec = "alloc-node-exec"
NamespaceCapabilityAllocLifecycle = "alloc-lifecycle"
NamespaceCapabilitySentinelOverride = "sentinel-override"
NamespaceCapabilityCSIRegisterPlugin = "csi-register-plugin"
NamespaceCapabilityCSIWriteVolume = "csi-write-volume"
NamespaceCapabilityCSIReadVolume = "csi-read-volume"
NamespaceCapabilityCSIListVolume = "csi-list-volume"
NamespaceCapabilityCSIMountVolume = "csi-mount-volume"
NamespaceCapabilityListScalingPolicies = "list-scaling-policies"
NamespaceCapabilityReadScalingPolicy = "read-scaling-policy"
NamespaceCapabilityReadJobScaling = "read-job-scaling"
NamespaceCapabilityScaleJob = "scale-job"
NamespaceCapabilitySubmitRecommendation = "submit-recommendation"
)
var (
@ -153,7 +154,7 @@ func isNamespaceCapabilityValid(cap string) bool {
NamespaceCapabilityListScalingPolicies, NamespaceCapabilityReadScalingPolicy, NamespaceCapabilityReadJobScaling, NamespaceCapabilityScaleJob:
return true
// Separate the enterprise-only capabilities
case NamespaceCapabilitySentinelOverride:
case NamespaceCapabilitySentinelOverride, NamespaceCapabilitySubmitRecommendation:
return true
default:
return false
@ -183,6 +184,7 @@ func expandNamespacePolicy(policy string) []string {
NamespaceCapabilityAllocLifecycle,
NamespaceCapabilityCSIMountVolume,
NamespaceCapabilityCSIWriteVolume,
NamespaceCapabilitySubmitRecommendation,
}...)
switch policy {

View file

@ -106,6 +106,7 @@ func TestParse(t *testing.T) {
NamespaceCapabilityAllocLifecycle,
NamespaceCapabilityCSIMountVolume,
NamespaceCapabilityCSIWriteVolume,
NamespaceCapabilitySubmitRecommendation,
},
},
{

123
api/recommendations.go Normal file
View file

@ -0,0 +1,123 @@
package api
// Recommendations is used to query the recommendations endpoints.
type Recommendations struct {
client *Client
}
// Recommendations returns a new handle on the recommendations endpoints.
func (c *Client) Recommendations() *Recommendations {
return &Recommendations{client: c}
}
// List is used to dump all of the recommendations in the cluster
func (r *Recommendations) List(q *QueryOptions) ([]*Recommendation, *QueryMeta, error) {
var resp []*Recommendation
qm, err := r.client.query("/v1/recommendations", &resp, q)
if err != nil {
return nil, qm, err
}
return resp, qm, nil
}
// Info is used to return information on a single recommendation
func (r *Recommendations) Info(id string, q *QueryOptions) (*Recommendation, *QueryMeta, error) {
var resp Recommendation
qm, err := r.client.query("/v1/recommendation/"+id, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, qm, nil
}
// Upsert is used to create or update a recommendation
func (r *Recommendations) Upsert(rec *Recommendation, q *WriteOptions) (*Recommendation, *WriteMeta, error) {
var resp Recommendation
wm, err := r.client.write("/v1/recommendation", rec, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, wm, nil
}
// Delete is used to delete a list of recommendations
func (r *Recommendations) Delete(ids []string, q *WriteOptions) (*WriteMeta, error) {
req := &RecommendationApplyRequest{
Apply: []string{},
Dismiss: ids,
}
wm, err := r.client.write("/v1/recommendations/apply", req, nil, q)
if err != nil {
return nil, err
}
return wm, nil
}
// Apply is used to apply a set of recommendations
func (r *Recommendations) Apply(ids []string, policyOverride bool) (
*RecommendationApplyResponse, *WriteMeta, error) {
req := &RecommendationApplyRequest{
Apply: ids,
PolicyOverride: policyOverride,
}
var resp RecommendationApplyResponse
wm, err := r.client.write("/v1/recommendations/apply", req, &resp, nil)
if err != nil {
return nil, nil, err
}
resp.WriteMeta = *wm
return &resp, wm, nil
}
// Recommendation is used to serialize a recommendation.
type Recommendation struct {
ID string
Region string
Namespace string
JobID string
JobVersion uint64
Group string
Task string
Resource string
Value int
Current int
Meta map[string]interface{}
Stats map[string]float64
EnforceVersion bool
SubmitTime int64
CreateIndex uint64
ModifyIndex uint64
}
// RecommendationApplyRequest is used to apply and/or dismiss a set of recommendations
type RecommendationApplyRequest struct {
Apply []string
Dismiss []string
PolicyOverride bool
}
// RecommendationApplyResponse is used to apply a set of recommendations
type RecommendationApplyResponse struct {
UpdatedJobs []*SingleRecommendationApplyResult
Errors []*SingleRecommendationApplyError
WriteMeta
}
type SingleRecommendationApplyResult struct {
Namespace string
JobID string
JobModifyIndex uint64
EvalID string
EvalCreateIndex uint64
Warnings string
Recommendations []string
}
type SingleRecommendationApplyError struct {
Namespace string
JobID string
Recommendations []string
Error string
}

View file

@ -66,12 +66,12 @@ type ScalingPolicy struct {
Max *int64 `hcl:"max,optional"`
Policy map[string]interface{} `hcl:"policy,block"`
Enabled *bool `hcl:"enabled,optional"`
Type string `hcl:"type,optional"`
/* fields set by server */
ID string
Namespace string
Type string
Target map[string]string
CreateIndex uint64
ModifyIndex uint64

View file

@ -671,6 +671,7 @@ type Task struct {
ShutdownDelay time.Duration `mapstructure:"shutdown_delay" hcl:"shutdown_delay,optional"`
KillSignal string `mapstructure:"kill_signal" hcl:"kill_signal,optional"`
Kind string `hcl:"kind,optional"`
ScalingPolicies []*ScalingPolicy `hcl:"scaling,block"`
}
func (t *Task) Canonicalize(tg *TaskGroup, job *Job) {

View file

@ -68,6 +68,32 @@ func testPeriodicJob() *Job {
return job
}
func testRecommendation(job *Job) *Recommendation {
rec := &Recommendation{
ID: "",
Region: *job.Region,
Namespace: *job.Namespace,
JobID: *job.ID,
Group: *job.TaskGroups[0].Name,
Task: job.TaskGroups[0].Tasks[0].Name,
Resource: "CPU",
Value: *job.TaskGroups[0].Tasks[0].Resources.CPU * 2,
Meta: map[string]interface{}{
"testing": true,
"mocked": "also true",
},
Stats: map[string]float64{
"median": 50.0,
"mean": 51.0,
"max": 75.5,
"99": 73.0,
"min": 0.0,
},
EnforceVersion: false,
}
return rec
}
func testNamespace() *Namespace {
return &Namespace{
Name: "test-namespace",

View file

@ -17,6 +17,11 @@ func (s *HTTPServer) registerEnterpriseHandlers() {
s.mux.HandleFunc("/v1/quota", s.wrap(s.entOnly))
s.mux.HandleFunc("/v1/operator/license", s.wrap(s.entOnly))
s.mux.HandleFunc("/v1/recommendation", s.wrap(s.entOnly))
s.mux.HandleFunc("/v1/recommendations", s.wrap(s.entOnly))
s.mux.HandleFunc("/v1/recommendations/apply", s.wrap(s.entOnly))
s.mux.HandleFunc("/v1/recommendation/", s.wrap(s.entOnly))
}
func (s *HTTPServer) entOnly(resp http.ResponseWriter, req *http.Request) (interface{}, error) {

View file

@ -982,7 +982,7 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
tg.Tasks = make([]*structs.Task, l)
for l, task := range taskGroup.Tasks {
t := &structs.Task{}
ApiTaskToStructsTask(task, t)
ApiTaskToStructsTask(job, tg, task, t)
// Set the tasks vault namespace from Job if it was not
// specified by the task or group
@ -996,7 +996,9 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
// ApiTaskToStructsTask is a copy and type conversion between the API
// representation of a task from a struct representation of a task.
func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup,
apiTask *api.Task, structsTask *structs.Task) {
structsTask.Name = apiTask.Name
structsTask.Driver = apiTask.Driver
structsTask.User = apiTask.User
@ -1033,6 +1035,13 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
}
}
if l := len(apiTask.ScalingPolicies); l != 0 {
structsTask.ScalingPolicies = make([]*structs.ScalingPolicy, l)
for i, policy := range apiTask.ScalingPolicies {
structsTask.ScalingPolicies[i] = ApiScalingPolicyToStructs(0, policy).TargetTask(job, group, structsTask)
}
}
if l := len(apiTask.Services); l != 0 {
structsTask.Services = make([]*structs.Service, l)
for i, service := range apiTask.Services {

View file

@ -22,6 +22,12 @@ func (s *HTTPServer) scalingPoliciesListRequest(resp http.ResponseWriter, req *h
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
if job := req.URL.Query().Get("job"); job != "" {
args.Job = job
}
if tpe := req.URL.Query().Get("type"); tpe != "" {
args.Type = tpe
}
var out structs.ScalingPolicyListResponse
if err := s.agent.RPC("Scaling.ListPolicies", &args, &out); err != nil {
@ -77,9 +83,14 @@ func (s *HTTPServer) scalingPolicyQuery(resp http.ResponseWriter, req *http.Requ
func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.ScalingPolicy {
p := structs.ScalingPolicy{
Enabled: *ap.Enabled,
Policy: ap.Policy,
Target: map[string]string{},
Type: ap.Type,
Policy: ap.Policy,
Target: map[string]string{},
}
if ap.Enabled != nil {
p.Enabled = *ap.Enabled
} else {
p.Enabled = true
}
if ap.Max != nil {
p.Max = *ap.Max

View file

@ -12,6 +12,7 @@ import (
)
func TestHTTP_ScalingPoliciesList(t *testing.T) {
require := require.New(t)
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
for i := 0; i < 3; i++ {
@ -26,40 +27,75 @@ func TestHTTP_ScalingPoliciesList(t *testing.T) {
},
}
var resp structs.JobRegisterResponse
if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(s.Agent.RPC("Job.Register", &args, &resp))
}
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/scaling/policies", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(err)
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.ScalingPoliciesRequest(respW, req)
if err != nil {
t.Fatalf("err: %v", err)
}
require.NoError(err)
// Check for the index
if respW.Header().Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}
if respW.Header().Get("X-Nomad-KnownLeader") != "true" {
t.Fatalf("missing known leader")
}
if respW.Header().Get("X-Nomad-LastContact") == "" {
t.Fatalf("missing last contact")
}
require.NotEmpty(respW.Header().Get("X-Nomad-Index"), "missing index")
require.NotEmpty(respW.Header().Get("X-Nomad-KnownLeader"), "missing known leader")
require.NotEmpty(respW.Header().Get("X-Nomad-LastContact"), "missing last contact")
// Check the list
l := obj.([]*structs.ScalingPolicyListStub)
if len(l) != 3 {
t.Fatalf("bad: %#v", l)
require.Len(l, 3)
})
}
func TestHTTP_ScalingPoliciesList_Filter(t *testing.T) {
t.Parallel()
require := require.New(t)
httpTest(t, nil, func(s *TestAgent) {
var job *structs.Job
for i := 0; i < 3; i++ {
// Create the job
job, _ = mock.JobWithScalingPolicy()
args := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var resp structs.JobRegisterResponse
require.NoError(s.Agent.RPC("Job.Register", &args, &resp))
}
// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/scaling/policies?job="+job.ID, nil)
require.NoError(err)
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.ScalingPoliciesRequest(respW, req)
require.NoError(err)
// Check the list
l := obj.([]*structs.ScalingPolicyListStub)
require.Len(l, 1)
// Request again, with policy type filter
req, err = http.NewRequest("GET", "/v1/scaling/policies?type=cluster", nil)
require.NoError(err)
respW = httptest.NewRecorder()
// Make the request
obj, err = s.Server.ScalingPoliciesRequest(respW, req)
require.NoError(err)
// Check the list
l = obj.([]*structs.ScalingPolicyListStub)
require.Len(l, 0)
})
}
@ -90,15 +126,9 @@ func TestHTTP_ScalingPolicyGet(t *testing.T) {
require.NoError(err)
// Check for the index
if respW.Header().Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}
if respW.Header().Get("X-Nomad-KnownLeader") != "true" {
t.Fatalf("missing known leader")
}
if respW.Header().Get("X-Nomad-LastContact") == "" {
t.Fatalf("missing last contact")
}
require.NotEmpty(respW.Header().Get("X-Nomad-Index"), "missing index")
require.NotEmpty(respW.Header().Get("X-Nomad-KnownLeader"), "missing known leader")
require.NotEmpty(respW.Header().Get("X-Nomad-LastContact"), "missing last contact")
// Check the policy
require.Equal(p.ID, obj.(*structs.ScalingPolicy).ID)

View file

@ -31,6 +31,9 @@ General Options:
Policy Info Options:
-type
Filter scaling policies by type.
-json
Output the scaling policy in its JSON format.
@ -48,6 +51,8 @@ func (s *ScalingPolicyListCommand) Synopsis() string {
func (s *ScalingPolicyListCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(s.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-job": complete.PredictNothing,
"-type": complete.PredictNothing,
"-json": complete.PredictNothing,
"-t": complete.PredictAnything,
})
@ -59,12 +64,14 @@ func (s *ScalingPolicyListCommand) Name() string { return "scaling policy list"
// Run satisfies the cli.Command Run function.
func (s *ScalingPolicyListCommand) Run(args []string) int {
var json bool
var tmpl string
var tmpl, policyType, job string
flags := s.Meta.FlagSet(s.Name(), FlagSetClient)
flags.Usage = func() { s.Ui.Output(s.Help()) }
flags.BoolVar(&json, "json", false, "")
flags.StringVar(&tmpl, "t", "", "")
flags.StringVar(&policyType, "type", "", "")
flags.StringVar(&job, "job", "", "")
if err := flags.Parse(args); err != nil {
return 1
}
@ -81,7 +88,16 @@ func (s *ScalingPolicyListCommand) Run(args []string) int {
return 1
}
policies, _, err := client.Scaling().ListPolicies(nil)
q := &api.QueryOptions{
Params: map[string]string{},
}
if policyType != "" {
q.Params["type"] = policyType
}
if job != "" {
q.Params["job"] = job
}
policies, _, err := client.Scaling().ListPolicies(q)
if err != nil {
s.Ui.Error(fmt.Sprintf("Error listing scaling policies: %s", err))
return 1
@ -103,7 +119,7 @@ func (s *ScalingPolicyListCommand) Run(args []string) int {
}
// Create the output table header.
output := []string{"ID|Enabled|Target"}
output := []string{"ID|Enabled|Type|Target"}
// Sort the list of policies based on their target.
sortedPolicies := scalingPolicyStubList{policies: policies}
@ -112,8 +128,8 @@ func (s *ScalingPolicyListCommand) Run(args []string) int {
// Iterate the policies and add to the output.
for _, policy := range sortedPolicies.policies {
output = append(output, fmt.Sprintf(
"%s|%v|%s",
policy.ID, policy.Enabled, formatScalingPolicyTarget(policy.Target)))
"%s|%v|%s|%s",
policy.ID, policy.Enabled, policy.Type, formatScalingPolicyTarget(policy.Target)))
}
// Output.

View file

@ -1,52 +1,36 @@
package command
import (
"fmt"
"strings"
"testing"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/cli"
)
func TestScalingPolicyListCommand_Run(t *testing.T) {
require := require.New(t)
t.Parallel()
srv, client, url := testServer(t, true, nil)
srv, client, url := testServer(t, false, nil)
defer srv.Shutdown()
testutil.WaitForResult(func() (bool, error) {
nodes, _, err := client.Nodes().List(nil)
if err != nil {
return false, err
}
if len(nodes) == 0 {
return false, fmt.Errorf("missing node")
}
if _, ok := nodes[0].Drivers["mock_driver"]; !ok {
return false, fmt.Errorf("mock_driver not ready")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
ui := cli.NewMockUi()
cmd := &ScalingPolicyListCommand{Meta: Meta{Ui: ui}}
// Perform an initial list, which should return zero results.
if code := cmd.Run([]string{"-address=" + url}); code != 0 {
t.Fatalf("expected cmd run exit code 0, got: %d", code)
}
if out := ui.OutputWriter.String(); !strings.Contains(out, "No policies found") {
t.Fatalf("expected no policies found within output: %v", out)
}
code := cmd.Run([]string{"-address=" + url})
require.Equal(0, code)
out := ui.OutputWriter.String()
require.Contains(out, "No policies found")
// Generate two test jobs.
jobs := []*api.Job{testJob("scaling_policy_list_1"), testJob("scaling_policy_list_2")}
// Generate an example scaling policy.
scalingPolicy := api.ScalingPolicy{
Type: api.ScalingPolicyTypeHorizontal,
Enabled: helper.BoolToPtr(true),
Min: helper.Int64ToPtr(1),
Max: helper.Int64ToPtr(1),
@ -55,29 +39,18 @@ func TestScalingPolicyListCommand_Run(t *testing.T) {
// Iterate the jobs, add the scaling policy and register.
for _, job := range jobs {
job.TaskGroups[0].Scaling = &scalingPolicy
resp, _, err := client.Jobs().Register(job, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 {
t.Fatalf("expected waitForSuccess exit code 0, got: %d", code)
}
_, _, err := client.Jobs().Register(job, nil)
require.NoError(err)
}
// Perform a new list which should yield results..
if code := cmd.Run([]string{"-address=" + url}); code != 0 {
t.Fatalf("expected cmd run exit code 0, got: %d", code)
}
out := ui.OutputWriter.String()
if !strings.Contains(out, "ID") ||
!strings.Contains(out, "Enabled") ||
!strings.Contains(out, "Target") {
t.Fatalf("expected table headers within output: %v", out)
}
if !strings.Contains(out, "scaling_policy_list_1") {
t.Fatalf("expected job scaling_policy_list_1 within output: %v", out)
}
if !strings.Contains(out, "scaling_policy_list_2") {
t.Fatalf("expected job scaling_policy_list_2 within output: %v", out)
}
code = cmd.Run([]string{"-address=" + url})
require.Equal(0, code)
out = ui.OutputWriter.String()
require.Contains(out, "ID")
require.Contains(out, "Enabled")
require.Contains(out, "Type")
require.Contains(out, "Target")
require.Contains(out, "scaling_policy_list_1")
require.Contains(out, "scaling_policy_list_2")
}

View file

@ -185,7 +185,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
// Parse scaling policy
if o := listVal.Filter("scaling"); len(o.Items) > 0 {
if err := parseScalingPolicy(&g.Scaling, o); err != nil {
if err := parseGroupScalingPolicy(&g.Scaling, o); err != nil {
return multierror.Prefix(err, "scaling ->")
}
}
@ -319,21 +319,39 @@ func parseVolumes(out *map[string]*api.VolumeRequest, list *ast.ObjectList) erro
return nil
}
func parseScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error {
list = list.Elem()
func parseGroupScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error {
if len(list.Items) > 1 {
return fmt.Errorf("only one 'scaling' block allowed")
}
item := list.Items[0]
if len(item.Keys) != 0 {
return fmt.Errorf("task group scaling policy should not have a name")
}
p, err := parseScalingPolicy(item)
if err != nil {
return err
}
// Get our resource object
o := list.Items[0]
// group-specific validation
if p.Max == nil {
return fmt.Errorf("missing 'max'")
}
if p.Type == "" {
p.Type = "horizontal"
} else if p.Type != "horizontal" {
return fmt.Errorf("task group scaling policy had invalid type: %q", p.Type)
}
*out = p
return nil
}
func parseScalingPolicy(item *ast.ObjectItem) (*api.ScalingPolicy, error) {
// We need this later
var listVal *ast.ObjectList
if ot, ok := o.Val.(*ast.ObjectType); ok {
if ot, ok := item.Val.(*ast.ObjectType); ok {
listVal = ot.List
} else {
return fmt.Errorf("should be an object")
return nil, fmt.Errorf("should be an object")
}
valid := []string{
@ -341,14 +359,15 @@ func parseScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error {
"max",
"policy",
"enabled",
"type",
}
if err := checkHCLKeys(o.Val, valid); err != nil {
return err
if err := checkHCLKeys(item.Val, valid); err != nil {
return nil, err
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
if err := hcl.DecodeObject(&m, item.Val); err != nil {
return nil, err
}
delete(m, "policy")
@ -358,30 +377,26 @@ func parseScalingPolicy(out **api.ScalingPolicy, list *ast.ObjectList) error {
Result: &result,
})
if err != nil {
return err
return nil, err
}
if err := dec.Decode(m); err != nil {
return err
}
if result.Max == nil {
return fmt.Errorf("missing 'max'")
return nil, err
}
// If we have policy, then parse that
if o := listVal.Filter("policy"); len(o.Items) > 0 {
if len(o.Elem().Items) > 1 {
return fmt.Errorf("only one 'policy' block allowed per 'scaling' block")
return nil, fmt.Errorf("only one 'policy' block allowed per 'scaling' block")
}
p := o.Elem().Items[0]
var m map[string]interface{}
if err := hcl.DecodeObject(&m, p.Val); err != nil {
return err
return nil, err
}
if err := mapstructure.WeakDecode(m, &result.Policy); err != nil {
return err
return nil, err
}
}
*out = &result
return nil
return &result, nil
}

View file

@ -2,6 +2,7 @@ package jobspec
import (
"fmt"
"strings"
"time"
multierror "github.com/hashicorp/go-multierror"
@ -23,6 +24,7 @@ var (
"kill_timeout",
"shutdown_delay",
"kill_signal",
"scaling",
}
normalTaskKeys = append(commonTaskKeys,
@ -110,6 +112,7 @@ func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) {
delete(m, "vault")
delete(m, "volume_mount")
delete(m, "csi_plugin")
delete(m, "scaling")
// Build the task
var t api.Task
@ -276,6 +279,13 @@ func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) {
}
}
// Parse scaling policies
if o := listVal.Filter("scaling"); len(o.Items) > 0 {
if err := parseTaskScalingPolicies(&t.ScalingPolicies, o); err != nil {
return nil, err
}
}
// If we have a vault block, then parse that
if o := listVal.Filter("vault"); len(o.Items) > 0 {
v := &api.Vault{
@ -462,6 +472,56 @@ func parseTemplates(result *[]*api.Template, list *ast.ObjectList) error {
return nil
}
func parseTaskScalingPolicies(result *[]*api.ScalingPolicy, list *ast.ObjectList) error {
if len(list.Items) == 0 {
return nil
}
errPrefix := "scaling ->"
// Go through each object and turn it into an actual result.
seen := make(map[string]bool)
for _, item := range list.Items {
if l := len(item.Keys); l == 0 {
return multierror.Prefix(fmt.Errorf("task scaling policy missing name"), errPrefix)
} else if l > 1 {
return multierror.Prefix(fmt.Errorf("task scaling policy should only have one name"), errPrefix)
}
n := item.Keys[0].Token.Value().(string)
errPrefix = fmt.Sprintf("scaling[%v] ->", n)
var policyType string
switch strings.ToLower(n) {
case "cpu":
policyType = "vertical_cpu"
case "mem":
policyType = "vertical_mem"
default:
return multierror.Prefix(fmt.Errorf(`scaling policy name must be "cpu" or "mem"`), errPrefix)
}
// Make sure we haven't already found this
if seen[n] {
return multierror.Prefix(fmt.Errorf("scaling policy cannot be defined more than once"), errPrefix)
}
seen[n] = true
p, err := parseScalingPolicy(item)
if err != nil {
return multierror.Prefix(err, errPrefix)
}
if p.Type == "" {
p.Type = policyType
} else if p.Type != policyType {
return multierror.Prefix(fmt.Errorf("policy had invalid 'type': %q", p.Type), errPrefix)
}
*result = append(*result, p)
}
return nil
}
func parseResources(result *api.Resources, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) == 0 {

View file

@ -1153,7 +1153,6 @@ func TestParse(t *testing.T) {
},
false,
},
{
"tg-service-check.hcl",
&api.Job{
@ -1383,7 +1382,6 @@ func TestParse(t *testing.T) {
},
false,
},
{
"tg-scaling-policy.hcl",
&api.Job{
@ -1393,8 +1391,9 @@ func TestParse(t *testing.T) {
{
Name: stringToPtr("group"),
Scaling: &api.ScalingPolicy{
Min: int64ToPtr(5),
Max: int64ToPtr(100),
Type: "horizontal",
Min: int64ToPtr(5),
Max: int64ToPtr(100),
Policy: map[string]interface{}{
"foo": "bar",
"b": true,
@ -1414,6 +1413,47 @@ func TestParse(t *testing.T) {
},
false,
},
{
"task-scaling-policy.hcl",
&api.Job{
ID: stringToPtr("foo"),
Name: stringToPtr("foo"),
TaskGroups: []*api.TaskGroup{
{
Name: stringToPtr("bar"),
Tasks: []*api.Task{
{
Name: "bar",
Driver: "docker",
ScalingPolicies: []*api.ScalingPolicy{
{
Type: "vertical_cpu",
Target: nil,
Min: int64ToPtr(50),
Max: int64ToPtr(1000),
Policy: map[string]interface{}{
"test": "cpu",
},
Enabled: boolToPtr(true),
},
{
Type: "vertical_mem",
Target: nil,
Min: int64ToPtr(128),
Max: int64ToPtr(1024),
Policy: map[string]interface{}{
"test": "mem",
},
Enabled: boolToPtr(false),
},
},
},
},
},
},
},
false,
},
{
"tg-service-connect-gateway-ingress.hcl",
&api.Job{
@ -1480,6 +1520,7 @@ func TestParse(t *testing.T) {
{
Name: stringToPtr("group"),
Scaling: &api.ScalingPolicy{
Type: "horizontal",
Min: nil,
Max: int64ToPtr(10),
Policy: nil,
@ -1490,19 +1531,51 @@ func TestParse(t *testing.T) {
},
false,
},
{
"tg-scaling-policy-missing-max.hcl",
nil,
true,
},
{
"tg-scaling-policy-multi-policy.hcl",
nil,
true,
},
{
"tg-scaling-policy-with-label.hcl",
nil,
true,
},
{
"tg-scaling-policy-invalid-type.hcl",
nil,
true,
},
{
"task-scaling-policy-missing-name.hcl",
nil,
true,
},
{
"task-scaling-policy-multi-name.hcl",
nil,
true,
},
{
"task-scaling-policy-multi-cpu.hcl",
nil,
true,
},
{
"task-scaling-policy-invalid-type.hcl",
nil,
true,
},
{
"task-scaling-policy-invalid-resource.hcl",
nil,
true,
},
{
"multiregion.hcl",
&api.Job{

View file

@ -0,0 +1,17 @@
job "foo" {
task "bar" {
driver = "docker"
scaling "wrong" {
enabled = true
min = 50
max = 1000
policy {
test = "cpu"
}
}
}
}

View file

@ -0,0 +1,18 @@
job "foo" {
task "bar" {
driver = "docker"
scaling "cpu" {
type = "vertical_mem"
enabled = true
min = 50
max = 1000
policy {
test = "cpu"
}
}
}
}

View file

@ -0,0 +1,17 @@
job "foo" {
task "bar" {
driver = "docker"
scaling {
enabled = true
min = 50
max = 1000
policy {
test = "cpu"
}
}
}
}

View file

@ -0,0 +1,27 @@
job "foo" {
task "bar" {
driver = "docker"
scaling "cpu" {
enabled = true
min = 50
max = 1000
policy {
test = "cpu"
}
}
scaling "cpu" {
enabled = true
min = 50
max = 1000
policy {
test = "cpu"
}
}
}
}

View file

@ -0,0 +1,17 @@
job "foo" {
task "bar" {
driver = "docker"
scaling "cpu" "mem" {
enabled = true
min = 50
max = 1000
policy {
test = "cpu"
}
}
}
}

View file

@ -0,0 +1,27 @@
job "foo" {
task "bar" {
driver = "docker"
scaling "cpu" {
enabled = true
min = 50
max = 1000
policy {
test = "cpu"
}
}
scaling "mem" {
enabled = false
min = 128
max = 1024
policy {
test = "mem"
}
}
}
}

View file

@ -0,0 +1,17 @@
job "elastic" {
group "group" {
scaling {
type = "vertical_cpu"
enabled = false
min = 5
max = 100
policy {
foo = "bar"
b = true
val = 5
f = 0.1
}
}
}
}

View file

@ -0,0 +1,16 @@
job "elastic" {
group "group" {
scaling "no-label-allowed" {
enabled = false
min = 5
max = 100
policy {
foo = "bar"
b = true
val = 5
f = 0.1
}
}
}
}

View file

@ -3,6 +3,7 @@ package jobspec2
import (
"fmt"
"reflect"
"strings"
"time"
"github.com/hashicorp/hcl/v2"
@ -18,6 +19,7 @@ var hclDecoder *gohcl.Decoder
func init() {
hclDecoder = newHCLDecoder()
hclDecoder.RegisterBlockDecoder(reflect.TypeOf(api.TaskGroup{}), decodeTaskGroup)
hclDecoder.RegisterBlockDecoder(reflect.TypeOf(api.Task{}), decodeTask)
}
func newHCLDecoder() *gohcl.Decoder {
@ -266,6 +268,7 @@ func decodeTaskGroup(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.D
}
d := newHCLDecoder()
d.RegisterBlockDecoder(reflect.TypeOf(api.Task{}), decodeTask)
diags = d.DecodeBody(tgBody, ctx, tg)
if tgExtra.Vault != nil {
@ -276,6 +279,128 @@ func decodeTaskGroup(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.D
}
}
if tg.Scaling != nil {
if tg.Scaling.Type == "" {
tg.Scaling.Type = "horizontal"
}
diags = append(diags, validateGroupScalingPolicy(tg.Scaling, tgBody)...)
}
return diags
}
func decodeTask(body hcl.Body, ctx *hcl.EvalContext, val interface{}) hcl.Diagnostics {
// special case scaling policy
t := val.(*api.Task)
b, remain, diags := body.PartialContent(&hcl.BodySchema{
Blocks: []hcl.BlockHeaderSchema{
{Type: "scaling", LabelNames: []string{"name"}},
},
})
diags = append(diags, decodeTaskScalingPolicies(b.Blocks, ctx, t)...)
decoder := newHCLDecoder()
diags = append(diags, decoder.DecodeBody(remain, ctx, val)...)
return diags
}
func decodeTaskScalingPolicies(blocks hcl.Blocks, ctx *hcl.EvalContext, task *api.Task) hcl.Diagnostics {
if len(blocks) == 0 {
return nil
}
var diags hcl.Diagnostics
seen := map[string]*hcl.Block{}
for _, b := range blocks {
label := strings.ToLower(b.Labels[0])
var policyType string
switch label {
case "cpu":
policyType = "vertical_cpu"
case "mem":
policyType = "vertical_mem"
default:
diags = append(diags, &hcl.Diagnostic{
Severity: hcl.DiagError,
Summary: "Invalid scaling policy name",
Detail: `scaling policy name must be "cpu" or "mem"`,
Subject: &b.LabelRanges[0],
})
continue
}
if prev, ok := seen[label]; ok {
diags = append(diags, &hcl.Diagnostic{
Severity: hcl.DiagError,
Summary: fmt.Sprintf("Duplicate scaling %q block", label),
Detail: fmt.Sprintf(
"Only one scaling %s block is allowed. Another was defined at %s.",
label, prev.DefRange.String(),
),
Subject: &b.DefRange,
})
continue
}
seen[label] = b
var p api.ScalingPolicy
diags = append(diags, hclDecoder.DecodeBody(b.Body, ctx, &p)...)
if p.Type == "" {
p.Type = policyType
} else if p.Type != policyType {
diags = append(diags, &hcl.Diagnostic{
Severity: hcl.DiagError,
Summary: "Invalid scaling policy type",
Detail: fmt.Sprintf(
"Invalid policy type, expected %q but found %q",
p.Type, policyType),
Subject: &b.DefRange,
})
continue
}
task.ScalingPolicies = append(task.ScalingPolicies, &p)
}
return diags
}
func validateGroupScalingPolicy(p *api.ScalingPolicy, body hcl.Body) hcl.Diagnostics {
// fast path: do nothing
if p.Max != nil && p.Type == "horizontal" {
return nil
}
content, _, diags := body.PartialContent(&hcl.BodySchema{
Blocks: []hcl.BlockHeaderSchema{{Type: "scaling"}},
})
if len(content.Blocks) == 0 {
// unexpected, given that we have a scaling policy
return diags
}
pc, _, diags := content.Blocks[0].Body.PartialContent(&hcl.BodySchema{
Attributes: []hcl.AttributeSchema{
{Name: "max", Required: true},
{Name: "type", Required: false},
},
})
if p.Type != "horizontal" {
if attr, ok := pc.Attributes["type"]; ok {
diags = append(diags, &hcl.Diagnostic{
Severity: hcl.DiagError,
Summary: "Invalid group scaling type",
Detail: fmt.Sprintf(
"task group scaling policy had invalid type: %q",
p.Type),
Subject: attr.Expr.Range().Ptr(),
})
}
}
return diags
}

View file

@ -145,3 +145,163 @@ dynamic "group" {
require.Equal(t, "groupB", *out.TaskGroups[1].Name)
require.Equal(t, "groupC", *out.TaskGroups[2].Name)
}
func TestParse_InvalidScalingSyntax(t *testing.T) {
cases := []struct {
name string
expectedErr string
hcl string
}{
{
"valid",
"",
`
job "example" {
group "g1" {
scaling {
max = 40
type = "horizontal"
}
task "t1" {
scaling "cpu" {
max = 20
}
scaling "mem" {
max = 15
}
}
}
}
`,
},
{
"group missing max",
`argument "max" is required`,
`
job "example" {
group "g1" {
scaling {
#max = 40
type = "horizontal"
}
task "t1" {
scaling "cpu" {
max = 20
}
scaling "mem" {
max = 15
}
}
}
}
`,
},
{
"group invalid type",
`task group scaling policy had invalid type`,
`
job "example" {
group "g1" {
scaling {
max = 40
type = "invalid_type"
}
task "t1" {
scaling "cpu" {
max = 20
}
scaling "mem" {
max = 15
}
}
}
}
`,
},
{
"task invalid label",
`scaling policy name must be "cpu" or "mem"`,
`
job "example" {
group "g1" {
scaling {
max = 40
type = "horizontal"
}
task "t1" {
scaling "not_cpu" {
max = 20
}
scaling "mem" {
max = 15
}
}
}
}
`,
},
{
"task duplicate blocks",
`Duplicate scaling "cpu" block`,
`
job "example" {
group "g1" {
scaling {
max = 40
type = "horizontal"
}
task "t1" {
scaling "cpu" {
max = 20
}
scaling "cpu" {
max = 15
}
}
}
}
`,
},
{
"task invalid type",
`Invalid scaling policy type`,
`
job "example" {
group "g1" {
scaling {
max = 40
type = "horizontal"
}
task "t1" {
scaling "cpu" {
max = 20
type = "invalid"
}
scaling "mem" {
max = 15
}
}
}
}
`,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
_, err := ParseWithArgs(c.name+".hcl", strings.NewReader(c.hcl), nil, true)
if c.expectedErr == "" {
require.NoError(t, err)
} else {
require.Error(t, err)
require.Contains(t, err.Error(), c.expectedErr)
}
})
}
}

View file

@ -37,6 +37,10 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
return structs.ErrPermissionDenied
}
allow := func(ns string) bool {
return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs)
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
@ -48,7 +52,7 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
prefix := args.QueryOptions.Prefix
if args.RequestNamespace() == structs.AllNamespacesSentinel {
allowedNSes, err := allowedNSes(aclObj, state)
allowedNSes, err := allowedNSes(aclObj, state, allow)
if err != nil {
return err
}

View file

@ -451,16 +451,16 @@ func propagateScalingPolicyIDs(old, new *structs.Job) error {
oldIDs := make(map[string]string)
if old != nil {
// jobs currently only have scaling policies on task groups, so we can
// find correspondences using task group names
// use the job-scoped key (includes type, group, and task) to uniquely
// identify policies in a job
for _, p := range old.GetScalingPolicies() {
oldIDs[p.Target[structs.ScalingTargetGroup]] = p.ID
oldIDs[p.JobKey()] = p.ID
}
}
// ignore any existing ID in the policy, they should be empty
for _, p := range new.GetScalingPolicies() {
if id, ok := oldIDs[p.Target[structs.ScalingTargetGroup]]; ok {
if id, ok := oldIDs[p.JobKey()]; ok {
p.ID = id
} else {
p.ID = uuid.Generate()
@ -1265,7 +1265,7 @@ func (j *Job) GetJobVersions(args *structs.JobVersionsRequest,
// allowedNSes returns a set (as map of ns->true) of the namespaces a token has access to.
// Returns `nil` set if the token has access to all namespaces
// and ErrPermissionDenied if the token has no capabilities on any namespace.
func allowedNSes(aclObj *acl.ACL, state *state.StateStore) (map[string]bool, error) {
func allowedNSes(aclObj *acl.ACL, state *state.StateStore, allow func(ns string) bool) (map[string]bool, error) {
if aclObj == nil || aclObj.IsManagement() {
return nil, nil
}
@ -1279,7 +1279,7 @@ func allowedNSes(aclObj *acl.ACL, state *state.StateStore) (map[string]bool, err
r := make(map[string]bool, len(nses))
for _, ns := range nses {
if aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) {
if allow(ns) {
r[ns] = true
}
}
@ -1367,6 +1367,9 @@ func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.Job
return err
}
prefix := args.QueryOptions.Prefix
allow := func(ns string) bool {
return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs)
}
// Setup the blocking query
opts := blockingOptions{
@ -1374,7 +1377,7 @@ func (j *Job) listAllNamespaces(args *structs.JobListRequest, reply *structs.Job
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// check if user has permission to all namespaces
allowedNSes, err := allowedNSes(aclObj, state)
allowedNSes, err := allowedNSes(aclObj, state, allow)
if err == structs.ErrPermissionDenied {
// return empty jobs if token isn't authorized for any
// namespace, matching other endpoints

View file

@ -1380,9 +1380,7 @@ func ScalingPolicy() *structs.ScalingPolicy {
Policy: map[string]interface{}{
"a": "b",
},
Enabled: true,
CreateIndex: 10,
ModifyIndex: 20,
Enabled: true,
}
}

View file

@ -1,6 +1,7 @@
package nomad
import (
"strings"
"time"
metrics "github.com/armon/go-metrics"
@ -8,6 +9,7 @@ import (
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -19,16 +21,18 @@ type Scaling struct {
}
// ListPolicies is used to list the policies
func (a *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest,
reply *structs.ScalingPolicyListResponse) error {
func (p *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, reply *structs.ScalingPolicyListResponse) error {
if done, err := a.srv.forward("Scaling.ListPolicies", args, args, reply); done {
if done, err := p.srv.forward("Scaling.ListPolicies", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "scaling", "list_policies"}, time.Now())
// Check for list-job permissions
if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil {
if args.RequestNamespace() == structs.AllNamespacesSentinel {
return p.listAllNamespaces(args, reply)
}
if aclObj, err := p.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil {
hasListScalingPolicies := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListScalingPolicies)
@ -45,22 +49,24 @@ func (a *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Iterate over all the policies
iter, err := state.ScalingPoliciesByNamespace(ws, args.Namespace)
var err error
var iter memdb.ResultIterator
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = state.ScalingPoliciesByIDPrefix(ws, args.RequestNamespace(), prefix)
} else if job := args.Job; job != "" {
iter, err = state.ScalingPoliciesByJob(ws, args.RequestNamespace(), job)
} else {
iter, err = state.ScalingPoliciesByNamespace(ws, args.Namespace, args.Type)
}
if err != nil {
return err
}
// Convert all the policies to a list stub
reply.Policies = nil
for {
raw := iter.Next()
if raw == nil {
break
}
for raw := iter.Next(); raw != nil; raw = iter.Next() {
policy := raw.(*structs.ScalingPolicy)
// if _, ok := policies[policy.Target]; ok || mgt {
// reply.Policies = append(reply.Policies, policy.Stub())
// }
reply.Policies = append(reply.Policies, policy.Stub())
}
@ -70,28 +76,27 @@ func (a *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest,
return err
}
// Ensure we never set the index to zero, otherwise a blocking query cannot be used.
// We floor the index at one, since realistically the first write must have a higher index.
// Don't return index zero, otherwise a blocking query cannot be used.
if index == 0 {
index = 1
}
reply.Index = index
return nil
}}
return a.srv.blockingRPC(&opts)
return p.srv.blockingRPC(&opts)
}
// GetPolicy is used to get a specific policy
func (a *Scaling) GetPolicy(args *structs.ScalingPolicySpecificRequest,
func (p *Scaling) GetPolicy(args *structs.ScalingPolicySpecificRequest,
reply *structs.SingleScalingPolicyResponse) error {
if done, err := a.srv.forward("Scaling.GetPolicy", args, args, reply); done {
if done, err := p.srv.forward("Scaling.GetPolicy", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "scaling", "get_policy"}, time.Now())
// Check for list-job permissions
if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil {
if aclObj, err := p.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil {
hasReadScalingPolicy := aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadScalingPolicy)
@ -129,5 +134,71 @@ func (a *Scaling) GetPolicy(args *structs.ScalingPolicySpecificRequest,
reply.Index = index
return nil
}}
return a.srv.blockingRPC(&opts)
return p.srv.blockingRPC(&opts)
}
func (j *Scaling) listAllNamespaces(args *structs.ScalingPolicyListRequest, reply *structs.ScalingPolicyListResponse) error {
// Check for list-job permissions
aclObj, err := j.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}
prefix := args.QueryOptions.Prefix
allow := func(ns string) bool {
return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListScalingPolicies) ||
(aclObj.AllowNsOp(ns, acl.NamespaceCapabilityListJobs) && aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob))
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// check if user has permission to all namespaces
allowedNSes, err := allowedNSes(aclObj, state, allow)
if err == structs.ErrPermissionDenied {
// return empty if token isn't authorized for any namespace
reply.Policies = []*structs.ScalingPolicyListStub{}
return nil
} else if err != nil {
return err
}
// Capture all the policies
var iter memdb.ResultIterator
if args.Type != "" {
iter, err = state.ScalingPoliciesByTypePrefix(ws, args.Type)
} else {
iter, err = state.ScalingPolicies(ws)
}
if err != nil {
return err
}
var policies []*structs.ScalingPolicyListStub
for raw := iter.Next(); raw != nil; raw = iter.Next() {
policy := raw.(*structs.ScalingPolicy)
if allowedNSes != nil && !allowedNSes[policy.Target[structs.ScalingTargetNamespace]] {
// not permitted to this name namespace
continue
}
if prefix != "" && !strings.HasPrefix(policy.ID, prefix) {
continue
}
policies = append(policies, policy.Stub())
}
reply.Policies = policies
// Use the last index that affected the policies table or summary
index, err := state.Index("scaling_policy")
if err != nil {
return err
}
reply.Index = helper.Uint64Max(1, index)
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"sort"
"strings"
"time"
log "github.com/hashicorp/go-hclog"
@ -1527,6 +1528,10 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b
return fmt.Errorf("unable to update job scaling policies: %v", err)
}
if err := s.updateJobRecommendations(index, txn, existingJob, job); err != nil {
return fmt.Errorf("unable to update job recommendations: %v", err)
}
if err := s.updateJobCSIPlugins(index, job, existingJob, txn); err != nil {
return fmt.Errorf("unable to update job scaling policies: %v", err)
}
@ -1644,6 +1649,11 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
return fmt.Errorf("deleting job scaling policies failed: %v", err)
}
// Delete any job recommendations
if err := s.deleteRecommendationsByJob(index, txn, job); err != nil {
return fmt.Errorf("deleting job recommendatons failed: %v", err)
}
// Delete the scaling events
if _, err = txn.DeleteAll("scaling_event", "id", namespace, jobID); err != nil {
return fmt.Errorf("deleting job scaling events failed: %v", err)
@ -4567,17 +4577,10 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx
ws := memdb.NewWatchSet()
if job.Stop {
if err := s.deleteJobScalingPolicies(index, job, txn); err != nil {
return fmt.Errorf("deleting job scaling policies failed: %v", err)
}
return nil
}
scalingPolicies := job.GetScalingPolicies()
newTargets := map[string]struct{}{}
newTargets := map[string]bool{}
for _, p := range scalingPolicies {
newTargets[p.Target[structs.ScalingTargetGroup]] = struct{}{}
newTargets[p.JobKey()] = true
}
// find existing policies that need to be deleted
deletedPolicies := []string{}
@ -4585,13 +4588,9 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx
if err != nil {
return fmt.Errorf("ScalingPoliciesByJob lookup failed: %v", err)
}
for {
raw := iter.Next()
if raw == nil {
break
}
for raw := iter.Next(); raw != nil; raw = iter.Next() {
oldPolicy := raw.(*structs.ScalingPolicy)
if _, ok := newTargets[oldPolicy.Target[structs.ScalingTargetGroup]]; !ok {
if !newTargets[oldPolicy.JobKey()] {
deletedPolicies = append(deletedPolicies, oldPolicy.ID)
}
}
@ -5460,12 +5459,11 @@ func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*s
}
policy.ID = existing.ID
policy.CreateIndex = existing.CreateIndex
policy.ModifyIndex = index
} else {
// policy.ID must have been set already in Job.Register before log apply
policy.CreateIndex = index
policy.ModifyIndex = index
}
policy.ModifyIndex = index
// Insert the scaling policy
hadUpdates = true
@ -5474,7 +5472,7 @@ func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*s
}
}
// Update the indexes table for scaling policy
// Update the indexes table for scaling policy if we updated any policies
if hadUpdates {
if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
@ -5740,19 +5738,6 @@ func (s *StateStore) ScalingPolicies(ws memdb.WatchSet) (memdb.ResultIterator, e
return iter, nil
}
// ScalingPoliciesByType returns an iterator over scaling policies of a certain type.
func (s *StateStore) ScalingPoliciesByType(ws memdb.WatchSet, t string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
iter, err := txn.Get("scaling_policy", "type", t)
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
return iter, nil
}
// ScalingPoliciesByTypePrefix returns an iterator over scaling policies with a certain type prefix.
func (s *StateStore) ScalingPoliciesByTypePrefix(ws memdb.WatchSet, t string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
@ -5766,7 +5751,7 @@ func (s *StateStore) ScalingPoliciesByTypePrefix(ws memdb.WatchSet, t string) (m
return iter, nil
}
func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace, typ string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
iter, err := txn.Get("scaling_policy", "target_prefix", namespace)
@ -5775,18 +5760,22 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace str
}
ws.Add(iter.WatchCh())
filter := func(raw interface{}) bool {
d, ok := raw.(*structs.ScalingPolicy)
if !ok {
return true
}
return d.Target[structs.ScalingTargetNamespace] != namespace
// Wrap the iterator in a filter to exact match the namespace
iter = memdb.NewFilterIterator(iter, scalingPolicyNamespaceFilter(namespace))
// If policy type is specified as well, wrap again
if typ != "" {
iter = memdb.NewFilterIterator(iter, func(raw interface{}) bool {
p, ok := raw.(*structs.ScalingPolicy)
if !ok {
return true
}
return !strings.HasPrefix(p.Type, typ)
})
}
// Wrap the iterator in a filter
wrap := memdb.NewFilterIterator(iter, filter)
return wrap, nil
return iter, nil
}
func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) {
@ -5834,6 +5823,8 @@ func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.S
return nil, nil
}
// ScalingPolicyByTargetAndType returns a fully-qualified policy against a target and policy type,
// or nil if it does not exist. This method does not honor the watchset on the policy type, just the target.
func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[string]string, typ string) (*structs.ScalingPolicy,
error) {
txn := s.db.ReadTxn()
@ -5847,6 +5838,7 @@ func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[
if err != nil {
return nil, fmt.Errorf("scaling_policy lookup failed: %v", err)
}
ws.Add(it.WatchCh())
// Check for type
@ -5866,6 +5858,34 @@ func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[
return nil, nil
}
func (s *StateStore) ScalingPoliciesByIDPrefix(ws memdb.WatchSet, namespace string, prefix string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
iter, err := txn.Get("scaling_policy", "id_prefix", prefix)
if err != nil {
return nil, fmt.Errorf("scaling policy lookup failed: %v", err)
}
ws.Add(iter.WatchCh())
iter = memdb.NewFilterIterator(iter, scalingPolicyNamespaceFilter(namespace))
return iter, nil
}
// scalingPolicyNamespaceFilter returns a filter function that filters all
// scaling policies not targeting the given namespace.
func scalingPolicyNamespaceFilter(namespace string) func(interface{}) bool {
return func(raw interface{}) bool {
p, ok := raw.(*structs.ScalingPolicy)
if !ok {
return true
}
return p.Target[structs.ScalingTargetNamespace] != namespace
}
}
func (s *StateStore) EventSinks(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
@ -6216,6 +6236,7 @@ func (r *StateRestore) CSIVolumeRestore(volume *structs.CSIVolume) error {
return nil
}
// ScalingEventsRestore is used to restore scaling events for a job
func (r *StateRestore) ScalingEventsRestore(jobEvents *structs.JobScalingEvents) error {
if err := r.txn.Insert("scaling_event", jobEvents); err != nil {
return fmt.Errorf("scaling event insert failed: %v", err)

View file

@ -20,3 +20,13 @@ func (s *StateStore) quotaReconcile(index uint64, txn *txn, newQuota, oldQuota s
func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *txn) error {
return nil
}
// deleteRecommendationsByJob deletes all recommendations for the specified job
func (s *StateStore) deleteRecommendationsByJob(index uint64, txn Txn, job *structs.Job) error {
return nil
}
// updateJobRecommendations updates/deletes job recommendations as necessary for a job update
func (s *StateStore) updateJobRecommendations(index uint64, txn Txn, prevJob, newJob *structs.Job) error {
return nil
}

View file

@ -8632,12 +8632,12 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) {
policy2.Target[structs.ScalingTargetNamespace] = otherNamespace
ws1 := memdb.NewWatchSet()
iter, err := state.ScalingPoliciesByNamespace(ws1, structs.DefaultNamespace)
iter, err := state.ScalingPoliciesByNamespace(ws1, structs.DefaultNamespace, "")
require.NoError(err)
require.Nil(iter.Next())
ws2 := memdb.NewWatchSet()
iter, err = state.ScalingPoliciesByNamespace(ws2, otherNamespace)
iter, err = state.ScalingPoliciesByNamespace(ws2, otherNamespace, "")
require.NoError(err)
require.Nil(iter.Next())
@ -8646,7 +8646,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) {
require.True(watchFired(ws1))
require.True(watchFired(ws2))
iter, err = state.ScalingPoliciesByNamespace(nil, structs.DefaultNamespace)
iter, err = state.ScalingPoliciesByNamespace(nil, structs.DefaultNamespace, "")
require.NoError(err)
policiesInDefaultNamespace := []string{}
for {
@ -8658,7 +8658,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) {
}
require.ElementsMatch([]string{policy.ID}, policiesInDefaultNamespace)
iter, err = state.ScalingPoliciesByNamespace(nil, otherNamespace)
iter, err = state.ScalingPoliciesByNamespace(nil, otherNamespace, "")
require.NoError(err)
policiesInOtherNamespace := []string{}
for {
@ -8684,12 +8684,12 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) {
policy2.Target[structs.ScalingTargetNamespace] = ns2
ws1 := memdb.NewWatchSet()
iter, err := state.ScalingPoliciesByNamespace(ws1, ns1)
iter, err := state.ScalingPoliciesByNamespace(ws1, ns1, "")
require.NoError(err)
require.Nil(iter.Next())
ws2 := memdb.NewWatchSet()
iter, err = state.ScalingPoliciesByNamespace(ws2, ns2)
iter, err = state.ScalingPoliciesByNamespace(ws2, ns2, "")
require.NoError(err)
require.Nil(iter.Next())
@ -8698,7 +8698,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) {
require.True(watchFired(ws1))
require.True(watchFired(ws2))
iter, err = state.ScalingPoliciesByNamespace(nil, ns1)
iter, err = state.ScalingPoliciesByNamespace(nil, ns1, "")
require.NoError(err)
policiesInNS1 := []string{}
for {
@ -8710,7 +8710,7 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) {
}
require.ElementsMatch([]string{policy1.ID}, policiesInNS1)
iter, err = state.ScalingPoliciesByNamespace(nil, ns2)
iter, err = state.ScalingPoliciesByNamespace(nil, ns2, "")
require.NoError(err)
policiesInNS2 := []string{}
for {
@ -8723,37 +8723,6 @@ func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) {
require.ElementsMatch([]string{policy2.ID}, policiesInNS2)
}
func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) {
t.Parallel()
require := require.New(t)
state := testStateStore(t)
job, policy := mock.JobWithScalingPolicy()
// Create a watchset so we can test that upsert fires the watch
ws := memdb.NewWatchSet()
out, err := state.ScalingPolicyByTargetAndType(ws, policy.Target, policy.Type)
require.NoError(err)
require.Nil(out)
var newIndex uint64 = 1000
err = state.UpsertJob(structs.MsgTypeTestSetup, newIndex, job)
require.NoError(err)
require.True(watchFired(ws), "watch did not fire")
ws = memdb.NewWatchSet()
out, err = state.ScalingPolicyByTargetAndType(ws, policy.Target, policy.Type)
require.NoError(err)
require.NotNil(out)
require.Equal(newIndex, out.CreateIndex)
require.Equal(newIndex, out.ModifyIndex)
index, err := state.Index("scaling_policy")
require.NoError(err)
require.Equal(newIndex, index)
}
// Scaling Policy IDs are generated randomly during Job.Register
// Subsequent updates of the job should preserve the ID for the scaling policy
// associated with a given target.
@ -8883,7 +8852,7 @@ func TestStateStore_DeleteScalingPolicies(t *testing.T) {
require.Nil(out)
// Ensure we see both policies
iter, err := state.ScalingPoliciesByNamespace(ws, policy.Target[structs.ScalingTargetNamespace])
iter, err := state.ScalingPoliciesByNamespace(ws, policy.Target[structs.ScalingTargetNamespace], "")
require.NoError(err)
count := 0
for {
@ -8967,13 +8936,12 @@ func TestStateStore_UnstopJob_UpsertScalingPolicies(t *testing.T) {
// upsert a stopped job, verify that we don't fire the watcher or add any scaling policies
err = state.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
require.NoError(err)
require.False(watchFired(ws))
// stopped job should have no scaling policies, watcher doesn't fire
require.True(watchFired(ws))
list, err = state.ScalingPolicies(ws)
require.NoError(err)
require.Nil(list.Next())
require.NotNil(list.Next())
// Establish a new watcher
// Establish a new watchset
ws = memdb.NewWatchSet()
_, err = state.ScalingPolicies(ws)
require.NoError(err)
@ -8982,14 +8950,14 @@ func TestStateStore_UnstopJob_UpsertScalingPolicies(t *testing.T) {
err = state.UpsertJob(structs.MsgTypeTestSetup, 1100, job)
require.NoError(err)
// Ensure the scaling policy was added, watch was fired, index was advanced
require.True(watchFired(ws))
// Ensure the scaling policy still exists, watch was not fired, index was not advanced
out, err := state.ScalingPolicyByTargetAndType(nil, policy.Target, policy.Type)
require.NoError(err)
require.NotNil(out)
index, err := state.Index("scaling_policy")
require.NoError(err)
require.GreaterOrEqual(index, uint64(1100))
require.EqualValues(index, 1000)
require.False(watchFired(ws))
}
func TestStateStore_DeleteJob_DeleteScalingPolicies(t *testing.T) {
@ -9105,15 +9073,12 @@ func TestStateStore_ScalingPoliciesByType(t *testing.T) {
pOther2.Type = "other-type-2"
// Create search routine
search := func(t string) (count int, found []string, err error) {
search := func(t string) (found []string) {
found = []string{}
iter, err := state.ScalingPoliciesByType(nil, t)
if err != nil {
return
}
iter, err := state.ScalingPoliciesByTypePrefix(nil, t)
require.NoError(err)
for raw := iter.Next(); raw != nil; raw = iter.Next() {
count++
found = append(found, raw.(*structs.ScalingPolicy).Type)
}
return
@ -9126,47 +9091,23 @@ func TestStateStore_ScalingPoliciesByType(t *testing.T) {
// Check if we can read horizontal policies
expect := []string{pHorzA.Type, pHorzB.Type}
count, found, err := search(structs.ScalingPolicyTypeHorizontal)
sort.Strings(found)
sort.Strings(expect)
require.NoError(err)
require.Equal(expect, found)
require.Equal(2, count)
actual := search(structs.ScalingPolicyTypeHorizontal)
require.ElementsMatch(expect, actual)
// Check if we can read policies of other types
expect = []string{pOther1.Type}
count, found, err = search("other-type-1")
actual = search("other-type-1")
require.ElementsMatch(expect, actual)
sort.Strings(found)
sort.Strings(expect)
require.NoError(err)
require.Equal(expect, found)
require.Equal(1, count)
// Check if we can't read policies by prefix
expect = []string{}
count, found, err = search("other-type")
sort.Strings(found)
sort.Strings(expect)
require.NoError(err)
require.Equal(expect, found)
require.Equal(0, count)
// Check that we can read policies by prefix
expect = []string{"other-type-1", "other-type-2"}
actual = search("other-type")
require.Equal(expect, actual)
// Check for empty result
expect = []string{}
count, found, err = search("non-existing")
sort.Strings(found)
sort.Strings(expect)
require.NoError(err)
require.Equal(expect, found)
require.Equal(0, count)
actual = search("non-existing")
require.ElementsMatch(expect, actual)
}
func TestStateStore_ScalingPoliciesByTypePrefix(t *testing.T) {

View file

@ -1181,6 +1181,8 @@ type SingleScalingPolicyResponse struct {
// ScalingPolicyListRequest is used to parameterize a scaling policy list request
type ScalingPolicyListRequest struct {
Job string
Type string
QueryOptions
}
@ -5273,6 +5275,14 @@ type ScalingPolicy struct {
ModifyIndex uint64
}
// JobKey returns a key that is unique to a job-scoped target, useful as a map
// key. This uses the policy type, plus target (group and task).
func (p *ScalingPolicy) JobKey() string {
return p.Type + "\000" +
p.Target[ScalingTargetGroup] + "\000" +
p.Target[ScalingTargetTask]
}
const (
ScalingTargetNamespace = "Namespace"
ScalingTargetJob = "Job"
@ -5376,6 +5386,7 @@ func (p *ScalingPolicy) Diff(p2 *ScalingPolicy) bool {
return !reflect.DeepEqual(*p, copy)
}
// TarketTaskGroup updates a ScalingPolicy target to specify a given task group
func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy {
p.Target = map[string]string{
ScalingTargetNamespace: job.Namespace,
@ -5385,6 +5396,13 @@ func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy
return p
}
// TargetTask updates a ScalingPolicy target to specify a given task
func (p *ScalingPolicy) TargetTask(job *Job, tg *TaskGroup, task *Task) *ScalingPolicy {
p.TargetTaskGroup(job, tg)
p.Target[ScalingTargetTask] = task.Name
return p
}
func (p *ScalingPolicy) Stub() *ScalingPolicyListStub {
stub := &ScalingPolicyListStub{
ID: p.ID,
@ -5410,6 +5428,8 @@ func (j *Job) GetScalingPolicies() []*ScalingPolicy {
}
}
ret = append(ret, j.GetEntScalingPolicies()...)
return ret
}
@ -6541,7 +6561,8 @@ type Task struct {
// attached to this task.
VolumeMounts []*VolumeMount
// The kill signal to use for the task. This is an optional specification,
// ScalingPolicies is a list of scaling policies scoped to this task
ScalingPolicies []*ScalingPolicy
// KillSignal is the kill signal to use for the task. This is an optional
// specification and defaults to SIGINT

View file

@ -31,3 +31,7 @@ func (p *ScalingPolicy) validateType() multierror.Error {
return mErr
}
func (j *Job) GetEntScalingPolicies() []*ScalingPolicy {
return nil
}

View file

@ -5,8 +5,6 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQee8Us=
github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/a8m/tree v0.0.0-20201019170308-9f4249a434f8 h1:CkFIJJAKEbZbM2tKmCqt/v9ivgpikjPu5lnDsk8huLE=
github.com/a8m/tree v0.0.0-20201019170308-9f4249a434f8/go.mod h1:FSdwKX97koS5efgm8WevNf7XS3PqtyFkKDDXrz778cg=
github.com/a8m/tree v0.0.0-20201026183218-fce18e2a750e h1:8YO27VG7yrHhATTepO2FYLbGUB1wfYbkqoKiVwaAQ+Q=
github.com/a8m/tree v0.0.0-20201026183218-fce18e2a750e/go.mod h1:FSdwKX97koS5efgm8WevNf7XS3PqtyFkKDDXrz778cg=
github.com/agext/levenshtein v1.2.1 h1:QmvMAjj2aEICytGiWzmxoE0x2KZvE0fvmqMOfy2tjT8=

View file

@ -0,0 +1,123 @@
package api
// Recommendations is used to query the recommendations endpoints.
type Recommendations struct {
client *Client
}
// Recommendations returns a new handle on the recommendations endpoints.
func (c *Client) Recommendations() *Recommendations {
return &Recommendations{client: c}
}
// List is used to dump all of the recommendations in the cluster
func (r *Recommendations) List(q *QueryOptions) ([]*Recommendation, *QueryMeta, error) {
var resp []*Recommendation
qm, err := r.client.query("/v1/recommendations", &resp, q)
if err != nil {
return nil, qm, err
}
return resp, qm, nil
}
// Info is used to return information on a single recommendation
func (r *Recommendations) Info(id string, q *QueryOptions) (*Recommendation, *QueryMeta, error) {
var resp Recommendation
qm, err := r.client.query("/v1/recommendation/"+id, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, qm, nil
}
// Upsert is used to create or update a recommendation
func (r *Recommendations) Upsert(rec *Recommendation, q *WriteOptions) (*Recommendation, *WriteMeta, error) {
var resp Recommendation
wm, err := r.client.write("/v1/recommendation", rec, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, wm, nil
}
// Delete is used to delete a list of recommendations
func (r *Recommendations) Delete(ids []string, q *WriteOptions) (*WriteMeta, error) {
req := &RecommendationApplyRequest{
Apply: []string{},
Dismiss: ids,
}
wm, err := r.client.write("/v1/recommendations/apply", req, nil, q)
if err != nil {
return nil, err
}
return wm, nil
}
// Apply is used to apply a set of recommendations
func (r *Recommendations) Apply(ids []string, policyOverride bool) (
*RecommendationApplyResponse, *WriteMeta, error) {
req := &RecommendationApplyRequest{
Apply: ids,
PolicyOverride: policyOverride,
}
var resp RecommendationApplyResponse
wm, err := r.client.write("/v1/recommendations/apply", req, &resp, nil)
if err != nil {
return nil, nil, err
}
resp.WriteMeta = *wm
return &resp, wm, nil
}
// Recommendation is used to serialize a recommendation.
type Recommendation struct {
ID string
Region string
Namespace string
JobID string
JobVersion uint64
Group string
Task string
Resource string
Value int
Current int
Meta map[string]interface{}
Stats map[string]float64
EnforceVersion bool
SubmitTime int64
CreateIndex uint64
ModifyIndex uint64
}
// RecommendationApplyRequest is used to apply and/or dismiss a set of recommendations
type RecommendationApplyRequest struct {
Apply []string
Dismiss []string
PolicyOverride bool
}
// RecommendationApplyResponse is used to apply a set of recommendations
type RecommendationApplyResponse struct {
UpdatedJobs []*SingleRecommendationApplyResult
Errors []*SingleRecommendationApplyError
WriteMeta
}
type SingleRecommendationApplyResult struct {
Namespace string
JobID string
JobModifyIndex uint64
EvalID string
EvalCreateIndex uint64
Warnings string
Recommendations []string
}
type SingleRecommendationApplyError struct {
Namespace string
JobID string
Recommendations []string
Error string
}

View file

@ -66,12 +66,12 @@ type ScalingPolicy struct {
Max *int64 `hcl:"max,optional"`
Policy map[string]interface{} `hcl:"policy,block"`
Enabled *bool `hcl:"enabled,optional"`
Type string `hcl:"type,optional"`
/* fields set by server */
ID string
Namespace string
Type string
Target map[string]string
CreateIndex uint64
ModifyIndex uint64

View file

@ -671,6 +671,7 @@ type Task struct {
ShutdownDelay time.Duration `mapstructure:"shutdown_delay" hcl:"shutdown_delay,optional"`
KillSignal string `mapstructure:"kill_signal" hcl:"kill_signal,optional"`
Kind string `hcl:"kind,optional"`
ScalingPolicies []*ScalingPolicy `hcl:"scaling,block"`
}
func (t *Task) Canonicalize(tg *TaskGroup, job *Job) {