2056 lines
54 KiB
Go
2056 lines
54 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package nomad
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sort"
|
|
"strconv"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
memdb "github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/go-version"
|
|
"github.com/shoenig/test"
|
|
"github.com/shoenig/test/must"
|
|
"github.com/shoenig/test/wait"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/hashicorp/nomad/ci"
|
|
"github.com/hashicorp/nomad/helper"
|
|
"github.com/hashicorp/nomad/nomad/mock"
|
|
"github.com/hashicorp/nomad/nomad/state"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/testutil"
|
|
"github.com/hashicorp/raft"
|
|
"github.com/hashicorp/serf/serf"
|
|
)
|
|
|
|
func TestLeader_LeftServer(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS1()
|
|
|
|
s2, cleanupS2 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS2()
|
|
|
|
s3, cleanupS3 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS3()
|
|
servers := []*Server{s1, s2, s3}
|
|
TestJoin(t, s1, s2, s3)
|
|
|
|
for _, s := range servers {
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
peers, _ := s.numPeers()
|
|
return peers == 3, nil
|
|
}, func(err error) {
|
|
t.Fatalf("should have 3 peers")
|
|
})
|
|
}
|
|
|
|
// Kill any server
|
|
var peer *Server
|
|
for _, s := range servers {
|
|
if !s.IsLeader() {
|
|
peer = s
|
|
break
|
|
}
|
|
}
|
|
if peer == nil {
|
|
t.Fatalf("Should have a non-leader")
|
|
}
|
|
peer.Shutdown()
|
|
name := fmt.Sprintf("%s.%s", peer.config.NodeName, peer.config.Region)
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
for _, s := range servers {
|
|
if s == peer {
|
|
continue
|
|
}
|
|
|
|
// Force remove the non-leader (transition to left state)
|
|
if err := s.RemoveFailedNode(name); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
peers, _ := s.numPeers()
|
|
return peers == 2, errors.New(fmt.Sprintf("%v", peers))
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %s", err)
|
|
})
|
|
}
|
|
|
|
func TestLeader_LeftLeader(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS1()
|
|
|
|
s2, cleanupS2 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS2()
|
|
|
|
s3, cleanupS3 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS3()
|
|
servers := []*Server{s1, s2, s3}
|
|
TestJoin(t, s1, s2, s3)
|
|
|
|
for _, s := range servers {
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
peers, _ := s.numPeers()
|
|
return peers == 3, nil
|
|
}, func(err error) {
|
|
t.Fatalf("should have 3 peers")
|
|
})
|
|
}
|
|
|
|
// Kill the leader!
|
|
leader := waitForStableLeadership(t, servers)
|
|
|
|
leader.Leave()
|
|
leader.Shutdown()
|
|
|
|
for _, s := range servers {
|
|
if s == leader {
|
|
continue
|
|
}
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
peers, _ := s.numPeers()
|
|
return peers == 2, errors.New(fmt.Sprintf("%v", peers))
|
|
}, func(err error) {
|
|
t.Fatalf("should have 2 peers: %v", err)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestLeader_MultiBootstrap(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
|
|
s2, cleanupS2 := TestServer(t, nil)
|
|
defer cleanupS2()
|
|
servers := []*Server{s1, s2}
|
|
TestJoin(t, s1, s2)
|
|
|
|
for _, s := range servers {
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
peers := s.Members()
|
|
return len(peers) == 2, nil
|
|
}, func(err error) {
|
|
t.Fatalf("should have 2 peers")
|
|
})
|
|
}
|
|
|
|
// Ensure we don't have multiple raft peers
|
|
for _, s := range servers {
|
|
peers, err := s.numPeers()
|
|
if err != nil {
|
|
t.Fatalf("failed: %v", err)
|
|
}
|
|
if peers != 1 {
|
|
t.Fatalf("should only have 1 raft peer! %v", peers)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestLeader_PlanQueue_Reset(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS1()
|
|
|
|
s2, cleanupS2 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS2()
|
|
|
|
s3, cleanupS3 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS3()
|
|
servers := []*Server{s1, s2, s3}
|
|
TestJoin(t, s1, s2, s3)
|
|
|
|
leader := waitForStableLeadership(t, servers)
|
|
|
|
if !leader.planQueue.Enabled() {
|
|
t.Fatalf("should enable plan queue")
|
|
}
|
|
|
|
for _, s := range servers {
|
|
if !s.IsLeader() && s.planQueue.Enabled() {
|
|
t.Fatalf("plan queue should not be enabled")
|
|
}
|
|
}
|
|
|
|
// Kill the leader
|
|
leader.Shutdown()
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Wait for a new leader
|
|
leader = nil
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
for _, s := range servers {
|
|
if s.IsLeader() {
|
|
leader = s
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
}, func(err error) {
|
|
t.Fatalf("should have leader")
|
|
})
|
|
|
|
// Check that the new leader has a pending GC expiration
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
return leader.planQueue.Enabled(), nil
|
|
}, func(err error) {
|
|
t.Fatalf("should enable plan queue")
|
|
})
|
|
}
|
|
|
|
func TestLeader_EvalBroker_Reset(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
})
|
|
defer cleanupS1()
|
|
|
|
s2, cleanupS2 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS2()
|
|
|
|
s3, cleanupS3 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS3()
|
|
servers := []*Server{s1, s2, s3}
|
|
TestJoin(t, s1, s2, s3)
|
|
|
|
leader := waitForStableLeadership(t, servers)
|
|
|
|
// Inject a pending eval
|
|
req := structs.EvalUpdateRequest{
|
|
Evals: []*structs.Evaluation{mock.Eval()},
|
|
}
|
|
_, _, err := leader.raftApply(structs.EvalUpdateRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Kill the leader
|
|
leader.Shutdown()
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Wait for a new leader
|
|
leader = nil
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
for _, s := range servers {
|
|
if s.IsLeader() {
|
|
leader = s
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
}, func(err error) {
|
|
t.Fatalf("should have leader")
|
|
})
|
|
|
|
// Check that the new leader has a pending evaluation
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
stats := leader.evalBroker.Stats()
|
|
return stats.TotalReady == 1, nil
|
|
}, func(err error) {
|
|
t.Fatalf("should have pending evaluation")
|
|
})
|
|
}
|
|
|
|
func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
})
|
|
defer cleanupS1()
|
|
|
|
s2, cleanupS2 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS2()
|
|
|
|
s3, cleanupS3 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS3()
|
|
servers := []*Server{s1, s2, s3}
|
|
TestJoin(t, s1, s2, s3)
|
|
|
|
leader := waitForStableLeadership(t, servers)
|
|
|
|
// Inject a periodic job, a parameterized periodic job and a non-periodic job
|
|
periodic := mock.PeriodicJob()
|
|
nonPeriodic := mock.Job()
|
|
parameterizedPeriodic := mock.PeriodicJob()
|
|
parameterizedPeriodic.ParameterizedJob = &structs.ParameterizedJobConfig{}
|
|
for _, job := range []*structs.Job{nonPeriodic, periodic, parameterizedPeriodic} {
|
|
req := structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
_, _, err := leader.raftApply(structs.JobRegisterRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
|
|
// Kill the leader
|
|
leader.Shutdown()
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Wait for a new leader
|
|
leader = nil
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
for _, s := range servers {
|
|
if s.IsLeader() {
|
|
leader = s
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
}, func(err error) {
|
|
t.Fatalf("should have leader")
|
|
})
|
|
|
|
tuplePeriodic := structs.NamespacedID{
|
|
ID: periodic.ID,
|
|
Namespace: periodic.Namespace,
|
|
}
|
|
tupleNonPeriodic := structs.NamespacedID{
|
|
ID: nonPeriodic.ID,
|
|
Namespace: nonPeriodic.Namespace,
|
|
}
|
|
tupleParameterized := structs.NamespacedID{
|
|
ID: parameterizedPeriodic.ID,
|
|
Namespace: parameterizedPeriodic.Namespace,
|
|
}
|
|
|
|
// Check that the new leader is tracking the periodic job only
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
leader.periodicDispatcher.l.Lock()
|
|
defer leader.periodicDispatcher.l.Unlock()
|
|
if _, tracked := leader.periodicDispatcher.tracked[tuplePeriodic]; !tracked {
|
|
return false, fmt.Errorf("periodic job not tracked")
|
|
}
|
|
if _, tracked := leader.periodicDispatcher.tracked[tupleNonPeriodic]; tracked {
|
|
return false, fmt.Errorf("non periodic job tracked")
|
|
}
|
|
if _, tracked := leader.periodicDispatcher.tracked[tupleParameterized]; tracked {
|
|
return false, fmt.Errorf("parameterized periodic job tracked")
|
|
}
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf(err.Error())
|
|
})
|
|
}
|
|
|
|
func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
})
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Inject a periodic job that will be triggered soon.
|
|
launch := time.Now().Add(1 * time.Second)
|
|
job := testPeriodicJob(launch)
|
|
req := structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Flush the periodic dispatcher, ensuring that no evals will be created.
|
|
s1.periodicDispatcher.SetEnabled(false)
|
|
|
|
// Get the current time to ensure the launch time is after this once we
|
|
// restore.
|
|
now := time.Now()
|
|
|
|
// Sleep till after the job should have been launched.
|
|
time.Sleep(5 * time.Second)
|
|
|
|
// Restore the periodic dispatcher.
|
|
s1.periodicDispatcher.SetEnabled(true)
|
|
s1.restorePeriodicDispatcher()
|
|
|
|
// Ensure the job is tracked.
|
|
tuple := structs.NamespacedID{
|
|
ID: job.ID,
|
|
Namespace: job.Namespace,
|
|
}
|
|
if _, tracked := s1.periodicDispatcher.tracked[tuple]; !tracked {
|
|
t.Fatalf("periodic job not restored")
|
|
}
|
|
|
|
// Check that an eval was made.
|
|
ws := memdb.NewWatchSet()
|
|
last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
|
|
if err != nil || last == nil {
|
|
t.Fatalf("failed to get periodic launch time: %v", err)
|
|
}
|
|
|
|
if last.Launch.Before(now) {
|
|
t.Fatalf("restorePeriodicDispatcher did not force launch: last %v; want after %v", last.Launch, now)
|
|
}
|
|
}
|
|
|
|
type mockJobEvalDispatcher struct {
|
|
forceEvalCalled, children bool
|
|
evalToReturn *structs.Evaluation
|
|
JobEvalDispatcher
|
|
}
|
|
|
|
func (mjed *mockJobEvalDispatcher) DispatchJob(_ *structs.Job) (*structs.Evaluation, error) {
|
|
mjed.forceEvalCalled = true
|
|
return mjed.evalToReturn, nil
|
|
}
|
|
|
|
func (mjed *mockJobEvalDispatcher) RunningChildren(_ *structs.Job) (bool, error) {
|
|
return mjed.children, nil
|
|
}
|
|
|
|
func testPeriodicJob_OverlapEnabled(times ...time.Time) *structs.Job {
|
|
job := testPeriodicJob(times...)
|
|
job.Periodic.ProhibitOverlap = true
|
|
return job
|
|
}
|
|
|
|
func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
})
|
|
defer cleanupS1()
|
|
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Inject a periodic job that triggered once in the past, should trigger now
|
|
// and once in the future.
|
|
now := time.Now()
|
|
past := now.Add(-1 * time.Second)
|
|
future := now.Add(10 * time.Second)
|
|
job := testPeriodicJob(past, now, future)
|
|
req := structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create an eval for the past launch.
|
|
eval, err := s1.periodicDispatcher.createEval(job, past)
|
|
must.NoError(t, err)
|
|
|
|
md := &mockJobEvalDispatcher{
|
|
children: false,
|
|
evalToReturn: eval,
|
|
JobEvalDispatcher: s1,
|
|
}
|
|
|
|
s1.periodicDispatcher.dispatcher = md
|
|
|
|
// Flush the periodic dispatcher, ensuring that no evals will be created.
|
|
s1.periodicDispatcher.SetEnabled(false)
|
|
|
|
// Sleep till after the job should have been launched.
|
|
time.Sleep(3 * time.Second)
|
|
|
|
// Restore the periodic dispatcher.
|
|
s1.periodicDispatcher.SetEnabled(true)
|
|
|
|
s1.restorePeriodicDispatcher()
|
|
|
|
// Ensure the job is tracked.
|
|
tuple := structs.NamespacedID{
|
|
ID: job.ID,
|
|
Namespace: job.Namespace,
|
|
}
|
|
if _, tracked := s1.periodicDispatcher.tracked[tuple]; !tracked {
|
|
t.Fatalf("periodic job not restored")
|
|
}
|
|
|
|
// Check that an eval was made.
|
|
ws := memdb.NewWatchSet()
|
|
last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
|
|
if err != nil || last == nil {
|
|
t.Fatalf("failed to get periodic launch time: %v", err)
|
|
}
|
|
if last.Launch == past {
|
|
t.Fatalf("restorePeriodicDispatcher did not force launch")
|
|
}
|
|
|
|
must.True(t, md.forceEvalCalled, must.Sprint("failed to force job evaluation"))
|
|
}
|
|
|
|
func TestLeader_PeriodicDispatcher_No_Overlaps_No_Running_Job(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
})
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Inject a periodic job that triggered once in the past, should trigger now
|
|
// and once in the future.
|
|
now := time.Now()
|
|
past := now.Add(-1 * time.Second)
|
|
future := now.Add(10 * time.Second)
|
|
|
|
job := testPeriodicJob_OverlapEnabled(past, now, future)
|
|
req := structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
|
|
must.NoError(t, err)
|
|
|
|
// Create an eval for the past launch.
|
|
eval, err := s1.periodicDispatcher.createEval(job, past)
|
|
must.NoError(t, err)
|
|
|
|
md := &mockJobEvalDispatcher{
|
|
children: false,
|
|
evalToReturn: eval,
|
|
}
|
|
|
|
s1.periodicDispatcher.dispatcher = md
|
|
|
|
// Flush the periodic dispatcher, ensuring that no evals will be created.
|
|
s1.periodicDispatcher.SetEnabled(false)
|
|
|
|
// Sleep till after the job should have been launched.
|
|
time.Sleep(3 * time.Second)
|
|
|
|
// Restore the periodic dispatcher.
|
|
s1.periodicDispatcher.SetEnabled(true)
|
|
must.NoError(t, s1.restorePeriodicDispatcher())
|
|
|
|
// Ensure the job is tracked.
|
|
tuple := structs.NamespacedID{
|
|
ID: job.ID,
|
|
Namespace: job.Namespace,
|
|
}
|
|
must.MapContainsKey(t, s1.periodicDispatcher.tracked, tuple, must.Sprint("periodic job not restored"))
|
|
|
|
// Check that an eval was made.
|
|
ws := memdb.NewWatchSet()
|
|
last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
|
|
must.NoError(t, err)
|
|
must.NotNil(t, last)
|
|
|
|
must.NotEq(t, last.Launch, past, must.Sprint("restorePeriodicDispatcher did not force launch"))
|
|
|
|
must.True(t, md.forceEvalCalled, must.Sprint("failed to force job evaluation"))
|
|
}
|
|
|
|
func TestLeader_PeriodicDispatcher_No_Overlaps_Running_Job(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
})
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Inject a periodic job that triggered once in the past, should trigger now
|
|
// and once in the future.
|
|
now := time.Now()
|
|
past := now.Add(-1 * time.Second)
|
|
future := now.Add(10 * time.Second)
|
|
|
|
job := testPeriodicJob_OverlapEnabled(past, now, future)
|
|
req := structs.JobRegisterRequest{
|
|
Job: job,
|
|
WriteRequest: structs.WriteRequest{
|
|
Namespace: job.Namespace,
|
|
},
|
|
}
|
|
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
|
|
must.NoError(t, err)
|
|
|
|
// Create an eval for the past launch.
|
|
eval, err := s1.periodicDispatcher.createEval(job, past)
|
|
must.NoError(t, err)
|
|
|
|
md := &mockJobEvalDispatcher{
|
|
children: true,
|
|
evalToReturn: eval,
|
|
}
|
|
|
|
s1.periodicDispatcher.dispatcher = md
|
|
|
|
// Flush the periodic dispatcher, ensuring that no evals will be created.
|
|
s1.periodicDispatcher.SetEnabled(false)
|
|
|
|
// Sleep till after the job should have been launched.
|
|
time.Sleep(3 * time.Second)
|
|
|
|
// Restore the periodic dispatcher.
|
|
s1.periodicDispatcher.SetEnabled(true)
|
|
must.NoError(t, s1.restorePeriodicDispatcher())
|
|
|
|
// Ensure the job is tracked.
|
|
tuple := structs.NamespacedID{
|
|
ID: job.ID,
|
|
Namespace: job.Namespace,
|
|
}
|
|
must.MapContainsKey(t, s1.periodicDispatcher.tracked, tuple, must.Sprint("periodic job not restored"))
|
|
|
|
must.False(t, md.forceEvalCalled, must.Sprint("evaluation forced with job already running"))
|
|
}
|
|
|
|
func TestLeader_PeriodicDispatch(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
c.EvalGCInterval = 5 * time.Millisecond
|
|
})
|
|
defer cleanupS1()
|
|
|
|
// Wait for a periodic dispatch
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
stats := s1.evalBroker.Stats()
|
|
bySched, ok := stats.ByScheduler[structs.JobTypeCore]
|
|
if !ok {
|
|
return false, nil
|
|
}
|
|
return bySched.Ready > 0, nil
|
|
}, func(err error) {
|
|
t.Fatalf("should pending job")
|
|
})
|
|
}
|
|
|
|
func TestLeader_ReapFailedEval(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
c.EvalDeliveryLimit = 1
|
|
})
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Wait for a periodic dispatch
|
|
eval := mock.Eval()
|
|
s1.evalBroker.Enqueue(eval)
|
|
|
|
// Dequeue and Nack
|
|
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
s1.evalBroker.Nack(out.ID, token)
|
|
|
|
// Wait for an updated and followup evaluation
|
|
state := s1.fsm.State()
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.EvalByID(ws, eval.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if out == nil {
|
|
return false, fmt.Errorf("expect original evaluation to exist")
|
|
}
|
|
if out.Status != structs.EvalStatusFailed {
|
|
return false, fmt.Errorf("got status %v; want %v", out.Status, structs.EvalStatusFailed)
|
|
}
|
|
if out.NextEval == "" {
|
|
return false, fmt.Errorf("got empty NextEval")
|
|
}
|
|
// See if there is a followup
|
|
evals, err := state.EvalsByJob(ws, eval.Namespace, eval.JobID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if l := len(evals); l != 2 {
|
|
return false, fmt.Errorf("got %d evals, want 2", l)
|
|
}
|
|
|
|
for _, e := range evals {
|
|
if e.ID == eval.ID {
|
|
continue
|
|
}
|
|
|
|
if e.Status != structs.EvalStatusPending {
|
|
return false, fmt.Errorf("follow up eval has status %v; want %v",
|
|
e.Status, structs.EvalStatusPending)
|
|
}
|
|
|
|
if e.ID != out.NextEval {
|
|
return false, fmt.Errorf("follow up eval id is %v; orig eval NextEval %v",
|
|
e.ID, out.NextEval)
|
|
}
|
|
|
|
if e.Wait < s1.config.EvalFailedFollowupBaselineDelay ||
|
|
e.Wait > s1.config.EvalFailedFollowupBaselineDelay+s1.config.EvalFailedFollowupDelayRange {
|
|
return false, fmt.Errorf("bad wait: %v", e.Wait)
|
|
}
|
|
|
|
if e.TriggeredBy != structs.EvalTriggerFailedFollowUp {
|
|
return false, fmt.Errorf("follow up eval TriggeredBy %v; want %v",
|
|
e.TriggeredBy, structs.EvalTriggerFailedFollowUp)
|
|
}
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
func TestLeader_ReapDuplicateEval(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
})
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create a duplicate blocked eval
|
|
eval := mock.Eval()
|
|
eval.CreateIndex = 100
|
|
eval2 := mock.Eval()
|
|
eval2.JobID = eval.JobID
|
|
eval2.CreateIndex = 102
|
|
s1.blockedEvals.Block(eval)
|
|
s1.blockedEvals.Block(eval2)
|
|
|
|
// Wait for the evaluation to marked as cancelled
|
|
state := s1.fsm.State()
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.EvalByID(ws, eval.ID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return out != nil && out.Status == structs.EvalStatusCancelled, nil
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
}
|
|
|
|
func TestLeader_revokeVaultAccessorsOnRestore(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
})
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Insert a vault accessor that should be revoked
|
|
fsmState := s1.fsm.State()
|
|
va := mock.VaultAccessor()
|
|
if err := fsmState.UpsertVaultAccessor(100, []*structs.VaultAccessor{va}); err != nil {
|
|
t.Fatalf("bad: %v", err)
|
|
}
|
|
|
|
// Swap the Vault client
|
|
tvc := &TestVaultClient{}
|
|
s1.vault = tvc
|
|
|
|
// Do a restore
|
|
if err := s1.revokeVaultAccessorsOnRestore(); err != nil {
|
|
t.Fatalf("Failed to restore: %v", err)
|
|
}
|
|
|
|
if len(tvc.RevokedTokens) != 1 && tvc.RevokedTokens[0].Accessor != va.Accessor {
|
|
t.Fatalf("Bad revoked accessors: %v", tvc.RevokedTokens)
|
|
}
|
|
}
|
|
|
|
func TestLeader_revokeSITokenAccessorsOnRestore(t *testing.T) {
|
|
ci.Parallel(t)
|
|
r := require.New(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
})
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// replace consul ACLs API with a mock for tracking calls in tests
|
|
var consulACLsAPI mockConsulACLsAPI
|
|
s1.consulACLs = &consulACLsAPI
|
|
|
|
// Insert a SI token accessor that should be revoked
|
|
fsmState := s1.fsm.State()
|
|
accessor := mock.SITokenAccessor()
|
|
err := fsmState.UpsertSITokenAccessors(100, []*structs.SITokenAccessor{accessor})
|
|
r.NoError(err)
|
|
|
|
// Do a restore
|
|
err = s1.revokeSITokenAccessorsOnRestore()
|
|
r.NoError(err)
|
|
|
|
// Check the accessor was revoked
|
|
exp := []revokeRequest{{
|
|
accessorID: accessor.AccessorID,
|
|
committed: true,
|
|
}}
|
|
r.ElementsMatch(exp, consulACLsAPI.revokeRequests)
|
|
}
|
|
|
|
func TestLeader_ClusterID(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
c.Build = minClusterIDVersion.String()
|
|
})
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
clusterID, err := s1.ClusterID()
|
|
|
|
require.NoError(t, err)
|
|
require.True(t, helper.IsUUID(clusterID))
|
|
}
|
|
|
|
func TestLeader_ClusterID_upgradePath(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
before := version.Must(version.NewVersion("0.10.1")).String()
|
|
after := minClusterIDVersion.String()
|
|
|
|
type server struct {
|
|
s *Server
|
|
cleanup func()
|
|
}
|
|
|
|
outdated := func() server {
|
|
s, cleanup := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
c.Build = before
|
|
c.BootstrapExpect = 3
|
|
c.Logger.SetLevel(hclog.Trace)
|
|
})
|
|
return server{s: s, cleanup: cleanup}
|
|
}
|
|
|
|
upgraded := func() server {
|
|
s, cleanup := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
c.Build = after
|
|
c.BootstrapExpect = 0
|
|
c.Logger.SetLevel(hclog.Trace)
|
|
})
|
|
return server{s: s, cleanup: cleanup}
|
|
}
|
|
|
|
servers := []server{outdated(), outdated(), outdated()}
|
|
// fallback shutdown attempt in case testing fails
|
|
defer servers[0].cleanup()
|
|
defer servers[1].cleanup()
|
|
defer servers[2].cleanup()
|
|
|
|
upgrade := func(i int) {
|
|
previous := servers[i]
|
|
|
|
servers[i] = upgraded()
|
|
TestJoin(t, servers[i].s, servers[(i+1)%3].s, servers[(i+2)%3].s)
|
|
testutil.WaitForLeader(t, servers[i].s.RPC)
|
|
|
|
require.NoError(t, previous.s.Leave())
|
|
require.NoError(t, previous.s.Shutdown())
|
|
}
|
|
|
|
// Join the servers before doing anything
|
|
TestJoin(t, servers[0].s, servers[1].s, servers[2].s)
|
|
|
|
// Wait for servers to settle
|
|
for i := 0; i < len(servers); i++ {
|
|
testutil.WaitForLeader(t, servers[i].s.RPC)
|
|
}
|
|
|
|
// A check that ClusterID is not available yet
|
|
noIDYet := func() {
|
|
for _, s := range servers {
|
|
_, err := s.s.ClusterID()
|
|
must.Error(t, err)
|
|
}
|
|
}
|
|
|
|
// Replace first old server with new server
|
|
upgrade(0)
|
|
defer servers[0].cleanup()
|
|
noIDYet() // ClusterID should not work yet, servers: [new, old, old]
|
|
|
|
// Replace second old server with new server
|
|
upgrade(1)
|
|
defer servers[1].cleanup()
|
|
noIDYet() // ClusterID should not work yet, servers: [new, new, old]
|
|
|
|
// Replace third / final old server with new server
|
|
upgrade(2)
|
|
defer servers[2].cleanup()
|
|
|
|
// Wait for old servers to really be gone
|
|
for _, s := range servers {
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
peers, _ := s.s.numPeers()
|
|
return peers == 3, nil
|
|
}, func(_ error) {
|
|
t.Fatalf("should have 3 peers")
|
|
})
|
|
}
|
|
|
|
// Now we can tickle the leader into making a cluster ID
|
|
leaderID := ""
|
|
for _, s := range servers {
|
|
if s.s.IsLeader() {
|
|
id, err := s.s.ClusterID()
|
|
require.NoError(t, err)
|
|
leaderID = id
|
|
break
|
|
}
|
|
}
|
|
require.True(t, helper.IsUUID(leaderID))
|
|
|
|
// Now every participating server has been upgraded, each one should be
|
|
// able to get the cluster ID, having been plumbed all the way through.
|
|
agreeClusterID(t, []*Server{servers[0].s, servers[1].s, servers[2].s})
|
|
}
|
|
|
|
func TestLeader_ClusterID_noUpgrade(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
type server struct {
|
|
s *Server
|
|
cleanup func()
|
|
}
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.Logger.SetLevel(hclog.Trace)
|
|
c.NumSchedulers = 0
|
|
c.Build = minClusterIDVersion.String()
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS1()
|
|
s2, cleanupS2 := TestServer(t, func(c *Config) {
|
|
c.Logger.SetLevel(hclog.Trace)
|
|
c.NumSchedulers = 0
|
|
c.Build = minClusterIDVersion.String()
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS2()
|
|
s3, cleanupS3 := TestServer(t, func(c *Config) {
|
|
c.Logger.SetLevel(hclog.Trace)
|
|
c.NumSchedulers = 0
|
|
c.Build = minClusterIDVersion.String()
|
|
c.BootstrapExpect = 3
|
|
})
|
|
defer cleanupS3()
|
|
|
|
servers := []*Server{s1, s2, s3}
|
|
|
|
// Join the servers before doing anything
|
|
TestJoin(t, servers[0], servers[1], servers[2])
|
|
|
|
// Wait for servers to settle
|
|
for i := 0; i < len(servers); i++ {
|
|
testutil.WaitForLeader(t, servers[i].RPC)
|
|
}
|
|
|
|
// Each server started at the minimum version, check there should be only 1
|
|
// cluster ID they all agree on.
|
|
agreeClusterID(t, []*Server{servers[0], servers[1], servers[2]})
|
|
}
|
|
|
|
func agreeClusterID(t *testing.T, servers []*Server) {
|
|
must.Len(t, 3, servers)
|
|
|
|
f := func() error {
|
|
id1, err1 := servers[0].ClusterID()
|
|
if err1 != nil {
|
|
return err1
|
|
}
|
|
id2, err2 := servers[1].ClusterID()
|
|
if err2 != nil {
|
|
return err2
|
|
}
|
|
id3, err3 := servers[2].ClusterID()
|
|
if err3 != nil {
|
|
return err3
|
|
}
|
|
if id1 != id2 || id2 != id3 {
|
|
return fmt.Errorf("ids do not match, id1: %s, id2: %s, id3: %s", id1, id2, id3)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
must.Wait(t, wait.InitialSuccess(
|
|
wait.ErrorFunc(f),
|
|
wait.Timeout(60*time.Second),
|
|
wait.Gap(1*time.Second),
|
|
))
|
|
}
|
|
|
|
func TestLeader_ReplicateACLPolicies(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
|
|
c.Region = "region1"
|
|
c.AuthoritativeRegion = "region1"
|
|
c.ACLEnabled = true
|
|
})
|
|
defer cleanupS1()
|
|
s2, _, cleanupS2 := TestACLServer(t, func(c *Config) {
|
|
c.Region = "region2"
|
|
c.AuthoritativeRegion = "region1"
|
|
c.ACLEnabled = true
|
|
c.ReplicationBackoff = 20 * time.Millisecond
|
|
c.ReplicationToken = root.SecretID
|
|
})
|
|
defer cleanupS2()
|
|
TestJoin(t, s1, s2)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
testutil.WaitForLeader(t, s2.RPC)
|
|
|
|
// Write a policy to the authoritative region
|
|
p1 := mock.ACLPolicy()
|
|
if err := s1.State().UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{p1}); err != nil {
|
|
t.Fatalf("bad: %v", err)
|
|
}
|
|
|
|
// Wait for the policy to replicate
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
state := s2.State()
|
|
out, err := state.ACLPolicyByName(nil, p1.Name)
|
|
return out != nil, err
|
|
}, func(err error) {
|
|
t.Fatalf("should replicate policy")
|
|
})
|
|
}
|
|
|
|
func TestLeader_DiffACLPolicies(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := state.TestStateStore(t)
|
|
|
|
// Populate the local state
|
|
p1 := mock.ACLPolicy()
|
|
p2 := mock.ACLPolicy()
|
|
p3 := mock.ACLPolicy()
|
|
assert.Nil(t, state.UpsertACLPolicies(structs.MsgTypeTestSetup, 100, []*structs.ACLPolicy{p1, p2, p3}))
|
|
|
|
// Simulate a remote list
|
|
p2Stub := p2.Stub()
|
|
p2Stub.ModifyIndex = 50 // Ignored, same index
|
|
p3Stub := p3.Stub()
|
|
p3Stub.ModifyIndex = 100 // Updated, higher index
|
|
p3Stub.Hash = []byte{0, 1, 2, 3}
|
|
p4 := mock.ACLPolicy()
|
|
remoteList := []*structs.ACLPolicyListStub{
|
|
p2Stub,
|
|
p3Stub,
|
|
p4.Stub(),
|
|
}
|
|
delete, update := diffACLPolicies(state, 50, remoteList)
|
|
|
|
// P1 does not exist on the remote side, should delete
|
|
assert.Equal(t, []string{p1.Name}, delete)
|
|
|
|
// P2 is un-modified - ignore. P3 modified, P4 new.
|
|
assert.Equal(t, []string{p3.Name, p4.Name}, update)
|
|
}
|
|
|
|
func TestLeader_ReplicateACLTokens(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
|
|
c.Region = "region1"
|
|
c.AuthoritativeRegion = "region1"
|
|
c.ACLEnabled = true
|
|
})
|
|
defer cleanupS1()
|
|
s2, _, cleanupS2 := TestACLServer(t, func(c *Config) {
|
|
c.Region = "region2"
|
|
c.AuthoritativeRegion = "region1"
|
|
c.ACLEnabled = true
|
|
c.ReplicationBackoff = 20 * time.Millisecond
|
|
c.ReplicationToken = root.SecretID
|
|
})
|
|
defer cleanupS2()
|
|
TestJoin(t, s1, s2)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
testutil.WaitForLeader(t, s2.RPC)
|
|
|
|
// Write a token to the authoritative region
|
|
p1 := mock.ACLToken()
|
|
p1.Global = true
|
|
if err := s1.State().UpsertACLTokens(structs.MsgTypeTestSetup, 100, []*structs.ACLToken{p1}); err != nil {
|
|
t.Fatalf("bad: %v", err)
|
|
}
|
|
|
|
// Wait for the token to replicate
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
state := s2.State()
|
|
out, err := state.ACLTokenByAccessorID(nil, p1.AccessorID)
|
|
return out != nil, err
|
|
}, func(err error) {
|
|
t.Fatalf("should replicate token")
|
|
})
|
|
}
|
|
|
|
func TestLeader_DiffACLTokens(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := state.TestStateStore(t)
|
|
|
|
// Populate the local state
|
|
p0 := mock.ACLToken()
|
|
p1 := mock.ACLToken()
|
|
p1.Global = true
|
|
p2 := mock.ACLToken()
|
|
p2.Global = true
|
|
p3 := mock.ACLToken()
|
|
p3.Global = true
|
|
assert.Nil(t, state.UpsertACLTokens(structs.MsgTypeTestSetup, 100, []*structs.ACLToken{p0, p1, p2, p3}))
|
|
|
|
// Simulate a remote list
|
|
p2Stub := p2.Stub()
|
|
p2Stub.ModifyIndex = 50 // Ignored, same index
|
|
p3Stub := p3.Stub()
|
|
p3Stub.ModifyIndex = 100 // Updated, higher index
|
|
p3Stub.Hash = []byte{0, 1, 2, 3}
|
|
p4 := mock.ACLToken()
|
|
p4.Global = true
|
|
remoteList := []*structs.ACLTokenListStub{
|
|
p2Stub,
|
|
p3Stub,
|
|
p4.Stub(),
|
|
}
|
|
delete, update := diffACLTokens(state, 50, remoteList)
|
|
|
|
// P0 is local and should be ignored
|
|
// P1 does not exist on the remote side, should delete
|
|
assert.Equal(t, []string{p1.AccessorID}, delete)
|
|
|
|
// P2 is un-modified - ignore. P3 modified, P4 new.
|
|
assert.Equal(t, []string{p3.AccessorID, p4.AccessorID}, update)
|
|
}
|
|
|
|
func TestServer_replicationBackoffContinue(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
testCases := []struct {
|
|
name string
|
|
testFn func()
|
|
}{
|
|
{
|
|
name: "leadership lost",
|
|
testFn: func() {
|
|
|
|
// Create a test server with a long enough backoff that we will
|
|
// be able to close the channel before it fires, but not too
|
|
// long that the test having problems means CI will hang
|
|
// forever.
|
|
testServer, testServerCleanup := TestServer(t, func(c *Config) {
|
|
c.ReplicationBackoff = 5 * time.Second
|
|
})
|
|
defer testServerCleanup()
|
|
|
|
// Create our stop channel which is used by the server to
|
|
// indicate leadership loss.
|
|
stopCh := make(chan struct{})
|
|
|
|
// The resultCh is used to block and collect the output from
|
|
// the test routine.
|
|
resultCh := make(chan bool, 1)
|
|
|
|
// Run a routine to collect the result and close the channel
|
|
// straight away.
|
|
go func() {
|
|
output := testServer.replicationBackoffContinue(stopCh)
|
|
resultCh <- output
|
|
}()
|
|
|
|
close(stopCh)
|
|
|
|
actualResult := <-resultCh
|
|
require.False(t, actualResult)
|
|
},
|
|
},
|
|
{
|
|
name: "backoff continue",
|
|
testFn: func() {
|
|
|
|
// Create a test server with a short backoff.
|
|
testServer, testServerCleanup := TestServer(t, func(c *Config) {
|
|
c.ReplicationBackoff = 10 * time.Nanosecond
|
|
})
|
|
defer testServerCleanup()
|
|
|
|
// Create our stop channel which is used by the server to
|
|
// indicate leadership loss.
|
|
stopCh := make(chan struct{})
|
|
|
|
// The resultCh is used to block and collect the output from
|
|
// the test routine.
|
|
resultCh := make(chan bool, 1)
|
|
|
|
// Run a routine to collect the result without closing stopCh.
|
|
go func() {
|
|
output := testServer.replicationBackoffContinue(stopCh)
|
|
resultCh <- output
|
|
}()
|
|
|
|
actualResult := <-resultCh
|
|
require.True(t, actualResult)
|
|
},
|
|
},
|
|
}
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
tc.testFn()
|
|
})
|
|
}
|
|
}
|
|
|
|
func Test_diffACLRoles(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
stateStore := state.TestStateStore(t)
|
|
|
|
// Build an initial baseline of ACL Roles.
|
|
aclRole0 := mock.ACLRole()
|
|
aclRole1 := mock.ACLRole()
|
|
aclRole2 := mock.ACLRole()
|
|
aclRole3 := mock.ACLRole()
|
|
|
|
// Upsert these into our local state. Use copies, so we can alter the roles
|
|
// directly and use within the diff func.
|
|
err := stateStore.UpsertACLRoles(structs.MsgTypeTestSetup, 50,
|
|
[]*structs.ACLRole{aclRole0.Copy(), aclRole1.Copy(), aclRole2.Copy(), aclRole3.Copy()}, true)
|
|
require.NoError(t, err)
|
|
|
|
// Modify the ACL roles to create a number of differences. These roles
|
|
// represent the state of the authoritative region.
|
|
aclRole2.ModifyIndex = 50
|
|
aclRole3.ModifyIndex = 200
|
|
aclRole3.Hash = []byte{0, 1, 2, 3}
|
|
aclRole4 := mock.ACLRole()
|
|
|
|
// Run the diff function and test the output.
|
|
toDelete, toUpdate := diffACLRoles(stateStore, 50, []*structs.ACLRoleListStub{
|
|
aclRole2.Stub(), aclRole3.Stub(), aclRole4.Stub()})
|
|
require.ElementsMatch(t, []string{aclRole0.ID, aclRole1.ID}, toDelete)
|
|
require.ElementsMatch(t, []string{aclRole3.ID, aclRole4.ID}, toUpdate)
|
|
}
|
|
|
|
func Test_diffACLAuthMethods(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
stateStore := state.TestStateStore(t)
|
|
|
|
// Build an initial baseline of ACL auth-methods.
|
|
aclAuthMethod0 := mock.ACLOIDCAuthMethod()
|
|
aclAuthMethod1 := mock.ACLOIDCAuthMethod()
|
|
aclAuthMethod2 := mock.ACLOIDCAuthMethod()
|
|
aclAuthMethod3 := mock.ACLOIDCAuthMethod()
|
|
|
|
// Upsert these into our local state. Use copies, so we can alter the
|
|
// auth-methods directly and use within the diff func.
|
|
err := stateStore.UpsertACLAuthMethods(50,
|
|
[]*structs.ACLAuthMethod{aclAuthMethod0.Copy(), aclAuthMethod1.Copy(),
|
|
aclAuthMethod2.Copy(), aclAuthMethod3.Copy()})
|
|
must.NoError(t, err)
|
|
|
|
// Modify the ACL auth-methods to create a number of differences. These
|
|
// methods represent the state of the authoritative region.
|
|
aclAuthMethod2.ModifyIndex = 50
|
|
aclAuthMethod3.ModifyIndex = 200
|
|
aclAuthMethod3.Hash = []byte{0, 1, 2, 3}
|
|
aclAuthMethod4 := mock.ACLOIDCAuthMethod()
|
|
|
|
// Run the diff function and test the output.
|
|
toDelete, toUpdate := diffACLAuthMethods(stateStore, 50, []*structs.ACLAuthMethodStub{
|
|
aclAuthMethod2.Stub(), aclAuthMethod3.Stub(), aclAuthMethod4.Stub()})
|
|
require.ElementsMatch(t, []string{aclAuthMethod0.Name, aclAuthMethod1.Name}, toDelete)
|
|
require.ElementsMatch(t, []string{aclAuthMethod3.Name, aclAuthMethod4.Name}, toUpdate)
|
|
}
|
|
|
|
func Test_diffACLBindingRules(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
stateStore := state.TestStateStore(t)
|
|
|
|
// Build an initial baseline of ACL binding rules.
|
|
aclBindingRule0 := mock.ACLBindingRule()
|
|
aclBindingRule1 := mock.ACLBindingRule()
|
|
aclBindingRule2 := mock.ACLBindingRule()
|
|
aclBindingRule3 := mock.ACLBindingRule()
|
|
|
|
// Upsert these into our local state. Use copies, so we can alter the
|
|
// binding rules directly and use within the diff func.
|
|
err := stateStore.UpsertACLBindingRules(50,
|
|
[]*structs.ACLBindingRule{aclBindingRule0.Copy(), aclBindingRule1.Copy(),
|
|
aclBindingRule2.Copy(), aclBindingRule3.Copy()}, true)
|
|
must.NoError(t, err)
|
|
|
|
// Modify the ACL auth-methods to create a number of differences. These
|
|
// methods represent the state of the authoritative region.
|
|
aclBindingRule2.ModifyIndex = 50
|
|
aclBindingRule3.ModifyIndex = 200
|
|
aclBindingRule3.Hash = []byte{0, 1, 2, 3}
|
|
aclBindingRule4 := mock.ACLBindingRule()
|
|
|
|
// Run the diff function and test the output.
|
|
toDelete, toUpdate := diffACLBindingRules(stateStore, 50, []*structs.ACLBindingRuleListStub{
|
|
aclBindingRule2.Stub(), aclBindingRule3.Stub(), aclBindingRule4.Stub()})
|
|
must.SliceContainsAll(t, []string{aclBindingRule0.ID, aclBindingRule1.ID}, toDelete)
|
|
must.SliceContainsAll(t, []string{aclBindingRule3.ID, aclBindingRule4.ID}, toUpdate)
|
|
}
|
|
|
|
func TestLeader_Reelection(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
const raftProtocol = 3
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
c.RaftConfig.ProtocolVersion = raftProtocol
|
|
})
|
|
defer cleanupS1()
|
|
|
|
s2, cleanupS2 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
c.RaftConfig.ProtocolVersion = raftProtocol
|
|
})
|
|
defer cleanupS2()
|
|
|
|
s3, cleanupS3 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
c.RaftConfig.ProtocolVersion = raftProtocol
|
|
})
|
|
defer cleanupS3() // todo(shoenig) added this, should be here right??
|
|
|
|
servers := []*Server{s1, s2, s3}
|
|
|
|
// Try to join
|
|
TestJoin(t, s1, s2, s3)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
future := s1.raft.GetConfiguration()
|
|
if err := future.Error(); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
for _, server := range future.Configuration().Servers {
|
|
if server.Suffrage == raft.Nonvoter {
|
|
return false, fmt.Errorf("non-voter %v", server)
|
|
}
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatal(err)
|
|
})
|
|
|
|
var leader, nonLeader *Server
|
|
for _, s := range servers {
|
|
if s.IsLeader() {
|
|
leader = s
|
|
} else {
|
|
nonLeader = s
|
|
}
|
|
}
|
|
|
|
// make sure we still have a leader, then shut it down
|
|
must.NotNil(t, leader, must.Sprint("expected there to be a leader"))
|
|
leader.Shutdown()
|
|
|
|
// Wait for new leader to elect
|
|
testutil.WaitForLeader(t, nonLeader.RPC)
|
|
}
|
|
|
|
func TestLeader_RollRaftServer(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
c.RaftConfig.ProtocolVersion = 3
|
|
})
|
|
defer cleanupS1()
|
|
|
|
s2, cleanupS2 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
c.RaftConfig.ProtocolVersion = 3
|
|
})
|
|
defer cleanupS2()
|
|
|
|
s3, cleanupS3 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
c.RaftConfig.ProtocolVersion = 3
|
|
})
|
|
defer cleanupS3()
|
|
|
|
servers := []*Server{s1, s2, s3}
|
|
TestJoin(t, s1, s2, s3)
|
|
|
|
t.Logf("waiting for initial stable cluster")
|
|
waitForStableLeadership(t, servers)
|
|
|
|
t.Logf("killing server s1")
|
|
s1.Shutdown()
|
|
for _, s := range []*Server{s2, s3} {
|
|
s.RemoveFailedNode(s1.config.NodeID)
|
|
}
|
|
|
|
t.Logf("waiting for server loss to be detected")
|
|
testutil.WaitForResultUntil(time.Second*10,
|
|
func() (bool, error) {
|
|
for _, s := range []*Server{s2, s3} {
|
|
err := wantPeers(s, 2)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
return true, nil
|
|
|
|
},
|
|
func(err error) { must.NoError(t, err) },
|
|
)
|
|
|
|
t.Logf("adding replacement server s4")
|
|
s4, cleanupS4 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
c.RaftConfig.ProtocolVersion = 3
|
|
})
|
|
defer cleanupS4()
|
|
TestJoin(t, s2, s3, s4)
|
|
servers = []*Server{s4, s2, s3}
|
|
|
|
t.Logf("waiting for s4 to stabilize")
|
|
waitForStableLeadership(t, servers)
|
|
|
|
t.Logf("killing server s2")
|
|
s2.Shutdown()
|
|
for _, s := range []*Server{s3, s4} {
|
|
s.RemoveFailedNode(s2.config.NodeID)
|
|
}
|
|
|
|
t.Logf("waiting for server loss to be detected")
|
|
testutil.WaitForResultUntil(time.Second*10,
|
|
func() (bool, error) {
|
|
for _, s := range []*Server{s3, s4} {
|
|
err := wantPeers(s, 2)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
return true, nil
|
|
},
|
|
func(err error) { must.NoError(t, err) },
|
|
)
|
|
|
|
t.Logf("adding replacement server s5")
|
|
s5, cleanupS5 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
c.RaftConfig.ProtocolVersion = 3
|
|
})
|
|
defer cleanupS5()
|
|
TestJoin(t, s3, s4, s5)
|
|
servers = []*Server{s4, s5, s3}
|
|
|
|
t.Logf("waiting for s5 to stabilize")
|
|
waitForStableLeadership(t, servers)
|
|
|
|
t.Logf("killing server s3")
|
|
s3.Shutdown()
|
|
|
|
t.Logf("waiting for server loss to be detected")
|
|
testutil.WaitForResultUntil(time.Second*10,
|
|
func() (bool, error) {
|
|
for _, s := range []*Server{s4, s5} {
|
|
err := wantPeers(s, 2)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
return true, nil
|
|
},
|
|
func(err error) { must.NoError(t, err) },
|
|
)
|
|
|
|
t.Logf("adding replacement server s6")
|
|
s6, cleanupS6 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 3
|
|
c.RaftConfig.ProtocolVersion = 3
|
|
})
|
|
defer cleanupS6()
|
|
TestJoin(t, s6, s4)
|
|
servers = []*Server{s4, s5, s6}
|
|
|
|
t.Logf("waiting for s6 to stabilize")
|
|
waitForStableLeadership(t, servers)
|
|
}
|
|
|
|
func TestLeader_RevokeLeadership_MultipleTimes(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
return s1.evalBroker.Enabled(), nil
|
|
}, func(err error) {
|
|
t.Fatalf("should have finished establish leader loop")
|
|
})
|
|
|
|
require.Nil(t, s1.revokeLeadership())
|
|
require.Nil(t, s1.revokeLeadership())
|
|
require.Nil(t, s1.revokeLeadership())
|
|
}
|
|
|
|
func TestLeader_TransitionsUpdateConsistencyRead(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
return s1.isReadyForConsistentReads(), nil
|
|
}, func(err error) {
|
|
require.Fail(t, "should have finished establish leader loop")
|
|
})
|
|
|
|
require.Nil(t, s1.revokeLeadership())
|
|
require.False(t, s1.isReadyForConsistentReads())
|
|
|
|
ch := make(chan struct{})
|
|
require.Nil(t, s1.establishLeadership(ch))
|
|
require.True(t, s1.isReadyForConsistentReads())
|
|
}
|
|
|
|
// TestLeader_PausingWorkers asserts that scheduling workers are paused
|
|
// (and unpaused) upon leader elections (and step downs).
|
|
func TestLeader_PausingWorkers(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 12
|
|
})
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
require.Len(t, s1.workers, 12)
|
|
|
|
// this satisfies the require.Eventually test interface
|
|
checkPaused := func(count int) func() bool {
|
|
return func() bool {
|
|
pausedWorkers := func() int {
|
|
c := 0
|
|
for _, w := range s1.workers {
|
|
if w.IsPaused() {
|
|
c++
|
|
}
|
|
}
|
|
return c
|
|
}
|
|
|
|
return pausedWorkers() == count
|
|
}
|
|
}
|
|
|
|
// acquiring leadership should have paused 3/4 of the workers
|
|
require.Eventually(t, checkPaused(9), 1*time.Second, 10*time.Millisecond, "scheduler workers did not pause within a second at leadership change")
|
|
|
|
err := s1.revokeLeadership()
|
|
require.NoError(t, err)
|
|
|
|
// unpausing is a relatively quick activity
|
|
require.Eventually(t, checkPaused(0), 50*time.Millisecond, 10*time.Millisecond, "scheduler workers should have unpaused after losing leadership")
|
|
}
|
|
|
|
// Test doing an inplace upgrade on a server from raft protocol 2 to 3
|
|
// This verifies that removing the server and adding it back with a uuid works
|
|
// even if the server's address stays the same.
|
|
func TestServer_ReconcileMember(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
// Create a three node cluster
|
|
s1, cleanupS1 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 2
|
|
c.RaftConfig.ProtocolVersion = 3
|
|
})
|
|
defer cleanupS1()
|
|
|
|
s2, cleanupS2 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 2
|
|
c.RaftConfig.ProtocolVersion = 3
|
|
})
|
|
defer cleanupS2()
|
|
|
|
TestJoin(t, s1, s2)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// test relies on s3 not being the leader, so adding it
|
|
// after leadership has been established to reduce
|
|
s3, cleanupS3 := TestServer(t, func(c *Config) {
|
|
c.BootstrapExpect = 0
|
|
c.RaftConfig.ProtocolVersion = 3
|
|
})
|
|
defer cleanupS3()
|
|
|
|
TestJoin(t, s1, s3)
|
|
|
|
// Create a memberlist object for s3, with raft protocol upgraded to 3
|
|
upgradedS3Member := serf.Member{
|
|
Name: s3.config.NodeName,
|
|
Addr: s3.config.RPCAddr.IP,
|
|
Status: serf.StatusAlive,
|
|
Tags: make(map[string]string),
|
|
}
|
|
upgradedS3Member.Tags["role"] = "nomad"
|
|
upgradedS3Member.Tags["id"] = s3.config.NodeID
|
|
upgradedS3Member.Tags["region"] = s3.config.Region
|
|
upgradedS3Member.Tags["dc"] = s3.config.Datacenter
|
|
upgradedS3Member.Tags["rpc_addr"] = "127.0.0.1"
|
|
upgradedS3Member.Tags["port"] = strconv.Itoa(s3.config.RPCAddr.Port)
|
|
upgradedS3Member.Tags["build"] = "0.8.0"
|
|
upgradedS3Member.Tags["vsn"] = "2"
|
|
upgradedS3Member.Tags["mvn"] = "1"
|
|
upgradedS3Member.Tags["raft_vsn"] = "3"
|
|
|
|
findLeader := func(t *testing.T) *Server {
|
|
t.Helper()
|
|
for _, s := range []*Server{s1, s2, s3} {
|
|
if s.IsLeader() {
|
|
t.Logf("found leader: %v %v", s.config.NodeID, s.config.RPCAddr)
|
|
return s
|
|
}
|
|
}
|
|
|
|
t.Fatalf("no leader found")
|
|
return nil
|
|
}
|
|
|
|
// Find the leader so that we can call reconcile member on it
|
|
leader := findLeader(t)
|
|
if err := leader.reconcileMember(upgradedS3Member); err != nil {
|
|
t.Fatalf("failed to reconcile member: %v", err)
|
|
}
|
|
|
|
// This should remove s3 from the config and potentially cause a leader election
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Figure out the new leader and call reconcile again, this should add s3 with the new ID format
|
|
leader = findLeader(t)
|
|
if err := leader.reconcileMember(upgradedS3Member); err != nil {
|
|
t.Fatalf("failed to reconcile member: %v", err)
|
|
}
|
|
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
future := s2.raft.GetConfiguration()
|
|
if err := future.Error(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
addrs := 0
|
|
ids := 0
|
|
for _, server := range future.Configuration().Servers {
|
|
if string(server.ID) == string(server.Address) {
|
|
addrs++
|
|
} else {
|
|
ids++
|
|
}
|
|
}
|
|
// After this, all three servers should have IDs in raft
|
|
if got, want := addrs, 0; got != want {
|
|
t.Fatalf("got %d server addresses want %d", got, want)
|
|
}
|
|
if got, want := ids, 3; got != want {
|
|
t.Fatalf("got %d server ids want %d: %#v", got, want, future.Configuration().Servers)
|
|
}
|
|
}
|
|
|
|
func TestLeader_ReplicateNamespaces(t *testing.T) {
|
|
ci.Parallel(t)
|
|
assert := assert.New(t)
|
|
s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
|
|
c.Region = "region1"
|
|
c.AuthoritativeRegion = "region1"
|
|
c.ACLEnabled = true
|
|
})
|
|
defer cleanupS1()
|
|
s2, _, cleanupS2 := TestACLServer(t, func(c *Config) {
|
|
c.Region = "region2"
|
|
c.AuthoritativeRegion = "region1"
|
|
c.ACLEnabled = true
|
|
c.ReplicationBackoff = 20 * time.Millisecond
|
|
c.ReplicationToken = root.SecretID
|
|
})
|
|
defer cleanupS2()
|
|
TestJoin(t, s1, s2)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
testutil.WaitForLeader(t, s2.RPC)
|
|
|
|
// Write a namespace to the authoritative region
|
|
ns1 := mock.Namespace()
|
|
assert.Nil(s1.State().UpsertNamespaces(100, []*structs.Namespace{ns1}))
|
|
|
|
// Wait for the namespace to replicate
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
state := s2.State()
|
|
out, err := state.NamespaceByName(nil, ns1.Name)
|
|
return out != nil, err
|
|
}, func(err error) {
|
|
t.Fatalf("should replicate namespace")
|
|
})
|
|
|
|
// Delete the namespace at the authoritative region
|
|
assert.Nil(s1.State().DeleteNamespaces(200, []string{ns1.Name}))
|
|
|
|
// Wait for the namespace deletion to replicate
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
state := s2.State()
|
|
out, err := state.NamespaceByName(nil, ns1.Name)
|
|
return out == nil, err
|
|
}, func(err error) {
|
|
t.Fatalf("should replicate namespace deletion")
|
|
})
|
|
}
|
|
|
|
func TestLeader_DiffNamespaces(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := state.TestStateStore(t)
|
|
|
|
// Populate the local state
|
|
ns1 := mock.Namespace()
|
|
ns2 := mock.Namespace()
|
|
ns3 := mock.Namespace()
|
|
assert.Nil(t, state.UpsertNamespaces(100, []*structs.Namespace{ns1, ns2, ns3}))
|
|
|
|
// Simulate a remote list
|
|
rns2 := ns2.Copy()
|
|
rns2.ModifyIndex = 50 // Ignored, same index
|
|
rns3 := ns3.Copy()
|
|
rns3.ModifyIndex = 100 // Updated, higher index
|
|
rns3.Hash = []byte{0, 1, 2, 3}
|
|
ns4 := mock.Namespace()
|
|
remoteList := []*structs.Namespace{
|
|
rns2,
|
|
rns3,
|
|
ns4,
|
|
}
|
|
delete, update := diffNamespaces(state, 50, remoteList)
|
|
sort.Strings(delete)
|
|
|
|
// ns1 does not exist on the remote side, should delete
|
|
assert.Equal(t, []string{structs.DefaultNamespace, ns1.Name}, delete)
|
|
|
|
// ns2 is un-modified - ignore. ns3 modified, ns4 new.
|
|
assert.Equal(t, []string{ns3.Name, ns4.Name}, update)
|
|
}
|
|
|
|
func TestLeader_ReplicateNodePools(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
|
|
c.Region = "region1"
|
|
c.AuthoritativeRegion = "region1"
|
|
c.ACLEnabled = true
|
|
})
|
|
defer cleanupS1()
|
|
s2, _, cleanupS2 := TestACLServer(t, func(c *Config) {
|
|
c.Region = "region2"
|
|
c.AuthoritativeRegion = "region1"
|
|
c.ACLEnabled = true
|
|
c.ReplicationBackoff = 20 * time.Millisecond
|
|
c.ReplicationToken = root.SecretID
|
|
})
|
|
defer cleanupS2()
|
|
TestJoin(t, s1, s2)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
testutil.WaitForLeader(t, s2.RPC)
|
|
|
|
// Write a node pool to the authoritative region
|
|
np1 := mock.NodePool()
|
|
must.NoError(t, s1.State().UpsertNodePools(
|
|
structs.MsgTypeTestSetup, 100, []*structs.NodePool{np1}))
|
|
|
|
// Wait for the node pool to replicate
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
store := s2.State()
|
|
out, err := store.NodePoolByName(nil, np1.Name)
|
|
return out != nil, err
|
|
}, func(err error) {
|
|
t.Fatalf("should replicate node pool")
|
|
})
|
|
|
|
// Delete the node pool at the authoritative region
|
|
must.NoError(t, s1.State().DeleteNodePools(structs.MsgTypeTestSetup, 200, []string{np1.Name}))
|
|
|
|
// Wait for the namespace deletion to replicate
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
store := s2.State()
|
|
out, err := store.NodePoolByName(nil, np1.Name)
|
|
return out == nil, err
|
|
}, func(err error) {
|
|
t.Fatalf("should replicate node pool deletion")
|
|
})
|
|
}
|
|
|
|
func TestLeader_DiffNodePools(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := state.TestStateStore(t)
|
|
|
|
// Populate the local state
|
|
np1, np2, np3 := mock.NodePool(), mock.NodePool(), mock.NodePool()
|
|
must.NoError(t, state.UpsertNodePools(
|
|
structs.MsgTypeTestSetup, 100, []*structs.NodePool{np1, np2, np3}))
|
|
|
|
// Simulate a remote list
|
|
rnp2 := np2.Copy()
|
|
rnp2.ModifyIndex = 50 // Ignored, same index
|
|
rnp3 := np3.Copy()
|
|
rnp3.ModifyIndex = 100 // Updated, higher index
|
|
rnp3.Description = "force a hash update"
|
|
rnp3.SetHash()
|
|
rnp4 := mock.NodePool()
|
|
remoteList := []*structs.NodePool{
|
|
rnp2,
|
|
rnp3,
|
|
rnp4,
|
|
}
|
|
delete, update := diffNodePools(state, 50, remoteList)
|
|
sort.Strings(delete)
|
|
|
|
// np1 does not exist on the remote side, should delete
|
|
test.Eq(t, []string{structs.NodePoolAll, structs.NodePoolDefault, np1.Name}, delete)
|
|
|
|
// np2 is un-modified - ignore. np3 modified, np4 new.
|
|
test.Eq(t, []*structs.NodePool{rnp3, rnp4}, update)
|
|
}
|
|
|
|
// waitForStableLeadership waits until a leader is elected and all servers
|
|
// get promoted as voting members, returns the leader
|
|
func waitForStableLeadership(t *testing.T, servers []*Server) *Server {
|
|
nPeers := len(servers)
|
|
|
|
// wait for all servers to discover each other
|
|
for _, s := range servers {
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
peers, _ := s.numPeers()
|
|
if peers != nPeers {
|
|
return false, fmt.Errorf("should find %d peers but found %d", nPeers, peers)
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
require.NoError(t, err)
|
|
})
|
|
}
|
|
|
|
// wait for leader
|
|
var leader *Server
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
for _, s := range servers {
|
|
if s.IsLeader() {
|
|
leader = s
|
|
return true, nil
|
|
}
|
|
}
|
|
|
|
return false, fmt.Errorf("no leader found")
|
|
}, func(err error) {
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
// wait for all servers get marked as voters
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
future := leader.raft.GetConfiguration()
|
|
if err := future.Error(); err != nil {
|
|
return false, fmt.Errorf("failed to get raft config: %v", future.Error())
|
|
}
|
|
ss := future.Configuration().Servers
|
|
if len(ss) != len(servers) {
|
|
return false, fmt.Errorf("raft doesn't contain all servers. Expected %d but found %d", len(servers), len(ss))
|
|
}
|
|
|
|
for _, s := range ss {
|
|
if s.Suffrage != raft.Voter {
|
|
return false, fmt.Errorf("configuration has non voting server: %v", s)
|
|
}
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
return leader
|
|
}
|
|
|
|
func TestServer_getLatestIndex(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
testServer, testServerCleanup := TestServer(t, nil)
|
|
defer testServerCleanup()
|
|
|
|
// Test a new state store value.
|
|
idx, success := testServer.getLatestIndex()
|
|
require.True(t, success)
|
|
must.Eq(t, 1, idx)
|
|
|
|
// Upsert something with a high index, and check again.
|
|
err := testServer.State().UpsertACLPolicies(
|
|
structs.MsgTypeTestSetup, 1013, []*structs.ACLPolicy{mock.ACLPolicy()})
|
|
require.NoError(t, err)
|
|
|
|
idx, success = testServer.getLatestIndex()
|
|
require.True(t, success)
|
|
must.Eq(t, 1013, idx)
|
|
}
|
|
|
|
func TestServer_handleEvalBrokerStateChange(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
testCases := []struct {
|
|
startValue bool
|
|
testServerCallBackConfig func(c *Config)
|
|
inputSchedulerConfiguration *structs.SchedulerConfiguration
|
|
expectedOutput bool
|
|
name string
|
|
}{
|
|
{
|
|
startValue: false,
|
|
testServerCallBackConfig: func(c *Config) { c.DefaultSchedulerConfig.PauseEvalBroker = false },
|
|
inputSchedulerConfiguration: nil,
|
|
expectedOutput: true,
|
|
name: "bootstrap un-paused",
|
|
},
|
|
{
|
|
startValue: false,
|
|
testServerCallBackConfig: func(c *Config) { c.DefaultSchedulerConfig.PauseEvalBroker = true },
|
|
inputSchedulerConfiguration: nil,
|
|
expectedOutput: false,
|
|
name: "bootstrap paused",
|
|
},
|
|
{
|
|
startValue: true,
|
|
testServerCallBackConfig: nil,
|
|
inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: true},
|
|
expectedOutput: false,
|
|
name: "state change to paused",
|
|
},
|
|
{
|
|
startValue: false,
|
|
testServerCallBackConfig: nil,
|
|
inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: true},
|
|
expectedOutput: false,
|
|
name: "no state change to paused",
|
|
},
|
|
{
|
|
startValue: false,
|
|
testServerCallBackConfig: nil,
|
|
inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: false},
|
|
expectedOutput: true,
|
|
name: "state change to un-paused",
|
|
},
|
|
{
|
|
startValue: false,
|
|
testServerCallBackConfig: nil,
|
|
inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: true},
|
|
expectedOutput: false,
|
|
name: "no state change to un-paused",
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
|
|
// Create a new server and wait for leadership to be established.
|
|
testServer, cleanupFn := TestServer(t, nil)
|
|
_ = waitForStableLeadership(t, []*Server{testServer})
|
|
defer cleanupFn()
|
|
|
|
// If we set a callback config, we are just testing the eventual
|
|
// state of the brokers. Otherwise, we set our starting value and
|
|
// then perform our state modification change and check.
|
|
if tc.testServerCallBackConfig == nil {
|
|
testServer.evalBroker.SetEnabled(tc.startValue)
|
|
testServer.blockedEvals.SetEnabled(tc.startValue)
|
|
actualOutput := testServer.handleEvalBrokerStateChange(tc.inputSchedulerConfiguration)
|
|
require.Equal(t, tc.expectedOutput, actualOutput)
|
|
}
|
|
|
|
// Check the brokers are in the expected state.
|
|
var expectedEnabledVal bool
|
|
|
|
if tc.inputSchedulerConfiguration == nil {
|
|
expectedEnabledVal = !testServer.config.DefaultSchedulerConfig.PauseEvalBroker
|
|
} else {
|
|
expectedEnabledVal = !tc.inputSchedulerConfiguration.PauseEvalBroker
|
|
}
|
|
require.Equal(t, expectedEnabledVal, testServer.evalBroker.Enabled())
|
|
require.Equal(t, expectedEnabledVal, testServer.blockedEvals.Enabled())
|
|
})
|
|
}
|
|
}
|