diff --git a/nomad/fsm.go b/nomad/fsm.go index d5ba49298..4b757b89a 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -612,9 +612,6 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := n.state.ReconcileJobSummaries(latestIndex); err != nil { return fmt.Errorf("error reconciling summaries: %v", err) } - if err := n.reconcileQueuedAllocations(latestIndex); err != nil { - return fmt.Errorf("error re-computing the number of queued allocations:; %v", err) - } } return nil diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index b06816396..2274ea27d 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -981,6 +981,44 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { fsm := testFSM(t) state := fsm.State() + // make an allocation + alloc := mock.Alloc() + state.UpsertJob(1010, alloc.Job) + state.UpsertAllocs(1011, []*structs.Allocation{alloc}) + + // Delete the summary + state.DeleteJobSummary(1040, alloc.Job.ID) + + // Delete the index + if err := state.RemoveIndex("job_summary"); err != nil { + t.Fatalf("err: %v", err) + } + + fsm2 := testSnapshotRestore(t, fsm) + state2 := fsm2.State() + latestIndex, _ := state.LatestIndex() + + out, _ := state2.JobSummaryByID(alloc.Job.ID) + expected := structs.JobSummary{ + JobID: alloc.Job.ID, + Summary: map[string]structs.TaskGroupSummary{ + "web": structs.TaskGroupSummary{ + Starting: 1, + }, + }, + CreateIndex: 1010, + ModifyIndex: latestIndex, + } + if !reflect.DeepEqual(&expected, out) { + t.Fatalf("expected: %#v, actual: %#v", &expected, out) + } +} + +func TestFSM_ReconcileSummaries(t *testing.T) { + // Add some state + fsm := testFSM(t) + state := fsm.State() + // Add a node node := mock.Node() state.UpsertNode(800, node) @@ -1000,16 +1038,18 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { state.DeleteJobSummary(1030, job1.ID) state.DeleteJobSummary(1040, alloc.Job.ID) - // Delete the index - if err := state.RemoveIndex("job_summary"); err != nil { + req := structs.GenericRequest{} + buf, err := structs.Encode(structs.ReconcileJobSummariesRequestType, req) + if err != nil { t.Fatalf("err: %v", err) } - fsm2 := testSnapshotRestore(t, fsm) - state2 := fsm2.State() - latestIndex, _ := state.LatestIndex() + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } - out1, _ := state2.JobSummaryByID(job1.ID) + out1, _ := state.JobSummaryByID(job1.ID) expected := structs.JobSummary{ JobID: job1.ID, Summary: map[string]structs.TaskGroupSummary{ @@ -1018,7 +1058,7 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { }, }, CreateIndex: 1000, - ModifyIndex: latestIndex, + ModifyIndex: out1.ModifyIndex, } if !reflect.DeepEqual(&expected, out1) { t.Fatalf("expected: %#v, actual: %#v", &expected, out1) @@ -1027,7 +1067,7 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { // This exercises the code path which adds the allocations made by the // planner and the number of unplaced allocations in the reconcile summaries // codepath - out2, _ := state2.JobSummaryByID(alloc.Job.ID) + out2, _ := state.JobSummaryByID(alloc.Job.ID) expected = structs.JobSummary{ JobID: alloc.Job.ID, Summary: map[string]structs.TaskGroupSummary{ @@ -1037,7 +1077,7 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { }, }, CreateIndex: 1010, - ModifyIndex: latestIndex, + ModifyIndex: out2.ModifyIndex, } if !reflect.DeepEqual(&expected, out2) { t.Fatalf("expected: %#v, actual: %#v", &expected, out2) diff --git a/nomad/leader.go b/nomad/leader.go index ab8b10c85..9340385d0 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -3,6 +3,7 @@ package nomad import ( "errors" "fmt" + "strings" "time" "github.com/armon/go-metrics" @@ -166,6 +167,17 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { s.logger.Printf("[ERR] nomad: heartbeat timer setup failed: %v", err) return err } + + // COMPAT 0.4 - 0.4.1 + // Reconcile the summaries of the registered jobs. We reconcile summaries + // only if the server is 0.4.1 since summaries are not present in 0.4 they + // might be incorrect after upgrading to 0.4.1 the summaries might not be + // correct + if strings.HasPrefix(s.config.Build, "0.4.1") { + if err := s.reconcileJobSummaries(); err != nil { + return fmt.Errorf("unable to reconcile job summaries: %v", err) + } + } return nil } @@ -458,6 +470,25 @@ func (s *Server) reconcileMember(member serf.Member) error { return nil } +// reconcileJobSummaries reconciles the summaries of all the jobs registered in +// the system +// COMPAT 0.4 -> 0.4.1 +func (s *Server) reconcileJobSummaries() error { + index, err := s.fsm.state.LatestIndex() + if err != nil { + return fmt.Errorf("unable to read latest index: %v", err) + } + s.logger.Printf("[DEBUG] leader: reconciling job summaries at index: %v", index) + + args := &structs.GenericResponse{} + msg := structs.ReconcileJobSummariesRequestType | structs.IgnoreUnknownTypeFlag + if _, _, err = s.raftApply(msg, args); err != nil { + return fmt.Errorf("reconciliation of job summaries failed: %v", err) + } + + return nil +} + // addRaftPeer is used to add a new Raft peer when a Nomad server joins func (s *Server) addRaftPeer(m serf.Member, parts *serverParts) error { // Check for possibility of multiple bootstrap nodes diff --git a/nomad/system_endpoint.go b/nomad/system_endpoint.go index 65a0b7c52..92d35845b 100644 --- a/nomad/system_endpoint.go +++ b/nomad/system_endpoint.go @@ -37,8 +37,7 @@ func (s *System) ReconcileJobSummaries(args *structs.GenericRequest, reply *stru _, index, err := s.srv.raftApply(structs.ReconcileJobSummariesRequestType, args) if err != nil { - s.srv.logger.Printf("[ERR] nomad.client: Reconcile failed: %v", err) - return err + return fmt.Errorf("reconciliation of job summaries failed: %v", err) } reply.Index = index return nil