From 4b2912d343a9ac37efc6a1967bde79244c45365f Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Thu, 18 Mar 2021 22:49:06 -0400 Subject: [PATCH] structs: add struct fields and funcs for reservable cpu cores --- lib/cpuset/cpuset.go | 125 +++++++++++++++++++++++++++++++++ lib/cpuset/cpuset_test.go | 128 ++++++++++++++++++++++++++++++++++ nomad/structs/diff_test.go | 12 ++++ nomad/structs/structs.go | 87 +++++++++++++++++------ nomad/structs/structs_test.go | 50 +++++-------- 5 files changed, 346 insertions(+), 56 deletions(-) create mode 100644 lib/cpuset/cpuset.go create mode 100644 lib/cpuset/cpuset_test.go diff --git a/lib/cpuset/cpuset.go b/lib/cpuset/cpuset.go new file mode 100644 index 000000000..c97f369dc --- /dev/null +++ b/lib/cpuset/cpuset.go @@ -0,0 +1,125 @@ +package cpuset + +import ( + "fmt" + "reflect" + "sort" + "strconv" + "strings" +) + +// CPUSet is a set like object that provides methods helpful when working with cpus with systems +// such as the Linux cpuset cgroup subsystem. A CPUSet is immutable and can be safely accessed concurrently. +type CPUSet struct { + cpus map[uint16]struct{} +} + +// New initializes a new CPUSet with 0 or more containing cpus +func New(cpus ...uint16) CPUSet { + cpuset := CPUSet{ + cpus: make(map[uint16]struct{}), + } + + for _, v := range cpus { + cpuset.cpus[v] = struct{}{} + } + + return cpuset +} + +// Size returns to the number of cpus contained in the CPUSet +func (c CPUSet) Size() int { + return len(c.cpus) +} + +// ToSlice returns a sorted slice of uint16 CPU IDs contained in the CPUSet. +func (c CPUSet) ToSlice() []uint16 { + cpus := []uint16{} + for k := range c.cpus { + cpus = append(cpus, k) + } + sort.Slice(cpus, func(i, j int) bool { return cpus[i] < cpus[j] }) + return cpus +} + +// Union returns a new set that is the union of this CPUSet and the supplied other. +// Ex. [0,1,2,3].Union([2,3,4,5]) = [0,1,2,3,4,5] +func (c CPUSet) Union(other CPUSet) CPUSet { + s := New() + for k := range c.cpus { + s.cpus[k] = struct{}{} + } + for k := range other.cpus { + s.cpus[k] = struct{}{} + } + return s +} + +// Difference returns a new set that is the difference of this CPUSet and the supplied other. +// [0,1,2,3].Difference([2,3,4]) = [0,1] +func (c CPUSet) Difference(other CPUSet) CPUSet { + s := New() + for k := range c.cpus { + s.cpus[k] = struct{}{} + } + for k := range other.cpus { + delete(s.cpus, k) + } + return s + +} + +// IsSubsetOf returns true if all cpus of the this CPUSet are present in the other CPUSet. +func (s CPUSet) IsSubsetOf(other CPUSet) bool { + for cpu := range s.cpus { + if _, ok := other.cpus[cpu]; !ok { + return false + } + } + return true +} + +// Equals tests the equality of the elements in the CPUSet +func (s CPUSet) Equals(other CPUSet) bool { + return reflect.DeepEqual(s.cpus, other.cpus) +} + +// Parse parses the Linux cpuset format into a CPUSet +// +// Ref: http://man7.org/linux/man-pages/man7/cpuset.7.html#FORMATS +func Parse(s string) (CPUSet, error) { + cpuset := New() + if s == "" { + return cpuset, nil + } + sets := strings.Split(s, ",") + for _, set := range sets { + bounds := strings.Split(set, "-") + if len(bounds) == 1 { + v, err := strconv.Atoi(bounds[0]) + if err != nil { + return New(), err + } + + cpuset.cpus[uint16(v)] = struct{}{} + continue + } + if len(bounds) > 2 { + return New(), fmt.Errorf("failed to parse element %s, more than 1 '-' found", set) + } + + lower, err := strconv.Atoi(bounds[0]) + if err != nil { + return New(), err + } + upper, err := strconv.Atoi(bounds[1]) + if err != nil { + return New(), err + } + for v := lower; v <= upper; v++ { + cpuset.cpus[uint16(v)] = struct{}{} + } + } + + return cpuset, nil +} diff --git a/lib/cpuset/cpuset_test.go b/lib/cpuset/cpuset_test.go new file mode 100644 index 000000000..f3c58afb2 --- /dev/null +++ b/lib/cpuset/cpuset_test.go @@ -0,0 +1,128 @@ +package cpuset + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCPUSet_Size(t *testing.T) { + set := New(0, 1, 2, 3) + require.Equal(t, 4, set.Size()) + require.Equal(t, 0, New().Size()) +} + +func TestCPUSet_ToSlice(t *testing.T) { + cases := []struct { + desc string + in CPUSet + out []uint16 + }{ + { + "empty cpuset", + New(), + []uint16{}, + }, + { + "in order", + New(0, 1, 2, 3, 4, 5, 6, 7), + []uint16{0, 1, 2, 3, 4, 5, 6, 7}, + }, + { + "out of order", + New(3, 1, 2, 0), + []uint16{0, 1, 2, 3}, + }, + } + + for _, c := range cases { + require.Exactly(t, c.out, c.in.ToSlice(), c.desc) + } +} + +func TestCPUSet_Equals(t *testing.T) { + cases := []struct { + a CPUSet + b CPUSet + shouldEqual bool + }{ + {New(), New(), true}, + {New(5), New(5), true}, + {New(1, 2, 3, 4, 5), New(1, 2, 3, 4, 5), true}, + + {New(), New(5), false}, + {New(5), New(), false}, + {New(), New(1, 2, 3, 4, 5), false}, + {New(1, 2, 3, 4, 5), New(), false}, + {New(5), New(1, 2, 3, 4, 5), false}, + {New(1, 2, 3, 4, 5), New(5), false}, + } + + for _, c := range cases { + require.Equal(t, c.shouldEqual, c.a.Equals(c.b)) + } +} + +func TestCpuSet_Union(t *testing.T) { + cases := []struct { + a CPUSet + b CPUSet + expected CPUSet + }{ + {New(), New(), New()}, + + {New(), New(0), New(0)}, + {New(0), New(), New(0)}, + {New(0), New(0), New(0)}, + + {New(), New(0, 1, 2, 3), New(0, 1, 2, 3)}, + {New(0, 1), New(0, 1, 2, 3), New(0, 1, 2, 3)}, + {New(2, 3), New(4, 5), New(2, 3, 4, 5)}, + {New(3, 4), New(0, 1, 2, 3), New(0, 1, 2, 3, 4)}, + } + + for _, c := range cases { + require.Exactly(t, c.expected.ToSlice(), c.a.Union(c.b).ToSlice()) + } +} + +func TestCpuSet_Difference(t *testing.T) { + cases := []struct { + a CPUSet + b CPUSet + expected CPUSet + }{ + {New(), New(), New()}, + + {New(), New(0), New()}, + {New(0), New(), New(0)}, + {New(0), New(0), New()}, + + {New(0, 1), New(0, 1, 2, 3), New()}, + {New(2, 3), New(4, 5), New(2, 3)}, + {New(3, 4), New(0, 1, 2, 3), New(4)}, + } + + for _, c := range cases { + require.Exactly(t, c.expected.ToSlice(), c.a.Difference(c.b).ToSlice()) + } +} + +func TestParse(t *testing.T) { + cases := []struct { + cpuset string + expected CPUSet + }{ + {"", New()}, + {"1", New(1)}, + {"0,1,2,3", New(0, 1, 2, 3)}, + {"0-3", New(0, 1, 2, 3)}, + {"0,2-3,5", New(0, 2, 3, 5)}, + } + + for _, c := range cases { + result, err := Parse(c.cpuset) + require.NoError(t, err) + require.True(t, result.Equals(c.expected)) + } +} diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index 2d1637c19..47ae7e8a3 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -4516,6 +4516,12 @@ func TestTaskDiff(t *testing.T) { Old: "100", New: "200", }, + { + Type: DiffTypeNone, + Name: "Cores", + Old: "0", + New: "0", + }, { Type: DiffTypeEdited, Name: "DiskMB", @@ -4869,6 +4875,12 @@ func TestTaskDiff(t *testing.T) { Old: "100", New: "100", }, + { + Type: DiffTypeNone, + Name: "Cores", + Old: "0", + New: "0", + }, { Type: DiffTypeNone, Name: "DiskMB", diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 1c8bfe445..f3c082abb 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -25,6 +25,8 @@ import ( "strings" "time" + "github.com/hashicorp/nomad/lib/cpuset" + "github.com/hashicorp/cronexpr" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-multierror" @@ -2178,6 +2180,7 @@ type NodeStubFields struct { // on a client type Resources struct { CPU int + Cores int MemoryMB int DiskMB int IOPS int // COMPAT(0.10): Only being used to issue warnings @@ -2196,6 +2199,7 @@ const ( func DefaultResources() *Resources { return &Resources{ CPU: 100, + Cores: 0, MemoryMB: 300, } } @@ -2219,6 +2223,11 @@ func (r *Resources) DiskInBytes() int64 { func (r *Resources) Validate() error { var mErr multierror.Error + + if r.Cores > 0 && r.CPU > 0 { + mErr.Errors = append(mErr.Errors, errors.New("Task can only ask for 'cpu' or 'cores' resource, not both.")) + } + if err := r.MeetsMinResources(); err != nil { mErr.Errors = append(mErr.Errors, err) } @@ -2243,6 +2252,9 @@ func (r *Resources) Merge(other *Resources) { if other.CPU != 0 { r.CPU = other.CPU } + if other.Cores != 0 { + r.Cores = other.Cores + } if other.MemoryMB != 0 { r.MemoryMB = other.MemoryMB } @@ -2266,6 +2278,7 @@ func (r *Resources) Equals(o *Resources) bool { return false } return r.CPU == o.CPU && + r.Cores == o.Cores && r.MemoryMB == o.MemoryMB && r.DiskMB == o.DiskMB && r.IOPS == o.IOPS && @@ -2325,7 +2338,7 @@ func (r *Resources) Canonicalize() { func (r *Resources) MeetsMinResources() error { var mErr multierror.Error minResources := MinResources() - if r.CPU < minResources.CPU { + if r.CPU < minResources.CPU && r.Cores == 0 { mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum CPU value is %d; got %d", minResources.CPU, r.CPU)) } if r.MemoryMB < minResources.MemoryMB { @@ -2363,23 +2376,6 @@ func (r *Resources) NetIndex(n *NetworkResource) int { return r.Networks.NetIndex(n) } -// Superset checks if one set of resources is a superset -// of another. This ignores network resources, and the NetworkIndex -// should be used for that. -// COMPAT(0.10): Remove in 0.10 -func (r *Resources) Superset(other *Resources) (bool, string) { - if r.CPU < other.CPU { - return false, "cpu" - } - if r.MemoryMB < other.MemoryMB { - return false, "memory" - } - if r.DiskMB < other.DiskMB { - return false, "disk" - } - return true, "" -} - // Add adds the resources of the delta to this, potentially // returning an error if not possible. // COMPAT(0.10): Remove in 0.10 @@ -2807,7 +2803,8 @@ func (n *NodeResources) Comparable() *ComparableResources { c := &ComparableResources{ Flattened: AllocatedTaskResources{ Cpu: AllocatedCpuResources{ - CpuShares: n.Cpu.CpuShares, + CpuShares: n.Cpu.CpuShares, + ReservedCores: n.Cpu.ReservableCpuCores, }, Memory: AllocatedMemoryResources{ MemoryMB: n.Memory.MemoryMB, @@ -2957,6 +2954,15 @@ type NodeCpuResources struct { // CpuShares is the CPU shares available. This is calculated by number of // cores multiplied by the core frequency. CpuShares int64 + + // TotalCpuCores is the total number of cores on the machine. This includes cores not in + // the agent's cpuset if on a linux platform + TotalCpuCores uint16 + + // ReservableCpuCores is the set of cpus which are available to be reserved on the Node. + // This value is currently only reported on Linux platforms which support cgroups and is + // discovered by inspecting the cpuset of the agent's cgroup. + ReservableCpuCores []uint16 } func (n *NodeCpuResources) Merge(o *NodeCpuResources) { @@ -2967,6 +2973,14 @@ func (n *NodeCpuResources) Merge(o *NodeCpuResources) { if o.CpuShares != 0 { n.CpuShares = o.CpuShares } + + if o.TotalCpuCores != 0 { + n.TotalCpuCores = o.TotalCpuCores + } + + if len(o.ReservableCpuCores) != 0 { + n.ReservableCpuCores = o.ReservableCpuCores + } } func (n *NodeCpuResources) Equals(o *NodeCpuResources) bool { @@ -2982,9 +2996,25 @@ func (n *NodeCpuResources) Equals(o *NodeCpuResources) bool { return false } + if n.TotalCpuCores != o.TotalCpuCores { + return false + } + + if len(n.ReservableCpuCores) != len(o.ReservableCpuCores) { + return false + } + for i := range n.ReservableCpuCores { + if n.ReservableCpuCores[i] != o.ReservableCpuCores[i] { + return false + } + } return true } +func (n *NodeCpuResources) SharesPerCore() int64 { + return n.CpuShares / int64(n.TotalCpuCores) +} + // NodeMemoryResources captures the memory resources of the node type NodeMemoryResources struct { // MemoryMB is the total available memory on the node @@ -3298,7 +3328,8 @@ func (n *NodeReservedResources) Comparable() *ComparableResources { c := &ComparableResources{ Flattened: AllocatedTaskResources{ Cpu: AllocatedCpuResources{ - CpuShares: n.Cpu.CpuShares, + CpuShares: n.Cpu.CpuShares, + ReservedCores: n.Cpu.ReservedCpuCores, }, Memory: AllocatedMemoryResources{ MemoryMB: n.Memory.MemoryMB, @@ -3313,7 +3344,8 @@ func (n *NodeReservedResources) Comparable() *ComparableResources { // NodeReservedCpuResources captures the reserved CPU resources of the node. type NodeReservedCpuResources struct { - CpuShares int64 + CpuShares int64 + ReservedCpuCores []uint16 } // NodeReservedMemoryResources captures the reserved memory resources of the node. @@ -3552,7 +3584,8 @@ func (a *AllocatedTaskResources) Comparable() *ComparableResources { ret := &ComparableResources{ Flattened: AllocatedTaskResources{ Cpu: AllocatedCpuResources{ - CpuShares: a.Cpu.CpuShares, + CpuShares: a.Cpu.CpuShares, + ReservedCores: a.Cpu.ReservedCores, }, Memory: AllocatedMemoryResources{ MemoryMB: a.Memory.MemoryMB, @@ -3636,7 +3669,8 @@ func (a *AllocatedSharedResources) Canonicalize() { // AllocatedCpuResources captures the allocated CPU resources. type AllocatedCpuResources struct { - CpuShares int64 + CpuShares int64 + ReservedCores []uint16 } func (a *AllocatedCpuResources) Add(delta *AllocatedCpuResources) { @@ -3645,6 +3679,8 @@ func (a *AllocatedCpuResources) Add(delta *AllocatedCpuResources) { } a.CpuShares += delta.CpuShares + + a.ReservedCores = cpuset.New(a.ReservedCores...).Union(cpuset.New(delta.ReservedCores...)).ToSlice() } func (a *AllocatedCpuResources) Subtract(delta *AllocatedCpuResources) { @@ -3653,6 +3689,7 @@ func (a *AllocatedCpuResources) Subtract(delta *AllocatedCpuResources) { } a.CpuShares -= delta.CpuShares + a.ReservedCores = cpuset.New(a.ReservedCores...).Difference(cpuset.New(delta.ReservedCores...)).ToSlice() } func (a *AllocatedCpuResources) Max(other *AllocatedCpuResources) { @@ -3663,6 +3700,10 @@ func (a *AllocatedCpuResources) Max(other *AllocatedCpuResources) { if other.CpuShares > a.CpuShares { a.CpuShares = other.CpuShares } + + if len(other.ReservedCores) > len(a.ReservedCores) { + a.ReservedCores = other.ReservedCores + } } // AllocatedMemoryResources captures the allocated memory resources. diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 4787c5832..dd5ea3bf5 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -2599,32 +2599,6 @@ func TestResource_NetIndex(t *testing.T) { } } -func TestResource_Superset(t *testing.T) { - r1 := &Resources{ - CPU: 2000, - MemoryMB: 2048, - DiskMB: 10000, - } - r2 := &Resources{ - CPU: 2000, - MemoryMB: 1024, - DiskMB: 5000, - } - - if s, _ := r1.Superset(r1); !s { - t.Fatalf("bad") - } - if s, _ := r1.Superset(r2); !s { - t.Fatalf("bad") - } - if s, _ := r2.Superset(r1); s { - t.Fatalf("bad") - } - if s, _ := r2.Superset(r2); !s { - t.Fatalf("bad") - } -} - func TestResource_Add(t *testing.T) { r1 := &Resources{ CPU: 2000, @@ -2711,7 +2685,8 @@ func TestComparableResources_Subtract(t *testing.T) { r1 := &ComparableResources{ Flattened: AllocatedTaskResources{ Cpu: AllocatedCpuResources{ - CpuShares: 2000, + CpuShares: 2000, + ReservedCores: []uint16{0, 1}, }, Memory: AllocatedMemoryResources{ MemoryMB: 2048, @@ -2732,7 +2707,8 @@ func TestComparableResources_Subtract(t *testing.T) { r2 := &ComparableResources{ Flattened: AllocatedTaskResources{ Cpu: AllocatedCpuResources{ - CpuShares: 1000, + CpuShares: 1000, + ReservedCores: []uint16{0}, }, Memory: AllocatedMemoryResources{ MemoryMB: 1024, @@ -2754,7 +2730,8 @@ func TestComparableResources_Subtract(t *testing.T) { expect := &ComparableResources{ Flattened: AllocatedTaskResources{ Cpu: AllocatedCpuResources{ - CpuShares: 1000, + CpuShares: 1000, + ReservedCores: []uint16{1}, }, Memory: AllocatedMemoryResources{ MemoryMB: 1024, @@ -5510,7 +5487,9 @@ func TestNode_Copy(t *testing.T) { }, NodeResources: &NodeResources{ Cpu: NodeCpuResources{ - CpuShares: 4000, + CpuShares: 4000, + TotalCpuCores: 4, + ReservableCpuCores: []uint16{0, 1, 2, 3}, }, Memory: NodeMemoryResources{ MemoryMB: 8192, @@ -5528,7 +5507,8 @@ func TestNode_Copy(t *testing.T) { }, ReservedResources: &NodeReservedResources{ Cpu: NodeReservedCpuResources{ - CpuShares: 100, + CpuShares: 100, + ReservedCpuCores: []uint16{0}, }, Memory: NodeReservedMemoryResources{ MemoryMB: 256, @@ -5781,7 +5761,8 @@ func TestMultiregion_CopyCanonicalize(t *testing.T) { func TestNodeResources_Merge(t *testing.T) { res := &NodeResources{ Cpu: NodeCpuResources{ - CpuShares: int64(32000), + CpuShares: int64(32000), + TotalCpuCores: 32, }, Memory: NodeMemoryResources{ MemoryMB: int64(64000), @@ -5794,6 +5775,7 @@ func TestNodeResources_Merge(t *testing.T) { } res.Merge(&NodeResources{ + Cpu: NodeCpuResources{ReservableCpuCores: []uint16{0, 1, 2, 3}}, Memory: NodeMemoryResources{ MemoryMB: int64(100000), }, @@ -5806,7 +5788,9 @@ func TestNodeResources_Merge(t *testing.T) { require.Exactly(t, &NodeResources{ Cpu: NodeCpuResources{ - CpuShares: int64(32000), + CpuShares: int64(32000), + TotalCpuCores: 32, + ReservableCpuCores: []uint16{0, 1, 2, 3}, }, Memory: NodeMemoryResources{ MemoryMB: int64(100000),