diff --git a/nomad/deploymentwatcher/batcher.go b/nomad/deploymentwatcher/batcher.go index 29df3af51..a47ea37fa 100644 --- a/nomad/deploymentwatcher/batcher.go +++ b/nomad/deploymentwatcher/batcher.go @@ -8,6 +8,12 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + // evalBatchDuration is the duration in which evaluations are batched before + // commiting to Raft. + evalBatchDuration = 200 * time.Millisecond +) + // EvalBatcher is used to batch the creation of evaluations type EvalBatcher struct { // raft is used to actually commit the evaluations @@ -32,6 +38,7 @@ func NewEvalBatcher(raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatc b := &EvalBatcher{ raft: raft, ctx: ctx, + inCh: make(chan *structs.Evaluation, 10), } go b.batcher() @@ -54,7 +61,7 @@ func (b *EvalBatcher) CreateEval(e *structs.Evaluation) *EvalFuture { // batcher is the long lived batcher goroutine func (b *EvalBatcher) batcher() { - ticker := time.NewTicker(200 * time.Millisecond) + ticker := time.NewTicker(evalBatchDuration) evals := make(map[string]*structs.Evaluation) for { select { diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go index da01a0c48..2c578f595 100644 --- a/nomad/deploymentwatcher/deployment_watcher.go +++ b/nomad/deploymentwatcher/deployment_watcher.go @@ -241,7 +241,7 @@ func (w *deploymentWatcher) watch() { // Block getting all allocations that are part of the deployment using // the last evaluation index. This will have us block waiting for // something to change past what the scheduler has evaluated. - allocs, err := w.getAllocs(latestEval) + allocResp, err := w.getAllocs(latestEval) if err != nil { if err == context.Canceled { return @@ -263,7 +263,7 @@ func (w *deploymentWatcher) watch() { // Create an evaluation trigger if there is any allocation whose // deployment status has been updated past the latest eval index. createEval, failDeployment, rollback := false, false, false - for _, alloc := range allocs { + for _, alloc := range allocResp.Allocations { if alloc.DeploymentStatus == nil || alloc.DeploymentStatus.ModifyIndex <= latestEval { continue } @@ -303,7 +303,9 @@ func (w *deploymentWatcher) watch() { // Description should include that the job is being rolled back to // version N - desc = fmt.Sprintf("%s - rolling back to job version %d", desc, j.Version) + if j != nil { + desc = structs.DeploymentStatusDescriptionRollback(desc, j.Version) + } } // Update the status of the deployment to failed and create an @@ -316,7 +318,7 @@ func (w *deploymentWatcher) watch() { } } else if createEval { // Create an eval to push the deployment along - w.createEvalBatched() + w.createEvalBatched(allocResp.Index) } } } @@ -348,7 +350,7 @@ func (w *deploymentWatcher) createEval() (evalID string, evalCreateIndex uint64, } // createEvalBatched creates an eval but batches calls together -func (w *deploymentWatcher) createEvalBatched() { +func (w *deploymentWatcher) createEvalBatched(forIndex uint64) { w.l.Lock() defer w.l.Unlock() @@ -356,6 +358,9 @@ func (w *deploymentWatcher) createEvalBatched() { return } + w.logger.Printf("[TRACE] nomad.deployment_watcher: creating eval for index %d %q", forIndex, w.d.ID) + w.outstandingBatch = true + go func() { // Sleep til the batching period is over time.Sleep(evalBatchPeriod) @@ -394,7 +399,7 @@ func (w *deploymentWatcher) getDeploymentStatusUpdate(status, desc string) *stru // getAllocs retrieves the allocations that are part of the deployment blocking // at the given index. -func (w *deploymentWatcher) getAllocs(index uint64) ([]*structs.AllocListStub, error) { +func (w *deploymentWatcher) getAllocs(index uint64) (*structs.AllocListResponse, error) { // Build the request args := &structs.DeploymentSpecificRequest{ DeploymentID: w.d.ID, @@ -414,7 +419,7 @@ func (w *deploymentWatcher) getAllocs(index uint64) ([]*structs.AllocListStub, e } } - return resp.Allocations, nil + return &resp, nil } // latestEvalIndex returns the index of the last evaluation created for diff --git a/nomad/deploymentwatcher/deployments_watcher.go b/nomad/deploymentwatcher/deployments_watcher.go index fb1880a52..a6c952856 100644 --- a/nomad/deploymentwatcher/deployments_watcher.go +++ b/nomad/deploymentwatcher/deployments_watcher.go @@ -56,7 +56,7 @@ type DeploymentStateWatchers interface { const ( // limitStateQueriesPerSecond is the number of state queries allowed per // second - limitStateQueriesPerSecond = 10.0 + limitStateQueriesPerSecond = 15.0 ) // Watcher is used to watch deployments and their allocations created diff --git a/nomad/deploymentwatcher/deployments_watcher_test.go b/nomad/deploymentwatcher/deployments_watcher_test.go index 2988c624d..527f3778e 100644 --- a/nomad/deploymentwatcher/deployments_watcher_test.go +++ b/nomad/deploymentwatcher/deployments_watcher_test.go @@ -1,337 +1,19 @@ package deploymentwatcher import ( - "log" - "os" - "reflect" - "strings" - "sync" + "fmt" "testing" "time" memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/mock" - "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" mocker "github.com/stretchr/testify/mock" ) -// TODO -// Test evaluations are batched between watchers -// Test allocation watcher -// Test that evaluation due to allocation changes are batched - -func testLogger() *log.Logger { - return log.New(os.Stderr, "", log.LstdFlags) -} - -type mockBackend struct { - mocker.Mock - index uint64 - state *state.StateStore - l sync.Mutex -} - -func newMockBackend(t *testing.T) *mockBackend { - state, err := state.NewStateStore(os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - if state == nil { - t.Fatalf("missing state") - } - return &mockBackend{ - index: 10000, - state: state, - } -} - -func (m *mockBackend) nextIndex() uint64 { - m.l.Lock() - defer m.l.Unlock() - i := m.index - m.index++ - return i -} - -func (m *mockBackend) UpsertEvals(evals []*structs.Evaluation) (uint64, error) { - m.Called(evals) - i := m.nextIndex() - return i, m.state.UpsertEvals(i, evals) -} - -func (m *mockBackend) UpsertJob(job *structs.Job) (uint64, error) { - m.Called(job) - i := m.nextIndex() - return i, m.state.UpsertJob(i, job) -} - -func (m *mockBackend) UpsertDeploymentStatusUpdate(u *structs.DeploymentStatusUpdateRequest) (uint64, error) { - m.Called(u) - i := m.nextIndex() - return i, m.state.UpsertDeploymentStatusUpdate(i, u) -} - -// matchDeploymentStatusUpdateConfig is used to configure the matching -// function -type matchDeploymentStatusUpdateConfig struct { - // DeploymentID is the expected ID - DeploymentID string - - // Status is the desired status - Status string - - // StatusDescription is the desired status description - StatusDescription string - - // JobVersion marks whether we expect a roll back job at the given version - JobVersion *uint64 - - // Eval marks whether we expect an evaluation. - Eval bool -} - -// matchDeploymentStatusUpdateRequest is used to match an update request -func matchDeploymentStatusUpdateRequest(c *matchDeploymentStatusUpdateConfig) func(args *structs.DeploymentStatusUpdateRequest) bool { - return func(args *structs.DeploymentStatusUpdateRequest) bool { - if args.DeploymentUpdate.DeploymentID != c.DeploymentID { - return false - } - - if args.DeploymentUpdate.Status != c.Status && args.DeploymentUpdate.StatusDescription != c.StatusDescription { - return false - } - - if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil { - return false - } - - if (c.JobVersion != nil && (args.Job == nil || args.Job.Version != *c.JobVersion)) || c.JobVersion == nil && args.Job != nil { - return false - } - - return true - } -} - -func (m *mockBackend) UpsertDeploymentPromotion(req *structs.ApplyDeploymentPromoteRequest) (uint64, error) { - m.Called(req) - i := m.nextIndex() - return i, m.state.UpsertDeploymentPromotion(i, req) -} - -// matchDeploymentPromoteRequestConfig is used to configure the matching -// function -type matchDeploymentPromoteRequestConfig struct { - // Promotion holds the expected promote request - Promotion *structs.DeploymentPromoteRequest - - // Eval marks whether we expect an evaluation. - Eval bool -} - -// matchDeploymentPromoteRequest is used to match a promote request -func matchDeploymentPromoteRequest(c *matchDeploymentPromoteRequestConfig) func(args *structs.ApplyDeploymentPromoteRequest) bool { - return func(args *structs.ApplyDeploymentPromoteRequest) bool { - if !reflect.DeepEqual(*c.Promotion, args.DeploymentPromoteRequest) { - return false - } - - if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil { - return false - } - - return true - } -} -func (m *mockBackend) UpsertDeploymentAllocHealth(req *structs.ApplyDeploymentAllocHealthRequest) (uint64, error) { - m.Called(req) - i := m.nextIndex() - return i, m.state.UpsertDeploymentAllocHealth(i, req) -} - -// matchDeploymentAllocHealthRequestConfig is used to configure the matching -// function -type matchDeploymentAllocHealthRequestConfig struct { - // DeploymentID is the expected ID - DeploymentID string - - // Healthy and Unhealthy contain the expected allocation IDs that are having - // their health set - Healthy, Unhealthy []string - - // DeploymentUpdate holds the expected values of status and description. We - // don't check for exact match but string contains - DeploymentUpdate *structs.DeploymentStatusUpdate - - // JobVersion marks whether we expect a roll back job at the given version - JobVersion *uint64 - - // Eval marks whether we expect an evaluation. - Eval bool -} - -// matchDeploymentAllocHealthRequest is used to match an update request -func matchDeploymentAllocHealthRequest(c *matchDeploymentAllocHealthRequestConfig) func(args *structs.ApplyDeploymentAllocHealthRequest) bool { - return func(args *structs.ApplyDeploymentAllocHealthRequest) bool { - if args.DeploymentID != c.DeploymentID { - return false - } - - if len(c.Healthy) != len(args.HealthyAllocationIDs) { - return false - } - if len(c.Unhealthy) != len(args.UnhealthyAllocationIDs) { - return false - } - - hmap, umap := make(map[string]struct{}, len(c.Healthy)), make(map[string]struct{}, len(c.Unhealthy)) - for _, h := range c.Healthy { - hmap[h] = struct{}{} - } - for _, u := range c.Unhealthy { - umap[u] = struct{}{} - } - - for _, h := range args.HealthyAllocationIDs { - if _, ok := hmap[h]; !ok { - return false - } - } - for _, u := range args.UnhealthyAllocationIDs { - if _, ok := umap[u]; !ok { - return false - } - } - - if c.DeploymentUpdate != nil { - if args.DeploymentUpdate == nil { - return false - } - - if !strings.Contains(args.DeploymentUpdate.Status, c.DeploymentUpdate.Status) { - return false - } - if !strings.Contains(args.DeploymentUpdate.StatusDescription, c.DeploymentUpdate.StatusDescription) { - return false - } - } else if args.DeploymentUpdate != nil { - return false - } - - if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil { - return false - } - - if (c.JobVersion != nil && (args.Job == nil || args.Job.Version != *c.JobVersion)) || c.JobVersion == nil && args.Job != nil { - return false - } - - return true - } -} - -func (m *mockBackend) Evaluations(args *structs.JobSpecificRequest, reply *structs.JobEvaluationsResponse) error { - rargs := m.Called(args, reply) - return rargs.Error(0) -} - -func (m *mockBackend) evaluationsFromState(in mocker.Arguments) { - args, reply := in.Get(0).(*structs.JobSpecificRequest), in.Get(1).(*structs.JobEvaluationsResponse) - ws := memdb.NewWatchSet() - evals, _ := m.state.EvalsByJob(ws, args.JobID) - reply.Evaluations = evals - reply.Index = m.nextIndex() -} - -func (m *mockBackend) Allocations(args *structs.DeploymentSpecificRequest, reply *structs.AllocListResponse) error { - rargs := m.Called(args, reply) - return rargs.Error(0) -} - -func (m *mockBackend) allocationsFromState(in mocker.Arguments) { - args, reply := in.Get(0).(*structs.DeploymentSpecificRequest), in.Get(1).(*structs.AllocListResponse) - ws := memdb.NewWatchSet() - allocs, _ := m.state.AllocsByDeployment(ws, args.DeploymentID) - - var stubs []*structs.AllocListStub - for _, a := range allocs { - stubs = append(stubs, a.Stub()) - } - - reply.Allocations = stubs - reply.Index = m.nextIndex() -} - -func (m *mockBackend) List(args *structs.DeploymentListRequest, reply *structs.DeploymentListResponse) error { - rargs := m.Called(args, reply) - return rargs.Error(0) -} - -func (m *mockBackend) listFromState(in mocker.Arguments) { - reply := in.Get(1).(*structs.DeploymentListResponse) - ws := memdb.NewWatchSet() - iter, _ := m.state.Deployments(ws) - - var deploys []*structs.Deployment - for { - raw := iter.Next() - if raw == nil { - break - } - - deploys = append(deploys, raw.(*structs.Deployment)) - } - - reply.Deployments = deploys - reply.Index = m.nextIndex() -} - -func (m *mockBackend) GetJobVersions(args *structs.JobSpecificRequest, reply *structs.JobVersionsResponse) error { - rargs := m.Called(args, reply) - return rargs.Error(0) -} - -func (m *mockBackend) getJobVersionsFromState(in mocker.Arguments) { - args, reply := in.Get(0).(*structs.JobSpecificRequest), in.Get(1).(*structs.JobVersionsResponse) - ws := memdb.NewWatchSet() - versions, _ := m.state.JobVersionsByID(ws, args.JobID) - reply.Versions = versions - reply.Index = m.nextIndex() -} - -func (m *mockBackend) GetJob(args *structs.JobSpecificRequest, reply *structs.SingleJobResponse) error { - rargs := m.Called(args, reply) - return rargs.Error(0) -} - -func (m *mockBackend) getJobFromState(in mocker.Arguments) { - args, reply := in.Get(0).(*structs.JobSpecificRequest), in.Get(1).(*structs.SingleJobResponse) - ws := memdb.NewWatchSet() - job, _ := m.state.JobByID(ws, args.JobID) - reply.Job = job - reply.Index = m.nextIndex() -} - -// matchDeploymentSpecificRequest is used to match that a deployment specific -// request is for the passed deployment id -func matchDeploymentSpecificRequest(dID string) func(args *structs.DeploymentSpecificRequest) bool { - return func(args *structs.DeploymentSpecificRequest) bool { - return args.DeploymentID == dID - } -} - -// matchJobSpecificRequest is used to match that a job specific -// request is for the passed job id -func matchJobSpecificRequest(jID string) func(args *structs.JobSpecificRequest) bool { - return func(args *structs.JobSpecificRequest) bool { - return args.JobID == jID - } -} - // Tests that the watcher properly watches for deployments and reconciles them func TestWatcher_WatchDeployments(t *testing.T) { assert := assert.New(t) @@ -998,3 +680,136 @@ func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) { assert.Equal(1, len(w.watchers), "Deployment should still be active") m.AssertCalled(t, "UpsertDeploymentStatusUpdate", mocker.MatchedBy(matcher)) } + +// Tests that the watcher properly watches for allocation changes and takes the +// proper actions +func TestDeploymentWatcher_Watch(t *testing.T) { + assert := assert.New(t) + m := newMockBackend(t) + w := NewDeploymentsWatcher(testLogger(), m, m) + + // Create a job, alloc, and a deployment + j := mock.Job() + j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() + j.TaskGroups[0].Update.MaxParallel = 2 + j.TaskGroups[0].Update.AutoRevert = true + j.Stable = true + d := mock.Deployment() + d.JobID = j.ID + a := mock.Alloc() + a.DeploymentID = d.ID + assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob") + assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d, false), "UpsertDeployment") + assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + + // Upsert the job again to get a new version + j2 := j.Copy() + j2.Stable = false + assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2") + + // Assert the following methods will be called + m.On("List", mocker.Anything, mocker.Anything).Return(nil).Run(m.listFromState) + m.On("Allocations", mocker.MatchedBy(matchDeploymentSpecificRequest(d.ID)), + mocker.Anything).Return(nil).Run(m.allocationsFromState) + m.On("Evaluations", mocker.MatchedBy(matchJobSpecificRequest(j.ID)), + mocker.Anything).Return(nil).Run(m.evaluationsFromState) + m.On("GetJob", mocker.MatchedBy(matchJobSpecificRequest(j.ID)), + mocker.Anything).Return(nil).Run(m.getJobFromState) + m.On("GetJobVersions", mocker.MatchedBy(matchJobSpecificRequest(j.ID)), + mocker.Anything).Return(nil).Run(m.getJobVersionsFromState) + + w.SetEnabled(true) + testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil }, + func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") }) + + // Assert that we will get a createEvaluation call only once. This will + // verify that the watcher is batching allocation changes + m1 := matchUpsertEvals([]string{d.ID}) + m.On("UpsertEvals", mocker.MatchedBy(m1)).Return(nil).Once() + + // Update the allocs health to healthy which should create an evaluation + for i := 0; i < 5; i++ { + req := &structs.ApplyDeploymentAllocHealthRequest{ + DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{ + DeploymentID: d.ID, + HealthyAllocationIDs: []string{a.ID}, + }, + } + i := m.nextIndex() + assert.Nil(m.state.UpsertDeploymentAllocHealth(i, req), "UpsertDeploymentAllocHealth") + } + + // Wait for there to be one eval + testutil.WaitForResult(func() (bool, error) { + ws := memdb.NewWatchSet() + evals, err := m.state.EvalsByJob(ws, j.ID) + if err != nil { + return false, err + } + + if l := len(evals); l != 1 { + return false, fmt.Errorf("Got %d evals; want 1", l) + } + + return true, nil + }, func(err error) { + t.Fatal(err) + }) + + // Assert that we get a call to UpsertDeploymentStatusUpdate + c := &matchDeploymentStatusUpdateConfig{ + DeploymentID: d.ID, + Status: structs.DeploymentStatusFailed, + StatusDescription: structs.DeploymentStatusDescriptionRollback(structs.DeploymentStatusDescriptionFailedAllocations, 0), + JobVersion: helper.Uint64ToPtr(0), + Eval: true, + } + m2 := matchDeploymentStatusUpdateRequest(c) + m.On("UpsertDeploymentStatusUpdate", mocker.MatchedBy(m2)).Return(nil) + + // Update the allocs health to unhealthy which should create a job rollback, + // status update and eval + req2 := &structs.ApplyDeploymentAllocHealthRequest{ + DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{ + DeploymentID: d.ID, + UnhealthyAllocationIDs: []string{a.ID}, + }, + } + i := m.nextIndex() + assert.Nil(m.state.UpsertDeploymentAllocHealth(i, req2), "UpsertDeploymentAllocHealth") + + // Wait for there to be one eval + testutil.WaitForResult(func() (bool, error) { + ws := memdb.NewWatchSet() + evals, err := m.state.EvalsByJob(ws, j.ID) + if err != nil { + return false, err + } + + if l := len(evals); l != 2 { + return false, fmt.Errorf("Got %d evals; want 1", l) + } + + return true, nil + }, func(err error) { + t.Fatal(err) + }) + + m.AssertCalled(t, "UpsertEvals", mocker.MatchedBy(m1)) + + // After we upsert the job version will go to 2. So use this to assert the + // original call happened. + c2 := &matchDeploymentStatusUpdateConfig{ + DeploymentID: d.ID, + Status: structs.DeploymentStatusFailed, + StatusDescription: structs.DeploymentStatusDescriptionRollback(structs.DeploymentStatusDescriptionFailedAllocations, 0), + JobVersion: helper.Uint64ToPtr(2), + Eval: true, + } + m3 := matchDeploymentStatusUpdateRequest(c2) + m.AssertCalled(t, "UpsertDeploymentStatusUpdate", mocker.MatchedBy(m3)) + testutil.WaitForResult(func() (bool, error) { return 0 == len(w.watchers), nil }, + func(err error) { assert.Equal(0, len(w.watchers), "Should have no deployment") }) +} + +// Test evaluations are batched between watchers diff --git a/nomad/deploymentwatcher/testutil_test.go b/nomad/deploymentwatcher/testutil_test.go new file mode 100644 index 000000000..71b49b9cc --- /dev/null +++ b/nomad/deploymentwatcher/testutil_test.go @@ -0,0 +1,356 @@ +package deploymentwatcher + +import ( + "log" + "os" + "reflect" + "strings" + "sync" + "testing" + + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" + mocker "github.com/stretchr/testify/mock" +) + +func testLogger() *log.Logger { + return log.New(os.Stderr, "", log.LstdFlags) +} + +type mockBackend struct { + mocker.Mock + index uint64 + state *state.StateStore + l sync.Mutex +} + +func newMockBackend(t *testing.T) *mockBackend { + state, err := state.NewStateStore(os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + if state == nil { + t.Fatalf("missing state") + } + return &mockBackend{ + index: 10000, + state: state, + } +} + +func (m *mockBackend) nextIndex() uint64 { + m.l.Lock() + defer m.l.Unlock() + i := m.index + m.index++ + return i +} + +func (m *mockBackend) UpsertEvals(evals []*structs.Evaluation) (uint64, error) { + m.Called(evals) + i := m.nextIndex() + return i, m.state.UpsertEvals(i, evals) +} + +// matchUpsertEvals is used to match an upsert request +func matchUpsertEvals(deploymentIDs []string) func(evals []*structs.Evaluation) bool { + return func(evals []*structs.Evaluation) bool { + if len(evals) != len(deploymentIDs) { + return false + } + + dmap := make(map[string]struct{}, len(deploymentIDs)) + for _, d := range deploymentIDs { + dmap[d] = struct{}{} + } + + for _, e := range evals { + if _, ok := dmap[e.DeploymentID]; !ok { + return false + } + + delete(dmap, e.DeploymentID) + } + + return true + } +} + +func (m *mockBackend) UpsertJob(job *structs.Job) (uint64, error) { + m.Called(job) + i := m.nextIndex() + return i, m.state.UpsertJob(i, job) +} + +func (m *mockBackend) UpsertDeploymentStatusUpdate(u *structs.DeploymentStatusUpdateRequest) (uint64, error) { + m.Called(u) + i := m.nextIndex() + return i, m.state.UpsertDeploymentStatusUpdate(i, u) +} + +// matchDeploymentStatusUpdateConfig is used to configure the matching +// function +type matchDeploymentStatusUpdateConfig struct { + // DeploymentID is the expected ID + DeploymentID string + + // Status is the desired status + Status string + + // StatusDescription is the desired status description + StatusDescription string + + // JobVersion marks whether we expect a roll back job at the given version + JobVersion *uint64 + + // Eval marks whether we expect an evaluation. + Eval bool +} + +// matchDeploymentStatusUpdateRequest is used to match an update request +func matchDeploymentStatusUpdateRequest(c *matchDeploymentStatusUpdateConfig) func(args *structs.DeploymentStatusUpdateRequest) bool { + return func(args *structs.DeploymentStatusUpdateRequest) bool { + if args.DeploymentUpdate.DeploymentID != c.DeploymentID { + testLogger().Printf("deployment ids dont match") + return false + } + + if args.DeploymentUpdate.Status != c.Status && args.DeploymentUpdate.StatusDescription != c.StatusDescription { + testLogger().Printf("status's dont match") + return false + } + + if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil { + testLogger().Printf("evals dont match") + return false + } + + if c.JobVersion != nil { + if args.Job == nil { + return false + } else if args.Job.Version != *c.JobVersion { + return false + } + } else if c.JobVersion == nil && args.Job != nil { + return false + } + + return true + } +} + +func (m *mockBackend) UpsertDeploymentPromotion(req *structs.ApplyDeploymentPromoteRequest) (uint64, error) { + m.Called(req) + i := m.nextIndex() + return i, m.state.UpsertDeploymentPromotion(i, req) +} + +// matchDeploymentPromoteRequestConfig is used to configure the matching +// function +type matchDeploymentPromoteRequestConfig struct { + // Promotion holds the expected promote request + Promotion *structs.DeploymentPromoteRequest + + // Eval marks whether we expect an evaluation. + Eval bool +} + +// matchDeploymentPromoteRequest is used to match a promote request +func matchDeploymentPromoteRequest(c *matchDeploymentPromoteRequestConfig) func(args *structs.ApplyDeploymentPromoteRequest) bool { + return func(args *structs.ApplyDeploymentPromoteRequest) bool { + if !reflect.DeepEqual(*c.Promotion, args.DeploymentPromoteRequest) { + return false + } + + if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil { + return false + } + + return true + } +} +func (m *mockBackend) UpsertDeploymentAllocHealth(req *structs.ApplyDeploymentAllocHealthRequest) (uint64, error) { + m.Called(req) + i := m.nextIndex() + return i, m.state.UpsertDeploymentAllocHealth(i, req) +} + +// matchDeploymentAllocHealthRequestConfig is used to configure the matching +// function +type matchDeploymentAllocHealthRequestConfig struct { + // DeploymentID is the expected ID + DeploymentID string + + // Healthy and Unhealthy contain the expected allocation IDs that are having + // their health set + Healthy, Unhealthy []string + + // DeploymentUpdate holds the expected values of status and description. We + // don't check for exact match but string contains + DeploymentUpdate *structs.DeploymentStatusUpdate + + // JobVersion marks whether we expect a roll back job at the given version + JobVersion *uint64 + + // Eval marks whether we expect an evaluation. + Eval bool +} + +// matchDeploymentAllocHealthRequest is used to match an update request +func matchDeploymentAllocHealthRequest(c *matchDeploymentAllocHealthRequestConfig) func(args *structs.ApplyDeploymentAllocHealthRequest) bool { + return func(args *structs.ApplyDeploymentAllocHealthRequest) bool { + if args.DeploymentID != c.DeploymentID { + return false + } + + if len(c.Healthy) != len(args.HealthyAllocationIDs) { + return false + } + if len(c.Unhealthy) != len(args.UnhealthyAllocationIDs) { + return false + } + + hmap, umap := make(map[string]struct{}, len(c.Healthy)), make(map[string]struct{}, len(c.Unhealthy)) + for _, h := range c.Healthy { + hmap[h] = struct{}{} + } + for _, u := range c.Unhealthy { + umap[u] = struct{}{} + } + + for _, h := range args.HealthyAllocationIDs { + if _, ok := hmap[h]; !ok { + return false + } + } + for _, u := range args.UnhealthyAllocationIDs { + if _, ok := umap[u]; !ok { + return false + } + } + + if c.DeploymentUpdate != nil { + if args.DeploymentUpdate == nil { + return false + } + + if !strings.Contains(args.DeploymentUpdate.Status, c.DeploymentUpdate.Status) { + return false + } + if !strings.Contains(args.DeploymentUpdate.StatusDescription, c.DeploymentUpdate.StatusDescription) { + return false + } + } else if args.DeploymentUpdate != nil { + return false + } + + if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil { + return false + } + + if (c.JobVersion != nil && (args.Job == nil || args.Job.Version != *c.JobVersion)) || c.JobVersion == nil && args.Job != nil { + return false + } + + return true + } +} + +func (m *mockBackend) Evaluations(args *structs.JobSpecificRequest, reply *structs.JobEvaluationsResponse) error { + rargs := m.Called(args, reply) + return rargs.Error(0) +} + +func (m *mockBackend) evaluationsFromState(in mocker.Arguments) { + args, reply := in.Get(0).(*structs.JobSpecificRequest), in.Get(1).(*structs.JobEvaluationsResponse) + ws := memdb.NewWatchSet() + evals, _ := m.state.EvalsByJob(ws, args.JobID) + reply.Evaluations = evals + reply.Index, _ = m.state.Index("evals") +} + +func (m *mockBackend) Allocations(args *structs.DeploymentSpecificRequest, reply *structs.AllocListResponse) error { + rargs := m.Called(args, reply) + return rargs.Error(0) +} + +func (m *mockBackend) allocationsFromState(in mocker.Arguments) { + args, reply := in.Get(0).(*structs.DeploymentSpecificRequest), in.Get(1).(*structs.AllocListResponse) + ws := memdb.NewWatchSet() + allocs, _ := m.state.AllocsByDeployment(ws, args.DeploymentID) + + var stubs []*structs.AllocListStub + for _, a := range allocs { + stubs = append(stubs, a.Stub()) + } + + reply.Allocations = stubs + reply.Index, _ = m.state.Index("allocs") +} + +func (m *mockBackend) List(args *structs.DeploymentListRequest, reply *structs.DeploymentListResponse) error { + rargs := m.Called(args, reply) + return rargs.Error(0) +} + +func (m *mockBackend) listFromState(in mocker.Arguments) { + reply := in.Get(1).(*structs.DeploymentListResponse) + ws := memdb.NewWatchSet() + iter, _ := m.state.Deployments(ws) + + var deploys []*structs.Deployment + for { + raw := iter.Next() + if raw == nil { + break + } + + deploys = append(deploys, raw.(*structs.Deployment)) + } + + reply.Deployments = deploys + reply.Index, _ = m.state.Index("deployment") +} + +func (m *mockBackend) GetJobVersions(args *structs.JobSpecificRequest, reply *structs.JobVersionsResponse) error { + rargs := m.Called(args, reply) + return rargs.Error(0) +} + +func (m *mockBackend) getJobVersionsFromState(in mocker.Arguments) { + args, reply := in.Get(0).(*structs.JobSpecificRequest), in.Get(1).(*structs.JobVersionsResponse) + ws := memdb.NewWatchSet() + versions, _ := m.state.JobVersionsByID(ws, args.JobID) + reply.Versions = versions + reply.Index, _ = m.state.Index("jobs") +} + +func (m *mockBackend) GetJob(args *structs.JobSpecificRequest, reply *structs.SingleJobResponse) error { + rargs := m.Called(args, reply) + return rargs.Error(0) +} + +func (m *mockBackend) getJobFromState(in mocker.Arguments) { + args, reply := in.Get(0).(*structs.JobSpecificRequest), in.Get(1).(*structs.SingleJobResponse) + ws := memdb.NewWatchSet() + job, _ := m.state.JobByID(ws, args.JobID) + reply.Job = job + reply.Index, _ = m.state.Index("jobs") +} + +// matchDeploymentSpecificRequest is used to match that a deployment specific +// request is for the passed deployment id +func matchDeploymentSpecificRequest(dID string) func(args *structs.DeploymentSpecificRequest) bool { + return func(args *structs.DeploymentSpecificRequest) bool { + return args.DeploymentID == dID + } +} + +// matchJobSpecificRequest is used to match that a job specific +// request is for the passed job id +func matchJobSpecificRequest(jID string) func(args *structs.JobSpecificRequest) bool { + return func(args *structs.JobSpecificRequest) bool { + return args.JobID == jID + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7b26b838e..468341441 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3857,6 +3857,12 @@ const ( DeploymentStatusDescriptionFailedAllocations = "Failed due to unhealthy allocations" ) +// DeploymentStatusDescriptionRollback is used to get the status description of +// a deployment when rolling back to an older job. +func DeploymentStatusDescriptionRollback(baseDescription string, jobVersion uint64) string { + return fmt.Sprintf("%s - rolling back to job version %d", baseDescription, jobVersion) +} + // Deployment is the object that represents a job deployment which is used to // transistion a job between versions. type Deployment struct {