Add scheduler version enforcement
This commit is contained in:
parent
8e3d9d4f4f
commit
a1d08c2aba
|
@ -243,6 +243,7 @@ func DefaultConfig() *Config {
|
|||
ConsulConfig: config.DefaultConsulConfig(),
|
||||
VaultConfig: config.DefaultVaultConfig(),
|
||||
RPCHoldTimeout: 5 * time.Second,
|
||||
TLSConfig: &config.TLSConfig{},
|
||||
}
|
||||
|
||||
// Enable all known schedulers by default
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/nomad/watch"
|
||||
"github.com/hashicorp/nomad/scheduler"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -77,6 +78,12 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
|
|||
return fmt.Errorf("dequeue requires at least one scheduler type")
|
||||
}
|
||||
|
||||
// Check that there isn't a scheduler version mismatch
|
||||
if args.SchedulerVersion != scheduler.SchedulerVersion {
|
||||
return fmt.Errorf("dequeue disallowed: calling scheduler version is %d; leader version is %d",
|
||||
args.SchedulerVersion, scheduler.SchedulerVersion)
|
||||
}
|
||||
|
||||
// Ensure there is a default timeout
|
||||
if args.Timeout <= 0 {
|
||||
args.Timeout = DefaultDequeueTimeout
|
||||
|
|
|
@ -2,12 +2,14 @@ package nomad
|
|||
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/scheduler"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
)
|
||||
|
||||
|
@ -142,8 +144,9 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
|
|||
|
||||
// Dequeue the eval
|
||||
get := &structs.EvalDequeueRequest{
|
||||
Schedulers: defaultSched,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
Schedulers: defaultSched,
|
||||
SchedulerVersion: scheduler.SchedulerVersion,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
var resp structs.EvalDequeueResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil {
|
||||
|
@ -164,6 +167,31 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) {
|
||||
s1 := testServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the register request
|
||||
eval1 := mock.Eval()
|
||||
s1.evalBroker.Enqueue(eval1)
|
||||
|
||||
// Dequeue the eval
|
||||
get := &structs.EvalDequeueRequest{
|
||||
Schedulers: defaultSched,
|
||||
SchedulerVersion: 0,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
var resp structs.EvalDequeueResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp)
|
||||
if err == nil || !strings.Contains(err.Error(), "scheduler version is 0") {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalEndpoint_Ack(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
|
|
|
@ -305,8 +305,9 @@ type EvalAckRequest struct {
|
|||
|
||||
// EvalDequeueRequest is used when we want to dequeue an evaluation
|
||||
type EvalDequeueRequest struct {
|
||||
Schedulers []string
|
||||
Timeout time.Duration
|
||||
Schedulers []string
|
||||
Timeout time.Duration
|
||||
SchedulerVersion uint16
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
|
|
|
@ -134,8 +134,9 @@ func (w *Worker) run() {
|
|||
func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, string, bool) {
|
||||
// Setup the request
|
||||
req := structs.EvalDequeueRequest{
|
||||
Schedulers: w.srv.config.EnabledSchedulers,
|
||||
Timeout: timeout,
|
||||
Schedulers: w.srv.config.EnabledSchedulers,
|
||||
Timeout: timeout,
|
||||
SchedulerVersion: scheduler.SchedulerVersion,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: w.srv.config.Region,
|
||||
},
|
||||
|
|
|
@ -8,6 +8,14 @@ import (
|
|||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
const (
|
||||
// SchedulerVersion is the version of the scheduler. Changes to the
|
||||
// scheduler that are incompatible with prior schedulers will increment this
|
||||
// version. It is used to disallow dequeueing when the versions do not match
|
||||
// across the leader and the dequeueing scheduler.
|
||||
SchedulerVersion uint16 = 1
|
||||
)
|
||||
|
||||
// BuiltinSchedulers contains the built in registered schedulers
|
||||
// which are available
|
||||
var BuiltinSchedulers = map[string]Factory{
|
||||
|
|
Loading…
Reference in New Issue