cgutil: implement cpuset management as separate package

This commit is contained in:
Nick Ethier 2021-04-08 01:00:45 -04:00
parent 0a21de91dd
commit 411d992788
6 changed files with 549 additions and 33 deletions

View File

@ -2,8 +2,8 @@
package cgutil
// DefaultCgroupParent is the default cgroup parent name. This variant of the
// package leaves it empty.
const DefaultCgroupParent = ""
// InitCpusetParent is a no-op in this variant of the package: the argument is
// ignored and a nil error is always returned.
func InitCpusetParent(_ string) error {
	return nil
}
// FindCgroupMountpointDir is used to find the cgroup mount point on a Linux
// system. Here it is a no-op implementation that always reports an empty
// mount point and a nil error.
func FindCgroupMountpointDir() (string, error) {
	var mountpoint string
	return mountpoint, nil
}

View File

@ -13,36 +13,11 @@ import (
)
const (
	// DefaultCgroupParent is the cgroup parent used when the caller does not
	// supply one.
	DefaultCgroupParent = "/nomad"

	// SharedCpusetCgroupName is the name of the child cgroup, under the
	// cgroup parent, used for the shared cpuset.
	SharedCpusetCgroupName = "shared"

	// ReservedCpusetCgroupName is the name of the child cgroup, under the
	// cgroup parent, used for the reserved cpuset.
	ReservedCpusetCgroupName = "reserved"
)
// InitCpusetParent makes sure the cpuset cgroup parent and its shared child
// cgroup exist. An empty cgroupParent falls back to DefaultCgroupParent. The
// parent is resolved to a path through getCgroupPathHelper, prepared with
// cpusetEnsureParent, and the SharedCpusetCgroupName directory is then created
// below it; an already existing shared directory is not an error.
func InitCpusetParent(cgroupParent string) error {
	parent := cgroupParent
	if parent == "" {
		parent = DefaultCgroupParent
	}

	parentPath, err := getCgroupPathHelper("cpuset", parent)
	if err != nil {
		return err
	}

	// 'ensureParent' starts with the parent because we don't want to
	// explicitly inherit from the parent, it could conflict with
	// 'cpuset.cpu_exclusive'.
	if err = cpusetEnsureParent(parentPath); err != nil {
		return err
	}

	sharedPath := filepath.Join(parentPath, SharedCpusetCgroupName)
	err = os.Mkdir(sharedPath, 0755)
	if err == nil || os.IsExist(err) {
		return nil
	}
	return err
}
func GetCPUsFromCgroup(group string) ([]uint16, error) {
cgroupPath, err := getCgroupPathHelper("cpuset", group)
if err != nil {
@ -138,3 +113,17 @@ func getCgroupPathHelper(subsystem, cgroup string) (string, error) {
return filepath.Join(mnt, relCgroup), nil
}
// FindCgroupMountpointDir is used to find the cgroup mount point on a Linux
// system. It returns the Mountpoint of the first entry reported by
// cgroups.GetCgroupMounts, or an empty string with a nil error when no mounts
// are reported.
func FindCgroupMountpointDir() (string, error) {
	mounts, err := cgroups.GetCgroupMounts(false)
	switch {
	case err != nil:
		return "", err
	case len(mounts) == 0:
		// It's okay if the mount point is not discovered
		return "", nil
	default:
		return mounts[0].Mountpoint, nil
	}
}

View File

@ -0,0 +1,54 @@
package cgutil
import (
"context"
"github.com/hashicorp/nomad/lib/cpuset"
"github.com/hashicorp/nomad/nomad/structs"
)
// CpusetManager is used to set up cpuset cgroups for each task. A pool of shared cpus is managed for
// tasks which don't require any reserved cores, and a cgroup is managed separately for each task which
// requires reserved cores.
type CpusetManager interface {
// Init should be called before any tasks are managed to ensure the cgroup parent exists and
// check that proper permissions are granted to manage cgroups.
Init() error
// AddAlloc adds an allocation to the manager.
AddAlloc(alloc *structs.Allocation)
// RemoveAlloc removes an alloc by ID from the manager.
RemoveAlloc(allocID string)
// CgroupPathFor returns a callback for getting the cgroup path and any error that may have occurred during
// cgroup initialization. The callback will block if the cgroup has not been created.
CgroupPathFor(allocID, taskName string) CgroupPathGetter
}
// CgroupPathGetter is a function which returns the cgroup path and any error which occurred during cgroup initialization.
// It should block until the cgroup has been created or an error is reported.
type CgroupPathGetter func(context.Context) (path string, err error)
// TaskCgroupInfo describes the cpuset cgroup assigned to a single task.
type TaskCgroupInfo struct {
// CgroupPath is the path of the task's cgroup built from the manager's
// absolute cgroup parent path.
CgroupPath string
// RelativeCgroupPath is the same cgroup expressed relative to the cgroup
// root (built from the manager's cgroupParent).
RelativeCgroupPath string
// Cpuset holds the task's reserved cores; it is empty for tasks that use
// the shared cgroup.
Cpuset cpuset.CPUSet
// Error records a failure encountered while the manager created or
// configured the task's cgroup.
Error error
}
// noopCpusetManager is a CpusetManager whose methods do nothing.
type noopCpusetManager struct{}

// Init always succeeds.
func (noopCpusetManager) Init() error { return nil }

// AddAlloc ignores the allocation.
func (noopCpusetManager) AddAlloc(*structs.Allocation) {}

// RemoveAlloc ignores the allocation ID.
func (noopCpusetManager) RemoveAlloc(string) {}

// CgroupPathFor returns a getter that immediately yields an empty path and a
// nil error, regardless of the context it is given.
func (noopCpusetManager) CgroupPathFor(_, _ string) CgroupPathGetter {
	var getter CgroupPathGetter = func(context.Context) (string, error) {
		return "", nil
	}
	return getter
}

View File

@ -0,0 +1,9 @@
// +build !linux
package cgutil
import (
"github.com/hashicorp/go-hclog"
)
// NewCpusetManager returns a CpusetManager that performs no work; this is the
// non-Linux build of the constructor, so both arguments are ignored.
func NewCpusetManager(_ string, _ hclog.Logger) CpusetManager {
	return noopCpusetManager{}
}

View File

@ -0,0 +1,300 @@
package cgutil
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/hashicorp/go-hclog"
"github.com/opencontainers/runc/libcontainer/cgroups/fscommon"
"github.com/hashicorp/nomad/lib/cpuset"
cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
"github.com/opencontainers/runc/libcontainer/configs"
"github.com/hashicorp/nomad/nomad/structs"
)
// NewCpusetManager builds a cpuset manager rooted at cgroupParent (relative
// to the cgroup root). An empty cgroupParent selects DefaultCgroupParent.
// Init must be called on the returned manager before it is used.
func NewCpusetManager(cgroupParent string, logger hclog.Logger) CpusetManager {
	manager := &cpusetManager{
		cgroupParent: DefaultCgroupParent,
		cgroupInfo:   make(map[string]allocTaskCgroupInfo),
		logger:       logger,
	}
	if cgroupParent != "" {
		manager.cgroupParent = cgroupParent
	}
	return manager
}
var (
cpusetReconcileInterval = 30 * time.Second
)
// cpusetManager is the CpusetManager implementation returned by
// NewCpusetManager in this file. It tracks per-allocation task cgroup
// information and applies it to the cpuset cgroup hierarchy from a background
// reconcile loop started by Init.
type cpusetManager struct {
// cgroupParent relative to the cgroup root. ex. '/nomad'
cgroupParent string
// cgroupParentPath is the absolute path to the cgroup parent.
cgroupParentPath string
// parentCpuset is the cpuset parsed from the parent cgroup's cpuset.cpus
// setting during Init.
parentCpuset cpuset.CPUSet
// all exported functions are synchronized
mu sync.Mutex
// cgroupInfo maps an allocation ID to the cgroup info of its tasks.
cgroupInfo map[string]allocTaskCgroupInfo
// doneCh stops the reconcile loop when it becomes readable; created by Init.
doneCh chan struct{}
// signalCh asks the reconcile loop to schedule a reconcile pass; created by Init.
signalCh chan struct{}
logger hclog.Logger
}
// AddAlloc registers the tasks of alloc with the manager and asynchronously
// signals the reconcile loop.
//
// Tasks without reserved cores are pointed at the shared cgroup; tasks with
// reserved cores get their own cgroup path under the reserved cgroup. The
// cgroups themselves are created later by the reconcile pass.
//
// A nil alloc, or an alloc without AllocatedResources, is ignored rather than
// dereferenced.
func (c *cpusetManager) AddAlloc(alloc *structs.Allocation) {
	if alloc == nil || alloc.AllocatedResources == nil {
		return
	}

	allocInfo := allocTaskCgroupInfo{}
	for task, resources := range alloc.AllocatedResources.Tasks {
		taskCpuset := cpuset.New(resources.Cpu.ReservedCores...)

		// Default to the shared cgroup; only tasks with reserved cores get a
		// dedicated cgroup.
		cgroupPath := filepath.Join(c.cgroupParentPath, SharedCpusetCgroupName)
		relativeCgroupPath := filepath.Join(c.cgroupParent, SharedCpusetCgroupName)
		if taskCpuset.Size() > 0 {
			cgroupPath, relativeCgroupPath = c.getCgroupPathsForTask(alloc.ID, task)
		}

		allocInfo[task] = &TaskCgroupInfo{
			CgroupPath:         cgroupPath,
			RelativeCgroupPath: relativeCgroupPath,
			Cpuset:             taskCpuset,
		}
	}

	c.mu.Lock()
	c.cgroupInfo[alloc.ID] = allocInfo
	c.mu.Unlock()

	// Signal from a separate goroutine so the caller never blocks on the
	// reconcile loop.
	go c.signalReconcile()
}
// RemoveAlloc drops the allocation with the given ID from the manager's
// bookkeeping and asynchronously signals the reconcile loop.
func (c *cpusetManager) RemoveAlloc(allocID string) {
	func() {
		c.mu.Lock()
		defer c.mu.Unlock()
		delete(c.cgroupInfo, allocID)
	}()

	go c.signalReconcile()
}
// CgroupPathFor returns a getter for the cgroup path of the given task.
//
// The getter looks up the task when it is invoked and fails immediately if
// the allocation or task is unknown to the manager. Otherwise it polls every
// 100ms until the task's cgroup directory exists, an error has been recorded
// for the task, or ctx is done. On context cancellation it returns the
// expected cgroup path together with ctx.Err().
func (c *cpusetManager) CgroupPathFor(allocID, task string) CgroupPathGetter {
	return func(ctx context.Context) (string, error) {
		c.mu.Lock()
		allocInfo, ok := c.cgroupInfo[allocID]
		if !ok {
			c.mu.Unlock()
			return "", fmt.Errorf("alloc not found for id %q", allocID)
		}
		taskInfo, ok := allocInfo[task]
		c.mu.Unlock()
		if !ok {
			return "", fmt.Errorf("task %q not found", task)
		}

		// taskInfo.Error is written by reconcileCpusets while it holds c.mu,
		// so it must be read under the same lock to avoid a data race.
		taskErr := func() error {
			c.mu.Lock()
			defer c.mu.Unlock()
			return taskInfo.Error
		}

		// One ticker for the whole wait instead of a new timer per poll.
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()

		for {
			if err := taskErr(); err != nil {
				return taskInfo.CgroupPath, err
			}

			// Any Stat outcome other than "does not exist" ends the wait,
			// including Stat errors of another kind.
			if _, err := os.Stat(taskInfo.CgroupPath); !os.IsNotExist(err) {
				return taskInfo.CgroupPath, taskErr()
			}

			select {
			case <-ctx.Done():
				return taskInfo.CgroupPath, ctx.Err()
			case <-ticker.C:
			}
		}
	}
}
// allocTaskCgroupInfo maps a task name to the cgroup info of that task within
// a single allocation.
type allocTaskCgroupInfo map[string]*TaskCgroupInfo
// Init checks that the cgroup parent and expected child cgroups have been created.
// If the cgroup parent is set to /nomad then this will ensure that the /nomad/shared
// and /nomad/reserved cgroups are initialized.
//
// It resolves and stores the absolute cgroup parent path, records the
// parent's cpuset, creates the channels used by the reconcile loop and starts
// that loop in a new goroutine. It returns an error if the parent path cannot
// be resolved, the parent's cpuset settings cannot be read or parsed, or a
// child cgroup cannot be created or configured.
func (c *cpusetManager) Init() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	cgroupParentPath, err := getCgroupPathHelper("cpuset", c.cgroupParent)
	if err != nil {
		return err
	}
	c.cgroupParentPath = cgroupParentPath

	// ensures that shared cpuset exists and that the cpuset values are copied from the parent if created
	if err := cpusetEnsureParent(filepath.Join(cgroupParentPath, SharedCpusetCgroupName)); err != nil {
		return err
	}

	parentCpus, parentMems, err := getCpusetSubsystemSettings(cgroupParentPath)
	if err != nil {
		// Do not go on to parse settings that could not be read.
		return fmt.Errorf("failed to detect parent cpuset settings: %v", err)
	}
	c.parentCpuset, err = cpuset.Parse(parentCpus)
	if err != nil {
		return fmt.Errorf("failed to parse parent cpuset.cpus setting: %v", err)
	}

	// ensure the reserved cpuset exists, but only copy the mems from the parent if creating the cgroup
	reservedPath := filepath.Join(cgroupParentPath, ReservedCpusetCgroupName)
	switch err := os.Mkdir(reservedPath, 0755); {
	case err == nil:
		// cgroup created, leave cpuset.cpus empty but copy cpuset.mems from parent
		if err := fscommon.WriteFile(reservedPath, "cpuset.mems", string(parentMems)); err != nil {
			return err
		}
	case !os.IsExist(err):
		return err
	}

	c.doneCh = make(chan struct{})
	c.signalCh = make(chan struct{})

	c.logger.Info("initialized cpuset cgroup manager", "parent", c.cgroupParent, "cpuset", c.parentCpuset.String())

	go c.reconcileLoop()
	return nil
}
// reconcileLoop runs reconcile passes until doneCh becomes readable. A value
// on signalCh schedules a pass 500ms later; after each pass the next one is
// scheduled cpusetReconcileInterval later.
func (c *cpusetManager) reconcileLoop() {
	// Begin with a stopped, drained timer so nothing fires until the first
	// signal arrives.
	reconcileTimer := time.NewTimer(0)
	defer reconcileTimer.Stop()
	if stopped := reconcileTimer.Stop(); !stopped {
		<-reconcileTimer.C
	}

	for {
		select {
		case <-reconcileTimer.C:
			c.reconcileCpusets()
			reconcileTimer.Reset(cpusetReconcileInterval)
		case <-c.signalCh:
			reconcileTimer.Reset(500 * time.Millisecond)
		case <-c.doneCh:
			c.logger.Debug("shutting down reconcile loop")
			return
		}
	}
}
// reconcileCpusets brings the cpuset cgroups in line with the allocations
// currently tracked in c.cgroupInfo. It holds c.mu for the whole pass.
//
// The pass computes the shared cpuset (the parent cpuset minus every reserved
// core) and the reserved cpuset (the union of every reserved core), removes
// directories under the reserved cgroup that do not belong to a tracked task,
// writes cpuset.cpus for the shared and reserved cgroups, and finally creates
// and configures the cgroup of each task with reserved cores.
//
// Failures are logged rather than returned; failures for an individual task
// are also recorded in that task's TaskCgroupInfo.Error.
func (c *cpusetManager) reconcileCpusets() {
c.mu.Lock()
defer c.mu.Unlock()
// start from a copy of the parent cpuset and subtract reserved cores below
sharedCpuset := cpuset.New(c.parentCpuset.ToSlice()...)
reservedCpuset := cpuset.New()
// tasks with reserved cores, keyed by their cgroup path
taskCpusets := map[string]*TaskCgroupInfo{}
for _, alloc := range c.cgroupInfo {
for _, task := range alloc {
// tasks without reserved cores use the shared cgroup and need no cgroup of their own
if task.Cpuset.Size() == 0 {
continue
}
sharedCpuset = sharedCpuset.Difference(task.Cpuset)
reservedCpuset = reservedCpuset.Union(task.Cpuset)
taskCpusets[task.CgroupPath] = task
}
}
// look for reserved cpusets which we don't know about and remove
files, err := ioutil.ReadDir(c.reservedCpusetPath())
if err != nil {
// only logged: files is then not iterated and the pass continues
c.logger.Error("failed to list files in reserved cgroup path during reconciliation", "path", c.reservedCpusetPath(), "error", err)
}
for _, f := range files {
if !f.IsDir() {
continue
}
path := filepath.Join(c.reservedCpusetPath(), f.Name())
// keep directories that belong to a tracked task
if _, ok := taskCpusets[path]; ok {
continue
}
c.logger.Debug("removing reserved cpuset cgroup", "path", path)
err := cgroups.RemovePaths(map[string]string{"cpuset": path})
if err != nil {
c.logger.Error("removal of existing cpuset cgroup failed", "path", path, "error", err)
}
}
// write the shared cpuset first, then the reserved cpuset
if err := c.setCgroupCpusetCPUs(c.sharedCpusetPath(), sharedCpuset.String()); err != nil {
c.logger.Error("could not write shared cpuset.cpus", "path", c.sharedCpusetPath(), "cpuset.cpus", sharedCpuset.String(), "error", err)
}
if err := c.setCgroupCpusetCPUs(c.reservedCpusetPath(), reservedCpuset.String()); err != nil {
c.logger.Error("could not write reserved cpuset.cpus", "path", c.reservedCpusetPath(), "cpuset.cpus", reservedCpuset.String(), "error", err)
}
// create and configure the cgroup of every task with reserved cores
for _, info := range taskCpusets {
// an already existing cgroup directory is fine
if err := os.Mkdir(info.CgroupPath, 0755); err != nil && !os.IsExist(err) {
c.logger.Error("failed to create new cgroup path for task", "path", info.CgroupPath, "error", err)
info.Error = err
continue
}
// copy cpuset.mems from parent
_, parentMems, err := getCpusetSubsystemSettings(filepath.Dir(info.CgroupPath))
if err != nil {
c.logger.Error("failed to read parent cgroup settings for task", "path", info.CgroupPath, "error", err)
info.Error = err
continue
}
if err := fscommon.WriteFile(info.CgroupPath, "cpuset.mems", parentMems); err != nil {
c.logger.Error("failed to write cgroup cpuset.mems setting for task", "path", info.CgroupPath, "mems", parentMems, "error", err)
info.Error = err
continue
}
if err := c.setCgroupCpusetCPUs(info.CgroupPath, info.Cpuset.String()); err != nil {
c.logger.Error("failed to write cgroup cpuset.cpus settings for task", "path", info.CgroupPath, "cpus", info.Cpuset.String(), "error", err)
info.Error = err
continue
}
}
}
// setCgroupCpusetCPUs reads the cpuset.cpus value of the cgroup at path and
// rewrites it only when it differs (ignoring surrounding whitespace) from cpus.
// Callers must hold cpusetManager.mu.
func (*cpusetManager) setCgroupCpusetCPUs(path, cpus string) error {
	current, err := fscommon.ReadFile(path, "cpuset.cpus")
	if err != nil {
		return err
	}

	// Nothing to do when the cgroup already has the expected value.
	if strings.TrimSpace(current) == cpus {
		return nil
	}

	if writeErr := fscommon.WriteFile(path, "cpuset.cpus", cpus); writeErr != nil {
		return writeErr
	}
	return nil
}
// signalReconcile asks the reconcile loop to schedule a reconcile pass. It
// blocks until the loop accepts the signal or doneCh becomes readable, so a
// caller (typically a short-lived goroutine) is not left blocked forever on
// signalCh once the loop has been told to stop. Both channels are created by
// Init.
func (c *cpusetManager) signalReconcile() {
	select {
	case c.signalCh <- struct{}{}:
	case <-c.doneCh:
	}
}
// getCpuset returns the CPUs reported by the cpuset stats of the named child
// cgroup of the manager's cgroup parent.
func (c *cpusetManager) getCpuset(group string) (cpuset.CPUSet, error) {
	cgroupConfig := &configs.Cgroup{Path: filepath.Join(c.cgroupParent, group)}
	subsystemPaths := map[string]string{
		"cpuset": filepath.Join(c.cgroupParentPath, group),
	}

	stats, err := cgroupFs.NewManager(cgroupConfig, subsystemPaths, false).GetStats()
	if err != nil {
		return cpuset.CPUSet{}, err
	}
	return cpuset.New(stats.CPUSetStats.CPUs...), nil
}
// getCgroupPathsForTask returns the cgroup paths for a task with reserved
// cores. The cgroup is named "<allocID>-<task>" and lives under the reserved
// cgroup; absolute is built from the manager's reserved cpuset path and
// relative from the manager's cgroupParent.
func (c *cpusetManager) getCgroupPathsForTask(allocID, task string) (absolute, relative string) {
	cgroupName := fmt.Sprintf("%s-%s", allocID, task)
	absolute = filepath.Join(c.reservedCpusetPath(), cgroupName)
	relative = filepath.Join(c.cgroupParent, ReservedCpusetCgroupName, cgroupName)
	return absolute, relative
}
// sharedCpusetPath returns the path of the shared cgroup under the manager's
// absolute cgroup parent path.
func (c *cpusetManager) sharedCpusetPath() string {
	parentPath := c.cgroupParentPath
	return filepath.Join(parentPath, SharedCpusetCgroupName)
}
// reservedCpusetPath returns the path of the reserved cgroup under the
// manager's absolute cgroup parent path.
func (c *cpusetManager) reservedCpusetPath() string {
	parentPath := c.cgroupParentPath
	return filepath.Join(parentPath, ReservedCpusetCgroupName)
}

