scheduler: annotate tasksUpdated with reason and purge DeepEquals (#16421)

* scheduler: annotate tasksUpdated with reason and purge DeepEquals

* cr: move opaque into helper

* cr: swap affinity/spread hashing for slice equal

* contributing: update checklist-jobspec with notes about struct methods

* cr: add more cases to wait config equal method

* cr: use reflect when comparing envoy config blocks

* cl: add cl
This commit is contained in:
Seth Hoenig 2023-03-14 09:46:00 -05:00 committed by GitHub
parent 6a7e22d546
commit ed7177de76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1165 additions and 260 deletions

3
.changelog/16421.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
scheduler: remove most uses of reflection for task comparisons
```

View File

@ -8,8 +8,11 @@
* New fields should be added to existing Canonicalize, Copy methods
* Test the structs/fields via methods mentioned above
* [ ] Add structs/fields to `nomad/structs` package
* `structs/` structs usually have Copy, Equals, and Validate methods
* Validation happens in this package and _must_ be implemented
* `structs/` structs usually have Copy, Equal, and Validate methods
* `Validate` methods in this package _must_ be implemented
* `Equal` methods are used when comparing one job to another (e.g. did this thing get updated?)
* `Copy` methods ensure modifications do not modify the copy of a job in the state store
* Use `slices.CloneFunc` and `maps.CloneFunc` to ensure creation of deep copies
* Note that analogous struct field names should match with `api/` package
* Test the structs/fields via methods mentioned above
* Implement and test other logical methods

48
helper/opaque.go Normal file
View File

@ -0,0 +1,48 @@
package helper
import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"golang.org/x/exp/maps"
)
var (
cmpOptIgnoreUnexported = ignoreUnexportedAlways()
cmpOptNilIsEmpty = cmpopts.EquateEmpty()
cmpOptIgnore = cmp.Ignore()
)
// ignoreUnexportedAlways creates a cmp.Option filter that will ignore unexported
// fields of on any/all types. It is a derivative of go-cmp.IgnoreUnexported,
// here we do not require specifying individual types.
//
// reference: https://github.com/google/go-cmp/blob/master/cmp/cmpopts/ignore.go#L110
func ignoreUnexportedAlways() cmp.Option {
return cmp.FilterPath(
func(p cmp.Path) bool {
sf, ok := p.Index(-1).(cmp.StructField)
if !ok {
return false
}
c := sf.Name()[0]
return c < 'A' || c > 'Z'
},
cmpOptIgnore,
)
}
// OpaqueMapsEqual compare maps[<comparable>]<any> for equality, but safely by
// using the cmp package and ignoring un-exported types, and by treating nil/empty
// slices and maps as equal.
//
// This is intended as a substitute for reflect.DeepEqual in the case of "opaque maps",
// e.g. `map[comparable]any` - such as the case for Task Driver config or Envoy proxy
// pass-through configuration.
func OpaqueMapsEqual[M ~map[K]V, K comparable, V any](m1, m2 M) bool {
return maps.EqualFunc(m1, m2, func(a, b V) bool {
return cmp.Equal(a, b,
cmpOptIgnoreUnexported, // ignore all unexported fields
cmpOptNilIsEmpty, // treat nil/empty slices as equal
)
})
}

88
helper/opaque_test.go Normal file
View File

@ -0,0 +1,88 @@
package helper
import (
"testing"
"github.com/hashicorp/nomad/ci"
"github.com/shoenig/test/must"
)
func Test_OpaqueMapsEqual(t *testing.T) {
ci.Parallel(t)
type public struct {
F int
}
type private struct {
g int
}
type mix struct {
F int
g int
}
cases := []struct {
name string
a, b map[string]any
exp bool
}{{
name: "both nil",
a: nil,
b: nil,
exp: true,
}, {
name: "empty and nil",
a: nil,
b: make(map[string]any),
exp: true,
}, {
name: "same strings",
a: map[string]any{"a": "A"},
b: map[string]any{"a": "A"},
exp: true,
}, {
name: "same public struct",
a: map[string]any{"a": &public{F: 42}},
b: map[string]any{"a": &public{F: 42}},
exp: true,
}, {
name: "different public struct",
a: map[string]any{"a": &public{F: 42}},
b: map[string]any{"a": &public{F: 10}},
exp: false,
}, {
name: "different private struct",
a: map[string]any{"a": &private{g: 42}},
b: map[string]any{"a": &private{g: 10}},
exp: true, // private fields not compared
}, {
name: "mix same public different private",
a: map[string]any{"a": &mix{F: 42, g: 1}},
b: map[string]any{"a": &mix{F: 42, g: 2}},
exp: true, // private fields not compared
}, {
name: "mix different public same private",
a: map[string]any{"a": &mix{F: 42, g: 1}},
b: map[string]any{"a": &mix{F: 10, g: 1}},
exp: false,
}, {
name: "nil vs empty slice values",
a: map[string]any{"key": []string(nil)},
b: map[string]any{"key": make([]string, 0)},
exp: true,
}, {
name: "nil vs empty map values",
a: map[string]any{"key": map[int]int(nil)},
b: map[string]any{"key": make(map[int]int, 0)},
exp: true,
}}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
result := OpaqueMapsEqual(tc.a, tc.b)
must.Eq(t, tc.exp, result)
})
}
}

View File

@ -80,6 +80,25 @@ type TaskCSIPluginConfig struct {
HealthTimeout time.Duration `mapstructure:"health_timeout" hcl:"health_timeout,optional"`
}
func (t *TaskCSIPluginConfig) Equal(o *TaskCSIPluginConfig) bool {
if t == nil || o == nil {
return t == o
}
switch {
case t.ID != o.ID:
return false
case t.Type != o.Type:
return false
case t.MountDir != o.MountDir:
return false
case t.StagePublishBaseDir != o.StagePublishBaseDir:
return false
case t.HealthTimeout != o.HealthTimeout:
return false
}
return true
}
func (t *TaskCSIPluginConfig) Copy() *TaskCSIPluginConfig {
if t == nil {
return nil

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/hashicorp/nomad/ci"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
@ -1033,3 +1034,33 @@ func TestDeleteNodeForType_Monolith_NilNode(t *testing.T) {
_, ok = plug.Controllers["foo"]
require.False(t, ok)
}
func TestTaskCSIPluginConfig_Equal(t *testing.T) {
ci.Parallel(t)
must.Equal[*TaskCSIPluginConfig](t, nil, nil)
must.NotEqual[*TaskCSIPluginConfig](t, nil, new(TaskCSIPluginConfig))
must.StructEqual(t, &TaskCSIPluginConfig{
ID: "abc123",
Type: CSIPluginTypeMonolith,
MountDir: "/opt/csi/mount",
StagePublishBaseDir: "/base",
HealthTimeout: 42 * time.Second,
}, []must.Tweak[*TaskCSIPluginConfig]{{
Field: "ID",
Apply: func(c *TaskCSIPluginConfig) { c.ID = "def345" },
}, {
Field: "Type",
Apply: func(c *TaskCSIPluginConfig) { c.Type = CSIPluginTypeNode },
}, {
Field: "MountDir",
Apply: func(c *TaskCSIPluginConfig) { c.MountDir = "/csi" },
}, {
Field: "StagePublishBaseDir",
Apply: func(c *TaskCSIPluginConfig) { c.StagePublishBaseDir = "/opt/base" },
}, {
Field: "HealthTimeout",
Apply: func(c *TaskCSIPluginConfig) { c.HealthTimeout = 1 * time.Second },
}})
}

View File

@ -1201,8 +1201,8 @@ func (t *SidecarTask) Equal(o *SidecarTask) bool {
return false
}
// config compare
if !opaqueMapsEqual(t.Config, o.Config) {
// task config, use opaque maps equal
if !helper.OpaqueMapsEqual(t.Config, o.Config) {
return false
}
@ -1378,15 +1378,6 @@ func (p *ConsulProxy) Copy() *ConsulProxy {
}
}
// opaqueMapsEqual compares map[string]interface{} commonly used for opaque
// config blocks. Interprets nil and {} as the same.
func opaqueMapsEqual(a, b map[string]interface{}) bool {
if len(a) == 0 && len(b) == 0 {
return true
}
return reflect.DeepEqual(a, b)
}
// Equal returns true if the structs are recursively equal.
func (p *ConsulProxy) Equal(o *ConsulProxy) bool {
if p == nil || o == nil {
@ -1409,7 +1400,8 @@ func (p *ConsulProxy) Equal(o *ConsulProxy) bool {
return false
}
if !opaqueMapsEqual(p.Config, o.Config) {
// envoy config, use reflect
if !reflect.DeepEqual(p.Config, o.Config) {
return false
}
@ -1504,7 +1496,8 @@ func (u *ConsulUpstream) Equal(o *ConsulUpstream) bool {
return false
case !u.MeshGateway.Equal(o.MeshGateway):
return false
case !opaqueMapsEqual(u.Config, o.Config):
case !reflect.DeepEqual(u.Config, o.Config):
// envoy config, use reflect
return false
}
return true
@ -1790,7 +1783,8 @@ func (p *ConsulGatewayProxy) Equal(o *ConsulGatewayProxy) bool {
return false
}
if !opaqueMapsEqual(p.Config, o.Config) {
// envoy config, use reflect
if !reflect.DeepEqual(p.Config, o.Config) {
return false
}

View File

@ -2703,8 +2703,40 @@ type AllocatedPortMapping struct {
HostIP string
}
func (m *AllocatedPortMapping) Copy() *AllocatedPortMapping {
return &AllocatedPortMapping{
Label: m.Label,
Value: m.Value,
To: m.To,
HostIP: m.HostIP,
}
}
func (m *AllocatedPortMapping) Equal(o *AllocatedPortMapping) bool {
if m == nil || o == nil {
return m == o
}
switch {
case m.Label != o.Label:
return false
case m.Value != o.Value:
return false
case m.To != o.To:
return false
case m.HostIP != o.HostIP:
return false
}
return true
}
type AllocatedPorts []AllocatedPortMapping
func (p AllocatedPorts) Equal(o AllocatedPorts) bool {
return slices.EqualFunc(p, o, func(a, b AllocatedPortMapping) bool {
return a.Equal(&b)
})
}
func (p AllocatedPorts) Get(label string) (AllocatedPortMapping, bool) {
for _, port := range p {
if port.Label == label {
@ -2740,18 +2772,32 @@ type DNSConfig struct {
Options []string
}
func (d *DNSConfig) Equal(o *DNSConfig) bool {
if d == nil || o == nil {
return d == o
}
switch {
case !slices.Equal(d.Servers, o.Servers):
return false
case !slices.Equal(d.Searches, o.Searches):
return false
case !slices.Equal(d.Options, o.Options):
return false
}
return true
}
func (d *DNSConfig) Copy() *DNSConfig {
if d == nil {
return nil
}
newD := new(DNSConfig)
newD.Servers = make([]string, len(d.Servers))
copy(newD.Servers, d.Servers)
newD.Searches = make([]string, len(d.Searches))
copy(newD.Searches, d.Searches)
newD.Options = make([]string, len(d.Options))
copy(newD.Options, d.Options)
return newD
return &DNSConfig{
Servers: slices.Clone(d.Servers),
Searches: slices.Clone(d.Searches),
Options: slices.Clone(d.Options),
}
}
// NetworkResource is used to represent available network
@ -7933,6 +7979,47 @@ func DefaultTemplate() *Template {
}
}
func (t *Template) Equal(o *Template) bool {
if t == nil || o == nil {
return t == o
}
switch {
case t.SourcePath != o.SourcePath:
return false
case t.DestPath != o.DestPath:
return false
case t.EmbeddedTmpl != o.EmbeddedTmpl:
return false
case t.ChangeMode != o.ChangeMode:
return false
case t.ChangeSignal != o.ChangeSignal:
return false
case !t.ChangeScript.Equal(o.ChangeScript):
return false
case t.Splay != o.Splay:
return false
case t.Perms != o.Perms:
return false
case !pointer.Eq(t.Uid, o.Uid):
return false
case !pointer.Eq(t.Gid, o.Gid):
return false
case t.LeftDelim != o.LeftDelim:
return false
case t.RightDelim != o.RightDelim:
return false
case t.Envvars != o.Envvars:
return false
case t.VaultGrace != o.VaultGrace:
return false
case !t.Wait.Equal(o.Wait):
return false
case t.ErrMissingKey != o.ErrMissingKey:
return false
}
return true
}
func (t *Template) Copy() *Template {
if t == nil {
return nil
@ -8044,18 +8131,33 @@ type ChangeScript struct {
FailOnError bool
}
func (cs *ChangeScript) Equal(o *ChangeScript) bool {
if cs == nil || o == nil {
return cs == o
}
switch {
case cs.Command != o.Command:
return false
case !slices.Equal(cs.Args, o.Args):
return false
case cs.Timeout != o.Timeout:
return false
case cs.FailOnError != o.FailOnError:
return false
}
return true
}
func (cs *ChangeScript) Copy() *ChangeScript {
if cs == nil {
return nil
}
ncs := new(ChangeScript)
*ncs = *cs
// args is a slice!
ncs.Args = slices.Clone(cs.Args)
return ncs
return &ChangeScript{
Command: cs.Command,
Args: slices.Clone(cs.Args),
Timeout: cs.Timeout,
FailOnError: cs.FailOnError,
}
}
// Validate makes sure all the required fields of ChangeScript are present
@ -8099,22 +8201,15 @@ func (wc *WaitConfig) Copy() *WaitConfig {
}
func (wc *WaitConfig) Equal(o *WaitConfig) bool {
if wc.Min == nil && o.Min != nil {
if wc == nil || o == nil {
return wc == o
}
switch {
case !pointer.Eq(wc.Min, o.Min):
return false
case !pointer.Eq(wc.Max, o.Max):
return false
}
if wc.Max == nil && o.Max != nil {
return false
}
if wc.Min != nil && (o.Min == nil || *wc.Min != *o.Min) {
return false
}
if wc.Max != nil && (o.Max == nil || *wc.Max != *o.Max) {
return false
}
return true
}
@ -8781,6 +8876,25 @@ type TaskArtifact struct {
RelativeDest string
}
func (ta *TaskArtifact) Equal(o *TaskArtifact) bool {
if ta == nil || o == nil {
return ta == o
}
switch {
case ta.GetterSource != o.GetterSource:
return false
case !maps.Equal(ta.GetterOptions, o.GetterOptions):
return false
case !maps.Equal(ta.GetterHeaders, o.GetterHeaders):
return false
case ta.GetterMode != o.GetterMode:
return false
case ta.RelativeDest != o.RelativeDest:
return false
}
return true
}
func (ta *TaskArtifact) Copy() *TaskArtifact {
if ta == nil {
return nil
@ -9055,11 +9169,20 @@ type Affinity struct {
// Equal checks if two affinities are equal.
func (a *Affinity) Equal(o *Affinity) bool {
return a == o ||
a.LTarget == o.LTarget &&
a.RTarget == o.RTarget &&
a.Operand == o.Operand &&
a.Weight == o.Weight
if a == nil || o == nil {
return a == o
}
switch {
case a.LTarget != o.LTarget:
return false
case a.RTarget != o.RTarget:
return false
case a.Operand != o.Operand:
return false
case a.Weight != o.Weight:
return false
}
return true
}
func (a *Affinity) Copy() *Affinity {
@ -9143,6 +9266,21 @@ type Spread struct {
str string
}
func (s *Spread) Equal(o *Spread) bool {
if s == nil || o == nil {
return s == o
}
switch {
case s.Attribute != o.Attribute:
return false
case s.Weight != o.Weight:
return false
case !slices.EqualFunc(s.SpreadTarget, o.SpreadTarget, func(a, b *SpreadTarget) bool { return a.Equal(b) }):
return false
}
return true
}
type Affinities []*Affinity
// Equal compares Affinities as a set
@ -9247,6 +9385,19 @@ func (s *SpreadTarget) String() string {
return s.str
}
func (s *SpreadTarget) Equal(o *SpreadTarget) bool {
if s == nil || o == nil {
return s == o
}
switch {
case s.Value != o.Value:
return false
case s.Percent != o.Percent:
return false
}
return true
}
// EphemeralDisk is an ephemeral disk object
type EphemeralDisk struct {
// Sticky indicates whether the allocation is sticky to a node
@ -9267,6 +9418,21 @@ func DefaultEphemeralDisk() *EphemeralDisk {
}
}
func (d *EphemeralDisk) Equal(o *EphemeralDisk) bool {
if d == nil || o == nil {
return d == o
}
switch {
case d.Sticky != o.Sticky:
return false
case d.SizeMB != o.SizeMB:
return false
case d.Migrate != o.Migrate:
return false
}
return true
}
// Validate validates EphemeralDisk
func (d *EphemeralDisk) Validate() error {
if d.SizeMB < 10 {
@ -9327,6 +9493,25 @@ func DefaultVaultBlock() *Vault {
}
}
func (v *Vault) Equal(o *Vault) bool {
if v == nil || o == nil {
return v == o
}
switch {
case !slices.Equal(v.Policies, o.Policies):
return false
case v.Namespace != o.Namespace:
return false
case v.Env != o.Env:
return false
case v.ChangeMode != o.ChangeMode:
return false
case v.ChangeSignal != o.ChangeSignal:
return false
}
return true
}
// Copy returns a copy of this Vault block.
func (v *Vault) Copy() *Vault {
if v == nil {

View File

@ -14,6 +14,7 @@ import (
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -2847,49 +2848,74 @@ func TestTaskWaitConfig_Equals(t *testing.T) {
ci.Parallel(t)
testCases := []struct {
name string
config *WaitConfig
expected *WaitConfig
name string
wc1 *WaitConfig
wc2 *WaitConfig
exp bool
}{
{
name: "all-fields",
config: &WaitConfig{
wc1: &WaitConfig{
Min: pointer.Of(5 * time.Second),
Max: pointer.Of(10 * time.Second),
},
expected: &WaitConfig{
wc2: &WaitConfig{
Min: pointer.Of(5 * time.Second),
Max: pointer.Of(10 * time.Second),
},
exp: true,
},
{
name: "no-fields",
config: &WaitConfig{},
expected: &WaitConfig{},
name: "no-fields",
wc1: &WaitConfig{},
wc2: &WaitConfig{},
exp: true,
},
{
name: "min-only",
config: &WaitConfig{
wc1: &WaitConfig{
Min: pointer.Of(5 * time.Second),
},
expected: &WaitConfig{
wc2: &WaitConfig{
Min: pointer.Of(5 * time.Second),
},
exp: true,
},
{
name: "max-only",
config: &WaitConfig{
wc1: &WaitConfig{
Max: pointer.Of(10 * time.Second),
},
expected: &WaitConfig{
wc2: &WaitConfig{
Max: pointer.Of(10 * time.Second),
},
exp: true,
},
{
name: "min-nil-vs-set",
wc1: &WaitConfig{
Min: pointer.Of(1 * time.Second),
},
wc2: &WaitConfig{
Min: nil,
},
exp: false,
},
{
name: "max-nil-vs-set",
wc1: &WaitConfig{
Max: pointer.Of(1 * time.Second),
},
wc2: &WaitConfig{
Max: nil,
},
exp: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.True(t, tc.config.Equal(tc.expected))
must.Eq(t, tc.exp, tc.wc1.Equal(tc.wc2))
})
}
}
@ -6825,6 +6851,32 @@ func TestNodeResources_Merge(t *testing.T) {
}, res)
}
func TestAllocatedPortMapping_Equal(t *testing.T) {
ci.Parallel(t)
must.Equal[*AllocatedPortMapping](t, nil, nil)
must.NotEqual[*AllocatedPortMapping](t, nil, new(AllocatedPortMapping))
must.StructEqual(t, &AllocatedPortMapping{
Label: "http",
Value: 80,
To: 9000,
HostIP: "10.0.0.1",
}, []must.Tweak[*AllocatedPortMapping]{{
Field: "Label",
Apply: func(m *AllocatedPortMapping) { m.Label = "https" },
}, {
Field: "Value",
Apply: func(m *AllocatedPortMapping) { m.Value = 443 },
}, {
Field: "To",
Apply: func(m *AllocatedPortMapping) { m.To = 9999 },
}, {
Field: "HostIP",
Apply: func(m *AllocatedPortMapping) { m.HostIP = "10.1.1.1" },
}})
}
func TestAllocatedResources_Canonicalize(t *testing.T) {
ci.Parallel(t)
@ -7120,3 +7172,314 @@ func requireErrors(t *testing.T, err error, expected ...string) {
require.Equal(t, expected, found)
}
func TestEphemeralDisk_Equal(t *testing.T) {
ci.Parallel(t)
must.Equal[*EphemeralDisk](t, nil, nil)
must.NotEqual[*EphemeralDisk](t, nil, new(EphemeralDisk))
must.StructEqual(t, &EphemeralDisk{
Sticky: true,
SizeMB: 42,
Migrate: true,
}, []must.Tweak[*EphemeralDisk]{{
Field: "Sticky",
Apply: func(e *EphemeralDisk) { e.Sticky = false },
}, {
Field: "SizeMB",
Apply: func(e *EphemeralDisk) { e.SizeMB = 10 },
}, {
Field: "Migrate",
Apply: func(e *EphemeralDisk) { e.Migrate = false },
}})
}
func TestDNSConfig_Equal(t *testing.T) {
ci.Parallel(t)
must.Equal[*DNSConfig](t, nil, nil)
must.NotEqual[*DNSConfig](t, nil, new(DNSConfig))
must.StructEqual(t, &DNSConfig{
Servers: []string{"8.8.8.8", "8.8.4.4"},
Searches: []string{"org", "local"},
Options: []string{"opt1"},
}, []must.Tweak[*DNSConfig]{{
Field: "Servers",
Apply: func(c *DNSConfig) { c.Servers = []string{"1.1.1.1"} },
}, {
Field: "Searches",
Apply: func(c *DNSConfig) { c.Searches = []string{"localhost"} },
}, {
Field: "Options",
Apply: func(c *DNSConfig) { c.Options = []string{"opt2"} },
}})
}
func TestChangeScript_Equal(t *testing.T) {
ci.Parallel(t)
must.Equal[*ChangeScript](t, nil, nil)
must.NotEqual[*ChangeScript](t, nil, new(ChangeScript))
must.StructEqual(t, &ChangeScript{
Command: "/bin/sleep",
Args: []string{"infinity"},
Timeout: 1 * time.Second,
FailOnError: true,
}, []must.Tweak[*ChangeScript]{{
Field: "Command",
Apply: func(c *ChangeScript) { c.Command = "/bin/false" },
}, {
Field: "Args",
Apply: func(c *ChangeScript) { c.Args = []string{"1s"} },
}, {
Field: "Timeout",
Apply: func(c *ChangeScript) { c.Timeout = 2 * time.Second },
}, {
Field: "FailOnError",
Apply: func(c *ChangeScript) { c.FailOnError = false },
}})
}
func TestWaitConfig_Equal(t *testing.T) {
ci.Parallel(t)
must.Equal[*WaitConfig](t, nil, nil)
must.NotEqual[*WaitConfig](t, nil, new(WaitConfig))
must.StructEqual(t, &WaitConfig{
Min: pointer.Of[time.Duration](100),
Max: pointer.Of[time.Duration](200),
}, []must.Tweak[*WaitConfig]{{
Field: "Min",
Apply: func(c *WaitConfig) { c.Min = pointer.Of[time.Duration](111) },
}, {
Field: "Max",
Apply: func(c *WaitConfig) { c.Max = pointer.Of[time.Duration](222) },
}})
}
func TestTaskArtifact_Equal(t *testing.T) {
ci.Parallel(t)
must.Equal[*TaskArtifact](t, nil, nil)
must.NotEqual[*TaskArtifact](t, nil, new(TaskArtifact))
must.StructEqual(t, &TaskArtifact{
GetterSource: "source",
GetterOptions: map[string]string{"a": "A"},
GetterHeaders: map[string]string{"b": "B"},
GetterMode: "file",
RelativeDest: "./local",
}, []must.Tweak[*TaskArtifact]{{
Field: "GetterSource",
Apply: func(ta *TaskArtifact) { ta.GetterSource = "other" },
}, {
Field: "GetterOptions",
Apply: func(ta *TaskArtifact) { ta.GetterOptions = nil },
}, {
Field: "GetterHeaders",
Apply: func(ta *TaskArtifact) { ta.GetterHeaders = nil },
}, {
Field: "GetterMode",
Apply: func(ta *TaskArtifact) { ta.GetterMode = "directory" },
}, {
Field: "RelativeDest",
Apply: func(ta *TaskArtifact) { ta.RelativeDest = "./alloc" },
}})
}
func TestVault_Equal(t *testing.T) {
ci.Parallel(t)
must.Equal[*Vault](t, nil, nil)
must.NotEqual[*Vault](t, nil, new(Vault))
must.StructEqual(t, &Vault{
Policies: []string{"one"},
Namespace: "global",
Env: true,
ChangeMode: "signal",
ChangeSignal: "SIGILL",
}, []must.Tweak[*Vault]{{
Field: "Policies",
Apply: func(v *Vault) { v.Policies = []string{"two"} },
}, {
Field: "Namespace",
Apply: func(v *Vault) { v.Namespace = "regional" },
}, {
Field: "Env",
Apply: func(v *Vault) { v.Env = false },
}, {
Field: "ChangeMode",
Apply: func(v *Vault) { v.ChangeMode = "restart" },
}, {
Field: "ChangeSignal",
Apply: func(v *Vault) { v.ChangeSignal = "SIGTERM" },
}})
}
func TestTemplate_Equal(t *testing.T) {
ci.Parallel(t)
must.Equal[*Template](t, nil, nil)
must.NotEqual[*Template](t, nil, new(Template))
must.StructEqual(t, &Template{
SourcePath: "source",
DestPath: "destination",
EmbeddedTmpl: "tmpl",
ChangeMode: "mode",
ChangeSignal: "signal",
ChangeScript: &ChangeScript{
Command: "/bin/sleep",
Args: []string{"infinity"},
Timeout: 1 * time.Second,
FailOnError: true,
},
Splay: 1,
Perms: "perms",
Uid: pointer.Of(1000),
Gid: pointer.Of(1000),
LeftDelim: "{",
RightDelim: "}",
Envvars: true,
VaultGrace: 1 * time.Second,
Wait: &WaitConfig{
Min: pointer.Of[time.Duration](1),
Max: pointer.Of[time.Duration](2),
},
ErrMissingKey: true,
}, []must.Tweak[*Template]{{
Field: "SourcePath",
Apply: func(t *Template) { t.SourcePath = "source2" },
}, {
Field: "DestPath",
Apply: func(t *Template) { t.DestPath = "destination2" },
}, {
Field: "EmbeddedTmpl",
Apply: func(t *Template) { t.EmbeddedTmpl = "tmpl2" },
}, {
Field: "ChangeMode",
Apply: func(t *Template) { t.ChangeMode = "mode2" },
}, {
Field: "ChangeSignal",
Apply: func(t *Template) { t.ChangeSignal = "signal2" },
}, {
Field: "ChangeScript",
Apply: func(t *Template) {
t.ChangeScript = &ChangeScript{
Command: "/bin/sleep",
Args: []string{"infinity", "plus", "one"},
Timeout: 1 * time.Second,
FailOnError: true,
}
},
}, {
Field: "Splay",
Apply: func(t *Template) { t.Splay = 2 },
}, {
Field: "Perms",
Apply: func(t *Template) { t.Perms = "perms2" },
}, {
Field: "Uid",
Apply: func(t *Template) { t.Uid = pointer.Of(0) },
}, {
Field: "Gid",
Apply: func(t *Template) { t.Gid = pointer.Of(0) },
}, {
Field: "LeftDelim",
Apply: func(t *Template) { t.LeftDelim = "[" },
}, {
Field: "RightDelim",
Apply: func(t *Template) { t.RightDelim = "]" },
}, {
Field: "Envvars",
Apply: func(t *Template) { t.Envvars = false },
}, {
Field: "VaultGrace",
Apply: func(t *Template) { t.VaultGrace = 2 * time.Second },
}, {
Field: "Wait",
Apply: func(t *Template) {
t.Wait = &WaitConfig{
Min: pointer.Of[time.Duration](1),
Max: nil,
}
},
}, {
Field: "ErrMissingKey",
Apply: func(t *Template) { t.ErrMissingKey = false },
}})
}
func TestAffinity_Equal(t *testing.T) {
ci.Parallel(t)
must.Equal[*Affinity](t, nil, nil)
must.NotEqual[*Affinity](t, nil, new(Affinity))
must.StructEqual(t, &Affinity{
LTarget: "left",
RTarget: "right",
Operand: "op",
Weight: 100,
}, []must.Tweak[*Affinity]{{
Field: "LTarget",
Apply: func(a *Affinity) { a.LTarget = "left2" },
}, {
Field: "RTarget",
Apply: func(a *Affinity) { a.RTarget = "right2" },
}, {
Field: "Operand",
Apply: func(a *Affinity) { a.Operand = "op2" },
}, {
Field: "Weight",
Apply: func(a *Affinity) { a.Weight = 50 },
}})
}
func TestSpreadTarget_Equal(t *testing.T) {
ci.Parallel(t)
must.Equal[*SpreadTarget](t, nil, nil)
must.NotEqual[*SpreadTarget](t, nil, new(SpreadTarget))
must.StructEqual(t, &SpreadTarget{
Value: "dc1",
Percent: 99,
}, []must.Tweak[*SpreadTarget]{{
Field: "Value",
Apply: func(st *SpreadTarget) { st.Value = "dc2" },
}, {
Field: "Percent",
Apply: func(st *SpreadTarget) { st.Percent = 98 },
}})
}
func TestSpread_Equal(t *testing.T) {
ci.Parallel(t)
must.Equal[*Spread](t, nil, nil)
must.NotEqual[*Spread](t, nil, new(Spread))
must.StructEqual(t, &Spread{
Attribute: "attr",
Weight: 100,
SpreadTarget: []*SpreadTarget{{
Value: "dc1",
Percent: 99,
}},
}, []must.Tweak[*Spread]{{
Field: "Attribute",
Apply: func(s *Spread) { s.Attribute = "attr2" },
}, {
Field: "Weight",
Apply: func(s *Spread) { s.Weight = 50 },
}, {
Field: "SpreadTarget",
Apply: func(s *Spread) { s.SpreadTarget = nil },
}})
}

View File

@ -4,6 +4,7 @@ import (
"testing"
"github.com/hashicorp/nomad/ci"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
@ -93,3 +94,74 @@ func TestVolumeRequest_Validate(t *testing.T) {
}
}
func TestVolumeRequest_Equal(t *testing.T) {
ci.Parallel(t)
must.Equal[*VolumeRequest](t, nil, nil)
must.NotEqual[*VolumeRequest](t, nil, new(VolumeRequest))
must.StructEqual(t, &VolumeRequest{
Name: "name",
Type: "type",
Source: "source",
ReadOnly: true,
AccessMode: "access",
AttachmentMode: "attachment",
MountOptions: &CSIMountOptions{
FSType: "fs1",
MountFlags: []string{"flag1"},
},
PerAlloc: true,
}, []must.Tweak[*VolumeRequest]{{
Field: "Name",
Apply: func(vr *VolumeRequest) { vr.Name = "name2" },
}, {
Field: "Type",
Apply: func(vr *VolumeRequest) { vr.Type = "type2" },
}, {
Field: "Source",
Apply: func(vr *VolumeRequest) { vr.Source = "source2" },
}, {
Field: "ReadOnly",
Apply: func(vr *VolumeRequest) { vr.ReadOnly = false },
}, {
Field: "AccessMode",
Apply: func(vr *VolumeRequest) { vr.AccessMode = "access2" },
}, {
Field: "AttachmentMode",
Apply: func(vr *VolumeRequest) { vr.AttachmentMode = "attachment2" },
}, {
Field: "MountOptions",
Apply: func(vr *VolumeRequest) { vr.MountOptions = nil },
}, {
Field: "PerAlloc",
Apply: func(vr *VolumeRequest) { vr.PerAlloc = false },
}})
}
func TestVolumeMount_Equal(t *testing.T) {
ci.Parallel(t)
must.Equal[*VolumeMount](t, nil, nil)
must.NotEqual[*VolumeMount](t, nil, new(VolumeMount))
must.StructEqual(t, &VolumeMount{
Volume: "volume",
Destination: "destination",
ReadOnly: true,
PropagationMode: "mode",
}, []must.Tweak[*VolumeMount]{{
Field: "Volume",
Apply: func(vm *VolumeMount) { vm.Volume = "vol2" },
}, {
Field: "Destination",
Apply: func(vm *VolumeMount) { vm.Destination = "dest2" },
}, {
Field: "ReadOnly",
Apply: func(vm *VolumeMount) { vm.ReadOnly = false },
}, {
Field: "PropogationMode",
Apply: func(vm *VolumeMount) { vm.PropagationMode = "mode2" },
}})
}

View File

@ -102,6 +102,31 @@ type VolumeRequest struct {
PerAlloc bool
}
func (v *VolumeRequest) Equal(o *VolumeRequest) bool {
if v == nil || o == nil {
return v == o
}
switch {
case v.Name != o.Name:
return false
case v.Type != o.Type:
return false
case v.Source != o.Source:
return false
case v.ReadOnly != o.ReadOnly:
return false
case v.AccessMode != o.AccessMode:
return false
case v.AttachmentMode != o.AttachmentMode:
return false
case !v.MountOptions.Equal(o.MountOptions):
return false
case v.PerAlloc != o.PerAlloc:
return false
}
return true
}
func (v *VolumeRequest) Validate(jobType string, taskGroupCount, canaries int) error {
if !(v.Type == VolumeTypeHost ||
v.Type == VolumeTypeCSI) {
@ -216,6 +241,23 @@ type VolumeMount struct {
PropagationMode string
}
func (v *VolumeMount) Equal(o *VolumeMount) bool {
if v == nil || o == nil {
return v == o
}
switch {
case v.Volume != o.Volume:
return false
case v.Destination != o.Destination:
return false
case v.ReadOnly != o.ReadOnly:
return false
case v.PropagationMode != o.PropagationMode:
return false
}
return true
}
func (v *VolumeMount) Copy() *VolumeMount {
if v == nil {
return nil

View File

@ -4,11 +4,13 @@ import (
"encoding/binary"
"fmt"
"math/rand"
"reflect"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)
// allocTuple is a tuple of the allocation name and potential alloc ID
@ -169,121 +171,158 @@ func shuffleNodes(plan *structs.Plan, index uint64, nodes []*structs.Node) {
}
}
// tasksUpdated does a diff between task groups to see if the
// tasks, their drivers, environment variables or config have updated. The
// inputs are the task group name to diff and two jobs to diff.
// taskUpdated and functions called within assume that the given
// taskGroup has already been checked to not be nil
func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) bool {
// comparison records the _first_ detected difference between two groups during
// a comparison in tasksUpdated
//
// This is useful to provide context when debugging the result of tasksUpdated.
type comparison struct {
modified bool
label string
before any
after any
}
func difference(label string, before, after any) comparison {
// push string formatting into String(), so that we never call it in the
// hot path unless someone adds a log line to debug with this result
return comparison{
modified: true,
label: label,
before: before,
after: after,
}
}
func (c comparison) String() string {
return fmt.Sprintf("%s changed; before: %#v, after: %#v", c.label, c.before, c.after)
}
// same indicates no destructive difference between two task groups
var same = comparison{modified: false}
// tasksUpdated creates a comparison between task groups to see if the tasks, their
// drivers, environment variables or config have been modified.
func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) comparison {
a := jobA.LookupTaskGroup(taskGroup)
b := jobB.LookupTaskGroup(taskGroup)
// If the number of tasks do not match, clearly there is an update
if len(a.Tasks) != len(b.Tasks) {
return true
if lenA, lenB := len(a.Tasks), len(b.Tasks); lenA != lenB {
return difference("number of tasks", lenA, lenB)
}
// Check ephemeral disk
if !reflect.DeepEqual(a.EphemeralDisk, b.EphemeralDisk) {
return true
if !a.EphemeralDisk.Equal(b.EphemeralDisk) {
return difference("ephemeral disk", a.EphemeralDisk, b.EphemeralDisk)
}
// Check that the network resources haven't changed
if networkUpdated(a.Networks, b.Networks) {
return true
if c := networkUpdated(a.Networks, b.Networks); c.modified {
return c
}
// Check Affinities
if affinitiesUpdated(jobA, jobB, taskGroup) {
return true
if c := affinitiesUpdated(jobA, jobB, taskGroup); c.modified {
return c
}
// Check Spreads
if spreadsUpdated(jobA, jobB, taskGroup) {
return true
if c := spreadsUpdated(jobA, jobB, taskGroup); c.modified {
return c
}
// Check consul namespace updated
if consulNamespaceUpdated(a, b) {
return true
if c := consulNamespaceUpdated(a, b); c.modified {
return c
}
// Check connect service(s) updated
if connectServiceUpdated(a.Services, b.Services) {
return true
if c := connectServiceUpdated(a.Services, b.Services); c.modified {
return c
}
// Check if volumes are updated (no task driver can support
// altering mounts in-place)
if !reflect.DeepEqual(a.Volumes, b.Volumes) {
return true
if !maps.EqualFunc(a.Volumes, b.Volumes, func(a, b *structs.VolumeRequest) bool { return a.Equal(b) }) {
return difference("volume request", a.Volumes, b.Volumes)
}
// Check each task
for _, at := range a.Tasks {
bt := b.LookupTask(at.Name)
if bt == nil {
return true
return difference("task deleted", at.Name, "(nil)")
}
if at.Driver != bt.Driver {
return true
return difference("task driver", at.Driver, bt.Driver)
}
if at.User != bt.User {
return true
return difference("task user", at.User, bt.User)
}
if !reflect.DeepEqual(at.Config, bt.Config) {
return true
if !helper.OpaqueMapsEqual(at.Config, bt.Config) {
return difference("task config", at.Config, bt.Config)
}
if !reflect.DeepEqual(at.Env, bt.Env) {
return true
if !maps.Equal(at.Env, bt.Env) {
return difference("task env", at.Env, bt.Env)
}
if !reflect.DeepEqual(at.Artifacts, bt.Artifacts) {
return true
if !slices.EqualFunc(at.Artifacts, bt.Artifacts, func(a, b *structs.TaskArtifact) bool { return a.Equal(b) }) {
return difference("task artifacts", at.Artifacts, bt.Artifacts)
}
if !reflect.DeepEqual(at.Vault, bt.Vault) {
return true
if !at.Vault.Equal(bt.Vault) {
return difference("task vault", at.Vault, bt.Vault)
}
if !reflect.DeepEqual(at.Templates, bt.Templates) {
return true
if !slices.EqualFunc(at.Templates, bt.Templates, func(a, b *structs.Template) bool { return a.Equal(b) }) {
return difference("task templates", at.Templates, bt.Templates)
}
if !reflect.DeepEqual(at.CSIPluginConfig, bt.CSIPluginConfig) {
return true
if !at.CSIPluginConfig.Equal(bt.CSIPluginConfig) {
return difference("task csi config", at.CSIPluginConfig, bt.CSIPluginConfig)
}
if !reflect.DeepEqual(at.VolumeMounts, bt.VolumeMounts) {
return true
if !slices.EqualFunc(at.VolumeMounts, bt.VolumeMounts, func(a, b *structs.VolumeMount) bool { return a.Equal(b) }) {
return difference("task volume mount", at.VolumeMounts, bt.VolumeMounts)
}
// Check the metadata
if !reflect.DeepEqual(
jobA.CombinedTaskMeta(taskGroup, at.Name),
jobB.CombinedTaskMeta(taskGroup, bt.Name)) {
return true
metaA := jobA.CombinedTaskMeta(taskGroup, at.Name)
metaB := jobB.CombinedTaskMeta(taskGroup, bt.Name)
if !maps.Equal(metaA, metaB) {
return difference("task meta", metaA, metaB)
}
// Inspect the network to see if the dynamic ports are different
if networkUpdated(at.Resources.Networks, bt.Resources.Networks) {
return true
if c := networkUpdated(at.Resources.Networks, bt.Resources.Networks); c.modified {
return c
}
// Inspect the non-network resources
if ar, br := at.Resources, bt.Resources; ar.CPU != br.CPU {
return true
} else if ar.Cores != br.Cores {
return true
} else if ar.MemoryMB != br.MemoryMB {
return true
} else if ar.MemoryMaxMB != br.MemoryMaxMB {
return true
} else if !ar.Devices.Equal(&br.Devices) {
return true
if c := nonNetworkResourcesUpdated(at.Resources, bt.Resources); c.modified {
return c
}
// Inspect Identity being exposed
if !at.Identity.Equal(bt.Identity) {
return true
return difference("task identity", at.Identity, bt.Identity)
}
}
return false
// none of the fields that trigger a destructive update were modified,
// indicating this group can be updated in-place or ignored
return same
}
func nonNetworkResourcesUpdated(a, b *structs.Resources) comparison {
// Inspect the non-network resources
switch {
case a.CPU != b.CPU:
return difference("task cpu", a.CPU, b.CPU)
case a.Cores != b.Cores:
return difference("task cores", a.Cores, b.Cores)
case a.MemoryMB != b.MemoryMB:
return difference("task memory", a.MemoryMB, b.MemoryMB)
case a.MemoryMaxMB != b.MemoryMaxMB:
return difference("task memory max", a.MemoryMaxMB, b.MemoryMaxMB)
case !a.Devices.Equal(&b.Devices):
return difference("task devices", a.Devices, b.Devices)
}
return same
}
// consulNamespaceUpdated returns true if the Consul namespace in the task group
@ -293,9 +332,12 @@ func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) bool {
// because Namespaces directly impact networking validity among Consul intentions.
// Forcing the task through a reschedule is a sure way of breaking no-longer valid
// network connections.
func consulNamespaceUpdated(tgA, tgB *structs.TaskGroup) bool {
func consulNamespaceUpdated(tgA, tgB *structs.TaskGroup) comparison {
// job.ConsulNamespace is pushed down to the TGs, just check those
return tgA.Consul.GetNamespace() != tgB.Consul.GetNamespace()
if a, b := tgA.Consul.GetNamespace(), tgB.Consul.GetNamespace(); a != b {
return difference("consul namespace", a, b)
}
return same
}
// connectServiceUpdated returns true if any services with a connect block have
@ -303,25 +345,25 @@ func consulNamespaceUpdated(tgA, tgB *structs.TaskGroup) bool {
//
// Ordinary services can be updated in-place by updating the service definition
// in Consul. Connect service changes mostly require destroying the task.
func connectServiceUpdated(servicesA, servicesB []*structs.Service) bool {
func connectServiceUpdated(servicesA, servicesB []*structs.Service) comparison {
for _, serviceA := range servicesA {
if serviceA.Connect != nil {
for _, serviceB := range servicesB {
if serviceA.Name == serviceB.Name {
if connectUpdated(serviceA.Connect, serviceB.Connect) {
return true
if c := connectUpdated(serviceA.Connect, serviceB.Connect); c.modified {
return c
}
// Part of the Connect plumbing is derived from port label,
// if that changes we need to destroy the task.
if serviceA.PortLabel != serviceB.PortLabel {
return true
return difference("connect service port label", serviceA.PortLabel, serviceB.PortLabel)
}
break
}
}
}
}
return false
return same
}
// connectUpdated returns true if the connect block has been updated in a manner
@ -329,77 +371,93 @@ func connectServiceUpdated(servicesA, servicesB []*structs.Service) bool {
//
// Fields that can be updated through consul-sync do not need a destructive
// update.
func connectUpdated(connectA, connectB *structs.ConsulConnect) bool {
if connectA == nil || connectB == nil {
return connectA != connectB
func connectUpdated(connectA, connectB *structs.ConsulConnect) comparison {
if connectA == nil && connectB == nil {
return same
}
if connectA == nil && connectB != nil {
return difference("connect added", connectA, connectB)
}
if connectA != nil && connectB == nil {
return difference("connect removed", connectA, connectB)
}
if connectA.Native != connectB.Native {
return true
return difference("connect native", connectA.Native, connectB.Native)
}
if !connectA.Gateway.Equal(connectB.Gateway) {
return true
return difference("connect gateway", connectA.Gateway, connectB.Gateway)
}
if !connectA.SidecarTask.Equal(connectB.SidecarTask) {
return true
return difference("connect sidecar task", connectA.SidecarTask, connectB.SidecarTask)
}
// not everything in sidecar_service needs task destruction
if connectSidecarServiceUpdated(connectA.SidecarService, connectB.SidecarService) {
return true
if c := connectSidecarServiceUpdated(connectA.SidecarService, connectB.SidecarService); c.modified {
return c
}
return false
return same
}
func connectSidecarServiceUpdated(ssA, ssB *structs.ConsulSidecarService) bool {
if ssA == nil || ssB == nil {
return ssA != ssB
func connectSidecarServiceUpdated(ssA, ssB *structs.ConsulSidecarService) comparison {
if ssA == nil && ssB == nil {
return same
}
if ssA == nil && ssB != nil {
return difference("connect service add", ssA, ssB)
}
if ssA != nil && ssB == nil {
return difference("connect service delete", ssA, ssB)
}
if ssA.Port != ssB.Port {
return true
return difference("connect port", ssA.Port, ssB.Port)
}
// sidecar_service.tags handled in-place (registration)
// sidecar_service.tags (handled in-place via registration)
// sidecar_service.proxy handled in-place (registration + xDS)
// sidecar_service.proxy (handled in-place via registration + xDS)
return false
return same
}
func networkUpdated(netA, netB []*structs.NetworkResource) bool {
if len(netA) != len(netB) {
return true
func networkUpdated(netA, netB []*structs.NetworkResource) comparison {
if lenNetA, lenNetB := len(netA), len(netB); lenNetA != lenNetB {
return difference("network lengths", lenNetA, lenNetB)
}
for idx := range netA {
an := netA[idx]
bn := netB[idx]
if an.Mode != bn.Mode {
return true
return difference("network mode", an.Mode, bn.Mode)
}
if an.MBits != bn.MBits {
return true
return difference("network mbits", an.MBits, bn.MBits)
}
if an.Hostname != bn.Hostname {
return true
return difference("network hostname", an.Hostname, bn.Hostname)
}
if !reflect.DeepEqual(an.DNS, bn.DNS) {
return true
if !an.DNS.Equal(bn.DNS) {
return difference("network dns", an.DNS, bn.DNS)
}
aPorts, bPorts := networkPortMap(an), networkPortMap(bn)
if !reflect.DeepEqual(aPorts, bPorts) {
return true
if !aPorts.Equal(bPorts) {
return difference("network port map", aPorts, bPorts)
}
}
return false
return same
}
// networkPortMap takes a network resource and returns a AllocatedPorts.
@ -426,59 +484,65 @@ func networkPortMap(n *structs.NetworkResource) structs.AllocatedPorts {
return m
}
func affinitiesUpdated(jobA, jobB *structs.Job, taskGroup string) bool {
var aAffinities []*structs.Affinity
var bAffinities []*structs.Affinity
func affinitiesUpdated(jobA, jobB *structs.Job, taskGroup string) comparison {
var affinitiesA structs.Affinities
var affinitiesB structs.Affinities
// accumulate job affinities
affinitiesA = append(affinitiesA, jobA.Affinities...)
affinitiesB = append(affinitiesB, jobB.Affinities...)
tgA := jobA.LookupTaskGroup(taskGroup)
tgB := jobB.LookupTaskGroup(taskGroup)
// Append jobA job and task group level affinities
aAffinities = append(aAffinities, jobA.Affinities...)
aAffinities = append(aAffinities, tgA.Affinities...)
// append group level affinities
// Append jobB job and task group level affinities
bAffinities = append(bAffinities, jobB.Affinities...)
bAffinities = append(bAffinities, tgB.Affinities...)
affinitiesA = append(affinitiesA, tgA.Affinities...)
affinitiesB = append(affinitiesB, tgB.Affinities...)
// append task level affinities for A
// append task affinities
for _, task := range tgA.Tasks {
aAffinities = append(aAffinities, task.Affinities...)
affinitiesA = append(affinitiesA, task.Affinities...)
}
// append task level affinities for B
for _, task := range tgB.Tasks {
bAffinities = append(bAffinities, task.Affinities...)
affinitiesB = append(affinitiesB, task.Affinities...)
}
// Check for equality
if len(aAffinities) != len(bAffinities) {
return true
// finally check if all the affinities from both jobs match
if !affinitiesA.Equal(&affinitiesB) {
return difference("affinities", affinitiesA, affinitiesB)
}
return !reflect.DeepEqual(aAffinities, bAffinities)
return same
}
func spreadsUpdated(jobA, jobB *structs.Job, taskGroup string) bool {
var aSpreads []*structs.Spread
var bSpreads []*structs.Spread
func spreadsUpdated(jobA, jobB *structs.Job, taskGroup string) comparison {
var spreadsA []*structs.Spread
var spreadsB []*structs.Spread
// accumulate job spreads
spreadsA = append(spreadsA, jobA.Spreads...)
spreadsB = append(spreadsB, jobB.Spreads...)
tgA := jobA.LookupTaskGroup(taskGroup)
tgB := jobB.LookupTaskGroup(taskGroup)
// append jobA and task group level spreads
aSpreads = append(aSpreads, jobA.Spreads...)
aSpreads = append(aSpreads, tgA.Spreads...)
// append group spreads
spreadsA = append(spreadsA, tgA.Spreads...)
spreadsB = append(spreadsB, tgB.Spreads...)
// append jobB and task group level spreads
bSpreads = append(bSpreads, jobB.Spreads...)
bSpreads = append(bSpreads, tgB.Spreads...)
// Check for equality
if len(aSpreads) != len(bSpreads) {
return true
if !slices.EqualFunc(spreadsA, spreadsB, func(a, b *structs.Spread) bool {
return a.Equal(b)
}) {
return difference("spreads", spreadsA, spreadsB)
}
return !reflect.DeepEqual(aSpreads, bSpreads)
return same
}
// setStatus is used to update the status of the evaluation
@ -530,7 +594,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
// Check if the task drivers or config has changed, requires
// a rolling upgrade since that cannot be done in-place.
existing := update.Alloc.Job
if tasksUpdated(job, existing, update.TaskGroup.Name) {
if c := tasksUpdated(job, existing, update.TaskGroup.Name); c.modified {
continue
}
@ -774,7 +838,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
// Check if the task drivers or config has changed, requires
// a destructive upgrade since that cannot be done in-place.
if tasksUpdated(newJob, existing.Job, newTG.Name) {
if c := tasksUpdated(newJob, existing.Job, newTG.Name); c.modified {
return false, true, nil
}

View File

@ -6,24 +6,22 @@ import (
"time"
"github.com/hashicorp/nomad/ci"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
func BenchmarkTasksUpdated(b *testing.B) {
jobA := mock.BigBenchmarkJob()
jobB := jobA.Copy()
group := jobA.TaskGroups[0].Name
for n := 0; n < b.N; n++ {
if tasksUpdated(jobA, jobB, group) {
b.Fatal("tasks should be the same")
if c := tasksUpdated(jobA, jobB, jobA.TaskGroups[0].Name); c.modified {
b.Errorf("tasks should be the same")
}
}
}
@ -200,8 +198,7 @@ func TestTaskUpdatedAffinity(t *testing.T) {
j1 := mock.Job()
j2 := mock.Job()
name := j1.TaskGroups[0].Name
require.False(t, tasksUpdated(j1, j2, name))
must.False(t, tasksUpdated(j1, j2, name).modified)
// TaskGroup Affinity
j2.TaskGroups[0].Affinities = []*structs.Affinity{
@ -212,7 +209,7 @@ func TestTaskUpdatedAffinity(t *testing.T) {
Weight: 100,
},
}
require.True(t, tasksUpdated(j1, j2, name))
must.True(t, tasksUpdated(j1, j2, name).modified)
// TaskGroup Task Affinity
j3 := mock.Job()
@ -224,8 +221,7 @@ func TestTaskUpdatedAffinity(t *testing.T) {
Weight: 100,
},
}
require.True(t, tasksUpdated(j1, j3, name))
must.True(t, tasksUpdated(j1, j3, name).modified)
j4 := mock.Job()
j4.TaskGroups[0].Tasks[0].Affinities = []*structs.Affinity{
@ -236,8 +232,7 @@ func TestTaskUpdatedAffinity(t *testing.T) {
Weight: 100,
},
}
require.True(t, tasksUpdated(j1, j4, name))
must.True(t, tasksUpdated(j1, j4, name).modified)
// check different level of same affinity
j5 := mock.Job()
@ -260,8 +255,7 @@ func TestTaskUpdatedAffinity(t *testing.T) {
Weight: 100,
},
}
require.False(t, tasksUpdated(j5, j6, name))
must.False(t, tasksUpdated(j5, j6, name).modified)
}
func TestTaskUpdatedSpread(t *testing.T) {
@ -271,7 +265,7 @@ func TestTaskUpdatedSpread(t *testing.T) {
j2 := mock.Job()
name := j1.TaskGroups[0].Name
require.False(t, tasksUpdated(j1, j2, name))
must.False(t, tasksUpdated(j1, j2, name).modified)
// TaskGroup Spread
j2.TaskGroups[0].Spreads = []*structs.Spread{
@ -290,7 +284,7 @@ func TestTaskUpdatedSpread(t *testing.T) {
},
},
}
require.True(t, tasksUpdated(j1, j2, name))
must.True(t, tasksUpdated(j1, j2, name).modified)
// check different level of same constraint
j5 := mock.Job()
@ -329,7 +323,7 @@ func TestTaskUpdatedSpread(t *testing.T) {
},
}
require.False(t, tasksUpdated(j5, j6, name))
must.False(t, tasksUpdated(j5, j6, name).modified)
}
func TestTasksUpdated(t *testing.T) {
@ -338,23 +332,23 @@ func TestTasksUpdated(t *testing.T) {
j1 := mock.Job()
j2 := mock.Job()
name := j1.TaskGroups[0].Name
require.False(t, tasksUpdated(j1, j2, name))
must.False(t, tasksUpdated(j1, j2, name).modified)
j2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
require.True(t, tasksUpdated(j1, j2, name))
must.True(t, tasksUpdated(j1, j2, name).modified)
j3 := mock.Job()
j3.TaskGroups[0].Tasks[0].Name = "foo"
require.True(t, tasksUpdated(j1, j3, name))
must.True(t, tasksUpdated(j1, j3, name).modified)
j4 := mock.Job()
j4.TaskGroups[0].Tasks[0].Driver = "foo"
require.True(t, tasksUpdated(j1, j4, name))
must.True(t, tasksUpdated(j1, j4, name).modified)
j5 := mock.Job()
j5.TaskGroups[0].Tasks = append(j5.TaskGroups[0].Tasks,
j5.TaskGroups[0].Tasks[0])
require.True(t, tasksUpdated(j1, j5, name))
must.True(t, tasksUpdated(j1, j5, name).modified)
j6 := mock.Job()
j6.TaskGroups[0].Networks[0].DynamicPorts = []structs.Port{
@ -362,15 +356,15 @@ func TestTasksUpdated(t *testing.T) {
{Label: "https", Value: 0},
{Label: "admin", Value: 0},
}
require.True(t, tasksUpdated(j1, j6, name))
must.True(t, tasksUpdated(j1, j6, name).modified)
j7 := mock.Job()
j7.TaskGroups[0].Tasks[0].Env["NEW_ENV"] = "NEW_VALUE"
require.True(t, tasksUpdated(j1, j7, name))
must.True(t, tasksUpdated(j1, j7, name).modified)
j8 := mock.Job()
j8.TaskGroups[0].Tasks[0].User = "foo"
require.True(t, tasksUpdated(j1, j8, name))
must.True(t, tasksUpdated(j1, j8, name).modified)
j9 := mock.Job()
j9.TaskGroups[0].Tasks[0].Artifacts = []*structs.TaskArtifact{
@ -378,15 +372,15 @@ func TestTasksUpdated(t *testing.T) {
GetterSource: "http://foo.com/bar",
},
}
require.True(t, tasksUpdated(j1, j9, name))
must.True(t, tasksUpdated(j1, j9, name).modified)
j10 := mock.Job()
j10.TaskGroups[0].Tasks[0].Meta["baz"] = "boom"
require.True(t, tasksUpdated(j1, j10, name))
must.True(t, tasksUpdated(j1, j10, name).modified)
j11 := mock.Job()
j11.TaskGroups[0].Tasks[0].Resources.CPU = 1337
require.True(t, tasksUpdated(j1, j11, name))
must.True(t, tasksUpdated(j1, j11, name).modified)
j11d1 := mock.Job()
j11d1.TaskGroups[0].Tasks[0].Resources.Devices = structs.ResourceDevices{
@ -402,38 +396,38 @@ func TestTasksUpdated(t *testing.T) {
Count: 2,
},
}
require.True(t, tasksUpdated(j11d1, j11d2, name))
must.True(t, tasksUpdated(j11d1, j11d2, name).modified)
j13 := mock.Job()
j13.TaskGroups[0].Networks[0].DynamicPorts[0].Label = "foobar"
require.True(t, tasksUpdated(j1, j13, name))
must.True(t, tasksUpdated(j1, j13, name).modified)
j14 := mock.Job()
j14.TaskGroups[0].Networks[0].ReservedPorts = []structs.Port{{Label: "foo", Value: 1312}}
require.True(t, tasksUpdated(j1, j14, name))
must.True(t, tasksUpdated(j1, j14, name).modified)
j15 := mock.Job()
j15.TaskGroups[0].Tasks[0].Vault = &structs.Vault{Policies: []string{"foo"}}
require.True(t, tasksUpdated(j1, j15, name))
must.True(t, tasksUpdated(j1, j15, name).modified)
j16 := mock.Job()
j16.TaskGroups[0].EphemeralDisk.Sticky = true
require.True(t, tasksUpdated(j1, j16, name))
must.True(t, tasksUpdated(j1, j16, name).modified)
// Change group meta
j17 := mock.Job()
j17.TaskGroups[0].Meta["j17_test"] = "roll_baby_roll"
require.True(t, tasksUpdated(j1, j17, name))
must.True(t, tasksUpdated(j1, j17, name).modified)
// Change job meta
j18 := mock.Job()
j18.Meta["j18_test"] = "roll_baby_roll"
require.True(t, tasksUpdated(j1, j18, name))
must.True(t, tasksUpdated(j1, j18, name).modified)
// Change network mode
j19 := mock.Job()
j19.TaskGroups[0].Networks[0].Mode = "bridge"
require.True(t, tasksUpdated(j1, j19, name))
must.True(t, tasksUpdated(j1, j19, name).modified)
// Change cores resource
j20 := mock.Job()
@ -442,7 +436,7 @@ func TestTasksUpdated(t *testing.T) {
j21 := mock.Job()
j21.TaskGroups[0].Tasks[0].Resources.CPU = 0
j21.TaskGroups[0].Tasks[0].Resources.Cores = 4
require.True(t, tasksUpdated(j20, j21, name))
must.True(t, tasksUpdated(j20, j21, name).modified)
// Compare identical Template wait configs
j22 := mock.Job()
@ -463,10 +457,10 @@ func TestTasksUpdated(t *testing.T) {
},
},
}
require.False(t, tasksUpdated(j22, j23, name))
must.False(t, tasksUpdated(j22, j23, name).modified)
// Compare changed Template wait configs
j23.TaskGroups[0].Tasks[0].Templates[0].Wait.Max = pointer.Of(10 * time.Second)
require.True(t, tasksUpdated(j22, j23, name))
must.True(t, tasksUpdated(j22, j23, name).modified)
// Add a volume
j24 := mock.Job()
@ -477,12 +471,12 @@ func TestTasksUpdated(t *testing.T) {
Type: "csi",
Source: "test-volume[0]",
}}
require.True(t, tasksUpdated(j24, j25, name))
must.True(t, tasksUpdated(j24, j25, name).modified)
// Alter a volume
j26 := j25.Copy()
j26.TaskGroups[0].Volumes["myvolume"].ReadOnly = true
require.True(t, tasksUpdated(j25, j26, name))
must.True(t, tasksUpdated(j25, j26, name).modified)
// Alter a CSI plugin
j27 := mock.Job()
@ -492,7 +486,7 @@ func TestTasksUpdated(t *testing.T) {
}
j28 := j27.Copy()
j28.TaskGroups[0].Tasks[0].CSIPluginConfig.Type = "monolith"
require.True(t, tasksUpdated(j27, j28, name))
must.True(t, tasksUpdated(j27, j28, name).modified)
// Compare identical Template ErrMissingKey
j29 := mock.Job()
@ -507,11 +501,11 @@ func TestTasksUpdated(t *testing.T) {
ErrMissingKey: false,
},
}
require.False(t, tasksUpdated(j29, j30, name))
must.False(t, tasksUpdated(j29, j30, name).modified)
// Compare changed Template ErrMissingKey
j30.TaskGroups[0].Tasks[0].Templates[0].ErrMissingKey = true
require.True(t, tasksUpdated(j29, j30, name))
must.True(t, tasksUpdated(j29, j30, name).modified)
}
func TestTasksUpdated_connectServiceUpdated(t *testing.T) {
@ -541,8 +535,8 @@ func TestTasksUpdated_connectServiceUpdated(t *testing.T) {
}, {
Name: "service2",
}}
updated := connectServiceUpdated(servicesA, servicesB)
require.False(t, updated)
updated := connectServiceUpdated(servicesA, servicesB).modified
must.False(t, updated)
})
t.Run("service connect tags updated", func(t *testing.T) {
@ -557,8 +551,8 @@ func TestTasksUpdated_connectServiceUpdated(t *testing.T) {
},
},
}}
updated := connectServiceUpdated(servicesA, servicesB)
require.False(t, updated)
updated := connectServiceUpdated(servicesA, servicesB).modified
must.False(t, updated)
})
t.Run("service connect port updated", func(t *testing.T) {
@ -574,8 +568,8 @@ func TestTasksUpdated_connectServiceUpdated(t *testing.T) {
},
},
}}
updated := connectServiceUpdated(servicesA, servicesB)
require.True(t, updated)
updated := connectServiceUpdated(servicesA, servicesB).modified
must.True(t, updated)
})
t.Run("service port label updated", func(t *testing.T) {
@ -590,13 +584,14 @@ func TestTasksUpdated_connectServiceUpdated(t *testing.T) {
},
},
}}
updated := connectServiceUpdated(servicesA, servicesB)
require.True(t, updated)
updated := connectServiceUpdated(servicesA, servicesB).modified
must.True(t, updated)
})
}
func TestNetworkUpdated(t *testing.T) {
ci.Parallel(t)
cases := []struct {
name string
a []*structs.NetworkResource
@ -653,11 +648,9 @@ func TestNetworkUpdated(t *testing.T) {
},
}
for i := range cases {
c := cases[i]
t.Run(c.name, func(tc *testing.T) {
tc.Parallel()
require.Equal(tc, c.updated, networkUpdated(c.a, c.b), "unexpected network updated result")
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
must.Eq(t, tc.updated, networkUpdated(tc.a, tc.b).modified)
})
}
}
@ -1017,17 +1010,17 @@ func TestUtil_connectUpdated(t *testing.T) {
ci.Parallel(t)
t.Run("both nil", func(t *testing.T) {
require.False(t, connectUpdated(nil, nil))
must.False(t, connectUpdated(nil, nil).modified)
})
t.Run("one nil", func(t *testing.T) {
require.True(t, connectUpdated(nil, new(structs.ConsulConnect)))
must.True(t, connectUpdated(nil, new(structs.ConsulConnect)).modified)
})
t.Run("native differ", func(t *testing.T) {
a := &structs.ConsulConnect{Native: true}
b := &structs.ConsulConnect{Native: false}
require.True(t, connectUpdated(a, b))
must.True(t, connectUpdated(a, b).modified)
})
t.Run("gateway differ", func(t *testing.T) {
@ -1037,7 +1030,7 @@ func TestUtil_connectUpdated(t *testing.T) {
b := &structs.ConsulConnect{Gateway: &structs.ConsulGateway{
Terminating: new(structs.ConsulTerminatingConfigEntry),
}}
require.True(t, connectUpdated(a, b))
must.True(t, connectUpdated(a, b).modified)
})
t.Run("sidecar task differ", func(t *testing.T) {
@ -1047,7 +1040,7 @@ func TestUtil_connectUpdated(t *testing.T) {
b := &structs.ConsulConnect{SidecarTask: &structs.SidecarTask{
Driver: "docker",
}}
require.True(t, connectUpdated(a, b))
must.True(t, connectUpdated(a, b).modified)
})
t.Run("sidecar service differ", func(t *testing.T) {
@ -1057,13 +1050,13 @@ func TestUtil_connectUpdated(t *testing.T) {
b := &structs.ConsulConnect{SidecarService: &structs.ConsulSidecarService{
Port: "2222",
}}
require.True(t, connectUpdated(a, b))
must.True(t, connectUpdated(a, b).modified)
})
t.Run("same", func(t *testing.T) {
a := new(structs.ConsulConnect)
b := new(structs.ConsulConnect)
require.False(t, connectUpdated(a, b))
must.False(t, connectUpdated(a, b).modified)
})
}
@ -1071,23 +1064,23 @@ func TestUtil_connectSidecarServiceUpdated(t *testing.T) {
ci.Parallel(t)
t.Run("both nil", func(t *testing.T) {
require.False(t, connectSidecarServiceUpdated(nil, nil))
require.False(t, connectSidecarServiceUpdated(nil, nil).modified)
})
t.Run("one nil", func(t *testing.T) {
require.True(t, connectSidecarServiceUpdated(nil, new(structs.ConsulSidecarService)))
require.True(t, connectSidecarServiceUpdated(nil, new(structs.ConsulSidecarService)).modified)
})
t.Run("ports differ", func(t *testing.T) {
a := &structs.ConsulSidecarService{Port: "1111"}
b := &structs.ConsulSidecarService{Port: "2222"}
require.True(t, connectSidecarServiceUpdated(a, b))
require.True(t, connectSidecarServiceUpdated(a, b).modified)
})
t.Run("same", func(t *testing.T) {
a := &structs.ConsulSidecarService{Port: "1111"}
b := &structs.ConsulSidecarService{Port: "1111"}
require.False(t, connectSidecarServiceUpdated(a, b))
require.False(t, connectSidecarServiceUpdated(a, b).modified)
})
}
@ -1100,12 +1093,12 @@ func TestTasksUpdated_Identity(t *testing.T) {
j2 := j1.Copy()
must.False(t, tasksUpdated(j1, j2, name))
must.False(t, tasksUpdated(j1, j2, name).modified)
// Set identity on j1 and assert update
j1.TaskGroups[0].Tasks[0].Identity = &structs.WorkloadIdentity{}
must.True(t, tasksUpdated(j1, j2, name))
must.True(t, tasksUpdated(j1, j2, name).modified)
}
func TestTaskGroupConstraints(t *testing.T) {