raw_exec: make raw exec driver work with cgroups v2

This PR adds support for the raw_exec driver on systems with only cgroups v2.

The raw exec driver is able to use cgroups to manage processes. This happens
only on Linux, when the raw_exec driver is enabled and the no_cgroups option is
not set. The driver uses the freezer controller to freeze the processes of a
task, issue a SIGKILL to each of them, then thaw them. Previously the
implementation assumed cgroups v1; it now also supports cgroups v2.

There is a bit of refactoring in this PR, but the fundamental design remains
the same.

Closes #12351 #12348
This commit is contained in:
Seth Hoenig 2022-03-28 19:33:01 -05:00
parent e7e8ce212e
commit 52aaf86f52
35 changed files with 698 additions and 308 deletions

3
.changelog/12419.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
Add support for cgroups v2 in raw_exec driver
```

View File

@ -78,7 +78,8 @@ jobs:
run: |
make bootstrap
make generate-all
make test-nomad-module
sudo sed -i 's!Defaults!#Defaults!g' /etc/sudoers
sudo -E env "PATH=$PATH" make test-nomad-module
tests-pkgs:
runs-on: ubuntu-20.04
timeout-minutes: 30

View File

@ -1,5 +1,4 @@
//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris
// +build darwin dragonfly freebsd linux netbsd openbsd solaris
package allocdir

View File

@ -1432,6 +1432,7 @@ func (c *Client) setupNode() error {
if node.Name == "" {
node.Name, _ = os.Hostname()
}
node.CgroupParent = c.config.CgroupParent
if node.HostVolumes == nil {
if l := len(c.config.HostVolumes); l != 0 {
node.HostVolumes = make(map[string]*structs.ClientHostVolumeConfig, l)

View File

@ -318,6 +318,7 @@ func TestFS_List_ACL(t *testing.T) {
func TestFS_Stream_NoAlloc(t *testing.T) {
ci.Parallel(t)
ci.SkipSlow(t, "flaky on GHA; #12358")
require := require.New(t)
// Start a client

View File

@ -58,13 +58,13 @@ func CgroupScope(allocID, task string) string {
return fmt.Sprintf("%s.%s.scope", allocID, task)
}
// ConfigureBasicCgroups will initialize cgroups for v1.
// ConfigureBasicCgroups will initialize a cgroup and modify config to contain
// a reference to its path.
//
// Not useful in cgroups.v2
// v1: creates a random "freezer" cgroup which can later be used for cleanup of processes.
// v2: does nothing.
func ConfigureBasicCgroups(config *lcc.Config) error {
if UseV2 {
// In v2 the default behavior is to create inherited interface files for
// all mounted subsystems automatically.
return nil
}

View File

@ -0,0 +1,210 @@
//go:build linux
package cgutil
import (
"errors"
"fmt"
"os"
"path/filepath"
"time"
"github.com/hashicorp/go-hclog"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/opencontainers/runc/libcontainer/cgroups/fs"
"github.com/opencontainers/runc/libcontainer/cgroups/fs2"
"github.com/opencontainers/runc/libcontainer/configs"
)
// freezer is the name of the cgroup subsystem used for stopping / starting
// a group of processes. In this file it is used as the key into a v1 cgroup's
// Paths map and as the subsystem name when looking up the init cgroup.
const freezer = "freezer"
// thawed and frozen are the two states we put a cgroup in when trying to remove it.
// They are the resource settings handed to fs.FreezerGroup.Set on the v1 path.
var (
thawed = &configs.Resources{Freezer: configs.Thawed}
frozen = &configs.Resources{Freezer: configs.Frozen}
)
// GroupKiller is used for SIGKILL-ing the process tree[s] of a cgroup by leveraging
// the freezer cgroup subsystem.
type GroupKiller interface {
// KillGroup sends SIGKILL to the processes found in cgroup and returns an
// error if the cgroup is missing or the kill / removal steps fail.
KillGroup(cgroup *configs.Cgroup) error
}
// NewGroupKiller returns a GroupKiller acting on behalf of the executor
// process identified by pid. Log output is emitted through a sub-logger of
// logger named "group_killer".
func NewGroupKiller(logger hclog.Logger, pid int) GroupKiller {
	k := new(killer)
	k.logger = logger.Named("group_killer")
	k.pid = pid
	return k
}
// killer is the GroupKiller implementation returned by NewGroupKiller.
type killer struct {
// logger is the "group_killer" named sub-logger
logger hclog.Logger
// pid is the executor PID, which gets moved out of the task cgroup before
// the processes in that cgroup are killed
pid int
}
// KillGroup sends SIGKILL to the process tree living in cgroup. The freezer
// is used so that the processes cannot keep forking while they are being
// signalled. The cgroups v1 or v2 implementation is selected by UseV2.
func (d *killer) KillGroup(cgroup *configs.Cgroup) error {
	if !UseV2 {
		return d.v1(cgroup)
	}
	return d.v2(cgroup)
}
// v1 is the cgroups v1 implementation of KillGroup. It moves the executor
// into the init freezer cgroup, runs the common kill logic against the task's
// freezer cgroup, and finally removes that cgroup path.
//
// An error is returned if cgroup is nil, if the init cgroup cannot be found
// or entered, if the kill logic fails, or if the cgroup cannot be removed.
func (d *killer) v1(cgroup *configs.Cgroup) error {
if cgroup == nil {
return errors.New("missing cgroup")
}
// the actual path to our tasks freezer cgroup
path := cgroup.Paths[freezer]
d.logger.Trace("killing processes", "cgroup_path", path, "cgroup_version", "v1", "executor_pid", d.pid)
// move executor PID into the init freezer cgroup so we can kill the task
// pids without killing the executor (which is the process running this code,
// doing the killing)
initPath, err := cgroups.GetInitCgroupPath(freezer)
if err != nil {
return fmt.Errorf("failed to find init cgroup: %w", err)
}
m := map[string]string{freezer: initPath}
if err = cgroups.EnterPid(m, d.pid); err != nil {
return fmt.Errorf("failed to add executor pid to init cgroup: %w", err)
}
// ability to freeze the cgroup; the error from Set is discarded
freeze := func() {
_ = new(fs.FreezerGroup).Set(path, frozen)
}
// ability to thaw the cgroup; the error from Set is discarded
thaw := func() {
_ = new(fs.FreezerGroup).Set(path, thawed)
}
// do the common kill logic
if err = d.kill(path, freeze, thaw); err != nil {
return err
}
// remove the cgroup from disk
return cgroups.RemovePath(path)
}
// v2 is the cgroups v2 implementation of KillGroup. It moves the executor
// into the init.scope cgroup under CgroupRoot, runs the common kill logic
// against the task cgroup, and finally destroys the task cgroup through its
// fs2 manager.
//
// An error is returned if cgroup is nil, if the pids of the cgroup cannot be
// listed, if either manager cannot be created, if the executor cannot be
// moved, if the kill logic fails, or if the cgroup cannot be destroyed.
func (d *killer) v2(cgroup *configs.Cgroup) error {
if cgroup == nil {
return errors.New("missing cgroup")
}
path := filepath.Join(CgroupRoot, cgroup.Path)
// the pids found here are only used for the trace log below; the kill logic
// scans the cgroup again after freezing it
existingPIDs, err := cgroups.GetPids(path)
if err != nil {
return fmt.Errorf("failed to determine pids in cgroup: %w", err)
}
d.logger.Trace("killing processes", "cgroup_path", path, "cgroup_version", "v2", "executor_pid", d.pid, "existing_pids", existingPIDs)
// manager of the task cgroup, used below to freeze, thaw, and destroy it
mgr, err := fs2.NewManager(cgroup, "", rootless)
if err != nil {
return fmt.Errorf("failed to create v2 cgroup manager: %w", err)
}
// move executor PID into the root init.scope so we can kill the task pids
// without killing the executor (which is the process running this code, doing
// the killing)
init, err := fs2.NewManager(nil, filepath.Join(CgroupRoot, "init.scope"), rootless)
if err != nil {
return fmt.Errorf("failed to create v2 init cgroup manager: %w", err)
}
if err = init.Apply(d.pid); err != nil {
return fmt.Errorf("failed to move executor pid into init.scope cgroup: %w", err)
}
d.logger.Trace("move of executor pid into init.scope complete", "pid", d.pid)
// ability to freeze the cgroup; the error from Freeze is discarded
freeze := func() {
_ = mgr.Freeze(configs.Frozen)
}
// ability to thaw the cgroup; the error from Freeze is discarded
thaw := func() {
_ = mgr.Freeze(configs.Thawed)
}
// do the common kill logic
if err = d.kill(path, freeze, thaw); err != nil {
return err
}
// remove the cgroup from disk
return mgr.Destroy()
}
// kill is used to SIGKILL all processes in cgroup
//
// The cgroup argument is the filesystem path of the cgroup to scan; freeze
// and thaw are callbacks supplied by the v1 / v2 callers that change the
// freezer state of that cgroup.
//
// The order of operations is
// 0. before calling this method, the executor pid has been moved outside of cgroup
// 1. freeze cgroup (so processes cannot fork further)
// 2. scan the cgroup to collect all pids
// 3. issue SIGKILL to each pid found
// 4. thaw the cgroup so processes can go die
// 5. wait on each process until it is confirmed dead
//
// The only error returned is a failure to list the pids of the cgroup;
// failures to find, signal, or wait on an individual process are logged at
// trace level (or ignored) and do not abort the kill.
func (d *killer) kill(cgroup string, freeze func(), thaw func()) error {
// freeze the cgroup stopping further forking
freeze()
d.logger.Trace("search for pids in", "cgroup", cgroup)
// find all the pids we intend to kill
pids, err := cgroups.GetPids(cgroup)
if err != nil {
// if we fail to get pids, re-thaw before bailing so there is at least
// a chance the processes can go die out of band
thaw()
return fmt.Errorf("failed to find pids: %w", err)
}
d.logger.Trace("send sigkill to frozen processes", "cgroup", cgroup, "pids", pids)
var processes []*os.Process
// kill the processes in cgroup
for _, pid := range pids {
p, findErr := os.FindProcess(pid)
if findErr != nil {
d.logger.Trace("failed to find process of pid to kill", "pid", pid, "error", findErr)
continue
}
// remember the process before signalling it, so it is still waited on
// below even when the kill itself reports an error
processes = append(processes, p)
if killErr := p.Kill(); killErr != nil {
d.logger.Trace("failed to kill process", "pid", pid, "error", killErr)
continue
}
}
// thaw the cgroup so we can wait on each process
thaw()
// wait on each process
for _, p := range processes {
// do not capture error; errors are normal here
pState, _ := p.Wait()
d.logger.Trace("return from wait on process", "pid", p.Pid, "state", pState)
}
// cgroups are not atomic, the OS takes a moment to un-mark the cgroup as in-use;
// a tiny sleep here goes a long way for not creating noisy (but functionally benign)
// errors about removing busy cgroup
//
// alternatively we could do the removal in a loop and silence the interim errors, but meh
time.Sleep(50 * time.Millisecond)
return nil
}

View File

@ -0,0 +1,13 @@
package resources
// A Containment will clean up resources created by an executor.
type Containment interface {
// Apply enables containment on pid.
Apply(pid int) error
// Cleanup will purge executor resources like cgroups.
Cleanup() error
// GetPIDs will return the processes overseen by the Containment, keyed by
// pid.
GetPIDs() PIDs
}

View File

@ -0,0 +1,11 @@
//go:build !linux
package resources
// containment is the placeholder used on non-linux platforms.
type containment struct {
	// non-linux executors currently do not create resources to be cleaned up
}

// Cleanup has nothing to purge on non-linux platforms and always reports
// success.
func (*containment) Cleanup() error {
	return nil
}

View File

@ -0,0 +1,107 @@
//go:build linux
package resources
import (
"fmt"
"os"
"path/filepath"
"sync"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/opencontainers/runc/libcontainer/cgroups/fs2"
"github.com/opencontainers/runc/libcontainer/configs"
)
// containment is the linux resource containment; it holds the cgroup a task's
// processes are placed into and later killed through.
type containment struct {
// lock is taken by Apply, Cleanup, and GetPIDs
lock sync.RWMutex
// cgroup is the cgroup configuration handed to Contain; GetPIDs tolerates
// it being nil
cgroup *configs.Cgroup
// logger is the "containment" named sub-logger set up by Contain
logger hclog.Logger
}
// Contain builds the containment for cgroup. Log output is emitted through a
// sub-logger of logger named "containment".
func Contain(logger hclog.Logger, cgroup *configs.Cgroup) *containment {
	c := new(containment)
	c.cgroup = cgroup
	c.logger = logger.Named("containment")
	return c
}
// Apply places pid under containment.
//
// On cgroups v1 the pid simply enters the cgroup paths that already exist. On
// cgroups v2 a manager is used to create the cgroup, move the pid into it,
// and then apply the configured resources.
func (c *containment) Apply(pid int) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.logger.Trace("create containment for", "cgroup", c.cgroup, "pid", pid)

	// v1: a random cgroup was created already, so entering it is all there is to do
	if !cgutil.UseV2 {
		if enterErr := cgroups.EnterPid(c.cgroup.Paths, pid); enterErr != nil {
			return fmt.Errorf("failed to add pid to v1 cgroup: %w", enterErr)
		}
		return nil
	}

	// v2: the manager both creates the cgroup and enters it
	manager, err := fs2.NewManager(c.cgroup, "", false)
	if err != nil {
		return fmt.Errorf("failed to create v2 cgroup manager for containment: %w", err)
	}

	// move the pid into the cgroup
	if err = manager.Apply(pid); err != nil {
		return fmt.Errorf("failed to apply v2 cgroup containment: %w", err)
	}

	// in v2 it is important to set the device resource configuration
	if err = manager.Set(c.cgroup.Resources); err != nil {
		return fmt.Errorf("failed to set v2 cgroup resources: %w", err)
	}

	return nil
}
// Cleanup kills the processes of the contained cgroup by handing the cgroup
// to a cgutil group killer created for the current process.
func (c *containment) Cleanup() error {
	c.lock.Lock()
	defer c.lock.Unlock()

	// this code runs in the executor, which is the one managing the cleanup
	// of the task processes, so the current pid is the executor pid
	self := os.Getpid()
	c.logger.Trace("cleanup on", "cgroup", c.cgroup, "executor_pid", self)

	// destroy the task processes
	return cgutil.NewGroupKiller(c.logger, self).KillGroup(c.cgroup)
}
// GetPIDs returns the processes found in the cgroup under containment, keyed
// by pid, each paired with a fresh PID record.
//
// The result is never nil: an empty map is returned when no cgroup is set or
// when the pids cannot be listed (the failure is logged at debug level).
func (c *containment) GetPIDs() PIDs {
	// this method only reads c.cgroup, so the shared (read) side of the
	// RWMutex is sufficient and lets concurrent GetPIDs calls proceed together
	c.lock.RLock()
	defer c.lock.RUnlock()

	if c.cgroup == nil {
		return make(PIDs)
	}

	// get the cgroup path under containment
	var path string
	if cgutil.UseV2 {
		path = filepath.Join(cgutil.CgroupRoot, c.cgroup.Path)
	} else {
		path = c.cgroup.Paths["freezer"]
	}

	// find the pids in the cgroup under containment
	pids, err := cgroups.GetAllPids(path)
	if err != nil {
		c.logger.Debug("failed to get pids", "cgroup", c.cgroup, "error", err)
		return make(PIDs)
	}

	// the final size is known, so allocate the map once
	m := make(PIDs, len(pids))
	for _, pid := range pids {
		m[pid] = NewPID(pid)
	}
	return m
}

View File

@ -0,0 +1,25 @@
package resources
import (
"github.com/hashicorp/nomad/client/stats"
)
// PIDs holds all of a task's pids and their cpu percentage calculators,
// keyed by pid
type PIDs map[int]*PID
// PID holds one task's pid and its cpu percentage calculators
type PID struct {
// PID is the process id
PID int
// one calculator each for total, user, and system cpu (per the field names)
StatsTotalCPU *stats.CpuStats
StatsUserCPU *stats.CpuStats
StatsSysCPU *stats.CpuStats
}
// NewPID creates the record for pid, giving each of its three cpu
// calculators a freshly created stats.CpuStats.
func NewPID(pid int) *PID {
	p := new(PID)
	p.PID = pid
	p.StatsTotalCPU = stats.NewCpuStats()
	p.StatsUserCPU = stats.NewCpuStats()
	p.StatsSysCPU = stats.NewCpuStats()
	return p
}

View File

@ -67,6 +67,9 @@ const (
// Datacenter is the environment variable for passing the datacenter in which the alloc is running.
Datacenter = "NOMAD_DC"
// CgroupParent is the environment variable for passing the cgroup parent in which cgroups are made.
CgroupParent = "NOMAD_PARENT_CGROUP"
// Namespace is the environment variable for passing the namespace in which the alloc is running.
Namespace = "NOMAD_NAMESPACE"
@ -400,6 +403,7 @@ type Builder struct {
taskName string
allocIndex int
datacenter string
cgroupParent string
namespace string
region string
allocId string
@ -518,6 +522,9 @@ func (b *Builder) buildEnv(allocDir, localDir, secretsDir string,
if b.datacenter != "" {
envMap[Datacenter] = b.datacenter
}
if b.cgroupParent != "" {
envMap[CgroupParent] = b.cgroupParent
}
if b.namespace != "" {
envMap[Namespace] = b.namespace
}
@ -802,6 +809,7 @@ func (b *Builder) setNode(n *structs.Node) *Builder {
b.nodeAttrs[nodeClassKey] = n.NodeClass
b.nodeAttrs[nodeDcKey] = n.Datacenter
b.datacenter = n.Datacenter
b.cgroupParent = n.CgroupParent
// Set up the attributes.
for k, v := range n.Attributes {

View File

@ -250,6 +250,7 @@ func TestEnvironment_AllValues(t *testing.T) {
"nested.meta.key": "a",
"invalid...metakey": "b",
}
n.CgroupParent = "abc.slice"
a := mock.ConnectAlloc()
a.Job.ParentID = fmt.Sprintf("mock-parent-service-%s", uuid.Generate())
a.AllocatedResources.Tasks["web"].Networks[0] = &structs.NetworkResource{
@ -378,6 +379,7 @@ func TestEnvironment_AllValues(t *testing.T) {
"NOMAD_PORT_ssh_ssh": "22",
"NOMAD_CPU_LIMIT": "500",
"NOMAD_DC": "dc1",
"NOMAD_PARENT_CGROUP": "abc.slice",
"NOMAD_NAMESPACE": "default",
"NOMAD_REGION": "global",
"NOMAD_MEMORY_LIMIT": "256",

View File

@ -21,6 +21,7 @@ import (
uuidparse "github.com/hashicorp/go-uuid"
"github.com/hashicorp/nomad/client"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/command/agent/event"
@ -694,7 +695,7 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) {
}
conf.BindWildcardDefaultHostNetwork = agentConfig.Client.BindWildcardDefaultHostNetwork
conf.CgroupParent = agentConfig.Client.CgroupParent
conf.CgroupParent = cgutil.GetCgroupParent(agentConfig.Client.CgroupParent)
if agentConfig.Client.ReserveableCores != "" {
cores, err := cpuset.Parse(agentConfig.Client.ReserveableCores)
if err != nil {

View File

@ -787,6 +787,11 @@ func TestDocker_ExecTaskStreaming(t *testing.T) {
ci.Parallel(t)
testutil.DockerCompatible(t)
// todo(shoenig) these fail maybe 50% of the time on GHA, and the test cases
// are tricky to follow all the way through - maybe a decent candidate for
// a generics re-write.
ci.SkipSlow(t, "flaky on GHA; #12358")
taskCfg := newTaskConfig("", []string{"/bin/sleep", "1000"})
task := &drivers.TaskConfig{
ID: uuid.Generate(),

View File

@ -122,7 +122,9 @@ func TestExecDriver_StartWait(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewExecDriver(ctx, testlog.HCLogger(t))
logger := testlog.HCLogger(t)
d := NewExecDriver(ctx, logger)
harness := dtestutil.NewDriverHarness(t, d)
allocID := uuid.Generate()
task := &drivers.TaskConfig{
@ -793,6 +795,7 @@ func TestExecDriver_NoPivotRoot(t *testing.T) {
handle, _, err := harness.StartTask(task)
require.NoError(t, err)
require.NotNil(t, handle)
require.NoError(t, harness.DestroyTask(task.ID, true))
}
func TestDriver_Config_validate(t *testing.T) {

View File

@ -11,7 +11,7 @@ import (
"time"
"github.com/hashicorp/consul-template/signals"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/drivers/shared/eventer"
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/helper/pluginutils/loader"
@ -323,9 +323,8 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
LogLevel: "debug",
}
exec, pluginClient, err := executor.CreateExecutor(
d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID),
d.nomadConfig, executorConfig)
logger := d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID)
exec, pluginClient, err := executor.CreateExecutor(logger, d.nomadConfig, executorConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to create executor: %v", err)
}
@ -372,7 +371,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
if err := handle.SetDriverState(&driverState); err != nil {
d.logger.Error("failed to start task, error setting driver state", "error", err)
exec.Shutdown("", 0)
_ = exec.Shutdown("", 0)
pluginClient.Kill()
return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
}

View File

@ -14,6 +14,7 @@ import (
"time"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/lib/cgutil"
ctestutil "github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/helper/pluginutils/hclutils"
"github.com/hashicorp/nomad/helper/testlog"
@ -27,6 +28,18 @@ import (
"github.com/stretchr/testify/require"
)
// defaultEnv returns the baseline task environment used by the raw exec
// tests: empty on cgroups v1, and carrying the parent cgroup on cgroups v2.
func defaultEnv() map[string]string {
	if !cgutil.UseV2 {
		return make(map[string]string)
	}

	// normally the taskenv.Builder will set this automatically from the
	// Node object which gets created via Client configuration, but none of
	// that exists in the test harness so just set it here.
	return map[string]string{
		"NOMAD_PARENT_CGROUP": "nomad.slice",
	}
}
func TestMain(m *testing.M) {
if !testtask.Run() {
os.Exit(m.Run())
@ -35,10 +48,12 @@ func TestMain(m *testing.M) {
func newEnabledRawExecDriver(t *testing.T) *Driver {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(func() { cancel() })
t.Cleanup(cancel)
d := NewRawExecDriver(ctx, testlog.HCLogger(t)).(*Driver)
logger := testlog.HCLogger(t)
d := NewRawExecDriver(ctx, logger).(*Driver)
d.config.Enabled = true
return d
}
@ -49,21 +64,25 @@ func TestRawExecDriver_SetConfig(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := NewRawExecDriver(ctx, testlog.HCLogger(t))
logger := testlog.HCLogger(t)
d := NewRawExecDriver(ctx, logger)
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()
bconfig := &basePlug.Config{}
var (
bconfig = new(basePlug.Config)
config = new(Config)
data = make([]byte, 0)
)
// Disable raw exec.
config := &Config{}
var data []byte
// Default is raw_exec is disabled.
require.NoError(basePlug.MsgPackEncode(&data, config))
bconfig.PluginConfig = data
require.NoError(harness.SetConfig(bconfig))
require.Exactly(config, d.(*Driver).config)
// Enable raw_exec, but disable cgroups.
config.Enabled = true
config.NoCgroups = true
data = []byte{}
@ -72,6 +91,7 @@ func TestRawExecDriver_SetConfig(t *testing.T) {
require.NoError(harness.SetConfig(bconfig))
require.Exactly(config, d.(*Driver).config)
// Enable raw_exec, enable cgroups.
config.NoCgroups = false
data = []byte{}
require.NoError(basePlug.MsgPackEncode(&data, config))
@ -150,8 +170,10 @@ func TestRawExecDriver_StartWait(t *testing.T) {
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "test",
AllocID: uuid.Generate(),
ID: uuid.Generate(),
Name: "test",
Env: defaultEnv(),
}
tc := &TaskConfig{
@ -200,8 +222,10 @@ func TestRawExecDriver_StartWaitRecoverWaitStop(t *testing.T) {
require.NoError(harness.SetConfig(bconfig))
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "sleep",
AllocID: uuid.Generate(),
ID: uuid.Generate(),
Name: "sleep",
Env: defaultEnv(),
}
tc := &TaskConfig{
Command: testtask.Path(),
@ -276,8 +300,10 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) {
defer harness.Kill()
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "sleep",
AllocID: uuid.Generate(),
ID: uuid.Generate(),
Name: "sleep",
Env: defaultEnv(),
}
cleanup := harness.MkAllocDir(task, false)
@ -323,7 +349,6 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) {
func TestRawExecDriver_Start_Kill_Wait_Cgroup(t *testing.T) {
ci.Parallel(t)
ctestutil.ExecCompatible(t)
ctestutil.CgroupsCompatibleV1(t) // todo(shoenig) #12348
require := require.New(t)
pidFile := "pid"
@ -333,9 +358,11 @@ func TestRawExecDriver_Start_Kill_Wait_Cgroup(t *testing.T) {
defer harness.Kill()
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "sleep",
User: "root",
AllocID: uuid.Generate(),
ID: uuid.Generate(),
Name: "sleep",
User: "root",
Env: defaultEnv(),
}
cleanup := harness.MkAllocDir(task, false)
@ -414,10 +441,56 @@ func TestRawExecDriver_Start_Kill_Wait_Cgroup(t *testing.T) {
require.NoError(harness.DestroyTask(task.ID, true))
}
// TestRawExecDriver_ParentCgroup starts a task with NOMAD_PARENT_CGROUP set
// to a custom value on a cgroups v2 host, then checks that the value shows up
// both in the environment and in /proc/self/cgroup of commands exec'd in the
// task.
func TestRawExecDriver_ParentCgroup(t *testing.T) {
	ci.Parallel(t)
	ctestutil.ExecCompatible(t)
	ctestutil.CgroupsCompatibleV2(t)

	must := require.New(t)

	harness := dtestutil.NewDriverHarness(t, newEnabledRawExecDriver(t))
	defer harness.Kill()

	const parent = "custom.slice"

	task := &drivers.TaskConfig{
		AllocID: uuid.Generate(),
		ID:      uuid.Generate(),
		Name:    "sleep",
		Env:     map[string]string{"NOMAD_PARENT_CGROUP": parent},
	}

	cleanup := harness.MkAllocDir(task, false)
	defer cleanup()

	// the task itself is a long sleep, kept alive while it is inspected
	tc := &TaskConfig{
		Command: testtask.Path(),
		Args:    []string{"sleep", "9000s"},
	}
	must.NoError(task.EncodeConcreteDriverConfig(&tc))
	testtask.SetTaskConfigEnv(task)

	_, _, err := harness.StartTask(task)
	must.NoError(err)

	// first inspect the environment variable, then /proc/self/cgroup; both
	// outputs are expected to mention the custom parent
	inspections := [][]string{
		{"/usr/bin/env"},
		{"cat", "/proc/self/cgroup"},
	}
	for _, cmd := range inspections {
		res, execErr := harness.ExecTask(task.ID, cmd, 1*time.Second)
		must.NoError(execErr)
		must.True(res.ExitResult.Successful())
		must.Contains(string(res.Stdout), parent)
	}

	// kill the sleep task
	must.NoError(harness.DestroyTask(task.ID, true))
}
func TestRawExecDriver_Exec(t *testing.T) {
ci.Parallel(t)
ctestutil.ExecCompatible(t)
ctestutil.CgroupsCompatibleV1(t) // todo(shoenig) #12348
require := require.New(t)
@ -426,8 +499,10 @@ func TestRawExecDriver_Exec(t *testing.T) {
defer harness.Kill()
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "sleep",
AllocID: uuid.Generate(),
ID: uuid.Generate(),
Name: "sleep",
Env: defaultEnv(),
}
cleanup := harness.MkAllocDir(task, false)
@ -502,8 +577,10 @@ func TestRawExecDriver_Disabled(t *testing.T) {
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "test",
AllocID: uuid.Generate(),
ID: uuid.Generate(),
Name: "test",
Env: defaultEnv(),
}
handle, _, err := harness.StartTask(task)

View File

@ -17,11 +17,9 @@ import (
"time"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/lib/cgutil"
clienttestutil "github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/helper/testtask"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
basePlug "github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils"
@ -62,17 +60,18 @@ func TestRawExecDriver_User(t *testing.T) {
func TestRawExecDriver_Signal(t *testing.T) {
ci.Parallel(t)
if runtime.GOOS != "linux" {
t.Skip("Linux only test")
}
clienttestutil.RequireLinux(t)
require := require.New(t)
d := newEnabledRawExecDriver(t)
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "signal",
AllocID: uuid.Generate(),
ID: uuid.Generate(),
Name: "signal",
Env: defaultEnv(),
}
cleanup := harness.MkAllocDir(task, true)
@ -206,24 +205,16 @@ func TestRawExecDriver_StartWaitStop(t *testing.T) {
func TestRawExecDriver_DestroyKillsAll(t *testing.T) {
ci.Parallel(t)
clienttestutil.RequireLinux(t)
clienttestutil.CgroupsCompatibleV1(t) // todo(shoenig): #12348
d := newEnabledRawExecDriver(t)
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "test",
}
if cgutil.UseV2 {
allocID := uuid.Generate()
task.AllocID = allocID
task.Resources = new(drivers.Resources)
task.Resources.NomadResources = new(structs.AllocatedTaskResources)
task.Resources.LinuxResources = new(drivers.LinuxResources)
task.Resources.LinuxResources.CpusetCgroupPath = filepath.Join(cgutil.CgroupRoot, "testing.slice", cgutil.CgroupScope(allocID, "test"))
AllocID: uuid.Generate(),
ID: uuid.Generate(),
Name: "test",
Env: defaultEnv(),
}
cleanup := harness.MkAllocDir(task, true)
@ -322,8 +313,10 @@ func TestRawExec_ExecTaskStreaming(t *testing.T) {
defer harness.Kill()
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "sleep",
AllocID: uuid.Generate(),
ID: uuid.Generate(),
Name: "sleep",
Env: defaultEnv(),
}
cleanup := harness.MkAllocDir(task, false)
@ -349,10 +342,21 @@ func TestRawExec_ExecTaskStreaming_User(t *testing.T) {
clienttestutil.RequireLinux(t)
d := newEnabledRawExecDriver(t)
// because we cannot set AllocID, see below
d.config.NoCgroups = true
harness := dtestutil.NewDriverHarness(t, d)
defer harness.Kill()
task := &drivers.TaskConfig{
// todo(shoenig) - Setting AllocID causes test to fail - with or without
// cgroups, and with or without chroot. It has to do with MkAllocDir
// creating the directory structure, but the actual root cause is still
// TBD. The symptom is that any command you try to execute will result
// in "permission denied" coming from os/exec. This test is imperfect,
// the actual feature of running commands as another user works fine.
// AllocID: uuid.Generate()
ID: uuid.Generate(),
Name: "sleep",
User: "nobody",
@ -394,8 +398,9 @@ func TestRawExecDriver_NoCgroup(t *testing.T) {
harness := dtestutil.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "nocgroup",
AllocID: uuid.Generate(),
ID: uuid.Generate(),
Name: "nocgroup",
}
cleanup := harness.MkAllocDir(task, true)

View File

@ -20,12 +20,12 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/lib/fifo"
"github.com/hashicorp/nomad/client/lib/resources"
"github.com/hashicorp/nomad/client/stats"
cstructs "github.com/hashicorp/nomad/client/structs"
shelpers "github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/syndtr/gocapability/capability"
shelpers "github.com/hashicorp/nomad/helper/stats"
)
const (
@ -244,9 +244,9 @@ type UniversalExecutor struct {
exitState *ProcessState
processExited chan interface{}
// resConCtx is used to track and cleanup additional resources created by
// the executor. Currently this is only used for cgroups.
resConCtx resourceContainerContext
// containment is used to cleanup resources created by the executor
// currently only used for killing pids via freezer cgroup on linux
containment resources.Containment
totalCpuStats *stats.CpuStats
userCpuStats *stats.CpuStats
@ -262,6 +262,7 @@ func NewExecutor(logger hclog.Logger) Executor {
if err := shelpers.Init(); err != nil {
logger.Error("unable to initialize stats", "error", err)
}
return &UniversalExecutor{
logger: logger,
processExited: make(chan interface{}),
@ -300,10 +301,11 @@ func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error)
return nil, err
}
// Setup cgroups on linux
// Maybe setup containment (for now, cgroups only on linux)
if e.commandCfg.ResourceLimits || e.commandCfg.BasicProcessCgroup {
pid := os.Getpid()
if err := e.configureResourceContainer(pid); err != nil {
e.logger.Error("failed to configure resource container", "pid", pid, "error", err)
return nil, err
}
}
@ -519,14 +521,14 @@ func (e *UniversalExecutor) Shutdown(signal string, grace time.Duration) error {
// If there is no process we can't shutdown
if e.childCmd.Process == nil {
e.logger.Warn("failed to shutdown", "error", "no process found")
e.logger.Warn("failed to shutdown due to missing process", "error", "no process found")
return fmt.Errorf("executor failed to shutdown error: no process found")
}
proc, err := os.FindProcess(e.childCmd.Process.Pid)
if err != nil {
err = fmt.Errorf("executor failed to find process: %v", err)
e.logger.Warn("failed to shutdown", "error", err)
e.logger.Warn("failed to shutdown due to inability to find process", "pid", e.childCmd.Process.Pid, "error", err)
return err
}
@ -545,7 +547,7 @@ func (e *UniversalExecutor) Shutdown(signal string, grace time.Duration) error {
}
if err := e.shutdownProcess(sig, proc); err != nil {
e.logger.Warn("failed to shutdown", "error", err)
e.logger.Warn("failed to shutdown process", "pid", proc.Pid, "error", err)
return err
}
@ -566,22 +568,27 @@ func (e *UniversalExecutor) Shutdown(signal string, grace time.Duration) error {
merr.Errors = append(merr.Errors, fmt.Errorf("process did not exit after 15 seconds"))
}
// Prefer killing the process via the resource container.
if !(e.commandCfg.ResourceLimits || e.commandCfg.BasicProcessCgroup) {
if err := e.cleanupChildProcesses(proc); err != nil && err.Error() != finishedErr {
// prefer killing the process via platform-dependent resource containment
killByContainment := e.commandCfg.ResourceLimits || e.commandCfg.BasicProcessCgroup
if !killByContainment {
// there is no containment, so kill the group the old fashioned way by sending
// SIGKILL to the negative pid
if cleanupChildrenErr := e.killProcessTree(proc); cleanupChildrenErr != nil && cleanupChildrenErr.Error() != finishedErr {
merr.Errors = append(merr.Errors,
fmt.Errorf("can't kill process with pid %d: %v", e.childCmd.Process.Pid, err))
fmt.Errorf("can't kill process with pid %d: %v", e.childCmd.Process.Pid, cleanupChildrenErr))
}
} else {
// there is containment available (e.g. cgroups) so defer to that implementation
// for killing the processes
if cleanupErr := e.containment.Cleanup(); cleanupErr != nil {
e.logger.Warn("containment cleanup failed", "error", cleanupErr)
merr.Errors = append(merr.Errors, cleanupErr)
}
}
if e.commandCfg.ResourceLimits || e.commandCfg.BasicProcessCgroup {
if err := e.resConCtx.executorCleanup(); err != nil {
merr.Errors = append(merr.Errors, err)
}
}
if err := merr.ErrorOrNil(); err != nil {
e.logger.Warn("failed to shutdown", "error", err)
if err = merr.ErrorOrNil(); err != nil {
e.logger.Warn("failed to shutdown due to some error", "error", err.Error())
return err
}

View File

@ -1,5 +1,4 @@
//go:build !linux
// +build !linux
package executor
@ -7,6 +6,7 @@ import (
"os/exec"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/lib/resources"
"github.com/hashicorp/nomad/plugins/drivers"
)
@ -18,7 +18,7 @@ func NewExecutorWithIsolation(logger hclog.Logger) Executor {
func (e *UniversalExecutor) configureResourceContainer(_ int) error { return nil }
func (e *UniversalExecutor) getAllPids() (map[int]*nomadPid, error) {
func (e *UniversalExecutor) getAllPids() (resources.PIDs, error) {
return getAllPidsByScanning()
}

View File

@ -20,6 +20,7 @@ import (
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/client/lib/resources"
"github.com/hashicorp/nomad/client/stats"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/capabilities"
@ -200,21 +201,16 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
}, nil
}
func (l *LibcontainerExecutor) getAllPids() (map[int]*nomadPid, error) {
func (l *LibcontainerExecutor) getAllPids() (resources.PIDs, error) {
pids, err := l.container.Processes()
if err != nil {
return nil, err
}
nPids := make(map[int]*nomadPid)
m := make(resources.PIDs, 1)
for _, pid := range pids {
nPids[pid] = &nomadPid{
pid: pid,
cpuStatsTotal: stats.NewCpuStats(),
cpuStatsUser: stats.NewCpuStats(),
cpuStatsSys: stats.NewCpuStats(),
}
m[pid] = resources.NewPID(pid)
}
return nPids, nil
return m, nil
}
// Wait waits until a process has exited and returns it's exitcode and errors

View File

@ -15,7 +15,7 @@ import (
"testing"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/lib/cgutil"

View File

@ -2,19 +2,19 @@ package executor
import (
"fmt"
"os"
"os/exec"
"os/user"
"path/filepath"
"strconv"
"strings"
"syscall"
"github.com/containernetworking/plugins/pkg/ns"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/client/lib/resources"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/opencontainers/runc/libcontainer/cgroups"
cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
lconfigs "github.com/opencontainers/runc/libcontainer/configs"
"github.com/opencontainers/runc/libcontainer/configs"
"github.com/opencontainers/runc/libcontainer/specconv"
)
@ -23,20 +23,20 @@ import (
func setCmdUser(cmd *exec.Cmd, userid string) error {
u, err := user.Lookup(userid)
if err != nil {
return fmt.Errorf("Failed to identify user %v: %v", userid, err)
return fmt.Errorf("failed to identify user %v: %v", userid, err)
}
// Get the groups the user is a part of
gidStrings, err := u.GroupIds()
if err != nil {
return fmt.Errorf("Unable to lookup user's group membership: %v", err)
return fmt.Errorf("unable to lookup user's group membership: %v", err)
}
gids := make([]uint32, len(gidStrings))
for _, gidString := range gidStrings {
u, err := strconv.ParseUint(gidString, 10, 32)
if err != nil {
return fmt.Errorf("Unable to convert user's group to uint32 %s: %v", gidString, err)
return fmt.Errorf("unable to convert user's group to uint32 %s: %v", gidString, err)
}
gids = append(gids, uint32(u))
@ -45,11 +45,11 @@ func setCmdUser(cmd *exec.Cmd, userid string) error {
// Convert the uid and gid
uid, err := strconv.ParseUint(u.Uid, 10, 32)
if err != nil {
return fmt.Errorf("Unable to convert userid to uint32: %s", err)
return fmt.Errorf("unable to convert userid to uint32: %s", err)
}
gid, err := strconv.ParseUint(u.Gid, 10, 32)
if err != nil {
return fmt.Errorf("Unable to convert groupid to uint32: %s", err)
return fmt.Errorf("unable to convert groupid to uint32: %s", err)
}
// Set the command to run as that user and group.
@ -69,125 +69,86 @@ func setCmdUser(cmd *exec.Cmd, userid string) error {
// configureResourceContainer configures the cgroups to be used to track pids
// created by the executor
func (e *UniversalExecutor) configureResourceContainer(pid int) error {
cfg := &lconfigs.Config{
Cgroups: &lconfigs.Cgroup{
Resources: &lconfigs.Resources{},
cfg := &configs.Config{
Cgroups: &configs.Cgroup{
Resources: &configs.Resources{},
},
}
// note: this was always here, but not used until cgroups v2 support
for _, device := range specconv.AllowedDevices {
cfg.Cgroups.Resources.Devices = append(cfg.Cgroups.Resources.Devices, &device.Rule)
}
if err := cgutil.ConfigureBasicCgroups(cfg); err != nil {
// Log this error to help diagnose cases where nomad is run with too few
// permissions, but don't return an error. There is no separate check for
// cgroup creation permissions, so this may be the happy path.
e.logger.Warn("failed to create cgroup",
"docs", "https://www.nomadproject.io/docs/drivers/raw_exec.html#no_cgroups",
"error", err)
return nil
// lookup returns the value of the variable called name from env, a list of
// "KEY=VALUE" entries. The first matching entry wins; the empty string is
// returned when no entry matches.
lookup := func(env []string, name string) (result string) {
	prefix := name + "="
	for _, s := range env {
		if strings.HasPrefix(s, prefix) {
			// TrimPrefix removes exactly the "KEY=" prefix. TrimLeft would
			// treat it as a set of characters and also strip any leading
			// characters of the value that happen to occur in the key.
			return strings.TrimPrefix(s, prefix)
		}
	}
	return ""
}
e.resConCtx.groups = cfg.Cgroups
return cgroups.EnterPid(cfg.Cgroups.Paths, pid)
}
func (e *UniversalExecutor) getAllPids() (map[int]*nomadPid, error) {
if e.resConCtx.isEmpty() {
return getAllPidsByScanning()
if cgutil.UseV2 {
// in v2 we have the definitive cgroup; create and enter it
// use the task environment variables for determining the cgroup path -
// not ideal but plumbing the values directly requires grpc protobuf changes
parent := lookup(e.commandCfg.Env, taskenv.CgroupParent)
allocID := lookup(e.commandCfg.Env, taskenv.AllocID)
task := lookup(e.commandCfg.Env, taskenv.TaskName)
if parent == "" || allocID == "" || task == "" {
return fmt.Errorf(
"environment variables %s must be set",
strings.Join([]string{taskenv.CgroupParent, taskenv.AllocID, taskenv.TaskName}, ","),
)
}
scope := cgutil.CgroupScope(allocID, task)
path := filepath.Join("/", cgutil.GetCgroupParent(parent), scope)
cfg.Cgroups.Path = path
e.containment = resources.Contain(e.logger, cfg.Cgroups)
return e.containment.Apply(pid)
} else {
return e.resConCtx.getAllPidsByCgroup()
// in v1 create a freezer cgroup for use by containment
if err := cgutil.ConfigureBasicCgroups(cfg); err != nil {
// Log this error to help diagnose cases where nomad is run with too few
// permissions, but don't return an error. There is no separate check for
// cgroup creation permissions, so this may be the happy path.
e.logger.Warn("failed to create cgroup",
"docs", "https://www.nomadproject.io/docs/drivers/raw_exec.html#no_cgroups",
"error", err)
return nil
}
path := cfg.Cgroups.Paths["freezer"]
e.logger.Trace("cgroup created, now need to apply", "path", path)
e.containment = resources.Contain(e.logger, cfg.Cgroups)
return e.containment.Apply(pid)
}
}
// DestroyCgroup kills all processes in the cgroup and removes the cgroup
// configuration from the host. This function is idempotent.
func DestroyCgroup(groups *lconfigs.Cgroup, executorPid int) error {
mErrs := new(multierror.Error)
if groups == nil {
return fmt.Errorf("Can't destroy: cgroup configuration empty")
func (e *UniversalExecutor) getAllPids() (resources.PIDs, error) {
if e.containment == nil {
return getAllPidsByScanning()
}
// Move the executor into the global cgroup so that the task specific
// cgroup can be destroyed.
path, err := cgroups.GetInitCgroupPath("freezer")
if err != nil {
return err
}
if err := cgroups.EnterPid(map[string]string{"freezer": path}, executorPid); err != nil {
return err
}
// Freeze the Cgroup so that it can not continue to fork/exec.
groups.Resources.Freezer = lconfigs.Frozen
freezer := cgroupFs.FreezerGroup{}
if err := freezer.Set(groups.Paths[freezer.Name()], groups.Resources); err != nil {
return err
}
var procs []*os.Process
pids, err := cgroups.GetAllPids(groups.Paths[freezer.Name()])
if err != nil {
multierror.Append(mErrs, fmt.Errorf("error getting pids: %v", err))
// Unfreeze the cgroup.
groups.Resources.Freezer = lconfigs.Thawed
freezer := cgroupFs.FreezerGroup{}
if err := freezer.Set(groups.Paths[freezer.Name()], groups.Resources); err != nil {
multierror.Append(mErrs, fmt.Errorf("failed to unfreeze cgroup: %v", err))
return mErrs.ErrorOrNil()
}
}
// Kill the processes in the cgroup
for _, pid := range pids {
proc, err := os.FindProcess(pid)
if err != nil {
multierror.Append(mErrs, fmt.Errorf("error finding process %v: %v", pid, err))
continue
}
procs = append(procs, proc)
if e := proc.Kill(); e != nil {
multierror.Append(mErrs, fmt.Errorf("error killing process %v: %v", pid, e))
}
}
// Unfreeze the cgroug so we can wait.
groups.Resources.Freezer = lconfigs.Thawed
if err := freezer.Set(groups.Paths[freezer.Name()], groups.Resources); err != nil {
multierror.Append(mErrs, fmt.Errorf("failed to unfreeze cgroup: %v", err))
return mErrs.ErrorOrNil()
}
// Wait on the killed processes to ensure they are cleaned up.
for _, proc := range procs {
// Don't capture the error because we expect this to fail for
// processes we didn't fork.
proc.Wait()
}
// Remove the cgroup.
if err := cgroups.RemovePaths(groups.Paths); err != nil {
multierror.Append(mErrs, fmt.Errorf("failed to delete the cgroup directories: %v", err))
}
return mErrs.ErrorOrNil()
return e.containment.GetPIDs(), nil
}
// withNetworkIsolation calls the passed function in the network namespace `spec`
func withNetworkIsolation(f func() error, spec *drivers.NetworkIsolationSpec) error {
if spec != nil && spec.Path != "" {
// Get a handle to the target network namespace
netns, err := ns.GetNS(spec.Path)
netNS, err := ns.GetNS(spec.Path)
if err != nil {
return err
}
// Start the container in the network namespace
return netns.Do(func(ns.NetNS) error {
return netNS.Do(func(ns.NetNS) error {
return f()
})
}
return f()
}

View File

@ -1,5 +1,4 @@
//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris
// +build darwin dragonfly freebsd linux netbsd openbsd solaris
package executor
@ -18,17 +17,22 @@ func (e *UniversalExecutor) setNewProcessGroup() error {
return nil
}
// Cleanup any still hanging user processes
func (e *UniversalExecutor) cleanupChildProcesses(proc *os.Process) error {
// SIGKILL the process group starting at process.Pid
func (e *UniversalExecutor) killProcessTree(process *os.Process) error {
pid := process.Pid
negative := -pid // tells unix to kill entire process group
signal := syscall.SIGKILL
// If new process group was created upon command execution
// we can kill the whole process group now to cleanup any leftovers.
if e.childCmd.SysProcAttr != nil && e.childCmd.SysProcAttr.Setpgid {
if err := syscall.Kill(-proc.Pid, syscall.SIGKILL); err != nil && err.Error() != noSuchProcessErr {
e.logger.Trace("sending sigkill to process group", "pid", pid, "negative", negative, "signal", signal)
if err := syscall.Kill(negative, signal); err != nil && err.Error() != noSuchProcessErr {
return err
}
return nil
}
return proc.Kill()
return process.Kill()
}
// Only send the process a shutdown signal (default INT), doesn't

View File

@ -1,3 +1,5 @@
//go:build windows
package executor
import (
@ -17,7 +19,7 @@ func (e *UniversalExecutor) setNewProcessGroup() error {
}
// Cleanup any still hanging user processes
func (e *UniversalExecutor) cleanupChildProcesses(proc *os.Process) error {
func (e *UniversalExecutor) killProcessTree(proc *os.Process) error {
// We must first verify if the process is still running.
// (Windows process often lingered around after being reported as killed).
handle, err := syscall.OpenProcess(syscall.PROCESS_TERMINATE|syscall.SYNCHRONIZE|syscall.PROCESS_QUERY_INFORMATION, false, uint32(proc.Pid))

View File

@ -7,6 +7,7 @@ import (
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/lib/resources"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/plugins/drivers"
ps "github.com/mitchellh/go-ps"
@ -23,26 +24,18 @@ var (
// pidCollector is a utility that can be embedded in an executor to collect pid
// stats
type pidCollector struct {
pids map[int]*nomadPid
pids map[int]*resources.PID
pidLock sync.RWMutex
logger hclog.Logger
}
// nomadPid holds a pid and it's cpu percentage calculator
type nomadPid struct {
pid int
cpuStatsTotal *stats.CpuStats
cpuStatsUser *stats.CpuStats
cpuStatsSys *stats.CpuStats
}
// allPidGetter is a func which is used by the pid collector to gather the pids
// to collect stats on
type allPidGetter func() (map[int]*nomadPid, error)
type allPidGetter func() (resources.PIDs, error)
func newPidCollector(logger hclog.Logger) *pidCollector {
return &pidCollector{
pids: make(map[int]*nomadPid),
pids: make(map[int]*resources.PID),
logger: logger.Named("pid_collector"),
}
}
@ -85,7 +78,7 @@ func (c *pidCollector) collectPids(stopCh chan interface{}, pidGetter allPidGett
// scanPids scans all the pids on the machine running the current executor and
// returns the child processes of the executor.
func scanPids(parentPid int, allPids []ps.Process) (map[int]*nomadPid, error) {
func scanPids(parentPid int, allPids []ps.Process) (map[int]*resources.PID, error) {
processFamily := make(map[int]struct{})
processFamily[parentPid] = struct{}{}
@ -117,15 +110,14 @@ func scanPids(parentPid int, allPids []ps.Process) (map[int]*nomadPid, error) {
}
}
res := make(map[int]*nomadPid)
res := make(map[int]*resources.PID)
for pid := range processFamily {
np := nomadPid{
pid: pid,
cpuStatsTotal: stats.NewCpuStats(),
cpuStatsUser: stats.NewCpuStats(),
cpuStatsSys: stats.NewCpuStats(),
res[pid] = &resources.PID{
PID: pid,
StatsTotalCPU: stats.NewCpuStats(),
StatsUserCPU: stats.NewCpuStats(),
StatsSysCPU: stats.NewCpuStats(),
}
res[pid] = &np
}
return res, nil
}
@ -134,7 +126,7 @@ func scanPids(parentPid int, allPids []ps.Process) (map[int]*nomadPid, error) {
func (c *pidCollector) pidStats() (map[string]*drivers.ResourceUsage, error) {
stats := make(map[string]*drivers.ResourceUsage)
c.pidLock.RLock()
pids := make(map[int]*nomadPid, len(c.pids))
pids := make(map[int]*resources.PID, len(c.pids))
for k, v := range c.pids {
pids[k] = v
}
@ -154,12 +146,12 @@ func (c *pidCollector) pidStats() (map[string]*drivers.ResourceUsage, error) {
cs := &drivers.CpuStats{}
if cpuStats, err := p.Times(); err == nil {
cs.SystemMode = np.cpuStatsSys.Percent(cpuStats.System * float64(time.Second))
cs.UserMode = np.cpuStatsUser.Percent(cpuStats.User * float64(time.Second))
cs.SystemMode = np.StatsSysCPU.Percent(cpuStats.System * float64(time.Second))
cs.UserMode = np.StatsUserCPU.Percent(cpuStats.User * float64(time.Second))
cs.Measured = ExecutorBasicMeasuredCpuStats
// calculate cpu usage percent
cs.Percent = np.cpuStatsTotal.Percent(cpuStats.Total() * float64(time.Second))
cs.Percent = np.StatsTotalCPU.Percent(cpuStats.Total() * float64(time.Second))
}
stats[strconv.Itoa(pid)] = &drivers.ResourceUsage{MemoryStats: ms, CpuStats: cs}
}
@ -210,7 +202,7 @@ func aggregatedResourceUsage(systemCpuStats *stats.CpuStats, pidStats map[string
}
}
func getAllPidsByScanning() (map[int]*nomadPid, error) {
func getAllPidsByScanning() (resources.PIDs, error) {
allProcesses, err := ps.Processes()
if err != nil {
return nil, err

View File

@ -1,5 +1,4 @@
//go:build darwin || dragonfly || freebsd || netbsd || openbsd || solaris || windows
// +build darwin dragonfly freebsd netbsd openbsd solaris windows
//go:build !linux
package executor

View File

@ -1,63 +0,0 @@
package executor
import (
"os"
"sync"
"github.com/hashicorp/nomad/client/stats"
"github.com/opencontainers/runc/libcontainer/cgroups"
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)
// resourceContainerContext is a platform-specific struct for managing a
// resource container. In the case of Linux, this is used to control Cgroups.
type resourceContainerContext struct {
// groups is the cgroup configuration handed to DestroyCgroup on cleanup and
// consulted for the cgroup path when listing pids; nil means no cgroup is
// being tracked (see isEmpty).
groups *cgroupConfig.Cgroup
// cgLock is held for the duration of executorCleanup.
cgLock sync.Mutex
}
// executorCleanup removes this host's Cgroup from within an Executor's
// context. It holds cgLock while calling DestroyCgroup with the tracked
// groups and the pid of the current process, and returns whatever error
// DestroyCgroup reports.
func (rc *resourceContainerContext) executorCleanup() error {
	rc.cgLock.Lock()
	defer rc.cgLock.Unlock()

	return DestroyCgroup(rc.groups, os.Getpid())
}
// isEmpty reports whether no cgroup configuration is set on this context,
// i.e. groups is nil.
func (rc *resourceContainerContext) isEmpty() bool {
return rc.groups == nil
}
// todo(shoenig) cgroups.v2 #12351
//
// getAllPidsByCgroup returns the pids reported by cgroups.GetAllPids for the
// tracked cgroup, keyed by pid, each paired with fresh CPU stats trackers.
// The "freezer" entry of Paths is used when present, otherwise Path. With no
// cgroup configured the result is an empty map and a nil error; if listing
// the pids fails, the empty map is returned together with the error.
func (rc *resourceContainerContext) getAllPidsByCgroup() (map[int]*nomadPid, error) {
	found := make(map[int]*nomadPid)
	if rc.groups == nil {
		return found, nil
	}

	cgPath, ok := rc.groups.Paths["freezer"]
	if !ok {
		cgPath = rc.groups.Path
	}

	members, err := cgroups.GetAllPids(cgPath)
	if err != nil {
		return found, err
	}

	for i := range members {
		id := members[i]
		found[id] = &nomadPid{
			pid:           id,
			cpuStatsTotal: stats.NewCpuStats(),
			cpuStatsUser:  stats.NewCpuStats(),
			cpuStatsSys:   stats.NewCpuStats(),
		}
	}
	return found, nil
}

View File

@ -14,10 +14,9 @@ import (
// processes along side of a task. By early importing them we can avoid
// additional code being imported and thus reserving memory
_ "github.com/hashicorp/nomad/client/logmon"
"github.com/hashicorp/nomad/command"
_ "github.com/hashicorp/nomad/drivers/docker/docklog"
_ "github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/command"
"github.com/hashicorp/nomad/version"
"github.com/mitchellh/cli"
"github.com/sean-/seed"

View File

@ -2244,7 +2244,7 @@ func TestCoreScheduler_CSIPluginGC(t *testing.T) {
}
func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
ci.Parallel(t)
ci.SkipSlow(t, "flaky on GHA; #12358")
srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue

View File

@ -1186,6 +1186,7 @@ func leaderElectionTest(t *testing.T, raftProtocol raft.ProtocolVersion) {
func TestLeader_RollRaftServer(t *testing.T) {
ci.Parallel(t)
ci.SkipSlow(t, "flaky on GHA; #12358")
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.RaftConfig.ProtocolVersion = 2
@ -1389,7 +1390,7 @@ func TestLeader_TransitionsUpdateConsistencyRead(t *testing.T) {
// (and unpaused) upon leader elections (and step downs).
func TestLeader_PausingWorkers(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 12
})

View File

@ -1864,6 +1864,9 @@ type Node struct {
// Node name
Name string
// CgroupParent for this node (linux only)
CgroupParent string
// HTTPAddr is the address on which the Nomad client is listening for http
// requests
HTTPAddr string

View File

@ -43,7 +43,6 @@ func (h *DriverHarness) Impl() drivers.DriverPlugin {
}
func NewDriverHarness(t testing.T, d drivers.DriverPlugin) *DriverHarness {
logger := testlog.HCLogger(t).Named("driver_harness")
pd := drivers.NewDriverPlugin(d, logger)
client, server := plugin.TestPluginGRPCConn(t,
@ -68,18 +67,26 @@ func NewDriverHarness(t testing.T, d drivers.DriverPlugin) *DriverHarness {
}
}
// setCgroup creates a v2 cgroup for the task, as if a Client were initialized
// and managing the cgroup as it normally would.
// setupCgroupV2 creates a v2 cgroup for the task, as if a Client were initialized
// and managing the cgroup as it normally would via the cpuset manager.
//
// Uses testing.slice as a parent.
func (h *DriverHarness) setCgroup(allocID, task string) {
// Note that we are being lazy and trying to avoid importing cgutil because
// currently plugins/drivers/testutils is platform agnostic-ish.
//
// Some drivers (raw_exec) setup their own cgroup, while others (exec, java, docker)
// would otherwise depend on the Nomad cpuset manager (and docker daemon) to create
// one, which isn't available here in testing, and so we create one via the harness.
// Plumbing such metadata through to the harness is a mind bender, so we just always
// create the cgroup, but at least put it under 'testing.slice'.
//
// tl;dr raw_exec tests should ignore this cgroup.
func (h *DriverHarness) setupCgroupV2(allocID, task string) {
if cgutil.UseV2 {
h.cgroup = filepath.Join(cgutil.CgroupRoot, "testing.slice", cgutil.CgroupScope(allocID, task))
f, err := os.Create(h.cgroup)
if err != nil {
h.logger.Trace("create cgroup for test", "parent", "testing.slice", "id", allocID, "task", task, "path", h.cgroup)
if err := os.MkdirAll(h.cgroup, 0755); err != nil {
panic(err)
}
defer f.Close()
}
}
@ -89,11 +96,15 @@ func (h *DriverHarness) Kill() {
h.cleanupCgroup()
}
// cleanupCgroup might cleanup a cgroup that may or may not be tricked by DriverHarness.
func (h *DriverHarness) cleanupCgroup() {
if cgutil.UseV2 {
err := os.Remove(h.cgroup)
if err != nil {
panic(err)
// some [non-exec] tests don't bother with MkAllocDir which is what would create
// the cgroup, but then do call Kill, so in that case skip the cgroup cleanup
if cgutil.UseV2 && h.cgroup != "" {
if err := os.Remove(h.cgroup); err != nil && !os.IsNotExist(err) {
// in some cases the driver will cleanup the cgroup itself, in which
// case we do not care about the cgroup not existing at cleanup time
h.t.Fatalf("failed to cleanup cgroup: %v", err)
}
}
}
@ -153,6 +164,7 @@ func (h *DriverHarness) MkAllocDir(t *drivers.TaskConfig, enableLogs bool) func(
require.NoError(h.t, err)
fsi := caps.FSIsolation
h.logger.Trace("FS isolation", "fsi", fsi)
require.NoError(h.t, taskDir.Build(fsi == drivers.FSIsolationChroot, tinyChroot))
task := &structs.Task{
@ -181,8 +193,8 @@ func (h *DriverHarness) MkAllocDir(t *drivers.TaskConfig, enableLogs bool) func(
}
}
// set cgroup
h.setCgroup(alloc.ID, task.Name)
// setup a v2 cgroup for test cases that assume one exists
h.setupCgroupV2(alloc.ID, task.Name)
//logmon
if enableLogs {

View File

@ -115,6 +115,12 @@
</td>
<td>Datacenter in which the allocation is running</td>
</tr>
<tr>
<td>
<code>NOMAD_PARENT_CGROUP</code>
</td>
<td>The parent cgroup used to contain task cgroups (Linux only)</td>
</tr>
<tr>
<td>
<code>NOMAD_NAMESPACE</code>