diff --git a/nomad/fsm.go b/nomad/fsm.go
index d14b4f021..a4d1047d1 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -27,6 +27,7 @@ const (
 	NodeSnapshot SnapshotType = iota
 	JobSnapshot
 	IndexSnapshot
+	EvalSnapshot
 )
 
 // nomadFSM implements a finite state machine that is used
@@ -99,6 +100,10 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
 		return n.applyRegisterJob(buf[1:], log.Index)
 	case structs.JobDeregisterRequestType:
 		return n.applyDeregisterJob(buf[1:], log.Index)
+	case structs.EvalUpdateRequestType:
+		return n.applyUpdateEval(buf[1:], log.Index)
+	case structs.EvalDeleteRequestType:
+		return n.applyDeleteEval(buf[1:], log.Index)
 	default:
 		if ignoreUnknown {
 			n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@@ -179,6 +184,34 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
 	return nil
 }
 
+func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} {
+	defer metrics.MeasureSince([]string{"nomad", "fsm", "update_eval"}, time.Now())
+	var req structs.EvalUpdateRequest
+	if err := structs.Decode(buf, &req); err != nil {
+		panic(fmt.Errorf("failed to decode request: %v", err))
+	}
+
+	if err := n.state.UpsertEval(index, req.Eval); err != nil {
+		n.logger.Printf("[ERR] nomad.fsm: UpsertEval failed: %v", err)
+		return err
+	}
+	return nil
+}
+
+func (n *nomadFSM) applyDeleteEval(buf []byte, index uint64) interface{} {
+	defer metrics.MeasureSince([]string{"nomad", "fsm", "delete_eval"}, time.Now())
+	var req structs.EvalDeleteRequest
+	if err := structs.Decode(buf, &req); err != nil {
+		panic(fmt.Errorf("failed to decode request: %v", err))
+	}
+
+	if err := n.state.DeleteEval(index, req.EvalID); err != nil {
+		n.logger.Printf("[ERR] nomad.fsm: DeleteEval failed: %v", err)
+		return err
+	}
+	return nil
+}
+
 func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) {
 	// Create a new snapshot
 	snap, err := n.state.Snapshot()
@@ -245,6 +278,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
 				return err
 			}
 
+		case EvalSnapshot:
+			eval := new(structs.Evaluation)
+			if err := dec.Decode(eval); err != nil {
+				return err
+			}
+			if err := restore.EvalRestore(eval); err != nil {
+				return err
+			}
+
 		case IndexSnapshot:
 			idx := new(IndexEntry)
 			if err := dec.Decode(idx); err != nil {
@@ -289,6 +331,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
 		sink.Cancel()
 		return err
 	}
+	if err := s.persistEvals(sink, encoder); err != nil {
+		sink.Cancel()
+		return err
+	}
 	return nil
 }
 
@@ -373,6 +419,33 @@ func (s *nomadSnapshot) persistJobs(sink raft.SnapshotSink,
 	return nil
 }
 
+func (s *nomadSnapshot) persistEvals(sink raft.SnapshotSink,
+	encoder *codec.Encoder) error {
+	// Get all the evaluations
+	evals, err := s.snap.Evals()
+	if err != nil {
+		return err
+	}
+
+	for {
+		// Get the next item
+		raw := evals.Next()
+		if raw == nil {
+			break
+		}
+
+		// Assert the item is an evaluation
+		eval := raw.(*structs.Evaluation)
+
+		// Write out the evaluation
+		sink.Write([]byte{byte(EvalSnapshot)})
+		if err := encoder.Encode(eval); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // Release is a no-op, as we just need to GC the pointer
 // to the state store snapshot. There is nothing to explicitly
 // cleanup.
diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go
index cd6282538..49e7696eb 100644
--- a/nomad/fsm_test.go
+++ b/nomad/fsm_test.go
@@ -227,6 +227,75 @@ func TestFSM_DeregisterJob(t *testing.T) {
 	}
 }
 
+func TestFSM_UpdateEval(t *testing.T) {
+	fsm := testFSM(t)
+
+	req := structs.EvalUpdateRequest{
+		Eval: mockEval(),
+	}
+	buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	resp := fsm.Apply(makeLog(buf))
+	if resp != nil {
+		t.Fatalf("resp: %v", resp)
+	}
+
+	// Verify the evaluation was stored
+	eval, err := fsm.State().GetEvalByID(req.Eval.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if eval == nil {
+		t.Fatalf("not found!")
+	}
+	if eval.CreateIndex != 1 {
+		t.Fatalf("bad index: %d", eval.CreateIndex)
+	}
+}
+
+func TestFSM_DeleteEval(t *testing.T) {
+	fsm := testFSM(t)
+
+	eval := mockEval()
+	req := structs.EvalUpdateRequest{
+		Eval: eval,
+	}
+	buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	resp := fsm.Apply(makeLog(buf))
+	if resp != nil {
+		t.Fatalf("resp: %v", resp)
+	}
+
+	req2 := structs.EvalDeleteRequest{
+		EvalID: eval.ID,
+	}
+	buf, err = structs.Encode(structs.EvalDeleteRequestType, req2)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	resp = fsm.Apply(makeLog(buf))
+	if resp != nil {
+		t.Fatalf("resp: %v", resp)
+	}
+
+	// Verify the evaluation was removed
+	eval, err = fsm.State().GetEvalByID(req.Eval.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if eval != nil {
+		t.Fatalf("eval found!")
+	}
+}
+
 func testSnapshotRestore(t *testing.T, fsm *nomadFSM) *nomadFSM {
 	// Snapshot
 	snap, err := fsm.Snapshot()
@@ -296,6 +365,28 @@ func TestFSM_SnapshotRestore_Jobs(t *testing.T) {
 	}
 }
 
+func TestFSM_SnapshotRestore_Evals(t *testing.T) {
+	// Add some state
+	fsm := testFSM(t)
+	state := fsm.State()
+	eval1 := mockEval()
+	state.UpsertEval(1000, eval1)
+	eval2 := mockEval()
+	state.UpsertEval(1001, eval2)
+
+	// Verify the contents
+	fsm2 := testSnapshotRestore(t, fsm)
+	state2 := fsm2.State()
+	out1, _ := state2.GetEvalByID(eval1.ID)
+	out2, _ := state2.GetEvalByID(eval2.ID)
+	if !reflect.DeepEqual(eval1, out1) {
+		t.Fatalf("bad: \n%#v\n%#v", out1, eval1)
+	}
+	if !reflect.DeepEqual(eval2, out2) {
+		t.Fatalf("bad: \n%#v\n%#v", out2, eval2)
+	}
+}
+
 func TestFSM_SnapshotRestore_Indexes(t *testing.T) {
 	// Add some state
 	fsm := testFSM(t)
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 5b626a352..0548ee519 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -21,6 +21,8 @@ const (
 	NodeUpdateStatusRequestType
 	JobRegisterRequestType
 	JobDeregisterRequestType
+	EvalUpdateRequestType
+	EvalDeleteRequestType
 )
 
 const (
@@ -158,6 +160,19 @@ type JobSpecificRequest struct {
 	WriteRequest
 }
 
+// EvalUpdateRequest is used for updating the status
+// of an evaluation.
+type EvalUpdateRequest struct {
+	Eval *Evaluation
+	WriteRequest
+}
+
+// EvalDeleteRequest is used for deleting an evaluation.
+type EvalDeleteRequest struct {
+	EvalID string
+	WriteRequest
+}
+
 // GenericResponse is used to respond to a request where no
 // specific response information is needed.
 type GenericResponse struct {