Merge pull request #5602 from hashicorp/normalized-plan

Normalize plan to increase the plan apply throughput
Arshneet Singh 2019-04-24 14:07:26 -07:00 committed by GitHub
commit 78d49eda7c
17 changed files with 990 additions and 190 deletions
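
The change in a nutshell: stopped and preempted allocations already exist in every server's state store, so the Raft log entry only needs a small diff (the alloc ID plus the few fields the scheduler changed) instead of the full Allocation, and the state store denormalizes that diff back into a complete alloc at apply time. A minimal standalone sketch of the idea, with stand-in types rather than the real ones from nomad/structs that appear in the diffs below:

package main

import "fmt"

// Allocation is a trimmed-down stand-in for nomad/structs.Allocation, which
// carries many more fields (job, resources, task states, ...).
type Allocation struct {
	ID                    string
	DesiredDescription    string
	ClientStatus          string
	PreemptedByAllocation string
	ModifyTime            int64
}

// AllocationDiff mirrors the PR's approach: the same fields as Allocation, but
// only the ones the scheduler actually changed are populated before the entry
// is written to the Raft log.
type AllocationDiff Allocation

// normalizeStopped keeps only what is needed to mark an existing alloc as
// stopped, mirroring normalizeStoppedAlloc further down in this diff.
func normalizeStopped(a *Allocation, now int64) *AllocationDiff {
	return &AllocationDiff{
		ID:                 a.ID,
		DesiredDescription: a.DesiredDescription,
		ClientStatus:       a.ClientStatus,
		ModifyTime:         now,
	}
}

func main() {
	full := &Allocation{ID: "alloc-1", DesiredDescription: "node drained", ClientStatus: "lost"}
	fmt.Printf("%+v\n", normalizeStopped(full, 12345))
}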

@ -1243,7 +1243,7 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig {
return config
}
if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion) {
if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion, false) {
s.logger.Named("autopilot").Warn("can't initialize until all servers are above minimum version", "min_version", minAutopilotVersion)
return nil
}
@ -1270,7 +1270,7 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {
if config != nil {
return config
}
if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion) {
if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion, false) {
s.logger.Named("core").Warn("can't initialize scheduler config until all servers are above minimum version", "min_version", minSchedulerConfigVersion)
return nil
}

@ -237,7 +237,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
}
// All servers should be at or above 0.8.0 to apply this operation
if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion) {
if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion, false) {
return fmt.Errorf("All servers should be running version %v to update autopilot config", minAutopilotVersion)
}
@ -305,7 +305,7 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe
}
// All servers should be at or above 0.9.0 to apply this operation
if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion) {
if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion, false) {
return fmt.Errorf("All servers should be running version %v to update scheduler config", minSchedulerConfigVersion)
}
// Apply the update

@ -15,7 +15,7 @@ import (
"github.com/hashicorp/raft"
)
// planner is used to mange the submitted allocation plans that are waiting
// planner is used to manage the submitted allocation plans that are waiting
// to be accessed by the leader
type planner struct {
*Server
@ -149,52 +149,82 @@ func (p *planner) planApply() {
// applyPlan is used to apply the plan result and to return the alloc index
func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
minUpdates := len(result.NodeUpdate)
minUpdates += len(result.NodeAllocation)
// Setup the update request
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: plan.Job,
Alloc: make([]*structs.Allocation, 0, minUpdates),
Job: plan.Job,
},
Deployment: result.Deployment,
DeploymentUpdates: result.DeploymentUpdates,
EvalID: plan.EvalID,
NodePreemptions: make([]*structs.Allocation, 0, len(result.NodePreemptions)),
}
for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
}
for _, allocList := range result.NodeAllocation {
req.Alloc = append(req.Alloc, allocList...)
}
for _, preemptions := range result.NodePreemptions {
req.NodePreemptions = append(req.NodePreemptions, preemptions...)
}
// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
now := time.Now().UTC().UnixNano()
for _, alloc := range req.Alloc {
if alloc.CreateTime == 0 {
alloc.CreateTime = now
}
alloc.ModifyTime = now
}
// Set modify time for preempted allocs if any
// Also gather jobids to create follow up evals
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
for _, alloc := range req.NodePreemptions {
alloc.ModifyTime = now
id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
_, ok := preemptedJobIDs[id]
if !ok {
preemptedJobIDs[id] = struct{}{}
now := time.Now().UTC().UnixNano()
if ServersMeetMinimumVersion(p.Members(), MinVersionPlanNormalization, true) {
// Initialize the allocs request using the new optimized log entry format.
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
req.AllocsStopped = make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation))
req.AllocsPreempted = make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))
for _, updateList := range result.NodeUpdate {
for _, stoppedAlloc := range updateList {
req.AllocsStopped = append(req.AllocsStopped, normalizeStoppedAlloc(stoppedAlloc, now))
}
}
for _, allocList := range result.NodeAllocation {
req.AllocsUpdated = append(req.AllocsUpdated, allocList...)
}
// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
updateAllocTimestamps(req.AllocsUpdated, now)
for _, preemptions := range result.NodePreemptions {
for _, preemptedAlloc := range preemptions {
req.AllocsPreempted = append(req.AllocsPreempted, normalizePreemptedAlloc(preemptedAlloc, now))
// Gather jobids to create follow up evals
appendNamespacedJobID(preemptedJobIDs, preemptedAlloc)
}
}
} else {
// COMPAT 0.11: This branch is deprecated and will only be used to support
// application of older log entries. Expected to be removed in a future version.
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
minUpdates := len(result.NodeUpdate)
minUpdates += len(result.NodeAllocation)
// Initialize using the older log entry format for Alloc and NodePreemptions
req.Alloc = make([]*structs.Allocation, 0, minUpdates)
req.NodePreemptions = make([]*structs.Allocation, 0, len(result.NodePreemptions))
for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
}
for _, allocList := range result.NodeAllocation {
req.Alloc = append(req.Alloc, allocList...)
}
for _, preemptions := range result.NodePreemptions {
req.NodePreemptions = append(req.NodePreemptions, preemptions...)
}
// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
updateAllocTimestamps(req.Alloc, now)
// Set modify time for preempted allocs if any
// Also gather jobids to create follow up evals
for _, alloc := range req.NodePreemptions {
alloc.ModifyTime = now
appendNamespacedJobID(preemptedJobIDs, alloc)
}
}
@ -232,6 +262,50 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
return future, nil
}
// normalizePreemptedAlloc removes redundant fields from a preempted allocation and
// returns AllocationDiff. Since a preempted allocation is always an existing allocation,
// the struct returned by this method contains only the differential, which can be
// applied to an existing allocation, to yield the updated struct
func normalizePreemptedAlloc(preemptedAlloc *structs.Allocation, now int64) *structs.AllocationDiff {
return &structs.AllocationDiff{
ID: preemptedAlloc.ID,
PreemptedByAllocation: preemptedAlloc.PreemptedByAllocation,
ModifyTime: now,
}
}
// normalizeStoppedAlloc removes redundant fields from a stopped allocation and
// returns AllocationDiff. Since a stopped allocation is always an existing allocation,
// the struct returned by this method contains only the differential, which can be
// applied to an existing allocation, to yield the updated struct
func normalizeStoppedAlloc(stoppedAlloc *structs.Allocation, now int64) *structs.AllocationDiff {
return &structs.AllocationDiff{
ID: stoppedAlloc.ID,
DesiredDescription: stoppedAlloc.DesiredDescription,
ClientStatus: stoppedAlloc.ClientStatus,
ModifyTime: now,
}
}
// appendNamespacedJobID appends the namespaced Job ID for the alloc to the jobIDs set
func appendNamespacedJobID(jobIDs map[structs.NamespacedID]struct{}, alloc *structs.Allocation) {
id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
if _, ok := jobIDs[id]; !ok {
jobIDs[id] = struct{}{}
}
}
// updateAllocTimestamps sets the CreateTime and ModifyTime for the allocations
// to the timestamp provided
func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
for _, alloc := range allocations {
if alloc.CreateTime == 0 {
alloc.CreateTime = timestamp
}
alloc.ModifyTime = timestamp
}
}
// asyncPlanWait is used to apply and respond to a plan async
func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
result *structs.PlanResult, pending *pendingPlan) {
@ -264,6 +338,17 @@ func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.Plan, logger log.Logger) (*structs.PlanResult, error) {
defer metrics.MeasureSince([]string{"nomad", "plan", "evaluate"}, time.Now())
// Denormalize without the job
err := snap.DenormalizeAllocationsMap(plan.NodeUpdate, nil)
if err != nil {
return nil, err
}
// Denormalize without the job
err = snap.DenormalizeAllocationsMap(plan.NodePreemptions, nil)
if err != nil {
return nil, err
}
// Check if the plan exceeds quota
overQuota, err := evaluatePlanQuota(snap, plan)
if err != nil {
@ -521,15 +606,11 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
// Remove any preempted allocs
if preempted := plan.NodePreemptions[nodeID]; len(preempted) > 0 {
for _, allocs := range preempted {
remove = append(remove, allocs)
}
remove = append(remove, preempted...)
}
if updated := plan.NodeAllocation[nodeID]; len(updated) > 0 {
for _, alloc := range updated {
remove = append(remove, alloc)
}
remove = append(remove, updated...)
}
proposed := structs.RemoveAllocs(existingAlloc, remove)
proposed = append(proposed, plan.NodeAllocation[nodeID]...)

@ -3,8 +3,9 @@ package nomad
import (
"reflect"
"testing"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
@ -62,6 +63,7 @@ func testRegisterJob(t *testing.T, s *Server, j *structs.Job) {
}
}
// COMPAT 0.11: Tests the older unoptimized code path for applyPlan
func TestPlanApply_applyPlan(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
@ -228,6 +230,158 @@ func TestPlanApply_applyPlan(t *testing.T) {
assert.Equal(index, evalOut.ModifyIndex)
}
// Verifies that applyPlan properly updates the constituent objects in MemDB,
// when the plan contains normalized allocs.
func TestPlanApply_applyPlanWithNormalizedAllocs(t *testing.T) {
t.Parallel()
s1 := TestServer(t, func(c *Config) {
c.Build = "0.9.2"
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Register node
node := mock.Node()
testRegisterNode(t, s1, node)
// Register a fake deployment
oldDeployment := mock.Deployment()
if err := s1.State().UpsertDeployment(900, oldDeployment); err != nil {
t.Fatalf("UpsertDeployment failed: %v", err)
}
// Create a deployment
dnew := mock.Deployment()
// Create a deployment update for the old deployment id
desiredStatus, desiredStatusDescription := "foo", "bar"
updates := []*structs.DeploymentStatusUpdate{
{
DeploymentID: oldDeployment.ID,
Status: desiredStatus,
StatusDescription: desiredStatusDescription,
},
}
// Register allocs, deployment and deployment update
alloc := mock.Alloc()
stoppedAlloc := mock.Alloc()
stoppedAllocDiff := &structs.Allocation{
ID: stoppedAlloc.ID,
DesiredDescription: "Desired Description",
ClientStatus: structs.AllocClientStatusLost,
}
preemptedAlloc := mock.Alloc()
preemptedAllocDiff := &structs.Allocation{
ID: preemptedAlloc.ID,
PreemptedByAllocation: alloc.ID,
}
s1.State().UpsertJobSummary(1000, mock.JobSummary(alloc.JobID))
s1.State().UpsertAllocs(1100, []*structs.Allocation{stoppedAlloc, preemptedAlloc})
// Create an eval
eval := mock.Eval()
eval.JobID = alloc.JobID
if err := s1.State().UpsertEvals(1, []*structs.Evaluation{eval}); err != nil {
t.Fatalf("err: %v", err)
}
timestampBeforeCommit := time.Now().UTC().UnixNano()
planRes := &structs.PlanResult{
NodeAllocation: map[string][]*structs.Allocation{
node.ID: {alloc},
},
NodeUpdate: map[string][]*structs.Allocation{
stoppedAlloc.NodeID: {stoppedAllocDiff},
},
NodePreemptions: map[string][]*structs.Allocation{
preemptedAlloc.NodeID: {preemptedAllocDiff},
},
Deployment: dnew,
DeploymentUpdates: updates,
}
// Snapshot the state
snap, err := s1.State().Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
// Create the plan with a deployment
plan := &structs.Plan{
Job: alloc.Job,
Deployment: dnew,
DeploymentUpdates: updates,
EvalID: eval.ID,
}
require := require.New(t)
assert := assert.New(t)
// Apply the plan
future, err := s1.applyPlan(plan, planRes, snap)
require.NoError(err)
// Verify our optimistic snapshot is updated
ws := memdb.NewWatchSet()
allocOut, err := snap.AllocByID(ws, alloc.ID)
require.NoError(err)
require.NotNil(allocOut)
deploymentOut, err := snap.DeploymentByID(ws, plan.Deployment.ID)
require.NoError(err)
require.NotNil(deploymentOut)
// Check plan does apply cleanly
index, err := planWaitFuture(future)
require.NoError(err)
assert.NotEqual(0, index)
// Lookup the allocation
fsmState := s1.fsm.State()
allocOut, err = fsmState.AllocByID(ws, alloc.ID)
require.NoError(err)
require.NotNil(allocOut)
assert.True(allocOut.CreateTime > 0)
assert.True(allocOut.ModifyTime > 0)
assert.Equal(allocOut.CreateTime, allocOut.ModifyTime)
// Verify stopped alloc diff applied cleanly
updatedStoppedAlloc, err := fsmState.AllocByID(ws, stoppedAlloc.ID)
require.NoError(err)
require.NotNil(updatedStoppedAlloc)
assert.True(updatedStoppedAlloc.ModifyTime > timestampBeforeCommit)
assert.Equal(updatedStoppedAlloc.DesiredDescription, stoppedAllocDiff.DesiredDescription)
assert.Equal(updatedStoppedAlloc.ClientStatus, stoppedAllocDiff.ClientStatus)
assert.Equal(updatedStoppedAlloc.DesiredStatus, structs.AllocDesiredStatusStop)
// Verify preempted alloc diff applied cleanly
updatedPreemptedAlloc, err := fsmState.AllocByID(ws, preemptedAlloc.ID)
require.NoError(err)
require.NotNil(updatedPreemptedAlloc)
assert.True(updatedPreemptedAlloc.ModifyTime > timestampBeforeCommit)
assert.Equal(updatedPreemptedAlloc.DesiredDescription,
"Preempted by alloc ID "+preemptedAllocDiff.PreemptedByAllocation)
assert.Equal(updatedPreemptedAlloc.DesiredStatus, structs.AllocDesiredStatusEvict)
// Lookup the new deployment
dout, err := fsmState.DeploymentByID(ws, plan.Deployment.ID)
require.NoError(err)
require.NotNil(dout)
// Lookup the updated deployment
dout2, err := fsmState.DeploymentByID(ws, oldDeployment.ID)
require.NoError(err)
require.NotNil(dout2)
assert.Equal(desiredStatus, dout2.Status)
assert.Equal(desiredStatusDescription, dout2.StatusDescription)
// Lookup updated eval
evalOut, err := fsmState.EvalByID(ws, eval.ID)
require.NoError(err)
require.NotNil(evalOut)
assert.Equal(index, evalOut.ModifyIndex)
}
func TestPlanApply_EvalPlan_Simple(t *testing.T) {
t.Parallel()
state := testStateStore(t)

@ -0,0 +1,66 @@
package nomad
import (
"bytes"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/ugorji/go/codec"
)
// This test compares the size of the normalized + OmitEmpty raft plan log entry
// with the earlier denormalized log.
//
// Whenever this test is changed, care should be taken to ensure the older msgpack size
// is recalculated when new fields are introduced in ApplyPlanResultsRequest
func TestPlanNormalize(t *testing.T) {
// This size was calculated using the older ApplyPlanResultsRequest format, in which allocations
// didn't use OmitEmpty and only the job was normalized in the stopped and preempted allocs.
// The newer format uses OmitEmpty and uses a minimal set of fields for the diff of the
// stopped and preempted allocs. The file for the older format hasn't been checked in, because
// it's not a good idea to check-in a 20mb file to the git repo.
unoptimizedLogSize := 19460168
numUpdatedAllocs := 10000
numStoppedAllocs := 8000
numPreemptedAllocs := 2000
mockAlloc := mock.Alloc()
mockAlloc.Job = nil
mockUpdatedAllocSlice := make([]*structs.Allocation, numUpdatedAllocs)
for i := 0; i < numUpdatedAllocs; i++ {
mockUpdatedAllocSlice = append(mockUpdatedAllocSlice, mockAlloc)
}
now := time.Now().UTC().UnixNano()
mockStoppedAllocSlice := make([]*structs.AllocationDiff, numStoppedAllocs)
for i := 0; i < numStoppedAllocs; i++ {
mockStoppedAllocSlice = append(mockStoppedAllocSlice, normalizeStoppedAlloc(mockAlloc, now))
}
mockPreemptionAllocSlice := make([]*structs.AllocationDiff, numPreemptedAllocs)
for i := 0; i < numPreemptedAllocs; i++ {
mockPreemptionAllocSlice = append(mockPreemptionAllocSlice, normalizePreemptedAlloc(mockAlloc, now))
}
// Create a plan result
applyPlanLogEntry := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
AllocsUpdated: mockUpdatedAllocSlice,
AllocsStopped: mockStoppedAllocSlice,
},
AllocsPreempted: mockPreemptionAllocSlice,
}
handle := structs.MsgpackHandle
var buf bytes.Buffer
if err := codec.NewEncoder(&buf, handle).Encode(applyPlanLogEntry); err != nil {
t.Fatalf("Encoding failed: %v", err)
}
optimizedLogSize := buf.Len()
assert.True(t, float64(optimizedLogSize)/float64(unoptimizedLogSize) < 0.62)
}
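
Two things drive the size reduction measured above: stopped and preempted allocs are encoded as AllocationDiffs instead of full Allocations, and the _struct field tagged codec:",omitempty" (added to Allocation, Evaluation, and Plan later in this diff) tells the ugorji msgpack codec to drop zero-valued fields; together they are what the 0.62 ratio in the assertion above is measuring. A rough standalone illustration of the omitempty effect, not the PR's own benchmark:

package main

import (
	"bytes"
	"fmt"

	"github.com/ugorji/go/codec"
)

// verbose keeps every field in the msgpack output, even when zero-valued.
type verbose struct {
	ID     string
	Status string
	Notes  string
	Count  int
}

// compact has the same shape but asks the codec to omit zero-valued fields via
// the struct-level tag this PR adds to Allocation, Evaluation, and Plan.
type compact struct {
	_struct bool `codec:",omitempty"` // nolint: structcheck
	ID      string
	Status  string
	Notes   string
	Count   int
}

func encodedLen(v interface{}) int {
	var buf bytes.Buffer
	if err := codec.NewEncoder(&buf, &codec.MsgpackHandle{}).Encode(v); err != nil {
		panic(err)
	}
	return buf.Len()
}

func main() {
	// Only ID is set; the remaining zero-valued fields are skipped in the
	// compact encoding, so it comes out noticeably smaller.
	fmt.Println("verbose:", encodedLen(&verbose{ID: "alloc-1"}))
	fmt.Println("compact:", encodedLen(&compact{ID: "alloc-1"}))
}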

@ -170,6 +170,27 @@ RUN_QUERY:
// UpsertPlanResults is used to upsert the results of a plan.
func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanResultsRequest) error {
snapshot, err := s.Snapshot()
if err != nil {
return err
}
allocsStopped, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsStopped, results.Job)
if err != nil {
return err
}
allocsPreempted, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsPreempted, results.Job)
if err != nil {
return err
}
// COMPAT 0.11: Remove this denormalization when NodePreemptions is removed
results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions, results.Job)
if err != nil {
return err
}
txn := s.db.Txn(true)
defer txn.Abort()
@ -185,34 +206,6 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
s.upsertDeploymentUpdates(index, results.DeploymentUpdates, txn)
}
// Attach the job to all the allocations. It is pulled out in the payload to
// avoid the redundancy of encoding, but should be denormalized prior to
// being inserted into MemDB.
structs.DenormalizeAllocationJobs(results.Job, results.Alloc)
// COMPAT(0.11): Remove in 0.11
// Calculate the total resources of allocations. It is pulled out in the
// payload to avoid encoding something that can be computed, but should be
// denormalized prior to being inserted into MemDB.
for _, alloc := range results.Alloc {
if alloc.Resources != nil {
continue
}
alloc.Resources = new(structs.Resources)
for _, task := range alloc.TaskResources {
alloc.Resources.Add(task)
}
// Add the shared resources
alloc.Resources.Add(alloc.SharedResources)
}
// Upsert the allocations
if err := s.upsertAllocsImpl(index, results.Alloc, txn); err != nil {
return err
}
// COMPAT: Nomad versions before 0.7.1 did not include the eval ID when
// applying the plan. Thus while we are upgrading, we ignore updating the
// modify index of evaluations from older plans.
@ -223,35 +216,33 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
}
}
// Prepare preempted allocs in the plan results for update
var preemptedAllocs []*structs.Allocation
for _, preemptedAlloc := range results.NodePreemptions {
// Look for existing alloc
existing, err := txn.First("allocs", "id", preemptedAlloc.ID)
if err != nil {
return fmt.Errorf("alloc lookup failed: %v", err)
}
// Nothing to do if this does not exist
if existing == nil {
continue
}
exist := existing.(*structs.Allocation)
// Copy everything from the existing allocation
copyAlloc := exist.Copy()
// Only update the fields set by the scheduler
copyAlloc.DesiredStatus = preemptedAlloc.DesiredStatus
copyAlloc.PreemptedByAllocation = preemptedAlloc.PreemptedByAllocation
copyAlloc.DesiredDescription = preemptedAlloc.DesiredDescription
copyAlloc.ModifyTime = preemptedAlloc.ModifyTime
preemptedAllocs = append(preemptedAllocs, copyAlloc)
numAllocs := 0
if len(results.Alloc) > 0 || len(results.NodePreemptions) > 0 {
// COMPAT 0.11: This branch will be removed, when Alloc is removed
// Attach the job to all the allocations. It is pulled out in the payload to
// avoid the redundancy of encoding, but should be denormalized prior to
// being inserted into MemDB.
addComputedAllocAttrs(results.Alloc, results.Job)
numAllocs = len(results.Alloc) + len(results.NodePreemptions)
} else {
// Attach the job to all the allocations. It is pulled out in the payload to
// avoid the redundancy of encoding, but should be denormalized prior to
// being inserted into MemDB.
addComputedAllocAttrs(results.AllocsUpdated, results.Job)
numAllocs = len(allocsStopped) + len(results.AllocsUpdated) + len(allocsPreempted)
}
// Upsert the preempted allocations
if err := s.upsertAllocsImpl(index, preemptedAllocs, txn); err != nil {
allocsToUpsert := make([]*structs.Allocation, 0, numAllocs)
// COMPAT 0.11: Both these appends should be removed when Alloc and NodePreemptions are removed
allocsToUpsert = append(allocsToUpsert, results.Alloc...)
allocsToUpsert = append(allocsToUpsert, results.NodePreemptions...)
allocsToUpsert = append(allocsToUpsert, allocsStopped...)
allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...)
allocsToUpsert = append(allocsToUpsert, allocsPreempted...)
if err := s.upsertAllocsImpl(index, allocsToUpsert, txn); err != nil {
return err
}
@ -266,6 +257,30 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
return nil
}
// addComputedAllocAttrs adds the computed/derived attributes to the allocation.
// This method is used when an allocation is being denormalized.
func addComputedAllocAttrs(allocs []*structs.Allocation, job *structs.Job) {
structs.DenormalizeAllocationJobs(job, allocs)
// COMPAT(0.11): Remove in 0.11
// Calculate the total resources of allocations. It is pulled out in the
// payload to avoid encoding something that can be computed, but should be
// denormalized prior to being inserted into MemDB.
for _, alloc := range allocs {
if alloc.Resources != nil {
continue
}
alloc.Resources = new(structs.Resources)
for _, task := range alloc.TaskResources {
alloc.Resources.Add(task)
}
// Add the shared resources
alloc.Resources.Add(alloc.SharedResources)
}
}
// upsertDeploymentUpdates updates the deployments given the passed status
// updates.
func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *memdb.Txn) error {
@ -4100,6 +4115,83 @@ type StateSnapshot struct {
StateStore
}
// DenormalizeAllocationsMap takes in a map of nodes to allocations, and queries the
// Allocation for each of the Allocation diffs and merges the updated attributes with
// the existing Allocation, and attaches the Job provided
func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation, job *structs.Job) error {
for nodeID, allocs := range nodeAllocations {
denormalizedAllocs, err := s.DenormalizeAllocationSlice(allocs, job)
if err != nil {
return err
}
nodeAllocations[nodeID] = denormalizedAllocs
}
return nil
}
// DenormalizeAllocationSlice queries the Allocation for each allocation diff
// represented as an Allocation and merges the updated attributes with the existing
// Allocation, and attaches the Job provided.
func (s *StateSnapshot) DenormalizeAllocationSlice(allocs []*structs.Allocation, job *structs.Job) ([]*structs.Allocation, error) {
allocDiffs := make([]*structs.AllocationDiff, len(allocs))
for i, alloc := range allocs {
allocDiffs[i] = alloc.AllocationDiff()
}
return s.DenormalizeAllocationDiffSlice(allocDiffs, job)
}
// DenormalizeAllocationDiffSlice queries the Allocation for each AllocationDiff and merges
// the updated attributes with the existing Allocation, and attaches the Job provided
func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.AllocationDiff, job *structs.Job) ([]*structs.Allocation, error) {
// Output index for denormalized Allocations
j := 0
denormalizedAllocs := make([]*structs.Allocation, len(allocDiffs))
for _, allocDiff := range allocDiffs {
alloc, err := s.AllocByID(nil, allocDiff.ID)
if err != nil {
return nil, fmt.Errorf("alloc lookup failed: %v", err)
}
if alloc == nil {
return nil, fmt.Errorf("alloc %v doesn't exist", allocDiff.ID)
}
// Merge the updates to the Allocation
allocCopy := alloc.CopySkipJob()
allocCopy.Job = job
if allocDiff.PreemptedByAllocation != "" {
// If alloc is a preemption
allocCopy.PreemptedByAllocation = allocDiff.PreemptedByAllocation
allocCopy.DesiredDescription = getPreemptedAllocDesiredDescription(allocDiff.PreemptedByAllocation)
allocCopy.DesiredStatus = structs.AllocDesiredStatusEvict
} else {
// If alloc is a stopped alloc
allocCopy.DesiredDescription = allocDiff.DesiredDescription
allocCopy.DesiredStatus = structs.AllocDesiredStatusStop
if allocDiff.ClientStatus != "" {
allocCopy.ClientStatus = allocDiff.ClientStatus
}
}
if allocDiff.ModifyTime != 0 {
allocCopy.ModifyTime = allocDiff.ModifyTime
}
// Update the allocDiff in the slice to equal the denormalized alloc
denormalizedAllocs[j] = allocCopy
j++
}
// Retain only the denormalized Allocations in the slice
denormalizedAllocs = denormalizedAllocs[:j]
return denormalizedAllocs, nil
}
func getPreemptedAllocDesiredDescription(PreemptedByAllocID string) string {
return fmt.Sprintf("Preempted by alloc ID %v", PreemptedByAllocID)
}
// StateRestore is used to optimize the performance when
// restoring state by only using a single large transaction
// instead of thousands of sub transactions

@ -9,7 +9,7 @@ import (
"testing"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
@ -88,6 +88,7 @@ func TestStateStore_Blocking_MinQuery(t *testing.T) {
}
}
// COMPAT 0.11: Uses AllocUpdateRequest.Alloc
// This test checks that:
// 1) The job is denormalized
// 2) Allocations are created
@ -140,6 +141,86 @@ func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing
assert.EqualValues(1000, evalOut.ModifyIndex)
}
// This test checks that:
// 1) The job is denormalized
// 2) Allocations are denormalized and updated with the diff
func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
job := alloc.Job
alloc.Job = nil
stoppedAlloc := mock.Alloc()
stoppedAlloc.Job = job
stoppedAllocDiff := &structs.AllocationDiff{
ID: stoppedAlloc.ID,
DesiredDescription: "desired desc",
ClientStatus: structs.AllocClientStatusLost,
}
preemptedAlloc := mock.Alloc()
preemptedAlloc.Job = job
preemptedAllocDiff := &structs.AllocationDiff{
ID: preemptedAlloc.ID,
PreemptedByAllocation: alloc.ID,
}
require := require.New(t)
require.NoError(state.UpsertAllocs(900, []*structs.Allocation{stoppedAlloc, preemptedAlloc}))
require.NoError(state.UpsertJob(999, job))
eval := mock.Eval()
eval.JobID = job.ID
// Create an eval
require.NoError(state.UpsertEvals(1, []*structs.Evaluation{eval}))
// Create a plan result
res := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
AllocsUpdated: []*structs.Allocation{alloc},
AllocsStopped: []*structs.AllocationDiff{stoppedAllocDiff},
Job: job,
},
EvalID: eval.ID,
AllocsPreempted: []*structs.AllocationDiff{preemptedAllocDiff},
}
assert := assert.New(t)
planModifyIndex := uint64(1000)
err := state.UpsertPlanResults(planModifyIndex, &res)
require.NoError(err)
ws := memdb.NewWatchSet()
out, err := state.AllocByID(ws, alloc.ID)
require.NoError(err)
assert.Equal(alloc, out)
updatedStoppedAlloc, err := state.AllocByID(ws, stoppedAlloc.ID)
require.NoError(err)
assert.Equal(stoppedAllocDiff.DesiredDescription, updatedStoppedAlloc.DesiredDescription)
assert.Equal(structs.AllocDesiredStatusStop, updatedStoppedAlloc.DesiredStatus)
assert.Equal(stoppedAllocDiff.ClientStatus, updatedStoppedAlloc.ClientStatus)
assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex)
assert.Equal(planModifyIndex, updatedStoppedAlloc.AllocModifyIndex)
updatedPreemptedAlloc, err := state.AllocByID(ws, preemptedAlloc.ID)
require.NoError(err)
assert.Equal(structs.AllocDesiredStatusEvict, updatedPreemptedAlloc.DesiredStatus)
assert.Equal(preemptedAllocDiff.PreemptedByAllocation, updatedPreemptedAlloc.PreemptedByAllocation)
assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex)
assert.Equal(planModifyIndex, updatedPreemptedAlloc.AllocModifyIndex)
index, err := state.Index("allocs")
require.NoError(err)
assert.EqualValues(planModifyIndex, index)
require.False(watchFired(ws))
evalOut, err := state.EvalByID(ws, eval.ID)
require.NoError(err)
require.NotNil(evalOut)
assert.EqualValues(planModifyIndex, evalOut.ModifyIndex)
}
// This test checks that the deployment is created and allocations count towards
// the deployment
func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) {
@ -271,11 +352,9 @@ func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) {
require.NoError(err)
minimalPreemptedAlloc := &structs.Allocation{
ID: preemptedAlloc.ID,
Namespace: preemptedAlloc.Namespace,
DesiredStatus: structs.AllocDesiredStatusEvict,
ModifyTime: time.Now().Unix(),
DesiredDescription: fmt.Sprintf("Preempted by allocation %v", alloc.ID),
ID: preemptedAlloc.ID,
PreemptedByAllocation: alloc.ID,
ModifyTime: time.Now().Unix(),
}
// Create eval for preempted job
@ -316,7 +395,7 @@ func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) {
preempted, err := state.AllocByID(ws, preemptedAlloc.ID)
require.NoError(err)
require.Equal(preempted.DesiredStatus, structs.AllocDesiredStatusEvict)
require.Equal(preempted.DesiredDescription, fmt.Sprintf("Preempted by allocation %v", alloc.ID))
require.Equal(preempted.DesiredDescription, fmt.Sprintf("Preempted by alloc ID %v", alloc.ID))
// Verify eval for preempted job
preemptedJobEval, err := state.EvalByID(ws, eval2.ID)
@ -6975,6 +7054,31 @@ func TestStateStore_Abandon(t *testing.T) {
}
}
// Verifies that an error is returned when an allocation doesn't exist in the state store.
func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()
require := require.New(t)
// Insert job
err := state.UpsertJob(999, alloc.Job)
require.NoError(err)
allocDiffs := []*structs.AllocationDiff{
{
ID: alloc.ID,
},
}
snap, err := state.Snapshot()
require.NoError(err)
denormalizedAllocs, err := snap.DenormalizeAllocationDiffSlice(allocDiffs, alloc.Job)
require.EqualError(err, fmt.Sprintf("alloc %v doesn't exist", alloc.ID))
require.Nil(denormalizedAllocs)
}
// watchFired is a helper for unit tests that returns if the given watch set
// fired (it doesn't care which watch actually fired). This uses a fixed
// timeout since we already expect the event happened before calling this and

@ -28,8 +28,8 @@ import (
"github.com/gorhill/cronexpr"
"github.com/hashicorp/consul/api"
hcodec "github.com/hashicorp/go-msgpack/codec"
multierror "github.com/hashicorp/go-multierror"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/args"
@ -660,10 +660,16 @@ type ApplyPlanResultsRequest struct {
// the evaluation itself being updated.
EvalID string
// COMPAT 0.11
// NodePreemptions is a slice of allocations from other lower priority jobs
// that are preempted. Preempted allocations are marked as evicted.
// Deprecated: Replaced with AllocsPreempted which contains only the diff
NodePreemptions []*Allocation
// AllocsPreempted is a slice of allocation diffs from other lower priority jobs
// that are preempted. Preempted allocations are marked as evicted.
AllocsPreempted []*AllocationDiff
// PreemptionEvals is a slice of follow up evals for jobs whose allocations
// have been preempted to place allocs in this plan
PreemptionEvals []*Evaluation
@ -673,9 +679,18 @@ type ApplyPlanResultsRequest struct {
// to cause evictions or to assign new allocations. Both can be done
// within a single transaction
type AllocUpdateRequest struct {
// COMPAT 0.11
// Alloc is the list of new allocations to assign
// Deprecated: Replaced with two separate slices, one containing stopped allocations
// and another containing updated allocations
Alloc []*Allocation
// Allocations to stop. Contains only the diff, not the entire allocation
AllocsStopped []*AllocationDiff
// New or updated allocations
AllocsUpdated []*Allocation
// Evals is the list of new evaluations to create
// Evals are valid only when used in the Raft RPC
Evals []*Evaluation
@ -7168,6 +7183,9 @@ const (
// Allocation is used to allocate the placement of a task group to a node.
type Allocation struct {
// msgpack omit empty fields during serialization
_struct bool `codec:",omitempty"` // nolint: structcheck
// ID of the allocation (UUID)
ID string
@ -7296,11 +7314,12 @@ func (a *Allocation) Index() uint {
return uint(num)
}
// Copy provides a copy of the allocation and deep copies the job
func (a *Allocation) Copy() *Allocation {
return a.copyImpl(true)
}
// Copy provides a copy of the allocation but doesn't deep copy the job
// CopySkipJob provides a copy of the allocation but doesn't deep copy the job
func (a *Allocation) CopySkipJob() *Allocation {
return a.copyImpl(false)
}
@ -7670,6 +7689,19 @@ func (a *Allocation) Stub() *AllocListStub {
}
}
// AllocationDiff converts an Allocation type to an AllocationDiff type
// If at any time, modification are made to AllocationDiff so that an
// Allocation can no longer be safely converted to AllocationDiff,
// this method should be changed accordingly.
func (a *Allocation) AllocationDiff() *AllocationDiff {
return (*AllocationDiff)(a)
}
// AllocationDiff is another named type for Allocation (to use the same fields),
// which is used to represent the delta for an Allocation. If you need a method
// defined on the al
type AllocationDiff Allocation
// AllocListStub is used to return a subset of alloc information
type AllocListStub struct {
ID string
@ -8037,6 +8069,9 @@ const (
// potentially taking action (allocation of work) or doing nothing if the state
// of the world does not require it.
type Evaluation struct {
// msgpack omit empty fields during serialization
_struct bool `codec:",omitempty"` // nolint: structcheck
// ID is a randomly generated UUID used for this evaluation. This
// is assigned upon the creation of the evaluation.
ID string
@ -8304,6 +8339,9 @@ func (e *Evaluation) CreateFailedFollowUpEval(wait time.Duration) *Evaluation {
// are submitted to the leader which verifies that resources have
// not been overcommitted before admitting the plan.
type Plan struct {
// msgpack omit empty fields during serialization
_struct bool `codec:",omitempty"` // nolint: structcheck
// EvalID is the evaluation ID this plan is associated with
EvalID string
@ -8355,9 +8393,9 @@ type Plan struct {
NodePreemptions map[string][]*Allocation
}
// AppendUpdate marks the allocation for eviction. The clientStatus of the
// AppendStoppedAlloc marks an allocation to be stopped. The clientStatus of the
// allocation may be optionally set by passing in a non-empty value.
func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clientStatus string) {
func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus string) {
newAlloc := new(Allocation)
*newAlloc = *alloc
@ -8373,7 +8411,7 @@ func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clien
// Strip the resources as it can be rebuilt.
newAlloc.Resources = nil
newAlloc.DesiredStatus = desiredStatus
newAlloc.DesiredStatus = AllocDesiredStatusStop
newAlloc.DesiredDescription = desiredDesc
if clientStatus != "" {
@ -8387,12 +8425,12 @@ func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clien
// AppendPreemptedAlloc is used to append an allocation that's being preempted to the plan.
// To minimize the size of the plan, this only sets a minimal set of fields in the allocation
func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, desiredStatus, preemptingAllocID string) {
func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, preemptingAllocID string) {
newAlloc := &Allocation{}
newAlloc.ID = alloc.ID
newAlloc.JobID = alloc.JobID
newAlloc.Namespace = alloc.Namespace
newAlloc.DesiredStatus = desiredStatus
newAlloc.DesiredStatus = AllocDesiredStatusEvict
newAlloc.PreemptedByAllocation = preemptingAllocID
desiredDesc := fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocID)
@ -8445,6 +8483,29 @@ func (p *Plan) IsNoOp() bool {
len(p.DeploymentUpdates) == 0
}
// NormalizeAllocations normalizes allocations to remove fields that can
// be fetched from the MemDB instead of sending over the wire
func (p *Plan) NormalizeAllocations() {
for _, allocs := range p.NodeUpdate {
for i, alloc := range allocs {
allocs[i] = &Allocation{
ID: alloc.ID,
DesiredDescription: alloc.DesiredDescription,
ClientStatus: alloc.ClientStatus,
}
}
}
for _, allocs := range p.NodePreemptions {
for i, alloc := range allocs {
allocs[i] = &Allocation{
ID: alloc.ID,
PreemptedByAllocation: alloc.PreemptedByAllocation,
}
}
}
}
// PlanResult is the result of a plan submitted to the leader.
type PlanResult struct {
// NodeUpdate contains all the updates that were committed.

@ -9,7 +9,7 @@ import (
"time"
"github.com/hashicorp/consul/api"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
@ -2842,6 +2842,100 @@ func TestTaskArtifact_Validate_Checksum(t *testing.T) {
}
}
func TestPlan_NormalizeAllocations(t *testing.T) {
t.Parallel()
plan := &Plan{
NodeUpdate: make(map[string][]*Allocation),
NodePreemptions: make(map[string][]*Allocation),
}
stoppedAlloc := MockAlloc()
desiredDesc := "Desired desc"
plan.AppendStoppedAlloc(stoppedAlloc, desiredDesc, AllocClientStatusLost)
preemptedAlloc := MockAlloc()
preemptingAllocID := uuid.Generate()
plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID)
plan.NormalizeAllocations()
actualStoppedAlloc := plan.NodeUpdate[stoppedAlloc.NodeID][0]
expectedStoppedAlloc := &Allocation{
ID: stoppedAlloc.ID,
DesiredDescription: desiredDesc,
ClientStatus: AllocClientStatusLost,
}
assert.Equal(t, expectedStoppedAlloc, actualStoppedAlloc)
actualPreemptedAlloc := plan.NodePreemptions[preemptedAlloc.NodeID][0]
expectedPreemptedAlloc := &Allocation{
ID: preemptedAlloc.ID,
PreemptedByAllocation: preemptingAllocID,
}
assert.Equal(t, expectedPreemptedAlloc, actualPreemptedAlloc)
}
func TestPlan_AppendStoppedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) {
t.Parallel()
plan := &Plan{
NodeUpdate: make(map[string][]*Allocation),
}
alloc := MockAlloc()
desiredDesc := "Desired desc"
plan.AppendStoppedAlloc(alloc, desiredDesc, AllocClientStatusLost)
appendedAlloc := plan.NodeUpdate[alloc.NodeID][0]
expectedAlloc := new(Allocation)
*expectedAlloc = *alloc
expectedAlloc.DesiredDescription = desiredDesc
expectedAlloc.DesiredStatus = AllocDesiredStatusStop
expectedAlloc.ClientStatus = AllocClientStatusLost
expectedAlloc.Job = nil
assert.Equal(t, expectedAlloc, appendedAlloc)
assert.Equal(t, alloc.Job, plan.Job)
}
func TestPlan_AppendPreemptedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) {
t.Parallel()
plan := &Plan{
NodePreemptions: make(map[string][]*Allocation),
}
alloc := MockAlloc()
preemptingAllocID := uuid.Generate()
plan.AppendPreemptedAlloc(alloc, preemptingAllocID)
appendedAlloc := plan.NodePreemptions[alloc.NodeID][0]
expectedAlloc := &Allocation{
ID: alloc.ID,
PreemptedByAllocation: preemptingAllocID,
JobID: alloc.JobID,
Namespace: alloc.Namespace,
DesiredStatus: AllocDesiredStatusEvict,
DesiredDescription: fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocID),
AllocatedResources: alloc.AllocatedResources,
TaskResources: alloc.TaskResources,
SharedResources: alloc.SharedResources,
}
assert.Equal(t, expectedAlloc, appendedAlloc)
}
func TestAllocation_MsgPackTags(t *testing.T) {
t.Parallel()
planType := reflect.TypeOf(Allocation{})
msgPackTags, _ := planType.FieldByName("_struct")
assert.Equal(t, msgPackTags.Tag, reflect.StructTag(`codec:",omitempty"`))
}
func TestEvaluation_MsgPackTags(t *testing.T) {
t.Parallel()
planType := reflect.TypeOf(Evaluation{})
msgPackTags, _ := planType.FieldByName("_struct")
assert.Equal(t, msgPackTags.Tag, reflect.StructTag(`codec:",omitempty"`))
}
func TestAllocation_Terminated(t *testing.T) {
type desiredState struct {
ClientStatus string

@ -14,6 +14,11 @@ import (
"github.com/hashicorp/serf/serf"
)
// MinVersionPlanNormalization is the minimum version to support the
// normalization of Plan in SubmitPlan, and the denormalization raft log entry committed
// in ApplyPlanResultsRequest
var MinVersionPlanNormalization = version.Must(version.NewVersion("0.9.2"))
// ensurePath is used to make sure a path exists
func ensurePath(path string, dir bool) error {
if !dir {
@ -143,11 +148,12 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
return true, parts
}
// ServersMeetMinimumVersion returns whether the given alive servers are at least on the
// given Nomad version
func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version) bool {
// ServersMeetMinimumVersion returns whether the Nomad servers are at least on the
// given Nomad version. The checkFailedServers parameter specifies whether version
// for the failed servers should be verified.
func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version, checkFailedServers bool) bool {
for _, member := range members {
if valid, parts := isNomadServer(member); valid && parts.Status == serf.StatusAlive {
if valid, parts := isNomadServer(member); valid && (parts.Status == serf.StatusAlive || (checkFailedServers && parts.Status == serf.StatusFailed)) {
// Check if the versions match - version.LessThan will return true for
// 0.8.0-rc1 < 0.8.0, so we want to ignore the metadata
versionsMatch := slicesMatch(minVersion.Segments(), parts.Build.Segments())
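
The new checkFailedServers flag matters for gating plan normalization: a server that is currently failed but below the minimum version could rejoin and be unable to apply the normalized log entries, so callers enabling the new format pass true, while the autopilot and scheduler-config checks above keep passing false. A minimal usage sketch, where the member construction mirrors the makeMember test helper below and is not part of this PR:

package main

import (
	"fmt"
	"net"

	"github.com/hashicorp/go-version"
	"github.com/hashicorp/nomad/nomad"
	"github.com/hashicorp/serf/serf"
)

// member builds a serf.Member tagged as a Nomad server with the given build
// and status, the same way the makeMember test helper does.
func member(build string, status serf.MemberStatus) serf.Member {
	return serf.Member{
		Name: "srv-" + build,
		Addr: net.IP([]byte{127, 0, 0, 1}),
		Tags: map[string]string{
			"role":   "nomad",
			"region": "global",
			"dc":     "dc1",
			"port":   "4647",
			"build":  build,
			"vsn":    "1",
		},
		Status: status,
	}
}

func main() {
	members := []serf.Member{
		member("0.9.2", serf.StatusAlive),
		member("0.9.0", serf.StatusFailed), // an older server that is currently down
	}
	min := version.Must(version.NewVersion("0.9.2"))

	// Considering only alive servers, the cluster looks ready.
	fmt.Println(nomad.ServersMeetMinimumVersion(members, min, false)) // true
	// Counting the failed 0.9.0 server as well, it is not.
	fmt.Println(nomad.ServersMeetMinimumVersion(members, min, true)) // false
}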

@ -86,23 +86,8 @@ func TestIsNomadServer(t *testing.T) {
}
}
func TestServersMeetMinimumVersion(t *testing.T) {
func TestServersMeetMinimumVersionExcludingFailed(t *testing.T) {
t.Parallel()
makeMember := func(version string) serf.Member {
return serf.Member{
Name: "foo",
Addr: net.IP([]byte{127, 0, 0, 1}),
Tags: map[string]string{
"role": "nomad",
"region": "aws",
"dc": "east-aws",
"port": "10000",
"build": version,
"vsn": "1",
},
Status: serf.StatusAlive,
}
}
cases := []struct {
members []serf.Member
@ -112,7 +97,7 @@ func TestServersMeetMinimumVersion(t *testing.T) {
// One server, meets reqs
{
members: []serf.Member{
makeMember("0.7.5"),
makeMember("0.7.5", serf.StatusAlive),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: true,
@ -120,7 +105,7 @@ func TestServersMeetMinimumVersion(t *testing.T) {
// One server in dev, meets reqs
{
members: []serf.Member{
makeMember("0.8.5-dev"),
makeMember("0.8.5-dev", serf.StatusAlive),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: true,
@ -128,7 +113,7 @@ func TestServersMeetMinimumVersion(t *testing.T) {
// One server with meta, meets reqs
{
members: []serf.Member{
makeMember("0.7.5+ent"),
makeMember("0.7.5+ent", serf.StatusAlive),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: true,
@ -136,16 +121,17 @@ func TestServersMeetMinimumVersion(t *testing.T) {
// One server, doesn't meet reqs
{
members: []serf.Member{
makeMember("0.7.5"),
makeMember("0.7.5", serf.StatusAlive),
},
ver: version.Must(version.NewVersion("0.8.0")),
expected: false,
},
// Multiple servers, meets req version
// Multiple servers, meets req version, includes failed that doesn't meet req
{
members: []serf.Member{
makeMember("0.7.5"),
makeMember("0.8.0"),
makeMember("0.7.5", serf.StatusAlive),
makeMember("0.8.0", serf.StatusAlive),
makeMember("0.7.0", serf.StatusFailed),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: true,
@ -153,8 +139,8 @@ func TestServersMeetMinimumVersion(t *testing.T) {
// Multiple servers, doesn't meet req version
{
members: []serf.Member{
makeMember("0.7.5"),
makeMember("0.8.0"),
makeMember("0.7.5", serf.StatusAlive),
makeMember("0.8.0", serf.StatusAlive),
},
ver: version.Must(version.NewVersion("0.8.0")),
expected: false,
@ -162,13 +148,67 @@ func TestServersMeetMinimumVersion(t *testing.T) {
}
for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, tc.ver)
result := ServersMeetMinimumVersion(tc.members, tc.ver, false)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}
}
}
func TestServersMeetMinimumVersionIncludingFailed(t *testing.T) {
t.Parallel()
cases := []struct {
members []serf.Member
ver *version.Version
expected bool
}{
// Multiple servers, meets req version
{
members: []serf.Member{
makeMember("0.7.5", serf.StatusAlive),
makeMember("0.8.0", serf.StatusAlive),
makeMember("0.7.5", serf.StatusFailed),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: true,
},
// Multiple servers, doesn't meet req version
{
members: []serf.Member{
makeMember("0.7.5", serf.StatusAlive),
makeMember("0.8.0", serf.StatusAlive),
makeMember("0.7.0", serf.StatusFailed),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: false,
},
}
for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, tc.ver, true)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}
}
}
func makeMember(version string, status serf.MemberStatus) serf.Member {
return serf.Member{
Name: "foo",
Addr: net.IP([]byte{127, 0, 0, 1}),
Tags: map[string]string{
"role": "nomad",
"region": "aws",
"dc": "east-aws",
"port": "10000",
"build": version,
"vsn": "1",
},
Status: status,
}
}
func TestShuffleStrings(t *testing.T) {
t.Parallel()
// Generate input

@ -310,6 +310,12 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.
// Add the evaluation token to the plan
plan.EvalToken = w.evalToken
// Normalize stopped and preempted allocs before RPC
normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), MinVersionPlanNormalization, true)
if normalizePlan {
plan.NormalizeAllocations()
}
// Setup the request
req := structs.PlanRequest{
Plan: plan,

@ -8,13 +8,14 @@ import (
"time"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
)
type NoopScheduler struct {
@ -390,6 +391,57 @@ func TestWorker_SubmitPlan(t *testing.T) {
}
}
func TestWorker_SubmitPlanNormalizedAllocations(t *testing.T) {
t.Parallel()
s1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
c.EnabledSchedulers = []string{structs.JobTypeService}
c.Build = "0.9.2"
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Register node
node := mock.Node()
testRegisterNode(t, s1, node)
job := mock.Job()
eval1 := mock.Eval()
eval1.JobID = job.ID
s1.fsm.State().UpsertJob(0, job)
s1.fsm.State().UpsertEvals(0, []*structs.Evaluation{eval1})
stoppedAlloc := mock.Alloc()
preemptedAlloc := mock.Alloc()
s1.fsm.State().UpsertAllocs(5, []*structs.Allocation{stoppedAlloc, preemptedAlloc})
// Create an allocation plan
plan := &structs.Plan{
Job: job,
EvalID: eval1.ID,
NodeUpdate: make(map[string][]*structs.Allocation),
NodePreemptions: make(map[string][]*structs.Allocation),
}
desiredDescription := "desired desc"
plan.AppendStoppedAlloc(stoppedAlloc, desiredDescription, structs.AllocClientStatusLost)
preemptingAllocID := uuid.Generate()
plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID)
// Attempt to submit a plan
w := &Worker{srv: s1, logger: s1.logger}
w.SubmitPlan(plan)
assert.Equal(t, &structs.Allocation{
ID: preemptedAlloc.ID,
PreemptedByAllocation: preemptingAllocID,
}, plan.NodePreemptions[preemptedAlloc.NodeID][0])
assert.Equal(t, &structs.Allocation{
ID: stoppedAlloc.ID,
DesiredDescription: desiredDescription,
ClientStatus: structs.AllocClientStatusLost,
}, plan.NodeUpdate[stoppedAlloc.NodeID][0])
}
func TestWorker_SubmitPlan_MissingNodeRefresh(t *testing.T) {
t.Parallel()
s1 := TestServer(t, func(c *Config) {

@ -5,8 +5,8 @@ import (
"time"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -366,7 +366,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
// Handle the stop
for _, stop := range results.stop {
s.plan.AppendUpdate(stop.alloc, structs.AllocDesiredStatusStop, stop.statusDescription, stop.clientStatus)
s.plan.AppendStoppedAlloc(stop.alloc, stop.statusDescription, stop.clientStatus)
}
// Handle the in-place updates
@ -464,7 +464,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc()
prevAllocation := missing.PreviousAllocation()
if stopPrevAlloc {
s.plan.AppendUpdate(prevAllocation, structs.AllocDesiredStatusStop, stopPrevAllocDesc, "")
s.plan.AppendStoppedAlloc(prevAllocation, stopPrevAllocDesc, "")
}
// Compute penalty nodes for rescheduled allocs

@ -4,7 +4,7 @@ import (
"fmt"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -210,18 +210,18 @@ func (s *SystemScheduler) computeJobAllocs() error {
// Add all the allocs to stop
for _, e := range diff.stop {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded, "")
s.plan.AppendStoppedAlloc(e.Alloc, allocNotNeeded, "")
}
// Add all the allocs to migrate
for _, e := range diff.migrate {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNodeTainted, "")
s.plan.AppendStoppedAlloc(e.Alloc, allocNodeTainted, "")
}
// Lost allocations should be transitioned to desired status stop and client
// status lost.
for _, e := range diff.lost {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocLost, structs.AllocClientStatusLost)
s.plan.AppendStoppedAlloc(e.Alloc, allocLost, structs.AllocClientStatusLost)
}
// Attempt to do the upgrades in place
@ -351,7 +351,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
if option.PreemptedAllocs != nil {
var preemptedAllocIDs []string
for _, stop := range option.PreemptedAllocs {
s.plan.AppendPreemptedAlloc(stop, structs.AllocDesiredStatusEvict, alloc.ID)
s.plan.AppendPreemptedAlloc(stop, alloc.ID)
preemptedAllocIDs = append(preemptedAllocIDs, stop.ID)
if s.eval.AnnotatePlan && s.plan.Annotations != nil {

@ -7,7 +7,7 @@ import (
testing "github.com/mitchellh/go-testing-interface"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@ -53,6 +53,8 @@ type Harness struct {
nextIndex uint64
nextIndexLock sync.Mutex
optimizePlan bool
}
// NewHarness is used to make a new testing harness
@ -101,42 +103,69 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
result.AllocIndex = index
// Flatten evicts and allocs
var allocs []*structs.Allocation
for _, updateList := range plan.NodeUpdate {
allocs = append(allocs, updateList...)
}
for _, allocList := range plan.NodeAllocation {
allocs = append(allocs, allocList...)
}
// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
now := time.Now().UTC().UnixNano()
for _, alloc := range allocs {
if alloc.CreateTime == 0 {
alloc.CreateTime = now
}
}
// Set modify time for preempted allocs and flatten them
var preemptedAllocs []*structs.Allocation
for _, preemptions := range result.NodePreemptions {
for _, alloc := range preemptions {
alloc.ModifyTime = now
preemptedAllocs = append(preemptedAllocs, alloc)
}
allocsUpdated := make([]*structs.Allocation, 0, len(result.NodeAllocation))
for _, allocList := range plan.NodeAllocation {
allocsUpdated = append(allocsUpdated, allocList...)
}
updateCreateTimestamp(allocsUpdated, now)
// Setup the update request
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: plan.Job,
Alloc: allocs,
Job: plan.Job,
},
Deployment: plan.Deployment,
DeploymentUpdates: plan.DeploymentUpdates,
EvalID: plan.EvalID,
NodePreemptions: preemptedAllocs,
}
if h.optimizePlan {
stoppedAllocDiffs := make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
for _, updateList := range plan.NodeUpdate {
for _, stoppedAlloc := range updateList {
stoppedAllocDiffs = append(stoppedAllocDiffs, stoppedAlloc.AllocationDiff())
}
}
req.AllocsStopped = stoppedAllocDiffs
req.AllocsUpdated = allocsUpdated
preemptedAllocDiffs := make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))
for _, preemptions := range plan.NodePreemptions {
for _, preemptedAlloc := range preemptions {
allocDiff := preemptedAlloc.AllocationDiff()
allocDiff.ModifyTime = now
preemptedAllocDiffs = append(preemptedAllocDiffs, allocDiff)
}
}
req.AllocsPreempted = preemptedAllocDiffs
} else {
// COMPAT 0.11: Handles unoptimized log format
var allocs []*structs.Allocation
allocsStopped := make([]*structs.Allocation, 0, len(result.NodeUpdate))
for _, updateList := range plan.NodeUpdate {
allocsStopped = append(allocsStopped, updateList...)
}
allocs = append(allocs, allocsStopped...)
allocs = append(allocs, allocsUpdated...)
updateCreateTimestamp(allocs, now)
req.Alloc = allocs
// Set modify time for preempted allocs and flatten them
var preemptedAllocs []*structs.Allocation
for _, preemptions := range result.NodePreemptions {
for _, alloc := range preemptions {
alloc.ModifyTime = now
preemptedAllocs = append(preemptedAllocs, alloc)
}
}
req.NodePreemptions = preemptedAllocs
}
// Apply the full plan
@ -144,6 +173,22 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
return result, nil, err
}
// OptimizePlan is a function used only for Harness to help set the optimizePlan field,
// since Harness doesn't have access to a Server object
func (h *Harness) OptimizePlan(optimize bool) {
h.optimizePlan = optimize
}
func updateCreateTimestamp(allocations []*structs.Allocation, now int64) {
// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
for _, alloc := range allocations {
if alloc.CreateTime == 0 {
alloc.CreateTime = now
}
}
}
func (h *Harness) UpdateEval(eval *structs.Evaluation) error {
// Ensure sequential plan application
h.planLock.Lock()

@ -507,8 +507,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
// the current allocation is discounted when checking for feasibility.
// Otherwise we would be trying to fit the tasks current resources and
// updated resources. After select is called we can remove the evict.
ctx.Plan().AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop,
allocInPlace, "")
ctx.Plan().AppendStoppedAlloc(update.Alloc, allocInPlace, "")
// Attempt to match the task group
option := stack.Select(update.TaskGroup, nil) // This select only looks at one node so we don't pass selectOptions
@ -573,7 +572,7 @@ func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc stri
n := len(allocs)
for i := 0; i < n && i < *limit; i++ {
a := allocs[i]
ctx.Plan().AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc, "")
ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "")
diff.place = append(diff.place, a)
}
if n <= *limit {
@ -734,7 +733,7 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc
if alloc.DesiredStatus == structs.AllocDesiredStatusStop &&
(alloc.ClientStatus == structs.AllocClientStatusRunning ||
alloc.ClientStatus == structs.AllocClientStatusPending) {
plan.AppendUpdate(alloc, structs.AllocDesiredStatusStop, allocLost, structs.AllocClientStatusLost)
plan.AppendStoppedAlloc(alloc, allocLost, structs.AllocClientStatusLost)
}
}
}
@ -784,7 +783,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
// the current allocation is discounted when checking for feasibility.
// Otherwise we would be trying to fit the tasks current resources and
// updated resources. After select is called we can remove the evict.
ctx.Plan().AppendUpdate(existing, structs.AllocDesiredStatusStop, allocInPlace, "")
ctx.Plan().AppendStoppedAlloc(existing, allocInPlace, "")
// Attempt to match the task group
option := stack.Select(newTG, nil) // This select only looks at one node so we don't pass selectOptions