Update setupRaft

This commit is contained in:
Alex Dadgar 2017-02-02 15:31:36 -08:00
parent 78cfcd2724
commit ac10aed731
1 changed files with 108 additions and 30 deletions

View File

@ -4,9 +4,11 @@ import (
"crypto/tls"
"errors"
"fmt"
"io/ioutil"
"log"
"net"
"net/rpc"
"os"
"path/filepath"
"reflect"
"sort"
@ -80,7 +82,6 @@ type Server struct {
leaderCh <-chan bool
raft *raft.Raft
raftLayer *RaftLayer
raftPeers raft.PeerStore
raftStore *raftboltdb.BoltStore
raftInmem *raft.InmemStore
raftTransport *raft.NetworkTransport
@ -499,7 +500,7 @@ func (s *Server) setupBootstrapHandler() error {
// quorum has been reached, we do not need to poll
// Consul. Let the normal timeout-based strategy
// take over.
if len(raftPeers) >= int(bootstrapExpect) {
if raftPeers >= int(bootstrapExpect) {
peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
return nil
}
@ -664,10 +665,14 @@ func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error {
// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
// If we are in bootstrap mode, enable a single node cluster
if s.config.Bootstrap || (s.config.DevMode && !s.config.DevDisableBootstrap) {
s.config.RaftConfig.EnableSingleNode = true
}
// If we have an unclean exit then attempt to close the Raft store.
defer func() {
if s.raft == nil && s.raftStore != nil {
if err := s.raftStore.Close(); err != nil {
s.logger.Printf("[ERR] consul: failed to close Raft store: %v", err)
}
}
}()
// Create the FSM
var err error
@ -681,19 +686,24 @@ func (s *Server) setupRaft() error {
s.config.LogOutput)
s.raftTransport = trans
// Create the backend raft store for logs and stable storage
// Make sure we set the LogOutput.
s.config.RaftConfig.LogOutput = s.config.LogOutput
// Our version of Raft protocol requires the LocalID to match the network
// address of the transport.
s.config.RaftConfig.LocalID = raft.ServerID(trans.LocalAddr())
// Build an all in-memory setup for dev mode, otherwise prepare a full
// disk-based setup.
var log raft.LogStore
var stable raft.StableStore
var snap raft.SnapshotStore
var peers raft.PeerStore
if s.config.DevMode {
store := raft.NewInmemStore()
s.raftInmem = store
stable = store
log = store
snap = raft.NewDiscardSnapshotStore()
peers = &raft.StaticPeers{}
s.raftPeers = peers
} else {
// Create the base raft path
@ -728,41 +738,82 @@ func (s *Server) setupRaft() error {
}
snap = snapshots
// Setup the peer store
s.raftPeers = raft.NewJSONPeers(path, trans)
peers = s.raftPeers
// For an existing cluster being upgraded to the new version of
// Raft, we almost never want to run recovery based on the old
// peers.json file. We create a peers.info file with a helpful
// note about where peers.json went, and use that as a sentinel
// to avoid ingesting the old one that first time (if we have to
// create the peers.info file because it's not there, we also
// blow away any existing peers.json file).
peersFile := filepath.Join(path, "peers.json")
peersInfoFile := filepath.Join(path, "peers.info")
if _, err := os.Stat(peersInfoFile); os.IsNotExist(err) {
if err := ioutil.WriteFile(peersInfoFile, []byte(peersInfoContent), 0755); err != nil {
return fmt.Errorf("failed to write peers.info file: %v", err)
}
// Blow away the peers.json file if present, since the
// peers.info sentinel wasn't there.
if _, err := os.Stat(peersFile); err == nil {
if err := os.Remove(peersFile); err != nil {
return fmt.Errorf("failed to delete peers.json, please delete manually (see peers.info for details): %v", err)
}
s.logger.Printf("[INFO] consul: deleted peers.json file (see peers.info for details)")
}
} else if _, err := os.Stat(peersFile); err == nil {
s.logger.Printf("[INFO] consul: found peers.json file, recovering Raft configuration...")
configuration, err := raft.ReadPeersJSON(peersFile)
if err != nil {
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
}
tmpFsm, err := NewFSM(s.tombstoneGC, s.config.LogOutput)
if err != nil {
return fmt.Errorf("recovery failed to make temp FSM: %v", err)
}
if err := raft.RecoverCluster(s.config.RaftConfig, tmpFsm,
log, stable, snap, trans, configuration); err != nil {
return fmt.Errorf("recovery failed: %v", err)
}
if err := os.Remove(peersFile); err != nil {
return fmt.Errorf("recovery failed to delete peers.json, please delete manually (see peers.info for details): %v", err)
}
s.logger.Printf("[INFO] consul: deleted peers.json file after successful recovery")
}
}
// Ensure local host is always included if we are in bootstrap mode
if s.config.RaftConfig.EnableSingleNode {
p, err := peers.Peers()
// If we are in bootstrap or dev mode and the state is clean then we can
// bootstrap now.
if s.config.Bootstrap || s.config.DevMode {
hasState, err := raft.HasExistingState(log, stable, snap)
if err != nil {
if s.raftStore != nil {
s.raftStore.Close()
}
return err
}
if !raft.PeerContained(p, trans.LocalAddr()) {
peers.SetPeers(raft.AddUniquePeer(p, trans.LocalAddr()))
if !hasState {
// TODO (alexdadgar) - This will need to be updated when
// we add support for node IDs.
configuration := raft.Configuration{
Servers: []raft.Server{
raft.Server{
ID: raft.ServerID(trans.LocalAddr()),
Address: trans.LocalAddr(),
},
},
}
if err := raft.BootstrapCluster(s.config.RaftConfig,
log, stable, snap, trans, configuration); err != nil {
return err
}
}
}
// Make sure we set the LogOutput
s.config.RaftConfig.LogOutput = s.config.LogOutput
// Setup the leader channel
leaderCh := make(chan bool, 1)
s.config.RaftConfig.NotifyCh = leaderCh
s.leaderCh = leaderCh
// Setup the Raft store
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable,
snap, peers, trans)
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
if err != nil {
if s.raftStore != nil {
s.raftStore.Close()
}
trans.Close()
return err
}
return nil
@ -978,3 +1029,30 @@ func (s *Server) Datacenter() string {
func (s *Server) GetConfig() *Config {
return s.config
}
// TODO(alex) we need a outage guide
// peersInfoContent is used to help operators understand what happened to the
// peers.json file. This is written to a file called peers.info in the same
// location.
const peersInfoContent = `
As of Nomad 0.5.6, the peers.json file is only used for recovery
after an outage. It should be formatted as a JSON array containing the address
and port of each Consul server in the cluster, like this:
["10.1.0.1:8500","10.1.0.2:8500","10.1.0.3:8500"]
Under normal operation, the peers.json file will not be present.
When Consul starts for the first time, it will create this peers.info file and
delete any existing peers.json file so that recovery doesn't occur on the first
startup.
Once this peers.info file is present, any peers.json file will be ingested at
startup, and will set the Raft peer configuration manually to recover from an
outage. It's crucial that all servers in the cluster are shut down before
creating the peers.json file, and that all servers receive the same
configuration. Once the peers.json file is successfully ingested and applied, it
will be deleted.
Please see https://www.consul.io/docs/guides/outage.html for more information.
`