open-nomad/nomad/leader_test.go

1462 lines
36 KiB
Go
Raw Normal View History

package nomad
2015-06-05 21:54:45 +00:00
import (
"errors"
"fmt"
"strconv"
2015-06-05 21:54:45 +00:00
"testing"
2015-08-05 23:32:46 +00:00
"time"
2015-06-05 21:54:45 +00:00
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/go-hclog"
2017-02-08 05:22:48 +00:00
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
2015-06-05 21:54:45 +00:00
"github.com/hashicorp/nomad/testutil"
2018-04-10 00:51:55 +00:00
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/assert"
2018-02-20 23:05:16 +00:00
"github.com/stretchr/testify/require"
2015-06-05 21:54:45 +00:00
)
func TestLeader_LeftServer(t *testing.T) {
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
2015-06-05 21:54:45 +00:00
s2, cleanupS2 := TestServer(t, func(c *Config) {
2015-06-05 21:54:45 +00:00
c.DevDisableBootstrap = true
})
defer cleanupS2()
2015-06-05 21:54:45 +00:00
s3, cleanupS3 := TestServer(t, func(c *Config) {
2015-06-05 21:54:45 +00:00
c.DevDisableBootstrap = true
})
defer cleanupS3()
2015-06-05 21:54:45 +00:00
servers := []*Server{s1, s2, s3}
2018-01-12 01:00:30 +00:00
TestJoin(t, s1, s2, s3)
2015-06-05 21:54:45 +00:00
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
2017-02-03 01:50:06 +00:00
return peers == 3, nil
2015-06-05 21:54:45 +00:00
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
// Kill any server
2017-10-19 21:16:27 +00:00
var peer *Server
for _, s := range servers {
if !s.IsLeader() {
peer = s
break
}
}
if peer == nil {
t.Fatalf("Should have a non-leader")
}
peer.Shutdown()
name := fmt.Sprintf("%s.%s", peer.config.NodeName, peer.config.Region)
2015-06-05 21:54:45 +00:00
testutil.WaitForResult(func() (bool, error) {
2017-10-19 21:16:27 +00:00
for _, s := range servers {
if s == peer {
continue
}
// Force remove the non-leader (transition to left state)
if err := s.RemoveFailedNode(name); err != nil {
return false, err
}
2015-06-05 21:54:45 +00:00
peers, _ := s.numPeers()
2017-02-03 01:50:06 +00:00
return peers == 2, errors.New(fmt.Sprintf("%v", peers))
2015-06-05 21:54:45 +00:00
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestLeader_LeftLeader(t *testing.T) {
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
2015-06-05 21:54:45 +00:00
s2, cleanupS2 := TestServer(t, func(c *Config) {
2015-06-05 21:54:45 +00:00
c.DevDisableBootstrap = true
})
defer cleanupS2()
2015-06-05 21:54:45 +00:00
s3, cleanupS3 := TestServer(t, func(c *Config) {
2015-06-05 21:54:45 +00:00
c.DevDisableBootstrap = true
})
defer cleanupS3()
2015-06-05 21:54:45 +00:00
servers := []*Server{s1, s2, s3}
2018-01-12 01:00:30 +00:00
TestJoin(t, s1, s2, s3)
2015-06-05 21:54:45 +00:00
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
2017-02-03 01:50:06 +00:00
return peers == 3, nil
2015-06-05 21:54:45 +00:00
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
// Kill the leader!
var leader *Server
for _, s := range servers {
if s.IsLeader() {
leader = s
break
}
}
if leader == nil {
t.Fatalf("Should have a leader")
}
leader.Leave()
leader.Shutdown()
for _, s := range servers {
if s == leader {
continue
}
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
2017-02-03 01:50:06 +00:00
return peers == 2, errors.New(fmt.Sprintf("%v", peers))
2015-06-05 21:54:45 +00:00
}, func(err error) {
t.Fatalf("should have 2 peers: %v", err)
})
}
}
func TestLeader_MultiBootstrap(t *testing.T) {
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
2015-06-05 21:54:45 +00:00
s2, cleanupS2 := TestServer(t, nil)
defer cleanupS2()
2015-06-05 21:54:45 +00:00
servers := []*Server{s1, s2}
2018-01-12 01:00:30 +00:00
TestJoin(t, s1, s2)
2015-06-05 21:54:45 +00:00
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers := s.Members()
return len(peers) == 2, nil
}, func(err error) {
t.Fatalf("should have 2 peers")
})
}
// Ensure we don't have multiple raft peers
for _, s := range servers {
peers, _ := s.numPeers()
2017-02-03 01:50:06 +00:00
if peers != 1 {
2015-06-05 21:54:45 +00:00
t.Fatalf("should only have 1 raft peer!")
}
}
}
2015-08-05 23:32:46 +00:00
func TestLeader_PlanQueue_Reset(t *testing.T) {
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
2015-08-05 23:32:46 +00:00
s2, cleanupS2 := TestServer(t, func(c *Config) {
2015-08-05 23:32:46 +00:00
c.DevDisableBootstrap = true
})
defer cleanupS2()
2015-08-05 23:32:46 +00:00
s3, cleanupS3 := TestServer(t, func(c *Config) {
2015-08-05 23:32:46 +00:00
c.DevDisableBootstrap = true
})
defer cleanupS3()
2015-08-05 23:32:46 +00:00
servers := []*Server{s1, s2, s3}
2018-01-12 01:00:30 +00:00
TestJoin(t, s1, s2, s3)
2015-08-05 23:32:46 +00:00
leader := waitForStableLeadership(t, servers)
2015-08-05 23:32:46 +00:00
if !leader.planQueue.Enabled() {
t.Fatalf("should enable plan queue")
}
for _, s := range servers {
if !s.IsLeader() && s.planQueue.Enabled() {
t.Fatalf("plan queue should not be enabled")
}
}
2015-08-05 23:32:46 +00:00
// Kill the leader
leader.Shutdown()
time.Sleep(100 * time.Millisecond)
// Wait for a new leader
leader = nil
testutil.WaitForResult(func() (bool, error) {
for _, s := range servers {
if s.IsLeader() {
leader = s
return true, nil
}
}
return false, nil
}, func(err error) {
t.Fatalf("should have leader")
})
// Check that the new leader has a pending GC expiration
testutil.WaitForResult(func() (bool, error) {
return leader.planQueue.Enabled(), nil
}, func(err error) {
t.Fatalf("should enable plan queue")
})
}
func TestLeader_EvalBroker_Reset(t *testing.T) {
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
s2, cleanupS2 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
c.DevDisableBootstrap = true
})
defer cleanupS2()
s3, cleanupS3 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
c.DevDisableBootstrap = true
})
defer cleanupS3()
servers := []*Server{s1, s2, s3}
2018-01-12 01:00:30 +00:00
TestJoin(t, s1, s2, s3)
leader := waitForStableLeadership(t, servers)
// Inject a pending eval
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{mock.Eval()},
}
_, _, err := leader.raftApply(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
// Kill the leader
leader.Shutdown()
time.Sleep(100 * time.Millisecond)
// Wait for a new leader
leader = nil
testutil.WaitForResult(func() (bool, error) {
for _, s := range servers {
if s.IsLeader() {
leader = s
return true, nil
}
}
return false, nil
}, func(err error) {
t.Fatalf("should have leader")
})
// Check that the new leader has a pending evaluation
testutil.WaitForResult(func() (bool, error) {
stats := leader.evalBroker.Stats()
return stats.TotalReady == 1, nil
}, func(err error) {
t.Fatalf("should have pending evaluation")
})
}
2015-08-15 22:15:00 +00:00
func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) {
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
s2, cleanupS2 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
c.DevDisableBootstrap = true
})
defer cleanupS2()
s3, cleanupS3 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
c.DevDisableBootstrap = true
})
defer cleanupS3()
servers := []*Server{s1, s2, s3}
2018-01-12 01:00:30 +00:00
TestJoin(t, s1, s2, s3)
leader := waitForStableLeadership(t, servers)
// Inject a periodic job, a parameterized periodic job and a non-periodic job
periodic := mock.PeriodicJob()
nonPeriodic := mock.Job()
parameterizedPeriodic := mock.PeriodicJob()
parameterizedPeriodic.ParameterizedJob = &structs.ParameterizedJobConfig{}
for _, job := range []*structs.Job{nonPeriodic, periodic, parameterizedPeriodic} {
req := structs.JobRegisterRequest{
Job: job,
2017-09-07 23:56:15 +00:00
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
_, _, err := leader.raftApply(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
}
// Kill the leader
leader.Shutdown()
time.Sleep(100 * time.Millisecond)
// Wait for a new leader
leader = nil
testutil.WaitForResult(func() (bool, error) {
for _, s := range servers {
if s.IsLeader() {
leader = s
return true, nil
}
}
return false, nil
}, func(err error) {
t.Fatalf("should have leader")
})
2017-09-07 23:56:15 +00:00
tuplePeriodic := structs.NamespacedID{
ID: periodic.ID,
Namespace: periodic.Namespace,
}
tupleNonPeriodic := structs.NamespacedID{
ID: nonPeriodic.ID,
Namespace: nonPeriodic.Namespace,
}
tupleParameterized := structs.NamespacedID{
ID: parameterizedPeriodic.ID,
Namespace: parameterizedPeriodic.Namespace,
}
// Check that the new leader is tracking the periodic job only
testutil.WaitForResult(func() (bool, error) {
test: fix race and nil panic in nomad/ tests Race was test only and due to unlocked map access. Panic was test only and due to checking a field on a struct even when we knew the struct was nil. Race output that was fixed: ``` ================== WARNING: DATA RACE Read at 0x00c000697dd0 by goroutine 768: runtime.mapaccess2() /usr/local/go/src/runtime/map.go:439 +0x0 github.com/hashicorp/nomad/nomad.TestLeader_PeriodicDispatcher_Restore_Adds.func8() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader_test.go:402 +0xe6 github.com/hashicorp/nomad/testutil.WaitForResultRetries() /home/schmichael/go/src/github.com/hashicorp/nomad/testutil/wait.go:30 +0x5a github.com/hashicorp/nomad/testutil.WaitForResult() /home/schmichael/go/src/github.com/hashicorp/nomad/testutil/wait.go:22 +0x57 github.com/hashicorp/nomad/nomad.TestLeader_PeriodicDispatcher_Restore_Adds() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader_test.go:401 +0xb53 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Previous write at 0x00c000697dd0 by goroutine 569: runtime.mapassign() /usr/local/go/src/runtime/map.go:549 +0x0 github.com/hashicorp/nomad/nomad.(*PeriodicDispatch).Add() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/periodic.go:224 +0x2eb github.com/hashicorp/nomad/nomad.(*Server).restorePeriodicDispatcher() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:394 +0x29a github.com/hashicorp/nomad/nomad.(*Server).establishLeadership() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:234 +0x593 github.com/hashicorp/nomad/nomad.(*Server).leaderLoop() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:117 +0x82e github.com/hashicorp/nomad/nomad.(*Server).monitorLeadership.func1() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:72 +0x6c Goroutine 768 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x650 testing.runTests.func1() 
/usr/local/go/src/testing/testing.go:1119 +0xa8 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 testing.runTests() /usr/local/go/src/testing/testing.go:1117 +0x4ee testing.(*M).Run() /usr/local/go/src/testing/testing.go:1034 +0x2ee main.main() _testmain.go:1150 +0x221 Goroutine 569 (running) created at: github.com/hashicorp/nomad/nomad.(*Server).monitorLeadership() /home/schmichael/go/src/github.com/hashicorp/nomad/nomad/leader.go:70 +0x269 ================== ```
2018-12-19 21:05:12 +00:00
leader.periodicDispatcher.l.Lock()
defer leader.periodicDispatcher.l.Unlock()
2017-09-07 23:56:15 +00:00
if _, tracked := leader.periodicDispatcher.tracked[tuplePeriodic]; !tracked {
return false, fmt.Errorf("periodic job not tracked")
}
2017-09-07 23:56:15 +00:00
if _, tracked := leader.periodicDispatcher.tracked[tupleNonPeriodic]; tracked {
return false, fmt.Errorf("non periodic job tracked")
}
2017-09-07 23:56:15 +00:00
if _, tracked := leader.periodicDispatcher.tracked[tupleParameterized]; tracked {
return false, fmt.Errorf("parameterized periodic job tracked")
}
return true, nil
}, func(err error) {
t.Fatalf(err.Error())
})
}
func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
2015-12-05 00:53:36 +00:00
testutil.WaitForLeader(t, s1.RPC)
// Inject a periodic job that will be triggered soon.
launch := time.Now().Add(1 * time.Second)
job := testPeriodicJob(launch)
req := structs.JobRegisterRequest{
Job: job,
2017-09-07 23:56:15 +00:00
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
// Flush the periodic dispatcher, ensuring that no evals will be created.
2015-12-05 00:53:36 +00:00
s1.periodicDispatcher.SetEnabled(false)
// Get the current time to ensure the launch time is after this once we
// restore.
now := time.Now()
// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)
// Restore the periodic dispatcher.
2015-12-05 00:53:36 +00:00
s1.periodicDispatcher.SetEnabled(true)
s1.restorePeriodicDispatcher()
// Ensure the job is tracked.
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if _, tracked := s1.periodicDispatcher.tracked[tuple]; !tracked {
t.Fatalf("periodic job not restored")
}
// Check that an eval was made.
2017-02-08 05:22:48 +00:00
ws := memdb.NewWatchSet()
2017-09-07 23:56:15 +00:00
last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
if err != nil || last == nil {
t.Fatalf("failed to get periodic launch time: %v", err)
}
if last.Launch.Before(now) {
t.Fatalf("restorePeriodicDispatcher did not force launch: last %v; want after %v", last.Launch, now)
}
}
func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
2015-12-05 00:53:36 +00:00
testutil.WaitForLeader(t, s1.RPC)
// Inject a periodic job that triggered once in the past, should trigger now
// and once in the future.
now := time.Now()
past := now.Add(-1 * time.Second)
future := now.Add(10 * time.Second)
job := testPeriodicJob(past, now, future)
req := structs.JobRegisterRequest{
Job: job,
2017-09-07 23:56:15 +00:00
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create an eval for the past launch.
s1.periodicDispatcher.createEval(job, past)
// Flush the periodic dispatcher, ensuring that no evals will be created.
2015-12-05 00:53:36 +00:00
s1.periodicDispatcher.SetEnabled(false)
// Sleep till after the job should have been launched.
2015-12-05 00:53:36 +00:00
time.Sleep(3 * time.Second)
// Restore the periodic dispatcher.
2015-12-05 00:53:36 +00:00
s1.periodicDispatcher.SetEnabled(true)
s1.restorePeriodicDispatcher()
// Ensure the job is tracked.
2017-09-07 23:56:15 +00:00
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if _, tracked := s1.periodicDispatcher.tracked[tuple]; !tracked {
t.Fatalf("periodic job not restored")
}
// Check that an eval was made.
2017-02-08 05:22:48 +00:00
ws := memdb.NewWatchSet()
2017-09-07 23:56:15 +00:00
last, err := s1.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
if err != nil || last == nil {
t.Fatalf("failed to get periodic launch time: %v", err)
}
if last.Launch == past {
t.Fatalf("restorePeriodicDispatcher did not force launch")
}
}
2015-08-15 22:15:00 +00:00
func TestLeader_PeriodicDispatch(t *testing.T) {
s1, cleanupS1 := TestServer(t, func(c *Config) {
2015-08-15 22:15:00 +00:00
c.NumSchedulers = 0
c.EvalGCInterval = 5 * time.Millisecond
})
defer cleanupS1()
2015-08-15 22:15:00 +00:00
// Wait for a periodic dispatch
testutil.WaitForResult(func() (bool, error) {
stats := s1.evalBroker.Stats()
bySched, ok := stats.ByScheduler[structs.JobTypeCore]
if !ok {
return false, nil
}
return bySched.Ready > 0, nil
}, func(err error) {
t.Fatalf("should pending job")
})
}
2015-08-16 18:10:18 +00:00
func TestLeader_ReapFailedEval(t *testing.T) {
s1, cleanupS1 := TestServer(t, func(c *Config) {
2015-08-16 18:10:18 +00:00
c.NumSchedulers = 0
c.EvalDeliveryLimit = 1
})
defer cleanupS1()
2015-08-16 18:10:18 +00:00
testutil.WaitForLeader(t, s1.RPC)
// Wait for a periodic dispatch
eval := mock.Eval()
s1.evalBroker.Enqueue(eval)
2015-08-16 18:10:18 +00:00
// Dequeue and Nack
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
s1.evalBroker.Nack(out.ID, token)
// Wait for an updated and followup evaluation
2015-08-16 18:10:18 +00:00
state := s1.fsm.State()
testutil.WaitForResult(func() (bool, error) {
2017-02-08 05:22:48 +00:00
ws := memdb.NewWatchSet()
out, err := state.EvalByID(ws, eval.ID)
2015-08-16 18:10:18 +00:00
if err != nil {
return false, err
}
if out == nil {
return false, fmt.Errorf("expect original evaluation to exist")
}
if out.Status != structs.EvalStatusFailed {
return false, fmt.Errorf("got status %v; want %v", out.Status, structs.EvalStatusFailed)
}
if out.NextEval == "" {
return false, fmt.Errorf("got empty NextEval")
}
// See if there is a followup
2017-09-07 23:56:15 +00:00
evals, err := state.EvalsByJob(ws, eval.Namespace, eval.JobID)
if err != nil {
return false, err
}
if l := len(evals); l != 2 {
return false, fmt.Errorf("got %d evals, want 2", l)
}
for _, e := range evals {
if e.ID == eval.ID {
continue
}
if e.Status != structs.EvalStatusPending {
return false, fmt.Errorf("follow up eval has status %v; want %v",
e.Status, structs.EvalStatusPending)
}
if e.ID != out.NextEval {
return false, fmt.Errorf("follow up eval id is %v; orig eval NextEval %v",
e.ID, out.NextEval)
}
2017-04-14 22:24:55 +00:00
if e.Wait < s1.config.EvalFailedFollowupBaselineDelay ||
e.Wait > s1.config.EvalFailedFollowupBaselineDelay+s1.config.EvalFailedFollowupDelayRange {
return false, fmt.Errorf("bad wait: %v", e.Wait)
}
if e.TriggeredBy != structs.EvalTriggerFailedFollowUp {
return false, fmt.Errorf("follow up eval TriggeredBy %v; want %v",
e.TriggeredBy, structs.EvalTriggerFailedFollowUp)
}
}
return true, nil
2015-08-16 18:10:18 +00:00
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestLeader_ReapDuplicateEval(t *testing.T) {
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// Create a duplicate blocked eval
eval := mock.Eval()
2018-11-07 19:59:24 +00:00
eval.CreateIndex = 100
eval2 := mock.Eval()
eval2.JobID = eval.JobID
2018-11-07 19:59:24 +00:00
eval2.CreateIndex = 102
s1.blockedEvals.Block(eval)
s1.blockedEvals.Block(eval2)
// Wait for the evaluation to marked as cancelled
state := s1.fsm.State()
testutil.WaitForResult(func() (bool, error) {
2017-02-08 05:22:48 +00:00
ws := memdb.NewWatchSet()
2018-11-07 19:59:24 +00:00
out, err := state.EvalByID(ws, eval.ID)
if err != nil {
return false, err
}
return out != nil && out.Status == structs.EvalStatusCancelled, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
// TestLeader_revokeVaultAccessorsOnRestore inserts a Vault accessor into the
// state store, swaps in a TestVaultClient, and checks that
// revokeVaultAccessorsOnRestore revokes exactly that accessor.
func TestLeader_revokeVaultAccessorsOnRestore(t *testing.T) {
	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	// Insert a vault accessor that should be revoked
	fsmState := s1.fsm.State()
	va := mock.VaultAccessor()
	if err := fsmState.UpsertVaultAccessor(100, []*structs.VaultAccessor{va}); err != nil {
		t.Fatalf("bad: %v", err)
	}

	// Swap the Vault client
	tvc := &TestVaultClient{}
	s1.vault = tvc

	// Do a restore
	if err := s1.revokeVaultAccessorsOnRestore(); err != nil {
		t.Fatalf("Failed to restore: %v", err)
	}

	// The test fails if the count is wrong OR the accessor is wrong. The
	// length is checked first so the index below can never be out of range.
	if len(tvc.RevokedTokens) != 1 || tvc.RevokedTokens[0].Accessor != va.Accessor {
		t.Fatalf("Bad revoked accessors: %v", tvc.RevokedTokens)
	}
}
// TestLeader_revokeSITokenAccessorsOnRestore stores one SI token accessor,
// runs revokeSITokenAccessorsOnRestore against a mock Consul ACLs API, and
// checks that the mock recorded a revoke request for that accessor ID.
func TestLeader_revokeSITokenAccessorsOnRestore(t *testing.T) {
	t.Parallel()
	must := require.New(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	// replace consul ACLs api with a mock for tracking calls
	mockACLs := new(mockConsulACLsAPI)
	s1.consulACLs = mockACLs

	// Insert a SI token accessor that should be revoked
	accessor := mock.SITokenAccessor()
	must.NoError(s1.fsm.State().UpsertSITokenAccessors(100, []*structs.SITokenAccessor{accessor}))

	// Do a restore
	must.NoError(s1.revokeSITokenAccessorsOnRestore())

	// Check the accessor was revoked
	must.ElementsMatch([]string{accessor.AccessorID}, mockACLs.revokeRequests)
}
func TestLeader_ClusterID(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
c.Build = minClusterIDVersion.String()
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
clusterID, err := s1.ClusterID()
require.NoError(t, err)
require.True(t, helper.IsUUID(clusterID))
}
// TestLeader_ClusterID_upgradePath starts three servers on build 0.10.1 and
// replaces them one at a time with servers on minClusterIDVersion. ClusterID
// is expected to return an error while any old server remains; once all three
// are replaced, the leader must produce a UUID that every server agrees on.
func TestLeader_ClusterID_upgradePath(t *testing.T) {
	t.Parallel()

	before := version.Must(version.NewVersion("0.10.1")).String()
	after := minClusterIDVersion.String()

	type server struct {
		s       *Server
		cleanup func()
	}

	// start launches a test server reporting the given build. The flag is
	// written straight to Config.DevDisableBootstrap.
	start := func(build string, disableBootstrap bool) server {
		srv, stop := TestServer(t, func(c *Config) {
			c.NumSchedulers = 0
			c.Build = build
			c.DevDisableBootstrap = disableBootstrap
			c.BootstrapExpect = 3
			c.Logger.SetLevel(hclog.Trace)
		})
		return server{s: srv, cleanup: stop}
	}
	outdated := func(disableBootstrap bool) server { return start(before, disableBootstrap) }
	upgraded := func(disableBootstrap bool) server { return start(after, disableBootstrap) }

	servers := []server{outdated(false), outdated(true), outdated(true)}
	// fallback shutdown attempt in case testing fails
	defer servers[0].cleanup()
	defer servers[1].cleanup()
	defer servers[2].cleanup()

	// upgrade swaps slot i for a new-build server, joins it to the other two
	// slots, and then makes the server it replaced leave and shut down.
	upgrade := func(i int) {
		previous := servers[i]
		servers[i] = upgraded(true)
		TestJoin(t, servers[i].s, servers[(i+1)%3].s, servers[(i+2)%3].s)
		testutil.WaitForLeader(t, servers[i].s.RPC)
		require.NoError(t, previous.s.Leave())
		require.NoError(t, previous.s.Shutdown())
	}

	// Join the servers before doing anything
	TestJoin(t, servers[0].s, servers[1].s, servers[2].s)

	// Wait for servers to settle
	for _, member := range servers {
		testutil.WaitForLeader(t, member.s.RPC)
	}

	// A check that ClusterID is not available yet
	noIDYet := func() {
		for _, member := range servers {
			retry.Run(t, func(r *retry.R) {
				_, err := member.s.ClusterID()
				if err == nil {
					r.Error("expected error")
				}
			})
		}
	}

	// Replace first old server with new server
	upgrade(0)
	defer servers[0].cleanup()
	noIDYet() // ClusterID should not work yet, servers: [new, old, old]

	// Replace second old server with new server
	upgrade(1)
	defer servers[1].cleanup()
	noIDYet() // ClusterID should not work yet, servers: [new, new, old]

	// Replace third / final old server with new server
	upgrade(2)
	defer servers[2].cleanup()

	// Wait for old servers to really be gone
	for _, member := range servers {
		testutil.WaitForResult(func() (bool, error) {
			n, _ := member.s.numPeers()
			return n == 3, nil
		}, func(error) {
			t.Fatalf("should have 3 peers")
		})
	}

	// Now we can tickle the leader into making a cluster ID
	leaderID := ""
	for _, member := range servers {
		if !member.s.IsLeader() {
			continue
		}
		id, err := member.s.ClusterID()
		require.NoError(t, err)
		leaderID = id
		break
	}
	require.True(t, helper.IsUUID(leaderID))

	// Now every participating server has been upgraded, each one should be
	// able to get the cluster ID, having been plumbed all the way through.
	agreeClusterID(t, []*Server{servers[0].s, servers[1].s, servers[2].s})
}
func TestLeader_ClusterID_noUpgrade(t *testing.T) {
t.Parallel()
type server struct {
s *Server
cleanup func()
}
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.Logger.SetLevel(hclog.Trace)
c.NumSchedulers = 0
c.Build = minClusterIDVersion.String()
c.BootstrapExpect = 3
})
defer cleanupS1()
s2, cleanupS2 := TestServer(t, func(c *Config) {
c.Logger.SetLevel(hclog.Trace)
c.NumSchedulers = 0
c.Build = minClusterIDVersion.String()
c.DevDisableBootstrap = true
c.BootstrapExpect = 3
})
defer cleanupS2()
s3, cleanupS3 := TestServer(t, func(c *Config) {
c.Logger.SetLevel(hclog.Trace)
c.NumSchedulers = 0
c.Build = minClusterIDVersion.String()
c.DevDisableBootstrap = true
c.BootstrapExpect = 3
})
defer cleanupS3()
servers := []*Server{s1, s2, s3}
// Join the servers before doing anything
TestJoin(t, servers[0], servers[1], servers[2])
// Wait for servers to settle
for i := 0; i < len(servers); i++ {
testutil.WaitForLeader(t, servers[i].RPC)
}
// Each server started at the minimum version, check there should be only 1
// cluster ID they all agree on.
agreeClusterID(t, []*Server{servers[0], servers[1], servers[2]})
}
// agreeClusterID asks every server in servers for its cluster ID, retrying
// each until it returns a UUID without error, and then requires that all the
// collected IDs are identical. It works for any number of servers; an empty
// slice passes trivially.
func agreeClusterID(t *testing.T, servers []*Server) {
	retries := &retry.Timer{Timeout: 60 * time.Second, Wait: 1 * time.Second}
	// Size the result slice from the input rather than assuming three servers.
	ids := make([]string, len(servers))
	for i, s := range servers {
		retry.RunWith(retries, t, func(r *retry.R) {
			id, err := s.ClusterID()
			if err != nil {
				r.Error(err.Error())
				return
			}
			if !helper.IsUUID(id) {
				r.Error("not a UUID")
				return
			}
			ids[i] = id
		})
	}

	// Comparing every ID against the first is enough to prove all are equal.
	for i := 1; i < len(ids); i++ {
		require.True(t, ids[0] == ids[i], "ids[0] %s, ids[%d] %s", ids[0], i, ids[i])
	}
}
func TestLeader_ReplicateACLPolicies(t *testing.T) {
t.Parallel()
s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
c.Region = "region1"
c.AuthoritativeRegion = "region1"
c.ACLEnabled = true
})
defer cleanupS1()
s2, _, cleanupS2 := TestACLServer(t, func(c *Config) {
c.Region = "region2"
c.AuthoritativeRegion = "region1"
c.ACLEnabled = true
c.ReplicationBackoff = 20 * time.Millisecond
c.ReplicationToken = root.SecretID
})
defer cleanupS2()
2018-01-12 01:00:30 +00:00
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
// Write a policy to the authoritative region
p1 := mock.ACLPolicy()
if err := s1.State().UpsertACLPolicies(100, []*structs.ACLPolicy{p1}); err != nil {
t.Fatalf("bad: %v", err)
}
// Wait for the policy to replicate
testutil.WaitForResult(func() (bool, error) {
state := s2.State()
out, err := state.ACLPolicyByName(nil, p1.Name)
return out != nil, err
}, func(err error) {
t.Fatalf("should replicate policy")
})
}
func TestLeader_DiffACLPolicies(t *testing.T) {
t.Parallel()
2017-10-13 21:36:02 +00:00
state := state.TestStateStore(t)
// Populate the local state
p1 := mock.ACLPolicy()
p2 := mock.ACLPolicy()
p3 := mock.ACLPolicy()
2017-10-13 21:36:02 +00:00
assert.Nil(t, state.UpsertACLPolicies(100, []*structs.ACLPolicy{p1, p2, p3}))
// Simulate a remote list
p2Stub := p2.Stub()
p2Stub.ModifyIndex = 50 // Ignored, same index
p3Stub := p3.Stub()
p3Stub.ModifyIndex = 100 // Updated, higher index
p3Stub.Hash = []byte{0, 1, 2, 3}
p4 := mock.ACLPolicy()
remoteList := []*structs.ACLPolicyListStub{
p2Stub,
p3Stub,
p4.Stub(),
}
delete, update := diffACLPolicies(state, 50, remoteList)
// P1 does not exist on the remote side, should delete
assert.Equal(t, []string{p1.Name}, delete)
// P2 is un-modified - ignore. P3 modified, P4 new.
assert.Equal(t, []string{p3.Name, p4.Name}, update)
}
2017-08-13 23:45:13 +00:00
func TestLeader_ReplicateACLTokens(t *testing.T) {
t.Parallel()
s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
2017-08-13 23:45:13 +00:00
c.Region = "region1"
c.AuthoritativeRegion = "region1"
c.ACLEnabled = true
})
defer cleanupS1()
s2, _, cleanupS2 := TestACLServer(t, func(c *Config) {
2017-08-13 23:45:13 +00:00
c.Region = "region2"
c.AuthoritativeRegion = "region1"
c.ACLEnabled = true
c.ReplicationBackoff = 20 * time.Millisecond
c.ReplicationToken = root.SecretID
2017-08-13 23:45:13 +00:00
})
defer cleanupS2()
2018-01-12 01:00:30 +00:00
TestJoin(t, s1, s2)
2017-08-13 23:45:13 +00:00
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
// Write a token to the authoritative region
p1 := mock.ACLToken()
p1.Global = true
if err := s1.State().UpsertACLTokens(100, []*structs.ACLToken{p1}); err != nil {
t.Fatalf("bad: %v", err)
}
// Wait for the token to replicate
testutil.WaitForResult(func() (bool, error) {
state := s2.State()
out, err := state.ACLTokenByAccessorID(nil, p1.AccessorID)
return out != nil, err
}, func(err error) {
t.Fatalf("should replicate token")
})
}
func TestLeader_DiffACLTokens(t *testing.T) {
t.Parallel()
2017-10-13 21:36:02 +00:00
state := state.TestStateStore(t)
2017-08-13 23:45:13 +00:00
// Populate the local state
p0 := mock.ACLToken()
p1 := mock.ACLToken()
p1.Global = true
p2 := mock.ACLToken()
p2.Global = true
p3 := mock.ACLToken()
p3.Global = true
2017-10-13 21:36:02 +00:00
assert.Nil(t, state.UpsertACLTokens(100, []*structs.ACLToken{p0, p1, p2, p3}))
2017-08-13 23:45:13 +00:00
// Simulate a remote list
p2Stub := p2.Stub()
p2Stub.ModifyIndex = 50 // Ignored, same index
p3Stub := p3.Stub()
p3Stub.ModifyIndex = 100 // Updated, higher index
p3Stub.Hash = []byte{0, 1, 2, 3}
2017-08-13 23:45:13 +00:00
p4 := mock.ACLToken()
p4.Global = true
remoteList := []*structs.ACLTokenListStub{
p2Stub,
p3Stub,
p4.Stub(),
}
delete, update := diffACLTokens(state, 50, remoteList)
// P0 is local and should be ignored
// P1 does not exist on the remote side, should delete
assert.Equal(t, []string{p1.AccessorID}, delete)
// P2 is un-modified - ignore. P3 modified, P4 new.
assert.Equal(t, []string{p3.AccessorID, p4.AccessorID}, update)
}
func TestLeader_UpgradeRaftVersion(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
2018-01-12 01:00:30 +00:00
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 2
})
defer cleanupS1()
s2, cleanupS2 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.RaftConfig.ProtocolVersion = 1
})
defer cleanupS2()
s3, cleanupS3 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.RaftConfig.ProtocolVersion = 2
})
defer cleanupS3()
servers := []*Server{s1, s2, s3}
// Try to join
2018-01-12 01:00:30 +00:00
TestJoin(t, s1, s2, s3)
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
// Kill the v1 server
if err := s2.Leave(); err != nil {
t.Fatal(err)
}
for _, s := range []*Server{s1, s3} {
minVer, err := s.autopilot.MinRaftProtocol()
if err != nil {
t.Fatal(err)
}
if got, want := minVer, 2; got != want {
t.Fatalf("got min raft version %d want %d", got, want)
}
}
// Replace the dead server with one running raft protocol v3
s4, cleanupS4 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
})
defer cleanupS4()
2018-01-12 01:00:30 +00:00
TestJoin(t, s1, s4)
servers[1] = s4
// Make sure we're back to 3 total peers with the new one added via ID
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
addrs := 0
ids := 0
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
return false, err
}
for _, server := range future.Configuration().Servers {
if string(server.ID) == string(server.Address) {
addrs++
} else {
ids++
}
}
if got, want := addrs, 2; got != want {
return false, fmt.Errorf("got %d server addresses want %d", got, want)
}
if got, want := ids, 1; got != want {
return false, fmt.Errorf("got %d server ids want %d", got, want)
}
return true, nil
}, func(err error) {
t.Fatal(err)
})
}
}
// TestLeader_Reelection runs leaderElectionTest as a subtest for each of the
// raft protocol versions 1, 2 and 3.
func TestLeader_Reelection(t *testing.T) {
	raftProtocols := []int{1, 2, 3}
	for _, p := range raftProtocols {
		// strconv.Itoa gives the decimal text ("1", "2", "3"); string(p)
		// would turn the int into the rune with that code point.
		t.Run("Leader Election - Protocol version "+strconv.Itoa(p), func(t *testing.T) {
			leaderElectionTest(t, raft.ProtocolVersion(p))
		})
	}
}
func leaderElectionTest(t *testing.T, raftProtocol raft.ProtocolVersion) {
s1, cleanupS1 := TestServer(t, func(c *Config) {
2018-04-10 00:51:55 +00:00
c.BootstrapExpect = 3
c.RaftConfig.ProtocolVersion = raftProtocol
})
defer cleanupS1()
s2, cleanupS2 := TestServer(t, func(c *Config) {
2018-04-10 00:51:55 +00:00
c.BootstrapExpect = 3
c.DevDisableBootstrap = true
c.RaftConfig.ProtocolVersion = raftProtocol
})
defer cleanupS2()
s3, cleanupS3 := TestServer(t, func(c *Config) {
2018-04-10 00:51:55 +00:00
c.BootstrapExpect = 3
c.DevDisableBootstrap = true
c.RaftConfig.ProtocolVersion = raftProtocol
})
defer cleanupS3() // todo(shoenig) added this, should be here right??
servers := []*Server{s1, s2, s3}
// Try to join
TestJoin(t, s1, s2, s3)
2018-04-10 00:51:55 +00:00
testutil.WaitForLeader(t, s1.RPC)
2018-04-10 00:51:55 +00:00
testutil.WaitForResult(func() (bool, error) {
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
return false, err
}
2018-04-10 00:51:55 +00:00
for _, server := range future.Configuration().Servers {
if server.Suffrage == raft.Nonvoter {
return false, fmt.Errorf("non-voter %v", server)
}
}
return true, nil
}, func(err error) {
t.Fatal(err)
})
2018-04-10 00:51:55 +00:00
var leader, nonLeader *Server
for _, s := range servers {
if s.IsLeader() {
leader = s
} else {
nonLeader = s
}
}
// Shutdown the leader
leader.Shutdown()
2018-04-10 13:16:50 +00:00
// Wait for new leader to elect
testutil.WaitForLeader(t, nonLeader.RPC)
}
func TestLeader_RollRaftServer(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.RaftConfig.ProtocolVersion = 2
})
defer cleanupS1()
s2, cleanupS2 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.RaftConfig.ProtocolVersion = 2
})
defer cleanupS2()
s3, cleanupS3 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.RaftConfig.ProtocolVersion = 2
})
defer cleanupS3()
servers := []*Server{s1, s2, s3}
// Try to join
2018-01-12 01:00:30 +00:00
TestJoin(t, s1, s2, s3)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Kill the first v2 server
s1.Shutdown()
for _, s := range []*Server{s1, s3} {
retry.Run(t, func(r *retry.R) {
minVer, err := s.autopilot.MinRaftProtocol()
if err != nil {
r.Fatal(err)
}
if got, want := minVer, 2; got != want {
r.Fatalf("got min raft version %d want %d", got, want)
}
})
}
// Replace the dead server with one running raft protocol v3
s4, cleanupS4 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.RaftConfig.ProtocolVersion = 3
})
defer cleanupS4()
TestJoin(t, s4, s2)
servers[0] = s4
// Kill the second v2 server
s2.Shutdown()
for _, s := range []*Server{s3, s4} {
retry.Run(t, func(r *retry.R) {
minVer, err := s.autopilot.MinRaftProtocol()
if err != nil {
r.Fatal(err)
}
if got, want := minVer, 2; got != want {
r.Fatalf("got min raft version %d want %d", got, want)
}
})
}
// Replace another dead server with one running raft protocol v3
s5, cleanupS5 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.RaftConfig.ProtocolVersion = 3
})
defer cleanupS5()
TestJoin(t, s5, s4)
servers[1] = s5
// Kill the last v2 server, now minRaftProtocol should be 3
s3.Shutdown()
for _, s := range []*Server{s4, s5} {
retry.Run(t, func(r *retry.R) {
minVer, err := s.autopilot.MinRaftProtocol()
if err != nil {
r.Fatal(err)
}
if got, want := minVer, 3; got != want {
r.Fatalf("got min raft version %d want %d", got, want)
}
})
}
// Replace the last dead server with one running raft protocol v3
s6, cleanupS6 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.RaftConfig.ProtocolVersion = 3
})
defer cleanupS6()
TestJoin(t, s6, s4)
servers[2] = s6
// Make sure all the dead servers are removed and we're back to 3 total peers
for _, s := range servers {
retry.Run(t, func(r *retry.R) {
addrs := 0
ids := 0
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
r.Fatal(err)
}
for _, server := range future.Configuration().Servers {
if string(server.ID) == string(server.Address) {
addrs++
} else {
ids++
}
}
if got, want := addrs, 0; got != want {
r.Fatalf("got %d server addresses want %d", got, want)
}
if got, want := ids, 3; got != want {
r.Fatalf("got %d server ids want %d", got, want)
}
})
}
}
2018-02-20 23:05:16 +00:00
func TestLeader_RevokeLeadership_MultipleTimes(t *testing.T) {
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
2018-02-20 23:05:16 +00:00
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForResult(func() (bool, error) {
return s1.evalBroker.Enabled(), nil
}, func(err error) {
t.Fatalf("should have finished establish leader loop")
})
require.Nil(t, s1.revokeLeadership())
require.Nil(t, s1.revokeLeadership())
require.Nil(t, s1.revokeLeadership())
}
// TestLeader_TransitionsUpdateConsistencyRead checks that the server's
// ready-for-consistent-reads flag follows leadership transitions: false after
// revokeLeadership, true again after establishLeadership.
func TestLeader_TransitionsUpdateConsistencyRead(t *testing.T) {
	srv, shutdown := TestServer(t, nil)
	defer shutdown()

	testutil.WaitForLeader(t, srv.RPC)

	// Poll until the server reports it is ready for consistent reads.
	readyForReads := func() (bool, error) {
		return srv.isReadyForConsistentReads(), nil
	}
	onTimeout := func(error) {
		require.Fail(t, "should have finished establish leader loop")
	}
	testutil.WaitForResult(readyForReads, onTimeout)

	// Stepping down must clear the flag.
	revokeErr := srv.revokeLeadership()
	require.Nil(t, revokeErr)
	require.False(t, srv.isReadyForConsistentReads())

	// Taking leadership again must set it.
	stopCh := make(chan struct{})
	establishErr := srv.establishLeadership(stopCh)
	require.Nil(t, establishErr)
	require.True(t, srv.isReadyForConsistentReads())
}
2018-05-30 18:05:15 +00:00
// Test doing an inplace upgrade on a server from raft protocol 2 to 3
// This verifies that removing the server and adding it back with a uuid works
// even if the server's address stays the same.
func TestServer_ReconcileMember(t *testing.T) {
	t.Parallel()

	// Create a three node cluster
	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.DevDisableBootstrap = true
		c.RaftConfig.ProtocolVersion = 3
	})
	defer cleanupS1()

	s2, cleanupS2 := TestServer(t, func(c *Config) {
		c.DevDisableBootstrap = true
		c.RaftConfig.ProtocolVersion = 3
	})
	defer cleanupS2()

	s3, cleanupS3 := TestServer(t, func(c *Config) {
		c.DevDisableBootstrap = true
		c.RaftConfig.ProtocolVersion = 2
	})
	defer cleanupS3()

	TestJoin(t, s1, s2, s3)
	testutil.WaitForLeader(t, s1.RPC)

	// Create a memberlist object for s3, with raft protocol upgraded to 3
	upgradedS3Member := serf.Member{
		Name:   s3.config.NodeName,
		Addr:   s3.config.RPCAddr.IP,
		Status: serf.StatusAlive,
		Tags: map[string]string{
			"role":     "nomad",
			"id":       s3.config.NodeID,
			"region":   s3.config.Region,
			"dc":       s3.config.Datacenter,
			"rpc_addr": "127.0.0.1",
			"port":     strconv.Itoa(s3.config.RPCAddr.Port),
			"build":    "0.8.0",
			"vsn":      "2",
			"mvn":      "1",
			"raft_vsn": "3",
		},
	}

	clusterServers := []*Server{s1, s2, s3}

	// Find the leader so that we can call reconcile member on it
	var leader *Server
	for _, s := range clusterServers {
		if s.IsLeader() {
			leader = s
		}
	}
	// Fail with a clear message instead of a nil-pointer panic if no server
	// reported itself as leader at the moment of the scan.
	require.NotNil(t, leader, "expected one of the servers to be the leader")
	leader.reconcileMember(upgradedS3Member)

	// This should remove s3 from the config and potentially cause a leader election
	testutil.WaitForLeader(t, s1.RPC)

	// Figure out the new leader and call reconcile again, this should add s3 with the new ID format
	for _, s := range clusterServers {
		if s.IsLeader() {
			leader = s
		}
	}
	leader.reconcileMember(upgradedS3Member)
	testutil.WaitForLeader(t, s1.RPC)

	future := s2.raft.GetConfiguration()
	if err := future.Error(); err != nil {
		t.Fatal(err)
	}

	// An entry whose ID equals its address is counted as address-identified;
	// anything else is counted as ID-identified.
	addrs := 0
	ids := 0
	for _, server := range future.Configuration().Servers {
		if string(server.ID) == string(server.Address) {
			addrs++
		} else {
			ids++
		}
	}

	// After this, all three servers should have IDs in raft
	if got, want := addrs, 0; got != want {
		t.Fatalf("got %d server addresses want %d", got, want)
	}
	if got, want := ids, 3; got != want {
		t.Fatalf("got %d server ids want %d", got, want)
	}
}
// waitForStableLeadership waits until a leader is elected and all servers
// get promoted as voting members, returns the leader.
//
// It proceeds in three polling phases: every server must report len(servers)
// peers, one of the servers must report itself as leader, and the leader's
// raft configuration must list exactly len(servers) entries, all of them
// voters. A phase that does not converge fails the test via require.NoError.
func waitForStableLeadership(t *testing.T, servers []*Server) *Server {
	nPeers := len(servers)

	// wait for all servers to discover each other
	for _, s := range servers {
		testutil.WaitForResult(func() (bool, error) {
			peers, _ := s.numPeers()
			// Compare against the actual cluster size rather than a fixed 3
			// so the helper also works for other cluster sizes.
			return peers == nPeers, fmt.Errorf("should find %d peers but found %d", nPeers, peers)
		}, func(err error) {
			require.NoError(t, err)
		})
	}

	// wait for leader
	var leader *Server
	testutil.WaitForResult(func() (bool, error) {
		for _, s := range servers {
			if s.IsLeader() {
				leader = s
				return true, nil
			}
		}
		return false, fmt.Errorf("no leader found")
	}, func(err error) {
		require.NoError(t, err)
	})

	// wait for all servers get marked as voters
	testutil.WaitForResult(func() (bool, error) {
		future := leader.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			return false, fmt.Errorf("failed to get raft config: %v", err)
		}

		ss := future.Configuration().Servers
		if len(ss) != len(servers) {
			return false, fmt.Errorf("raft doesn't contain all servers. Expected %d but found %d", len(servers), len(ss))
		}

		for _, s := range ss {
			if s.Suffrage != raft.Voter {
				return false, fmt.Errorf("configuration has non voting server: %v", s)
			}
		}
		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})

	return leader
}