Merge pull request #8753 from hashicorp/b-scaling-policy-delete-job-prefix

resolve prefix bugs around job scaling policies
This commit is contained in:
Chris Baker 2020-08-27 10:04:21 -05:00 committed by GitHub
commit 6344efe2f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 175 additions and 5 deletions

View File

@ -8,6 +8,7 @@ IMPROVEMENTS:
BUG FIXES: BUG FIXES:
* core: Fixed a bug where unpromoted job versions are used when rescheduling failed allocations [[GH-8691](https://github.com/hashicorp/nomad/issues/8691)] * core: Fixed a bug where unpromoted job versions are used when rescheduling failed allocations [[GH-8691](https://github.com/hashicorp/nomad/issues/8691)]
* core: Fixed bugs where scaling policies could be matched against incorrect jobs with a similar prefix [[GH-8753](https://github.com/hashicorp/nomad/issues/8753)]
## 0.12.3 (August 13, 2020) ## 0.12.3 (August 13, 2020)

View File

@ -136,6 +136,7 @@ func TestScalingEndpoint_ListPolicies(t *testing.T) {
get := &structs.ScalingPolicyListRequest{ get := &structs.ScalingPolicyListRequest{
QueryOptions: structs.QueryOptions{ QueryOptions: structs.QueryOptions{
Region: "global", Region: "global",
Namespace: "default",
}, },
} }
var resp structs.ACLPolicyListResponse var resp structs.ACLPolicyListResponse
@ -170,6 +171,7 @@ func TestScalingEndpoint_ListPolicies_ACL(t *testing.T) {
get := &structs.ScalingPolicyListRequest{ get := &structs.ScalingPolicyListRequest{
QueryOptions: structs.QueryOptions{ QueryOptions: structs.QueryOptions{
Region: "global", Region: "global",
Namespace: "default",
}, },
} }
@ -261,6 +263,7 @@ func TestScalingEndpoint_ListPolicies_Blocking(t *testing.T) {
req := &structs.ScalingPolicyListRequest{ req := &structs.ScalingPolicyListRequest{
QueryOptions: structs.QueryOptions{ QueryOptions: structs.QueryOptions{
Region: "global", Region: "global",
Namespace: "default",
MinQueryIndex: 150, MinQueryIndex: 150,
}, },
} }

View File

@ -1539,11 +1539,30 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
// deleteJobScalingPolicies deletes any scaling policies associated with the job // deleteJobScalingPolicies deletes any scaling policies associated with the job
func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *memdb.Txn) error { func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *memdb.Txn) error {
numDeletedScalingPolicies, err := txn.DeleteAll("scaling_policy", "target_prefix", job.Namespace, job.ID) iter, err := s.ScalingPoliciesByJobTxn(nil, job.Namespace, job.ID, txn)
if err != nil { if err != nil {
return fmt.Errorf("deleting job scaling policies failed: %v", err) return fmt.Errorf("getting job scaling policies for deletion failed: %v", err)
} }
if numDeletedScalingPolicies > 0 {
// Put them into a slice so there are no safety concerns while actually
// performing the deletes
policies := []interface{}{}
for {
raw := iter.Next()
if raw == nil {
break
}
policies = append(policies, raw)
}
// Do the deletes
for _, p := range policies {
if err := txn.Delete("scaling_policy", p); err != nil {
return fmt.Errorf("deleting scaling policy failed: %v", err)
}
}
if len(policies) > 0 {
if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil { if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
return fmt.Errorf("index update failed: %v", err) return fmt.Errorf("index update failed: %v", err)
} }
@ -5332,7 +5351,19 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace str
} }
ws.Add(iter.WatchCh()) ws.Add(iter.WatchCh())
return iter, nil
filter := func(raw interface{}) bool {
d, ok := raw.(*structs.ScalingPolicy)
if !ok {
return true
}
return d.Target[structs.ScalingTargetNamespace] != namespace
}
// Wrap the iterator in a filter
wrap := memdb.NewFilterIterator(iter, filter)
return wrap, nil
} }
func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) { func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) {
@ -5349,7 +5380,19 @@ func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID
} }
ws.Add(iter.WatchCh()) ws.Add(iter.WatchCh())
return iter, nil
filter := func(raw interface{}) bool {
d, ok := raw.(*structs.ScalingPolicy)
if !ok {
return true
}
return d.Target[structs.ScalingTargetJob] != jobID
}
// Wrap the iterator in a filter
wrap := memdb.NewFilterIterator(iter, filter)
return wrap, nil
} }
func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.ScalingPolicy, error) { func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.ScalingPolicy, error) {

View File

@ -8640,6 +8640,59 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) {
require.ElementsMatch([]string{policy2.ID}, policiesInOtherNamespace) require.ElementsMatch([]string{policy2.ID}, policiesInOtherNamespace)
} }
func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) {
t.Parallel()
require := require.New(t)
ns1 := "name"
ns2 := "name2" // matches prefix "name"
state := testStateStore(t)
policy1 := mock.ScalingPolicy()
policy1.Target[structs.ScalingTargetNamespace] = ns1
policy2 := mock.ScalingPolicy()
policy2.Target[structs.ScalingTargetNamespace] = ns2
ws1 := memdb.NewWatchSet()
iter, err := state.ScalingPoliciesByNamespace(ws1, ns1)
require.NoError(err)
require.Nil(iter.Next())
ws2 := memdb.NewWatchSet()
iter, err = state.ScalingPoliciesByNamespace(ws2, ns2)
require.NoError(err)
require.Nil(iter.Next())
err = state.UpsertScalingPolicies(1000, []*structs.ScalingPolicy{policy1, policy2})
require.NoError(err)
require.True(watchFired(ws1))
require.True(watchFired(ws2))
iter, err = state.ScalingPoliciesByNamespace(nil, ns1)
require.NoError(err)
policiesInNS1 := []string{}
for {
raw := iter.Next()
if raw == nil {
break
}
policiesInNS1 = append(policiesInNS1, raw.(*structs.ScalingPolicy).ID)
}
require.ElementsMatch([]string{policy1.ID}, policiesInNS1)
iter, err = state.ScalingPoliciesByNamespace(nil, ns2)
require.NoError(err)
policiesInNS2 := []string{}
for {
raw := iter.Next()
if raw == nil {
break
}
policiesInNS2 = append(policiesInNS2, raw.(*structs.ScalingPolicy).ID)
}
require.ElementsMatch([]string{policy2.ID}, policiesInNS2)
}
func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) { func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) {
t.Parallel() t.Parallel()
@ -8940,6 +8993,37 @@ func TestStateStore_DeleteJob_DeleteScalingPolicies(t *testing.T) {
require.True(index > 1001) require.True(index > 1001)
} }
func TestStateStore_DeleteJob_DeleteScalingPoliciesPrefixBug(t *testing.T) {
t.Parallel()
require := require.New(t)
state := testStateStore(t)
job := mock.Job()
require.NoError(state.UpsertJob(1000, job))
job2 := job.Copy()
job2.ID = job.ID + "-but-longer"
require.NoError(state.UpsertJob(1001, job2))
policy := mock.ScalingPolicy()
policy.Target[structs.ScalingTargetJob] = job.ID
policy2 := mock.ScalingPolicy()
policy2.Target[structs.ScalingTargetJob] = job2.ID
require.NoError(state.UpsertScalingPolicies(1002, []*structs.ScalingPolicy{policy, policy2}))
// Delete job with the shorter prefix-ID
require.NoError(state.DeleteJob(1003, job.Namespace, job.ID))
// Ensure only the associated scaling policy was deleted, not the one matching the job with the longer ID
out, err := state.ScalingPolicyByID(nil, policy.ID)
require.NoError(err)
require.Nil(out)
out, err = state.ScalingPolicyByID(nil, policy2.ID)
require.NoError(err)
require.NotNil(out)
}
// This test ensures that deleting a job that doesn't have any scaling policies // This test ensures that deleting a job that doesn't have any scaling policies
// will not cause the scaling_policy table index to increase, on either job // will not cause the scaling_policy table index to increase, on either job
// registration or deletion. // registration or deletion.
@ -9035,6 +9119,45 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) {
require.Equal(expect, found) require.Equal(expect, found)
} }
func TestStateStore_ScalingPoliciesByJob_PrefixBug(t *testing.T) {
t.Parallel()
require := require.New(t)
jobPrefix := "job-name-" + uuid.Generate()
state := testStateStore(t)
policy1 := mock.ScalingPolicy()
policy1.Target[structs.ScalingTargetJob] = jobPrefix
policy2 := mock.ScalingPolicy()
policy2.Target[structs.ScalingTargetJob] = jobPrefix + "-more"
// Create the policies
var baseIndex uint64 = 1000
err := state.UpsertScalingPolicies(baseIndex, []*structs.ScalingPolicy{policy1, policy2})
require.NoError(err)
iter, err := state.ScalingPoliciesByJob(nil,
policy1.Target[structs.ScalingTargetNamespace],
jobPrefix)
require.NoError(err)
// Ensure we see expected policies
count := 0
found := []string{}
for {
raw := iter.Next()
if raw == nil {
break
}
count++
found = append(found, raw.(*structs.ScalingPolicy).ID)
}
require.Equal(1, count)
expect := []string{policy1.ID}
require.Equal(expect, found)
}
func TestStateStore_UpsertScalingEvent(t *testing.T) { func TestStateStore_UpsertScalingEvent(t *testing.T) {
t.Parallel() t.Parallel()
require := require.New(t) require := require.New(t)