make version checks specific to region (1.4.x) (#14912)

* One-time tokens are not replicated between regions, so we don't want to enforce
  that the version check across all of serf, just members in the same region.
* Scheduler: Disconnected clients handling is specific to a single region, so we
  don't want to enforce that the version check across all of serf, just members in
  the same region.
* Variables: enforce version check in Apply RPC
* Cleans up a bunch of legacy checks.

This changeset is specific to 1.4.x and the changes for previous versions of
Nomad will be manually backported in a separate PR.
This commit is contained in:
Tim Gross 2022-10-17 16:23:51 -04:00 committed by GitHub
parent 306b4dd38e
commit 3c78980b78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 50 additions and 58 deletions

9
.changelog/14912.txt Normal file
View File

@ -0,0 +1,9 @@
```release-note:bug
variables: Fixed a bug where Nomad version checking was not enforced for writing to variables
```
```release-note:bug
acl: Fixed a bug where Nomad version checking for one-time tokens was enforced across regions
```
```release-note:bug
scheduler: Fixed a bug where version checking for disconnected clients handling was enforced across regions
```

View File

@ -944,8 +944,8 @@ func (a *ACL) UpsertOneTimeToken(args *structs.OneTimeTokenUpsertRequest, reply
defer metrics.MeasureSince(
[]string{"nomad", "acl", "upsert_one_time_token"}, time.Now())
if !ServersMeetMinimumVersion(a.srv.Members(), minOneTimeAuthenticationTokenVersion, false) {
return fmt.Errorf("All servers should be running version %v or later to use one-time authentication tokens", minAutopilotVersion)
if !ServersMeetMinimumVersion(a.srv.Members(), a.srv.Region(), minOneTimeAuthenticationTokenVersion, false) {
return fmt.Errorf("All servers should be running version %v or later to use one-time authentication tokens", minOneTimeAuthenticationTokenVersion)
}
// Snapshot the state
@ -996,8 +996,8 @@ func (a *ACL) ExchangeOneTimeToken(args *structs.OneTimeTokenExchangeRequest, re
defer metrics.MeasureSince(
[]string{"nomad", "acl", "exchange_one_time_token"}, time.Now())
if !ServersMeetMinimumVersion(a.srv.Members(), minOneTimeAuthenticationTokenVersion, false) {
return fmt.Errorf("All servers should be running version %v or later to use one-time authentication tokens", minAutopilotVersion)
if !ServersMeetMinimumVersion(a.srv.Members(), a.srv.Region(), minOneTimeAuthenticationTokenVersion, false) {
return fmt.Errorf("All servers should be running version %v or later to use one-time authentication tokens", minOneTimeAuthenticationTokenVersion)
}
// Snapshot the state
@ -1053,8 +1053,8 @@ func (a *ACL) ExpireOneTimeTokens(args *structs.OneTimeTokenExpireRequest, reply
defer metrics.MeasureSince(
[]string{"nomad", "acl", "expire_one_time_tokens"}, time.Now())
if !ServersMeetMinimumVersion(a.srv.Members(), minOneTimeAuthenticationTokenVersion, false) {
return fmt.Errorf("All servers should be running version %v or later to use one-time authentication tokens", minAutopilotVersion)
if !ServersMeetMinimumVersion(a.srv.Members(), a.srv.Region(), minOneTimeAuthenticationTokenVersion, false) {
return fmt.Errorf("All servers should be running version %v or later to use one-time authentication tokens", minOneTimeAuthenticationTokenVersion)
}
// Check management level permissions

View File

@ -479,7 +479,7 @@ OUTER:
func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) error {
// For old clusters, send single deregistration messages COMPAT(0.11)
minVersionBatchNodeDeregister := version.Must(version.NewVersion("0.9.4"))
if !ServersMeetMinimumVersion(c.srv.Members(), minVersionBatchNodeDeregister, true) {
if !ServersMeetMinimumVersion(c.srv.Members(), c.srv.Region(), minVersionBatchNodeDeregister, true) {
for _, id := range nodeIDs {
req := structs.NodeDeregisterRequest{
NodeID: id,

View File

@ -359,7 +359,7 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
// COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check to always set args.Eval
// 0.12.1 introduced atomic eval job registration
if eval != nil && ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) {
if eval != nil && ServersMeetMinimumVersion(j.srv.Members(), j.srv.Region(), minJobRegisterAtomicEvalVersion, false) {
args.Eval = eval
submittedEval = true
}
@ -848,7 +848,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
}
// COMPAT(1.1.0): remove conditional and always set args.Eval
if ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) {
if ServersMeetMinimumVersion(j.srv.Members(), j.srv.Region(), minJobRegisterAtomicEvalVersion, false) {
args.Eval = eval
}

View File

@ -814,7 +814,7 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobCSIVolumeClaimGC, index))
}
case <-oneTimeTokenGC.C:
if !ServersMeetMinimumVersion(s.Members(), minOneTimeAuthenticationTokenVersion, false) {
if !ServersMeetMinimumVersion(s.Members(), s.Region(), minOneTimeAuthenticationTokenVersion, false) {
continue
}
@ -1922,7 +1922,7 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig {
return config
}
if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion, false) {
if !ServersMeetMinimumVersion(s.Members(), AllRegions, minAutopilotVersion, false) {
s.logger.Named("autopilot").Warn("can't initialize until all servers are above minimum version", "min_version", minAutopilotVersion)
return nil
}
@ -1949,7 +1949,7 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {
if config != nil {
return config
}
if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion, false) {
if !ServersMeetMinimumVersion(s.Members(), s.Region(), minSchedulerConfigVersion, false) {
s.logger.Named("core").Warn("can't initialize scheduler config until all servers are above minimum version", "min_version", minSchedulerConfigVersion)
return nil
}
@ -1991,14 +1991,7 @@ func (s *Server) initializeKeyring(stopCh <-chan struct{}) {
default:
}
members := s.serf.Members()
regionMembers := []serf.Member{}
for _, member := range members {
if member.Tags["region"] == s.Region() {
regionMembers = append(regionMembers, member)
}
}
if ServersMeetMinimumVersion(regionMembers, minVersionKeyring, true) {
if ServersMeetMinimumVersion(s.serf.Members(), s.Region(), minVersionKeyring, true) {
break
}
}
@ -2034,7 +2027,7 @@ func (s *Server) initializeKeyring(stopCh <-chan struct{}) {
}
func (s *Server) generateClusterID() (string, error) {
if !ServersMeetMinimumVersion(s.Members(), minClusterIDVersion, false) {
if !ServersMeetMinimumVersion(s.Members(), AllRegions, minClusterIDVersion, false) {
s.logger.Named("core").Warn("cannot initialize cluster ID until all servers are above minimum version", "min_version", minClusterIDVersion)
return "", fmt.Errorf("cluster ID cannot be created until all servers are above minimum version %s", minClusterIDVersion)
}

View File

@ -247,7 +247,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
}
// All servers should be at or above 0.8.0 to apply this operatation
if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion, false) {
if !ServersMeetMinimumVersion(op.srv.Members(), op.srv.Region(), minAutopilotVersion, false) {
return fmt.Errorf("All servers should be running version %v to update autopilot config", minAutopilotVersion)
}
@ -315,7 +315,7 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe
}
// All servers should be at or above 0.9.0 to apply this operation
if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion, false) {
if !ServersMeetMinimumVersion(op.srv.Members(), op.srv.Region(), minSchedulerConfigVersion, false) {
return fmt.Errorf("All servers should be running version %v to update scheduler config", minSchedulerConfigVersion)
}

View File

@ -80,7 +80,7 @@ func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) {
eval.ModifyIndex = index
// COMPAT(1.1): Remove in 1.1.0 - 0.12.1 introduced atomic eval job registration
if !ServersMeetMinimumVersion(s.Members(), minJobRegisterAtomicEvalVersion, false) {
if !ServersMeetMinimumVersion(s.Members(), s.Region(), minJobRegisterAtomicEvalVersion, false) {
// Create a new evaluation
eval.JobModifyIndex = index
update := &structs.EvalUpdateRequest{

View File

@ -254,7 +254,7 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
if ServersMeetMinimumVersion(p.Members(), MinVersionPlanNormalization, true) {
if ServersMeetMinimumVersion(p.Members(), p.Region(), MinVersionPlanNormalization, true) {
// Initialize the allocs request using the new optimized log entry format.
// Determine the minimum number of updates, could be more if there
// are multiple updates per node

View File

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/serf/serf"
"golang.org/x/exp/slices"
)
const (
@ -143,15 +144,20 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
return true, parts
}
const AllRegions = ""
// ServersMeetMinimumVersion returns whether the Nomad servers are at least on the
// given Nomad version. The checkFailedServers parameter specifies whether version
// for the failed servers should be verified.
func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version, checkFailedServers bool) bool {
func ServersMeetMinimumVersion(members []serf.Member, region string, minVersion *version.Version, checkFailedServers bool) bool {
for _, member := range members {
if valid, parts := isNomadServer(member); valid && (parts.Status == serf.StatusAlive || (checkFailedServers && parts.Status == serf.StatusFailed)) {
valid, parts := isNomadServer(member)
if valid &&
(parts.Region == region || region == AllRegions) &&
(parts.Status == serf.StatusAlive || (checkFailedServers && parts.Status == serf.StatusFailed)) {
// Check if the versions match - version.LessThan will return true for
// 0.8.0-rc1 < 0.8.0, so we want to ignore the metadata
versionsMatch := slicesMatch(minVersion.Segments(), parts.Build.Segments())
versionsMatch := slices.Equal(minVersion.Segments(), parts.Build.Segments())
if parts.Build.LessThan(minVersion) && !versionsMatch {
return false
}
@ -161,28 +167,6 @@ func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Versio
return true
}
func slicesMatch(a, b []int) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
// shuffleStrings randomly shuffles the list of strings
func shuffleStrings(list []string) {
for i := range list {

View File

@ -148,7 +148,7 @@ func TestServersMeetMinimumVersionExcludingFailed(t *testing.T) {
}
for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, tc.ver, false)
result := ServersMeetMinimumVersion(tc.members, AllRegions, tc.ver, false)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}
@ -186,7 +186,7 @@ func TestServersMeetMinimumVersionIncludingFailed(t *testing.T) {
}
for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, tc.ver, true)
result := ServersMeetMinimumVersion(tc.members, AllRegions, tc.ver, true)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}
@ -224,7 +224,7 @@ func TestServersMeetMinimumVersionSuffix(t *testing.T) {
}
for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, tc.ver, true)
result := ServersMeetMinimumVersion(tc.members, AllRegions, tc.ver, true)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}

View File

@ -46,6 +46,11 @@ func (sv *Variables) Apply(args *structs.VariablesApplyRequest, reply *structs.V
args.Var.Namespace = targetNS
}
if !ServersMeetMinimumVersion(
sv.srv.serf.Members(), sv.srv.Region(), minVersionKeyring, true) {
return fmt.Errorf("all servers must be running version %v or later to apply variables", minVersionKeyring)
}
canRead, err := svePreApply(sv, args, args.Var)
if err != nil {
return err

View File

@ -585,7 +585,7 @@ func (w *Worker) invokeScheduler(snap *state.StateSnapshot, eval *structs.Evalua
// other packages to perform server version checks without direct references to
// the Nomad server.
func (w *Worker) ServersMeetMinimumVersion(minVersion *version.Version, checkFailedServers bool) bool {
return ServersMeetMinimumVersion(w.srv.Members(), minVersion, checkFailedServers)
return ServersMeetMinimumVersion(w.srv.Members(), w.srv.Region(), minVersion, checkFailedServers)
}
// SubmitPlan is used to submit a plan for consideration. This allows
@ -606,7 +606,7 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.
plan.SnapshotIndex = w.snapshotIndex
// Normalize stopped and preempted allocs before RPC
normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), MinVersionPlanNormalization, true)
normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), w.srv.Region(), MinVersionPlanNormalization, true)
if normalizePlan {
plan.NormalizeAllocations()
}

View File

@ -134,8 +134,9 @@ type Planner interface {
// that on leader changes, the evaluation will be reblocked properly.
ReblockEval(*structs.Evaluation) error
// ServersMeetMinimumVersion returns whether the Nomad servers are at least on the
// given Nomad version. The checkFailedServers parameter specifies whether version
// for the failed servers should be verified.
// ServersMeetMinimumVersion returns whether the Nomad servers in the
// worker's region are at least on the given Nomad version. The
// checkFailedServers parameter specifies whether version for the failed
// servers should be verified.
ServersMeetMinimumVersion(minVersion *version.Version, checkFailedServers bool) bool
}