Making servers reconcile job summaries when they acquire leadership

This commit is contained in:
Diptanu Choudhury 2016-08-05 14:40:35 -07:00
parent 1813a5fa1f
commit 1518f23d0a
4 changed files with 81 additions and 14 deletions

View File

@ -612,9 +612,6 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
if err := n.state.ReconcileJobSummaries(latestIndex); err != nil {
return fmt.Errorf("error reconciling summaries: %v", err)
}
if err := n.reconcileQueuedAllocations(latestIndex); err != nil {
return fmt.Errorf("error re-computing the number of queued allocations:; %v", err)
}
}
return nil

View File

@ -981,6 +981,44 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
fsm := testFSM(t)
state := fsm.State()
// make an allocation
alloc := mock.Alloc()
state.UpsertJob(1010, alloc.Job)
state.UpsertAllocs(1011, []*structs.Allocation{alloc})
// Delete the summary
state.DeleteJobSummary(1040, alloc.Job.ID)
// Delete the index
if err := state.RemoveIndex("job_summary"); err != nil {
t.Fatalf("err: %v", err)
}
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
latestIndex, _ := state.LatestIndex()
out, _ := state2.JobSummaryByID(alloc.Job.ID)
expected := structs.JobSummary{
JobID: alloc.Job.ID,
Summary: map[string]structs.TaskGroupSummary{
"web": structs.TaskGroupSummary{
Starting: 1,
},
},
CreateIndex: 1010,
ModifyIndex: latestIndex,
}
if !reflect.DeepEqual(&expected, out) {
t.Fatalf("expected: %#v, actual: %#v", &expected, out)
}
}
func TestFSM_ReconcileSummaries(t *testing.T) {
// Add some state
fsm := testFSM(t)
state := fsm.State()
// Add a node
node := mock.Node()
state.UpsertNode(800, node)
@ -1000,16 +1038,18 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
state.DeleteJobSummary(1030, job1.ID)
state.DeleteJobSummary(1040, alloc.Job.ID)
// Delete the index
if err := state.RemoveIndex("job_summary"); err != nil {
req := structs.GenericRequest{}
buf, err := structs.Encode(structs.ReconcileJobSummariesRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
latestIndex, _ := state.LatestIndex()
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
out1, _ := state2.JobSummaryByID(job1.ID)
out1, _ := state.JobSummaryByID(job1.ID)
expected := structs.JobSummary{
JobID: job1.ID,
Summary: map[string]structs.TaskGroupSummary{
@ -1018,7 +1058,7 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
},
},
CreateIndex: 1000,
ModifyIndex: latestIndex,
ModifyIndex: out1.ModifyIndex,
}
if !reflect.DeepEqual(&expected, out1) {
t.Fatalf("expected: %#v, actual: %#v", &expected, out1)
@ -1027,7 +1067,7 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
// This exercises the code path which adds the allocations made by the
// planner and the number of unplaced allocations in the reconcile summaries
// codepath
out2, _ := state2.JobSummaryByID(alloc.Job.ID)
out2, _ := state.JobSummaryByID(alloc.Job.ID)
expected = structs.JobSummary{
JobID: alloc.Job.ID,
Summary: map[string]structs.TaskGroupSummary{
@ -1037,7 +1077,7 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
},
},
CreateIndex: 1010,
ModifyIndex: latestIndex,
ModifyIndex: out2.ModifyIndex,
}
if !reflect.DeepEqual(&expected, out2) {
t.Fatalf("expected: %#v, actual: %#v", &expected, out2)

View File

@ -3,6 +3,7 @@ package nomad
import (
"errors"
"fmt"
"strings"
"time"
"github.com/armon/go-metrics"
@ -166,6 +167,17 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
s.logger.Printf("[ERR] nomad: heartbeat timer setup failed: %v", err)
return err
}
// COMPAT 0.4 - 0.4.1
// Reconcile the summaries of the registered jobs. We reconcile summaries
// only if the server is 0.4.1 since summaries are not present in 0.4 they
// might be incorrect after upgrading to 0.4.1 the summaries might not be
// correct
if strings.HasPrefix(s.config.Build, "0.4.1") {
if err := s.reconcileJobSummaries(); err != nil {
return fmt.Errorf("unable to reconcile job summaries: %v", err)
}
}
return nil
}
@ -458,6 +470,25 @@ func (s *Server) reconcileMember(member serf.Member) error {
return nil
}
// reconcileJobSummaries reconciles the summaries of all the jobs registered in
// the system
// COMPAT 0.4 -> 0.4.1
func (s *Server) reconcileJobSummaries() error {
index, err := s.fsm.state.LatestIndex()
if err != nil {
return fmt.Errorf("unable to read latest index: %v", err)
}
s.logger.Printf("[DEBUG] leader: reconciling job summaries at index: %v", index)
args := &structs.GenericResponse{}
msg := structs.ReconcileJobSummariesRequestType | structs.IgnoreUnknownTypeFlag
if _, _, err = s.raftApply(msg, args); err != nil {
return fmt.Errorf("reconciliation of job summaries failed: %v", err)
}
return nil
}
// addRaftPeer is used to add a new Raft peer when a Nomad server joins
func (s *Server) addRaftPeer(m serf.Member, parts *serverParts) error {
// Check for possibility of multiple bootstrap nodes

View File

@ -37,8 +37,7 @@ func (s *System) ReconcileJobSummaries(args *structs.GenericRequest, reply *stru
_, index, err := s.srv.raftApply(structs.ReconcileJobSummariesRequestType, args)
if err != nil {
s.srv.logger.Printf("[ERR] nomad.client: Reconcile failed: %v", err)
return err
return fmt.Errorf("reconciliation of job summaries failed: %v", err)
}
reply.Index = index
return nil