diff --git a/.changelog/16421.txt b/.changelog/16421.txt new file mode 100644 index 000000000..0e8fe91e8 --- /dev/null +++ b/.changelog/16421.txt @@ -0,0 +1,3 @@ +```release-note:improvement +scheduler: remove most uses of reflection for task comparisons +``` diff --git a/contributing/checklist-jobspec.md b/contributing/checklist-jobspec.md index bfe9b5d15..8f3faa986 100644 --- a/contributing/checklist-jobspec.md +++ b/contributing/checklist-jobspec.md @@ -8,8 +8,11 @@ * New fields should be added to existing Canonicalize, Copy methods * Test the structs/fields via methods mentioned above * [ ] Add structs/fields to `nomad/structs` package - * `structs/` structs usually have Copy, Equals, and Validate methods - * Validation happens in this package and _must_ be implemented + * `structs/` structs usually have Copy, Equal, and Validate methods + * `Validate` methods in this package _must_ be implemented + * `Equal` methods are used when comparing one job to another (e.g. did this thing get updated?) + * `Copy` methods ensure modifications do not modify the copy of a job in the state store + * Use `slices.CloneFunc` and `maps.CloneFunc` to ensure creation of deep copies * Note that analogous struct field names should match with `api/` package * Test the structs/fields via methods mentioned above * Implement and test other logical methods diff --git a/helper/opaque.go b/helper/opaque.go new file mode 100644 index 000000000..0dbe172f0 --- /dev/null +++ b/helper/opaque.go @@ -0,0 +1,48 @@ +package helper + +import ( + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "golang.org/x/exp/maps" +) + +var ( + cmpOptIgnoreUnexported = ignoreUnexportedAlways() + cmpOptNilIsEmpty = cmpopts.EquateEmpty() + cmpOptIgnore = cmp.Ignore() +) + +// ignoreUnexportedAlways creates a cmp.Option filter that will ignore unexported +// fields of on any/all types. It is a derivative of go-cmp.IgnoreUnexported, +// here we do not require specifying individual types. +// +// reference: https://github.com/google/go-cmp/blob/master/cmp/cmpopts/ignore.go#L110 +func ignoreUnexportedAlways() cmp.Option { + return cmp.FilterPath( + func(p cmp.Path) bool { + sf, ok := p.Index(-1).(cmp.StructField) + if !ok { + return false + } + c := sf.Name()[0] + return c < 'A' || c > 'Z' + }, + cmpOptIgnore, + ) +} + +// OpaqueMapsEqual compare maps[] for equality, but safely by +// using the cmp package and ignoring un-exported types, and by treating nil/empty +// slices and maps as equal. +// +// This is intended as a substitute for reflect.DeepEqual in the case of "opaque maps", +// e.g. `map[comparable]any` - such as the case for Task Driver config or Envoy proxy +// pass-through configuration. +func OpaqueMapsEqual[M ~map[K]V, K comparable, V any](m1, m2 M) bool { + return maps.EqualFunc(m1, m2, func(a, b V) bool { + return cmp.Equal(a, b, + cmpOptIgnoreUnexported, // ignore all unexported fields + cmpOptNilIsEmpty, // treat nil/empty slices as equal + ) + }) +} diff --git a/helper/opaque_test.go b/helper/opaque_test.go new file mode 100644 index 000000000..52913c7b7 --- /dev/null +++ b/helper/opaque_test.go @@ -0,0 +1,88 @@ +package helper + +import ( + "testing" + + "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" +) + +func Test_OpaqueMapsEqual(t *testing.T) { + ci.Parallel(t) + + type public struct { + F int + } + + type private struct { + g int + } + + type mix struct { + F int + g int + } + + cases := []struct { + name string + a, b map[string]any + exp bool + }{{ + name: "both nil", + a: nil, + b: nil, + exp: true, + }, { + name: "empty and nil", + a: nil, + b: make(map[string]any), + exp: true, + }, { + name: "same strings", + a: map[string]any{"a": "A"}, + b: map[string]any{"a": "A"}, + exp: true, + }, { + name: "same public struct", + a: map[string]any{"a": &public{F: 42}}, + b: map[string]any{"a": &public{F: 42}}, + exp: true, + }, { + name: "different public struct", + a: map[string]any{"a": &public{F: 42}}, + b: map[string]any{"a": &public{F: 10}}, + exp: false, + }, { + name: "different private struct", + a: map[string]any{"a": &private{g: 42}}, + b: map[string]any{"a": &private{g: 10}}, + exp: true, // private fields not compared + }, { + name: "mix same public different private", + a: map[string]any{"a": &mix{F: 42, g: 1}}, + b: map[string]any{"a": &mix{F: 42, g: 2}}, + exp: true, // private fields not compared + }, { + name: "mix different public same private", + a: map[string]any{"a": &mix{F: 42, g: 1}}, + b: map[string]any{"a": &mix{F: 10, g: 1}}, + exp: false, + }, { + name: "nil vs empty slice values", + a: map[string]any{"key": []string(nil)}, + b: map[string]any{"key": make([]string, 0)}, + exp: true, + }, { + name: "nil vs empty map values", + a: map[string]any{"key": map[int]int(nil)}, + b: map[string]any{"key": make(map[int]int, 0)}, + exp: true, + }} + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + result := OpaqueMapsEqual(tc.a, tc.b) + must.Eq(t, tc.exp, result) + }) + } +} diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index 5e4c8996d..130362aa1 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -80,6 +80,25 @@ type TaskCSIPluginConfig struct { HealthTimeout time.Duration `mapstructure:"health_timeout" hcl:"health_timeout,optional"` } +func (t *TaskCSIPluginConfig) Equal(o *TaskCSIPluginConfig) bool { + if t == nil || o == nil { + return t == o + } + switch { + case t.ID != o.ID: + return false + case t.Type != o.Type: + return false + case t.MountDir != o.MountDir: + return false + case t.StagePublishBaseDir != o.StagePublishBaseDir: + return false + case t.HealthTimeout != o.HealthTimeout: + return false + } + return true +} + func (t *TaskCSIPluginConfig) Copy() *TaskCSIPluginConfig { if t == nil { return nil diff --git a/nomad/structs/csi_test.go b/nomad/structs/csi_test.go index 5da73a78b..c9ffab948 100644 --- a/nomad/structs/csi_test.go +++ b/nomad/structs/csi_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -1033,3 +1034,33 @@ func TestDeleteNodeForType_Monolith_NilNode(t *testing.T) { _, ok = plug.Controllers["foo"] require.False(t, ok) } + +func TestTaskCSIPluginConfig_Equal(t *testing.T) { + ci.Parallel(t) + + must.Equal[*TaskCSIPluginConfig](t, nil, nil) + must.NotEqual[*TaskCSIPluginConfig](t, nil, new(TaskCSIPluginConfig)) + + must.StructEqual(t, &TaskCSIPluginConfig{ + ID: "abc123", + Type: CSIPluginTypeMonolith, + MountDir: "/opt/csi/mount", + StagePublishBaseDir: "/base", + HealthTimeout: 42 * time.Second, + }, []must.Tweak[*TaskCSIPluginConfig]{{ + Field: "ID", + Apply: func(c *TaskCSIPluginConfig) { c.ID = "def345" }, + }, { + Field: "Type", + Apply: func(c *TaskCSIPluginConfig) { c.Type = CSIPluginTypeNode }, + }, { + Field: "MountDir", + Apply: func(c *TaskCSIPluginConfig) { c.MountDir = "/csi" }, + }, { + Field: "StagePublishBaseDir", + Apply: func(c *TaskCSIPluginConfig) { c.StagePublishBaseDir = "/opt/base" }, + }, { + Field: "HealthTimeout", + Apply: func(c *TaskCSIPluginConfig) { c.HealthTimeout = 1 * time.Second }, + }}) +} diff --git a/nomad/structs/services.go b/nomad/structs/services.go index 4b15a350d..4496ba707 100644 --- a/nomad/structs/services.go +++ b/nomad/structs/services.go @@ -1201,8 +1201,8 @@ func (t *SidecarTask) Equal(o *SidecarTask) bool { return false } - // config compare - if !opaqueMapsEqual(t.Config, o.Config) { + // task config, use opaque maps equal + if !helper.OpaqueMapsEqual(t.Config, o.Config) { return false } @@ -1378,15 +1378,6 @@ func (p *ConsulProxy) Copy() *ConsulProxy { } } -// opaqueMapsEqual compares map[string]interface{} commonly used for opaque -// config blocks. Interprets nil and {} as the same. -func opaqueMapsEqual(a, b map[string]interface{}) bool { - if len(a) == 0 && len(b) == 0 { - return true - } - return reflect.DeepEqual(a, b) -} - // Equal returns true if the structs are recursively equal. func (p *ConsulProxy) Equal(o *ConsulProxy) bool { if p == nil || o == nil { @@ -1409,7 +1400,8 @@ func (p *ConsulProxy) Equal(o *ConsulProxy) bool { return false } - if !opaqueMapsEqual(p.Config, o.Config) { + // envoy config, use reflect + if !reflect.DeepEqual(p.Config, o.Config) { return false } @@ -1504,7 +1496,8 @@ func (u *ConsulUpstream) Equal(o *ConsulUpstream) bool { return false case !u.MeshGateway.Equal(o.MeshGateway): return false - case !opaqueMapsEqual(u.Config, o.Config): + case !reflect.DeepEqual(u.Config, o.Config): + // envoy config, use reflect return false } return true @@ -1790,7 +1783,8 @@ func (p *ConsulGatewayProxy) Equal(o *ConsulGatewayProxy) bool { return false } - if !opaqueMapsEqual(p.Config, o.Config) { + // envoy config, use reflect + if !reflect.DeepEqual(p.Config, o.Config) { return false } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 91f6db052..490c96358 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2703,8 +2703,40 @@ type AllocatedPortMapping struct { HostIP string } +func (m *AllocatedPortMapping) Copy() *AllocatedPortMapping { + return &AllocatedPortMapping{ + Label: m.Label, + Value: m.Value, + To: m.To, + HostIP: m.HostIP, + } +} + +func (m *AllocatedPortMapping) Equal(o *AllocatedPortMapping) bool { + if m == nil || o == nil { + return m == o + } + switch { + case m.Label != o.Label: + return false + case m.Value != o.Value: + return false + case m.To != o.To: + return false + case m.HostIP != o.HostIP: + return false + } + return true +} + type AllocatedPorts []AllocatedPortMapping +func (p AllocatedPorts) Equal(o AllocatedPorts) bool { + return slices.EqualFunc(p, o, func(a, b AllocatedPortMapping) bool { + return a.Equal(&b) + }) +} + func (p AllocatedPorts) Get(label string) (AllocatedPortMapping, bool) { for _, port := range p { if port.Label == label { @@ -2740,18 +2772,32 @@ type DNSConfig struct { Options []string } +func (d *DNSConfig) Equal(o *DNSConfig) bool { + if d == nil || o == nil { + return d == o + } + + switch { + case !slices.Equal(d.Servers, o.Servers): + return false + case !slices.Equal(d.Searches, o.Searches): + return false + case !slices.Equal(d.Options, o.Options): + return false + } + + return true +} + func (d *DNSConfig) Copy() *DNSConfig { if d == nil { return nil } - newD := new(DNSConfig) - newD.Servers = make([]string, len(d.Servers)) - copy(newD.Servers, d.Servers) - newD.Searches = make([]string, len(d.Searches)) - copy(newD.Searches, d.Searches) - newD.Options = make([]string, len(d.Options)) - copy(newD.Options, d.Options) - return newD + return &DNSConfig{ + Servers: slices.Clone(d.Servers), + Searches: slices.Clone(d.Searches), + Options: slices.Clone(d.Options), + } } // NetworkResource is used to represent available network @@ -7933,6 +7979,47 @@ func DefaultTemplate() *Template { } } +func (t *Template) Equal(o *Template) bool { + if t == nil || o == nil { + return t == o + } + switch { + case t.SourcePath != o.SourcePath: + return false + case t.DestPath != o.DestPath: + return false + case t.EmbeddedTmpl != o.EmbeddedTmpl: + return false + case t.ChangeMode != o.ChangeMode: + return false + case t.ChangeSignal != o.ChangeSignal: + return false + case !t.ChangeScript.Equal(o.ChangeScript): + return false + case t.Splay != o.Splay: + return false + case t.Perms != o.Perms: + return false + case !pointer.Eq(t.Uid, o.Uid): + return false + case !pointer.Eq(t.Gid, o.Gid): + return false + case t.LeftDelim != o.LeftDelim: + return false + case t.RightDelim != o.RightDelim: + return false + case t.Envvars != o.Envvars: + return false + case t.VaultGrace != o.VaultGrace: + return false + case !t.Wait.Equal(o.Wait): + return false + case t.ErrMissingKey != o.ErrMissingKey: + return false + } + return true +} + func (t *Template) Copy() *Template { if t == nil { return nil @@ -8044,18 +8131,33 @@ type ChangeScript struct { FailOnError bool } +func (cs *ChangeScript) Equal(o *ChangeScript) bool { + if cs == nil || o == nil { + return cs == o + } + switch { + case cs.Command != o.Command: + return false + case !slices.Equal(cs.Args, o.Args): + return false + case cs.Timeout != o.Timeout: + return false + case cs.FailOnError != o.FailOnError: + return false + } + return true +} + func (cs *ChangeScript) Copy() *ChangeScript { if cs == nil { return nil } - - ncs := new(ChangeScript) - *ncs = *cs - - // args is a slice! - ncs.Args = slices.Clone(cs.Args) - - return ncs + return &ChangeScript{ + Command: cs.Command, + Args: slices.Clone(cs.Args), + Timeout: cs.Timeout, + FailOnError: cs.FailOnError, + } } // Validate makes sure all the required fields of ChangeScript are present @@ -8099,22 +8201,15 @@ func (wc *WaitConfig) Copy() *WaitConfig { } func (wc *WaitConfig) Equal(o *WaitConfig) bool { - if wc.Min == nil && o.Min != nil { + if wc == nil || o == nil { + return wc == o + } + switch { + case !pointer.Eq(wc.Min, o.Min): + return false + case !pointer.Eq(wc.Max, o.Max): return false } - - if wc.Max == nil && o.Max != nil { - return false - } - - if wc.Min != nil && (o.Min == nil || *wc.Min != *o.Min) { - return false - } - - if wc.Max != nil && (o.Max == nil || *wc.Max != *o.Max) { - return false - } - return true } @@ -8781,6 +8876,25 @@ type TaskArtifact struct { RelativeDest string } +func (ta *TaskArtifact) Equal(o *TaskArtifact) bool { + if ta == nil || o == nil { + return ta == o + } + switch { + case ta.GetterSource != o.GetterSource: + return false + case !maps.Equal(ta.GetterOptions, o.GetterOptions): + return false + case !maps.Equal(ta.GetterHeaders, o.GetterHeaders): + return false + case ta.GetterMode != o.GetterMode: + return false + case ta.RelativeDest != o.RelativeDest: + return false + } + return true +} + func (ta *TaskArtifact) Copy() *TaskArtifact { if ta == nil { return nil @@ -9055,11 +9169,20 @@ type Affinity struct { // Equal checks if two affinities are equal. func (a *Affinity) Equal(o *Affinity) bool { - return a == o || - a.LTarget == o.LTarget && - a.RTarget == o.RTarget && - a.Operand == o.Operand && - a.Weight == o.Weight + if a == nil || o == nil { + return a == o + } + switch { + case a.LTarget != o.LTarget: + return false + case a.RTarget != o.RTarget: + return false + case a.Operand != o.Operand: + return false + case a.Weight != o.Weight: + return false + } + return true } func (a *Affinity) Copy() *Affinity { @@ -9143,6 +9266,21 @@ type Spread struct { str string } +func (s *Spread) Equal(o *Spread) bool { + if s == nil || o == nil { + return s == o + } + switch { + case s.Attribute != o.Attribute: + return false + case s.Weight != o.Weight: + return false + case !slices.EqualFunc(s.SpreadTarget, o.SpreadTarget, func(a, b *SpreadTarget) bool { return a.Equal(b) }): + return false + } + return true +} + type Affinities []*Affinity // Equal compares Affinities as a set @@ -9247,6 +9385,19 @@ func (s *SpreadTarget) String() string { return s.str } +func (s *SpreadTarget) Equal(o *SpreadTarget) bool { + if s == nil || o == nil { + return s == o + } + switch { + case s.Value != o.Value: + return false + case s.Percent != o.Percent: + return false + } + return true +} + // EphemeralDisk is an ephemeral disk object type EphemeralDisk struct { // Sticky indicates whether the allocation is sticky to a node @@ -9267,6 +9418,21 @@ func DefaultEphemeralDisk() *EphemeralDisk { } } +func (d *EphemeralDisk) Equal(o *EphemeralDisk) bool { + if d == nil || o == nil { + return d == o + } + switch { + case d.Sticky != o.Sticky: + return false + case d.SizeMB != o.SizeMB: + return false + case d.Migrate != o.Migrate: + return false + } + return true +} + // Validate validates EphemeralDisk func (d *EphemeralDisk) Validate() error { if d.SizeMB < 10 { @@ -9327,6 +9493,25 @@ func DefaultVaultBlock() *Vault { } } +func (v *Vault) Equal(o *Vault) bool { + if v == nil || o == nil { + return v == o + } + switch { + case !slices.Equal(v.Policies, o.Policies): + return false + case v.Namespace != o.Namespace: + return false + case v.Env != o.Env: + return false + case v.ChangeMode != o.ChangeMode: + return false + case v.ChangeSignal != o.ChangeSignal: + return false + } + return true +} + // Copy returns a copy of this Vault block. func (v *Vault) Copy() *Vault { if v == nil { diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 6dc38b716..772fa49ff 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/uuid" "github.com/kr/pretty" + "github.com/shoenig/test/must" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -2847,49 +2848,74 @@ func TestTaskWaitConfig_Equals(t *testing.T) { ci.Parallel(t) testCases := []struct { - name string - config *WaitConfig - expected *WaitConfig + name string + wc1 *WaitConfig + wc2 *WaitConfig + exp bool }{ { name: "all-fields", - config: &WaitConfig{ + wc1: &WaitConfig{ Min: pointer.Of(5 * time.Second), Max: pointer.Of(10 * time.Second), }, - expected: &WaitConfig{ + wc2: &WaitConfig{ Min: pointer.Of(5 * time.Second), Max: pointer.Of(10 * time.Second), }, + exp: true, }, { - name: "no-fields", - config: &WaitConfig{}, - expected: &WaitConfig{}, + name: "no-fields", + wc1: &WaitConfig{}, + wc2: &WaitConfig{}, + exp: true, }, { name: "min-only", - config: &WaitConfig{ + wc1: &WaitConfig{ Min: pointer.Of(5 * time.Second), }, - expected: &WaitConfig{ + wc2: &WaitConfig{ Min: pointer.Of(5 * time.Second), }, + exp: true, }, { name: "max-only", - config: &WaitConfig{ + wc1: &WaitConfig{ Max: pointer.Of(10 * time.Second), }, - expected: &WaitConfig{ + wc2: &WaitConfig{ Max: pointer.Of(10 * time.Second), }, + exp: true, + }, + { + name: "min-nil-vs-set", + wc1: &WaitConfig{ + Min: pointer.Of(1 * time.Second), + }, + wc2: &WaitConfig{ + Min: nil, + }, + exp: false, + }, + { + name: "max-nil-vs-set", + wc1: &WaitConfig{ + Max: pointer.Of(1 * time.Second), + }, + wc2: &WaitConfig{ + Max: nil, + }, + exp: false, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - require.True(t, tc.config.Equal(tc.expected)) + must.Eq(t, tc.exp, tc.wc1.Equal(tc.wc2)) }) } } @@ -6825,6 +6851,32 @@ func TestNodeResources_Merge(t *testing.T) { }, res) } +func TestAllocatedPortMapping_Equal(t *testing.T) { + ci.Parallel(t) + + must.Equal[*AllocatedPortMapping](t, nil, nil) + must.NotEqual[*AllocatedPortMapping](t, nil, new(AllocatedPortMapping)) + + must.StructEqual(t, &AllocatedPortMapping{ + Label: "http", + Value: 80, + To: 9000, + HostIP: "10.0.0.1", + }, []must.Tweak[*AllocatedPortMapping]{{ + Field: "Label", + Apply: func(m *AllocatedPortMapping) { m.Label = "https" }, + }, { + Field: "Value", + Apply: func(m *AllocatedPortMapping) { m.Value = 443 }, + }, { + Field: "To", + Apply: func(m *AllocatedPortMapping) { m.To = 9999 }, + }, { + Field: "HostIP", + Apply: func(m *AllocatedPortMapping) { m.HostIP = "10.1.1.1" }, + }}) +} + func TestAllocatedResources_Canonicalize(t *testing.T) { ci.Parallel(t) @@ -7120,3 +7172,314 @@ func requireErrors(t *testing.T, err error, expected ...string) { require.Equal(t, expected, found) } + +func TestEphemeralDisk_Equal(t *testing.T) { + ci.Parallel(t) + + must.Equal[*EphemeralDisk](t, nil, nil) + must.NotEqual[*EphemeralDisk](t, nil, new(EphemeralDisk)) + + must.StructEqual(t, &EphemeralDisk{ + Sticky: true, + SizeMB: 42, + Migrate: true, + }, []must.Tweak[*EphemeralDisk]{{ + Field: "Sticky", + Apply: func(e *EphemeralDisk) { e.Sticky = false }, + }, { + Field: "SizeMB", + Apply: func(e *EphemeralDisk) { e.SizeMB = 10 }, + }, { + Field: "Migrate", + Apply: func(e *EphemeralDisk) { e.Migrate = false }, + }}) +} + +func TestDNSConfig_Equal(t *testing.T) { + ci.Parallel(t) + + must.Equal[*DNSConfig](t, nil, nil) + must.NotEqual[*DNSConfig](t, nil, new(DNSConfig)) + + must.StructEqual(t, &DNSConfig{ + Servers: []string{"8.8.8.8", "8.8.4.4"}, + Searches: []string{"org", "local"}, + Options: []string{"opt1"}, + }, []must.Tweak[*DNSConfig]{{ + Field: "Servers", + Apply: func(c *DNSConfig) { c.Servers = []string{"1.1.1.1"} }, + }, { + Field: "Searches", + Apply: func(c *DNSConfig) { c.Searches = []string{"localhost"} }, + }, { + Field: "Options", + Apply: func(c *DNSConfig) { c.Options = []string{"opt2"} }, + }}) +} + +func TestChangeScript_Equal(t *testing.T) { + ci.Parallel(t) + + must.Equal[*ChangeScript](t, nil, nil) + must.NotEqual[*ChangeScript](t, nil, new(ChangeScript)) + + must.StructEqual(t, &ChangeScript{ + Command: "/bin/sleep", + Args: []string{"infinity"}, + Timeout: 1 * time.Second, + FailOnError: true, + }, []must.Tweak[*ChangeScript]{{ + Field: "Command", + Apply: func(c *ChangeScript) { c.Command = "/bin/false" }, + }, { + Field: "Args", + Apply: func(c *ChangeScript) { c.Args = []string{"1s"} }, + }, { + Field: "Timeout", + Apply: func(c *ChangeScript) { c.Timeout = 2 * time.Second }, + }, { + Field: "FailOnError", + Apply: func(c *ChangeScript) { c.FailOnError = false }, + }}) +} + +func TestWaitConfig_Equal(t *testing.T) { + ci.Parallel(t) + + must.Equal[*WaitConfig](t, nil, nil) + must.NotEqual[*WaitConfig](t, nil, new(WaitConfig)) + + must.StructEqual(t, &WaitConfig{ + Min: pointer.Of[time.Duration](100), + Max: pointer.Of[time.Duration](200), + }, []must.Tweak[*WaitConfig]{{ + Field: "Min", + Apply: func(c *WaitConfig) { c.Min = pointer.Of[time.Duration](111) }, + }, { + Field: "Max", + Apply: func(c *WaitConfig) { c.Max = pointer.Of[time.Duration](222) }, + }}) +} + +func TestTaskArtifact_Equal(t *testing.T) { + ci.Parallel(t) + + must.Equal[*TaskArtifact](t, nil, nil) + must.NotEqual[*TaskArtifact](t, nil, new(TaskArtifact)) + + must.StructEqual(t, &TaskArtifact{ + GetterSource: "source", + GetterOptions: map[string]string{"a": "A"}, + GetterHeaders: map[string]string{"b": "B"}, + GetterMode: "file", + RelativeDest: "./local", + }, []must.Tweak[*TaskArtifact]{{ + Field: "GetterSource", + Apply: func(ta *TaskArtifact) { ta.GetterSource = "other" }, + }, { + Field: "GetterOptions", + Apply: func(ta *TaskArtifact) { ta.GetterOptions = nil }, + }, { + Field: "GetterHeaders", + Apply: func(ta *TaskArtifact) { ta.GetterHeaders = nil }, + }, { + Field: "GetterMode", + Apply: func(ta *TaskArtifact) { ta.GetterMode = "directory" }, + }, { + Field: "RelativeDest", + Apply: func(ta *TaskArtifact) { ta.RelativeDest = "./alloc" }, + }}) +} + +func TestVault_Equal(t *testing.T) { + ci.Parallel(t) + + must.Equal[*Vault](t, nil, nil) + must.NotEqual[*Vault](t, nil, new(Vault)) + + must.StructEqual(t, &Vault{ + Policies: []string{"one"}, + Namespace: "global", + Env: true, + ChangeMode: "signal", + ChangeSignal: "SIGILL", + }, []must.Tweak[*Vault]{{ + Field: "Policies", + Apply: func(v *Vault) { v.Policies = []string{"two"} }, + }, { + Field: "Namespace", + Apply: func(v *Vault) { v.Namespace = "regional" }, + }, { + Field: "Env", + Apply: func(v *Vault) { v.Env = false }, + }, { + Field: "ChangeMode", + Apply: func(v *Vault) { v.ChangeMode = "restart" }, + }, { + Field: "ChangeSignal", + Apply: func(v *Vault) { v.ChangeSignal = "SIGTERM" }, + }}) +} + +func TestTemplate_Equal(t *testing.T) { + ci.Parallel(t) + + must.Equal[*Template](t, nil, nil) + must.NotEqual[*Template](t, nil, new(Template)) + + must.StructEqual(t, &Template{ + SourcePath: "source", + DestPath: "destination", + EmbeddedTmpl: "tmpl", + ChangeMode: "mode", + ChangeSignal: "signal", + ChangeScript: &ChangeScript{ + Command: "/bin/sleep", + Args: []string{"infinity"}, + Timeout: 1 * time.Second, + FailOnError: true, + }, + Splay: 1, + Perms: "perms", + Uid: pointer.Of(1000), + Gid: pointer.Of(1000), + LeftDelim: "{", + RightDelim: "}", + Envvars: true, + VaultGrace: 1 * time.Second, + Wait: &WaitConfig{ + Min: pointer.Of[time.Duration](1), + Max: pointer.Of[time.Duration](2), + }, + ErrMissingKey: true, + }, []must.Tweak[*Template]{{ + Field: "SourcePath", + Apply: func(t *Template) { t.SourcePath = "source2" }, + }, { + Field: "DestPath", + Apply: func(t *Template) { t.DestPath = "destination2" }, + }, { + Field: "EmbeddedTmpl", + Apply: func(t *Template) { t.EmbeddedTmpl = "tmpl2" }, + }, { + Field: "ChangeMode", + Apply: func(t *Template) { t.ChangeMode = "mode2" }, + }, { + Field: "ChangeSignal", + Apply: func(t *Template) { t.ChangeSignal = "signal2" }, + }, { + Field: "ChangeScript", + Apply: func(t *Template) { + t.ChangeScript = &ChangeScript{ + Command: "/bin/sleep", + Args: []string{"infinity", "plus", "one"}, + Timeout: 1 * time.Second, + FailOnError: true, + } + }, + }, { + Field: "Splay", + Apply: func(t *Template) { t.Splay = 2 }, + }, { + Field: "Perms", + Apply: func(t *Template) { t.Perms = "perms2" }, + }, { + Field: "Uid", + Apply: func(t *Template) { t.Uid = pointer.Of(0) }, + }, { + Field: "Gid", + Apply: func(t *Template) { t.Gid = pointer.Of(0) }, + }, { + Field: "LeftDelim", + Apply: func(t *Template) { t.LeftDelim = "[" }, + }, { + Field: "RightDelim", + Apply: func(t *Template) { t.RightDelim = "]" }, + }, { + Field: "Envvars", + Apply: func(t *Template) { t.Envvars = false }, + }, { + Field: "VaultGrace", + Apply: func(t *Template) { t.VaultGrace = 2 * time.Second }, + }, { + Field: "Wait", + Apply: func(t *Template) { + t.Wait = &WaitConfig{ + Min: pointer.Of[time.Duration](1), + Max: nil, + } + }, + }, { + Field: "ErrMissingKey", + Apply: func(t *Template) { t.ErrMissingKey = false }, + }}) +} + +func TestAffinity_Equal(t *testing.T) { + ci.Parallel(t) + + must.Equal[*Affinity](t, nil, nil) + must.NotEqual[*Affinity](t, nil, new(Affinity)) + + must.StructEqual(t, &Affinity{ + LTarget: "left", + RTarget: "right", + Operand: "op", + Weight: 100, + }, []must.Tweak[*Affinity]{{ + Field: "LTarget", + Apply: func(a *Affinity) { a.LTarget = "left2" }, + }, { + Field: "RTarget", + Apply: func(a *Affinity) { a.RTarget = "right2" }, + }, { + Field: "Operand", + Apply: func(a *Affinity) { a.Operand = "op2" }, + }, { + Field: "Weight", + Apply: func(a *Affinity) { a.Weight = 50 }, + }}) +} + +func TestSpreadTarget_Equal(t *testing.T) { + ci.Parallel(t) + + must.Equal[*SpreadTarget](t, nil, nil) + must.NotEqual[*SpreadTarget](t, nil, new(SpreadTarget)) + + must.StructEqual(t, &SpreadTarget{ + Value: "dc1", + Percent: 99, + }, []must.Tweak[*SpreadTarget]{{ + Field: "Value", + Apply: func(st *SpreadTarget) { st.Value = "dc2" }, + }, { + Field: "Percent", + Apply: func(st *SpreadTarget) { st.Percent = 98 }, + }}) +} + +func TestSpread_Equal(t *testing.T) { + ci.Parallel(t) + + must.Equal[*Spread](t, nil, nil) + must.NotEqual[*Spread](t, nil, new(Spread)) + + must.StructEqual(t, &Spread{ + Attribute: "attr", + Weight: 100, + SpreadTarget: []*SpreadTarget{{ + Value: "dc1", + Percent: 99, + }}, + }, []must.Tweak[*Spread]{{ + Field: "Attribute", + Apply: func(s *Spread) { s.Attribute = "attr2" }, + }, { + Field: "Weight", + Apply: func(s *Spread) { s.Weight = 50 }, + }, { + Field: "SpreadTarget", + Apply: func(s *Spread) { s.SpreadTarget = nil }, + }}) +} diff --git a/nomad/structs/volume_test.go b/nomad/structs/volume_test.go index 11415963b..0c5fc2045 100644 --- a/nomad/structs/volume_test.go +++ b/nomad/structs/volume_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -93,3 +94,74 @@ func TestVolumeRequest_Validate(t *testing.T) { } } + +func TestVolumeRequest_Equal(t *testing.T) { + ci.Parallel(t) + + must.Equal[*VolumeRequest](t, nil, nil) + must.NotEqual[*VolumeRequest](t, nil, new(VolumeRequest)) + + must.StructEqual(t, &VolumeRequest{ + Name: "name", + Type: "type", + Source: "source", + ReadOnly: true, + AccessMode: "access", + AttachmentMode: "attachment", + MountOptions: &CSIMountOptions{ + FSType: "fs1", + MountFlags: []string{"flag1"}, + }, + PerAlloc: true, + }, []must.Tweak[*VolumeRequest]{{ + Field: "Name", + Apply: func(vr *VolumeRequest) { vr.Name = "name2" }, + }, { + Field: "Type", + Apply: func(vr *VolumeRequest) { vr.Type = "type2" }, + }, { + Field: "Source", + Apply: func(vr *VolumeRequest) { vr.Source = "source2" }, + }, { + Field: "ReadOnly", + Apply: func(vr *VolumeRequest) { vr.ReadOnly = false }, + }, { + Field: "AccessMode", + Apply: func(vr *VolumeRequest) { vr.AccessMode = "access2" }, + }, { + Field: "AttachmentMode", + Apply: func(vr *VolumeRequest) { vr.AttachmentMode = "attachment2" }, + }, { + Field: "MountOptions", + Apply: func(vr *VolumeRequest) { vr.MountOptions = nil }, + }, { + Field: "PerAlloc", + Apply: func(vr *VolumeRequest) { vr.PerAlloc = false }, + }}) +} + +func TestVolumeMount_Equal(t *testing.T) { + ci.Parallel(t) + + must.Equal[*VolumeMount](t, nil, nil) + must.NotEqual[*VolumeMount](t, nil, new(VolumeMount)) + + must.StructEqual(t, &VolumeMount{ + Volume: "volume", + Destination: "destination", + ReadOnly: true, + PropagationMode: "mode", + }, []must.Tweak[*VolumeMount]{{ + Field: "Volume", + Apply: func(vm *VolumeMount) { vm.Volume = "vol2" }, + }, { + Field: "Destination", + Apply: func(vm *VolumeMount) { vm.Destination = "dest2" }, + }, { + Field: "ReadOnly", + Apply: func(vm *VolumeMount) { vm.ReadOnly = false }, + }, { + Field: "PropogationMode", + Apply: func(vm *VolumeMount) { vm.PropagationMode = "mode2" }, + }}) +} diff --git a/nomad/structs/volumes.go b/nomad/structs/volumes.go index d1ba01659..0a5571afb 100644 --- a/nomad/structs/volumes.go +++ b/nomad/structs/volumes.go @@ -102,6 +102,31 @@ type VolumeRequest struct { PerAlloc bool } +func (v *VolumeRequest) Equal(o *VolumeRequest) bool { + if v == nil || o == nil { + return v == o + } + switch { + case v.Name != o.Name: + return false + case v.Type != o.Type: + return false + case v.Source != o.Source: + return false + case v.ReadOnly != o.ReadOnly: + return false + case v.AccessMode != o.AccessMode: + return false + case v.AttachmentMode != o.AttachmentMode: + return false + case !v.MountOptions.Equal(o.MountOptions): + return false + case v.PerAlloc != o.PerAlloc: + return false + } + return true +} + func (v *VolumeRequest) Validate(jobType string, taskGroupCount, canaries int) error { if !(v.Type == VolumeTypeHost || v.Type == VolumeTypeCSI) { @@ -216,6 +241,23 @@ type VolumeMount struct { PropagationMode string } +func (v *VolumeMount) Equal(o *VolumeMount) bool { + if v == nil || o == nil { + return v == o + } + switch { + case v.Volume != o.Volume: + return false + case v.Destination != o.Destination: + return false + case v.ReadOnly != o.ReadOnly: + return false + case v.PropagationMode != o.PropagationMode: + return false + } + return true +} + func (v *VolumeMount) Copy() *VolumeMount { if v == nil { return nil diff --git a/scheduler/util.go b/scheduler/util.go index a9499a5e2..d885a300d 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -4,11 +4,13 @@ import ( "encoding/binary" "fmt" "math/rand" - "reflect" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" + "golang.org/x/exp/maps" + "golang.org/x/exp/slices" ) // allocTuple is a tuple of the allocation name and potential alloc ID @@ -169,121 +171,158 @@ func shuffleNodes(plan *structs.Plan, index uint64, nodes []*structs.Node) { } } -// tasksUpdated does a diff between task groups to see if the -// tasks, their drivers, environment variables or config have updated. The -// inputs are the task group name to diff and two jobs to diff. -// taskUpdated and functions called within assume that the given -// taskGroup has already been checked to not be nil -func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) bool { +// comparison records the _first_ detected difference between two groups during +// a comparison in tasksUpdated +// +// This is useful to provide context when debugging the result of tasksUpdated. +type comparison struct { + modified bool + label string + before any + after any +} + +func difference(label string, before, after any) comparison { + // push string formatting into String(), so that we never call it in the + // hot path unless someone adds a log line to debug with this result + return comparison{ + modified: true, + label: label, + before: before, + after: after, + } +} + +func (c comparison) String() string { + return fmt.Sprintf("%s changed; before: %#v, after: %#v", c.label, c.before, c.after) +} + +// same indicates no destructive difference between two task groups +var same = comparison{modified: false} + +// tasksUpdated creates a comparison between task groups to see if the tasks, their +// drivers, environment variables or config have been modified. +func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) comparison { a := jobA.LookupTaskGroup(taskGroup) b := jobB.LookupTaskGroup(taskGroup) // If the number of tasks do not match, clearly there is an update - if len(a.Tasks) != len(b.Tasks) { - return true + if lenA, lenB := len(a.Tasks), len(b.Tasks); lenA != lenB { + return difference("number of tasks", lenA, lenB) } // Check ephemeral disk - if !reflect.DeepEqual(a.EphemeralDisk, b.EphemeralDisk) { - return true + if !a.EphemeralDisk.Equal(b.EphemeralDisk) { + return difference("ephemeral disk", a.EphemeralDisk, b.EphemeralDisk) } // Check that the network resources haven't changed - if networkUpdated(a.Networks, b.Networks) { - return true + if c := networkUpdated(a.Networks, b.Networks); c.modified { + return c } // Check Affinities - if affinitiesUpdated(jobA, jobB, taskGroup) { - return true + if c := affinitiesUpdated(jobA, jobB, taskGroup); c.modified { + return c } // Check Spreads - if spreadsUpdated(jobA, jobB, taskGroup) { - return true + if c := spreadsUpdated(jobA, jobB, taskGroup); c.modified { + return c } // Check consul namespace updated - if consulNamespaceUpdated(a, b) { - return true + if c := consulNamespaceUpdated(a, b); c.modified { + return c } // Check connect service(s) updated - if connectServiceUpdated(a.Services, b.Services) { - return true + if c := connectServiceUpdated(a.Services, b.Services); c.modified { + return c } // Check if volumes are updated (no task driver can support // altering mounts in-place) - if !reflect.DeepEqual(a.Volumes, b.Volumes) { - return true + if !maps.EqualFunc(a.Volumes, b.Volumes, func(a, b *structs.VolumeRequest) bool { return a.Equal(b) }) { + return difference("volume request", a.Volumes, b.Volumes) } // Check each task for _, at := range a.Tasks { bt := b.LookupTask(at.Name) if bt == nil { - return true + return difference("task deleted", at.Name, "(nil)") } if at.Driver != bt.Driver { - return true + return difference("task driver", at.Driver, bt.Driver) } if at.User != bt.User { - return true + return difference("task user", at.User, bt.User) } - if !reflect.DeepEqual(at.Config, bt.Config) { - return true + if !helper.OpaqueMapsEqual(at.Config, bt.Config) { + return difference("task config", at.Config, bt.Config) } - if !reflect.DeepEqual(at.Env, bt.Env) { - return true + if !maps.Equal(at.Env, bt.Env) { + return difference("task env", at.Env, bt.Env) } - if !reflect.DeepEqual(at.Artifacts, bt.Artifacts) { - return true + if !slices.EqualFunc(at.Artifacts, bt.Artifacts, func(a, b *structs.TaskArtifact) bool { return a.Equal(b) }) { + return difference("task artifacts", at.Artifacts, bt.Artifacts) } - if !reflect.DeepEqual(at.Vault, bt.Vault) { - return true + if !at.Vault.Equal(bt.Vault) { + return difference("task vault", at.Vault, bt.Vault) } - if !reflect.DeepEqual(at.Templates, bt.Templates) { - return true + if !slices.EqualFunc(at.Templates, bt.Templates, func(a, b *structs.Template) bool { return a.Equal(b) }) { + return difference("task templates", at.Templates, bt.Templates) } - if !reflect.DeepEqual(at.CSIPluginConfig, bt.CSIPluginConfig) { - return true + if !at.CSIPluginConfig.Equal(bt.CSIPluginConfig) { + return difference("task csi config", at.CSIPluginConfig, bt.CSIPluginConfig) } - if !reflect.DeepEqual(at.VolumeMounts, bt.VolumeMounts) { - return true + if !slices.EqualFunc(at.VolumeMounts, bt.VolumeMounts, func(a, b *structs.VolumeMount) bool { return a.Equal(b) }) { + return difference("task volume mount", at.VolumeMounts, bt.VolumeMounts) } // Check the metadata - if !reflect.DeepEqual( - jobA.CombinedTaskMeta(taskGroup, at.Name), - jobB.CombinedTaskMeta(taskGroup, bt.Name)) { - return true + metaA := jobA.CombinedTaskMeta(taskGroup, at.Name) + metaB := jobB.CombinedTaskMeta(taskGroup, bt.Name) + if !maps.Equal(metaA, metaB) { + return difference("task meta", metaA, metaB) } // Inspect the network to see if the dynamic ports are different - if networkUpdated(at.Resources.Networks, bt.Resources.Networks) { - return true + if c := networkUpdated(at.Resources.Networks, bt.Resources.Networks); c.modified { + return c } - // Inspect the non-network resources - if ar, br := at.Resources, bt.Resources; ar.CPU != br.CPU { - return true - } else if ar.Cores != br.Cores { - return true - } else if ar.MemoryMB != br.MemoryMB { - return true - } else if ar.MemoryMaxMB != br.MemoryMaxMB { - return true - } else if !ar.Devices.Equal(&br.Devices) { - return true + if c := nonNetworkResourcesUpdated(at.Resources, bt.Resources); c.modified { + return c } // Inspect Identity being exposed if !at.Identity.Equal(bt.Identity) { - return true + return difference("task identity", at.Identity, bt.Identity) } } - return false + + // none of the fields that trigger a destructive update were modified, + // indicating this group can be updated in-place or ignored + return same +} + +func nonNetworkResourcesUpdated(a, b *structs.Resources) comparison { + // Inspect the non-network resources + switch { + case a.CPU != b.CPU: + return difference("task cpu", a.CPU, b.CPU) + case a.Cores != b.Cores: + return difference("task cores", a.Cores, b.Cores) + case a.MemoryMB != b.MemoryMB: + return difference("task memory", a.MemoryMB, b.MemoryMB) + case a.MemoryMaxMB != b.MemoryMaxMB: + return difference("task memory max", a.MemoryMaxMB, b.MemoryMaxMB) + case !a.Devices.Equal(&b.Devices): + return difference("task devices", a.Devices, b.Devices) + } + return same } // consulNamespaceUpdated returns true if the Consul namespace in the task group @@ -293,9 +332,12 @@ func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) bool { // because Namespaces directly impact networking validity among Consul intentions. // Forcing the task through a reschedule is a sure way of breaking no-longer valid // network connections. -func consulNamespaceUpdated(tgA, tgB *structs.TaskGroup) bool { +func consulNamespaceUpdated(tgA, tgB *structs.TaskGroup) comparison { // job.ConsulNamespace is pushed down to the TGs, just check those - return tgA.Consul.GetNamespace() != tgB.Consul.GetNamespace() + if a, b := tgA.Consul.GetNamespace(), tgB.Consul.GetNamespace(); a != b { + return difference("consul namespace", a, b) + } + return same } // connectServiceUpdated returns true if any services with a connect block have @@ -303,25 +345,25 @@ func consulNamespaceUpdated(tgA, tgB *structs.TaskGroup) bool { // // Ordinary services can be updated in-place by updating the service definition // in Consul. Connect service changes mostly require destroying the task. -func connectServiceUpdated(servicesA, servicesB []*structs.Service) bool { +func connectServiceUpdated(servicesA, servicesB []*structs.Service) comparison { for _, serviceA := range servicesA { if serviceA.Connect != nil { for _, serviceB := range servicesB { if serviceA.Name == serviceB.Name { - if connectUpdated(serviceA.Connect, serviceB.Connect) { - return true + if c := connectUpdated(serviceA.Connect, serviceB.Connect); c.modified { + return c } // Part of the Connect plumbing is derived from port label, // if that changes we need to destroy the task. if serviceA.PortLabel != serviceB.PortLabel { - return true + return difference("connect service port label", serviceA.PortLabel, serviceB.PortLabel) } break } } } } - return false + return same } // connectUpdated returns true if the connect block has been updated in a manner @@ -329,77 +371,93 @@ func connectServiceUpdated(servicesA, servicesB []*structs.Service) bool { // // Fields that can be updated through consul-sync do not need a destructive // update. -func connectUpdated(connectA, connectB *structs.ConsulConnect) bool { - if connectA == nil || connectB == nil { - return connectA != connectB +func connectUpdated(connectA, connectB *structs.ConsulConnect) comparison { + if connectA == nil && connectB == nil { + return same + } + + if connectA == nil && connectB != nil { + return difference("connect added", connectA, connectB) + } + + if connectA != nil && connectB == nil { + return difference("connect removed", connectA, connectB) } if connectA.Native != connectB.Native { - return true + return difference("connect native", connectA.Native, connectB.Native) } if !connectA.Gateway.Equal(connectB.Gateway) { - return true + return difference("connect gateway", connectA.Gateway, connectB.Gateway) } if !connectA.SidecarTask.Equal(connectB.SidecarTask) { - return true + return difference("connect sidecar task", connectA.SidecarTask, connectB.SidecarTask) } // not everything in sidecar_service needs task destruction - if connectSidecarServiceUpdated(connectA.SidecarService, connectB.SidecarService) { - return true + if c := connectSidecarServiceUpdated(connectA.SidecarService, connectB.SidecarService); c.modified { + return c } - return false + return same } -func connectSidecarServiceUpdated(ssA, ssB *structs.ConsulSidecarService) bool { - if ssA == nil || ssB == nil { - return ssA != ssB +func connectSidecarServiceUpdated(ssA, ssB *structs.ConsulSidecarService) comparison { + if ssA == nil && ssB == nil { + return same + } + + if ssA == nil && ssB != nil { + return difference("connect service add", ssA, ssB) + } + + if ssA != nil && ssB == nil { + return difference("connect service delete", ssA, ssB) } if ssA.Port != ssB.Port { - return true + return difference("connect port", ssA.Port, ssB.Port) } - // sidecar_service.tags handled in-place (registration) + // sidecar_service.tags (handled in-place via registration) - // sidecar_service.proxy handled in-place (registration + xDS) + // sidecar_service.proxy (handled in-place via registration + xDS) - return false + return same } -func networkUpdated(netA, netB []*structs.NetworkResource) bool { - if len(netA) != len(netB) { - return true +func networkUpdated(netA, netB []*structs.NetworkResource) comparison { + if lenNetA, lenNetB := len(netA), len(netB); lenNetA != lenNetB { + return difference("network lengths", lenNetA, lenNetB) } for idx := range netA { an := netA[idx] bn := netB[idx] if an.Mode != bn.Mode { - return true + return difference("network mode", an.Mode, bn.Mode) } if an.MBits != bn.MBits { - return true + return difference("network mbits", an.MBits, bn.MBits) } if an.Hostname != bn.Hostname { - return true + return difference("network hostname", an.Hostname, bn.Hostname) } - if !reflect.DeepEqual(an.DNS, bn.DNS) { - return true + if !an.DNS.Equal(bn.DNS) { + return difference("network dns", an.DNS, bn.DNS) } aPorts, bPorts := networkPortMap(an), networkPortMap(bn) - if !reflect.DeepEqual(aPorts, bPorts) { - return true + if !aPorts.Equal(bPorts) { + return difference("network port map", aPorts, bPorts) } } - return false + return same } // networkPortMap takes a network resource and returns a AllocatedPorts. @@ -426,59 +484,65 @@ func networkPortMap(n *structs.NetworkResource) structs.AllocatedPorts { return m } -func affinitiesUpdated(jobA, jobB *structs.Job, taskGroup string) bool { - var aAffinities []*structs.Affinity - var bAffinities []*structs.Affinity +func affinitiesUpdated(jobA, jobB *structs.Job, taskGroup string) comparison { + var affinitiesA structs.Affinities + var affinitiesB structs.Affinities + + // accumulate job affinities + + affinitiesA = append(affinitiesA, jobA.Affinities...) + affinitiesB = append(affinitiesB, jobB.Affinities...) tgA := jobA.LookupTaskGroup(taskGroup) tgB := jobB.LookupTaskGroup(taskGroup) - // Append jobA job and task group level affinities - aAffinities = append(aAffinities, jobA.Affinities...) - aAffinities = append(aAffinities, tgA.Affinities...) + // append group level affinities - // Append jobB job and task group level affinities - bAffinities = append(bAffinities, jobB.Affinities...) - bAffinities = append(bAffinities, tgB.Affinities...) + affinitiesA = append(affinitiesA, tgA.Affinities...) + affinitiesB = append(affinitiesB, tgB.Affinities...) + + // append task level affinities for A - // append task affinities for _, task := range tgA.Tasks { - aAffinities = append(aAffinities, task.Affinities...) + affinitiesA = append(affinitiesA, task.Affinities...) } + // append task level affinities for B for _, task := range tgB.Tasks { - bAffinities = append(bAffinities, task.Affinities...) + affinitiesB = append(affinitiesB, task.Affinities...) } - // Check for equality - if len(aAffinities) != len(bAffinities) { - return true + // finally check if all the affinities from both jobs match + if !affinitiesA.Equal(&affinitiesB) { + return difference("affinities", affinitiesA, affinitiesB) } - return !reflect.DeepEqual(aAffinities, bAffinities) + return same } -func spreadsUpdated(jobA, jobB *structs.Job, taskGroup string) bool { - var aSpreads []*structs.Spread - var bSpreads []*structs.Spread +func spreadsUpdated(jobA, jobB *structs.Job, taskGroup string) comparison { + var spreadsA []*structs.Spread + var spreadsB []*structs.Spread + + // accumulate job spreads + + spreadsA = append(spreadsA, jobA.Spreads...) + spreadsB = append(spreadsB, jobB.Spreads...) tgA := jobA.LookupTaskGroup(taskGroup) tgB := jobB.LookupTaskGroup(taskGroup) - // append jobA and task group level spreads - aSpreads = append(aSpreads, jobA.Spreads...) - aSpreads = append(aSpreads, tgA.Spreads...) + // append group spreads + spreadsA = append(spreadsA, tgA.Spreads...) + spreadsB = append(spreadsB, tgB.Spreads...) - // append jobB and task group level spreads - bSpreads = append(bSpreads, jobB.Spreads...) - bSpreads = append(bSpreads, tgB.Spreads...) - - // Check for equality - if len(aSpreads) != len(bSpreads) { - return true + if !slices.EqualFunc(spreadsA, spreadsB, func(a, b *structs.Spread) bool { + return a.Equal(b) + }) { + return difference("spreads", spreadsA, spreadsB) } - return !reflect.DeepEqual(aSpreads, bSpreads) + return same } // setStatus is used to update the status of the evaluation @@ -530,7 +594,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, // Check if the task drivers or config has changed, requires // a rolling upgrade since that cannot be done in-place. existing := update.Alloc.Job - if tasksUpdated(job, existing, update.TaskGroup.Name) { + if c := tasksUpdated(job, existing, update.TaskGroup.Name); c.modified { continue } @@ -774,7 +838,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy // Check if the task drivers or config has changed, requires // a destructive upgrade since that cannot be done in-place. - if tasksUpdated(newJob, existing.Job, newTG.Name) { + if c := tasksUpdated(newJob, existing.Job, newTG.Name); c.modified { return false, true, nil } diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 76ad2b425..dc05f7ee8 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -6,24 +6,22 @@ import ( "time" "github.com/hashicorp/nomad/ci" - "github.com/shoenig/test/must" - "github.com/stretchr/testify/require" - "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" + "github.com/stretchr/testify/require" ) func BenchmarkTasksUpdated(b *testing.B) { jobA := mock.BigBenchmarkJob() jobB := jobA.Copy() - group := jobA.TaskGroups[0].Name for n := 0; n < b.N; n++ { - if tasksUpdated(jobA, jobB, group) { - b.Fatal("tasks should be the same") + if c := tasksUpdated(jobA, jobB, jobA.TaskGroups[0].Name); c.modified { + b.Errorf("tasks should be the same") } } } @@ -200,8 +198,7 @@ func TestTaskUpdatedAffinity(t *testing.T) { j1 := mock.Job() j2 := mock.Job() name := j1.TaskGroups[0].Name - - require.False(t, tasksUpdated(j1, j2, name)) + must.False(t, tasksUpdated(j1, j2, name).modified) // TaskGroup Affinity j2.TaskGroups[0].Affinities = []*structs.Affinity{ @@ -212,7 +209,7 @@ func TestTaskUpdatedAffinity(t *testing.T) { Weight: 100, }, } - require.True(t, tasksUpdated(j1, j2, name)) + must.True(t, tasksUpdated(j1, j2, name).modified) // TaskGroup Task Affinity j3 := mock.Job() @@ -224,8 +221,7 @@ func TestTaskUpdatedAffinity(t *testing.T) { Weight: 100, }, } - - require.True(t, tasksUpdated(j1, j3, name)) + must.True(t, tasksUpdated(j1, j3, name).modified) j4 := mock.Job() j4.TaskGroups[0].Tasks[0].Affinities = []*structs.Affinity{ @@ -236,8 +232,7 @@ func TestTaskUpdatedAffinity(t *testing.T) { Weight: 100, }, } - - require.True(t, tasksUpdated(j1, j4, name)) + must.True(t, tasksUpdated(j1, j4, name).modified) // check different level of same affinity j5 := mock.Job() @@ -260,8 +255,7 @@ func TestTaskUpdatedAffinity(t *testing.T) { Weight: 100, }, } - - require.False(t, tasksUpdated(j5, j6, name)) + must.False(t, tasksUpdated(j5, j6, name).modified) } func TestTaskUpdatedSpread(t *testing.T) { @@ -271,7 +265,7 @@ func TestTaskUpdatedSpread(t *testing.T) { j2 := mock.Job() name := j1.TaskGroups[0].Name - require.False(t, tasksUpdated(j1, j2, name)) + must.False(t, tasksUpdated(j1, j2, name).modified) // TaskGroup Spread j2.TaskGroups[0].Spreads = []*structs.Spread{ @@ -290,7 +284,7 @@ func TestTaskUpdatedSpread(t *testing.T) { }, }, } - require.True(t, tasksUpdated(j1, j2, name)) + must.True(t, tasksUpdated(j1, j2, name).modified) // check different level of same constraint j5 := mock.Job() @@ -329,7 +323,7 @@ func TestTaskUpdatedSpread(t *testing.T) { }, } - require.False(t, tasksUpdated(j5, j6, name)) + must.False(t, tasksUpdated(j5, j6, name).modified) } func TestTasksUpdated(t *testing.T) { @@ -338,23 +332,23 @@ func TestTasksUpdated(t *testing.T) { j1 := mock.Job() j2 := mock.Job() name := j1.TaskGroups[0].Name - require.False(t, tasksUpdated(j1, j2, name)) + must.False(t, tasksUpdated(j1, j2, name).modified) j2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other" - require.True(t, tasksUpdated(j1, j2, name)) + must.True(t, tasksUpdated(j1, j2, name).modified) j3 := mock.Job() j3.TaskGroups[0].Tasks[0].Name = "foo" - require.True(t, tasksUpdated(j1, j3, name)) + must.True(t, tasksUpdated(j1, j3, name).modified) j4 := mock.Job() j4.TaskGroups[0].Tasks[0].Driver = "foo" - require.True(t, tasksUpdated(j1, j4, name)) + must.True(t, tasksUpdated(j1, j4, name).modified) j5 := mock.Job() j5.TaskGroups[0].Tasks = append(j5.TaskGroups[0].Tasks, j5.TaskGroups[0].Tasks[0]) - require.True(t, tasksUpdated(j1, j5, name)) + must.True(t, tasksUpdated(j1, j5, name).modified) j6 := mock.Job() j6.TaskGroups[0].Networks[0].DynamicPorts = []structs.Port{ @@ -362,15 +356,15 @@ func TestTasksUpdated(t *testing.T) { {Label: "https", Value: 0}, {Label: "admin", Value: 0}, } - require.True(t, tasksUpdated(j1, j6, name)) + must.True(t, tasksUpdated(j1, j6, name).modified) j7 := mock.Job() j7.TaskGroups[0].Tasks[0].Env["NEW_ENV"] = "NEW_VALUE" - require.True(t, tasksUpdated(j1, j7, name)) + must.True(t, tasksUpdated(j1, j7, name).modified) j8 := mock.Job() j8.TaskGroups[0].Tasks[0].User = "foo" - require.True(t, tasksUpdated(j1, j8, name)) + must.True(t, tasksUpdated(j1, j8, name).modified) j9 := mock.Job() j9.TaskGroups[0].Tasks[0].Artifacts = []*structs.TaskArtifact{ @@ -378,15 +372,15 @@ func TestTasksUpdated(t *testing.T) { GetterSource: "http://foo.com/bar", }, } - require.True(t, tasksUpdated(j1, j9, name)) + must.True(t, tasksUpdated(j1, j9, name).modified) j10 := mock.Job() j10.TaskGroups[0].Tasks[0].Meta["baz"] = "boom" - require.True(t, tasksUpdated(j1, j10, name)) + must.True(t, tasksUpdated(j1, j10, name).modified) j11 := mock.Job() j11.TaskGroups[0].Tasks[0].Resources.CPU = 1337 - require.True(t, tasksUpdated(j1, j11, name)) + must.True(t, tasksUpdated(j1, j11, name).modified) j11d1 := mock.Job() j11d1.TaskGroups[0].Tasks[0].Resources.Devices = structs.ResourceDevices{ @@ -402,38 +396,38 @@ func TestTasksUpdated(t *testing.T) { Count: 2, }, } - require.True(t, tasksUpdated(j11d1, j11d2, name)) + must.True(t, tasksUpdated(j11d1, j11d2, name).modified) j13 := mock.Job() j13.TaskGroups[0].Networks[0].DynamicPorts[0].Label = "foobar" - require.True(t, tasksUpdated(j1, j13, name)) + must.True(t, tasksUpdated(j1, j13, name).modified) j14 := mock.Job() j14.TaskGroups[0].Networks[0].ReservedPorts = []structs.Port{{Label: "foo", Value: 1312}} - require.True(t, tasksUpdated(j1, j14, name)) + must.True(t, tasksUpdated(j1, j14, name).modified) j15 := mock.Job() j15.TaskGroups[0].Tasks[0].Vault = &structs.Vault{Policies: []string{"foo"}} - require.True(t, tasksUpdated(j1, j15, name)) + must.True(t, tasksUpdated(j1, j15, name).modified) j16 := mock.Job() j16.TaskGroups[0].EphemeralDisk.Sticky = true - require.True(t, tasksUpdated(j1, j16, name)) + must.True(t, tasksUpdated(j1, j16, name).modified) // Change group meta j17 := mock.Job() j17.TaskGroups[0].Meta["j17_test"] = "roll_baby_roll" - require.True(t, tasksUpdated(j1, j17, name)) + must.True(t, tasksUpdated(j1, j17, name).modified) // Change job meta j18 := mock.Job() j18.Meta["j18_test"] = "roll_baby_roll" - require.True(t, tasksUpdated(j1, j18, name)) + must.True(t, tasksUpdated(j1, j18, name).modified) // Change network mode j19 := mock.Job() j19.TaskGroups[0].Networks[0].Mode = "bridge" - require.True(t, tasksUpdated(j1, j19, name)) + must.True(t, tasksUpdated(j1, j19, name).modified) // Change cores resource j20 := mock.Job() @@ -442,7 +436,7 @@ func TestTasksUpdated(t *testing.T) { j21 := mock.Job() j21.TaskGroups[0].Tasks[0].Resources.CPU = 0 j21.TaskGroups[0].Tasks[0].Resources.Cores = 4 - require.True(t, tasksUpdated(j20, j21, name)) + must.True(t, tasksUpdated(j20, j21, name).modified) // Compare identical Template wait configs j22 := mock.Job() @@ -463,10 +457,10 @@ func TestTasksUpdated(t *testing.T) { }, }, } - require.False(t, tasksUpdated(j22, j23, name)) + must.False(t, tasksUpdated(j22, j23, name).modified) // Compare changed Template wait configs j23.TaskGroups[0].Tasks[0].Templates[0].Wait.Max = pointer.Of(10 * time.Second) - require.True(t, tasksUpdated(j22, j23, name)) + must.True(t, tasksUpdated(j22, j23, name).modified) // Add a volume j24 := mock.Job() @@ -477,12 +471,12 @@ func TestTasksUpdated(t *testing.T) { Type: "csi", Source: "test-volume[0]", }} - require.True(t, tasksUpdated(j24, j25, name)) + must.True(t, tasksUpdated(j24, j25, name).modified) // Alter a volume j26 := j25.Copy() j26.TaskGroups[0].Volumes["myvolume"].ReadOnly = true - require.True(t, tasksUpdated(j25, j26, name)) + must.True(t, tasksUpdated(j25, j26, name).modified) // Alter a CSI plugin j27 := mock.Job() @@ -492,7 +486,7 @@ func TestTasksUpdated(t *testing.T) { } j28 := j27.Copy() j28.TaskGroups[0].Tasks[0].CSIPluginConfig.Type = "monolith" - require.True(t, tasksUpdated(j27, j28, name)) + must.True(t, tasksUpdated(j27, j28, name).modified) // Compare identical Template ErrMissingKey j29 := mock.Job() @@ -507,11 +501,11 @@ func TestTasksUpdated(t *testing.T) { ErrMissingKey: false, }, } - require.False(t, tasksUpdated(j29, j30, name)) + must.False(t, tasksUpdated(j29, j30, name).modified) // Compare changed Template ErrMissingKey j30.TaskGroups[0].Tasks[0].Templates[0].ErrMissingKey = true - require.True(t, tasksUpdated(j29, j30, name)) + must.True(t, tasksUpdated(j29, j30, name).modified) } func TestTasksUpdated_connectServiceUpdated(t *testing.T) { @@ -541,8 +535,8 @@ func TestTasksUpdated_connectServiceUpdated(t *testing.T) { }, { Name: "service2", }} - updated := connectServiceUpdated(servicesA, servicesB) - require.False(t, updated) + updated := connectServiceUpdated(servicesA, servicesB).modified + must.False(t, updated) }) t.Run("service connect tags updated", func(t *testing.T) { @@ -557,8 +551,8 @@ func TestTasksUpdated_connectServiceUpdated(t *testing.T) { }, }, }} - updated := connectServiceUpdated(servicesA, servicesB) - require.False(t, updated) + updated := connectServiceUpdated(servicesA, servicesB).modified + must.False(t, updated) }) t.Run("service connect port updated", func(t *testing.T) { @@ -574,8 +568,8 @@ func TestTasksUpdated_connectServiceUpdated(t *testing.T) { }, }, }} - updated := connectServiceUpdated(servicesA, servicesB) - require.True(t, updated) + updated := connectServiceUpdated(servicesA, servicesB).modified + must.True(t, updated) }) t.Run("service port label updated", func(t *testing.T) { @@ -590,13 +584,14 @@ func TestTasksUpdated_connectServiceUpdated(t *testing.T) { }, }, }} - updated := connectServiceUpdated(servicesA, servicesB) - require.True(t, updated) + updated := connectServiceUpdated(servicesA, servicesB).modified + must.True(t, updated) }) } func TestNetworkUpdated(t *testing.T) { ci.Parallel(t) + cases := []struct { name string a []*structs.NetworkResource @@ -653,11 +648,9 @@ func TestNetworkUpdated(t *testing.T) { }, } - for i := range cases { - c := cases[i] - t.Run(c.name, func(tc *testing.T) { - tc.Parallel() - require.Equal(tc, c.updated, networkUpdated(c.a, c.b), "unexpected network updated result") + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + must.Eq(t, tc.updated, networkUpdated(tc.a, tc.b).modified) }) } } @@ -1017,17 +1010,17 @@ func TestUtil_connectUpdated(t *testing.T) { ci.Parallel(t) t.Run("both nil", func(t *testing.T) { - require.False(t, connectUpdated(nil, nil)) + must.False(t, connectUpdated(nil, nil).modified) }) t.Run("one nil", func(t *testing.T) { - require.True(t, connectUpdated(nil, new(structs.ConsulConnect))) + must.True(t, connectUpdated(nil, new(structs.ConsulConnect)).modified) }) t.Run("native differ", func(t *testing.T) { a := &structs.ConsulConnect{Native: true} b := &structs.ConsulConnect{Native: false} - require.True(t, connectUpdated(a, b)) + must.True(t, connectUpdated(a, b).modified) }) t.Run("gateway differ", func(t *testing.T) { @@ -1037,7 +1030,7 @@ func TestUtil_connectUpdated(t *testing.T) { b := &structs.ConsulConnect{Gateway: &structs.ConsulGateway{ Terminating: new(structs.ConsulTerminatingConfigEntry), }} - require.True(t, connectUpdated(a, b)) + must.True(t, connectUpdated(a, b).modified) }) t.Run("sidecar task differ", func(t *testing.T) { @@ -1047,7 +1040,7 @@ func TestUtil_connectUpdated(t *testing.T) { b := &structs.ConsulConnect{SidecarTask: &structs.SidecarTask{ Driver: "docker", }} - require.True(t, connectUpdated(a, b)) + must.True(t, connectUpdated(a, b).modified) }) t.Run("sidecar service differ", func(t *testing.T) { @@ -1057,13 +1050,13 @@ func TestUtil_connectUpdated(t *testing.T) { b := &structs.ConsulConnect{SidecarService: &structs.ConsulSidecarService{ Port: "2222", }} - require.True(t, connectUpdated(a, b)) + must.True(t, connectUpdated(a, b).modified) }) t.Run("same", func(t *testing.T) { a := new(structs.ConsulConnect) b := new(structs.ConsulConnect) - require.False(t, connectUpdated(a, b)) + must.False(t, connectUpdated(a, b).modified) }) } @@ -1071,23 +1064,23 @@ func TestUtil_connectSidecarServiceUpdated(t *testing.T) { ci.Parallel(t) t.Run("both nil", func(t *testing.T) { - require.False(t, connectSidecarServiceUpdated(nil, nil)) + require.False(t, connectSidecarServiceUpdated(nil, nil).modified) }) t.Run("one nil", func(t *testing.T) { - require.True(t, connectSidecarServiceUpdated(nil, new(structs.ConsulSidecarService))) + require.True(t, connectSidecarServiceUpdated(nil, new(structs.ConsulSidecarService)).modified) }) t.Run("ports differ", func(t *testing.T) { a := &structs.ConsulSidecarService{Port: "1111"} b := &structs.ConsulSidecarService{Port: "2222"} - require.True(t, connectSidecarServiceUpdated(a, b)) + require.True(t, connectSidecarServiceUpdated(a, b).modified) }) t.Run("same", func(t *testing.T) { a := &structs.ConsulSidecarService{Port: "1111"} b := &structs.ConsulSidecarService{Port: "1111"} - require.False(t, connectSidecarServiceUpdated(a, b)) + require.False(t, connectSidecarServiceUpdated(a, b).modified) }) } @@ -1100,12 +1093,12 @@ func TestTasksUpdated_Identity(t *testing.T) { j2 := j1.Copy() - must.False(t, tasksUpdated(j1, j2, name)) + must.False(t, tasksUpdated(j1, j2, name).modified) // Set identity on j1 and assert update j1.TaskGroups[0].Tasks[0].Identity = &structs.WorkloadIdentity{} - must.True(t, tasksUpdated(j1, j2, name)) + must.True(t, tasksUpdated(j1, j2, name).modified) } func TestTaskGroupConstraints(t *testing.T) {