Merge pull request #10328 from hashicorp/f-cpu-cores-3

Reserved Cores [3/4]: Client cpuset cgroup managment
This commit is contained in:
Nick Ethier 2021-04-16 14:11:45 -04:00 committed by GitHub
commit f6d7285157
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 782 additions and 100 deletions

View file

@ -7,6 +7,8 @@ import (
"sync"
"time"
"github.com/hashicorp/nomad/client/lib/cgutil"
log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
@ -152,6 +154,9 @@ type allocRunner struct {
// runner to manage their mounting
csiManager csimanager.Manager
// cpusetManager is responsible for configuring task cgroups if supported by the platform
cpusetManager cgutil.CpusetManager
// devicemanager is used to mount devices as well as lookup device
// statistics
devicemanager devicemanager.Manager
@ -208,6 +213,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
prevAllocMigrator: config.PrevAllocMigrator,
dynamicRegistry: config.DynamicRegistry,
csiManager: config.CSIManager,
cpusetManager: config.CpusetManager,
devicemanager: config.DeviceManager,
driverManager: config.DriverManager,
serversContactedCh: config.ServersContactedCh,
@ -262,6 +268,10 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task),
}
if ar.cpusetManager != nil {
trConfig.CpusetCgroupPathGetter = ar.cpusetManager.CgroupPathFor(ar.id, task.Name)
}
// Create, but do not Run, the task runner
tr, err := taskrunner.NewTaskRunner(trConfig)
if err != nil {

View file

@ -138,6 +138,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
alloc := ar.Alloc()
ar.runnerHooks = []interfaces.RunnerHook{
newAllocDirHook(hookLogger, ar.allocDir),
newCgroupHook(ar.Alloc(), ar.cpusetManager),
newUpstreamAllocsHook(hookLogger, ar.prevAllocWatcher),
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulClient),

View file

@ -0,0 +1,32 @@
package allocrunner
import (
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/nomad/structs"
)
func newCgroupHook(alloc *structs.Allocation, man cgutil.CpusetManager) *cgroupHook {
return &cgroupHook{
alloc: alloc,
cpusetManager: man,
}
}
type cgroupHook struct {
alloc *structs.Allocation
cpusetManager cgutil.CpusetManager
}
func (c *cgroupHook) Name() string {
return "cgroup"
}
func (c *cgroupHook) Prerun() error {
c.cpusetManager.AddAlloc(c.alloc)
return nil
}
func (c *cgroupHook) Postrun() error {
c.cpusetManager.RemoveAlloc(c.alloc.ID)
return nil
}

View file

@ -8,6 +8,7 @@ import (
"github.com/hashicorp/nomad/client/devicemanager"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/interfaces"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
cstate "github.com/hashicorp/nomad/client/state"
@ -69,6 +70,9 @@ type Config struct {
// DriverManager handles dispensing of driver plugins
DriverManager drivermanager.Manager
// CpusetManager configures the cpuset cgroup if supported by the platform
CpusetManager cgutil.CpusetManager
// ServersContactedCh is closed when the first GetClientAllocs call to
// servers succeeds and allocs are synced.
ServersContactedCh chan struct{}

View file

@ -8,6 +8,8 @@ import (
"sync"
"time"
"github.com/hashicorp/nomad/client/lib/cgutil"
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
@ -201,6 +203,10 @@ type TaskRunner struct {
// statistics
devicemanager devicemanager.Manager
// cpusetCgroupPathGetter is used to lookup the cgroup path if supported by the platform
cpusetCgroupPathGetter cgutil.CgroupPathGetter
CpusetCgroupPathGetter cgutil.CgroupPathGetter
// driverManager is used to dispense driver plugins and register event
// handlers
driverManager drivermanager.Manager
@ -265,6 +271,9 @@ type Config struct {
// CSIManager is used to manage the mounting of CSI volumes into tasks
CSIManager csimanager.Manager
// CpusetCgroupPathGetter is used to lookup the cgroup path if supported by the platform
CpusetCgroupPathGetter cgutil.CgroupPathGetter
// DeviceManager is used to mount devices as well as lookup device
// statistics
DeviceManager devicemanager.Manager
@ -303,36 +312,37 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
}
tr := &TaskRunner{
alloc: config.Alloc,
allocID: config.Alloc.ID,
clientConfig: config.ClientConfig,
task: config.Task,
taskDir: config.TaskDir,
taskName: config.Task.Name,
taskLeader: config.Task.Leader,
envBuilder: envBuilder,
dynamicRegistry: config.DynamicRegistry,
consulServiceClient: config.Consul,
consulProxiesClient: config.ConsulProxies,
siClient: config.ConsulSI,
vaultClient: config.Vault,
state: tstate,
localState: state.NewLocalState(),
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
deviceStatsReporter: config.DeviceStatsReporter,
killCtx: killCtx,
killCtxCancel: killCancel,
shutdownCtx: trCtx,
shutdownCtxCancel: trCancel,
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
waitCh: make(chan struct{}),
csiManager: config.CSIManager,
devicemanager: config.DeviceManager,
driverManager: config.DriverManager,
maxEvents: defaultMaxEvents,
serversContactedCh: config.ServersContactedCh,
startConditionMetCtx: config.StartConditionMetCtx,
alloc: config.Alloc,
allocID: config.Alloc.ID,
clientConfig: config.ClientConfig,
task: config.Task,
taskDir: config.TaskDir,
taskName: config.Task.Name,
taskLeader: config.Task.Leader,
envBuilder: envBuilder,
dynamicRegistry: config.DynamicRegistry,
consulServiceClient: config.Consul,
consulProxiesClient: config.ConsulProxies,
siClient: config.ConsulSI,
vaultClient: config.Vault,
state: tstate,
localState: state.NewLocalState(),
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
deviceStatsReporter: config.DeviceStatsReporter,
killCtx: killCtx,
killCtxCancel: killCancel,
shutdownCtx: trCtx,
shutdownCtxCancel: trCancel,
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
waitCh: make(chan struct{}),
csiManager: config.CSIManager,
cpusetCgroupPathGetter: config.CpusetCgroupPathGetter,
devicemanager: config.DeviceManager,
driverManager: config.DriverManager,
maxEvents: defaultMaxEvents,
serversContactedCh: config.ServersContactedCh,
startConditionMetCtx: config.StartConditionMetCtx,
}
// Create the logger based on the allocation ID
@ -741,6 +751,13 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
func (tr *TaskRunner) runDriver() error {
taskConfig := tr.buildTaskConfig()
if tr.cpusetCgroupPathGetter != nil {
cpusetCgroupPath, err := tr.cpusetCgroupPathGetter(tr.killCtx)
if err != nil {
return err
}
taskConfig.Resources.LinuxResources.CpusetCgroupPath = cpusetCgroupPath
}
// Build hcl context variables
vars, errs, err := tr.envBuilder.Build().AllValues()

View file

@ -6,6 +6,8 @@ import (
"sync"
"testing"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/client/allocwatcher"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
@ -67,6 +69,7 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, fu
PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
CpusetManager: cgutil.NoopCpusetManager(),
ServersContactedCh: make(chan struct{}),
}
return conf, cleanup

View file

@ -7,6 +7,7 @@ import (
"net/rpc"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
@ -298,6 +299,9 @@ type Client struct {
// with a nomad client. Currently only used for CSI.
dynamicRegistry dynamicplugins.Registry
// cpusetManager configures cpusets on supported platforms
cpusetManager cgutil.CpusetManager
// EnterpriseClient is used to set and check enterprise features for clients
EnterpriseClient *EnterpriseClient
}
@ -357,6 +361,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
invalidAllocs: make(map[string]struct{}),
serversContactedCh: make(chan struct{}),
serversContactedOnce: sync.Once{},
cpusetManager: cgutil.NewCpusetManager(cfg.CgroupParent, logger.Named("cpuset_manager")),
EnterpriseClient: newEnterpriseClient(logger),
}
@ -635,12 +640,14 @@ func (c *Client) init() error {
c.logger.Info("using alloc directory", "alloc_dir", c.config.AllocDir)
// Ensure cgroups are created on linux platform
if err := cgutil.InitCpusetParent(c.config.CgroupParent); err != nil {
// If the client cannot initialize the cgroup then reserved cores will
// not be reported and the cpuset manager will be disabled. This is
// common when running in dev mode under a non-root user for example.
c.logger.Warn("could not initialize cpuset cgroup subsystem, cpuset management disabled", "error", err)
c.config.DisableCgroupManagement = true
if runtime.GOOS == "linux" && c.cpusetManager != nil {
err := c.cpusetManager.Init()
if err != nil {
// if the client cannot initialize the cgroup then reserved cores will not be reported and the cpuset manager
// will be disabled. this is common when running in dev mode under a non-root user for example
c.logger.Warn("could not initialize cpuset cgroup subsystem, cpuset management disabled", "error", err)
c.cpusetManager = cgutil.NoopCpusetManager()
}
}
return nil
}
@ -1125,6 +1132,7 @@ func (c *Client) restoreState() error {
PrevAllocMigrator: prevAllocMigrator,
DynamicRegistry: c.dynamicRegistry,
CSIManager: c.csimanager,
CpusetManager: c.cpusetManager,
DeviceManager: c.devicemanager,
DriverManager: c.drivermanager,
ServersContactedCh: c.serversContactedCh,
@ -2419,6 +2427,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
PrevAllocMigrator: prevAllocMigrator,
DynamicRegistry: c.dynamicRegistry,
CSIManager: c.csimanager,
CpusetManager: c.cpusetManager,
DeviceManager: c.devicemanager,
DriverManager: c.drivermanager,
RPCClient: c,

View file

@ -1186,6 +1186,9 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
// network interfaces take precedence over fingerprinted ones.
func TestClient_UpdateNodeFromFingerprintKeepsConfig(t *testing.T) {
t.Parallel()
if runtime.GOOS != "linux" {
t.Skip("assertions assume linux platform")
}
// Client without network configured updates to match fingerprint
client, cleanup := TestClient(t, nil)

View file

@ -274,10 +274,6 @@ type Config struct {
// ReservableCores if set overrides the set of reservable cores reported in fingerprinting.
ReservableCores []uint16
// DisableCgroupManagement if true disables all management of cgroup subsystems by the Nomad client. It does
// not prevent individual drivers from manging their own cgroups.
DisableCgroupManagement bool
}
type ClientTemplateConfig struct {

View file

@ -3,6 +3,8 @@ package fingerprint
import (
"time"
"github.com/hashicorp/nomad/client/lib/cgutil"
log "github.com/hashicorp/go-hclog"
)
@ -30,7 +32,7 @@ type DefaultMountPointDetector struct {
// Call out to the default cgroup library
func (b *DefaultMountPointDetector) MountPoint() (string, error) {
return FindCgroupMountpointDir()
return cgutil.FindCgroupMountpointDir()
}
// NewCGroupFingerprint returns a new cgroup fingerprinter

View file

@ -2,12 +2,6 @@
package fingerprint
// FindCgroupMountpointDir is used to find the cgroup mount point on a Linux
// system. Here it is a no-op implemtation
func FindCgroupMountpointDir() (string, error) {
return "", nil
}
func (f *CGroupFingerprint) Fingerprint(*FingerprintRequest, *FingerprintResponse) error {
return nil
}

View file

@ -4,28 +4,12 @@ package fingerprint
import (
"fmt"
"github.com/opencontainers/runc/libcontainer/cgroups"
)
const (
cgroupAvailable = "available"
)
// FindCgroupMountpointDir is used to find the cgroup mount point on a Linux
// system.
func FindCgroupMountpointDir() (string, error) {
mount, err := cgroups.GetCgroupMounts(false)
if err != nil {
return "", err
}
// It's okay if the mount point is not discovered
if len(mount) == 0 {
return "", nil
}
return mount[0].Mountpoint, nil
}
// Fingerprint tries to find a valid cgroup mount point
func (f *CGroupFingerprint) Fingerprint(req *FingerprintRequest, resp *FingerprintResponse) error {
mount, err := f.mountPointDetector.MountPoint()

View file

@ -74,7 +74,9 @@ func (f *CPUFingerprint) Fingerprint(req *FingerprintRequest, resp *FingerprintR
if cores, err := f.deriveReservableCores(req); err != nil {
f.logger.Warn("failed to detect set of reservable cores", "error", err)
} else {
reservableCores = cpuset.New(cores...).Difference(cpuset.New(req.Node.ReservedResources.Cpu.ReservedCpuCores...)).ToSlice()
if req.Node.ReservedResources != nil {
reservableCores = cpuset.New(cores...).Difference(cpuset.New(req.Node.ReservedResources.Cpu.ReservedCpuCores...)).ToSlice()
}
f.logger.Debug("detected reservable cores", "cpuset", reservableCores)
}
}

View file

@ -5,5 +5,9 @@ import (
)
func (f *CPUFingerprint) deriveReservableCores(req *FingerprintRequest) ([]uint16, error) {
return cgutil.GetCPUsFromCgroup(req.Config.CgroupParent)
parent := req.Config.CgroupParent
if parent == "" {
parent = cgutil.DefaultCgroupParent
}
return cgutil.GetCPUsFromCgroup(parent)
}

View file

@ -6,4 +6,8 @@ const (
DefaultCgroupParent = ""
)
func InitCpusetParent(string) error { return nil }
// FindCgroupMountpointDir is used to find the cgroup mount point on a Linux
// system. Here it is a no-op implemtation
func FindCgroupMountpointDir() (string, error) {
return "", nil
}

View file

@ -13,36 +13,11 @@ import (
)
const (
DefaultCgroupParent = "/nomad"
SharedCpusetCgroupName = "shared"
DefaultCgroupParent = "/nomad"
SharedCpusetCgroupName = "shared"
ReservedCpusetCgroupName = "reserved"
)
// InitCpusetParent checks that the cgroup parent and expected child cgroups have been created.
// If the cgroup parent is set to /nomad then this will ensure that the /nomad/shared
// cgroup is initialized. The /nomad/reserved cgroup will be lazily created when a workload
// with reserved cores is created.
func InitCpusetParent(cgroupParent string) error {
if cgroupParent == "" {
cgroupParent = DefaultCgroupParent
}
var err error
if cgroupParent, err = getCgroupPathHelper("cpuset", cgroupParent); err != nil {
return err
}
// 'ensureParent' start with parent because we don't want to
// explicitly inherit from parent, it could conflict with
// 'cpuset.cpu_exclusive'.
if err := cpusetEnsureParent(cgroupParent); err != nil {
return err
}
if err := os.Mkdir(filepath.Join(cgroupParent, SharedCpusetCgroupName), 0755); err != nil && !os.IsExist(err) {
return err
}
return nil
}
func GetCPUsFromCgroup(group string) ([]uint16, error) {
cgroupPath, err := getCgroupPathHelper("cpuset", group)
if err != nil {
@ -138,3 +113,17 @@ func getCgroupPathHelper(subsystem, cgroup string) (string, error) {
return filepath.Join(mnt, relCgroup), nil
}
// FindCgroupMountpointDir is used to find the cgroup mount point on a Linux
// system.
func FindCgroupMountpointDir() (string, error) {
mount, err := cgroups.GetCgroupMounts(false)
if err != nil {
return "", err
}
// It's okay if the mount point is not discovered
if len(mount) == 0 {
return "", nil
}
return mount[0].Mountpoint, nil
}

View file

@ -0,0 +1,56 @@
package cgutil
import (
"context"
"github.com/hashicorp/nomad/lib/cpuset"
"github.com/hashicorp/nomad/nomad/structs"
)
// CpusetManager is used to setup cpuset cgroups for each task. A pool of shared cpus is managed for
// tasks which don't require any reserved cores and a cgroup is managed seperetly for each task which
// require reserved cores.
type CpusetManager interface {
// Init should be called before any tasks are managed to ensure the cgroup parent exists and
// check that proper permissions are granted to manage cgroups.
Init() error
// AddAlloc adds an allocation to the manager
AddAlloc(alloc *structs.Allocation)
// RemoveAlloc removes an alloc by ID from the manager
RemoveAlloc(allocID string)
// CgroupPathFor returns a callback for getting the cgroup path and any error that may have occurred during
// cgroup initialization. The callback will block if the cgroup has not been created
CgroupPathFor(allocID, taskName string) CgroupPathGetter
}
// CgroupPathGetter is a function which returns the cgroup path and any error which ocured during cgroup initialization.
// It should block until the cgroup has been created or an error is reported
type CgroupPathGetter func(context.Context) (path string, err error)
type TaskCgroupInfo struct {
CgroupPath string
RelativeCgroupPath string
Cpuset cpuset.CPUSet
Error error
}
func NoopCpusetManager() CpusetManager { return noopCpusetManager{} }
type noopCpusetManager struct{}
func (n noopCpusetManager) Init() error {
return nil
}
func (n noopCpusetManager) AddAlloc(alloc *structs.Allocation) {
}
func (n noopCpusetManager) RemoveAlloc(allocID string) {
}
func (n noopCpusetManager) CgroupPathFor(allocID, task string) CgroupPathGetter {
return func(context.Context) (string, error) { return "", nil }
}

View file

@ -0,0 +1,9 @@
// +build !linux
package cgutil
import (
"github.com/hashicorp/go-hclog"
)
func NewCpusetManager(_ string, _ hclog.Logger) CpusetManager { return noopCpusetManager{} }

View file

@ -0,0 +1,315 @@
package cgutil
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/hashicorp/go-hclog"
"github.com/opencontainers/runc/libcontainer/cgroups/fscommon"
"github.com/hashicorp/nomad/lib/cpuset"
cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
"github.com/opencontainers/runc/libcontainer/configs"
"github.com/hashicorp/nomad/nomad/structs"
)
func NewCpusetManager(cgroupParent string, logger hclog.Logger) CpusetManager {
if cgroupParent == "" {
cgroupParent = DefaultCgroupParent
}
return &cpusetManager{
cgroupParent: cgroupParent,
cgroupInfo: map[string]allocTaskCgroupInfo{},
logger: logger,
}
}
var (
cpusetReconcileInterval = 30 * time.Second
)
type cpusetManager struct {
// cgroupParent relative to the cgroup root. ex. '/nomad'
cgroupParent string
// cgroupParentPath is the absolute path to the cgroup parent.
cgroupParentPath string
parentCpuset cpuset.CPUSet
// all exported functions are synchronized
mu sync.Mutex
cgroupInfo map[string]allocTaskCgroupInfo
doneCh chan struct{}
signalCh chan struct{}
logger hclog.Logger
}
func (c *cpusetManager) AddAlloc(alloc *structs.Allocation) {
if alloc == nil || alloc.AllocatedResources == nil {
return
}
allocInfo := allocTaskCgroupInfo{}
for task, resources := range alloc.AllocatedResources.Tasks {
taskCpuset := cpuset.New(resources.Cpu.ReservedCores...)
cgroupPath := filepath.Join(c.cgroupParentPath, SharedCpusetCgroupName)
relativeCgroupPath := filepath.Join(c.cgroupParent, SharedCpusetCgroupName)
if taskCpuset.Size() > 0 {
cgroupPath, relativeCgroupPath = c.getCgroupPathsForTask(alloc.ID, task)
}
allocInfo[task] = &TaskCgroupInfo{
CgroupPath: cgroupPath,
RelativeCgroupPath: relativeCgroupPath,
Cpuset: taskCpuset,
}
}
c.mu.Lock()
c.cgroupInfo[alloc.ID] = allocInfo
c.mu.Unlock()
go c.signalReconcile()
}
func (c *cpusetManager) RemoveAlloc(allocID string) {
c.mu.Lock()
delete(c.cgroupInfo, allocID)
c.mu.Unlock()
go c.signalReconcile()
}
func (c *cpusetManager) CgroupPathFor(allocID, task string) CgroupPathGetter {
return func(ctx context.Context) (string, error) {
c.mu.Lock()
allocInfo, ok := c.cgroupInfo[allocID]
if !ok {
c.mu.Unlock()
return "", fmt.Errorf("alloc not found for id %q", allocID)
}
taskInfo, ok := allocInfo[task]
c.mu.Unlock()
if !ok {
return "", fmt.Errorf("task %q not found", task)
}
for {
if taskInfo.Error != nil {
break
}
if _, err := os.Stat(taskInfo.CgroupPath); os.IsNotExist(err) {
select {
case <-ctx.Done():
return taskInfo.CgroupPath, ctx.Err()
case <-time.After(100 * time.Millisecond):
continue
}
}
break
}
return taskInfo.CgroupPath, taskInfo.Error
}
}
type allocTaskCgroupInfo map[string]*TaskCgroupInfo
// Init checks that the cgroup parent and expected child cgroups have been created
// If the cgroup parent is set to /nomad then this will ensure that the /nomad/shared
// cgroup is initialized.
func (c *cpusetManager) Init() error {
c.mu.Lock()
defer c.mu.Unlock()
cgroupParentPath, err := getCgroupPathHelper("cpuset", c.cgroupParent)
if err != nil {
return err
}
c.cgroupParentPath = cgroupParentPath
// ensures that shared cpuset exists and that the cpuset values are copied from the parent if created
if err := cpusetEnsureParent(filepath.Join(cgroupParentPath, SharedCpusetCgroupName)); err != nil {
return err
}
parentCpus, parentMems, err := getCpusetSubsystemSettings(cgroupParentPath)
if err != nil {
return fmt.Errorf("failed to detect parent cpuset settings: %v", err)
}
c.parentCpuset, err = cpuset.Parse(parentCpus)
if err != nil {
return fmt.Errorf("failed to parse parent cpuset.cpus setting: %v", err)
}
// ensure the reserved cpuset exists, but only copy the mems from the parent if creating the cgroup
if err := os.Mkdir(filepath.Join(cgroupParentPath, ReservedCpusetCgroupName), 0755); err == nil {
// cgroup created, leave cpuset.cpus empty but copy cpuset.mems from parent
if err != nil {
return err
}
if err := fscommon.WriteFile(filepath.Join(cgroupParentPath, ReservedCpusetCgroupName), "cpuset.mems", parentMems); err != nil {
return err
}
} else if !os.IsExist(err) {
return err
}
c.doneCh = make(chan struct{})
c.signalCh = make(chan struct{})
c.logger.Info("initialized cpuset cgroup manager", "parent", c.cgroupParent, "cpuset", c.parentCpuset.String())
go c.reconcileLoop()
return nil
}
func (c *cpusetManager) reconcileLoop() {
timer := time.NewTimer(0)
if !timer.Stop() {
<-timer.C
}
defer timer.Stop()
for {
select {
case <-c.doneCh:
c.logger.Debug("shutting down reconcile loop")
return
case <-c.signalCh:
timer.Reset(500 * time.Millisecond)
case <-timer.C:
c.reconcileCpusets()
timer.Reset(cpusetReconcileInterval)
}
}
}
func (c *cpusetManager) reconcileCpusets() {
c.mu.Lock()
defer c.mu.Unlock()
sharedCpuset := cpuset.New(c.parentCpuset.ToSlice()...)
reservedCpuset := cpuset.New()
taskCpusets := map[string]*TaskCgroupInfo{}
for _, alloc := range c.cgroupInfo {
for _, task := range alloc {
if task.Cpuset.Size() == 0 {
continue
}
sharedCpuset = sharedCpuset.Difference(task.Cpuset)
reservedCpuset = reservedCpuset.Union(task.Cpuset)
taskCpusets[task.CgroupPath] = task
}
}
// look for reserved cpusets which we don't know about and remove
files, err := ioutil.ReadDir(c.reservedCpusetPath())
if err != nil {
c.logger.Error("failed to list files in reserved cgroup path during reconciliation", "path", c.reservedCpusetPath(), "error", err)
}
for _, f := range files {
if !f.IsDir() {
continue
}
path := filepath.Join(c.reservedCpusetPath(), f.Name())
if _, ok := taskCpusets[path]; ok {
continue
}
c.logger.Debug("removing reserved cpuset cgroup", "path", path)
err := cgroups.RemovePaths(map[string]string{"cpuset": path})
if err != nil {
c.logger.Error("removal of existing cpuset cgroup failed", "path", path, "error", err)
}
}
if err := c.setCgroupCpusetCPUs(c.sharedCpusetPath(), sharedCpuset.String()); err != nil {
c.logger.Error("could not write shared cpuset.cpus", "path", c.sharedCpusetPath(), "cpuset.cpus", sharedCpuset.String(), "error", err)
}
if err := c.setCgroupCpusetCPUs(c.reservedCpusetPath(), reservedCpuset.String()); err != nil {
c.logger.Error("could not write reserved cpuset.cpus", "path", c.reservedCpusetPath(), "cpuset.cpus", reservedCpuset.String(), "error", err)
}
for _, info := range taskCpusets {
if err := os.Mkdir(info.CgroupPath, 0755); err != nil && !os.IsExist(err) {
c.logger.Error("failed to create new cgroup path for task", "path", info.CgroupPath, "error", err)
info.Error = err
continue
}
// copy cpuset.mems from parent
_, parentMems, err := getCpusetSubsystemSettings(filepath.Dir(info.CgroupPath))
if err != nil {
c.logger.Error("failed to read parent cgroup settings for task", "path", info.CgroupPath, "error", err)
info.Error = err
continue
}
if err := fscommon.WriteFile(info.CgroupPath, "cpuset.mems", parentMems); err != nil {
c.logger.Error("failed to write cgroup cpuset.mems setting for task", "path", info.CgroupPath, "mems", parentMems, "error", err)
info.Error = err
continue
}
if err := c.setCgroupCpusetCPUs(info.CgroupPath, info.Cpuset.String()); err != nil {
c.logger.Error("failed to write cgroup cpuset.cpus settings for task", "path", info.CgroupPath, "cpus", info.Cpuset.String(), "error", err)
info.Error = err
continue
}
}
}
// setCgroupCpusetCPUs will compare an existing cpuset.cpus value with an expected value, overwriting the existing if different
// must hold a lock on cpusetManager.mu before calling
func (_ *cpusetManager) setCgroupCpusetCPUs(path, cpus string) error {
currentCpusRaw, err := fscommon.ReadFile(path, "cpuset.cpus")
if err != nil {
return err
}
if cpus != strings.TrimSpace(currentCpusRaw) {
if err := fscommon.WriteFile(path, "cpuset.cpus", cpus); err != nil {
return err
}
}
return nil
}
func (c *cpusetManager) signalReconcile() {
select {
case c.signalCh <- struct{}{}:
case <-c.doneCh:
}
}
func (c *cpusetManager) getCpuset(group string) (cpuset.CPUSet, error) {
man := cgroupFs.NewManager(
&configs.Cgroup{
Path: filepath.Join(c.cgroupParent, group),
},
map[string]string{"cpuset": filepath.Join(c.cgroupParentPath, group)},
false,
)
stats, err := man.GetStats()
if err != nil {
return cpuset.CPUSet{}, err
}
return cpuset.New(stats.CPUSetStats.CPUs...), nil
}
func (c *cpusetManager) getCgroupPathsForTask(allocID, task string) (absolute, relative string) {
return filepath.Join(c.reservedCpusetPath(), fmt.Sprintf("%s-%s", allocID, task)),
filepath.Join(c.cgroupParent, ReservedCpusetCgroupName, fmt.Sprintf("%s-%s", allocID, task))
}
func (c *cpusetManager) sharedCpusetPath() string {
return filepath.Join(c.cgroupParentPath, SharedCpusetCgroupName)
}
func (c *cpusetManager) reservedCpusetPath() string {
return filepath.Join(c.cgroupParentPath, ReservedCpusetCgroupName)
}

View file

@ -0,0 +1,164 @@
package cgutil
import (
"io/ioutil"
"path/filepath"
"runtime"
"syscall"
"testing"
"github.com/hashicorp/nomad/lib/cpuset"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/helper/testlog"
)
func tmpCpusetManager(t *testing.T) (manager *cpusetManager, cleanup func()) {
if runtime.GOOS != "linux" || syscall.Geteuid() != 0 {
t.Skip("Test only available running as root on linux")
}
mount, err := FindCgroupMountpointDir()
if err != nil || mount == "" {
t.Skipf("Failed to find cgroup mount: %v %v", mount, err)
}
parent := "/gotest-" + uuid.Short()
require.NoError(t, cpusetEnsureParent(parent))
manager = &cpusetManager{
cgroupParent: parent,
cgroupInfo: map[string]allocTaskCgroupInfo{},
logger: testlog.HCLogger(t),
}
parentPath, err := getCgroupPathHelper("cpuset", parent)
require.NoError(t, err)
return manager, func() { require.NoError(t, cgroups.RemovePaths(map[string]string{"cpuset": parentPath})) }
}
func TestCpusetManager_Init(t *testing.T) {
manager, cleanup := tmpCpusetManager(t)
defer cleanup()
require.NoError(t, manager.Init())
require.DirExists(t, filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName))
require.FileExists(t, filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName, "cpuset.cpus"))
sharedCpusRaw, err := ioutil.ReadFile(filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName, "cpuset.cpus"))
require.NoError(t, err)
sharedCpus, err := cpuset.Parse(string(sharedCpusRaw))
require.NoError(t, err)
require.Exactly(t, manager.parentCpuset.ToSlice(), sharedCpus.ToSlice())
require.DirExists(t, filepath.Join(manager.cgroupParentPath, ReservedCpusetCgroupName))
}
func TestCpusetManager_AddAlloc(t *testing.T) {
manager, cleanup := tmpCpusetManager(t)
defer cleanup()
require.NoError(t, manager.Init())
alloc := mock.Alloc()
alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores = manager.parentCpuset.ToSlice()
manager.AddAlloc(alloc)
// force reconcile
manager.reconcileCpusets()
// check that no more cores exist in the shared cgroup
require.DirExists(t, filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName))
require.FileExists(t, filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName, "cpuset.cpus"))
sharedCpusRaw, err := ioutil.ReadFile(filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName, "cpuset.cpus"))
require.NoError(t, err)
sharedCpus, err := cpuset.Parse(string(sharedCpusRaw))
require.NoError(t, err)
require.Empty(t, sharedCpus.ToSlice())
// check that all cores are allocated to reserved cgroup
require.DirExists(t, filepath.Join(manager.cgroupParentPath, ReservedCpusetCgroupName))
reservedCpusRaw, err := ioutil.ReadFile(filepath.Join(manager.cgroupParentPath, ReservedCpusetCgroupName, "cpuset.cpus"))
require.NoError(t, err)
reservedCpus, err := cpuset.Parse(string(reservedCpusRaw))
require.NoError(t, err)
require.Exactly(t, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores, reservedCpus.ToSlice())
// check that task cgroup exists and cpuset matches expected reserved cores
allocInfo, ok := manager.cgroupInfo[alloc.ID]
require.True(t, ok)
require.Len(t, allocInfo, 1)
taskInfo, ok := allocInfo["web"]
require.True(t, ok)
require.DirExists(t, taskInfo.CgroupPath)
taskCpusRaw, err := ioutil.ReadFile(filepath.Join(taskInfo.CgroupPath, "cpuset.cpus"))
require.NoError(t, err)
taskCpus, err := cpuset.Parse(string(taskCpusRaw))
require.NoError(t, err)
require.Exactly(t, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores, taskCpus.ToSlice())
}
func TestCpusetManager_RemoveAlloc(t *testing.T) {
manager, cleanup := tmpCpusetManager(t)
defer cleanup()
require.NoError(t, manager.Init())
// this case tests adding 2 allocs, reconciling then removing 1 alloc
// it requires the system to have atleast 2 cpu cores (one for each alloc)
if manager.parentCpuset.Size() < 2 {
t.Skip("test requires atleast 2 cpu cores")
}
alloc1 := mock.Alloc()
alloc1Cpuset := cpuset.New(manager.parentCpuset.ToSlice()[0])
alloc1.AllocatedResources.Tasks["web"].Cpu.ReservedCores = alloc1Cpuset.ToSlice()
manager.AddAlloc(alloc1)
alloc2 := mock.Alloc()
alloc2Cpuset := cpuset.New(manager.parentCpuset.ToSlice()[1])
alloc2.AllocatedResources.Tasks["web"].Cpu.ReservedCores = alloc2Cpuset.ToSlice()
manager.AddAlloc(alloc2)
//force reconcile
manager.reconcileCpusets()
// shared cpuset should not include any expected cores
sharedCpusRaw, err := ioutil.ReadFile(filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName, "cpuset.cpus"))
require.NoError(t, err)
sharedCpus, err := cpuset.Parse(string(sharedCpusRaw))
require.NoError(t, err)
require.False(t, sharedCpus.ContainsAny(alloc1Cpuset.Union(alloc2Cpuset)))
// reserved cpuset should equal the expected cpus
reservedCpusRaw, err := ioutil.ReadFile(filepath.Join(manager.cgroupParentPath, ReservedCpusetCgroupName, "cpuset.cpus"))
require.NoError(t, err)
reservedCpus, err := cpuset.Parse(string(reservedCpusRaw))
require.NoError(t, err)
require.True(t, reservedCpus.Equals(alloc1Cpuset.Union(alloc2Cpuset)))
// remove first allocation
alloc1TaskPath := manager.cgroupInfo[alloc1.ID]["web"].CgroupPath
manager.RemoveAlloc(alloc1.ID)
manager.reconcileCpusets()
// alloc1's task reserved cgroup should be removed
require.NoDirExists(t, alloc1TaskPath)
// shared cpuset should now include alloc1's cores
sharedCpusRaw, err = ioutil.ReadFile(filepath.Join(manager.cgroupParentPath, SharedCpusetCgroupName, "cpuset.cpus"))
require.NoError(t, err)
sharedCpus, err = cpuset.Parse(string(sharedCpusRaw))
require.NoError(t, err)
require.False(t, sharedCpus.ContainsAny(alloc2Cpuset))
require.True(t, sharedCpus.IsSupersetOf(alloc1Cpuset))
// reserved cpuset should only include alloc2's cores
reservedCpusRaw, err = ioutil.ReadFile(filepath.Join(manager.cgroupParentPath, ReservedCpusetCgroupName, "cpuset.cpus"))
require.NoError(t, err)
reservedCpus, err = cpuset.Parse(string(reservedCpusRaw))
require.NoError(t, err)
require.True(t, reservedCpus.Equals(alloc2Cpuset))
}

View file

@ -6,7 +6,7 @@ import (
"syscall"
"testing"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/lib/cgutil"
)
// RequireRoot skips tests unless running on a Unix as root.
@ -50,7 +50,7 @@ func QemuCompatible(t *testing.T) {
}
func CgroupCompatible(t *testing.T) {
mount, err := fingerprint.FindCgroupMountpointDir()
mount, err := cgutil.FindCgroupMountpointDir()
if err != nil || mount == "" {
t.Skipf("Failed to find cgroup mount: %v %v", mount, err)
}

View file

@ -9,9 +9,10 @@ import (
"sync"
"time"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/consul-template/signals"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/drivers/shared/eventer"
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/drivers/shared/resolvconf"
@ -313,7 +314,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
return fp
}
mount, err := fingerprint.FindCgroupMountpointDir()
mount, err := cgutil.FindCgroupMountpointDir()
if err != nil {
fp.Health = drivers.HealthStateUnhealthy
fp.HealthDescription = drivers.NoCgroupMountMessage

View file

@ -9,9 +9,10 @@ import (
"runtime"
"time"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/consul-template/signals"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/drivers/shared/eventer"
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/drivers/shared/resolvconf"
@ -281,7 +282,7 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
return fp
}
mount, err := fingerprint.FindCgroupMountpointDir()
mount, err := cgutil.FindCgroupMountpointDir()
if err != nil {
fp.Health = drivers.HealthStateUnhealthy
fp.HealthDescription = drivers.NoCgroupMountMessage

View file

@ -27,6 +27,38 @@ func New(cpus ...uint16) CPUSet {
return cpuset
}
// String returns the cpuset as a comma delimited set of core values and ranged
func (c CPUSet) String() string {
if c.Size() == 0 {
return ""
}
cores := c.ToSlice()
cpusetStrs := []string{}
cur := [2]uint16{cores[0], cores[0]}
for i := 1; i < len(cores); i++ {
if cores[i] == cur[1]+1 {
cur[1] = cores[i]
continue
}
if cur[0] == cur[1] {
cpusetStrs = append(cpusetStrs, fmt.Sprintf("%d", cur[0]))
} else {
cpusetStrs = append(cpusetStrs, fmt.Sprintf("%d-%d", cur[0], cur[1]))
}
// new range
cur = [2]uint16{cores[i], cores[i]}
}
if cur[0] == cur[1] {
cpusetStrs = append(cpusetStrs, fmt.Sprintf("%d", cur[0]))
} else {
cpusetStrs = append(cpusetStrs, fmt.Sprintf("%d-%d", cur[0], cur[1]))
}
return strings.Join(cpusetStrs, ",")
}
// Size returns to the number of cpus contained in the CPUSet
func (c CPUSet) Size() int {
return len(c.cpus)
@ -88,6 +120,16 @@ func (s CPUSet) IsSupersetOf(other CPUSet) bool {
return true
}
// ContainsAny returns true if any cpus in other CPUSet are present
func (s CPUSet) ContainsAny(other CPUSet) bool {
for cpu := range other.cpus {
if _, ok := s.cpus[cpu]; ok {
return true
}
}
return false
}
// Equals tests the equality of the elements in the CPUSet
func (s CPUSet) Equals(other CPUSet) bool {
return reflect.DeepEqual(s.cpus, other.cpus)
@ -98,6 +140,7 @@ func (s CPUSet) Equals(other CPUSet) bool {
// Ref: http://man7.org/linux/man-pages/man7/cpuset.7.html#FORMATS
func Parse(s string) (CPUSet, error) {
cpuset := New()
s = strings.TrimSpace(s)
if s == "" {
return cpuset, nil
}

View file

@ -146,13 +146,34 @@ func TestCPUSet_IsSupersetOf(t *testing.T) {
}
}
func TestCPUSet_ContainsAny(t *testing.T) {
cases := []struct {
a CPUSet
b CPUSet
containsAny bool
}{
{New(0), New(0), true},
{New(0), New(), false},
{New(), New(0), false},
{New(0, 1, 2, 3), New(0), true},
{New(0, 1, 2, 3), New(2, 3), true},
{New(0, 1, 2, 3), New(2, 3, 4), true},
}
for _, c := range cases {
require.Equal(t, c.containsAny, c.a.ContainsAny(c.b))
}
}
func TestParse(t *testing.T) {
cases := []struct {
cpuset string
expected CPUSet
}{
{"", New()},
{"\n", New()},
{"1", New(1)},
{"1\n", New(1)},
{"0,1,2,3", New(0, 1, 2, 3)},
{"0-3", New(0, 1, 2, 3)},
{"0,2-3,5", New(0, 2, 3, 5)},
@ -164,3 +185,19 @@ func TestParse(t *testing.T) {
require.True(t, result.Equals(c.expected))
}
}
func TestCPUSet_String(t *testing.T) {
cases := []struct {
cpuset CPUSet
expected string
}{
{New(), ""},
{New(0, 1, 2, 3), "0-3"},
{New(1, 3), "1,3"},
{New(0, 2, 3, 5), "0,2-3,5"},
}
for _, c := range cases {
require.Equal(t, c.expected, c.cpuset.String())
}
}

View file

@ -371,6 +371,8 @@ type LinuxResources struct {
CpusetCPUs string
CpusetMems string
CpusetCgroupPath string
// PrecentTicks is used to calculate the CPUQuota, currently the docker
// driver exposes cpu period and quota through the driver configuration
// and thus the calculation for CPUQuota cannot be done on the client.