structs: add struct fields and funcs for reservable cpu cores

This commit is contained in:
Nick Ethier 2021-03-18 22:49:06 -04:00
parent a00b967efa
commit 4b2912d343
5 changed files with 346 additions and 56 deletions

125
lib/cpuset/cpuset.go Normal file
View file

@ -0,0 +1,125 @@
package cpuset
import (
"fmt"
"reflect"
"sort"
"strconv"
"strings"
)
// CPUSet is a set like object that provides methods helpful when working with cpus with systems
// such as the Linux cpuset cgroup subsystem. A CPUSet is immutable and can be safely accessed concurrently.
type CPUSet struct {
cpus map[uint16]struct{}
}
// New initializes a new CPUSet with 0 or more containing cpus
func New(cpus ...uint16) CPUSet {
cpuset := CPUSet{
cpus: make(map[uint16]struct{}),
}
for _, v := range cpus {
cpuset.cpus[v] = struct{}{}
}
return cpuset
}
// Size returns to the number of cpus contained in the CPUSet
func (c CPUSet) Size() int {
return len(c.cpus)
}
// ToSlice returns a sorted slice of uint16 CPU IDs contained in the CPUSet.
func (c CPUSet) ToSlice() []uint16 {
cpus := []uint16{}
for k := range c.cpus {
cpus = append(cpus, k)
}
sort.Slice(cpus, func(i, j int) bool { return cpus[i] < cpus[j] })
return cpus
}
// Union returns a new set that is the union of this CPUSet and the supplied other.
// Ex. [0,1,2,3].Union([2,3,4,5]) = [0,1,2,3,4,5]
func (c CPUSet) Union(other CPUSet) CPUSet {
s := New()
for k := range c.cpus {
s.cpus[k] = struct{}{}
}
for k := range other.cpus {
s.cpus[k] = struct{}{}
}
return s
}
// Difference returns a new set that is the difference of this CPUSet and the supplied other.
// [0,1,2,3].Difference([2,3,4]) = [0,1]
func (c CPUSet) Difference(other CPUSet) CPUSet {
s := New()
for k := range c.cpus {
s.cpus[k] = struct{}{}
}
for k := range other.cpus {
delete(s.cpus, k)
}
return s
}
// IsSubsetOf returns true if all cpus of the this CPUSet are present in the other CPUSet.
func (s CPUSet) IsSubsetOf(other CPUSet) bool {
for cpu := range s.cpus {
if _, ok := other.cpus[cpu]; !ok {
return false
}
}
return true
}
// Equals tests the equality of the elements in the CPUSet
func (s CPUSet) Equals(other CPUSet) bool {
return reflect.DeepEqual(s.cpus, other.cpus)
}
// Parse parses the Linux cpuset format into a CPUSet
//
// Ref: http://man7.org/linux/man-pages/man7/cpuset.7.html#FORMATS
func Parse(s string) (CPUSet, error) {
cpuset := New()
if s == "" {
return cpuset, nil
}
sets := strings.Split(s, ",")
for _, set := range sets {
bounds := strings.Split(set, "-")
if len(bounds) == 1 {
v, err := strconv.Atoi(bounds[0])
if err != nil {
return New(), err
}
cpuset.cpus[uint16(v)] = struct{}{}
continue
}
if len(bounds) > 2 {
return New(), fmt.Errorf("failed to parse element %s, more than 1 '-' found", set)
}
lower, err := strconv.Atoi(bounds[0])
if err != nil {
return New(), err
}
upper, err := strconv.Atoi(bounds[1])
if err != nil {
return New(), err
}
for v := lower; v <= upper; v++ {
cpuset.cpus[uint16(v)] = struct{}{}
}
}
return cpuset, nil
}

128
lib/cpuset/cpuset_test.go Normal file
View file

