nomad: adding FSM support for evaluations

This commit is contained in:
Armon Dadgar 2015-07-23 15:52:38 -07:00
parent a69c71b239
commit 9b921d9f92
3 changed files with 179 additions and 0 deletions

View file

@ -27,6 +27,7 @@ const (
NodeSnapshot SnapshotType = iota
JobSnapshot
IndexSnapshot
EvalSnapshot
)
// nomadFSM implements a finite state machine that is used
@ -99,6 +100,10 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyRegisterJob(buf[1:], log.Index)
case structs.JobDeregisterRequestType:
return n.applyDeregisterJob(buf[1:], log.Index)
case structs.EvalUpdateRequestType:
return n.applyUpdateEval(buf[1:], log.Index)
case structs.EvalDeleteRequestType:
return n.applyDeleteEval(buf[1:], log.Index)
default:
if ignoreUnknown {
n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@ -179,6 +184,34 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
return nil
}
func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "update_eval"}, time.Now())
var req structs.EvalUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertEval(index, req.Eval); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpsertEval failed: %v", err)
return err
}
return nil
}
func (n *nomadFSM) applyDeleteEval(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "delete_eval"}, time.Now())
var req structs.EvalDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteEval(index, req.EvalID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: DeleteEval failed: %v", err)
return err
}
return nil
}
func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
// Create a new snapshot
snap, err := n.state.Snapshot()
@ -245,6 +278,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err
}
case EvalSnapshot:
eval := new(structs.Evaluation)
if err := dec.Decode(eval); err != nil {
return err
}
if err := restore.EvalRestore(eval); err != nil {
return err
}
case IndexSnapshot:
idx := new(IndexEntry)
if err := dec.Decode(idx); err != nil {
@ -289,6 +331,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
if err := s.persistEvals(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}
@ -373,6 +419,33 @@ func (s *nomadSnapshot) persistJobs(sink raft.SnapshotSink,
return nil
}
func (s *nomadSnapshot) persistEvals(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the evaluations
evals, err := s.snap.Evals()
if err != nil {
return err
}
for {
// Get the next item
raw := evals.Next()
if raw == nil {
break
}
// Prepare the request struct
eval := raw.(*structs.Evaluation)
// Write out the evaluation
sink.Write([]byte{byte(EvalSnapshot)})
if err := encoder.Encode(eval); err != nil {
return err
}
}
return nil
}
// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.

View file

@ -227,6 +227,75 @@ func TestFSM_DeregisterJob(t *testing.T) {
}
}
func TestFSM_UpdateEval(t *testing.T) {
fsm := testFSM(t)
req := structs.EvalUpdateRequest{
Eval: mockEval(),
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
eval, err := fsm.State().GetEvalByID(req.Eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if eval == nil {
t.Fatalf("not found!")
}
if eval.CreateIndex != 1 {
t.Fatalf("bad index: %d", eval.CreateIndex)
}
}
func TestFSM_DeleteEval(t *testing.T) {
fsm := testFSM(t)
eval := mockEval()
req := structs.EvalUpdateRequest{
Eval: eval,
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
req2 := structs.EvalDeleteRequest{
EvalID: eval.ID,
}
buf, err = structs.Encode(structs.EvalDeleteRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are NOT registered
eval, err = fsm.State().GetEvalByID(req.Eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if eval != nil {
t.Fatalf("eval found!")
}
}
func testSnapshotRestore(t *testing.T, fsm *nomadFSM) *nomadFSM {
// Snapshot
snap, err := fsm.Snapshot()
@ -296,6 +365,28 @@ func TestFSM_SnapshotRestore_Jobs(t *testing.T) {
}
}
func TestFSM_SnapshotRestore_Evals(t *testing.T) {
// Add some state
fsm := testFSM(t)
state := fsm.State()
eval1 := mockEval()
state.UpsertEval(1000, eval1)
eval2 := mockEval()
state.UpsertEval(1001, eval2)
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out1, _ := state2.GetEvalByID(eval1.ID)
out2, _ := state2.GetEvalByID(eval2.ID)
if !reflect.DeepEqual(eval1, out1) {
t.Fatalf("bad: \n%#v\n%#v", out1, eval1)
}
if !reflect.DeepEqual(eval2, out2) {
t.Fatalf("bad: \n%#v\n%#v", out2, eval2)
}
}
func TestFSM_SnapshotRestore_Indexes(t *testing.T) {
// Add some state
fsm := testFSM(t)

View file

@ -21,6 +21,8 @@ const (
NodeUpdateStatusRequestType
JobRegisterRequestType
JobDeregisterRequestType
EvalUpdateRequestType
EvalDeleteRequestType
)
const (
@ -158,6 +160,19 @@ type JobSpecificRequest struct {
WriteRequest
}
// EvalUpdateRequest is used for updating the status
// of an evaluation.
type EvalUpdateRequest struct {
Eval *Evaluation
WriteRequest
}
// EvalDeleteRequest is used for deleting an evaluation.
type EvalDeleteRequest struct {
EvalID string
WriteRequest
}
// GenericResponse is used to respond to a request where no
// specific response information is needed.
type GenericResponse struct {