nomad: using Raft StartAsLeader to make tests faster
commit 7d69aa78c1
parent 24e032b61f
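In short: the test helpers now set the vendored raft library's StartAsLeader flag so a single-node test server assumes leadership immediately instead of waiting out an election, and leadership changes are delivered through raft's NotifyCh (see the setupRaft hunks below) instead of the raft instance's LeaderCh(). Sketch (not part of the diff) of the raft-side knobs involved, assuming a raft release that still exposes the test-only StartAsLeader field; fastTestRaftConfig is an illustrative name:

package main

import (
    "fmt"
    "time"

    "github.com/hashicorp/raft"
)

// fastTestRaftConfig mirrors the settings the test helpers below apply.
// Sketch only: StartAsLeader is the test-oriented field this commit relies
// on; the timeout values are the ones used in the hunks below.
func fastTestRaftConfig() *raft.Config {
    c := raft.DefaultConfig()
    c.HeartbeatTimeout = 40 * time.Millisecond   // far below the library default
    c.ElectionTimeout = 40 * time.Millisecond    // any election settles in tens of ms
    c.LeaderLeaseTimeout = 20 * time.Millisecond // must stay <= HeartbeatTimeout
    c.StartAsLeader = true                       // single-node test server skips the election entirely
    return c
}

func main() {
    fmt.Printf("%+v\n", fastTestRaftConfig())
}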
@@ -44,6 +44,8 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) {
     config.RaftConfig.LeaderLeaseTimeout = 20 * time.Millisecond
     config.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond
     config.RaftConfig.ElectionTimeout = 40 * time.Millisecond
+    config.RaftConfig.StartAsLeader = true
+    config.RaftTimeout = 500 * time.Millisecond
 
     // Invoke the callback if any
     if cb != nil {

@@ -54,6 +54,7 @@ func makeAgent(t *testing.T, cb func(*Config)) (string, *Agent) {
     config.RaftConfig.LeaderLeaseTimeout = 20 * time.Millisecond
     config.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond
     config.RaftConfig.ElectionTimeout = 40 * time.Millisecond
+    config.RaftConfig.StartAsLeader = true
     config.RaftTimeout = 500 * time.Millisecond
 
     if cb != nil {

@@ -14,11 +14,10 @@ import (
 // as the leader in the Raft cluster. There is some work the leader is
 // expected to do, so we must react to changes
 func (s *Server) monitorLeadership() {
-    leaderCh := s.raft.LeaderCh()
     var stopCh chan struct{}
     for {
         select {
-        case isLeader := <-leaderCh:
+        case isLeader := <-s.leaderCh:
             if isLeader {
                 stopCh = make(chan struct{})
                 go s.leaderLoop(stopCh)

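Sketch (not part of the diff): a self-contained model of the loop above, with the branches the hunk truncates (losing leadership, shutdown) filled in the way the surrounding code suggests; the shutdownCh parameter, the printouts, and this leaderLoop are stand-ins, not Nomad code:

package main

import (
    "fmt"
    "time"
)

// monitorLeadership models the pattern after this change: leadership flips
// arrive on a plain bool channel (Nomad's s.leaderCh, fed by raft's NotifyCh)
// and a per-term stop channel controls the leader-only goroutine.
func monitorLeadership(leaderCh <-chan bool, shutdownCh <-chan struct{}) {
    var stopCh chan struct{}
    for {
        select {
        case isLeader := <-leaderCh:
            if isLeader {
                stopCh = make(chan struct{})
                go leaderLoop(stopCh)
                fmt.Println("gained leadership")
            } else if stopCh != nil {
                close(stopCh)
                stopCh = nil
                fmt.Println("lost leadership")
            }
        case <-shutdownCh:
            return
        }
    }
}

func leaderLoop(stopCh <-chan struct{}) {
    // Leader-only work would go here; block until told to stop.
    <-stopCh
}

func main() {
    leaderCh := make(chan bool, 1)
    shutdownCh := make(chan struct{})
    go monitorLeadership(leaderCh, shutdownCh)

    leaderCh <- true // pretend raft notified us we became leader
    time.Sleep(10 * time.Millisecond)
    leaderCh <- false // and later that we lost it
    time.Sleep(10 * time.Millisecond)
    close(shutdownCh)
}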
@@ -66,6 +66,7 @@ type Server struct {
 
     // The raft instance is used among Nomad nodes within the
     // region to protect operations that require strong consistency
+    leaderCh  <-chan bool
     raft      *raft.Raft
     raftLayer *RaftLayer
     raftPeers raft.PeerStore

@@ -197,7 +198,6 @@ func NewServer(config *Config) (*Server, error) {
         s.Shutdown()
         return nil, fmt.Errorf("Failed to start serf: %v", err)
     }
-    go s.serfEventHandler()
 
     // Initialize the scheduling workers
     if err := s.setupWorkers(); err != nil {

@@ -205,6 +205,12 @@ func NewServer(config *Config) (*Server, error) {
         return nil, fmt.Errorf("Failed to start workers: %v", err)
     }
 
+    // Monitor leadership changes
+    go s.monitorLeadership()
+
+    // Start ingesting events for Serf
+    go s.serfEventHandler()
+
     // Start the RPC listeners
     go s.listen()
 
@@ -471,6 +477,11 @@ func (s *Server) setupRaft() error {
     // Make sure we set the LogOutput
     s.config.RaftConfig.LogOutput = s.config.LogOutput
 
+    // Setup the leader channel
+    leaderCh := make(chan bool, 1)
+    s.config.RaftConfig.NotifyCh = leaderCh
+    s.leaderCh = leaderCh
+
     // Setup the Raft store
     s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable,
         snap, peers, trans)

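Sketch (not part of the diff): the wiring above hands one bidirectional, 1-slot-buffered channel to two directional fields, since raft's NotifyCh only needs to be written by the library and the new Server.leaderCh only needs to be read by monitorLeadership. The types below (notifyConfig, server) are placeholders for raft.Config and Nomad's Server:

package main

import "fmt"

type notifyConfig struct {
    NotifyCh chan<- bool // the library writes leadership changes here
}

type server struct {
    leaderCh <-chan bool // the server only ever reads
}

func main() {
    // One bidirectional channel, buffered with a single slot as in the diff,
    // satisfies both directional fields.
    leaderCh := make(chan bool, 1)
    cfg := notifyConfig{NotifyCh: leaderCh}
    srv := server{leaderCh: leaderCh}

    cfg.NotifyCh <- true // what raft would do on gaining leadership
    fmt.Println("leader:", <-srv.leaderCh)
}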
@@ -481,9 +492,6 @@ func (s *Server) setupRaft() error {
         trans.Close()
         return err
     }
-
-    // Start monitoring leadership
-    go s.monitorLeadership()
     return nil
 }
 
@@ -499,7 +507,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
     conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
     conf.Tags["build"] = s.config.Build
     conf.Tags["port"] = fmt.Sprintf("%d", s.rpcAdvertise.(*net.TCPAddr).Port)
-    if s.config.Bootstrap {
+    if s.config.Bootstrap || (s.config.DevMode && !s.config.DevDisableBootstrap) {
         conf.Tags["bootstrap"] = "1"
     }
     if s.config.BootstrapExpect != 0 {

@@ -54,6 +54,9 @@ func testServer(t *testing.T, cb func(*Config)) *Server {
         cb(config)
     }
 
+    // Enable raft as leader if we have bootstrap on
+    config.RaftConfig.StartAsLeader = !config.DevDisableBootstrap
+
     // Create server
     server, err := NewServer(config)
     if err != nil {
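Note: unlike the client/agent helpers earlier in the diff, this helper derives StartAsLeader from DevDisableBootstrap, so tests that want a genuine multi-server election can still opt out. Hypothetical usage inside this package's tests, built only from identifiers that appear in the diff (the test name and field values are illustrative):

func TestServer_RealElection(t *testing.T) {
    // Opting out of dev bootstrap keeps StartAsLeader false (per the hunk
    // above) and lets a 3-server cluster hold a real election, still bounded
    // by the shortened Raft timeouts.
    s1 := testServer(t, func(c *Config) {
        c.DevDisableBootstrap = true
        c.BootstrapExpect = 3
    })
    defer s1.Shutdown()
}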