nomad: adding FSM snapshot/restore of ACL policies
This commit is contained in:
parent
4cb544e8f3
commit
10b583ea38
42
nomad/fsm.go
42
nomad/fsm.go
|
@ -41,6 +41,7 @@ const (
|
|||
VaultAccessorSnapshot
|
||||
JobVersionSnapshot
|
||||
DeploymentSnapshot
|
||||
ACLPolicySnapshot
|
||||
)
|
||||
|
||||
// nomadFSM implements a finite state machine that is used
|
||||
|
@ -826,6 +827,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
|
|||
return err
|
||||
}
|
||||
|
||||
case ACLPolicySnapshot:
|
||||
policy := new(structs.ACLPolicy)
|
||||
if err := dec.Decode(policy); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.ACLPolicyRestore(policy); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("Unrecognized snapshot type: %v", msgType)
|
||||
}
|
||||
|
@ -1032,6 +1042,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
|
|||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
if err := s.persistACLPolicies(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1308,6 +1322,34 @@ func (s *nomadSnapshot) persistDeployments(sink raft.SnapshotSink,
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *nomadSnapshot) persistACLPolicies(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
// Get all the jobs
|
||||
ws := memdb.NewWatchSet()
|
||||
policies, err := s.snap.ACLPolicies(ws)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
// Get the next item
|
||||
raw := policies.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// Prepare the request struct
|
||||
policy := raw.(*structs.ACLPolicy)
|
||||
|
||||
// Write out a job registration
|
||||
sink.Write([]byte{byte(ACLPolicySnapshot)})
|
||||
if err := encoder.Encode(policy); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Release is a no-op, as we just need to GC the pointer
|
||||
// to the state store snapshot. There is nothing to explicitly
|
||||
// cleanup.
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/kr/pretty"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type MockSink struct {
|
||||
|
@ -1858,6 +1859,26 @@ func TestFSM_SnapshotRestore_Deployments(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestFSM_SnapshotRestore_ACLPolicy(t *testing.T) {
|
||||
t.Parallel()
|
||||
// Add some state
|
||||
fsm := testFSM(t)
|
||||
state := fsm.State()
|
||||
p1 := mock.ACLPolicy()
|
||||
p2 := mock.ACLPolicy()
|
||||
state.UpsertACLPolicy(1000, p1)
|
||||
state.UpsertACLPolicy(1001, p2)
|
||||
|
||||
// Verify the contents
|
||||
fsm2 := testSnapshotRestore(t, fsm)
|
||||
state2 := fsm2.State()
|
||||
ws := memdb.NewWatchSet()
|
||||
out1, _ := state2.ACLPolicyByName(ws, p1.Name)
|
||||
out2, _ := state2.ACLPolicyByName(ws, p2.Name)
|
||||
assert.Equal(t, p1, out1)
|
||||
assert.Equal(t, p2, out2)
|
||||
}
|
||||
|
||||
func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
|
||||
t.Parallel()
|
||||
// Add some state
|
||||
|
|
Loading…
Reference in New Issue