View File

@ -0,0 +1,164 @@
package cgutil
import (
"io/ioutil"
"path/filepath"
"runtime"
"syscall"
"testing"
"github.com/hashicorp/nomad/lib/cpuset"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/helper/testlog"
)
// tmpCpusetManager builds a cpusetManager rooted at a uniquely named
// "/gotest-*" cgroup parent and returns it with a cleanup function that
// removes the parent's cpuset cgroup path. The test is skipped unless it runs
// as root on Linux and a cgroup mount point can be found.
func tmpCpusetManager(t *testing.T) (manager *cpusetManager, cleanup func()) {
	if !(runtime.GOOS == "linux" && syscall.Geteuid() == 0) {
		t.Skip("Test only available running as root on linux")
	}

	mount, err := FindCgroupMountpointDir()
	if err != nil || mount == "" {
		t.Skipf("Failed to find cgroup mount: %v %v", mount, err)
	}

	parent := "/gotest-" + uuid.Short()
	require.NoError(t, cpusetEnsureParent(parent))

	manager = &cpusetManager{
		cgroupParent: parent,
		cgroupInfo:   map[string]allocTaskCgroupInfo{},
		logger:       testlog.HCLogger(t),
	}

	parentPath, err := getCgroupPathHelper("cpuset", parent)
	require.NoError(t, err)

	cleanup = func() {
		paths := map[string]string{"cpuset": parentPath}
		require.NoError(t, cgroups.RemovePaths(paths))
	}
	return manager, cleanup
}
// TestCpusetManager_Init verifies that Init creates the shared and reserved
// cgroups and that the shared cgroup's cpuset.cpus matches the parent cpuset.
func TestCpusetManager_Init(t *testing.T) {
	manager, cleanup := tmpCpusetManager(t)
	defer cleanup()
	require.NoError(t, manager.Init())

	sharedDir := filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName)
	sharedCpusFile := filepath.Join(sharedDir, "cpuset.cpus")

	require.DirExists(t, sharedDir)
	require.FileExists(t, sharedCpusFile)

	rawShared, err := ioutil.ReadFile(sharedCpusFile)
	require.NoError(t, err)
	parsedShared, err := cpuset.Parse(string(rawShared))
	require.NoError(t, err)
	require.Exactly(t, manager.parentCpuset.ToSlice(), parsedShared.ToSlice())

	reservedDir := filepath.Join(manager.cgroupParentPath, ReservedCpusetCgroupName)
	require.DirExists(t, reservedDir)
}
// TestCpusetManager_AddAlloc reserves every parent core for a single task and
// verifies, after a forced reconcile, that the shared cgroup is left without
// cores while the reserved cgroup and the task cgroup hold all of them.
func TestCpusetManager_AddAlloc(t *testing.T) {
	manager, cleanup := tmpCpusetManager(t)
	defer cleanup()
	require.NoError(t, manager.Init())

	alloc := mock.Alloc()
	alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores = manager.parentCpuset.ToSlice()
	manager.AddAlloc(alloc)

	// force reconcile
	manager.reconcileCpusets()

	sharedDir := filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName)
	sharedCpusFile := filepath.Join(sharedDir, "cpuset.cpus")
	reservedDir := filepath.Join(manager.cgroupParentPath, ReservedCpusetCgroupName)
	reservedCpusFile := filepath.Join(reservedDir, "cpuset.cpus")

	// check that no more cores exist in the shared cgroup
	require.DirExists(t, sharedDir)
	require.FileExists(t, sharedCpusFile)
	rawShared, err := ioutil.ReadFile(sharedCpusFile)
	require.NoError(t, err)
	parsedShared, err := cpuset.Parse(string(rawShared))
	require.NoError(t, err)
	require.Empty(t, parsedShared.ToSlice())

	// check that all cores are allocated to reserved cgroup
	require.DirExists(t, reservedDir)
	rawReserved, err := ioutil.ReadFile(reservedCpusFile)
	require.NoError(t, err)
	parsedReserved, err := cpuset.Parse(string(rawReserved))
	require.NoError(t, err)
	require.Exactly(t, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores, parsedReserved.ToSlice())

	// check that task cgroup exists and cpuset matches expected reserved cores
	allocInfo, found := manager.cgroupInfo[alloc.ID]
	require.True(t, found)
	require.Len(t, allocInfo, 1)
	taskInfo, found := allocInfo["web"]
	require.True(t, found)

	require.DirExists(t, taskInfo.CgroupPath)
	rawTask, err := ioutil.ReadFile(filepath.Join(taskInfo.CgroupPath, "cpuset.cpus"))
	require.NoError(t, err)
	parsedTask, err := cpuset.Parse(string(rawTask))
	require.NoError(t, err)
	require.Exactly(t, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores, parsedTask.ToSlice())
}
// TestCpusetManager_RemoveAlloc adds two allocations that each reserve one
// core, reconciles, removes the first allocation and reconciles again. It
// verifies that the removed allocation's cgroup is deleted, its core returns
// to the shared cpuset, and the reserved cpuset keeps only the second
// allocation's core.
func TestCpusetManager_RemoveAlloc(t *testing.T) {
	manager, cleanup := tmpCpusetManager(t)
	defer cleanup()
	require.NoError(t, manager.Init())

	// this case tests adding 2 allocs, reconciling then removing 1 alloc
	// it requires the system to have at least 2 cpu cores (one for each alloc)
	if manager.parentCpuset.Size() < 2 {
		t.Skip("test requires atleast 2 cpu cores")
	}

	// readCpus parses the cpuset.cpus file of the named child cgroup.
	readCpus := func(group string) cpuset.CPUSet {
		raw, err := ioutil.ReadFile(filepath.Join(manager.cgroupParentPath, group, "cpuset.cpus"))
		require.NoError(t, err)
		parsed, err := cpuset.Parse(string(raw))
		require.NoError(t, err)
		return parsed
	}

	alloc1 := mock.Alloc()
	alloc1Cpuset := cpuset.New(manager.parentCpuset.ToSlice()[0])
	alloc1.AllocatedResources.Tasks["web"].Cpu.ReservedCores = alloc1Cpuset.ToSlice()
	manager.AddAlloc(alloc1)

	alloc2 := mock.Alloc()
	alloc2Cpuset := cpuset.New(manager.parentCpuset.ToSlice()[1])
	alloc2.AllocatedResources.Tasks["web"].Cpu.ReservedCores = alloc2Cpuset.ToSlice()
	manager.AddAlloc(alloc2)

	// force reconcile
	manager.reconcileCpusets()

	// shared cpuset should not include any expected cores
	sharedCpus := readCpus(SharedCpusetCgroupName)
	require.False(t, sharedCpus.ContainsAny(alloc1Cpuset.Union(alloc2Cpuset)))

	// reserved cpuset should equal the expected cpus
	reservedCpus := readCpus(ReservedCpusetCgroupName)
	require.True(t, reservedCpus.Equals(alloc1Cpuset.Union(alloc2Cpuset)))

	// remove first allocation
	alloc1TaskPath := manager.cgroupInfo[alloc1.ID]["web"].CgroupPath
	manager.RemoveAlloc(alloc1.ID)
	manager.reconcileCpusets()

	// alloc1's task reserved cgroup should be removed
	require.NoDirExists(t, alloc1TaskPath)

	// shared cpuset should now include alloc1's cores
	sharedCpus = readCpus(SharedCpusetCgroupName)
	require.False(t, sharedCpus.ContainsAny(alloc2Cpuset))
	require.True(t, sharedCpus.IsSupersetOf(alloc1Cpuset))

	// reserved cpuset should only include alloc2's cores
	reservedCpus = readCpus(ReservedCpusetCgroupName)
	require.True(t, reservedCpus.Equals(alloc2Cpuset))
}