Merge pull request #9312 from hashicorp/b-9227-scaling-policy-filtering

use both job and type query on scaling policy list endpoint
This commit is contained in:
Chris Baker 2020-11-11 12:04:06 -06:00 committed by GitHub
commit 26469093cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 106 additions and 25 deletions

View File

@ -39,9 +39,10 @@ __BACKWARDS INCOMPATIBILITIES:__
BUG FIXES:
* agent (Enterprise): Fixed a bug where audit logging caused websocket and streaming http endpoints to fail [[GH-9319](https://github.com/hashicorp/nomad/issues/9319)]
* core: Fixed a bug where blocking queries would not include the query's maximum wait time when calculating whether it was safe to retry. [[GH-8921](https://github.com/hashicorp/nomad/issues/8921)]
* core: Fixed a bug where ACL handling prevented cross-namespace allocation listing [[GH-9278](https://github.com/hashicorp/nomad/issues/9278)]
* core: Fixed a bug where scaling policy filtering would ignore type query if job query was present [[GH-9312](https://github.com/hashicorp/nomad/issues/9312)]
* core: Fixed a bug where a request to scale a job would fail if the job was not in the default namespace. [[GH-9296](https://github.com/hashicorp/nomad/pull/9296)]
* core: Fixed a bug where blocking queries would not include the query's maximum wait time when calculating whether it was safe to retry. [[GH-8921](https://github.com/hashicorp/nomad/issues/8921)]
* config (Enterprise): Fixed default enterprise config merging. [[GH-9083](https://github.com/hashicorp/nomad/pull/9083)]
* client: Fixed an issue with the Java fingerprinter on macOS causing pop-up notifications when no JVM installed. [[GH-9225](https://github.com/hashicorp/nomad/pull/9225)]
* client: Fixed an fingerprinter issue detecting bridge kernel module [[GH-9299](https://github.com/hashicorp/nomad/pull/9299)]

View File

@ -54,7 +54,7 @@ func (p *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, reply *st
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = state.ScalingPoliciesByIDPrefix(ws, args.RequestNamespace(), prefix)
} else if job := args.Job; job != "" {
iter, err = state.ScalingPoliciesByJob(ws, args.RequestNamespace(), job)
iter, err = state.ScalingPoliciesByJob(ws, args.RequestNamespace(), job, args.Type)
} else {
iter, err = state.ScalingPoliciesByNamespace(ws, args.Namespace, args.Type)
}

View File

@ -1,11 +1,12 @@
package nomad
import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
"testing"
"time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
@ -125,7 +126,6 @@ func TestScalingEndpoint_GetPolicy_ACL(t *testing.T) {
func TestScalingEndpoint_ListPolicies(t *testing.T) {
t.Parallel()
require := require.New(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
@ -133,25 +133,87 @@ func TestScalingEndpoint_ListPolicies(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)
// Lookup the policies
get := &structs.ScalingPolicyListRequest{
var resp structs.ScalingPolicyListResponse
err := msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", &structs.ScalingPolicyListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Region: "global",
Namespace: "default",
},
}, &resp)
require.NoError(t, err)
require.Empty(t, resp.Policies)
j1 := mock.Job()
j1polV := mock.ScalingPolicy()
j1polV.Type = "vertical-cpu"
j1polV.TargetTask(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0])
j1polH := mock.ScalingPolicy()
j1polH.Type = "horizontal"
j1polH.TargetTaskGroup(j1, j1.TaskGroups[0])
j2 := mock.Job()
j2polH := mock.ScalingPolicy()
j2polH.Type = "horizontal"
j2polH.TargetTaskGroup(j2, j2.TaskGroups[0])
s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, j1)
s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, j2)
pols := []*structs.ScalingPolicy{j1polV, j1polH, j2polH}
s1.fsm.State().UpsertScalingPolicies(1000, pols)
for _, p := range pols {
p.ModifyIndex = 1000
p.CreateIndex = 1000
}
var resp structs.ACLPolicyListResponse
err := msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", get, &resp)
require.NoError(err)
require.Empty(resp.Policies)
p1 := mock.ScalingPolicy()
p2 := mock.ScalingPolicy()
s1.fsm.State().UpsertScalingPolicies(1000, []*structs.ScalingPolicy{p1, p2})
cases := []struct {
Label string
Job string
Type string
Expected []*structs.ScalingPolicy
}{
{
Label: "all policies",
Expected: []*structs.ScalingPolicy{j1polH, j1polV, j2polH},
},
{
Label: "job filter",
Job: j1.ID,
Expected: []*structs.ScalingPolicy{j1polH, j1polV},
},
{
Label: "type filter",
Type: "horizontal",
Expected: []*structs.ScalingPolicy{j1polH, j2polH},
},
{
Label: "job and type",
Job: j1.ID,
Type: "horizontal",
Expected: []*structs.ScalingPolicy{j1polH},
},
}
err = msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", get, &resp)
require.NoError(err)
require.EqualValues(1000, resp.Index)
require.Len(resp.Policies, 2)
for _, tc := range cases {
t.Run(tc.Label, func(t *testing.T) {
get := &structs.ScalingPolicyListRequest{
Job: tc.Job,
Type: tc.Type,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "default",
},
}
var resp structs.ScalingPolicyListResponse
err = msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", get, &resp)
require.NoError(t, err)
stubs := []*structs.ScalingPolicyListStub{}
for _, p := range tc.Expected {
stubs = append(stubs, p.Stub())
}
require.ElementsMatch(t, stubs, resp.Policies)
})
}
}
func TestScalingEndpoint_ListPolicies_ACL(t *testing.T) {
@ -170,7 +232,7 @@ func TestScalingEndpoint_ListPolicies_ACL(t *testing.T) {
get := &structs.ScalingPolicyListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Region: "global",
Namespace: "default",
},
}
@ -263,7 +325,7 @@ func TestScalingEndpoint_ListPolicies_Blocking(t *testing.T) {
req := &structs.ScalingPolicyListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "default",
Namespace: "default",
MinQueryIndex: 150,
},
}

View File

@ -5760,9 +5760,27 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace, ty
return iter, nil
}
func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) {
func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID, policyType string) (memdb.ResultIterator,
error) {
txn := s.db.ReadTxn()
return s.ScalingPoliciesByJobTxn(ws, namespace, jobID, txn)
iter, err := s.ScalingPoliciesByJobTxn(ws, namespace, jobID, txn)
if err != nil {
return nil, err
}
if policyType == "" {
return iter, nil
}
filter := func(raw interface{}) bool {
p, ok := raw.(*structs.ScalingPolicy)
if !ok {
return true
}
return policyType != p.Type
}
return memdb.NewFilterIterator(iter, filter), nil
}
func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID string,

View File

@ -9201,7 +9201,7 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) {
iter, err := state.ScalingPoliciesByJob(nil,
policyA.Target[structs.ScalingTargetNamespace],
policyA.Target[structs.ScalingTargetJob])
policyA.Target[structs.ScalingTargetJob], "")
require.NoError(err)
// Ensure we see expected policies
@ -9223,7 +9223,7 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) {
iter, err = state.ScalingPoliciesByJob(nil,
policyB1.Target[structs.ScalingTargetNamespace],
policyB1.Target[structs.ScalingTargetJob])
policyB1.Target[structs.ScalingTargetJob], "")
require.NoError(err)
// Ensure we see expected policies
@ -9267,7 +9267,7 @@ func TestStateStore_ScalingPoliciesByJob_PrefixBug(t *testing.T) {
iter, err := state.ScalingPoliciesByJob(nil,
policy1.Target[structs.ScalingTargetNamespace],
jobPrefix)
jobPrefix, "")
require.NoError(err)
// Ensure we see expected policies