Merge pull request #7541 from hashicorp/b-7539-persist-snapshot-scaling-policy-table

support for persist/restore snapshot of scaling_policy table
This commit is contained in:
Chris Baker 2020-03-29 18:59:20 -05:00 committed by GitHub
commit 2169eb6b3b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 150 additions and 5 deletions

View file

@ -48,6 +48,7 @@ const (
SchedulerConfigSnapshot
ClusterMetadataSnapshot
ServiceIdentityTokenAccessorSnapshot
ScalingPolicySnapshot
)
// LogApplier is the definition of a function that can apply a Raft log
@ -1400,6 +1401,16 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err
}
case ScalingPolicySnapshot:
scalingPolicy := new(structs.ScalingPolicy)
if err := dec.Decode(scalingPolicy); err != nil {
return err
}
if err := restore.ScalingPolicyRestore(scalingPolicy); err != nil {
return err
}
default:
// Check if this is an enterprise only object being restored
restorer, ok := n.enterpriseRestorers[snapType]
@ -1660,6 +1671,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
if err := s.persistScalingPolicies(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistACLPolicies(sink, encoder); err != nil {
sink.Cancel()
return err
@ -2062,6 +2077,35 @@ func (s *nomadSnapshot) persistClusterMetadata(sink raft.SnapshotSink,
return nil
}
func (s *nomadSnapshot) persistScalingPolicies(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the scaling policies
ws := memdb.NewWatchSet()
scalingPolicies, err := s.snap.ScalingPolicies(ws)
if err != nil {
return err
}
for {
// Get the next item
raw := scalingPolicies.Next()
if raw == nil {
break
}
// Prepare the request struct
scalingPolicy := raw.(*structs.ScalingPolicy)
// Write out a scaling policy snapshot
sink.Write([]byte{byte(ScalingPolicySnapshot)})
if err := encoder.Encode(scalingPolicy); err != nil {
return err
}
}
return nil
}
// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.

View file

@ -4945,6 +4945,21 @@ func (s *StateStore) DeleteScalingPoliciesTxn(index uint64, ids []string, txn *m
return nil
}
// ScalingPolicies returns an iterator over all the scaling policies
func (s *StateStore) ScalingPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
// Walk the entire scaling_policy table
iter, err := txn.Get("scaling_policy", "id")
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
return iter, nil
}
func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
@ -5229,3 +5244,11 @@ func (r *StateRestore) ClusterMetadataRestore(meta *structs.ClusterMetadata) err
}
return nil
}
// ScalingPolicyRestore is used to restore a scaling policy
func (r *StateRestore) ScalingPolicyRestore(scalingPolicy *structs.ScalingPolicy) error {
if err := r.txn.Insert("scaling_policy", scalingPolicy); err != nil {
return fmt.Errorf("scaling policy insert failed: %v", err)
}
return nil
}

View file

@ -8087,6 +8087,26 @@ func TestStateStore_ClusterMetadataRestore(t *testing.T) {
require.Equal(now, out.CreateTime)
}
func TestStateStore_RestoreScalingPolicy(t *testing.T) {
t.Parallel()
require := require.New(t)
state := testStateStore(t)
scalingPolicy := mock.ScalingPolicy()
restore, err := state.Restore()
require.NoError(err)
err = restore.ScalingPolicyRestore(scalingPolicy)
require.NoError(err)
restore.Commit()
ws := memdb.NewWatchSet()
out, err := state.ScalingPolicyByID(ws, scalingPolicy.ID)
require.NoError(err)
require.EqualValues(out, scalingPolicy)
}
func TestStateStore_UpsertScalingPolicy(t *testing.T) {
t.Parallel()
require := require.New(t)
@ -8095,19 +8115,27 @@ func TestStateStore_UpsertScalingPolicy(t *testing.T) {
policy := mock.ScalingPolicy()
policy2 := mock.ScalingPolicy()
ws := memdb.NewWatchSet()
_, err := state.ScalingPolicyByTarget(ws, policy.Target)
wsAll := memdb.NewWatchSet()
all, err := state.ScalingPolicies(wsAll)
require.NoError(err)
require.Nil(all.Next())
_, err = state.ScalingPolicyByTarget(ws, policy2.Target)
ws := memdb.NewWatchSet()
out, err := state.ScalingPolicyByTarget(ws, policy.Target)
require.NoError(err)
require.Nil(out)
out, err = state.ScalingPolicyByTarget(ws, policy2.Target)
require.NoError(err)
require.Nil(out)
err = state.UpsertScalingPolicies(1000, []*structs.ScalingPolicy{policy, policy2})
require.NoError(err)
require.True(watchFired(ws))
require.True(watchFired(wsAll))
ws = memdb.NewWatchSet()
out, err := state.ScalingPolicyByTarget(ws, policy.Target)
out, err = state.ScalingPolicyByTarget(ws, policy.Target)
require.NoError(err)
require.Equal(policy, out)
@ -8115,7 +8143,7 @@ func TestStateStore_UpsertScalingPolicy(t *testing.T) {
require.NoError(err)
require.Equal(policy2, out)
iter, err := state.ScalingPoliciesByNamespace(ws, policy.Target[structs.ScalingTargetNamespace])
iter, err := state.ScalingPolicies(ws)
require.NoError(err)
// Ensure we see both policies
@ -8135,6 +8163,56 @@ func TestStateStore_UpsertScalingPolicy(t *testing.T) {
require.False(watchFired(ws))
}
func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) {
t.Parallel()
require := require.New(t)
otherNamespace := "not-default-namespace"
state := testStateStore(t)
policy := mock.ScalingPolicy()
policy2 := mock.ScalingPolicy()
policy2.Target[structs.ScalingTargetNamespace] = otherNamespace
ws1 := memdb.NewWatchSet()
iter, err := state.ScalingPoliciesByNamespace(ws1, structs.DefaultNamespace)
require.NoError(err)
require.Nil(iter.Next())
ws2 := memdb.NewWatchSet()
iter, err = state.ScalingPoliciesByNamespace(ws2, otherNamespace)
require.NoError(err)
require.Nil(iter.Next())
err = state.UpsertScalingPolicies(1000, []*structs.ScalingPolicy{policy, policy2})
require.NoError(err)
require.True(watchFired(ws1))
require.True(watchFired(ws2))
iter, err = state.ScalingPoliciesByNamespace(nil, structs.DefaultNamespace)
require.NoError(err)
policiesInDefaultNamespace := []string{}
for {
raw := iter.Next()
if raw == nil {
break
}
policiesInDefaultNamespace = append(policiesInDefaultNamespace, raw.(*structs.ScalingPolicy).ID)
}
require.ElementsMatch([]string{policy.ID}, policiesInDefaultNamespace)
iter, err = state.ScalingPoliciesByNamespace(nil, otherNamespace)
require.NoError(err)
policiesInOtherNamespace := []string{}
for {
raw := iter.Next()
if raw == nil {
break
}
policiesInOtherNamespace = append(policiesInOtherNamespace, raw.(*structs.ScalingPolicy).ID)
}
require.ElementsMatch([]string{policy2.ID}, policiesInOtherNamespace)
}
func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) {
t.Parallel()