@ -0,0 +1,128 @@
package cpuset
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestCPUSet_Size(t *testing.T) {
set := New(0, 1, 2, 3)
require.Equal(t, 4, set.Size())
require.Equal(t, 0, New().Size())
}
func TestCPUSet_ToSlice(t *testing.T) {
cases := []struct {
desc string
in CPUSet
out []uint16
}{
{
"empty cpuset",
New(),
[]uint16{},
},
{
"in order",
New(0, 1, 2, 3, 4, 5, 6, 7),
[]uint16{0, 1, 2, 3, 4, 5, 6, 7},
},
{
"out of order",
New(3, 1, 2, 0),
[]uint16{0, 1, 2, 3},
},
}
for _, c := range cases {
require.Exactly(t, c.out, c.in.ToSlice(), c.desc)
}
}
func TestCPUSet_Equals(t *testing.T) {
cases := []struct {
a CPUSet
b CPUSet
shouldEqual bool
}{
{New(), New(), true},
{New(5), New(5), true},
{New(1, 2, 3, 4, 5), New(1, 2, 3, 4, 5), true},
{New(), New(5), false},
{New(5), New(), false},
{New(), New(1, 2, 3, 4, 5), false},
{New(1, 2, 3, 4, 5), New(), false},
{New(5), New(1, 2, 3, 4, 5), false},
{New(1, 2, 3, 4, 5), New(5), false},
}
for _, c := range cases {
require.Equal(t, c.shouldEqual, c.a.Equals(c.b))
}
}
func TestCpuSet_Union(t *testing.T) {
cases := []struct {
a CPUSet
b CPUSet
expected CPUSet
}{
{New(), New(), New()},
{New(), New(0), New(0)},
{New(0), New(), New(0)},
{New(0), New(0), New(0)},
{New(), New(0, 1, 2, 3), New(0, 1, 2, 3)},
{New(0, 1), New(0, 1, 2, 3), New(0, 1, 2, 3)},
{New(2, 3), New(4, 5), New(2, 3, 4, 5)},
{New(3, 4), New(0, 1, 2, 3), New(0, 1, 2, 3, 4)},
}
for _, c := range cases {
require.Exactly(t, c.expected.ToSlice(), c.a.Union(c.b).ToSlice())
}
}
func TestCpuSet_Difference(t *testing.T) {
cases := []struct {
a CPUSet
b CPUSet
expected CPUSet
}{
{New(), New(), New()},
{New(), New(0), New()},
{New(0), New(), New(0)},
{New(0), New(0), New()},
{New(0, 1), New(0, 1, 2, 3), New()},
{New(2, 3), New(4, 5), New(2, 3)},
{New(3, 4), New(0, 1, 2, 3), New(4)},
}
for _, c := range cases {
require.Exactly(t, c.expected.ToSlice(), c.a.Difference(c.b).ToSlice())
}
}
func TestParse(t *testing.T) {
cases := []struct {
cpuset string
expected CPUSet
}{
{"", New()},
{"1", New(1)},
{"0,1,2,3", New(0, 1, 2, 3)},
{"0-3", New(0, 1, 2, 3)},
{"0,2-3,5", New(0, 2, 3, 5)},
}
for _, c := range cases {
result, err := Parse(c.cpuset)
require.NoError(t, err)
require.True(t, result.Equals(c.expected))
}
}

View file

