backport of commit 615e76ef3c23497f768ebd175f0c624d32aeece8 (#17993)

This pull request was automerged via backport-assistant
This commit is contained in:
hc-github-team-nomad-core 2023-07-19 13:31:14 -05:00 committed by GitHub
parent 872db79967
commit e5fb6fe687
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 238 additions and 205 deletions

3
.changelog/17628.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
cpustats: Use config "cpu_total_compute" (if set) for all CPU statistics
```

View File

@ -10,7 +10,8 @@ import (
"github.com/hashicorp/nomad/lib/cpuset" "github.com/hashicorp/nomad/lib/cpuset"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper/stats" "github.com/hashicorp/nomad/client/stats"
shelpers "github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
) )
@ -42,7 +43,7 @@ func NewCPUFingerprint(logger hclog.Logger) Fingerprint {
} }
func (f *CPUFingerprint) Fingerprint(request *FingerprintRequest, response *FingerprintResponse) error { func (f *CPUFingerprint) Fingerprint(request *FingerprintRequest, response *FingerprintResponse) error {
f.initialize() f.initialize(request)
f.setModelName(response) f.setModelName(response)
@ -61,8 +62,8 @@ func (f *CPUFingerprint) Fingerprint(request *FingerprintRequest, response *Fing
return nil return nil
} }
func (f *CPUFingerprint) initialize() { func (f *CPUFingerprint) initialize(request *FingerprintRequest) {
if err := stats.Init(); err != nil { if err := stats.Init(uint64(request.Config.CpuCompute)); err != nil {
f.logger.Warn("failed initializing stats collector", "error", err) f.logger.Warn("failed initializing stats collector", "error", err)
} }
} }
@ -134,10 +135,8 @@ func (f *CPUFingerprint) setReservableCores(request *FingerprintRequest, respons
func (f *CPUFingerprint) setTotalCompute(request *FingerprintRequest, response *FingerprintResponse) { func (f *CPUFingerprint) setTotalCompute(request *FingerprintRequest, response *FingerprintResponse) {
var ticks uint64 var ticks uint64
switch { switch {
case request.Config.CpuCompute > 0: case shelpers.CpuTotalTicks() > 0:
ticks = uint64(request.Config.CpuCompute) ticks = shelpers.CpuTotalTicks()
case stats.TotalTicksAvailable() > 0:
ticks = stats.TotalTicksAvailable()
default: default:
ticks = defaultCPUTicks ticks = defaultCPUTicks
} }

View File

@ -19,6 +19,7 @@ import (
log "github.com/hashicorp/go-hclog" log "github.com/hashicorp/go-hclog"
cleanhttp "github.com/hashicorp/go-cleanhttp" cleanhttp "github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
) )
@ -185,6 +186,7 @@ func (f *EnvAWSFingerprint) Fingerprint(request *FingerprintRequest, response *F
if ticks := specs.Ticks(); request.Config.CpuCompute <= 0 { if ticks := specs.Ticks(); request.Config.CpuCompute <= 0 {
response.AddAttribute("cpu.totalcompute", fmt.Sprintf("%d", ticks)) response.AddAttribute("cpu.totalcompute", fmt.Sprintf("%d", ticks))
f.logger.Debug("setting ec2 cpu", "ticks", ticks) f.logger.Debug("setting ec2 cpu", "ticks", ticks)
stats.SetCpuTotalTicks(uint64(ticks))
resources = new(structs.Resources) resources = new(structs.Resources)
resources.CPU = ticks resources.CPU = ticks
if nodeResources == nil { if nodeResources == nil {

View File

@ -4,7 +4,7 @@
package resources package resources
import ( import (
"github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/helper/stats"
) )
// PIDs holds all of a task's pids and their cpu percentage calculators // PIDs holds all of a task's pids and their cpu percentage calculators

View File

@ -4,64 +4,115 @@
package stats package stats
import ( import (
"runtime" "context"
"errors"
"fmt"
"sync"
"time" "time"
shelpers "github.com/hashicorp/nomad/helper/stats" "github.com/hashicorp/nomad/helper/stats"
"github.com/shirou/gopsutil/v3/cpu" "github.com/shirou/gopsutil/v3/cpu"
"github.com/shoenig/go-m1cpu"
) )
// CpuStats calculates cpu usage percentage const (
type CpuStats struct { // cpuInfoTimeout is the timeout used when gathering CPU info. This is used
prevCpuTime float64 // to override the default timeout in gopsutil which has a tendency to
prevTime time.Time // timeout on Windows.
cpuInfoTimeout = 60 * time.Second
)
totalCpus int var (
} cpuPowerCoreCount int
cpuPowerCoreMHz uint64
cpuEfficiencyCoreCount int
cpuEfficiencyCoreMHz uint64
cpuModelName string
)
// NewCpuStats returns a cpu stats calculator var (
func NewCpuStats() *CpuStats { detectedCpuTotalTicks uint64
numCpus := runtime.NumCPU() initErr error
cpuStats := &CpuStats{ onceLer sync.Once
totalCpus: numCpus, )
}
return cpuStats
}
// Percent calculates the cpu usage percentage based on the current cpu usage func Init(configCpuTotalCompute uint64) error {
// and the previous cpu usage where usage is given as time in nanoseconds spend onceLer.Do(func() {
// in the cpu switch {
func (c *CpuStats) Percent(cpuTime float64) float64 { case m1cpu.IsAppleSilicon():
now := time.Now() cpuModelName = m1cpu.ModelName()
cpuPowerCoreCount = m1cpu.PCoreCount()
cpuPowerCoreMHz = m1cpu.PCoreHz() / 1_000_000
cpuEfficiencyCoreCount = m1cpu.ECoreCount()
cpuEfficiencyCoreMHz = m1cpu.ECoreHz() / 1_000_000
bigTicks := uint64(cpuPowerCoreCount) * cpuPowerCoreMHz
littleTicks := uint64(cpuEfficiencyCoreCount) * cpuEfficiencyCoreMHz
detectedCpuTotalTicks = bigTicks + littleTicks
default:
// for now, all other cpu types assume only power cores
// todo: this is already not true for Intel 13th generation
if c.prevCpuTime == 0.0 { var err error
// invoked first time if cpuPowerCoreCount, err = cpu.Counts(true); err != nil {
c.prevCpuTime = cpuTime initErr = errors.Join(initErr, fmt.Errorf("failed to detect number of CPU cores: %w", err))
c.prevTime = now }
return 0.0
ctx, cancel := context.WithTimeout(context.Background(), cpuInfoTimeout)
defer cancel()
var cpuInfoStats []cpu.InfoStat
if cpuInfoStats, err = cpu.InfoWithContext(ctx); err != nil {
initErr = errors.Join(initErr, fmt.Errorf("Unable to obtain CPU information: %w", err))
}
for _, infoStat := range cpuInfoStats {
cpuModelName = infoStat.ModelName
if uint64(infoStat.Mhz) > cpuPowerCoreMHz {
cpuPowerCoreMHz = uint64(infoStat.Mhz)
}
}
// compute ticks using only power core, until we add support for
// detecting little cores on non-apple platforms
detectedCpuTotalTicks = uint64(cpuPowerCoreCount) * cpuPowerCoreMHz
initErr = err
}
stats.SetCpuTotalTicks(detectedCpuTotalTicks)
})
// override the computed value with the config value if it is set
if configCpuTotalCompute > 0 {
stats.SetCpuTotalTicks(configCpuTotalCompute)
} }
timeDelta := now.Sub(c.prevTime).Nanoseconds() return initErr
ret := c.calculatePercent(c.prevCpuTime, cpuTime, timeDelta)
c.prevCpuTime = cpuTime
c.prevTime = now
return ret
} }
// TicksConsumed calculates the total ticks consumes by the process across all // CPUNumCores returns the number of CPU cores available.
// cpu cores //
func (c *CpuStats) TicksConsumed(percent float64) float64 { // This is represented with two values - (Power (P), Efficiency (E)) so we can
return (percent / 100) * float64(shelpers.TotalTicksAvailable()) / float64(c.totalCpus) // correctly compute total compute for processors with asymetric cores such as
// Apple Silicon.
//
// For platforms with symetric cores (or where we do not correcly detect asymetric
// cores), all cores are presented as P cores.
func CPUNumCores() (int, int) {
return cpuPowerCoreCount, cpuEfficiencyCoreCount
} }
func (c *CpuStats) calculatePercent(t1, t2 float64, timeDelta int64) float64 { // CPUMHzPerCore returns the MHz per CPU (P, E) core type.
vDelta := t2 - t1 //
if timeDelta <= 0 || vDelta <= 0.0 { // As with CPUNumCores, asymetric core detection currently only works with
return 0.0 // Apple Silicon CPUs.
} func CPUMHzPerCore() (uint64, uint64) {
return cpuPowerCoreMHz, cpuEfficiencyCoreMHz
}
overall_percent := (vDelta / float64(timeDelta)) * 100.0 // CPUModelName returns the model name of the CPU.
return overall_percent func CPUModelName() string {
return cpuModelName
} }
func (h *HostStatsCollector) collectCPUStats() (cpus []*CPUStats, totalTicks float64, err error) { func (h *HostStatsCollector) collectCPUStats() (cpus []*CPUStats, totalTicks float64, err error) {
@ -79,7 +130,7 @@ func (h *HostStatsCollector) collectCPUStats() (cpus []*CPUStats, totalTicks flo
h.statsCalculator[cpuStat.CPU] = percentCalculator h.statsCalculator[cpuStat.CPU] = percentCalculator
} }
idle, user, system, total := percentCalculator.Calculate(cpuStat) idle, user, system, total := percentCalculator.Calculate(cpuStat)
ticks := (total / 100.0) * (float64(shelpers.TotalTicksAvailable()) / float64(len(cpuStats))) ticks := (total / 100.0) * (float64(stats.CpuTotalTicks()) / float64(len(cpuStats)))
cs[idx] = &CPUStats{ cs[idx] = &CPUStats{
CPU: cpuStat.CPU, CPU: cpuStat.CPU,
User: user, User: user,

View File

@ -32,7 +32,7 @@ func TestCPU_CPUModelName(t *testing.T) {
must.NotEq(t, "", name) must.NotEq(t, "", name)
} }
func TestCPU_CPUTotalTicksAvailable(t *testing.T) { func TestCPU_CPUCpuTotalTicks(t *testing.T) {
ticks := TotalTicksAvailable() ticks := CpuTotalTicks()
must.Positive(t, ticks) must.Positive(t, ticks)
} }

View File

@ -7,32 +7,17 @@ import (
"math" "math"
"os" "os"
"testing" "testing"
"time"
"github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/ci"
shelpers "github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/testlog"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestCpuStatsPercent(t *testing.T) {
ci.Parallel(t)
cs := NewCpuStats()
cs.Percent(79.7)
time.Sleep(1 * time.Second)
percent := cs.Percent(80.69)
expectedPercent := 98.00
if percent < expectedPercent && percent > (expectedPercent+1.00) {
t.Fatalf("expected: %v, actual: %v", expectedPercent, percent)
}
}
func TestHostStats_CPU(t *testing.T) { func TestHostStats_CPU(t *testing.T) {
ci.Parallel(t) ci.Parallel(t)
assert := assert.New(t) assert := assert.New(t)
assert.Nil(shelpers.Init()) assert.Nil(Init(0))
logger := testlog.HCLogger(t) logger := testlog.HCLogger(t)
cwd, err := os.Getwd() cwd, err := os.Getwd()

View File

@ -56,7 +56,7 @@ func DockerStatsToTaskResourceUsage(s *docker.Stats) *cstructs.TaskResourceUsage
cs.UserMode = CalculateCPUPercent( cs.UserMode = CalculateCPUPercent(
s.CPUStats.CPUUsage.UsageInUsermode, s.PreCPUStats.CPUUsage.UsageInUsermode, s.CPUStats.CPUUsage.UsageInUsermode, s.PreCPUStats.CPUUsage.UsageInUsermode,
s.CPUStats.CPUUsage.TotalUsage, s.PreCPUStats.CPUUsage.TotalUsage, runtime.NumCPU()) s.CPUStats.CPUUsage.TotalUsage, s.PreCPUStats.CPUUsage.TotalUsage, runtime.NumCPU())
cs.TotalTicks = (cs.Percent / 100) * float64(stats.TotalTicksAvailable()) / float64(runtime.NumCPU()) cs.TotalTicks = (cs.Percent / 100) * float64(stats.CpuTotalTicks()) / float64(runtime.NumCPU())
return &cstructs.TaskResourceUsage{ return &cstructs.TaskResourceUsage{
ResourceUsage: &cstructs.ResourceUsage{ ResourceUsage: &cstructs.ResourceUsage{

View File

@ -45,7 +45,7 @@ func DockerStatsToTaskResourceUsage(s *docker.Stats) *cstructs.TaskResourceUsage
ThrottledPeriods: s.CPUStats.ThrottlingData.ThrottledPeriods, ThrottledPeriods: s.CPUStats.ThrottlingData.ThrottledPeriods,
ThrottledTime: s.CPUStats.ThrottlingData.ThrottledTime, ThrottledTime: s.CPUStats.ThrottlingData.ThrottledTime,
Percent: cpuPercent, Percent: cpuPercent,
TotalTicks: (cpuPercent / 100) * float64(stats.TotalTicksAvailable()) / float64(runtime.NumCPU()), TotalTicks: (cpuPercent / 100) * float64(stats.CpuTotalTicks()) / float64(runtime.NumCPU()),
Measured: DockerMeasuredCPUStats, Measured: DockerMeasuredCPUStats,
} }

View File

@ -23,9 +23,8 @@ import (
"github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/lib/fifo" "github.com/hashicorp/nomad/client/lib/fifo"
"github.com/hashicorp/nomad/client/lib/resources" "github.com/hashicorp/nomad/client/lib/resources"
"github.com/hashicorp/nomad/client/stats"
cstructs "github.com/hashicorp/nomad/client/structs" cstructs "github.com/hashicorp/nomad/client/structs"
shelpers "github.com/hashicorp/nomad/helper/stats" "github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/drivers"
"github.com/syndtr/gocapability/capability" "github.com/syndtr/gocapability/capability"
) )
@ -259,11 +258,9 @@ type UniversalExecutor struct {
} }
// NewExecutor returns an Executor // NewExecutor returns an Executor
func NewExecutor(logger hclog.Logger) Executor { func NewExecutor(logger hclog.Logger, cpuTotalTicks uint64) Executor {
logger = logger.Named("executor") logger = logger.Named("executor")
if err := shelpers.Init(); err != nil { stats.SetCpuTotalTicks(cpuTotalTicks)
logger.Error("unable to initialize stats", "error", err)
}
return &UniversalExecutor{ return &UniversalExecutor{
logger: logger, logger: logger,

View File

@ -13,10 +13,10 @@ import (
"github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/drivers"
) )
func NewExecutorWithIsolation(logger hclog.Logger) Executor { func NewExecutorWithIsolation(logger hclog.Logger, cpuTotalTicks uint64) Executor {
logger = logger.Named("executor") logger = logger.Named("executor")
logger.Error("isolation executor is not supported on this platform, using default") logger.Error("isolation executor is not supported on this platform, using default")
return NewExecutor(logger) return NewExecutor(logger, cpuTotalTicks)
} }
func (e *UniversalExecutor) configureResourceContainer(_ int) error { return nil } func (e *UniversalExecutor) configureResourceContainer(_ int) error { return nil }

View File

@ -24,10 +24,9 @@ import (
"github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/lib/cgutil" "github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/client/lib/resources" "github.com/hashicorp/nomad/client/lib/resources"
"github.com/hashicorp/nomad/client/stats"
cstructs "github.com/hashicorp/nomad/client/structs" cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/capabilities" "github.com/hashicorp/nomad/drivers/shared/capabilities"
shelpers "github.com/hashicorp/nomad/helper/stats" "github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/drivers"
@ -71,11 +70,10 @@ type LibcontainerExecutor struct {
exitState *ProcessState exitState *ProcessState
} }
func NewExecutorWithIsolation(logger hclog.Logger) Executor { func NewExecutorWithIsolation(logger hclog.Logger, cpuTotalTicks uint64) Executor {
logger = logger.Named("isolated_executor") logger = logger.Named("isolated_executor")
if err := shelpers.Init(); err != nil { stats.SetCpuTotalTicks(cpuTotalTicks)
logger.Error("unable to initialize stats", "error", err)
}
return &LibcontainerExecutor{ return &LibcontainerExecutor{
id: strings.ReplaceAll(uuid.Generate(), "-", "_"), id: strings.ReplaceAll(uuid.Generate(), "-", "_"),
logger: logger, logger: logger,

View File

@ -147,7 +147,7 @@ func TestExecutor_Isolation_PID_and_IPC_hostMode(t *testing.T) {
execCmd.ModePID = "host" // disable PID namespace execCmd.ModePID = "host" // disable PID namespace
execCmd.ModeIPC = "host" // disable IPC namespace execCmd.ModeIPC = "host" // disable IPC namespace
executor := NewExecutorWithIsolation(testlog.HCLogger(t)) executor := NewExecutorWithIsolation(testlog.HCLogger(t), 0)
defer executor.Shutdown("SIGKILL", 0) defer executor.Shutdown("SIGKILL", 0)
ps, err := executor.Launch(execCmd) ps, err := executor.Launch(execCmd)
@ -190,7 +190,7 @@ func TestExecutor_IsolationAndConstraints(t *testing.T) {
execCmd.ModePID = "private" execCmd.ModePID = "private"
execCmd.ModeIPC = "private" execCmd.ModeIPC = "private"
executor := NewExecutorWithIsolation(testlog.HCLogger(t)) executor := NewExecutorWithIsolation(testlog.HCLogger(t), 0)
defer executor.Shutdown("SIGKILL", 0) defer executor.Shutdown("SIGKILL", 0)
ps, err := executor.Launch(execCmd) ps, err := executor.Launch(execCmd)
@ -282,7 +282,7 @@ func TestExecutor_CgroupPaths(t *testing.T) {
execCmd.ResourceLimits = true execCmd.ResourceLimits = true
executor := NewExecutorWithIsolation(testlog.HCLogger(t)) executor := NewExecutorWithIsolation(testlog.HCLogger(t), 0)
defer executor.Shutdown("SIGKILL", 0) defer executor.Shutdown("SIGKILL", 0)
ps, err := executor.Launch(execCmd) ps, err := executor.Launch(execCmd)
@ -344,7 +344,7 @@ func TestExecutor_CgroupPathsAreDestroyed(t *testing.T) {
execCmd.ResourceLimits = true execCmd.ResourceLimits = true
executor := NewExecutorWithIsolation(testlog.HCLogger(t)) executor := NewExecutorWithIsolation(testlog.HCLogger(t), 0)
defer executor.Shutdown("SIGKILL", 0) defer executor.Shutdown("SIGKILL", 0)
ps, err := executor.Launch(execCmd) ps, err := executor.Launch(execCmd)
@ -547,7 +547,7 @@ func TestExecutor_EscapeContainer(t *testing.T) {
execCmd.ResourceLimits = true execCmd.ResourceLimits = true
executor := NewExecutorWithIsolation(testlog.HCLogger(t)) executor := NewExecutorWithIsolation(testlog.HCLogger(t), 0)
defer executor.Shutdown("SIGKILL", 0) defer executor.Shutdown("SIGKILL", 0)
_, err := executor.Launch(execCmd) _, err := executor.Launch(execCmd)
@ -597,7 +597,7 @@ func TestExecutor_DoesNotInheritOomScoreAdj(t *testing.T) {
execCmd.Cmd = "/bin/bash" execCmd.Cmd = "/bin/bash"
execCmd.Args = []string{"-c", "cat /proc/self/oom_score_adj"} execCmd.Args = []string{"-c", "cat /proc/self/oom_score_adj"}
executor := NewExecutorWithIsolation(testlog.HCLogger(t)) executor := NewExecutorWithIsolation(testlog.HCLogger(t), 0)
defer executor.Shutdown("SIGKILL", 0) defer executor.Shutdown("SIGKILL", 0)
_, err = executor.Launch(execCmd) _, err = executor.Launch(execCmd)
@ -691,7 +691,7 @@ CapAmb: 0000000000000400`,
execCmd.Capabilities = capsAllowed execCmd.Capabilities = capsAllowed
} }
executor := NewExecutorWithIsolation(testlog.HCLogger(t)) executor := NewExecutorWithIsolation(testlog.HCLogger(t), 0)
defer executor.Shutdown("SIGKILL", 0) defer executor.Shutdown("SIGKILL", 0)
_, err := executor.Launch(execCmd) _, err := executor.Launch(execCmd)
@ -739,7 +739,7 @@ func TestExecutor_ClientCleanup(t *testing.T) {
execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir
defer allocDir.Destroy() defer allocDir.Destroy()
executor := NewExecutorWithIsolation(testlog.HCLogger(t)) executor := NewExecutorWithIsolation(testlog.HCLogger(t), 0)
defer executor.Shutdown("", 0) defer executor.Shutdown("", 0)
// Need to run a command which will produce continuous output but not // Need to run a command which will produce continuous output but not
@ -864,7 +864,7 @@ func TestUniversalExecutor_NoCgroup(t *testing.T) {
execCmd.BasicProcessCgroup = false execCmd.BasicProcessCgroup = false
execCmd.ResourceLimits = false execCmd.ResourceLimits = false
executor := NewExecutor(testlog.HCLogger(t)) executor := NewExecutor(testlog.HCLogger(t), 0)
defer executor.Shutdown("SIGKILL", 0) defer executor.Shutdown("SIGKILL", 0)
_, err = executor.Launch(execCmd) _, err = executor.Launch(execCmd)

View File

@ -15,15 +15,16 @@ import (
type ExecutorPlugin struct { type ExecutorPlugin struct {
// TODO: support backwards compatibility with pre 0.9 NetRPC plugin // TODO: support backwards compatibility with pre 0.9 NetRPC plugin
plugin.NetRPCUnsupportedPlugin plugin.NetRPCUnsupportedPlugin
logger hclog.Logger logger hclog.Logger
fsIsolation bool fsIsolation bool
cpuTotalTicks uint64
} }
func (p *ExecutorPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { func (p *ExecutorPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
if p.fsIsolation { if p.fsIsolation {
proto.RegisterExecutorServer(s, &grpcExecutorServer{impl: NewExecutorWithIsolation(p.logger)}) proto.RegisterExecutorServer(s, &grpcExecutorServer{impl: NewExecutorWithIsolation(p.logger, p.cpuTotalTicks)})
} else { } else {
proto.RegisterExecutorServer(s, &grpcExecutorServer{impl: NewExecutor(p.logger)}) proto.RegisterExecutorServer(s, &grpcExecutorServer{impl: NewExecutor(p.logger, p.cpuTotalTicks)})
} }
return nil return nil
} }

View File

@ -36,7 +36,7 @@ import (
var executorFactories = map[string]executorFactory{} var executorFactories = map[string]executorFactory{}
type executorFactory struct { type executorFactory struct {
new func(hclog.Logger) Executor new func(hclog.Logger, uint64) Executor
configureExecCmd func(*testing.T, *ExecCommand) configureExecCmd func(*testing.T, *ExecCommand)
} }
@ -150,7 +150,7 @@ func TestExecutor_Start_Invalid(t *testing.T) {
execCmd.Args = []string{"1"} execCmd.Args = []string{"1"}
factory.configureExecCmd(t, execCmd) factory.configureExecCmd(t, execCmd)
defer allocDir.Destroy() defer allocDir.Destroy()
executor := factory.new(testlog.HCLogger(t)) executor := factory.new(testlog.HCLogger(t), 0)
defer executor.Shutdown("", 0) defer executor.Shutdown("", 0)
_, err := executor.Launch(execCmd) _, err := executor.Launch(execCmd)
@ -170,7 +170,7 @@ func TestExecutor_Start_Wait_Failure_Code(t *testing.T) {
execCmd.Args = []string{"-c", "sleep 1; /bin/date fail"} execCmd.Args = []string{"-c", "sleep 1; /bin/date fail"}
factory.configureExecCmd(t, execCmd) factory.configureExecCmd(t, execCmd)
defer allocDir.Destroy() defer allocDir.Destroy()
executor := factory.new(testlog.HCLogger(t)) executor := factory.new(testlog.HCLogger(t), 0)
defer executor.Shutdown("", 0) defer executor.Shutdown("", 0)
ps, err := executor.Launch(execCmd) ps, err := executor.Launch(execCmd)
@ -195,7 +195,7 @@ func TestExecutor_Start_Wait(t *testing.T) {
factory.configureExecCmd(t, execCmd) factory.configureExecCmd(t, execCmd)
defer allocDir.Destroy() defer allocDir.Destroy()
executor := factory.new(testlog.HCLogger(t)) executor := factory.new(testlog.HCLogger(t), 0)
defer executor.Shutdown("", 0) defer executor.Shutdown("", 0)
ps, err := executor.Launch(execCmd) ps, err := executor.Launch(execCmd)
@ -232,7 +232,7 @@ func TestExecutor_Start_Wait_Children(t *testing.T) {
factory.configureExecCmd(t, execCmd) factory.configureExecCmd(t, execCmd)
defer allocDir.Destroy() defer allocDir.Destroy()
executor := factory.new(testlog.HCLogger(t)) executor := factory.new(testlog.HCLogger(t), 0)
defer executor.Shutdown("SIGKILL", 0) defer executor.Shutdown("SIGKILL", 0)
ps, err := executor.Launch(execCmd) ps, err := executor.Launch(execCmd)
@ -273,7 +273,7 @@ func TestExecutor_WaitExitSignal(t *testing.T) {
factory.configureExecCmd(t, execCmd) factory.configureExecCmd(t, execCmd)
defer allocDir.Destroy() defer allocDir.Destroy()
executor := factory.new(testlog.HCLogger(t)) executor := factory.new(testlog.HCLogger(t), 0)
defer executor.Shutdown("", 0) defer executor.Shutdown("", 0)
pState, err := executor.Launch(execCmd) pState, err := executor.Launch(execCmd)
@ -331,7 +331,7 @@ func TestExecutor_Start_Kill(t *testing.T) {
factory.configureExecCmd(t, execCmd) factory.configureExecCmd(t, execCmd)
defer allocDir.Destroy() defer allocDir.Destroy()
executor := factory.new(testlog.HCLogger(t)) executor := factory.new(testlog.HCLogger(t), 0)
defer executor.Shutdown("", 0) defer executor.Shutdown("", 0)
ps, err := executor.Launch(execCmd) ps, err := executor.Launch(execCmd)
@ -536,7 +536,7 @@ func TestExecutor_Start_Kill_Immediately_NoGrace(t *testing.T) {
execCmd.Args = []string{"100"} execCmd.Args = []string{"100"}
factory.configureExecCmd(t, execCmd) factory.configureExecCmd(t, execCmd)
defer allocDir.Destroy() defer allocDir.Destroy()
executor := factory.new(testlog.HCLogger(t)) executor := factory.new(testlog.HCLogger(t), 0)
defer executor.Shutdown("", 0) defer executor.Shutdown("", 0)
ps, err := executor.Launch(execCmd) ps, err := executor.Launch(execCmd)
@ -572,7 +572,7 @@ func TestExecutor_Start_Kill_Immediately_WithGrace(t *testing.T) {
execCmd.Args = []string{"100"} execCmd.Args = []string{"100"}
factory.configureExecCmd(t, execCmd) factory.configureExecCmd(t, execCmd)
defer allocDir.Destroy() defer allocDir.Destroy()
executor := factory.new(testlog.HCLogger(t)) executor := factory.new(testlog.HCLogger(t), 0)
defer executor.Shutdown("", 0) defer executor.Shutdown("", 0)
ps, err := executor.Launch(execCmd) ps, err := executor.Launch(execCmd)
@ -618,7 +618,7 @@ func TestExecutor_Start_NonExecutableBinaries(t *testing.T) {
execCmd.Cmd = nonExecutablePath execCmd.Cmd = nonExecutablePath
factory.configureExecCmd(t, execCmd) factory.configureExecCmd(t, execCmd)
executor := factory.new(testlog.HCLogger(t)) executor := factory.new(testlog.HCLogger(t), 0)
defer executor.Shutdown("", 0) defer executor.Shutdown("", 0)
// need to configure path in chroot with that file if using isolation executor // need to configure path in chroot with that file if using isolation executor

View File

@ -11,7 +11,7 @@ import (
hclog "github.com/hashicorp/go-hclog" hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/lib/resources" "github.com/hashicorp/nomad/client/lib/resources"
"github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/drivers"
ps "github.com/mitchellh/go-ps" ps "github.com/mitchellh/go-ps"
"github.com/shirou/gopsutil/v3/process" "github.com/shirou/gopsutil/v3/process"

View File

@ -22,13 +22,18 @@ type ExecutorConfig struct {
// FSIsolation if set will use an executor implementation that support // FSIsolation if set will use an executor implementation that support
// filesystem isolation // filesystem isolation
FSIsolation bool FSIsolation bool
// cpuTotalTicks is the total CPU compute. It should be given as Cores * MHz
// (2 Cores * 2 Ghz = 4000)
CpuTotalTicks uint64
} }
func GetPluginMap(logger hclog.Logger, fsIsolation bool) map[string]plugin.Plugin { func GetPluginMap(logger hclog.Logger, fsIsolation bool, cpuTotalTicks uint64) map[string]plugin.Plugin {
return map[string]plugin.Plugin{ return map[string]plugin.Plugin{
"executor": &ExecutorPlugin{ "executor": &ExecutorPlugin{
logger: logger, logger: logger,
fsIsolation: fsIsolation, fsIsolation: fsIsolation,
cpuTotalTicks: cpuTotalTicks,
}, },
} }
} }

View File

@ -13,6 +13,7 @@ import (
hclog "github.com/hashicorp/go-hclog" hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin" plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/drivers/shared/executor/proto" "github.com/hashicorp/nomad/drivers/shared/executor/proto"
"github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/base"
) )
@ -31,6 +32,7 @@ const (
func CreateExecutor(logger hclog.Logger, driverConfig *base.ClientDriverConfig, func CreateExecutor(logger hclog.Logger, driverConfig *base.ClientDriverConfig,
executorConfig *ExecutorConfig) (Executor, *plugin.Client, error) { executorConfig *ExecutorConfig) (Executor, *plugin.Client, error) {
executorConfig.CpuTotalTicks = stats.CpuTotalTicks()
c, err := json.Marshal(executorConfig) c, err := json.Marshal(executorConfig)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("unable to create executor config: %v", err) return nil, nil, fmt.Errorf("unable to create executor config: %v", err)
@ -41,8 +43,9 @@ func CreateExecutor(logger hclog.Logger, driverConfig *base.ClientDriverConfig,
} }
p := &ExecutorPlugin{ p := &ExecutorPlugin{
logger: logger, logger: logger,
fsIsolation: executorConfig.FSIsolation, fsIsolation: executorConfig.FSIsolation,
cpuTotalTicks: executorConfig.CpuTotalTicks,
} }
config := &plugin.ClientConfig{ config := &plugin.ClientConfig{
@ -75,7 +78,7 @@ func ReattachToExecutor(reattachConfig *plugin.ReattachConfig, logger hclog.Logg
config := &plugin.ClientConfig{ config := &plugin.ClientConfig{
HandshakeConfig: base.Handshake, HandshakeConfig: base.Handshake,
Reattach: reattachConfig, Reattach: reattachConfig,
Plugins: GetPluginMap(logger, false), Plugins: GetPluginMap(logger, false, stats.CpuTotalTicks()),
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
Logger: logger.Named("executor"), Logger: logger.Named("executor"),
} }

View File

@ -49,6 +49,7 @@ func init() {
Plugins: GetPluginMap( Plugins: GetPluginMap(
logger, logger,
executorConfig.FSIsolation, executorConfig.FSIsolation,
executorConfig.CpuTotalTicks,
), ),
GRPCServer: plugin.DefaultGRPCServer, GRPCServer: plugin.DefaultGRPCServer,
Logger: logger, Logger: logger,

View File

@ -4,109 +4,73 @@
package stats package stats
import ( import (
"context" "runtime"
"errors"
"fmt"
"sync"
"time" "time"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shoenig/go-m1cpu"
)
const (
// cpuInfoTimeout is the timeout used when gathering CPU info. This is used
// to override the default timeout in gopsutil which has a tendency to
// timeout on Windows.
cpuInfoTimeout = 60 * time.Second
) )
var ( var (
cpuPowerCoreCount int cpuTotalTicks uint64
cpuPowerCoreMHz uint64
cpuEfficiencyCoreCount int
cpuEfficiencyCoreMHz uint64
cpuTotalTicks uint64
cpuModelName string
) )
var ( // CpuStats calculates cpu usage percentage
initErr error type CpuStats struct {
onceLer sync.Once prevCpuTime float64
) prevTime time.Time
func Init() error { totalCpus int
onceLer.Do(func() {
switch {
case m1cpu.IsAppleSilicon():
cpuModelName = m1cpu.ModelName()
cpuPowerCoreCount = m1cpu.PCoreCount()
cpuPowerCoreMHz = m1cpu.PCoreHz() / 1_000_000
cpuEfficiencyCoreCount = m1cpu.ECoreCount()
cpuEfficiencyCoreMHz = m1cpu.ECoreHz() / 1_000_000
bigTicks := uint64(cpuPowerCoreCount) * cpuPowerCoreMHz
littleTicks := uint64(cpuEfficiencyCoreCount) * cpuEfficiencyCoreMHz
cpuTotalTicks = bigTicks + littleTicks
default:
// for now, all other cpu types assume only power cores
// todo: this is already not true for Intel 13th generation
var err error
if cpuPowerCoreCount, err = cpu.Counts(true); err != nil {
initErr = errors.Join(initErr, fmt.Errorf("failed to detect number of CPU cores: %w", err))
}
ctx, cancel := context.WithTimeout(context.Background(), cpuInfoTimeout)
defer cancel()
var cpuInfoStats []cpu.InfoStat
if cpuInfoStats, err = cpu.InfoWithContext(ctx); err != nil {
initErr = errors.Join(initErr, fmt.Errorf("Unable to obtain CPU information: %w", err))
}
for _, infoStat := range cpuInfoStats {
cpuModelName = infoStat.ModelName
if uint64(infoStat.Mhz) > cpuPowerCoreMHz {
cpuPowerCoreMHz = uint64(infoStat.Mhz)
}
}
// compute ticks using only power core, until we add support for
// detecting little cores on non-apple platforms
cpuTotalTicks = uint64(cpuPowerCoreCount) * cpuPowerCoreMHz
initErr = err
}
})
return initErr
} }
// CPUNumCores returns the number of CPU cores available. // NewCpuStats returns a cpu stats calculator
// func NewCpuStats() *CpuStats {
// This is represented with two values - (Power (P), Efficiency (E)) so we can numCpus := runtime.NumCPU()
// correctly compute total compute for processors with asymetric cores such as cpuStats := &CpuStats{
// Apple Silicon. totalCpus: numCpus,
// }
// For platforms with symetric cores (or where we do not correcly detect asymetric return cpuStats
// cores), all cores are presented as P cores.
func CPUNumCores() (int, int) {
return cpuPowerCoreCount, cpuEfficiencyCoreCount
} }
// CPUMHzPerCore returns the MHz per CPU (P, E) core type. // Percent calculates the cpu usage percentage based on the current cpu usage
// // and the previous cpu usage where usage is given as time in nanoseconds spend
// As with CPUNumCores, asymetric core detection currently only works with // in the cpu
// Apple Silicon CPUs. func (c *CpuStats) Percent(cpuTime float64) float64 {
func CPUMHzPerCore() (uint64, uint64) { now := time.Now()
return cpuPowerCoreMHz, cpuEfficiencyCoreMHz
if c.prevCpuTime == 0.0 {
// invoked first time
c.prevCpuTime = cpuTime
c.prevTime = now
return 0.0
}
timeDelta := now.Sub(c.prevTime).Nanoseconds()
ret := c.calculatePercent(c.prevCpuTime, cpuTime, timeDelta)
c.prevCpuTime = cpuTime
c.prevTime = now
return ret
} }
// CPUModelName returns the model name of the CPU. // TicksConsumed calculates the total ticks consumes by the process across all
func CPUModelName() string { // cpu cores
return cpuModelName func (c *CpuStats) TicksConsumed(percent float64) float64 {
return (percent / 100) * float64(CpuTotalTicks()) / float64(c.totalCpus)
} }
// TotalTicksAvailable calculates the total MHz available across all cores. func (c *CpuStats) calculatePercent(t1, t2 float64, timeDelta int64) float64 {
vDelta := t2 - t1
if timeDelta <= 0 || vDelta <= 0.0 {
return 0.0
}
overall_percent := (vDelta / float64(timeDelta)) * 100.0
return overall_percent
}
// Set the total ticks available across all cores.
func SetCpuTotalTicks(newCpuTotalTicks uint64) {
cpuTotalTicks = newCpuTotalTicks
}
// CpuTotalTicks calculates the total MHz available across all cores.
// //
// Where asymetric cores are correctly detected, the total ticks is the sum of // Where asymetric cores are correctly detected, the total ticks is the sum of
// the performance across both core types. // the performance across both core types.
@ -114,6 +78,6 @@ func CPUModelName() string {
// Where asymetric cores are not correctly detected (such as Intel 13th gen), // Where asymetric cores are not correctly detected (such as Intel 13th gen),
// the total ticks available is over-estimated, as we assume all cores are P // the total ticks available is over-estimated, as we assume all cores are P
// cores. // cores.
func TotalTicksAvailable() uint64 { func CpuTotalTicks() uint64 {
return cpuTotalTicks return cpuTotalTicks
} }

24
helper/stats/cpu_test.go Normal file
View File

@ -0,0 +1,24 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package stats
import (
"testing"
"time"
"github.com/hashicorp/nomad/ci"
)
func TestCpuStatsPercent(t *testing.T) {
ci.Parallel(t)
cs := NewCpuStats()
cs.Percent(79.7)
time.Sleep(1 * time.Second)
percent := cs.Percent(80.69)
expectedPercent := 98.00
if percent < expectedPercent && percent > (expectedPercent+1.00) {
t.Fatalf("expected: %v, actual: %v", expectedPercent, percent)
}
}