From f812bccb4e75aa9d50d44382e5a13a9eceba080a Mon Sep 17 00:00:00 2001 From: hc-github-team-nomad-core <82989552+hc-github-team-nomad-core@users.noreply.github.com> Date: Mon, 7 Aug 2023 13:48:09 -0500 Subject: [PATCH] Backport of Tuning job versions retention. #17635 into release/1.6.x (#18169) This pull request was automerged via backport-assistant --- .changelog/17939.txt | 3 +++ command/agent/agent.go | 7 +++++++ command/agent/config.go | 11 ++++++++++- nomad/config.go | 4 ++++ nomad/fsm.go | 12 ++++++++---- nomad/fsm_test.go | 15 ++++++++------- nomad/server.go | 15 ++++++++------- nomad/state/state_store.go | 11 +++++++---- nomad/state/state_store_test.go | 8 ++++---- nomad/state/testing.go | 5 +++-- nomad/structs/structs.go | 4 ++-- website/content/docs/configuration/server.mdx | 3 +++ 12 files changed, 67 insertions(+), 31 deletions(-) create mode 100644 .changelog/17939.txt diff --git a/.changelog/17939.txt b/.changelog/17939.txt new file mode 100644 index 000000000..fc71fc313 --- /dev/null +++ b/.changelog/17939.txt @@ -0,0 +1,3 @@ +```release-note:improvement +config: Added an option to configure how many historic versions of jobs are retained in the state store +``` diff --git a/command/agent/agent.go b/command/agent/agent.go index a4ed1b8ad..89242ca44 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -343,6 +343,13 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) { conf.JobMaxPriority = jobMaxPriority conf.JobDefaultPriority = jobDefaultPriority + if agentConfig.Server.JobTrackedVersions != nil { + if *agentConfig.Server.JobTrackedVersions <= 0 { + return nil, fmt.Errorf("job_tracked_versions must be greater than 0") + } + conf.JobTrackedVersions = *agentConfig.Server.JobTrackedVersions + } + // Set up the bind addresses rpcAddr, err := net.ResolveTCPAddr("tcp", agentConfig.normalizedAddrs.RPC) if err != nil { diff --git a/command/agent/config.go b/command/agent/config.go index f8f299401..26bc43e50 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -674,6 +674,9 @@ type ServerConfig struct { // before being discarded automatically. If unset, the maximum size defaults // to 1 MB. If the value is zero, no job sources will be stored. JobMaxSourceSize *string `hcl:"job_max_source_size"` + + // JobTrackedVersions is the number of historic job versions that are kept. + JobTrackedVersions *int `hcl:"job_tracked_versions"` } func (s *ServerConfig) Copy() *ServerConfig { @@ -702,6 +705,7 @@ func (s *ServerConfig) Copy() *ServerConfig { ns.RaftTrailingLogs = pointer.Copy(s.RaftTrailingLogs) ns.JobDefaultPriority = pointer.Copy(s.JobDefaultPriority) ns.JobMaxPriority = pointer.Copy(s.JobMaxPriority) + ns.JobTrackedVersions = pointer.Copy(s.JobTrackedVersions) return &ns } @@ -1329,7 +1333,8 @@ func DefaultConfig() *Config { LimitResults: 100, MinTermLength: 2, }, - JobMaxSourceSize: pointer.Of("1M"), + JobMaxSourceSize: pointer.Of("1M"), + JobTrackedVersions: pointer.Of(structs.JobDefaultTrackedVersions), }, ACL: &ACLConfig{ Enabled: false, @@ -2033,6 +2038,10 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig { } } + if b.JobTrackedVersions != nil { + result.JobTrackedVersions = b.JobTrackedVersions + } + // Add the schedulers result.EnabledSchedulers = append(result.EnabledSchedulers, b.EnabledSchedulers...) diff --git a/nomad/config.go b/nomad/config.go index 10912d37f..1563c047f 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -416,6 +416,9 @@ type Config struct { // JobMaxPriority is an upper bound on the Job priority. JobMaxPriority int + + // JobTrackedVersions is the number of historic Job versions that are kept. + JobTrackedVersions int } func (c *Config) Copy() *Config { @@ -535,6 +538,7 @@ func DefaultConfig() *Config { DeploymentQueryRateLimit: deploymentwatcher.LimitStateQueriesPerSecond, JobDefaultPriority: structs.JobDefaultPriority, JobMaxPriority: structs.JobDefaultMaxPriority, + JobTrackedVersions: structs.JobDefaultTrackedVersions, } // Enable all known schedulers by default diff --git a/nomad/fsm.go b/nomad/fsm.go index 818c97dd3..eadac82ea 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -148,16 +148,20 @@ type FSMConfig struct { // EventBufferSize is the amount of messages to hold in memory EventBufferSize int64 + + // JobTrackedVersions is the number of historic job versions that are kept. + JobTrackedVersions int } // NewFSM is used to construct a new FSM with a blank state. func NewFSM(config *FSMConfig) (*nomadFSM, error) { // Create a state store sconfig := &state.StateStoreConfig{ - Logger: config.Logger, - Region: config.Region, - EnablePublisher: config.EnableEventBroker, - EventBufferSize: config.EventBufferSize, + Logger: config.Logger, + Region: config.Region, + EnablePublisher: config.EnableEventBroker, + EventBufferSize: config.EventBufferSize, + JobTrackedVersions: config.JobTrackedVersions, } state, err := state.NewStateStore(sconfig) if err != nil { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index e937ec702..a2667d4ed 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -59,13 +59,14 @@ func testFSM(t *testing.T) *nomadFSM { dispatcher, _ := testPeriodicDispatcher(t) logger := testlog.HCLogger(t) fsmConfig := &FSMConfig{ - EvalBroker: broker, - Periodic: dispatcher, - Blocked: NewBlockedEvals(broker, logger), - Logger: logger, - Region: "global", - EnableEventBroker: true, - EventBufferSize: 100, + EvalBroker: broker, + Periodic: dispatcher, + Blocked: NewBlockedEvals(broker, logger), + Logger: logger, + Region: "global", + EnableEventBroker: true, + EventBufferSize: 100, + JobTrackedVersions: structs.JobDefaultTrackedVersions, } fsm, err := NewFSM(fsmConfig) if err != nil { diff --git a/nomad/server.go b/nomad/server.go index 67ded0aa3..a7c9f8163 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -1295,13 +1295,14 @@ func (s *Server) setupRaft() error { // Create the FSM fsmConfig := &FSMConfig{ - EvalBroker: s.evalBroker, - Periodic: s.periodicDispatcher, - Blocked: s.blockedEvals, - Logger: s.logger, - Region: s.Region(), - EnableEventBroker: s.config.EnableEventBroker, - EventBufferSize: s.config.EventBufferSize, + EvalBroker: s.evalBroker, + Periodic: s.periodicDispatcher, + Blocked: s.blockedEvals, + Logger: s.logger, + Region: s.Region(), + EnableEventBroker: s.config.EnableEventBroker, + EventBufferSize: s.config.EventBufferSize, + JobTrackedVersions: s.config.JobTrackedVersions, } var err error s.fsm, err = NewFSM(fsmConfig) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 445849fd0..43c1dd1a5 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -99,6 +99,9 @@ type StateStoreConfig struct { // EventBufferSize configures the amount of events to hold in memory EventBufferSize int64 + + // JobTrackedVersions is the number of historic job versions that are kept. + JobTrackedVersions int } // The StateStore is responsible for maintaining all the Nomad @@ -1956,7 +1959,7 @@ func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, tx func (s *StateStore) deleteJobSubmission(job *structs.Job, txn *txn) error { // find submissions associated with job - remove := *set.NewHashSet[*structs.JobSubmission, string](structs.JobTrackedVersions) + remove := *set.NewHashSet[*structs.JobSubmission, string](s.config.JobTrackedVersions) iter, err := txn.Get("job_submission", "id_prefix", job.Namespace, job.ID) if err != nil { @@ -2045,7 +2048,7 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn) } // If we are below the limit there is no GCing to be done - if len(all) <= structs.JobTrackedVersions { + if len(all) <= s.config.JobTrackedVersions { return nil } @@ -2061,7 +2064,7 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn) // If the stable job is the oldest version, do a swap to bring it into the // keep set. - max := structs.JobTrackedVersions + max := s.config.JobTrackedVersions if stableIdx == max { all[max-1], all[max] = all[max], all[max-1] } @@ -5498,7 +5501,7 @@ func (s *StateStore) pruneJobSubmissions(namespace, jobID string, txn *txn) erro // although the number of tracked submissions is the same as the number of // tracked job versions, do not assume a 1:1 correlation, as there could be // holes in the submissions (or none at all) - limit := structs.JobTrackedVersions + limit := s.config.JobTrackedVersions // iterate through all stored submissions iter, err := txn.Get("job_submission", "id_prefix", namespace, jobID) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index a6bd60dc8..81aa4aaf8 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2523,7 +2523,7 @@ func TestStateStore_UpsertJob_submission(t *testing.T) { must.Eq(t, index, sub.JobModifyIndex) // insert 6 more, going over the limit - for i := 1; i <= structs.JobTrackedVersions; i++ { + for i := 1; i <= structs.JobDefaultTrackedVersions; i++ { index++ job2 := job.Copy() job2.Meta["version"] = strconv.Itoa(i) @@ -2624,8 +2624,8 @@ func TestStateStore_UpdateUpsertJob_JobVersion(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - if len(allVersions) != structs.JobTrackedVersions { - t.Fatalf("got %d; want %d", len(allVersions), structs.JobTrackedVersions) + if len(allVersions) != structs.JobDefaultTrackedVersions { + t.Fatalf("got %d; want %d", len(allVersions), structs.JobDefaultTrackedVersions) } if a := allVersions[0]; a.ID != job.ID || a.Version != 299 || a.Name != "299" { @@ -2636,7 +2636,7 @@ func TestStateStore_UpdateUpsertJob_JobVersion(t *testing.T) { } // Ensure we didn't delete the stable job - if a := allVersions[structs.JobTrackedVersions-1]; a.ID != job.ID || + if a := allVersions[structs.JobDefaultTrackedVersions-1]; a.ID != job.ID || a.Version != 0 || a.Name != "0" || !a.Stable { t.Fatalf("bad: %+v", a) } diff --git a/nomad/state/testing.go b/nomad/state/testing.go index 1547a9050..acc225566 100644 --- a/nomad/state/testing.go +++ b/nomad/state/testing.go @@ -16,8 +16,9 @@ import ( func TestStateStore(t testing.TB) *StateStore { config := &StateStoreConfig{ - Logger: testlog.HCLogger(t), - Region: "global", + Logger: testlog.HCLogger(t), + Region: "global", + JobTrackedVersions: structs.JobDefaultTrackedVersions, } state, err := NewStateStore(config) if err != nil { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index f912ee82b..691f0db49 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4304,9 +4304,9 @@ const ( // for the system to remain healthy. CoreJobPriority = math.MaxInt16 - // JobTrackedVersions is the number of historic job versions that are + // JobDefaultTrackedVersions is the number of historic job versions that are // kept. - JobTrackedVersions = 6 + JobDefaultTrackedVersions = 6 // JobTrackedScalingEvents is the number of scaling events that are // kept for a single task group. diff --git a/website/content/docs/configuration/server.mdx b/website/content/docs/configuration/server.mdx index 3582bcb73..8b4b63eaf 100644 --- a/website/content/docs/configuration/server.mdx +++ b/website/content/docs/configuration/server.mdx @@ -264,6 +264,9 @@ server { size of a job. If the limit is exceeded, the original source is simply discarded and no error is returned from the job API. +- `job_tracked_versions` `(int: 6)` - Specifies the number of historic job versions that + are kept. + ### Deprecated Parameters - `retry_join` `(array: [])` - Specifies a list of server addresses to