e8a361310f
Replicate node pools from the authoritative region. Upserts and deletes of node pools are forwarded to the authoritative region, just as they are for namespaces, quotas, ACL policies, and other replicated objects.
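The tests below exercise this with the same leader-driven pattern already used for namespace and ACL replication: the non-authoritative region's leader periodically lists the objects in the authoritative region, diffs them against its local state store, and applies the resulting deletes and upserts through Raft. The Go sketch below is illustrative only and is not the actual implementation; `fetchRemoteNodePools`, the request struct fields, and the `NodePool*RequestType` message types are assumed names, while `diffNodePools` and `raftApply` are the helpers exercised by the tests in this file.

// replicateNodePoolsSketch is a simplified, hypothetical version of the
// replication loop, shown only to frame the tests below.
func (s *Server) replicateNodePoolsSketch(stopCh chan struct{}) {
	ticker := time.NewTicker(s.config.ReplicationBackoff)
	defer ticker.Stop()

	var minIndex uint64 = 1 // in the real loop this tracks the last replicated index

	for {
		select {
		case <-stopCh: // leadership lost
			return
		case <-ticker.C:
		}

		// List node pools from the authoritative region. fetchRemoteNodePools
		// is a stand-in for the forwarded List RPC, not a real method.
		remote, err := s.fetchRemoteNodePools(s.config.AuthoritativeRegion)
		if err != nil {
			continue // retry after the next backoff interval
		}

		// diffNodePools (exercised by TestLeader_DiffNodePools) returns the
		// names to delete locally and the pools to create or update.
		toDelete, toUpdate := diffNodePools(s.State(), minIndex, remote)

		if len(toDelete) > 0 {
			// Request/message type names are assumptions for illustration.
			req := structs.NodePoolDeleteRequest{Names: toDelete}
			if _, _, err := s.raftApply(structs.NodePoolDeleteRequestType, req); err != nil {
				continue
			}
		}
		if len(toUpdate) > 0 {
			req := structs.NodePoolUpsertRequest{NodePools: toUpdate}
			if _, _, err := s.raftApply(structs.NodePoolUpsertRequestType, req); err != nil {
				continue
			}
		}
	}
}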
2056 lines · 54 KiB · Go
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package nomad

import (
	"errors"
	"fmt"
	"sort"
	"strconv"
	"testing"
	"time"

	"github.com/hashicorp/go-hclog"
	memdb "github.com/hashicorp/go-memdb"
	"github.com/hashicorp/go-version"
	"github.com/shoenig/test"
	"github.com/shoenig/test/must"
	"github.com/shoenig/test/wait"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/hashicorp/nomad/ci"
	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/testutil"
	"github.com/hashicorp/raft"
	"github.com/hashicorp/serf/serf"
)

func TestLeader_LeftServer(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
	})
	defer cleanupS1()

	s2, cleanupS2 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
	})
	defer cleanupS2()

	s3, cleanupS3 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
	})
	defer cleanupS3()
	servers := []*Server{s1, s2, s3}
	TestJoin(t, s1, s2, s3)

	for _, s := range servers {
		testutil.WaitForResult(func() (bool, error) {
			peers, _ := s.numPeers()
			return peers == 3, nil
		}, func(err error) {
			t.Fatalf("should have 3 peers")
		})
	}

	// Kill any server
	var peer *Server
	for _, s := range servers {
		if !s.IsLeader() {
			peer = s
			break
		}
	}
	if peer == nil {
		t.Fatalf("Should have a non-leader")
	}
	peer.Shutdown()
	name := fmt.Sprintf("%s.%s", peer.config.NodeName, peer.config.Region)

	testutil.WaitForResult(func() (bool, error) {
		for _, s := range servers {
			if s == peer {
				continue
			}

			// Force remove the non-leader (transition to left state)
			if err := s.RemoveFailedNode(name); err != nil {
				return false, err
			}

			peers, _ := s.numPeers()
			return peers == 2, errors.New(fmt.Sprintf("%v", peers))
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %s", err)
	})
}

func TestLeader_LeftLeader(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
	})
	defer cleanupS1()

	s2, cleanupS2 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
	})
	defer cleanupS2()

	s3, cleanupS3 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
	})
	defer cleanupS3()
	servers := []*Server{s1, s2, s3}
	TestJoin(t, s1, s2, s3)

	for _, s := range servers {
		testutil.WaitForResult(func() (bool, error) {
			peers, _ := s.numPeers()
			return peers == 3, nil
		}, func(err error) {
			t.Fatalf("should have 3 peers")
		})
	}

	// Kill the leader!
	leader := waitForStableLeadership(t, servers)

	leader.Leave()
	leader.Shutdown()

	for _, s := range servers {
		if s == leader {
			continue
		}
		testutil.WaitForResult(func() (bool, error) {
			peers, _ := s.numPeers()
			return peers == 2, errors.New(fmt.Sprintf("%v", peers))
		}, func(err error) {
			t.Fatalf("should have 2 peers: %v", err)
		})
	}
}

func TestLeader_MultiBootstrap(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()

	s2, cleanupS2 := TestServer(t, nil)
	defer cleanupS2()
	servers := []*Server{s1, s2}
	TestJoin(t, s1, s2)

	for _, s := range servers {
		testutil.WaitForResult(func() (bool, error) {
			peers := s.Members()
			return len(peers) == 2, nil
		}, func(err error) {
			t.Fatalf("should have 2 peers")
		})
	}

	// Ensure we don't have multiple raft peers
	for _, s := range servers {
		peers, err := s.numPeers()
		if err != nil {
			t.Fatalf("failed: %v", err)
		}
		if peers != 1 {
			t.Fatalf("should only have 1 raft peer! %v", peers)
		}
	}
}

func TestLeader_PlanQueue_Reset(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
	})
	defer cleanupS1()

	s2, cleanupS2 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
	})
	defer cleanupS2()

	s3, cleanupS3 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
	})
	defer cleanupS3()
	servers := []*Server{s1, s2, s3}
	TestJoin(t, s1, s2, s3)

	leader := waitForStableLeadership(t, servers)

	if !leader.planQueue.Enabled() {
		t.Fatalf("should enable plan queue")
	}

	for _, s := range servers {
		if !s.IsLeader() && s.planQueue.Enabled() {
			t.Fatalf("plan queue should not be enabled")
		}
	}

	// Kill the leader
	leader.Shutdown()
	time.Sleep(100 * time.Millisecond)

	// Wait for a new leader
	leader = nil
	testutil.WaitForResult(func() (bool, error) {
		for _, s := range servers {
			if s.IsLeader() {
				leader = s
				return true, nil
			}
		}
		return false, nil
	}, func(err error) {
		t.Fatalf("should have leader")
	})

	// Check that the new leader has the plan queue enabled
	testutil.WaitForResult(func() (bool, error) {
		return leader.planQueue.Enabled(), nil
	}, func(err error) {
		t.Fatalf("should enable plan queue")
	})
}

func TestLeader_EvalBroker_Reset(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
	})
	defer cleanupS1()

	s2, cleanupS2 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
		c.BootstrapExpect = 3
	})
	defer cleanupS2()

	s3, cleanupS3 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
		c.BootstrapExpect = 3
	})
	defer cleanupS3()
	servers := []*Server{s1, s2, s3}
	TestJoin(t, s1, s2, s3)

	leader := waitForStableLeadership(t, servers)

	// Inject a pending eval
	req := structs.EvalUpdateRequest{
		Evals: []*structs.Evaluation{mock.Eval()},
	}
	_, _, err := leader.raftApply(structs.EvalUpdateRequestType, req)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Kill the leader
	leader.Shutdown()
	time.Sleep(100 * time.Millisecond)

	// Wait for a new leader
	leader = nil
	testutil.WaitForResult(func() (bool, error) {
		for _, s := range servers {
			if s.IsLeader() {
				leader = s
				return true, nil
			}
		}
		return false, nil
	}, func(err error) {
		t.Fatalf("should have leader")
	})

	// Check that the new leader has a pending evaluation
	testutil.WaitForResult(func() (bool, error) {
		stats := leader.evalBroker.Stats()
		return stats.TotalReady == 1, nil
	}, func(err error) {
		t.Fatalf("should have pending evaluation")
	})
}

func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
	})
	defer cleanupS1()

	s2, cleanupS2 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
		c.BootstrapExpect = 3
	})
	defer cleanupS2()

	s3, cleanupS3 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
		c.BootstrapExpect = 3
	})
	defer cleanupS3()
	servers := []*Server{s1, s2, s3}
	TestJoin(t, s1, s2, s3)

	leader := waitForStableLeadership(t, servers)

	// Inject a periodic job, a parameterized periodic job and a non-periodic job
	periodic := mock.PeriodicJob()
	nonPeriodic := mock.Job()
	parameterizedPeriodic := mock.PeriodicJob()
	parameterizedPeriodic.ParameterizedJob = &structs.ParameterizedJobConfig{}
	for _, job := range []*structs.Job{nonPeriodic, periodic, parameterizedPeriodic} {
		req := structs.JobRegisterRequest{
			Job: job,
			WriteRequest: structs.WriteRequest{
				Namespace: job.Namespace,
			},
		}
		_, _, err := leader.raftApply(structs.JobRegisterRequestType, req)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	// Kill the leader
	leader.Shutdown()
	time.Sleep(100 * time.Millisecond)

	// Wait for a new leader
	leader = nil
	testutil.WaitForResult(func() (bool, error) {
		for _, s := range servers {
			if s.IsLeader() {
				leader = s
				return true, nil
			}
		}
		return false, nil
	}, func(err error) {
		t.Fatalf("should have leader")
	})

	tuplePeriodic := structs.NamespacedID{
		ID: periodic.ID,
		Namespace: periodic.Namespace,
	}
	tupleNonPeriodic := structs.NamespacedID{
		ID: nonPeriodic.ID,
		Namespace: nonPeriodic.Namespace,
	}
	tupleParameterized := structs.NamespacedID{
		ID: parameterizedPeriodic.ID,
		Namespace: parameterizedPeriodic.Namespace,
	}

	// Check that the new leader is tracking the periodic job only
	testutil.WaitForResult(func() (bool, error) {
		leader.periodicDispatcher.l.Lock()
		defer leader.periodicDispatcher.l.Unlock()
		if _, tracked := leader.periodicDispatcher.tracked[tuplePeriodic]; !tracked {
			return false, fmt.Errorf("periodic job not tracked")
		}
		if _, tracked := leader.periodicDispatcher.tracked[tupleNonPeriodic]; tracked {
			return false, fmt.Errorf("non periodic job tracked")
		}
		if _, tracked := leader.periodicDispatcher.tracked[tupleParameterized]; tracked {
			return false, fmt.Errorf("parameterized periodic job tracked")
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("%v", err)
	})
}

func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	// Inject a periodic job that will be triggered soon.
	launch := time.Now().Add(1 * time.Second)
	job := testPeriodicJob(launch)
	req := structs.JobRegisterRequest{
		Job: job,
		WriteRequest: structs.WriteRequest{
			Namespace: job.Namespace,
		},
	}
	_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Flush the periodic dispatcher, ensuring that no evals will be created.
	s1.periodicDispatcher.SetEnabled(false)

	// Get the current time to ensure the launch time is after this once we
	// restore.
	now := time.Now()

	// Sleep till after the job should have been launched.
	time.Sleep(5 * time.Second)

	// Restore the periodic dispatcher.
	s1.periodicDispatcher.SetEnabled(true)
	s1.restorePeriodicDispatcher()

	// Ensure the job is tracked.
	tuple := structs.NamespacedID{
		ID: job.ID,
		Namespace: job.Namespace,
	}
	if _, tracked := s1.periodicDispatcher.tracked[tuple]; !tracked {
		t.Fatalf("periodic job not restored")
	}

	// Check that an eval was made.
	ws := memdb.NewWatchSet()
	last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
	if err != nil || last == nil {
		t.Fatalf("failed to get periodic launch time: %v", err)
	}

	if last.Launch.Before(now) {
		t.Fatalf("restorePeriodicDispatcher did not force launch: last %v; want after %v", last.Launch, now)
	}
}

type mockJobEvalDispatcher struct {
	forceEvalCalled, children bool
	evalToReturn *structs.Evaluation
	JobEvalDispatcher
}

func (mjed *mockJobEvalDispatcher) DispatchJob(_ *structs.Job) (*structs.Evaluation, error) {
	mjed.forceEvalCalled = true
	return mjed.evalToReturn, nil
}

func (mjed *mockJobEvalDispatcher) RunningChildren(_ *structs.Job) (bool, error) {
	return mjed.children, nil
}

func testPeriodicJob_OverlapEnabled(times ...time.Time) *structs.Job {
	job := testPeriodicJob(times...)
	job.Periodic.ProhibitOverlap = true
	return job
}

func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
	})
	defer cleanupS1()

	testutil.WaitForLeader(t, s1.RPC)

	// Inject a periodic job that triggered once in the past, should trigger now
	// and once in the future.
	now := time.Now()
	past := now.Add(-1 * time.Second)
	future := now.Add(10 * time.Second)
	job := testPeriodicJob(past, now, future)
	req := structs.JobRegisterRequest{
		Job: job,
		WriteRequest: structs.WriteRequest{
			Namespace: job.Namespace,
		},
	}
	_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Create an eval for the past launch.
	eval, err := s1.periodicDispatcher.createEval(job, past)
	must.NoError(t, err)

	md := &mockJobEvalDispatcher{
		children: false,
		evalToReturn: eval,
		JobEvalDispatcher: s1,
	}

	s1.periodicDispatcher.dispatcher = md

	// Flush the periodic dispatcher, ensuring that no evals will be created.
	s1.periodicDispatcher.SetEnabled(false)

	// Sleep till after the job should have been launched.
	time.Sleep(3 * time.Second)

	// Restore the periodic dispatcher.
	s1.periodicDispatcher.SetEnabled(true)

	s1.restorePeriodicDispatcher()

	// Ensure the job is tracked.
	tuple := structs.NamespacedID{
		ID: job.ID,
		Namespace: job.Namespace,
	}
	if _, tracked := s1.periodicDispatcher.tracked[tuple]; !tracked {
		t.Fatalf("periodic job not restored")
	}

	// Check that an eval was made.
	ws := memdb.NewWatchSet()
	last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
	if err != nil || last == nil {
		t.Fatalf("failed to get periodic launch time: %v", err)
	}
	if last.Launch == past {
		t.Fatalf("restorePeriodicDispatcher did not force launch")
	}

	must.True(t, md.forceEvalCalled, must.Sprint("failed to force job evaluation"))
}

func TestLeader_PeriodicDispatcher_No_Overlaps_No_Running_Job(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	// Inject a periodic job that triggered once in the past, should trigger now
	// and once in the future.
	now := time.Now()
	past := now.Add(-1 * time.Second)
	future := now.Add(10 * time.Second)

	job := testPeriodicJob_OverlapEnabled(past, now, future)
	req := structs.JobRegisterRequest{
		Job: job,
		WriteRequest: structs.WriteRequest{
			Namespace: job.Namespace,
		},
	}
	_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
	must.NoError(t, err)

	// Create an eval for the past launch.
	eval, err := s1.periodicDispatcher.createEval(job, past)
	must.NoError(t, err)

	md := &mockJobEvalDispatcher{
		children: false,
		evalToReturn: eval,
	}

	s1.periodicDispatcher.dispatcher = md

	// Flush the periodic dispatcher, ensuring that no evals will be created.
	s1.periodicDispatcher.SetEnabled(false)

	// Sleep till after the job should have been launched.
	time.Sleep(3 * time.Second)

	// Restore the periodic dispatcher.
	s1.periodicDispatcher.SetEnabled(true)
	must.NoError(t, s1.restorePeriodicDispatcher())

	// Ensure the job is tracked.
	tuple := structs.NamespacedID{
		ID: job.ID,
		Namespace: job.Namespace,
	}
	must.MapContainsKey(t, s1.periodicDispatcher.tracked, tuple, must.Sprint("periodic job not restored"))

	// Check that an eval was made.
	ws := memdb.NewWatchSet()
	last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
	must.NoError(t, err)
	must.NotNil(t, last)

	must.NotEq(t, last.Launch, past, must.Sprint("restorePeriodicDispatcher did not force launch"))

	must.True(t, md.forceEvalCalled, must.Sprint("failed to force job evaluation"))
}

func TestLeader_PeriodicDispatcher_No_Overlaps_Running_Job(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	// Inject a periodic job that triggered once in the past, should trigger now
	// and once in the future.
	now := time.Now()
	past := now.Add(-1 * time.Second)
	future := now.Add(10 * time.Second)

	job := testPeriodicJob_OverlapEnabled(past, now, future)
	req := structs.JobRegisterRequest{
		Job: job,
		WriteRequest: structs.WriteRequest{
			Namespace: job.Namespace,
		},
	}
	_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
	must.NoError(t, err)

	// Create an eval for the past launch.
	eval, err := s1.periodicDispatcher.createEval(job, past)
	must.NoError(t, err)

	md := &mockJobEvalDispatcher{
		children: true,
		evalToReturn: eval,
	}

	s1.periodicDispatcher.dispatcher = md

	// Flush the periodic dispatcher, ensuring that no evals will be created.
	s1.periodicDispatcher.SetEnabled(false)

	// Sleep till after the job should have been launched.
	time.Sleep(3 * time.Second)

	// Restore the periodic dispatcher.
	s1.periodicDispatcher.SetEnabled(true)
	must.NoError(t, s1.restorePeriodicDispatcher())

	// Ensure the job is tracked.
	tuple := structs.NamespacedID{
		ID: job.ID,
		Namespace: job.Namespace,
	}
	must.MapContainsKey(t, s1.periodicDispatcher.tracked, tuple, must.Sprint("periodic job not restored"))

	must.False(t, md.forceEvalCalled, must.Sprint("evaluation forced with job already running"))
}

func TestLeader_PeriodicDispatch(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
		c.EvalGCInterval = 5 * time.Millisecond
	})
	defer cleanupS1()

	// Wait for a periodic dispatch
	testutil.WaitForResult(func() (bool, error) {
		stats := s1.evalBroker.Stats()
		bySched, ok := stats.ByScheduler[structs.JobTypeCore]
		if !ok {
			return false, nil
		}
		return bySched.Ready > 0, nil
	}, func(err error) {
t.Fatalf("should pending job")
|
|
	})
}

func TestLeader_ReapFailedEval(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
		c.EvalDeliveryLimit = 1
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	// Create and enqueue an evaluation
	eval := mock.Eval()
	s1.evalBroker.Enqueue(eval)

	// Dequeue and Nack
	out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	s1.evalBroker.Nack(out.ID, token)

	// Wait for an updated and followup evaluation
	state := s1.fsm.State()
	testutil.WaitForResult(func() (bool, error) {
		ws := memdb.NewWatchSet()
		out, err := state.EvalByID(ws, eval.ID)
		if err != nil {
			return false, err
		}
		if out == nil {
			return false, fmt.Errorf("expect original evaluation to exist")
		}
		if out.Status != structs.EvalStatusFailed {
			return false, fmt.Errorf("got status %v; want %v", out.Status, structs.EvalStatusFailed)
		}
		if out.NextEval == "" {
			return false, fmt.Errorf("got empty NextEval")
		}
		// See if there is a followup
		evals, err := state.EvalsByJob(ws, eval.Namespace, eval.JobID)
		if err != nil {
			return false, err
		}

		if l := len(evals); l != 2 {
			return false, fmt.Errorf("got %d evals, want 2", l)
		}

		for _, e := range evals {
			if e.ID == eval.ID {
				continue
			}

			if e.Status != structs.EvalStatusPending {
				return false, fmt.Errorf("follow up eval has status %v; want %v",
					e.Status, structs.EvalStatusPending)
			}

			if e.ID != out.NextEval {
				return false, fmt.Errorf("follow up eval id is %v; orig eval NextEval %v",
					e.ID, out.NextEval)
			}

			if e.Wait < s1.config.EvalFailedFollowupBaselineDelay ||
				e.Wait > s1.config.EvalFailedFollowupBaselineDelay+s1.config.EvalFailedFollowupDelayRange {
				return false, fmt.Errorf("bad wait: %v", e.Wait)
			}

			if e.TriggeredBy != structs.EvalTriggerFailedFollowUp {
				return false, fmt.Errorf("follow up eval TriggeredBy %v; want %v",
					e.TriggeredBy, structs.EvalTriggerFailedFollowUp)
			}
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}

func TestLeader_ReapDuplicateEval(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	// Create a duplicate blocked eval
	eval := mock.Eval()
	eval.CreateIndex = 100
	eval2 := mock.Eval()
	eval2.JobID = eval.JobID
	eval2.CreateIndex = 102
	s1.blockedEvals.Block(eval)
	s1.blockedEvals.Block(eval2)

	// Wait for the evaluation to be marked as cancelled
	state := s1.fsm.State()
	testutil.WaitForResult(func() (bool, error) {
		ws := memdb.NewWatchSet()
		out, err := state.EvalByID(ws, eval.ID)
		if err != nil {
			return false, err
		}
		return out != nil && out.Status == structs.EvalStatusCancelled, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}

func TestLeader_revokeVaultAccessorsOnRestore(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	// Insert a vault accessor that should be revoked
	fsmState := s1.fsm.State()
	va := mock.VaultAccessor()
	if err := fsmState.UpsertVaultAccessor(100, []*structs.VaultAccessor{va}); err != nil {
		t.Fatalf("bad: %v", err)
	}

	// Swap the Vault client
	tvc := &TestVaultClient{}
	s1.vault = tvc

	// Do a restore
	if err := s1.revokeVaultAccessorsOnRestore(); err != nil {
		t.Fatalf("Failed to restore: %v", err)
	}

	if len(tvc.RevokedTokens) != 1 || tvc.RevokedTokens[0].Accessor != va.Accessor {
		t.Fatalf("Bad revoked accessors: %v", tvc.RevokedTokens)
	}
}

func TestLeader_revokeSITokenAccessorsOnRestore(t *testing.T) {
	ci.Parallel(t)
	r := require.New(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	// replace consul ACLs API with a mock for tracking calls in tests
	var consulACLsAPI mockConsulACLsAPI
	s1.consulACLs = &consulACLsAPI

	// Insert a SI token accessor that should be revoked
	fsmState := s1.fsm.State()
	accessor := mock.SITokenAccessor()
	err := fsmState.UpsertSITokenAccessors(100, []*structs.SITokenAccessor{accessor})
	r.NoError(err)

	// Do a restore
	err = s1.revokeSITokenAccessorsOnRestore()
	r.NoError(err)

	// Check the accessor was revoked
	exp := []revokeRequest{{
		accessorID: accessor.AccessorID,
		committed: true,
	}}
	r.ElementsMatch(exp, consulACLsAPI.revokeRequests)
}

func TestLeader_ClusterID(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
		c.Build = minClusterIDVersion.String()
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	clusterID, err := s1.ClusterID()

	require.NoError(t, err)
	require.True(t, helper.IsUUID(clusterID))
}

func TestLeader_ClusterID_upgradePath(t *testing.T) {
	ci.Parallel(t)

	before := version.Must(version.NewVersion("0.10.1")).String()
	after := minClusterIDVersion.String()

	type server struct {
		s *Server
		cleanup func()
	}

	outdated := func() server {
		s, cleanup := TestServer(t, func(c *Config) {
			c.NumSchedulers = 0
			c.Build = before
			c.BootstrapExpect = 3
			c.Logger.SetLevel(hclog.Trace)
		})
		return server{s: s, cleanup: cleanup}
	}

	upgraded := func() server {
		s, cleanup := TestServer(t, func(c *Config) {
			c.NumSchedulers = 0
			c.Build = after
			c.BootstrapExpect = 0
			c.Logger.SetLevel(hclog.Trace)
		})
		return server{s: s, cleanup: cleanup}
	}

	servers := []server{outdated(), outdated(), outdated()}
	// fallback shutdown attempt in case testing fails
	defer servers[0].cleanup()
	defer servers[1].cleanup()
	defer servers[2].cleanup()

	upgrade := func(i int) {
		previous := servers[i]

		servers[i] = upgraded()
		TestJoin(t, servers[i].s, servers[(i+1)%3].s, servers[(i+2)%3].s)
		testutil.WaitForLeader(t, servers[i].s.RPC)

		require.NoError(t, previous.s.Leave())
		require.NoError(t, previous.s.Shutdown())
	}

	// Join the servers before doing anything
	TestJoin(t, servers[0].s, servers[1].s, servers[2].s)

	// Wait for servers to settle
	for i := 0; i < len(servers); i++ {
		testutil.WaitForLeader(t, servers[i].s.RPC)
	}

	// A check that ClusterID is not available yet
	noIDYet := func() {
		for _, s := range servers {
			_, err := s.s.ClusterID()
			must.Error(t, err)
		}
	}

	// Replace first old server with new server
	upgrade(0)
	defer servers[0].cleanup()
	noIDYet() // ClusterID should not work yet, servers: [new, old, old]

	// Replace second old server with new server
	upgrade(1)
	defer servers[1].cleanup()
	noIDYet() // ClusterID should not work yet, servers: [new, new, old]

	// Replace third / final old server with new server
	upgrade(2)
	defer servers[2].cleanup()

	// Wait for old servers to really be gone
	for _, s := range servers {
		testutil.WaitForResult(func() (bool, error) {
			peers, _ := s.s.numPeers()
			return peers == 3, nil
		}, func(_ error) {
			t.Fatalf("should have 3 peers")
		})
	}

	// Now we can tickle the leader into making a cluster ID
	leaderID := ""
	for _, s := range servers {
		if s.s.IsLeader() {
			id, err := s.s.ClusterID()
			require.NoError(t, err)
			leaderID = id
			break
		}
	}
	require.True(t, helper.IsUUID(leaderID))

	// Now every participating server has been upgraded, each one should be
	// able to get the cluster ID, having been plumbed all the way through.
	agreeClusterID(t, []*Server{servers[0].s, servers[1].s, servers[2].s})
}

func TestLeader_ClusterID_noUpgrade(t *testing.T) {
	ci.Parallel(t)

	type server struct {
		s *Server
		cleanup func()
	}

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.Logger.SetLevel(hclog.Trace)
		c.NumSchedulers = 0
		c.Build = minClusterIDVersion.String()
		c.BootstrapExpect = 3
	})
	defer cleanupS1()
	s2, cleanupS2 := TestServer(t, func(c *Config) {
		c.Logger.SetLevel(hclog.Trace)
		c.NumSchedulers = 0
		c.Build = minClusterIDVersion.String()
		c.BootstrapExpect = 3
	})
	defer cleanupS2()
	s3, cleanupS3 := TestServer(t, func(c *Config) {
		c.Logger.SetLevel(hclog.Trace)
		c.NumSchedulers = 0
		c.Build = minClusterIDVersion.String()
		c.BootstrapExpect = 3
	})
	defer cleanupS3()

	servers := []*Server{s1, s2, s3}

	// Join the servers before doing anything
	TestJoin(t, servers[0], servers[1], servers[2])

	// Wait for servers to settle
	for i := 0; i < len(servers); i++ {
		testutil.WaitForLeader(t, servers[i].RPC)
	}

	// Each server started at the minimum version, check there should be only 1
	// cluster ID they all agree on.
	agreeClusterID(t, []*Server{servers[0], servers[1], servers[2]})
}

func agreeClusterID(t *testing.T, servers []*Server) {
	must.Len(t, 3, servers)

	f := func() error {
		id1, err1 := servers[0].ClusterID()
		if err1 != nil {
			return err1
		}
		id2, err2 := servers[1].ClusterID()
		if err2 != nil {
			return err2
		}
		id3, err3 := servers[2].ClusterID()
		if err3 != nil {
			return err3
		}
		if id1 != id2 || id2 != id3 {
			return fmt.Errorf("ids do not match, id1: %s, id2: %s, id3: %s", id1, id2, id3)
		}
		return nil
	}

	must.Wait(t, wait.InitialSuccess(
		wait.ErrorFunc(f),
		wait.Timeout(60*time.Second),
		wait.Gap(1*time.Second),
	))
}

func TestLeader_ReplicateACLPolicies(t *testing.T) {
	ci.Parallel(t)

	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
		c.Region = "region1"
		c.AuthoritativeRegion = "region1"
		c.ACLEnabled = true
	})
	defer cleanupS1()
	s2, _, cleanupS2 := TestACLServer(t, func(c *Config) {
		c.Region = "region2"
		c.AuthoritativeRegion = "region1"
		c.ACLEnabled = true
		c.ReplicationBackoff = 20 * time.Millisecond
		c.ReplicationToken = root.SecretID
	})
	defer cleanupS2()
	TestJoin(t, s1, s2)
	testutil.WaitForLeader(t, s1.RPC)
	testutil.WaitForLeader(t, s2.RPC)

	// Write a policy to the authoritative region
	p1 := mock.ACLPolicy()
	if err := s1.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{p1}); err != nil {
		t.Fatalf("bad: %v", err)
	}

	// Wait for the policy to replicate
	testutil.WaitForResult(func() (bool, error) {
		state := s2.State()
		out, err := state.ACLPolicyByName(nil, p1.Name)
		return out != nil, err
	}, func(err error) {
		t.Fatalf("should replicate policy")
	})
}

func TestLeader_DiffACLPolicies(t *testing.T) {
	ci.Parallel(t)

	state := state.TestStateStore(t)

	// Populate the local state
	p1 := mock.ACLPolicy()
	p2 := mock.ACLPolicy()
	p3 := mock.ACLPolicy()
	assert.Nil(t, state.UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{p1, p2, p3}))

	// Simulate a remote list
	p2Stub := p2.Stub()
	p2Stub.ModifyIndex = 50 // Ignored, same index
	p3Stub := p3.Stub()
	p3Stub.ModifyIndex = 100 // Updated, higher index
	p3Stub.Hash = []byte{0, 1, 2, 3}
	p4 := mock.ACLPolicy()
	remoteList := []*structs.ACLPolicyListStub{
		p2Stub,
		p3Stub,
		p4.Stub(),
	}
	delete, update := diffACLPolicies(state, 50, remoteList)

	// P1 does not exist on the remote side, should delete
	assert.Equal(t, []string{p1.Name}, delete)

	// P2 is un-modified - ignore. P3 modified, P4 new.
	assert.Equal(t, []string{p3.Name, p4.Name}, update)
}

func TestLeader_ReplicateACLTokens(t *testing.T) {
	ci.Parallel(t)

	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
		c.Region = "region1"
		c.AuthoritativeRegion = "region1"
		c.ACLEnabled = true
	})
	defer cleanupS1()
	s2, _, cleanupS2 := TestACLServer(t, func(c *Config) {
		c.Region = "region2"
		c.AuthoritativeRegion = "region1"
		c.ACLEnabled = true
		c.ReplicationBackoff = 20 * time.Millisecond
		c.ReplicationToken = root.SecretID
	})
	defer cleanupS2()
	TestJoin(t, s1, s2)
	testutil.WaitForLeader(t, s1.RPC)
	testutil.WaitForLeader(t, s2.RPC)

	// Write a token to the authoritative region
	p1 := mock.ACLToken()
	p1.Global = true
	if err := s1.State().UpsertACLTokens(structs.MsgTypeTestSetup, 100, []*structs.ACLToken{p1}); err != nil {
		t.Fatalf("bad: %v", err)
	}

	// Wait for the token to replicate
	testutil.WaitForResult(func() (bool, error) {
		state := s2.State()
		out, err := state.ACLTokenByAccessorID(nil, p1.AccessorID)
		return out != nil, err
	}, func(err error) {
		t.Fatalf("should replicate token")
	})
}

func TestLeader_DiffACLTokens(t *testing.T) {
	ci.Parallel(t)

	state := state.TestStateStore(t)

	// Populate the local state
	p0 := mock.ACLToken()
	p1 := mock.ACLToken()
	p1.Global = true
	p2 := mock.ACLToken()
	p2.Global = true
	p3 := mock.ACLToken()
	p3.Global = true
	assert.Nil(t, state.UpsertACLTokens(structs.MsgTypeTestSetup, 100, []*structs.ACLToken{p0, p1, p2, p3}))

	// Simulate a remote list
	p2Stub := p2.Stub()
	p2Stub.ModifyIndex = 50 // Ignored, same index
	p3Stub := p3.Stub()
	p3Stub.ModifyIndex = 100 // Updated, higher index
	p3Stub.Hash = []byte{0, 1, 2, 3}
	p4 := mock.ACLToken()
	p4.Global = true
	remoteList := []*structs.ACLTokenListStub{
		p2Stub,
		p3Stub,
		p4.Stub(),
	}
	delete, update := diffACLTokens(state, 50, remoteList)

	// P0 is local and should be ignored
	// P1 does not exist on the remote side, should delete
	assert.Equal(t, []string{p1.AccessorID}, delete)

	// P2 is un-modified - ignore. P3 modified, P4 new.
	assert.Equal(t, []string{p3.AccessorID, p4.AccessorID}, update)
}

func TestServer_replicationBackoffContinue(t *testing.T) {
	ci.Parallel(t)

	testCases := []struct {
		name string
		testFn func()
	}{
		{
			name: "leadership lost",
			testFn: func() {

				// Create a test server with a long enough backoff that we will
				// be able to close the channel before it fires, but not too
				// long that the test having problems means CI will hang
				// forever.
				testServer, testServerCleanup := TestServer(t, func(c *Config) {
					c.ReplicationBackoff = 5 * time.Second
				})
				defer testServerCleanup()

				// Create our stop channel which is used by the server to
				// indicate leadership loss.
				stopCh := make(chan struct{})

				// The resultCh is used to block and collect the output from
				// the test routine.
				resultCh := make(chan bool, 1)

				// Run a routine to collect the result and close the channel
				// straight away.
				go func() {
					output := testServer.replicationBackoffContinue(stopCh)
					resultCh <- output
				}()

				close(stopCh)

				actualResult := <-resultCh
				require.False(t, actualResult)
			},
		},
		{
			name: "backoff continue",
			testFn: func() {

				// Create a test server with a short backoff.
				testServer, testServerCleanup := TestServer(t, func(c *Config) {
					c.ReplicationBackoff = 10 * time.Nanosecond
				})
				defer testServerCleanup()

				// Create our stop channel which is used by the server to
				// indicate leadership loss.
				stopCh := make(chan struct{})

				// The resultCh is used to block and collect the output from
				// the test routine.
				resultCh := make(chan bool, 1)

				// Run a routine to collect the result without closing stopCh.
				go func() {
					output := testServer.replicationBackoffContinue(stopCh)
					resultCh <- output
				}()

				actualResult := <-resultCh
				require.True(t, actualResult)
			},
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			tc.testFn()
		})
	}
}

func Test_diffACLRoles(t *testing.T) {
	ci.Parallel(t)

	stateStore := state.TestStateStore(t)

	// Build an initial baseline of ACL Roles.
	aclRole0 := mock.ACLRole()
	aclRole1 := mock.ACLRole()
	aclRole2 := mock.ACLRole()
	aclRole3 := mock.ACLRole()

	// Upsert these into our local state. Use copies, so we can alter the roles
	// directly and use within the diff func.
	err := stateStore.UpsertACLRoles(structs.MsgTypeTestSetup, 50,
		[]*structs.ACLRole{aclRole0.Copy(), aclRole1.Copy(), aclRole2.Copy(), aclRole3.Copy()}, true)
	require.NoError(t, err)

	// Modify the ACL roles to create a number of differences. These roles
	// represent the state of the authoritative region.
	aclRole2.ModifyIndex = 50
	aclRole3.ModifyIndex = 200
	aclRole3.Hash = []byte{0, 1, 2, 3}
	aclRole4 := mock.ACLRole()

	// Run the diff function and test the output.
	toDelete, toUpdate := diffACLRoles(stateStore, 50, []*structs.ACLRoleListStub{
		aclRole2.Stub(), aclRole3.Stub(), aclRole4.Stub()})
	require.ElementsMatch(t, []string{aclRole0.ID, aclRole1.ID}, toDelete)
	require.ElementsMatch(t, []string{aclRole3.ID, aclRole4.ID}, toUpdate)
}

func Test_diffACLAuthMethods(t *testing.T) {
	ci.Parallel(t)

	stateStore := state.TestStateStore(t)

	// Build an initial baseline of ACL auth-methods.
	aclAuthMethod0 := mock.ACLOIDCAuthMethod()
	aclAuthMethod1 := mock.ACLOIDCAuthMethod()
	aclAuthMethod2 := mock.ACLOIDCAuthMethod()
	aclAuthMethod3 := mock.ACLOIDCAuthMethod()

	// Upsert these into our local state. Use copies, so we can alter the
	// auth-methods directly and use within the diff func.
	err := stateStore.UpsertACLAuthMethods(50,
		[]*structs.ACLAuthMethod{aclAuthMethod0.Copy(), aclAuthMethod1.Copy(),
			aclAuthMethod2.Copy(), aclAuthMethod3.Copy()})
	must.NoError(t, err)

	// Modify the ACL auth-methods to create a number of differences. These
	// methods represent the state of the authoritative region.
	aclAuthMethod2.ModifyIndex = 50
	aclAuthMethod3.ModifyIndex = 200
	aclAuthMethod3.Hash = []byte{0, 1, 2, 3}
	aclAuthMethod4 := mock.ACLOIDCAuthMethod()

	// Run the diff function and test the output.
	toDelete, toUpdate := diffACLAuthMethods(stateStore, 50, []*structs.ACLAuthMethodStub{
		aclAuthMethod2.Stub(), aclAuthMethod3.Stub(), aclAuthMethod4.Stub()})
	require.ElementsMatch(t, []string{aclAuthMethod0.Name, aclAuthMethod1.Name}, toDelete)
	require.ElementsMatch(t, []string{aclAuthMethod3.Name, aclAuthMethod4.Name}, toUpdate)
}

func Test_diffACLBindingRules(t *testing.T) {
	ci.Parallel(t)

	stateStore := state.TestStateStore(t)

	// Build an initial baseline of ACL binding rules.
	aclBindingRule0 := mock.ACLBindingRule()
	aclBindingRule1 := mock.ACLBindingRule()
	aclBindingRule2 := mock.ACLBindingRule()
	aclBindingRule3 := mock.ACLBindingRule()

	// Upsert these into our local state. Use copies, so we can alter the
	// binding rules directly and use within the diff func.
	err := stateStore.UpsertACLBindingRules(50,
		[]*structs.ACLBindingRule{aclBindingRule0.Copy(), aclBindingRule1.Copy(),
			aclBindingRule2.Copy(), aclBindingRule3.Copy()}, true)
	must.NoError(t, err)

	// Modify the ACL binding rules to create a number of differences. These
	// rules represent the state of the authoritative region.
	aclBindingRule2.ModifyIndex = 50
	aclBindingRule3.ModifyIndex = 200
	aclBindingRule3.Hash = []byte{0, 1, 2, 3}
	aclBindingRule4 := mock.ACLBindingRule()

	// Run the diff function and test the output.
	toDelete, toUpdate := diffACLBindingRules(stateStore, 50, []*structs.ACLBindingRuleListStub{
		aclBindingRule2.Stub(), aclBindingRule3.Stub(), aclBindingRule4.Stub()})
	must.SliceContainsAll(t, []string{aclBindingRule0.ID, aclBindingRule1.ID}, toDelete)
	must.SliceContainsAll(t, []string{aclBindingRule3.ID, aclBindingRule4.ID}, toUpdate)
}

func TestLeader_Reelection(t *testing.T) {
	ci.Parallel(t)

	const raftProtocol = 3

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
		c.RaftConfig.ProtocolVersion = raftProtocol
	})
	defer cleanupS1()

	s2, cleanupS2 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
		c.RaftConfig.ProtocolVersion = raftProtocol
	})
	defer cleanupS2()

	s3, cleanupS3 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
		c.RaftConfig.ProtocolVersion = raftProtocol
	})
	defer cleanupS3() // todo(shoenig) added this, should be here right??

	servers := []*Server{s1, s2, s3}

	// Try to join
	TestJoin(t, s1, s2, s3)
	testutil.WaitForLeader(t, s1.RPC)

	testutil.WaitForResult(func() (bool, error) {
		future := s1.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			return false, err
		}

		for _, server := range future.Configuration().Servers {
			if server.Suffrage == raft.Nonvoter {
				return false, fmt.Errorf("non-voter %v", server)
			}
		}

		return true, nil
	}, func(err error) {
		t.Fatal(err)
	})

	var leader, nonLeader *Server
	for _, s := range servers {
		if s.IsLeader() {
			leader = s
		} else {
			nonLeader = s
		}
	}

	// make sure we still have a leader, then shut it down
	must.NotNil(t, leader, must.Sprint("expected there to be a leader"))
	leader.Shutdown()

	// Wait for new leader to elect
	testutil.WaitForLeader(t, nonLeader.RPC)
}

func TestLeader_RollRaftServer(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
		c.RaftConfig.ProtocolVersion = 3
	})
	defer cleanupS1()

	s2, cleanupS2 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
		c.RaftConfig.ProtocolVersion = 3
	})
	defer cleanupS2()

	s3, cleanupS3 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
		c.RaftConfig.ProtocolVersion = 3
	})
	defer cleanupS3()

	servers := []*Server{s1, s2, s3}
	TestJoin(t, s1, s2, s3)

	t.Logf("waiting for initial stable cluster")
	waitForStableLeadership(t, servers)

	t.Logf("killing server s1")
	s1.Shutdown()
	for _, s := range []*Server{s2, s3} {
		s.RemoveFailedNode(s1.config.NodeID)
	}

	t.Logf("waiting for server loss to be detected")
	testutil.WaitForResultUntil(time.Second*10,
		func() (bool, error) {
			for _, s := range []*Server{s2, s3} {
				err := wantPeers(s, 2)
				if err != nil {
					return false, err
				}
			}
			return true, nil

		},
		func(err error) { must.NoError(t, err) },
	)

	t.Logf("adding replacement server s4")
	s4, cleanupS4 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
		c.RaftConfig.ProtocolVersion = 3
	})
	defer cleanupS4()
	TestJoin(t, s2, s3, s4)
	servers = []*Server{s4, s2, s3}

	t.Logf("waiting for s4 to stabilize")
	waitForStableLeadership(t, servers)

	t.Logf("killing server s2")
	s2.Shutdown()
	for _, s := range []*Server{s3, s4} {
		s.RemoveFailedNode(s2.config.NodeID)
	}

	t.Logf("waiting for server loss to be detected")
	testutil.WaitForResultUntil(time.Second*10,
		func() (bool, error) {
			for _, s := range []*Server{s3, s4} {
				err := wantPeers(s, 2)
				if err != nil {
					return false, err
				}
			}
			return true, nil
		},
		func(err error) { must.NoError(t, err) },
	)

	t.Logf("adding replacement server s5")
	s5, cleanupS5 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
		c.RaftConfig.ProtocolVersion = 3
	})
	defer cleanupS5()
	TestJoin(t, s3, s4, s5)
	servers = []*Server{s4, s5, s3}

	t.Logf("waiting for s5 to stabilize")
	waitForStableLeadership(t, servers)

	t.Logf("killing server s3")
	s3.Shutdown()

	t.Logf("waiting for server loss to be detected")
	testutil.WaitForResultUntil(time.Second*10,
		func() (bool, error) {
			for _, s := range []*Server{s4, s5} {
				err := wantPeers(s, 2)
				if err != nil {
					return false, err
				}
			}
			return true, nil
		},
		func(err error) { must.NoError(t, err) },
	)

	t.Logf("adding replacement server s6")
	s6, cleanupS6 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 3
		c.RaftConfig.ProtocolVersion = 3
	})
	defer cleanupS6()
	TestJoin(t, s6, s4)
	servers = []*Server{s4, s5, s6}

	t.Logf("waiting for s6 to stabilize")
	waitForStableLeadership(t, servers)
}

func TestLeader_RevokeLeadership_MultipleTimes(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	testutil.WaitForResult(func() (bool, error) {
		return s1.evalBroker.Enabled(), nil
	}, func(err error) {
		t.Fatalf("should have finished establish leader loop")
	})

	require.Nil(t, s1.revokeLeadership())
	require.Nil(t, s1.revokeLeadership())
	require.Nil(t, s1.revokeLeadership())
}

func TestLeader_TransitionsUpdateConsistencyRead(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	testutil.WaitForResult(func() (bool, error) {
		return s1.isReadyForConsistentReads(), nil
	}, func(err error) {
		require.Fail(t, "should have finished establish leader loop")
	})

	require.Nil(t, s1.revokeLeadership())
	require.False(t, s1.isReadyForConsistentReads())

	ch := make(chan struct{})
	require.Nil(t, s1.establishLeadership(ch))
	require.True(t, s1.isReadyForConsistentReads())
}

// TestLeader_PausingWorkers asserts that scheduling workers are paused
// (and unpaused) upon leader elections (and step downs).
func TestLeader_PausingWorkers(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 12
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)
	require.Len(t, s1.workers, 12)

	// this satisfies the require.Eventually test interface
	checkPaused := func(count int) func() bool {
		return func() bool {
			pausedWorkers := func() int {
				c := 0
				for _, w := range s1.workers {
					if w.IsPaused() {
						c++
					}
				}
				return c
			}

			return pausedWorkers() == count
		}
	}

	// acquiring leadership should have paused 3/4 of the workers
	require.Eventually(t, checkPaused(9), 1*time.Second, 10*time.Millisecond, "scheduler workers did not pause within a second at leadership change")

	err := s1.revokeLeadership()
	require.NoError(t, err)

	// unpausing is a relatively quick activity
	require.Eventually(t, checkPaused(0), 50*time.Millisecond, 10*time.Millisecond, "scheduler workers should have unpaused after losing leadership")
}

// Test doing an inplace upgrade on a server from raft protocol 2 to 3
// This verifies that removing the server and adding it back with a uuid works
// even if the server's address stays the same.
func TestServer_ReconcileMember(t *testing.T) {
	ci.Parallel(t)

	// Create a three node cluster
	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 2
		c.RaftConfig.ProtocolVersion = 3
	})
	defer cleanupS1()

	s2, cleanupS2 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 2
		c.RaftConfig.ProtocolVersion = 3
	})
	defer cleanupS2()

	TestJoin(t, s1, s2)
	testutil.WaitForLeader(t, s1.RPC)

	// This test relies on s3 not being the leader, so add it only after
	// leadership has been established to reduce the chance that it wins
	// the election.
	s3, cleanupS3 := TestServer(t, func(c *Config) {
		c.BootstrapExpect = 0
		c.RaftConfig.ProtocolVersion = 3
	})
	defer cleanupS3()

	TestJoin(t, s1, s3)

	// Create a memberlist object for s3, with raft protocol upgraded to 3
	upgradedS3Member := serf.Member{
		Name: s3.config.NodeName,
		Addr: s3.config.RPCAddr.IP,
		Status: serf.StatusAlive,
		Tags: make(map[string]string),
	}
	upgradedS3Member.Tags["role"] = "nomad"
	upgradedS3Member.Tags["id"] = s3.config.NodeID
	upgradedS3Member.Tags["region"] = s3.config.Region
	upgradedS3Member.Tags["dc"] = s3.config.Datacenter
	upgradedS3Member.Tags["rpc_addr"] = "127.0.0.1"
	upgradedS3Member.Tags["port"] = strconv.Itoa(s3.config.RPCAddr.Port)
	upgradedS3Member.Tags["build"] = "0.8.0"
	upgradedS3Member.Tags["vsn"] = "2"
	upgradedS3Member.Tags["mvn"] = "1"
	upgradedS3Member.Tags["raft_vsn"] = "3"

	findLeader := func(t *testing.T) *Server {
		t.Helper()
		for _, s := range []*Server{s1, s2, s3} {
			if s.IsLeader() {
				t.Logf("found leader: %v %v", s.config.NodeID, s.config.RPCAddr)
				return s
			}
		}

		t.Fatalf("no leader found")
		return nil
	}

	// Find the leader so that we can call reconcile member on it
	leader := findLeader(t)
	if err := leader.reconcileMember(upgradedS3Member); err != nil {
		t.Fatalf("failed to reconcile member: %v", err)
	}

	// This should remove s3 from the config and potentially cause a leader election
	testutil.WaitForLeader(t, s1.RPC)

	// Figure out the new leader and call reconcile again, this should add s3 with the new ID format
	leader = findLeader(t)
	if err := leader.reconcileMember(upgradedS3Member); err != nil {
		t.Fatalf("failed to reconcile member: %v", err)
	}

	testutil.WaitForLeader(t, s1.RPC)
	future := s2.raft.GetConfiguration()
	if err := future.Error(); err != nil {
		t.Fatal(err)
	}
	addrs := 0
	ids := 0
	for _, server := range future.Configuration().Servers {
		if string(server.ID) == string(server.Address) {
			addrs++
		} else {
			ids++
		}
	}
	// After this, all three servers should have IDs in raft
	if got, want := addrs, 0; got != want {
		t.Fatalf("got %d server addresses want %d", got, want)
	}
	if got, want := ids, 3; got != want {
		t.Fatalf("got %d server ids want %d: %#v", got, want, future.Configuration().Servers)
	}
}

func TestLeader_ReplicateNamespaces(t *testing.T) {
	ci.Parallel(t)
	assert := assert.New(t)
	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
		c.Region = "region1"
		c.AuthoritativeRegion = "region1"
		c.ACLEnabled = true
	})
	defer cleanupS1()
	s2, _, cleanupS2 := TestACLServer(t, func(c *Config) {
		c.Region = "region2"
		c.AuthoritativeRegion = "region1"
		c.ACLEnabled = true
		c.ReplicationBackoff = 20 * time.Millisecond
		c.ReplicationToken = root.SecretID
	})
	defer cleanupS2()
	TestJoin(t, s1, s2)
	testutil.WaitForLeader(t, s1.RPC)
	testutil.WaitForLeader(t, s2.RPC)

	// Write a namespace to the authoritative region
	ns1 := mock.Namespace()
	assert.Nil(s1.State().UpsertNamespaces(100, []*structs.Namespace{ns1}))

	// Wait for the namespace to replicate
	testutil.WaitForResult(func() (bool, error) {
		state := s2.State()
		out, err := state.NamespaceByName(nil, ns1.Name)
		return out != nil, err
	}, func(err error) {
		t.Fatalf("should replicate namespace")
	})

	// Delete the namespace at the authoritative region
	assert.Nil(s1.State().DeleteNamespaces(200, []string{ns1.Name}))

	// Wait for the namespace deletion to replicate
	testutil.WaitForResult(func() (bool, error) {
		state := s2.State()
		out, err := state.NamespaceByName(nil, ns1.Name)
		return out == nil, err
	}, func(err error) {
		t.Fatalf("should replicate namespace deletion")
	})
}

func TestLeader_DiffNamespaces(t *testing.T) {
	ci.Parallel(t)

	state := state.TestStateStore(t)

	// Populate the local state
	ns1 := mock.Namespace()
	ns2 := mock.Namespace()
	ns3 := mock.Namespace()
	assert.Nil(t, state.UpsertNamespaces(100, []*structs.Namespace{ns1, ns2, ns3}))

	// Simulate a remote list
	rns2 := ns2.Copy()
	rns2.ModifyIndex = 50 // Ignored, same index
	rns3 := ns3.Copy()
	rns3.ModifyIndex = 100 // Updated, higher index
	rns3.Hash = []byte{0, 1, 2, 3}
	ns4 := mock.Namespace()
	remoteList := []*structs.Namespace{
		rns2,
		rns3,
		ns4,
	}
	delete, update := diffNamespaces(state, 50, remoteList)
	sort.Strings(delete)

	// ns1 does not exist on the remote side, should delete
	assert.Equal(t, []string{structs.DefaultNamespace, ns1.Name}, delete)

	// ns2 is un-modified - ignore. ns3 modified, ns4 new.
	assert.Equal(t, []string{ns3.Name, ns4.Name}, update)
}

func TestLeader_ReplicateNodePools(t *testing.T) {
	ci.Parallel(t)

	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
		c.Region = "region1"
		c.AuthoritativeRegion = "region1"
		c.ACLEnabled = true
	})
	defer cleanupS1()
	s2, _, cleanupS2 := TestACLServer(t, func(c *Config) {
		c.Region = "region2"
		c.AuthoritativeRegion = "region1"
		c.ACLEnabled = true
		c.ReplicationBackoff = 20 * time.Millisecond
		c.ReplicationToken = root.SecretID
	})
	defer cleanupS2()
	TestJoin(t, s1, s2)
	testutil.WaitForLeader(t, s1.RPC)
	testutil.WaitForLeader(t, s2.RPC)

	// Write a node pool to the authoritative region
	np1 := mock.NodePool()
	must.NoError(t, s1.State().UpsertNodePools(
		structs.MsgTypeTestSetup, 100, []*structs.NodePool{np1}))

	// Wait for the node pool to replicate
	testutil.WaitForResult(func() (bool, error) {
		store := s2.State()
		out, err := store.NodePoolByName(nil, np1.Name)
		return out != nil, err
	}, func(err error) {
		t.Fatalf("should replicate node pool")
	})

	// Delete the node pool at the authoritative region
	must.NoError(t, s1.State().DeleteNodePools(structs.MsgTypeTestSetup, 200, []string{np1.Name}))

	// Wait for the node pool deletion to replicate
	testutil.WaitForResult(func() (bool, error) {
		store := s2.State()
		out, err := store.NodePoolByName(nil, np1.Name)
		return out == nil, err
	}, func(err error) {
		t.Fatalf("should replicate node pool deletion")
	})
}

func TestLeader_DiffNodePools(t *testing.T) {
	ci.Parallel(t)

	state := state.TestStateStore(t)

	// Populate the local state
	np1, np2, np3 := mock.NodePool(), mock.NodePool(), mock.NodePool()
	must.NoError(t, state.UpsertNodePools(
		structs.MsgTypeTestSetup, 100, []*structs.NodePool{np1, np2, np3}))

	// Simulate a remote list
	rnp2 := np2.Copy()
	rnp2.ModifyIndex = 50 // Ignored, same index
	rnp3 := np3.Copy()
	rnp3.ModifyIndex = 100 // Updated, higher index
	rnp3.Description = "force a hash update"
	rnp3.SetHash()
	rnp4 := mock.NodePool()
	remoteList := []*structs.NodePool{
		rnp2,
		rnp3,
		rnp4,
	}
	delete, update := diffNodePools(state, 50, remoteList)
	sort.Strings(delete)

	// np1 does not exist on the remote side, should delete
	test.Eq(t, []string{structs.NodePoolAll, structs.NodePoolDefault, np1.Name}, delete)

	// np2 is un-modified - ignore. np3 modified, np4 new.
	test.Eq(t, []*structs.NodePool{rnp3, rnp4}, update)
}

// waitForStableLeadership waits until a leader is elected and all servers are
// promoted to voting members, then returns the leader.
func waitForStableLeadership(t *testing.T, servers []*Server) *Server {
	nPeers := len(servers)

	// wait for all servers to discover each other
	for _, s := range servers {
		testutil.WaitForResult(func() (bool, error) {
			peers, _ := s.numPeers()
			if peers != nPeers {
				return false, fmt.Errorf("should find %d peers but found %d", nPeers, peers)
			}

			return true, nil
		}, func(err error) {
			require.NoError(t, err)
		})
	}

	// wait for leader
	var leader *Server
	testutil.WaitForResult(func() (bool, error) {
		for _, s := range servers {
			if s.IsLeader() {
				leader = s
				return true, nil
			}
		}

		return false, fmt.Errorf("no leader found")
	}, func(err error) {
		require.NoError(t, err)
	})

	// wait for all servers to be marked as voters
	testutil.WaitForResult(func() (bool, error) {
		future := leader.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			return false, fmt.Errorf("failed to get raft config: %v", future.Error())
		}
		ss := future.Configuration().Servers
		if len(ss) != len(servers) {
			return false, fmt.Errorf("raft doesn't contain all servers. Expected %d but found %d", len(servers), len(ss))
		}

		for _, s := range ss {
			if s.Suffrage != raft.Voter {
				return false, fmt.Errorf("configuration has non voting server: %v", s)
			}
		}

		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})

	return leader
}

func TestServer_getLatestIndex(t *testing.T) {
	ci.Parallel(t)

	testServer, testServerCleanup := TestServer(t, nil)
	defer testServerCleanup()

	// Test a new state store value.
	idx, success := testServer.getLatestIndex()
	require.True(t, success)
	must.Eq(t, 1, idx)

	// Upsert something with a high index, and check again.
	err := testServer.State().UpsertACLPolicies(
		structs.MsgTypeTestSetup, 1013, []*structs.ACLPolicy{mock.ACLPolicy()})
	require.NoError(t, err)

	idx, success = testServer.getLatestIndex()
	require.True(t, success)
	must.Eq(t, 1013, idx)
}

func TestServer_handleEvalBrokerStateChange(t *testing.T) {
	ci.Parallel(t)

	testCases := []struct {
		startValue bool
		testServerCallBackConfig func(c *Config)
		inputSchedulerConfiguration *structs.SchedulerConfiguration
		expectedOutput bool
		name string
	}{
		{
			startValue: false,
			testServerCallBackConfig: func(c *Config) { c.DefaultSchedulerConfig.PauseEvalBroker = false },
			inputSchedulerConfiguration: nil,
			expectedOutput: true,
			name: "bootstrap un-paused",
		},
		{
			startValue: false,
			testServerCallBackConfig: func(c *Config) { c.DefaultSchedulerConfig.PauseEvalBroker = true },
			inputSchedulerConfiguration: nil,
			expectedOutput: false,
			name: "bootstrap paused",
		},
		{
			startValue: true,
			testServerCallBackConfig: nil,
			inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: true},
			expectedOutput: false,
			name: "state change to paused",
		},
		{
			startValue: false,
			testServerCallBackConfig: nil,
			inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: true},
			expectedOutput: false,
			name: "no state change to paused",
		},
		{
			startValue: false,
			testServerCallBackConfig: nil,
			inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: false},
			expectedOutput: true,
			name: "state change to un-paused",
		},
		{
			startValue: false,
			testServerCallBackConfig: nil,
			inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: true},
			expectedOutput: false,
			name: "no state change to un-paused",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {

			// Create a new server and wait for leadership to be established.
			testServer, cleanupFn := TestServer(t, nil)
			_ = waitForStableLeadership(t, []*Server{testServer})
			defer cleanupFn()

			// If we set a callback config, we are just testing the eventual
			// state of the brokers. Otherwise, we set our starting value and
			// then perform our state modification change and check.
			if tc.testServerCallBackConfig == nil {
				testServer.evalBroker.SetEnabled(tc.startValue)
				testServer.blockedEvals.SetEnabled(tc.startValue)
				actualOutput := testServer.handleEvalBrokerStateChange(tc.inputSchedulerConfiguration)
				require.Equal(t, tc.expectedOutput, actualOutput)
			}

			// Check the brokers are in the expected state.
			var expectedEnabledVal bool

			if tc.inputSchedulerConfiguration == nil {
				expectedEnabledVal = !testServer.config.DefaultSchedulerConfig.PauseEvalBroker
			} else {
				expectedEnabledVal = !tc.inputSchedulerConfiguration.PauseEvalBroker
			}
			require.Equal(t, expectedEnabledVal, testServer.evalBroker.Enabled())
			require.Equal(t, expectedEnabledVal, testServer.blockedEvals.Enabled())
		})
	}
}