@ -4516,6 +4516,12 @@ func TestTaskDiff(t *testing.T) {
Old: "100",
New: "200",
},
{
Type: DiffTypeNone,
Name: "Cores",
Old: "0",
New: "0",
},
{
Type: DiffTypeEdited,
Name: "DiskMB",
@ -4869,6 +4875,12 @@ func TestTaskDiff(t *testing.T) {
Old: "100",
New: "100",
},
{
Type: DiffTypeNone,
Name: "Cores",
Old: "0",
New: "0",
},
{
Type: DiffTypeNone,
Name: "DiskMB",

View file

@ -25,6 +25,8 @@ import (
"strings"
"time"
"github.com/hashicorp/nomad/lib/cpuset"
"github.com/hashicorp/cronexpr"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/go-multierror"
@ -2178,6 +2180,7 @@ type NodeStubFields struct {
// on a client
type Resources struct {
CPU int
Cores int
MemoryMB int
DiskMB int
IOPS int // COMPAT(0.10): Only being used to issue warnings
@ -2196,6 +2199,7 @@ const (
func DefaultResources() *Resources {
return &Resources{
CPU: 100,
Cores: 0,
MemoryMB: 300,
}
}
@ -2219,6 +2223,11 @@ func (r *Resources) DiskInBytes() int64 {
func (r *Resources) Validate() error {
var mErr multierror.Error
if r.Cores > 0 && r.CPU > 0 {
mErr.Errors = append(mErr.Errors, errors.New("Task can only ask for 'cpu' or 'cores' resource, not both."))
}
if err := r.MeetsMinResources(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
@ -2243,6 +2252,9 @@ func (r *Resources) Merge(other *Resources) {
if other.CPU != 0 {
r.CPU = other.CPU
}
if other.Cores != 0 {
r.Cores = other.Cores
}
if other.MemoryMB != 0 {
r.MemoryMB = other.MemoryMB
}
@ -2266,6 +2278,7 @@ func (r *Resources) Equals(o *Resources) bool {
return false
}
return r.CPU == o.CPU &&
r.Cores == o.Cores &&
r.MemoryMB == o.MemoryMB &&
r.DiskMB == o.DiskMB &&
r.IOPS == o.IOPS &&
@ -2325,7 +2338,7 @@ func (r *Resources) Canonicalize() {
func (r *Resources) MeetsMinResources() error {
var mErr multierror.Error
minResources := MinResources()
if r.CPU < minResources.CPU {
if r.CPU < minResources.CPU && r.Cores == 0 {
mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum CPU value is %d; got %d", minResources.CPU, r.CPU))
}
if r.MemoryMB < minResources.MemoryMB {
@ -2363,23 +2376,6 @@ func (r *Resources) NetIndex(n *NetworkResource) int {
return r.Networks.NetIndex(n)
}
// Superset checks if one set of resources is a superset
// of another. This ignores network resources, and the NetworkIndex
// should be used for that.
// COMPAT(0.10): Remove in 0.10
func (r *Resources) Superset(other *Resources) (bool, string) {
if r.CPU < other.CPU {
return false, "cpu"
}
if r.MemoryMB < other.MemoryMB {
return false, "memory"
}
if r.DiskMB < other.DiskMB {
return false, "disk"
}
return true, ""
}
// Add adds the resources of the delta to this, potentially
// returning an error if not possible.
// COMPAT(0.10): Remove in 0.10
@ -2808,6 +2804,7 @@ func (n *NodeResources) Comparable() *ComparableResources {
Flattened: AllocatedTaskResources{
Cpu: AllocatedCpuResources{
CpuShares: n.Cpu.CpuShares,
ReservedCores: n.Cpu.ReservableCpuCores,
},
Memory: AllocatedMemoryResources{
MemoryMB: n.Memory.MemoryMB,
@ -2957,6 +2954,15 @@ type NodeCpuResources struct {
// CpuShares is the CPU shares available. This is calculated by number of
// cores multiplied by the core frequency.
CpuShares int64
// TotalCpuCores is the total number of cores on the machine. This includes cores not in
// the agent's cpuset if on a linux platform
TotalCpuCores uint16
// ReservableCpuCores is the set of cpus which are available to be reserved on the Node.
// This value is currently only reported on Linux platforms which support cgroups and is
// discovered by inspecting the cpuset of the agent's cgroup.
ReservableCpuCores []uint16
}
func (n *NodeCpuResources) Merge(o *NodeCpuResources) {
@ -2967,6 +2973,14 @@ func (n *NodeCpuResources) Merge(o *NodeCpuResources) {
if o.CpuShares != 0 {
n.CpuShares = o.CpuShares
}
if o.TotalCpuCores != 0 {
n.TotalCpuCores = o.TotalCpuCores
}
if len(o.ReservableCpuCores) != 0 {
n.ReservableCpuCores = o.ReservableCpuCores
}
}
func (n *NodeCpuResources) Equals(o *NodeCpuResources) bool {
@ -2982,9 +2996,25 @@ func (n *NodeCpuResources) Equals(o *NodeCpuResources) bool {
return false
}
if n.TotalCpuCores != o.TotalCpuCores {
return false
}
if len(n.ReservableCpuCores) != len(o.ReservableCpuCores) {
return false
}
for i := range n.ReservableCpuCores {
if n.ReservableCpuCores[i] != o.ReservableCpuCores[i] {
return false
}
}
return true
}
func (n *NodeCpuResources) SharesPerCore() int64 {
return n.CpuShares / int64(n.TotalCpuCores)
}
// NodeMemoryResources captures the memory resources of the node
type NodeMemoryResources struct {
// MemoryMB is the total available memory on the node
@ -3299,6 +3329,7 @@ func (n *NodeReservedResources) Comparable() *ComparableResources {
Flattened: AllocatedTaskResources{
Cpu: AllocatedCpuResources{
CpuShares: n.Cpu.CpuShares,
ReservedCores: n.Cpu.ReservedCpuCores,
},
Memory: AllocatedMemoryResources{
MemoryMB: n.Memory.MemoryMB,
@ -3314,6 +3345,7 @@ func (n *NodeReservedResources) Comparable() *ComparableResources {
// NodeReservedCpuResources captures the reserved CPU resources of the node.
type NodeReservedCpuResources struct {
CpuShares int64
ReservedCpuCores []uint16
}
// NodeReservedMemoryResources captures the reserved memory resources of the node.
@ -3553,6 +3585,7 @@ func (a *AllocatedTaskResources) Comparable() *ComparableResources {
Flattened: AllocatedTaskResources{
Cpu: AllocatedCpuResources{
CpuShares: a.Cpu.CpuShares,
ReservedCores: a.Cpu.ReservedCores,
},
Memory: AllocatedMemoryResources{
MemoryMB: a.Memory.MemoryMB,
@ -3637,6 +3670,7 @@ func (a *AllocatedSharedResources) Canonicalize() {
// AllocatedCpuResources captures the allocated CPU resources.
type AllocatedCpuResources struct {
CpuShares int64
ReservedCores []uint16
}
func (a *AllocatedCpuResources) Add(delta *AllocatedCpuResources) {
@ -3645,6 +3679,8 @@ func (a *AllocatedCpuResources) Add(delta *AllocatedCpuResources) {
}
a.CpuShares += delta.CpuShares
a.ReservedCores = cpuset.New(a.ReservedCores...).Union(cpuset.New(delta.ReservedCores...)).ToSlice()
}
func (a *AllocatedCpuResources) Subtract(delta *AllocatedCpuResources) {
@ -3653,6 +3689,7 @@ func (a *AllocatedCpuResources) Subtract(delta *AllocatedCpuResources) {
}
a.CpuShares -= delta.CpuShares
a.ReservedCores = cpuset.New(a.ReservedCores...).Difference(cpuset.New(delta.ReservedCores...)).ToSlice()
}
func (a *AllocatedCpuResources) Max(other *AllocatedCpuResources) {
@ -3663,6 +3700,10 @@ func (a *AllocatedCpuResources) Max(other *AllocatedCpuResources) {
if other.CpuShares > a.CpuShares {
a.CpuShares = other.CpuShares
}
if len(other.ReservedCores) > len(a.ReservedCores) {
a.ReservedCores = other.ReservedCores
}
}
// AllocatedMemoryResources captures the allocated memory resources.

View file

@ -2599,32 +2599,6 @@ func TestResource_NetIndex(t *testing.T) {
}
}
func TestResource_Superset(t *testing.T) {
r1 := &Resources{
CPU: 2000,
MemoryMB: 2048,
DiskMB: 10000,
}
r2 := &Resources{
CPU: 2000,
MemoryMB: 1024,
DiskMB: 5000,
}
if s, _ := r1.Superset(r1); !s {
t.Fatalf("bad")
}
if s, _ := r1.Superset(r2); !s {
t.Fatalf("bad")
}
if s, _ := r2.Superset(r1); s {
t.Fatalf("bad")
}
if s, _ := r2.Superset(r2); !s {
t.Fatalf("bad")
}
}
func TestResource_Add(t *testing.T) {
r1 := &Resources{
CPU: 2000,
@ -2712,6 +2686,7 @@ func TestComparableResources_Subtract(t *testing.T) {
Flattened: AllocatedTaskResources{
Cpu: AllocatedCpuResources{
CpuShares: 2000,
ReservedCores: []uint16{0, 1},
},
Memory: AllocatedMemoryResources{
MemoryMB: 2048,
@ -2733,6 +2708,7 @@ func TestComparableResources_Subtract(t *testing.T) {
Flattened: AllocatedTaskResources{
Cpu: AllocatedCpuResources{
CpuShares: 1000,
ReservedCores: []uint16{0},
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
@ -2755,6 +2731,7 @@ func TestComparableResources_Subtract(t *testing.T) {
Flattened: AllocatedTaskResources{
Cpu: AllocatedCpuResources{
CpuShares: 1000,
ReservedCores: []uint16{1},
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
@ -5511,6 +5488,8 @@ func TestNode_Copy(t *testing.T) {
NodeResources: &NodeResources{
Cpu: NodeCpuResources{
CpuShares: 4000,
TotalCpuCores: 4,
ReservableCpuCores: []uint16{0, 1, 2, 3},
},
Memory: NodeMemoryResources{
MemoryMB: 8192,
@ -5529,6 +5508,7 @@ func TestNode_Copy(t *testing.T) {
ReservedResources: &NodeReservedResources{
Cpu: NodeReservedCpuResources{
CpuShares: 100,
ReservedCpuCores: []uint16{0},
},
Memory: NodeReservedMemoryResources{
MemoryMB: 256,
@ -5782,6 +5762,7 @@ func TestNodeResources_Merge(t *testing.T) {
res := &NodeResources{
Cpu: NodeCpuResources{
CpuShares: int64(32000),
TotalCpuCores: 32,
},
Memory: NodeMemoryResources{
MemoryMB: int64(64000),
@ -5794,6 +5775,7 @@ func TestNodeResources_Merge(t *testing.T) {
}
res.Merge(&NodeResources{
Cpu: NodeCpuResources{ReservableCpuCores: []uint16{0, 1, 2, 3}},
Memory: NodeMemoryResources{
MemoryMB: int64(100000),
},
@ -5807,6 +5789,8 @@ func TestNodeResources_Merge(t *testing.T) {
require.Exactly(t, &NodeResources{
Cpu: NodeCpuResources{
CpuShares: int64(32000),
TotalCpuCores: 32,
ReservableCpuCores: []uint16{0, 1, 2, 3},
},
Memory: NodeMemoryResources{
MemoryMB: int64(100000),