diff --git a/client/lib/cgutil/cgutil_default.go b/client/lib/cgutil/cgutil_default.go index 6ad5798f0..322a970b3 100644 --- a/client/lib/cgutil/cgutil_default.go +++ b/client/lib/cgutil/cgutil_default.go @@ -2,8 +2,8 @@ package cgutil -const ( - DefaultCgroupParent = "" -) - -func InitCpusetParent(string) error { return nil } +// FindCgroupMountpointDir is used to find the cgroup mount point on a Linux +// system. Here it is a no-op implemtation +func FindCgroupMountpointDir() (string, error) { + return "", nil +} diff --git a/client/lib/cgutil/cgutil_linux.go b/client/lib/cgutil/cgutil_linux.go index 9a3332ba4..e8b867aa1 100644 --- a/client/lib/cgutil/cgutil_linux.go +++ b/client/lib/cgutil/cgutil_linux.go @@ -13,36 +13,11 @@ import ( ) const ( - DefaultCgroupParent = "/nomad" - SharedCpusetCgroupName = "shared" + DefaultCgroupParent = "/nomad" + SharedCpusetCgroupName = "shared" + ReservedCpusetCgroupName = "reserved" ) -// InitCpusetParent checks that the cgroup parent and expected child cgroups have been created. -// If the cgroup parent is set to /nomad then this will ensure that the /nomad/shared -// cgroup is initialized. The /nomad/reserved cgroup will be lazily created when a workload -// with reserved cores is created. -func InitCpusetParent(cgroupParent string) error { - if cgroupParent == "" { - cgroupParent = DefaultCgroupParent - } - var err error - if cgroupParent, err = getCgroupPathHelper("cpuset", cgroupParent); err != nil { - return err - } - - // 'ensureParent' start with parent because we don't want to - // explicitly inherit from parent, it could conflict with - // 'cpuset.cpu_exclusive'. - if err := cpusetEnsureParent(cgroupParent); err != nil { - return err - } - if err := os.Mkdir(filepath.Join(cgroupParent, SharedCpusetCgroupName), 0755); err != nil && !os.IsExist(err) { - return err - } - - return nil -} - func GetCPUsFromCgroup(group string) ([]uint16, error) { cgroupPath, err := getCgroupPathHelper("cpuset", group) if err != nil { @@ -138,3 +113,17 @@ func getCgroupPathHelper(subsystem, cgroup string) (string, error) { return filepath.Join(mnt, relCgroup), nil } + +// FindCgroupMountpointDir is used to find the cgroup mount point on a Linux +// system. +func FindCgroupMountpointDir() (string, error) { + mount, err := cgroups.GetCgroupMounts(false) + if err != nil { + return "", err + } + // It's okay if the mount point is not discovered + if len(mount) == 0 { + return "", nil + } + return mount[0].Mountpoint, nil +} diff --git a/client/lib/cgutil/cpuset_manager.go b/client/lib/cgutil/cpuset_manager.go new file mode 100644 index 000000000..9c02d7058 --- /dev/null +++ b/client/lib/cgutil/cpuset_manager.go @@ -0,0 +1,54 @@ +package cgutil + +import ( + "context" + + "github.com/hashicorp/nomad/lib/cpuset" + "github.com/hashicorp/nomad/nomad/structs" +) + +// CpusetManager is used to setup cpuset cgroups for each task. A pool of shared cpus is managed for +// tasks which don't require any reserved cores and a cgroup is managed seperetly for each task which +// require reserved cores. +type CpusetManager interface { + // Init should be called before any tasks are managed to ensure the cgroup parent exists and + // check that proper permissions are granted to manage cgroups. + Init() error + + // AddAlloc adds an allocation to the manager + AddAlloc(alloc *structs.Allocation) + + // RemoveAlloc removes an alloc by ID from the manager + RemoveAlloc(allocID string) + + // CgroupPathFor returns a callback for getting the cgroup path and any error that may have occurred during + // cgroup initialization. The callback will block if the cgroup has not been created + CgroupPathFor(allocID, taskName string) CgroupPathGetter +} + +// CgroupPathGetter is a function which returns the cgroup path and any error which ocured during cgroup initialization. +// It should block until the cgroup has been created or an error is reported +type CgroupPathGetter func(context.Context) (path string, err error) + +type TaskCgroupInfo struct { + CgroupPath string + RelativeCgroupPath string + Cpuset cpuset.CPUSet + Error error +} + +type noopCpusetManager struct{} + +func (n noopCpusetManager) Init() error { + return nil +} + +func (n noopCpusetManager) AddAlloc(alloc *structs.Allocation) { +} + +func (n noopCpusetManager) RemoveAlloc(allocID string) { +} + +func (n noopCpusetManager) CgroupPathFor(allocID, task string) CgroupPathGetter { + return func(context.Context) (string, error) { return "", nil } +} diff --git a/client/lib/cgutil/cpuset_manager_default.go b/client/lib/cgutil/cpuset_manager_default.go new file mode 100644 index 000000000..29c970d7f --- /dev/null +++ b/client/lib/cgutil/cpuset_manager_default.go @@ -0,0 +1,9 @@ +// +build !linux + +package cgutil + +import ( + "github.com/hashicorp/go-hclog" +) + +func NewCpusetManager(_ string, _ hclog.Logger) CpusetManager { return noopCpusetManager{} } diff --git a/client/lib/cgutil/cpuset_manager_linux.go b/client/lib/cgutil/cpuset_manager_linux.go new file mode 100644 index 000000000..185aa05aa --- /dev/null +++ b/client/lib/cgutil/cpuset_manager_linux.go @@ -0,0 +1,300 @@ +package cgutil + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/opencontainers/runc/libcontainer/cgroups" + + "github.com/hashicorp/go-hclog" + + "github.com/opencontainers/runc/libcontainer/cgroups/fscommon" + + "github.com/hashicorp/nomad/lib/cpuset" + cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs" + "github.com/opencontainers/runc/libcontainer/configs" + + "github.com/hashicorp/nomad/nomad/structs" +) + +func NewCpusetManager(cgroupParent string, logger hclog.Logger) CpusetManager { + if cgroupParent == "" { + cgroupParent = DefaultCgroupParent + } + return &cpusetManager{ + cgroupParent: cgroupParent, + cgroupInfo: map[string]allocTaskCgroupInfo{}, + logger: logger, + } +} + +var ( + cpusetReconcileInterval = 30 * time.Second +) + +type cpusetManager struct { + // cgroupParent relative to the cgroup root. ex. '/nomad' + cgroupParent string + // cgroupParentPath is the absolute path to the cgroup parent. + cgroupParentPath string + + parentCpuset cpuset.CPUSet + + // all exported functions are synchronized + mu sync.Mutex + + cgroupInfo map[string]allocTaskCgroupInfo + + doneCh chan struct{} + signalCh chan struct{} + logger hclog.Logger +} + +func (c *cpusetManager) AddAlloc(alloc *structs.Allocation) { + allocInfo := allocTaskCgroupInfo{} + for task, resources := range alloc.AllocatedResources.Tasks { + taskCpuset := cpuset.New(resources.Cpu.ReservedCores...) + cgroupPath := filepath.Join(c.cgroupParentPath, SharedCpusetCgroupName) + relativeCgroupPath := filepath.Join(c.cgroupParent, SharedCpusetCgroupName) + if taskCpuset.Size() > 0 { + cgroupPath, relativeCgroupPath = c.getCgroupPathsForTask(alloc.ID, task) + } + allocInfo[task] = &TaskCgroupInfo{ + CgroupPath: cgroupPath, + RelativeCgroupPath: relativeCgroupPath, + Cpuset: taskCpuset, + } + } + c.mu.Lock() + c.cgroupInfo[alloc.ID] = allocInfo + c.mu.Unlock() + go c.signalReconcile() +} + +func (c *cpusetManager) RemoveAlloc(allocID string) { + c.mu.Lock() + delete(c.cgroupInfo, allocID) + c.mu.Unlock() + go c.signalReconcile() +} + +func (c *cpusetManager) CgroupPathFor(allocID, task string) CgroupPathGetter { + return func(ctx context.Context) (string, error) { + c.mu.Lock() + allocInfo, ok := c.cgroupInfo[allocID] + if !ok { + c.mu.Unlock() + return "", fmt.Errorf("alloc not found for id %q", allocID) + } + + taskInfo, ok := allocInfo[task] + c.mu.Unlock() + if !ok { + return "", fmt.Errorf("task %q not found", task) + } + + for { + if taskInfo.Error != nil { + break + } + if _, err := os.Stat(taskInfo.CgroupPath); os.IsNotExist(err) { + select { + case <-ctx.Done(): + return taskInfo.CgroupPath, ctx.Err() + case <-time.After(100 * time.Millisecond): + continue + } + } + break + } + + return taskInfo.CgroupPath, taskInfo.Error + } + +} + +type allocTaskCgroupInfo map[string]*TaskCgroupInfo + +// Init checks that the cgroup parent and expected child cgroups have been created +// If the cgroup parent is set to /nomad then this will ensure that the /nomad/shared +// cgroup is initialized. +func (c *cpusetManager) Init() error { + c.mu.Lock() + defer c.mu.Unlock() + cgroupParentPath, err := getCgroupPathHelper("cpuset", c.cgroupParent) + if err != nil { + return err + } + c.cgroupParentPath = cgroupParentPath + + // ensures that shared cpuset exists and that the cpuset values are copied from the parent if created + if err := cpusetEnsureParent(filepath.Join(cgroupParentPath, SharedCpusetCgroupName)); err != nil { + return err + } + + parentCpus, parentMems, err := getCpusetSubsystemSettings(cgroupParentPath) + c.parentCpuset, err = cpuset.Parse(parentCpus) + if err != nil { + return fmt.Errorf("failed to parse parent cpuset.cpus setting: %v", err) + } + + // ensure the reserved cpuset exists, but only copy the mems from the parent if creating the cgroup + if err := os.Mkdir(filepath.Join(cgroupParentPath, ReservedCpusetCgroupName), 0755); err == nil { + // cgroup created, leave cpuset.cpus empty but copy cpuset.mems from parent + if err != nil { + return err + } + if err := fscommon.WriteFile(filepath.Join(cgroupParentPath, ReservedCpusetCgroupName), "cpuset.mems", string(parentMems)); err != nil { + return err + } + } else if !os.IsExist(err) { + return err + } + + c.doneCh = make(chan struct{}) + c.signalCh = make(chan struct{}) + + c.logger.Info("initialized cpuset cgroup manager", "parent", c.cgroupParent, "cpuset", c.parentCpuset.String()) + + go c.reconcileLoop() + return nil +} + +func (c *cpusetManager) reconcileLoop() { + timer := time.NewTimer(0) + if !timer.Stop() { + <-timer.C + } + defer timer.Stop() + + for { + select { + case <-c.doneCh: + c.logger.Debug("shutting down reconcile loop") + return + case <-c.signalCh: + timer.Reset(500 * time.Millisecond) + case <-timer.C: + c.reconcileCpusets() + timer.Reset(cpusetReconcileInterval) + } + } +} + +func (c *cpusetManager) reconcileCpusets() { + c.mu.Lock() + defer c.mu.Unlock() + sharedCpuset := cpuset.New(c.parentCpuset.ToSlice()...) + reservedCpuset := cpuset.New() + taskCpusets := map[string]*TaskCgroupInfo{} + for _, alloc := range c.cgroupInfo { + for _, task := range alloc { + if task.Cpuset.Size() == 0 { + continue + } + sharedCpuset = sharedCpuset.Difference(task.Cpuset) + reservedCpuset = reservedCpuset.Union(task.Cpuset) + taskCpusets[task.CgroupPath] = task + } + } + + // look for reserved cpusets which we don't know about and remove + files, err := ioutil.ReadDir(c.reservedCpusetPath()) + if err != nil { + c.logger.Error("failed to list files in reserved cgroup path during reconciliation", "path", c.reservedCpusetPath(), "error", err) + } + for _, f := range files { + if !f.IsDir() { + continue + } + path := filepath.Join(c.reservedCpusetPath(), f.Name()) + if _, ok := taskCpusets[path]; ok { + continue + } + c.logger.Debug("removing reserved cpuset cgroup", "path", path) + err := cgroups.RemovePaths(map[string]string{"cpuset": path}) + if err != nil { + c.logger.Error("removal of existing cpuset cgroup failed", "path", path, "error", err) + } + } + + if err := c.setCgroupCpusetCPUs(c.sharedCpusetPath(), sharedCpuset.String()); err != nil { + c.logger.Error("could not write shared cpuset.cpus", "path", c.sharedCpusetPath(), "cpuset.cpus", sharedCpuset.String(), "error", err) + } + if err := c.setCgroupCpusetCPUs(c.reservedCpusetPath(), reservedCpuset.String()); err != nil { + c.logger.Error("could not write reserved cpuset.cpus", "path", c.reservedCpusetPath(), "cpuset.cpus", reservedCpuset.String(), "error", err) + } + for _, info := range taskCpusets { + if err := os.Mkdir(info.CgroupPath, 0755); err != nil && !os.IsExist(err) { + c.logger.Error("failed to create new cgroup path for task", "path", info.CgroupPath, "error", err) + info.Error = err + continue + } + + // copy cpuset.mems from parent + _, parentMems, err := getCpusetSubsystemSettings(filepath.Dir(info.CgroupPath)) + if err != nil { + c.logger.Error("failed to read parent cgroup settings for task", "path", info.CgroupPath, "error", err) + info.Error = err + continue + } + if err := fscommon.WriteFile(info.CgroupPath, "cpuset.mems", parentMems); err != nil { + c.logger.Error("failed to write cgroup cpuset.mems setting for task", "path", info.CgroupPath, "mems", parentMems, "error", err) + info.Error = err + continue + } + if err := c.setCgroupCpusetCPUs(info.CgroupPath, info.Cpuset.String()); err != nil { + c.logger.Error("failed to write cgroup cpuset.cpus settings for task", "path", info.CgroupPath, "cpus", info.Cpuset.String(), "error", err) + info.Error = err + continue + } + } +} + +// setCgroupCpusetCPUs will compare an existing cpuset.cpus value with an expected value, overwriting the existing if different +// must hold a lock on cpusetManager.mu before calling +func (_ *cpusetManager) setCgroupCpusetCPUs(path, cpus string) error { + currentCpusRaw, err := fscommon.ReadFile(path, "cpuset.cpus") + if err != nil { + return err + } + + if cpus != strings.TrimSpace(currentCpusRaw) { + if err := fscommon.WriteFile(path, "cpuset.cpus", cpus); err != nil { + return err + } + } + return nil +} + +func (c *cpusetManager) signalReconcile() { + c.signalCh <- struct{}{} +} + +func (c *cpusetManager) getCpuset(group string) (cpuset.CPUSet, error) { + man := cgroupFs.NewManager(&configs.Cgroup{Path: filepath.Join(c.cgroupParent, group)}, map[string]string{"cpuset": filepath.Join(c.cgroupParentPath, group)}, false) + stats, err := man.GetStats() + if err != nil { + return cpuset.CPUSet{}, err + } + return cpuset.New(stats.CPUSetStats.CPUs...), nil +} + +func (c *cpusetManager) getCgroupPathsForTask(allocID, task string) (absolute, relative string) { + return filepath.Join(c.reservedCpusetPath(), fmt.Sprintf("%s-%s", allocID, task)), + filepath.Join(c.cgroupParent, ReservedCpusetCgroupName, fmt.Sprintf("%s-%s", allocID, task)) +} + +func (c *cpusetManager) sharedCpusetPath() string { + return filepath.Join(c.cgroupParentPath, SharedCpusetCgroupName) +} + +func (c *cpusetManager) reservedCpusetPath() string { + return filepath.Join(c.cgroupParentPath, ReservedCpusetCgroupName) +} diff --git a/client/lib/cgutil/cpuset_manager_linux_test.go b/client/lib/cgutil/cpuset_manager_linux_test.go new file mode 100644 index 000000000..c2ed95a55 --- /dev/null +++ b/client/lib/cgutil/cpuset_manager_linux_test.go @@ -0,0 +1,164 @@ +package cgutil + +import ( + "io/ioutil" + "path/filepath" + "runtime" + "syscall" + "testing" + + "github.com/hashicorp/nomad/lib/cpuset" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/opencontainers/runc/libcontainer/cgroups" + + "github.com/hashicorp/nomad/helper/uuid" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/nomad/helper/testlog" +) + +func tmpCpusetManager(t *testing.T) (manager *cpusetManager, cleanup func()) { + if runtime.GOOS != "linux" || syscall.Geteuid() != 0 { + t.Skip("Test only available running as root on linux") + } + mount, err := FindCgroupMountpointDir() + if err != nil || mount == "" { + t.Skipf("Failed to find cgroup mount: %v %v", mount, err) + } + + parent := "/gotest-" + uuid.Short() + require.NoError(t, cpusetEnsureParent(parent)) + + manager = &cpusetManager{ + cgroupParent: parent, + cgroupInfo: map[string]allocTaskCgroupInfo{}, + logger: testlog.HCLogger(t), + } + + parentPath, err := getCgroupPathHelper("cpuset", parent) + require.NoError(t, err) + + return manager, func() { require.NoError(t, cgroups.RemovePaths(map[string]string{"cpuset": parentPath})) } +} + +func TestCpusetManager_Init(t *testing.T) { + manager, cleanup := tmpCpusetManager(t) + defer cleanup() + require.NoError(t, manager.Init()) + + require.DirExists(t, filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName)) + require.FileExists(t, filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName, "cpuset.cpus")) + sharedCpusRaw, err := ioutil.ReadFile(filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName, "cpuset.cpus")) + require.NoError(t, err) + sharedCpus, err := cpuset.Parse(string(sharedCpusRaw)) + require.NoError(t, err) + require.Exactly(t, manager.parentCpuset.ToSlice(), sharedCpus.ToSlice()) + require.DirExists(t, filepath.Join(manager.cgroupParentPath, ReservedCpusetCgroupName)) +} + +func TestCpusetManager_AddAlloc(t *testing.T) { + manager, cleanup := tmpCpusetManager(t) + defer cleanup() + require.NoError(t, manager.Init()) + + alloc := mock.Alloc() + alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores = manager.parentCpuset.ToSlice() + manager.AddAlloc(alloc) + // force reconcile + manager.reconcileCpusets() + + // check that no more cores exist in the shared cgroup + require.DirExists(t, filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName)) + require.FileExists(t, filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName, "cpuset.cpus")) + sharedCpusRaw, err := ioutil.ReadFile(filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName, "cpuset.cpus")) + require.NoError(t, err) + sharedCpus, err := cpuset.Parse(string(sharedCpusRaw)) + require.NoError(t, err) + require.Empty(t, sharedCpus.ToSlice()) + + // check that all cores are allocated to reserved cgroup + require.DirExists(t, filepath.Join(manager.cgroupParentPath, ReservedCpusetCgroupName)) + reservedCpusRaw, err := ioutil.ReadFile(filepath.Join(manager.cgroupParentPath, ReservedCpusetCgroupName, "cpuset.cpus")) + require.NoError(t, err) + reservedCpus, err := cpuset.Parse(string(reservedCpusRaw)) + require.NoError(t, err) + require.Exactly(t, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores, reservedCpus.ToSlice()) + + // check that task cgroup exists and cpuset matches expected reserved cores + allocInfo, ok := manager.cgroupInfo[alloc.ID] + require.True(t, ok) + require.Len(t, allocInfo, 1) + taskInfo, ok := allocInfo["web"] + require.True(t, ok) + + require.DirExists(t, taskInfo.CgroupPath) + taskCpusRaw, err := ioutil.ReadFile(filepath.Join(taskInfo.CgroupPath, "cpuset.cpus")) + require.NoError(t, err) + taskCpus, err := cpuset.Parse(string(taskCpusRaw)) + require.NoError(t, err) + require.Exactly(t, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores, taskCpus.ToSlice()) +} + +func TestCpusetManager_RemoveAlloc(t *testing.T) { + manager, cleanup := tmpCpusetManager(t) + defer cleanup() + require.NoError(t, manager.Init()) + + // this case tests adding 2 allocs, reconciling then removing 1 alloc + // it requires the system to have atleast 2 cpu cores (one for each alloc) + if manager.parentCpuset.Size() < 2 { + t.Skip("test requires atleast 2 cpu cores") + } + + alloc1 := mock.Alloc() + alloc1Cpuset := cpuset.New(manager.parentCpuset.ToSlice()[0]) + alloc1.AllocatedResources.Tasks["web"].Cpu.ReservedCores = alloc1Cpuset.ToSlice() + manager.AddAlloc(alloc1) + + alloc2 := mock.Alloc() + alloc2Cpuset := cpuset.New(manager.parentCpuset.ToSlice()[1]) + alloc2.AllocatedResources.Tasks["web"].Cpu.ReservedCores = alloc2Cpuset.ToSlice() + manager.AddAlloc(alloc2) + + //force reconcile + manager.reconcileCpusets() + + // shared cpuset should not include any expected cores + sharedCpusRaw, err := ioutil.ReadFile(filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName, "cpuset.cpus")) + require.NoError(t, err) + sharedCpus, err := cpuset.Parse(string(sharedCpusRaw)) + require.NoError(t, err) + require.False(t, sharedCpus.ContainsAny(alloc1Cpuset.Union(alloc2Cpuset))) + + // reserved cpuset should equal the expected cpus + reservedCpusRaw, err := ioutil.ReadFile(filepath.Join(manager.cgroupParentPath, ReservedCpusetCgroupName, "cpuset.cpus")) + require.NoError(t, err) + reservedCpus, err := cpuset.Parse(string(reservedCpusRaw)) + require.NoError(t, err) + require.True(t, reservedCpus.Equals(alloc1Cpuset.Union(alloc2Cpuset))) + + // remove first allocation + alloc1TaskPath := manager.cgroupInfo[alloc1.ID]["web"].CgroupPath + manager.RemoveAlloc(alloc1.ID) + manager.reconcileCpusets() + + // alloc1's task reserved cgroup should be removed + require.NoDirExists(t, alloc1TaskPath) + + // shared cpuset should now include alloc1's cores + sharedCpusRaw, err = ioutil.ReadFile(filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName, "cpuset.cpus")) + require.NoError(t, err) + sharedCpus, err = cpuset.Parse(string(sharedCpusRaw)) + require.NoError(t, err) + require.False(t, sharedCpus.ContainsAny(alloc2Cpuset)) + require.True(t, sharedCpus.IsSupersetOf(alloc1Cpuset)) + + // reserved cpuset should only include alloc2's cores + reservedCpusRaw, err = ioutil.ReadFile(filepath.Join(manager.cgroupParentPath, ReservedCpusetCgroupName, "cpuset.cpus")) + require.NoError(t, err) + reservedCpus, err = cpuset.Parse(string(reservedCpusRaw)) + require.NoError(t, err) + require.True(t, reservedCpus.Equals(alloc2Cpuset)) + +}