nomad: testing fsm snapshot and restore
This commit is contained in:
parent
410f3a0555
commit
f16fc93d85
116
nomad/fsm.go
116
nomad/fsm.go
|
@ -7,10 +7,18 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
var (
|
||||
msgpackHandle = &codec.MsgpackHandle{
|
||||
RawToString: true,
|
||||
WriteExt: true,
|
||||
}
|
||||
)
|
||||
|
||||
// nomadFSM implements a finite state machine that is used
|
||||
// along with Raft to provide strong consistency. We implement
|
||||
// this outside the Server to avoid exposing this outside the package.
|
||||
|
@ -24,7 +32,11 @@ type nomadFSM struct {
|
|||
// state in a way that can be accessed concurrently with operations
|
||||
// that may modify the live state.
|
||||
type nomadSnapshot struct {
|
||||
state *StateSnapshot
|
||||
snap *StateSnapshot
|
||||
}
|
||||
|
||||
// snapshotHeader is the first entry in our snapshot
|
||||
type snapshotHeader struct {
|
||||
}
|
||||
|
||||
// NewFSMPath is used to construct a new FSM with a blank state
|
||||
|
@ -139,14 +151,112 @@ func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
|
|||
|
||||
func (n *nomadFSM) Restore(old io.ReadCloser) error {
|
||||
defer old.Close()
|
||||
|
||||
// Create a new state store
|
||||
state, err := NewStateStore(n.logOutput)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.state = state
|
||||
|
||||
// Start the state restore
|
||||
restore, err := state.Restore()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer restore.Abort()
|
||||
|
||||
// Create a decoder
|
||||
dec := codec.NewDecoder(old, msgpackHandle)
|
||||
|
||||
// Read in the header
|
||||
var header snapshotHeader
|
||||
if err := dec.Decode(&header); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Populate the new state
|
||||
msgType := make([]byte, 1)
|
||||
for {
|
||||
// Read the message type
|
||||
_, err := old.Read(msgType)
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Decode
|
||||
switch structs.MessageType(msgType[0]) {
|
||||
case structs.RegisterRequestType:
|
||||
var req structs.RegisterRequest
|
||||
if err := dec.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.NodeRestore(req.Node); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("Unrecognized msg type: %v", msgType)
|
||||
}
|
||||
}
|
||||
|
||||
// Commit the state restore
|
||||
restore.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
|
||||
defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now())
|
||||
// Register the nodes
|
||||
encoder := codec.NewEncoder(sink, msgpackHandle)
|
||||
|
||||
// Write the header
|
||||
header := snapshotHeader{}
|
||||
if err := encoder.Encode(&header); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
// Write all the data out
|
||||
if err := s.persistNodes(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *nomadSnapshot) Release() {
|
||||
return
|
||||
func (s *nomadSnapshot) persistNodes(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
// Get all the nodes
|
||||
nodes, err := s.snap.Nodes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var req structs.RegisterRequest
|
||||
for {
|
||||
// Get the next item
|
||||
raw := nodes.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// Prepare the request struct
|
||||
node := raw.(*structs.Node)
|
||||
req = structs.RegisterRequest{Node: node}
|
||||
|
||||
// Write out a node registration
|
||||
sink.Write([]byte{byte(structs.RegisterRequestType)})
|
||||
if err := encoder.Encode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Release is a no-op, as we just need to GC the pointer
|
||||
// to the state store snapshot. There is nothing to explicitly
|
||||
// cleanup.
|
||||
func (s *nomadSnapshot) Release() {}
|
||||
|
|
|
@ -1,13 +1,33 @@
|
|||
package nomad
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
type MockSink struct {
|
||||
*bytes.Buffer
|
||||
cancel bool
|
||||
}
|
||||
|
||||
func (m *MockSink) ID() string {
|
||||
return "Mock"
|
||||
}
|
||||
|
||||
func (m *MockSink) Cancel() error {
|
||||
m.cancel = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MockSink) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func testFSM(t *testing.T) *nomadFSM {
|
||||
fsm, err := NewFSM(os.Stderr)
|
||||
if err != nil {
|
||||
|
@ -137,3 +157,50 @@ func TestFSM_UpdateNodeStatus(t *testing.T) {
|
|||
t.Fatalf("bad node: %#v", node)
|
||||
}
|
||||
}
|
||||
|
||||
func testSnapshotRestore(t *testing.T, fsm *nomadFSM) *nomadFSM {
|
||||
// Snapshot
|
||||
snap, err := fsm.Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer snap.Release()
|
||||
|
||||
// Persist
|
||||
buf := bytes.NewBuffer(nil)
|
||||
sink := &MockSink{buf, false}
|
||||
if err := snap.Persist(sink); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Try to restore on a new FSM
|
||||
fsm2 := testFSM(t)
|
||||
|
||||
// Do a restore
|
||||
if err := fsm2.Restore(sink); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
return fsm2
|
||||
}
|
||||
|
||||
func TestFSM_SnapshotRestore_Nodes(t *testing.T) {
|
||||
// Add some state
|
||||
fsm := testFSM(t)
|
||||
state := fsm.State()
|
||||
node1 := mockNode()
|
||||
state.RegisterNode(1000, node1)
|
||||
node2 := mockNode()
|
||||
state.RegisterNode(1001, node2)
|
||||
|
||||
// Verify the contents
|
||||
fsm2 := testSnapshotRestore(t, fsm)
|
||||
state2 := fsm2.State()
|
||||
out1, _ := state2.GetNodeByID(node1.ID)
|
||||
out2, _ := state2.GetNodeByID(node2.ID)
|
||||
if !reflect.DeepEqual(node1, out1) {
|
||||
t.Fatalf("bad: \n%#v\n%#v", out1, node1)
|
||||
}
|
||||
if !reflect.DeepEqual(node2, out2) {
|
||||
t.Fatalf("bad: \n%#v\n%#v", out2, node2)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,13 +25,13 @@ func mockNode() *structs.Node {
|
|||
ID: generateUUID(),
|
||||
Datacenter: "dc1",
|
||||
Name: "foobar",
|
||||
Attributes: map[string]interface{}{
|
||||
Attributes: map[string]string{
|
||||
"os": "linux",
|
||||
"arch": "x86",
|
||||
"version": "0.1.0",
|
||||
"driver.docker": 1,
|
||||
"driver.docker": "1.0.0",
|
||||
},
|
||||
Resouces: &structs.Resources{
|
||||
Resources: &structs.Resources{
|
||||
CPU: 4.0,
|
||||
MemoryMB: 8192,
|
||||
DiskMB: 100 * 1024,
|
||||
|
@ -50,7 +50,7 @@ func mockNode() *structs.Node {
|
|||
MemoryMB: 256,
|
||||
DiskMB: 4 * 1024,
|
||||
},
|
||||
Links: map[string]interface{}{
|
||||
Links: map[string]string{
|
||||
"consul": "foobar.dc1",
|
||||
},
|
||||
Meta: map[string]string{
|
||||
|
|
|
@ -160,11 +160,11 @@ type Node struct {
|
|||
// data that can be used for constraints. Examples
|
||||
// include "os=linux", "arch=386", "driver.docker=1",
|
||||
// "docker.runtime=1.8.3"
|
||||
Attributes map[string]interface{}
|
||||
Attributes map[string]string
|
||||
|
||||
// Resources is the available resources on the client.
|
||||
// For example 'cpu=2' 'memory=2048'
|
||||
Resouces *Resources
|
||||
Resources *Resources
|
||||
|
||||
// Reserved is the set of resources that are reserved,
|
||||
// and should be subtracted from the total resources for
|
||||
|
@ -181,7 +181,7 @@ type Node struct {
|
|||
// Links are used to 'link' this client to external
|
||||
// systems. For example 'consul=foo.dc1' 'aws=i-83212'
|
||||
// 'ami=ami-123'
|
||||
Links map[string]interface{}
|
||||
Links map[string]string
|
||||
|
||||
// Meta is used to associate arbitrary metadata with this
|
||||
// client. This is opaque to Nomad.
|
||||
|
|
Loading…
Reference in New Issue