nomad: add FSM support for Jobs
This commit is contained in:
parent
1947a19885
commit
bc0cea6d26
41
nomad/fsm.go
41
nomad/fsm.go
|
@ -25,6 +25,7 @@ type SnapshotType byte
|
|||
|
||||
const (
|
||||
NodeSnapshot SnapshotType = iota
|
||||
JobSnapshot
|
||||
IndexSnapshot
|
||||
)
|
||||
|
||||
|
@ -235,6 +236,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
|
|||
return err
|
||||
}
|
||||
|
||||
case JobSnapshot:
|
||||
job := new(structs.Job)
|
||||
if err := dec.Decode(job); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.JobRestore(job); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case IndexSnapshot:
|
||||
idx := new(IndexEntry)
|
||||
if err := dec.Decode(idx); err != nil {
|
||||
|
@ -275,6 +285,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
|
|||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
if err := s.persistJobs(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -332,6 +346,33 @@ func (s *nomadSnapshot) persistNodes(sink raft.SnapshotSink,
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *nomadSnapshot) persistJobs(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
// Get all the jobs
|
||||
jobs, err := s.snap.Jobs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
// Get the next item
|
||||
raw := jobs.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// Prepare the request struct
|
||||
job := raw.(*structs.Job)
|
||||
|
||||
// Write out a job registration
|
||||
sink.Write([]byte{byte(JobSnapshot)})
|
||||
if err := encoder.Encode(job); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Release is a no-op, as we just need to GC the pointer
|
||||
// to the state store snapshot. There is nothing to explicitly
|
||||
// cleanup.
|
||||
|
|
|
@ -158,6 +158,75 @@ func TestFSM_UpdateNodeStatus(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestFSM_RegisterJob(t *testing.T) {
|
||||
fsm := testFSM(t)
|
||||
|
||||
req := structs.JobRegisterRequest{
|
||||
Job: mockJob(),
|
||||
}
|
||||
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify we are registered
|
||||
job, err := fsm.State().GetJobByName(req.Job.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if job == nil {
|
||||
t.Fatalf("not found!")
|
||||
}
|
||||
if job.CreateIndex != 1 {
|
||||
t.Fatalf("bad index: %d", job.CreateIndex)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_DeregisterJob(t *testing.T) {
|
||||
fsm := testFSM(t)
|
||||
|
||||
job := mockJob()
|
||||
req := structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
}
|
||||
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
req2 := structs.JobDeregisterRequest{
|
||||
JobName: job.Name,
|
||||
}
|
||||
buf, err = structs.Encode(structs.JobDeregisterRequestType, req2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp = fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify we are NOT registered
|
||||
job, err = fsm.State().GetJobByName(req.Job.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if job != nil {
|
||||
t.Fatalf("job found!")
|
||||
}
|
||||
}
|
||||
|
||||
func testSnapshotRestore(t *testing.T, fsm *nomadFSM) *nomadFSM {
|
||||
// Snapshot
|
||||
snap, err := fsm.Snapshot()
|
||||
|
@ -205,6 +274,28 @@ func TestFSM_SnapshotRestore_Nodes(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestFSM_SnapshotRestore_Jobs(t *testing.T) {
|
||||
// Add some state
|
||||
fsm := testFSM(t)
|
||||
state := fsm.State()
|
||||
job1 := mockJob()
|
||||
state.RegisterJob(1000, job1)
|
||||
job2 := mockJob()
|
||||
state.RegisterJob(1001, job2)
|
||||
|
||||
// Verify the contents
|
||||
fsm2 := testSnapshotRestore(t, fsm)
|
||||
state2 := fsm2.State()
|
||||
out1, _ := state2.GetJobByName(job1.Name)
|
||||
out2, _ := state2.GetJobByName(job2.Name)
|
||||
if !reflect.DeepEqual(job1, out1) {
|
||||
t.Fatalf("bad: \n%#v\n%#v", out1, job1)
|
||||
}
|
||||
if !reflect.DeepEqual(job2, out2) {
|
||||
t.Fatalf("bad: \n%#v\n%#v", out2, job2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_SnapshotRestore_Indexes(t *testing.T) {
|
||||
// Add some state
|
||||
fsm := testFSM(t)
|
||||
|
|
Loading…
Reference in a new issue