remove deprecated `AllocUpdateRequestType` raft entry (#15285)
After Deployments were added in Nomad 0.6.0, the `AllocUpdateRequestType` raft log entry was no longer in use. Mark this as deprecated, remove the associated dead code, and remove references to the metrics it emits from the docs. We'll leave the entry itself just in case we encounter old raft logs that we need to be able to safely load.
This commit is contained in:
parent
845ff10281
commit
510eb435dc
39
nomad/fsm.go
39
nomad/fsm.go
|
@ -819,41 +819,10 @@ func (n *nomadFSM) applyDeleteEval(buf []byte, index uint64) interface{} {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (n *nomadFSM) applyAllocUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} {
|
||||
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update"}, time.Now())
|
||||
var req structs.AllocUpdateRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
|
||||
// Attach the job to all the allocations. It is pulled out in the
|
||||
// payload to avoid the redundancy of encoding, but should be denormalized
|
||||
// prior to being inserted into MemDB.
|
||||
structs.DenormalizeAllocationJobs(req.Job, req.Alloc)
|
||||
|
||||
for _, alloc := range req.Alloc {
|
||||
// COMPAT(0.11): Remove in 0.11
|
||||
// Calculate the total resources of allocations. It is pulled out in the
|
||||
// payload to avoid encoding something that can be computed, but should be
|
||||
// denormalized prior to being inserted into MemDB.
|
||||
if alloc.Resources == nil {
|
||||
alloc.Resources = new(structs.Resources)
|
||||
for _, task := range alloc.TaskResources {
|
||||
alloc.Resources.Add(task)
|
||||
}
|
||||
|
||||
// Add the shared resources
|
||||
alloc.Resources.Add(alloc.SharedResources)
|
||||
}
|
||||
|
||||
// Handle upgrade path
|
||||
alloc.Canonicalize()
|
||||
}
|
||||
|
||||
if err := n.state.UpsertAllocs(msgType, index, req.Alloc); err != nil {
|
||||
n.logger.Error("UpsertAllocs failed", "error", err)
|
||||
return err
|
||||
}
|
||||
// DEPRECATED: AllocUpdateRequestType was removed in Nomad 0.6.0 when we built
|
||||
// Deployments. This handler remains so that older raft logs can be read without
|
||||
// panicking.
|
||||
func (n *nomadFSM) applyAllocUpdate(_ structs.MessageType, _ []byte, _ uint64) interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -11,6 +11,11 @@ import (
|
|||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/kr/pretty"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/nomad/ci"
|
||||
"github.com/hashicorp/nomad/helper/pointer"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
|
@ -20,10 +25,6 @@ import (
|
|||
"github.com/hashicorp/nomad/nomad/stream"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/kr/pretty"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type MockSink struct {
|
||||
|
@ -1184,256 +1185,6 @@ func TestFSM_DeleteEval(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestFSM_UpsertAllocs(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
fsm := testFSM(t)
|
||||
|
||||
alloc := mock.Alloc()
|
||||
alloc.Resources = &structs.Resources{} // COMPAT(0.11): Remove in 0.11, used to bypass resource creation in state store
|
||||
fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
|
||||
req := structs.AllocUpdateRequest{
|
||||
Alloc: []*structs.Allocation{alloc},
|
||||
}
|
||||
buf, err := structs.Encode(structs.AllocUpdateRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify we are registered
|
||||
ws := memdb.NewWatchSet()
|
||||
out, err := fsm.State().AllocByID(ws, alloc.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
alloc.CreateIndex = out.CreateIndex
|
||||
alloc.ModifyIndex = out.ModifyIndex
|
||||
alloc.AllocModifyIndex = out.AllocModifyIndex
|
||||
if !reflect.DeepEqual(alloc, out) {
|
||||
t.Fatalf("bad: %#v %#v", alloc, out)
|
||||
}
|
||||
|
||||
evictAlloc := new(structs.Allocation)
|
||||
*evictAlloc = *alloc
|
||||
evictAlloc.DesiredStatus = structs.AllocDesiredStatusEvict
|
||||
req2 := structs.AllocUpdateRequest{
|
||||
Alloc: []*structs.Allocation{evictAlloc},
|
||||
}
|
||||
buf, err = structs.Encode(structs.AllocUpdateRequestType, req2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp = fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify we are evicted
|
||||
out, err = fsm.State().AllocByID(ws, alloc.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if out.DesiredStatus != structs.AllocDesiredStatusEvict {
|
||||
t.Fatalf("alloc found!")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_UpsertAllocs_SharedJob(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
fsm := testFSM(t)
|
||||
|
||||
alloc := mock.Alloc()
|
||||
alloc.Resources = &structs.Resources{} // COMPAT(0.11): Remove in 0.11, used to bypass resource creation in state store
|
||||
fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
|
||||
job := alloc.Job
|
||||
alloc.Job = nil
|
||||
req := structs.AllocUpdateRequest{
|
||||
Job: job,
|
||||
Alloc: []*structs.Allocation{alloc},
|
||||
}
|
||||
buf, err := structs.Encode(structs.AllocUpdateRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify we are registered
|
||||
ws := memdb.NewWatchSet()
|
||||
out, err := fsm.State().AllocByID(ws, alloc.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
alloc.CreateIndex = out.CreateIndex
|
||||
alloc.ModifyIndex = out.ModifyIndex
|
||||
alloc.AllocModifyIndex = out.AllocModifyIndex
|
||||
|
||||
// Job should be re-attached
|
||||
alloc.Job = job
|
||||
require.Equal(t, alloc, out)
|
||||
|
||||
// Ensure that the original job is used
|
||||
evictAlloc := new(structs.Allocation)
|
||||
*evictAlloc = *alloc
|
||||
job = mock.Job()
|
||||
job.Priority = 123
|
||||
|
||||
evictAlloc.Job = nil
|
||||
evictAlloc.DesiredStatus = structs.AllocDesiredStatusEvict
|
||||
req2 := structs.AllocUpdateRequest{
|
||||
Job: job,
|
||||
Alloc: []*structs.Allocation{evictAlloc},
|
||||
}
|
||||
buf, err = structs.Encode(structs.AllocUpdateRequestType, req2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp = fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify we are evicted
|
||||
out, err = fsm.State().AllocByID(ws, alloc.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if out.DesiredStatus != structs.AllocDesiredStatusEvict {
|
||||
t.Fatalf("alloc found!")
|
||||
}
|
||||
if out.Job == nil || out.Job.Priority == 123 {
|
||||
t.Fatalf("bad job")
|
||||
}
|
||||
}
|
||||
|
||||
// COMPAT(0.11): Remove in 0.11
|
||||
func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
fsm := testFSM(t)
|
||||
|
||||
alloc := mock.Alloc()
|
||||
alloc.Resources = &structs.Resources{
|
||||
CPU: 500,
|
||||
MemoryMB: 256,
|
||||
DiskMB: 150,
|
||||
Networks: []*structs.NetworkResource{
|
||||
{
|
||||
Device: "eth0",
|
||||
IP: "192.168.0.100",
|
||||
ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}},
|
||||
MBits: 50,
|
||||
DynamicPorts: []structs.Port{{Label: "http"}},
|
||||
},
|
||||
},
|
||||
}
|
||||
alloc.TaskResources = map[string]*structs.Resources{
|
||||
"web": {
|
||||
CPU: 500,
|
||||
MemoryMB: 256,
|
||||
Networks: []*structs.NetworkResource{
|
||||
{
|
||||
Device: "eth0",
|
||||
IP: "192.168.0.100",
|
||||
ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}},
|
||||
MBits: 50,
|
||||
DynamicPorts: []structs.Port{{Label: "http", Value: 9876}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
alloc.SharedResources = &structs.Resources{
|
||||
DiskMB: 150,
|
||||
}
|
||||
|
||||
// Need to remove mock dynamic port from alloc as it won't be computed
|
||||
// in this test
|
||||
alloc.TaskResources["web"].Networks[0].DynamicPorts[0].Value = 0
|
||||
|
||||
fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
|
||||
job := alloc.Job
|
||||
origResources := alloc.Resources
|
||||
alloc.Resources = nil
|
||||
req := structs.AllocUpdateRequest{
|
||||
Job: job,
|
||||
Alloc: []*structs.Allocation{alloc},
|
||||
}
|
||||
buf, err := structs.Encode(structs.AllocUpdateRequestType, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
}
|
||||
|
||||
// Verify we are registered
|
||||
ws := memdb.NewWatchSet()
|
||||
out, err := fsm.State().AllocByID(ws, alloc.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
alloc.CreateIndex = out.CreateIndex
|
||||
alloc.ModifyIndex = out.ModifyIndex
|
||||
alloc.AllocModifyIndex = out.AllocModifyIndex
|
||||
|
||||
// Resources should be recomputed
|
||||
origResources.DiskMB = alloc.Job.TaskGroups[0].EphemeralDisk.SizeMB
|
||||
origResources.MemoryMaxMB = origResources.MemoryMB
|
||||
alloc.Resources = origResources
|
||||
if !reflect.DeepEqual(alloc, out) {
|
||||
t.Fatalf("not equal: % #v", pretty.Diff(alloc, out))
|
||||
}
|
||||
}
|
||||
|
||||
// TestFSM_UpsertAllocs_Canonicalize asserts that allocations are Canonicalized
|
||||
// to handle logs emited by servers running old versions
|
||||
func TestFSM_UpsertAllocs_Canonicalize(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
fsm := testFSM(t)
|
||||
|
||||
alloc := mock.Alloc()
|
||||
alloc.Resources = &structs.Resources{} // COMPAT(0.11): Remove in 0.11, used to bypass resource creation in state store
|
||||
alloc.AllocatedResources = nil
|
||||
|
||||
// pre-assert that our mock populates old field
|
||||
require.NotEmpty(t, alloc.TaskResources)
|
||||
|
||||
fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
|
||||
req := structs.AllocUpdateRequest{
|
||||
Alloc: []*structs.Allocation{alloc},
|
||||
}
|
||||
buf, err := structs.Encode(structs.AllocUpdateRequestType, req)
|
||||
require.NoError(t, err)
|
||||
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
require.Nil(t, resp)
|
||||
|
||||
// Verify we are registered
|
||||
ws := memdb.NewWatchSet()
|
||||
out, err := fsm.State().AllocByID(ws, alloc.ID)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NotNil(t, out.AllocatedResources)
|
||||
require.Contains(t, out.AllocatedResources.Tasks, "web")
|
||||
|
||||
expected := alloc.Copy()
|
||||
expected.Canonicalize()
|
||||
expected.CreateIndex = out.CreateIndex
|
||||
expected.ModifyIndex = out.ModifyIndex
|
||||
expected.AllocModifyIndex = out.AllocModifyIndex
|
||||
require.Equal(t, expected, out)
|
||||
}
|
||||
|
||||
func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
fsm := testFSM(t)
|
||||
|
|
|
@ -12,7 +12,6 @@ var MsgTypeEvents = map[structs.MessageType]string{
|
|||
structs.EvalUpdateRequestType: structs.TypeEvalUpdated,
|
||||
structs.AllocClientUpdateRequestType: structs.TypeAllocationUpdated,
|
||||
structs.JobRegisterRequestType: structs.TypeJobRegistered,
|
||||
structs.AllocUpdateRequestType: structs.TypeAllocationUpdated,
|
||||
structs.NodeUpdateStatusRequestType: structs.TypeNodeEvent,
|
||||
structs.JobDeregisterRequestType: structs.TypeJobDeregistered,
|
||||
structs.JobBatchDeregisterRequestType: structs.TypeJobBatchDeregistered,
|
||||
|
|
|
@ -639,10 +639,6 @@ func TestEventsFromChanges_AllocClientUpdateRequestType(t *testing.T) {
|
|||
t.SkipNow()
|
||||
}
|
||||
|
||||
func TestEventsFromChanges_AllocUpdateRequestType(t *testing.T) {
|
||||
t.SkipNow()
|
||||
}
|
||||
|
||||
func TestEventsFromChanges_JobDeregisterRequestType(t *testing.T) {
|
||||
t.SkipNow()
|
||||
}
|
||||
|
|
|
@ -322,7 +322,6 @@ those listed in [Key Metrics](#key-metrics) above.
|
|||
| `nomad.nomad.file_system.stream` | Time elapsed to establish `FileSystem.Stream` RPC | Nanoseconds | Summary | host |
|
||||
| `nomad.nomad.fsm.alloc_client_update` | Time elapsed to apply `AllocClientUpdate` raft entry | Nanoseconds | Summary | host |
|
||||
| `nomad.nomad.fsm.alloc_update_desired_transition` | Time elapsed to apply `AllocUpdateDesiredTransition` raft entry | Nanoseconds | Summary | host |
|
||||
| `nomad.nomad.fsm.alloc_update` | Time elapsed to apply `AllocUpdate` raft entry | Nanoseconds | Summary | host |
|
||||
| `nomad.nomad.fsm.apply_acl_policy_delete` | Time elapsed to apply `ApplyACLPolicyDelete` raft entry | Nanoseconds | Summary | host |
|
||||
| `nomad.nomad.fsm.apply_acl_policy_upsert` | Time elapsed to apply `ApplyACLPolicyUpsert` raft entry | Nanoseconds | Summary | host |
|
||||
| `nomad.nomad.fsm.apply_acl_token_bootstrap` | Time elapsed to apply `ApplyACLTokenBootstrap` raft entry | Nanoseconds | Summary | host |
|
||||
|
|
Loading…
Reference in New Issue