more tests
This commit is contained in:
parent
b4c8f56570
commit
c52790e448
|
@ -8,6 +8,12 @@ import (
|
|||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
const (
|
||||
// evalBatchDuration is the duration in which evaluations are batched before
|
||||
// commiting to Raft.
|
||||
evalBatchDuration = 200 * time.Millisecond
|
||||
)
|
||||
|
||||
// EvalBatcher is used to batch the creation of evaluations
|
||||
type EvalBatcher struct {
|
||||
// raft is used to actually commit the evaluations
|
||||
|
@ -32,6 +38,7 @@ func NewEvalBatcher(raft DeploymentRaftEndpoints, ctx context.Context) *EvalBatc
|
|||
b := &EvalBatcher{
|
||||
raft: raft,
|
||||
ctx: ctx,
|
||||
inCh: make(chan *structs.Evaluation, 10),
|
||||
}
|
||||
|
||||
go b.batcher()
|
||||
|
@ -54,7 +61,7 @@ func (b *EvalBatcher) CreateEval(e *structs.Evaluation) *EvalFuture {
|
|||
|
||||
// batcher is the long lived batcher goroutine
|
||||
func (b *EvalBatcher) batcher() {
|
||||
ticker := time.NewTicker(200 * time.Millisecond)
|
||||
ticker := time.NewTicker(evalBatchDuration)
|
||||
evals := make(map[string]*structs.Evaluation)
|
||||
for {
|
||||
select {
|
||||
|
|
|
@ -241,7 +241,7 @@ func (w *deploymentWatcher) watch() {
|
|||
// Block getting all allocations that are part of the deployment using
|
||||
// the last evaluation index. This will have us block waiting for
|
||||
// something to change past what the scheduler has evaluated.
|
||||
allocs, err := w.getAllocs(latestEval)
|
||||
allocResp, err := w.getAllocs(latestEval)
|
||||
if err != nil {
|
||||
if err == context.Canceled {
|
||||
return
|
||||
|
@ -263,7 +263,7 @@ func (w *deploymentWatcher) watch() {
|
|||
// Create an evaluation trigger if there is any allocation whose
|
||||
// deployment status has been updated past the latest eval index.
|
||||
createEval, failDeployment, rollback := false, false, false
|
||||
for _, alloc := range allocs {
|
||||
for _, alloc := range allocResp.Allocations {
|
||||
if alloc.DeploymentStatus == nil || alloc.DeploymentStatus.ModifyIndex <= latestEval {
|
||||
continue
|
||||
}
|
||||
|
@ -303,7 +303,9 @@ func (w *deploymentWatcher) watch() {
|
|||
|
||||
// Description should include that the job is being rolled back to
|
||||
// version N
|
||||
desc = fmt.Sprintf("%s - rolling back to job version %d", desc, j.Version)
|
||||
if j != nil {
|
||||
desc = structs.DeploymentStatusDescriptionRollback(desc, j.Version)
|
||||
}
|
||||
}
|
||||
|
||||
// Update the status of the deployment to failed and create an
|
||||
|
@ -316,7 +318,7 @@ func (w *deploymentWatcher) watch() {
|
|||
}
|
||||
} else if createEval {
|
||||
// Create an eval to push the deployment along
|
||||
w.createEvalBatched()
|
||||
w.createEvalBatched(allocResp.Index)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -348,7 +350,7 @@ func (w *deploymentWatcher) createEval() (evalID string, evalCreateIndex uint64,
|
|||
}
|
||||
|
||||
// createEvalBatched creates an eval but batches calls together
|
||||
func (w *deploymentWatcher) createEvalBatched() {
|
||||
func (w *deploymentWatcher) createEvalBatched(forIndex uint64) {
|
||||
w.l.Lock()
|
||||
defer w.l.Unlock()
|
||||
|
||||
|
@ -356,6 +358,9 @@ func (w *deploymentWatcher) createEvalBatched() {
|
|||
return
|
||||
}
|
||||
|
||||
w.logger.Printf("[TRACE] nomad.deployment_watcher: creating eval for index %d %q", forIndex, w.d.ID)
|
||||
w.outstandingBatch = true
|
||||
|
||||
go func() {
|
||||
// Sleep til the batching period is over
|
||||
time.Sleep(evalBatchPeriod)
|
||||
|
@ -394,7 +399,7 @@ func (w *deploymentWatcher) getDeploymentStatusUpdate(status, desc string) *stru
|
|||
|
||||
// getAllocs retrieves the allocations that are part of the deployment blocking
|
||||
// at the given index.
|
||||
func (w *deploymentWatcher) getAllocs(index uint64) ([]*structs.AllocListStub, error) {
|
||||
func (w *deploymentWatcher) getAllocs(index uint64) (*structs.AllocListResponse, error) {
|
||||
// Build the request
|
||||
args := &structs.DeploymentSpecificRequest{
|
||||
DeploymentID: w.d.ID,
|
||||
|
@ -414,7 +419,7 @@ func (w *deploymentWatcher) getAllocs(index uint64) ([]*structs.AllocListStub, e
|
|||
}
|
||||
}
|
||||
|
||||
return resp.Allocations, nil
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// latestEvalIndex returns the index of the last evaluation created for
|
||||
|
|
|
@ -56,7 +56,7 @@ type DeploymentStateWatchers interface {
|
|||
const (
|
||||
// limitStateQueriesPerSecond is the number of state queries allowed per
|
||||
// second
|
||||
limitStateQueriesPerSecond = 10.0
|
||||
limitStateQueriesPerSecond = 15.0
|
||||
)
|
||||
|
||||
// Watcher is used to watch deployments and their allocations created
|
||||
|
|
|
@ -1,337 +1,19 @@
|
|||
package deploymentwatcher
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/state"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
mocker "github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
// TODO
|
||||
// Test evaluations are batched between watchers
|
||||
// Test allocation watcher
|
||||
// Test that evaluation due to allocation changes are batched
|
||||
|
||||
func testLogger() *log.Logger {
|
||||
return log.New(os.Stderr, "", log.LstdFlags)
|
||||
}
|
||||
|
||||
type mockBackend struct {
|
||||
mocker.Mock
|
||||
index uint64
|
||||
state *state.StateStore
|
||||
l sync.Mutex
|
||||
}
|
||||
|
||||
func newMockBackend(t *testing.T) *mockBackend {
|
||||
state, err := state.NewStateStore(os.Stderr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if state == nil {
|
||||
t.Fatalf("missing state")
|
||||
}
|
||||
return &mockBackend{
|
||||
index: 10000,
|
||||
state: state,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockBackend) nextIndex() uint64 {
|
||||
m.l.Lock()
|
||||
defer m.l.Unlock()
|
||||
i := m.index
|
||||
m.index++
|
||||
return i
|
||||
}
|
||||
|
||||
func (m *mockBackend) UpsertEvals(evals []*structs.Evaluation) (uint64, error) {
|
||||
m.Called(evals)
|
||||
i := m.nextIndex()
|
||||
return i, m.state.UpsertEvals(i, evals)
|
||||
}
|
||||
|
||||
func (m *mockBackend) UpsertJob(job *structs.Job) (uint64, error) {
|
||||
m.Called(job)
|
||||
i := m.nextIndex()
|
||||
return i, m.state.UpsertJob(i, job)
|
||||
}
|
||||
|
||||
func (m *mockBackend) UpsertDeploymentStatusUpdate(u *structs.DeploymentStatusUpdateRequest) (uint64, error) {
|
||||
m.Called(u)
|
||||
i := m.nextIndex()
|
||||
return i, m.state.UpsertDeploymentStatusUpdate(i, u)
|
||||
}
|
||||
|
||||
// matchDeploymentStatusUpdateConfig is used to configure the matching
|
||||
// function
|
||||
type matchDeploymentStatusUpdateConfig struct {
|
||||
// DeploymentID is the expected ID
|
||||
DeploymentID string
|
||||
|
||||
// Status is the desired status
|
||||
Status string
|
||||
|
||||
// StatusDescription is the desired status description
|
||||
StatusDescription string
|
||||
|
||||
// JobVersion marks whether we expect a roll back job at the given version
|
||||
JobVersion *uint64
|
||||
|
||||
// Eval marks whether we expect an evaluation.
|
||||
Eval bool
|
||||
}
|
||||
|
||||
// matchDeploymentStatusUpdateRequest is used to match an update request
|
||||
func matchDeploymentStatusUpdateRequest(c *matchDeploymentStatusUpdateConfig) func(args *structs.DeploymentStatusUpdateRequest) bool {
|
||||
return func(args *structs.DeploymentStatusUpdateRequest) bool {
|
||||
if args.DeploymentUpdate.DeploymentID != c.DeploymentID {
|
||||
return false
|
||||
}
|
||||
|
||||
if args.DeploymentUpdate.Status != c.Status && args.DeploymentUpdate.StatusDescription != c.StatusDescription {
|
||||
return false
|
||||
}
|
||||
|
||||
if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if (c.JobVersion != nil && (args.Job == nil || args.Job.Version != *c.JobVersion)) || c.JobVersion == nil && args.Job != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockBackend) UpsertDeploymentPromotion(req *structs.ApplyDeploymentPromoteRequest) (uint64, error) {
|
||||
m.Called(req)
|
||||
i := m.nextIndex()
|
||||
return i, m.state.UpsertDeploymentPromotion(i, req)
|
||||
}
|
||||
|
||||
// matchDeploymentPromoteRequestConfig is used to configure the matching
|
||||
// function
|
||||
type matchDeploymentPromoteRequestConfig struct {
|
||||
// Promotion holds the expected promote request
|
||||
Promotion *structs.DeploymentPromoteRequest
|
||||
|
||||
// Eval marks whether we expect an evaluation.
|
||||
Eval bool
|
||||
}
|
||||
|
||||
// matchDeploymentPromoteRequest is used to match a promote request
|
||||
func matchDeploymentPromoteRequest(c *matchDeploymentPromoteRequestConfig) func(args *structs.ApplyDeploymentPromoteRequest) bool {
|
||||
return func(args *structs.ApplyDeploymentPromoteRequest) bool {
|
||||
if !reflect.DeepEqual(*c.Promotion, args.DeploymentPromoteRequest) {
|
||||
return false
|
||||
}
|
||||
|
||||
if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
}
|
||||
func (m *mockBackend) UpsertDeploymentAllocHealth(req *structs.ApplyDeploymentAllocHealthRequest) (uint64, error) {
|
||||
m.Called(req)
|
||||
i := m.nextIndex()
|
||||
return i, m.state.UpsertDeploymentAllocHealth(i, req)
|
||||
}
|
||||
|
||||
// matchDeploymentAllocHealthRequestConfig is used to configure the matching
|
||||
// function
|
||||
type matchDeploymentAllocHealthRequestConfig struct {
|
||||
// DeploymentID is the expected ID
|
||||
DeploymentID string
|
||||
|
||||
// Healthy and Unhealthy contain the expected allocation IDs that are having
|
||||
// their health set
|
||||
Healthy, Unhealthy []string
|
||||
|
||||
// DeploymentUpdate holds the expected values of status and description. We
|
||||
// don't check for exact match but string contains
|
||||
DeploymentUpdate *structs.DeploymentStatusUpdate
|
||||
|
||||
// JobVersion marks whether we expect a roll back job at the given version
|
||||
JobVersion *uint64
|
||||
|
||||
// Eval marks whether we expect an evaluation.
|
||||
Eval bool
|
||||
}
|
||||
|
||||
// matchDeploymentAllocHealthRequest is used to match an update request
|
||||
func matchDeploymentAllocHealthRequest(c *matchDeploymentAllocHealthRequestConfig) func(args *structs.ApplyDeploymentAllocHealthRequest) bool {
|
||||
return func(args *structs.ApplyDeploymentAllocHealthRequest) bool {
|
||||
if args.DeploymentID != c.DeploymentID {
|
||||
return false
|
||||
}
|
||||
|
||||
if len(c.Healthy) != len(args.HealthyAllocationIDs) {
|
||||
return false
|
||||
}
|
||||
if len(c.Unhealthy) != len(args.UnhealthyAllocationIDs) {
|
||||
return false
|
||||
}
|
||||
|
||||
hmap, umap := make(map[string]struct{}, len(c.Healthy)), make(map[string]struct{}, len(c.Unhealthy))
|
||||
for _, h := range c.Healthy {
|
||||
hmap[h] = struct{}{}
|
||||
}
|
||||
for _, u := range c.Unhealthy {
|
||||
umap[u] = struct{}{}
|
||||
}
|
||||
|
||||
for _, h := range args.HealthyAllocationIDs {
|
||||
if _, ok := hmap[h]; !ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
for _, u := range args.UnhealthyAllocationIDs {
|
||||
if _, ok := umap[u]; !ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if c.DeploymentUpdate != nil {
|
||||
if args.DeploymentUpdate == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if !strings.Contains(args.DeploymentUpdate.Status, c.DeploymentUpdate.Status) {
|
||||
return false
|
||||
}
|
||||
if !strings.Contains(args.DeploymentUpdate.StatusDescription, c.DeploymentUpdate.StatusDescription) {
|
||||
return false
|
||||
}
|
||||
} else if args.DeploymentUpdate != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if (c.JobVersion != nil && (args.Job == nil || args.Job.Version != *c.JobVersion)) || c.JobVersion == nil && args.Job != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockBackend) Evaluations(args *structs.JobSpecificRequest, reply *structs.JobEvaluationsResponse) error {
|
||||
rargs := m.Called(args, reply)
|
||||
return rargs.Error(0)
|
||||
}
|
||||
|
||||
func (m *mockBackend) evaluationsFromState(in mocker.Arguments) {
|
||||
args, reply := in.Get(0).(*structs.JobSpecificRequest), in.Get(1).(*structs.JobEvaluationsResponse)
|
||||
ws := memdb.NewWatchSet()
|
||||
evals, _ := m.state.EvalsByJob(ws, args.JobID)
|
||||
reply.Evaluations = evals
|
||||
reply.Index = m.nextIndex()
|
||||
}
|
||||
|
||||
func (m *mockBackend) Allocations(args *structs.DeploymentSpecificRequest, reply *structs.AllocListResponse) error {
|
||||
rargs := m.Called(args, reply)
|
||||
return rargs.Error(0)
|
||||
}
|
||||
|
||||
func (m *mockBackend) allocationsFromState(in mocker.Arguments) {
|
||||
args, reply := in.Get(0).(*structs.DeploymentSpecificRequest), in.Get(1).(*structs.AllocListResponse)
|
||||
ws := memdb.NewWatchSet()
|
||||
allocs, _ := m.state.AllocsByDeployment(ws, args.DeploymentID)
|
||||
|
||||
var stubs []*structs.AllocListStub
|
||||
for _, a := range allocs {
|
||||
stubs = append(stubs, a.Stub())
|
||||
}
|
||||
|
||||
reply.Allocations = stubs
|
||||
reply.Index = m.nextIndex()
|
||||
}
|
||||
|
||||
func (m *mockBackend) List(args *structs.DeploymentListRequest, reply *structs.DeploymentListResponse) error {
|
||||
rargs := m.Called(args, reply)
|
||||
return rargs.Error(0)
|
||||
}
|
||||
|
||||
func (m *mockBackend) listFromState(in mocker.Arguments) {
|
||||
reply := in.Get(1).(*structs.DeploymentListResponse)
|
||||
ws := memdb.NewWatchSet()
|
||||
iter, _ := m.state.Deployments(ws)
|
||||
|
||||
var deploys []*structs.Deployment
|
||||
for {
|
||||
raw := iter.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
|
||||
deploys = append(deploys, raw.(*structs.Deployment))
|
||||
}
|
||||
|
||||
reply.Deployments = deploys
|
||||
reply.Index = m.nextIndex()
|
||||
}
|
||||
|
||||
func (m *mockBackend) GetJobVersions(args *structs.JobSpecificRequest, reply *structs.JobVersionsResponse) error {
|
||||
rargs := m.Called(args, reply)
|
||||
return rargs.Error(0)
|
||||
}
|
||||
|
||||
func (m *mockBackend) getJobVersionsFromState(in mocker.Arguments) {
|
||||
args, reply := in.Get(0).(*structs.JobSpecificRequest), in.Get(1).(*structs.JobVersionsResponse)
|
||||
ws := memdb.NewWatchSet()
|
||||
versions, _ := m.state.JobVersionsByID(ws, args.JobID)
|
||||
reply.Versions = versions
|
||||
reply.Index = m.nextIndex()
|
||||
}
|
||||
|
||||
func (m *mockBackend) GetJob(args *structs.JobSpecificRequest, reply *structs.SingleJobResponse) error {
|
||||
rargs := m.Called(args, reply)
|
||||
return rargs.Error(0)
|
||||
}
|
||||
|
||||
func (m *mockBackend) getJobFromState(in mocker.Arguments) {
|
||||
args, reply := in.Get(0).(*structs.JobSpecificRequest), in.Get(1).(*structs.SingleJobResponse)
|
||||
ws := memdb.NewWatchSet()
|
||||
job, _ := m.state.JobByID(ws, args.JobID)
|
||||
reply.Job = job
|
||||
reply.Index = m.nextIndex()
|
||||
}
|
||||
|
||||
// matchDeploymentSpecificRequest is used to match that a deployment specific
|
||||
// request is for the passed deployment id
|
||||
func matchDeploymentSpecificRequest(dID string) func(args *structs.DeploymentSpecificRequest) bool {
|
||||
return func(args *structs.DeploymentSpecificRequest) bool {
|
||||
return args.DeploymentID == dID
|
||||
}
|
||||
}
|
||||
|
||||
// matchJobSpecificRequest is used to match that a job specific
|
||||
// request is for the passed job id
|
||||
func matchJobSpecificRequest(jID string) func(args *structs.JobSpecificRequest) bool {
|
||||
return func(args *structs.JobSpecificRequest) bool {
|
||||
return args.JobID == jID
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that the watcher properly watches for deployments and reconciles them
|
||||
func TestWatcher_WatchDeployments(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
@ -998,3 +680,136 @@ func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) {
|
|||
assert.Equal(1, len(w.watchers), "Deployment should still be active")
|
||||
m.AssertCalled(t, "UpsertDeploymentStatusUpdate", mocker.MatchedBy(matcher))
|
||||
}
|
||||
|
||||
// Tests that the watcher properly watches for allocation changes and takes the
|
||||
// proper actions
|
||||
func TestDeploymentWatcher_Watch(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
m := newMockBackend(t)
|
||||
w := NewDeploymentsWatcher(testLogger(), m, m)
|
||||
|
||||
// Create a job, alloc, and a deployment
|
||||
j := mock.Job()
|
||||
j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy()
|
||||
j.TaskGroups[0].Update.MaxParallel = 2
|
||||
j.TaskGroups[0].Update.AutoRevert = true
|
||||
j.Stable = true
|
||||
d := mock.Deployment()
|
||||
d.JobID = j.ID
|
||||
a := mock.Alloc()
|
||||
a.DeploymentID = d.ID
|
||||
assert.Nil(m.state.UpsertJob(m.nextIndex(), j), "UpsertJob")
|
||||
assert.Nil(m.state.UpsertDeployment(m.nextIndex(), d, false), "UpsertDeployment")
|
||||
assert.Nil(m.state.UpsertAllocs(m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs")
|
||||
|
||||
// Upsert the job again to get a new version
|
||||
j2 := j.Copy()
|
||||
j2.Stable = false
|
||||
assert.Nil(m.state.UpsertJob(m.nextIndex(), j2), "UpsertJob2")
|
||||
|
||||
// Assert the following methods will be called
|
||||
m.On("List", mocker.Anything, mocker.Anything).Return(nil).Run(m.listFromState)
|
||||
m.On("Allocations", mocker.MatchedBy(matchDeploymentSpecificRequest(d.ID)),
|
||||
mocker.Anything).Return(nil).Run(m.allocationsFromState)
|
||||
m.On("Evaluations", mocker.MatchedBy(matchJobSpecificRequest(j.ID)),
|
||||
mocker.Anything).Return(nil).Run(m.evaluationsFromState)
|
||||
m.On("GetJob", mocker.MatchedBy(matchJobSpecificRequest(j.ID)),
|
||||
mocker.Anything).Return(nil).Run(m.getJobFromState)
|
||||
m.On("GetJobVersions", mocker.MatchedBy(matchJobSpecificRequest(j.ID)),
|
||||
mocker.Anything).Return(nil).Run(m.getJobVersionsFromState)
|
||||
|
||||
w.SetEnabled(true)
|
||||
testutil.WaitForResult(func() (bool, error) { return 1 == len(w.watchers), nil },
|
||||
func(err error) { assert.Equal(1, len(w.watchers), "Should have 1 deployment") })
|
||||
|
||||
// Assert that we will get a createEvaluation call only once. This will
|
||||
// verify that the watcher is batching allocation changes
|
||||
m1 := matchUpsertEvals([]string{d.ID})
|
||||
m.On("UpsertEvals", mocker.MatchedBy(m1)).Return(nil).Once()
|
||||
|
||||
// Update the allocs health to healthy which should create an evaluation
|
||||
for i := 0; i < 5; i++ {
|
||||
req := &structs.ApplyDeploymentAllocHealthRequest{
|
||||
DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{
|
||||
DeploymentID: d.ID,
|
||||
HealthyAllocationIDs: []string{a.ID},
|
||||
},
|
||||
}
|
||||
i := m.nextIndex()
|
||||
assert.Nil(m.state.UpsertDeploymentAllocHealth(i, req), "UpsertDeploymentAllocHealth")
|
||||
}
|
||||
|
||||
// Wait for there to be one eval
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
ws := memdb.NewWatchSet()
|
||||
evals, err := m.state.EvalsByJob(ws, j.ID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if l := len(evals); l != 1 {
|
||||
return false, fmt.Errorf("Got %d evals; want 1", l)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatal(err)
|
||||
})
|
||||
|
||||
// Assert that we get a call to UpsertDeploymentStatusUpdate
|
||||
c := &matchDeploymentStatusUpdateConfig{
|
||||
DeploymentID: d.ID,
|
||||
Status: structs.DeploymentStatusFailed,
|
||||
StatusDescription: structs.DeploymentStatusDescriptionRollback(structs.DeploymentStatusDescriptionFailedAllocations, 0),
|
||||
JobVersion: helper.Uint64ToPtr(0),
|
||||
Eval: true,
|
||||
}
|
||||
m2 := matchDeploymentStatusUpdateRequest(c)
|
||||
m.On("UpsertDeploymentStatusUpdate", mocker.MatchedBy(m2)).Return(nil)
|
||||
|
||||
// Update the allocs health to unhealthy which should create a job rollback,
|
||||
// status update and eval
|
||||
req2 := &structs.ApplyDeploymentAllocHealthRequest{
|
||||
DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{
|
||||
DeploymentID: d.ID,
|
||||
UnhealthyAllocationIDs: []string{a.ID},
|
||||
},
|
||||
}
|
||||
i := m.nextIndex()
|
||||
assert.Nil(m.state.UpsertDeploymentAllocHealth(i, req2), "UpsertDeploymentAllocHealth")
|
||||
|
||||
// Wait for there to be one eval
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
ws := memdb.NewWatchSet()
|
||||
evals, err := m.state.EvalsByJob(ws, j.ID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if l := len(evals); l != 2 {
|
||||
return false, fmt.Errorf("Got %d evals; want 1", l)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatal(err)
|
||||
})
|
||||
|
||||
m.AssertCalled(t, "UpsertEvals", mocker.MatchedBy(m1))
|
||||
|
||||
// After we upsert the job version will go to 2. So use this to assert the
|
||||
// original call happened.
|
||||
c2 := &matchDeploymentStatusUpdateConfig{
|
||||
DeploymentID: d.ID,
|
||||
Status: structs.DeploymentStatusFailed,
|
||||
StatusDescription: structs.DeploymentStatusDescriptionRollback(structs.DeploymentStatusDescriptionFailedAllocations, 0),
|
||||
JobVersion: helper.Uint64ToPtr(2),
|
||||
Eval: true,
|
||||
}
|
||||
m3 := matchDeploymentStatusUpdateRequest(c2)
|
||||
m.AssertCalled(t, "UpsertDeploymentStatusUpdate", mocker.MatchedBy(m3))
|
||||
testutil.WaitForResult(func() (bool, error) { return 0 == len(w.watchers), nil },
|
||||
func(err error) { assert.Equal(0, len(w.watchers), "Should have no deployment") })
|
||||
}
|
||||
|
||||
// Test evaluations are batched between watchers
|
||||
|
|
356
nomad/deploymentwatcher/testutil_test.go
Normal file
356
nomad/deploymentwatcher/testutil_test.go
Normal file
|
@ -0,0 +1,356 @@
|
|||
package deploymentwatcher
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/nomad/nomad/state"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
mocker "github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
func testLogger() *log.Logger {
|
||||
return log.New(os.Stderr, "", log.LstdFlags)
|
||||
}
|
||||
|
||||
type mockBackend struct {
|
||||
mocker.Mock
|
||||
index uint64
|
||||
state *state.StateStore
|
||||
l sync.Mutex
|
||||
}
|
||||
|
||||
func newMockBackend(t *testing.T) *mockBackend {
|
||||
state, err := state.NewStateStore(os.Stderr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if state == nil {
|
||||
t.Fatalf("missing state")
|
||||
}
|
||||
return &mockBackend{
|
||||
index: 10000,
|
||||
state: state,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockBackend) nextIndex() uint64 {
|
||||
m.l.Lock()
|
||||
defer m.l.Unlock()
|
||||
i := m.index
|
||||
m.index++
|
||||
return i
|
||||
}
|
||||
|
||||
func (m *mockBackend) UpsertEvals(evals []*structs.Evaluation) (uint64, error) {
|
||||
m.Called(evals)
|
||||
i := m.nextIndex()
|
||||
return i, m.state.UpsertEvals(i, evals)
|
||||
}
|
||||
|
||||
// matchUpsertEvals is used to match an upsert request
|
||||
func matchUpsertEvals(deploymentIDs []string) func(evals []*structs.Evaluation) bool {
|
||||
return func(evals []*structs.Evaluation) bool {
|
||||
if len(evals) != len(deploymentIDs) {
|
||||
return false
|
||||
}
|
||||
|
||||
dmap := make(map[string]struct{}, len(deploymentIDs))
|
||||
for _, d := range deploymentIDs {
|
||||
dmap[d] = struct{}{}
|
||||
}
|
||||
|
||||
for _, e := range evals {
|
||||
if _, ok := dmap[e.DeploymentID]; !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
delete(dmap, e.DeploymentID)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockBackend) UpsertJob(job *structs.Job) (uint64, error) {
|
||||
m.Called(job)
|
||||
i := m.nextIndex()
|
||||
return i, m.state.UpsertJob(i, job)
|
||||
}
|
||||
|
||||
func (m *mockBackend) UpsertDeploymentStatusUpdate(u *structs.DeploymentStatusUpdateRequest) (uint64, error) {
|
||||
m.Called(u)
|
||||
i := m.nextIndex()
|
||||
return i, m.state.UpsertDeploymentStatusUpdate(i, u)
|
||||
}
|
||||
|
||||
// matchDeploymentStatusUpdateConfig is used to configure the matching
|
||||
// function
|
||||
type matchDeploymentStatusUpdateConfig struct {
|
||||
// DeploymentID is the expected ID
|
||||
DeploymentID string
|
||||
|
||||
// Status is the desired status
|
||||
Status string
|
||||
|
||||
// StatusDescription is the desired status description
|
||||
StatusDescription string
|
||||
|
||||
// JobVersion marks whether we expect a roll back job at the given version
|
||||
JobVersion *uint64
|
||||
|
||||
// Eval marks whether we expect an evaluation.
|
||||
Eval bool
|
||||
}
|
||||
|
||||
// matchDeploymentStatusUpdateRequest is used to match an update request
|
||||
func matchDeploymentStatusUpdateRequest(c *matchDeploymentStatusUpdateConfig) func(args *structs.DeploymentStatusUpdateRequest) bool {
|
||||
return func(args *structs.DeploymentStatusUpdateRequest) bool {
|
||||
if args.DeploymentUpdate.DeploymentID != c.DeploymentID {
|
||||
testLogger().Printf("deployment ids dont match")
|
||||
return false
|
||||
}
|
||||
|
||||
if args.DeploymentUpdate.Status != c.Status && args.DeploymentUpdate.StatusDescription != c.StatusDescription {
|
||||
testLogger().Printf("status's dont match")
|
||||
return false
|
||||
}
|
||||
|
||||
if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil {
|
||||
testLogger().Printf("evals dont match")
|
||||
return false
|
||||
}
|
||||
|
||||
if c.JobVersion != nil {
|
||||
if args.Job == nil {
|
||||
return false
|
||||
} else if args.Job.Version != *c.JobVersion {
|
||||
return false
|
||||
}
|
||||
} else if c.JobVersion == nil && args.Job != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockBackend) UpsertDeploymentPromotion(req *structs.ApplyDeploymentPromoteRequest) (uint64, error) {
|
||||
m.Called(req)
|
||||
i := m.nextIndex()
|
||||
return i, m.state.UpsertDeploymentPromotion(i, req)
|
||||
}
|
||||
|
||||
// matchDeploymentPromoteRequestConfig is used to configure the matching
|
||||
// function
|
||||
type matchDeploymentPromoteRequestConfig struct {
|
||||
// Promotion holds the expected promote request
|
||||
Promotion *structs.DeploymentPromoteRequest
|
||||
|
||||
// Eval marks whether we expect an evaluation.
|
||||
Eval bool
|
||||
}
|
||||
|
||||
// matchDeploymentPromoteRequest is used to match a promote request
|
||||
func matchDeploymentPromoteRequest(c *matchDeploymentPromoteRequestConfig) func(args *structs.ApplyDeploymentPromoteRequest) bool {
|
||||
return func(args *structs.ApplyDeploymentPromoteRequest) bool {
|
||||
if !reflect.DeepEqual(*c.Promotion, args.DeploymentPromoteRequest) {
|
||||
return false
|
||||
}
|
||||
|
||||
if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
}
|
||||
func (m *mockBackend) UpsertDeploymentAllocHealth(req *structs.ApplyDeploymentAllocHealthRequest) (uint64, error) {
|
||||
m.Called(req)
|
||||
i := m.nextIndex()
|
||||
return i, m.state.UpsertDeploymentAllocHealth(i, req)
|
||||
}
|
||||
|
||||
// matchDeploymentAllocHealthRequestConfig is used to configure the matching
|
||||
// function
|
||||
type matchDeploymentAllocHealthRequestConfig struct {
|
||||
// DeploymentID is the expected ID
|
||||
DeploymentID string
|
||||
|
||||
// Healthy and Unhealthy contain the expected allocation IDs that are having
|
||||
// their health set
|
||||
Healthy, Unhealthy []string
|
||||
|
||||
// DeploymentUpdate holds the expected values of status and description. We
|
||||
// don't check for exact match but string contains
|
||||
DeploymentUpdate *structs.DeploymentStatusUpdate
|
||||
|
||||
// JobVersion marks whether we expect a roll back job at the given version
|
||||
JobVersion *uint64
|
||||
|
||||
// Eval marks whether we expect an evaluation.
|
||||
Eval bool
|
||||
}
|
||||
|
||||
// matchDeploymentAllocHealthRequest is used to match an update request
|
||||
func matchDeploymentAllocHealthRequest(c *matchDeploymentAllocHealthRequestConfig) func(args *structs.ApplyDeploymentAllocHealthRequest) bool {
|
||||
return func(args *structs.ApplyDeploymentAllocHealthRequest) bool {
|
||||
if args.DeploymentID != c.DeploymentID {
|
||||
return false
|
||||
}
|
||||
|
||||
if len(c.Healthy) != len(args.HealthyAllocationIDs) {
|
||||
return false
|
||||
}
|
||||
if len(c.Unhealthy) != len(args.UnhealthyAllocationIDs) {
|
||||
return false
|
||||
}
|
||||
|
||||
hmap, umap := make(map[string]struct{}, len(c.Healthy)), make(map[string]struct{}, len(c.Unhealthy))
|
||||
for _, h := range c.Healthy {
|
||||
hmap[h] = struct{}{}
|
||||
}
|
||||
for _, u := range c.Unhealthy {
|
||||
umap[u] = struct{}{}
|
||||
}
|
||||
|
||||
for _, h := range args.HealthyAllocationIDs {
|
||||
if _, ok := hmap[h]; !ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
for _, u := range args.UnhealthyAllocationIDs {
|
||||
if _, ok := umap[u]; !ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if c.DeploymentUpdate != nil {
|
||||
if args.DeploymentUpdate == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if !strings.Contains(args.DeploymentUpdate.Status, c.DeploymentUpdate.Status) {
|
||||
return false
|
||||
}
|
||||
if !strings.Contains(args.DeploymentUpdate.StatusDescription, c.DeploymentUpdate.StatusDescription) {
|
||||
return false
|
||||
}
|
||||
} else if args.DeploymentUpdate != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if (c.JobVersion != nil && (args.Job == nil || args.Job.Version != *c.JobVersion)) || c.JobVersion == nil && args.Job != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockBackend) Evaluations(args *structs.JobSpecificRequest, reply *structs.JobEvaluationsResponse) error {
|
||||
rargs := m.Called(args, reply)
|
||||
return rargs.Error(0)
|
||||
}
|
||||
|
||||
func (m *mockBackend) evaluationsFromState(in mocker.Arguments) {
|
||||
args, reply := in.Get(0).(*structs.JobSpecificRequest), in.Get(1).(*structs.JobEvaluationsResponse)
|
||||
ws := memdb.NewWatchSet()
|
||||
evals, _ := m.state.EvalsByJob(ws, args.JobID)
|
||||
reply.Evaluations = evals
|
||||
reply.Index, _ = m.state.Index("evals")
|
||||
}
|
||||
|
||||
func (m *mockBackend) Allocations(args *structs.DeploymentSpecificRequest, reply *structs.AllocListResponse) error {
|
||||
rargs := m.Called(args, reply)
|
||||
return rargs.Error(0)
|
||||
}
|
||||
|
||||
func (m *mockBackend) allocationsFromState(in mocker.Arguments) {
|
||||
args, reply := in.Get(0).(*structs.DeploymentSpecificRequest), in.Get(1).(*structs.AllocListResponse)
|
||||
ws := memdb.NewWatchSet()
|
||||
allocs, _ := m.state.AllocsByDeployment(ws, args.DeploymentID)
|
||||
|
||||
var stubs []*structs.AllocListStub
|
||||
for _, a := range allocs {
|
||||
stubs = append(stubs, a.Stub())
|
||||
}
|
||||
|
||||
reply.Allocations = stubs
|
||||
reply.Index, _ = m.state.Index("allocs")
|
||||
}
|
||||
|
||||
func (m *mockBackend) List(args *structs.DeploymentListRequest, reply *structs.DeploymentListResponse) error {
|
||||
rargs := m.Called(args, reply)
|
||||
return rargs.Error(0)
|
||||
}
|
||||
|
||||
func (m *mockBackend) listFromState(in mocker.Arguments) {
|
||||
reply := in.Get(1).(*structs.DeploymentListResponse)
|
||||
ws := memdb.NewWatchSet()
|
||||
iter, _ := m.state.Deployments(ws)
|
||||
|
||||
var deploys []*structs.Deployment
|
||||
for {
|
||||
raw := iter.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
|
||||
deploys = append(deploys, raw.(*structs.Deployment))
|
||||
}
|
||||
|
||||
reply.Deployments = deploys
|
||||
reply.Index, _ = m.state.Index("deployment")
|
||||
}
|
||||
|
||||
func (m *mockBackend) GetJobVersions(args *structs.JobSpecificRequest, reply *structs.JobVersionsResponse) error {
|
||||
rargs := m.Called(args, reply)
|
||||
return rargs.Error(0)
|
||||
}
|
||||
|
||||
func (m *mockBackend) getJobVersionsFromState(in mocker.Arguments) {
|
||||
args, reply := in.Get(0).(*structs.JobSpecificRequest), in.Get(1).(*structs.JobVersionsResponse)
|
||||
ws := memdb.NewWatchSet()
|
||||
versions, _ := m.state.JobVersionsByID(ws, args.JobID)
|
||||
reply.Versions = versions
|
||||
reply.Index, _ = m.state.Index("jobs")
|
||||
}
|
||||
|
||||
func (m *mockBackend) GetJob(args *structs.JobSpecificRequest, reply *structs.SingleJobResponse) error {
|
||||
rargs := m.Called(args, reply)
|
||||
return rargs.Error(0)
|
||||
}
|
||||
|
||||
func (m *mockBackend) getJobFromState(in mocker.Arguments) {
|
||||
args, reply := in.Get(0).(*structs.JobSpecificRequest), in.Get(1).(*structs.SingleJobResponse)
|
||||
ws := memdb.NewWatchSet()
|
||||
job, _ := m.state.JobByID(ws, args.JobID)
|
||||
reply.Job = job
|
||||
reply.Index, _ = m.state.Index("jobs")
|
||||
}
|
||||
|
||||
// matchDeploymentSpecificRequest is used to match that a deployment specific
|
||||
// request is for the passed deployment id
|
||||
func matchDeploymentSpecificRequest(dID string) func(args *structs.DeploymentSpecificRequest) bool {
|
||||
return func(args *structs.DeploymentSpecificRequest) bool {
|
||||
return args.DeploymentID == dID
|
||||
}
|
||||
}
|
||||
|
||||
// matchJobSpecificRequest is used to match that a job specific
|
||||
// request is for the passed job id
|
||||
func matchJobSpecificRequest(jID string) func(args *structs.JobSpecificRequest) bool {
|
||||
return func(args *structs.JobSpecificRequest) bool {
|
||||
return args.JobID == jID
|
||||
}
|
||||
}
|
|
@ -3857,6 +3857,12 @@ const (
|
|||
DeploymentStatusDescriptionFailedAllocations = "Failed due to unhealthy allocations"
|
||||
)
|
||||
|
||||
// DeploymentStatusDescriptionRollback is used to get the status description of
|
||||
// a deployment when rolling back to an older job.
|
||||
func DeploymentStatusDescriptionRollback(baseDescription string, jobVersion uint64) string {
|
||||
return fmt.Sprintf("%s - rolling back to job version %d", baseDescription, jobVersion)
|
||||
}
|
||||
|
||||
// Deployment is the object that represents a job deployment which is used to
|
||||
// transistion a job between versions.
|
||||
type Deployment struct {
|
||||
|
|
Loading…
Reference in a new issue