Backport of "Tuning job versions retention" (#17635) into release/1.6.x (#18169)

This pull request was automerged via backport-assistant
This commit is contained in:
hc-github-team-nomad-core 2023-08-07 13:48:09 -05:00 committed by GitHub
parent ebcdd4d82d
commit f812bccb4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 67 additions and 31 deletions

3
.changelog/17939.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
config: Added an option to configure how many historic versions of jobs are retained in the state store
```

View File

@ -343,6 +343,13 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
conf.JobMaxPriority = jobMaxPriority
conf.JobDefaultPriority = jobDefaultPriority
if agentConfig.Server.JobTrackedVersions != nil {
if *agentConfig.Server.JobTrackedVersions <= 0 {
return nil, fmt.Errorf("job_tracked_versions must be greater than 0")
}
conf.JobTrackedVersions = *agentConfig.Server.JobTrackedVersions
}
// Set up the bind addresses
rpcAddr, err := net.ResolveTCPAddr("tcp", agentConfig.normalizedAddrs.RPC)
if err != nil {

View File

@ -674,6 +674,9 @@ type ServerConfig struct {
// before being discarded automatically. If unset, the maximum size defaults
// to 1 MB. If the value is zero, no job sources will be stored.
JobMaxSourceSize *string `hcl:"job_max_source_size"`
// JobTrackedVersions is the number of historic job versions that are kept.
JobTrackedVersions *int `hcl:"job_tracked_versions"`
}
func (s *ServerConfig) Copy() *ServerConfig {
@ -702,6 +705,7 @@ func (s *ServerConfig) Copy() *ServerConfig {
ns.RaftTrailingLogs = pointer.Copy(s.RaftTrailingLogs)
ns.JobDefaultPriority = pointer.Copy(s.JobDefaultPriority)
ns.JobMaxPriority = pointer.Copy(s.JobMaxPriority)
ns.JobTrackedVersions = pointer.Copy(s.JobTrackedVersions)
return &ns
}
@ -1329,7 +1333,8 @@ func DefaultConfig() *Config {
LimitResults: 100,
MinTermLength: 2,
},
JobMaxSourceSize: pointer.Of("1M"),
JobMaxSourceSize: pointer.Of("1M"),
JobTrackedVersions: pointer.Of(structs.JobDefaultTrackedVersions),
},
ACL: &ACLConfig{
Enabled: false,
@ -2033,6 +2038,10 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
}
}
if b.JobTrackedVersions != nil {
result.JobTrackedVersions = b.JobTrackedVersions
}
// Add the schedulers
result.EnabledSchedulers = append(result.EnabledSchedulers, b.EnabledSchedulers...)

View File

@ -416,6 +416,9 @@ type Config struct {
// JobMaxPriority is an upper bound on the Job priority.
JobMaxPriority int
// JobTrackedVersions is the number of historic Job versions that are kept.
JobTrackedVersions int
}
func (c *Config) Copy() *Config {
@ -535,6 +538,7 @@ func DefaultConfig() *Config {
DeploymentQueryRateLimit: deploymentwatcher.LimitStateQueriesPerSecond,
JobDefaultPriority: structs.JobDefaultPriority,
JobMaxPriority: structs.JobDefaultMaxPriority,
JobTrackedVersions: structs.JobDefaultTrackedVersions,
}
// Enable all known schedulers by default

View File

@ -148,16 +148,20 @@ type FSMConfig struct {
// EventBufferSize is the amount of messages to hold in memory
EventBufferSize int64
// JobTrackedVersions is the number of historic job versions that are kept.
JobTrackedVersions int
}
// NewFSM is used to construct a new FSM with a blank state.
func NewFSM(config *FSMConfig) (*nomadFSM, error) {
// Create a state store
sconfig := &state.StateStoreConfig{
Logger: config.Logger,
Region: config.Region,
EnablePublisher: config.EnableEventBroker,
EventBufferSize: config.EventBufferSize,
Logger: config.Logger,
Region: config.Region,
EnablePublisher: config.EnableEventBroker,
EventBufferSize: config.EventBufferSize,
JobTrackedVersions: config.JobTrackedVersions,
}
state, err := state.NewStateStore(sconfig)
if err != nil {

View File

@ -59,13 +59,14 @@ func testFSM(t *testing.T) *nomadFSM {
dispatcher, _ := testPeriodicDispatcher(t)
logger := testlog.HCLogger(t)
fsmConfig := &FSMConfig{
EvalBroker: broker,
Periodic: dispatcher,
Blocked: NewBlockedEvals(broker, logger),
Logger: logger,
Region: "global",
EnableEventBroker: true,
EventBufferSize: 100,
EvalBroker: broker,
Periodic: dispatcher,
Blocked: NewBlockedEvals(broker, logger),
Logger: logger,
Region: "global",
EnableEventBroker: true,
EventBufferSize: 100,
JobTrackedVersions: structs.JobDefaultTrackedVersions,
}
fsm, err := NewFSM(fsmConfig)
if err != nil {

View File

@ -1295,13 +1295,14 @@ func (s *Server) setupRaft() error {
// Create the FSM
fsmConfig := &FSMConfig{
EvalBroker: s.evalBroker,
Periodic: s.periodicDispatcher,
Blocked: s.blockedEvals,
Logger: s.logger,
Region: s.Region(),
EnableEventBroker: s.config.EnableEventBroker,
EventBufferSize: s.config.EventBufferSize,
EvalBroker: s.evalBroker,
Periodic: s.periodicDispatcher,
Blocked: s.blockedEvals,
Logger: s.logger,
Region: s.Region(),
EnableEventBroker: s.config.EnableEventBroker,
EventBufferSize: s.config.EventBufferSize,
JobTrackedVersions: s.config.JobTrackedVersions,
}
var err error
s.fsm, err = NewFSM(fsmConfig)

View File

@ -99,6 +99,9 @@ type StateStoreConfig struct {
// EventBufferSize configures the amount of events to hold in memory
EventBufferSize int64
// JobTrackedVersions is the number of historic job versions that are kept.
JobTrackedVersions int
}
// The StateStore is responsible for maintaining all the Nomad
@ -1956,7 +1959,7 @@ func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, tx
func (s *StateStore) deleteJobSubmission(job *structs.Job, txn *txn) error {
// find submissions associated with job
remove := *set.NewHashSet[*structs.JobSubmission, string](structs.JobTrackedVersions)
remove := *set.NewHashSet[*structs.JobSubmission, string](s.config.JobTrackedVersions)
iter, err := txn.Get("job_submission", "id_prefix", job.Namespace, job.ID)
if err != nil {
@ -2045,7 +2048,7 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn)
}
// If we are below the limit there is no GCing to be done
if len(all) <= structs.JobTrackedVersions {
if len(all) <= s.config.JobTrackedVersions {
return nil
}
@ -2061,7 +2064,7 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn)
// If the stable job is the oldest version, do a swap to bring it into the
// keep set.
max := structs.JobTrackedVersions
max := s.config.JobTrackedVersions
if stableIdx == max {
all[max-1], all[max] = all[max], all[max-1]
}
@ -5498,7 +5501,7 @@ func (s *StateStore) pruneJobSubmissions(namespace, jobID string, txn *txn) erro
// although the number of tracked submissions is the same as the number of
// tracked job versions, do not assume a 1:1 correlation, as there could be
// holes in the submissions (or none at all)
limit := structs.JobTrackedVersions
limit := s.config.JobTrackedVersions
// iterate through all stored submissions
iter, err := txn.Get("job_submission", "id_prefix", namespace, jobID)

View File

@ -2523,7 +2523,7 @@ func TestStateStore_UpsertJob_submission(t *testing.T) {
must.Eq(t, index, sub.JobModifyIndex)
// insert 6 more, going over the limit
for i := 1; i <= structs.JobTrackedVersions; i++ {
for i := 1; i <= structs.JobDefaultTrackedVersions; i++ {
index++
job2 := job.Copy()
job2.Meta["version"] = strconv.Itoa(i)
@ -2624,8 +2624,8 @@ func TestStateStore_UpdateUpsertJob_JobVersion(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
if len(allVersions) != structs.JobTrackedVersions {
t.Fatalf("got %d; want %d", len(allVersions), structs.JobTrackedVersions)
if len(allVersions) != structs.JobDefaultTrackedVersions {
t.Fatalf("got %d; want %d", len(allVersions), structs.JobDefaultTrackedVersions)
}
if a := allVersions[0]; a.ID != job.ID || a.Version != 299 || a.Name != "299" {
@ -2636,7 +2636,7 @@ func TestStateStore_UpdateUpsertJob_JobVersion(t *testing.T) {
}
// Ensure we didn't delete the stable job
if a := allVersions[structs.JobTrackedVersions-1]; a.ID != job.ID ||
if a := allVersions[structs.JobDefaultTrackedVersions-1]; a.ID != job.ID ||
a.Version != 0 || a.Name != "0" || !a.Stable {
t.Fatalf("bad: %+v", a)
}

View File

@ -16,8 +16,9 @@ import (
func TestStateStore(t testing.TB) *StateStore {
config := &StateStoreConfig{
Logger: testlog.HCLogger(t),
Region: "global",
Logger: testlog.HCLogger(t),
Region: "global",
JobTrackedVersions: structs.JobDefaultTrackedVersions,
}
state, err := NewStateStore(config)
if err != nil {

View File

@ -4304,9 +4304,9 @@ const (
// for the system to remain healthy.
CoreJobPriority = math.MaxInt16
// JobTrackedVersions is the number of historic job versions that are
// JobDefaultTrackedVersions is the number of historic job versions that are
// kept.
JobTrackedVersions = 6
JobDefaultTrackedVersions = 6
// JobTrackedScalingEvents is the number of scaling events that are
// kept for a single task group.

View File

@ -264,6 +264,9 @@ server {
size of a job. If the limit is exceeded, the original source is simply discarded
and no error is returned from the job API.
- `job_tracked_versions` `(int: 6)` - Specifies the number of historic job versions that
are kept. Must be greater than 0.
### Deprecated Parameters
- `retry_join` `(array<string>: [])` - Specifies a list of server addresses to