open-consul/consul/server.go

767 lines
21 KiB
Go
Raw Normal View History

2013-12-06 23:43:07 +00:00
package consul
import (
"crypto/tls"
"errors"
2013-12-06 23:43:07 +00:00
"fmt"
"log"
2013-12-07 00:35:13 +00:00
"net"
"net/rpc"
2013-12-06 23:43:07 +00:00
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
2013-12-06 23:43:07 +00:00
"sync"
"time"
2014-08-08 22:32:43 +00:00
"github.com/hashicorp/consul/acl"
2016-03-30 00:39:19 +00:00
"github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/raft"
2015-01-30 05:55:11 +00:00
"github.com/hashicorp/raft-boltdb"
2015-04-15 23:12:45 +00:00
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
2013-12-06 23:43:07 +00:00
)
// These are the protocol versions that Consul can _understand_. These are
// Consul-level protocol versions, that are used to configure the Serf
// protocol versions.
const (
	// ProtocolVersionMin is the lowest Consul protocol version this build
	// understands. It is advertised to other members through the "vsn_min"
	// Serf tag.
	ProtocolVersionMin uint8 = 1

	// Version 3 added support for network coordinates but we kept the
	// default protocol version at 2 to ease the transition to this new
	// feature. A Consul agent speaking version 2 of the protocol will
	// attempt to send its coordinates to a server who understands version
	// 3 or greater.
	ProtocolVersion2Compatible = 2

	// ProtocolVersionMax is the highest Consul protocol version this build
	// understands. It is advertised to other members through the "vsn_max"
	// Serf tag.
	ProtocolVersionMax = 3
)
2013-12-06 23:43:07 +00:00
const (
	// serfLANSnapshot and serfWANSnapshot are the Serf snapshot file
	// locations, relative to the configured data directory.
	serfLANSnapshot = "serf/local.snapshot"
	serfWANSnapshot = "serf/remote.snapshot"

	// raftState is the directory, relative to the configured data
	// directory, that holds the Raft log store, snapshots and peer set.
	raftState = "raft/"

	// snapshotsRetained is the number of Raft snapshots kept on disk.
	snapshotsRetained = 2

	// serverRPCCache controls how long we keep an idle connection
	// open to a server
	serverRPCCache = 2 * time.Minute

	// serverMaxStreams controls how many idle streams we keep
	// open to a server
	serverMaxStreams = 64

	// raftLogCacheSize is the maximum number of logs to cache in-memory.
	// This is used to reduce disk I/O for the recently committed entries.
	raftLogCacheSize = 512

	// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
	// to replicate to gracefully leave the cluster.
	raftRemoveGracePeriod = 5 * time.Second
)
// Server is Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
2014-08-08 22:32:43 +00:00
// aclAuthCache is the authoritative ACL cache
aclAuthCache *acl.Cache
2015-06-18 23:19:05 +00:00
// aclCache is the non-authoritative ACL cache.
aclCache *aclCache
2014-08-09 00:38:39 +00:00
2014-08-08 22:32:43 +00:00
// Consul configuration
2013-12-06 23:43:07 +00:00
config *Config
2013-12-09 20:09:57 +00:00
// Connection pool to other consul servers
connPool *ConnPool
2014-01-09 23:30:36 +00:00
// Endpoints holds our RPC endpoints
endpoints endpoints
// eventChLAN is used to receive events from the
// serf cluster in the datacenter
eventChLAN chan serf.Event
2013-12-06 23:43:07 +00:00
// eventChWAN is used to receive events from the
// serf cluster that spans datacenters
eventChWAN chan serf.Event
2013-12-06 23:43:07 +00:00
// fsm is the state machine used with Raft to provide
// strong consistency.
fsm *consulFSM
// Have we attempted to leave the cluster
left bool
// localConsuls is used to track the known consuls
2015-02-19 22:45:47 +00:00
// in the local datacenter. Used to do leader forwarding.
2016-03-30 00:39:19 +00:00
localConsuls map[string]*agent.Server
localLock sync.RWMutex
2013-12-06 23:43:07 +00:00
// Logger uses the provided LogOutput
logger *log.Logger
// The raft instance is used among Consul nodes within the
// DC to protect operations that require strong consistency
raft *raft.Raft
raftLayer *RaftLayer
2013-12-09 23:29:01 +00:00
raftPeers raft.PeerStore
2015-01-30 05:55:11 +00:00
raftStore *raftboltdb.BoltStore
raftTransport *raft.NetworkTransport
2015-11-29 04:40:05 +00:00
raftInmem *raft.InmemStore
2013-12-06 23:43:07 +00:00
// reconcileCh is used to pass events from the serf handler
// into the leader manager, so that the strong state can be
// updated
reconcileCh chan serf.Member
2013-12-12 00:24:34 +00:00
// remoteConsuls is used to track the known consuls in
2015-02-19 22:45:47 +00:00
// remote datacenters. Used to do DC forwarding.
2016-03-30 00:39:19 +00:00
remoteConsuls map[string][]*agent.Server
2013-12-12 00:24:34 +00:00
remoteLock sync.RWMutex
2013-12-07 00:35:13 +00:00
// rpcListener is used to listen for incoming connections
rpcListener net.Listener
rpcServer *rpc.Server
// rpcTLS is the TLS config for incoming TLS requests
rpcTLS *tls.Config
// serfLAN is the Serf cluster maintained inside the DC
2013-12-06 23:43:07 +00:00
// which contains all the DC nodes
serfLAN *serf.Serf
2013-12-06 23:43:07 +00:00
// serfWAN is the Serf cluster maintained between DC's
2013-12-06 23:43:07 +00:00
// which SHOULD only consist of Consul servers
serfWAN *serf.Serf
2013-12-06 23:43:07 +00:00
// sessionTimers track the expiration time of each Session that has
// a TTL. On expiration, a SessionDestroy event will occur, and
2015-09-11 19:24:54 +00:00
// destroy the session via standard session destroy processing
sessionTimers map[string]*time.Timer
sessionTimersLock sync.Mutex
2014-12-11 07:49:44 +00:00
// tombstoneGC is used to track the pending GC invocations
// for the KV tombstones
tombstoneGC *state.TombstoneGC
2014-12-11 07:49:44 +00:00
2013-12-06 23:43:07 +00:00
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
}
2014-01-09 23:30:36 +00:00
// Holds the RPC endpoints
type endpoints struct {
Catalog *Catalog
Health *Health
Status *Status
KVS *KVS
Session *Session
Internal *Internal
ACL *ACL
Coordinate *Coordinate
PreparedQuery *PreparedQuery
Txn *Txn
2014-01-09 23:30:36 +00:00
}
2013-12-06 23:43:07 +00:00
// NewServer is used to construct a new Consul server from the
// configuration, potentially returning an error
func NewServer(config *Config) (*Server, error) {
// Check the protocol version
if err := config.CheckVersion(); err != nil {
return nil, err
}
2013-12-06 23:43:07 +00:00
// Check for a data directory!
2015-11-29 04:40:05 +00:00
if config.DataDir == "" && !config.DevMode {
2013-12-06 23:43:07 +00:00
return nil, fmt.Errorf("Config must provide a DataDir")
}
2014-08-05 22:20:35 +00:00
// Sanity check the ACLs
if err := config.CheckACL(); err != nil {
return nil, err
}
2013-12-06 23:43:07 +00:00
// Ensure we have a log output
if config.LogOutput == nil {
config.LogOutput = os.Stderr
}
2015-05-11 22:15:36 +00:00
// Create the tls wrapper for outgoing connections
tlsConf := config.tlsConfig()
2015-05-11 22:15:36 +00:00
tlsWrap, err := tlsConf.OutgoingTLSWrapper()
if err != nil {
return nil, err
}
// Get the incoming tls config
incomingTLS, err := tlsConf.IncomingTLSConfig()
if err != nil {
return nil, err
}
2013-12-06 23:43:07 +00:00
// Create a logger
logger := log.New(config.LogOutput, "", log.LstdFlags)
2014-12-11 07:49:44 +00:00
// Create the tombstone GC
gc, err := state.NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity)
2014-12-11 07:49:44 +00:00
if err != nil {
return nil, err
}
2013-12-06 23:43:07 +00:00
// Create server
s := &Server{
2013-12-12 00:24:34 +00:00
config: config,
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
2013-12-12 00:24:34 +00:00
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
2016-03-30 00:39:19 +00:00
localConsuls: make(map[string]*agent.Server),
2013-12-12 00:24:34 +00:00
logger: logger,
reconcileCh: make(chan serf.Member, 32),
remoteConsuls: make(map[string][]*agent.Server, 4),
2013-12-12 00:24:34 +00:00
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
2014-12-11 07:49:44 +00:00
tombstoneGC: gc,
2013-12-12 00:24:34 +00:00
shutdownCh: make(chan struct{}),
2013-12-06 23:43:07 +00:00
}
2014-08-08 22:32:43 +00:00
// Initialize the authoritative ACL cache
2014-08-12 17:38:57 +00:00
s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclFault)
2014-08-08 22:32:43 +00:00
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to create ACL cache: %v", err)
}
2015-06-18 23:19:05 +00:00
// Set up the non-authoritative ACL cache
if s.aclCache, err = newAclCache(config, logger, s.RPC); err != nil {
2014-08-08 22:32:43 +00:00
s.Shutdown()
2015-06-18 23:19:05 +00:00
return nil, err
2014-08-09 00:38:39 +00:00
}
// Initialize the RPC layer
if err := s.setupRPC(tlsWrap); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
}
2013-12-09 23:29:01 +00:00
// Initialize the Raft server
if err := s.setupRaft(); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start Raft: %v", err)
}
// Initialize the lan Serf
2013-12-07 01:18:09 +00:00
s.serfLAN, err = s.setupSerf(config.SerfLANConfig,
s.eventChLAN, serfLANSnapshot, false)
2013-12-06 23:43:07 +00:00
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
2013-12-06 23:43:07 +00:00
}
go s.lanEventHandler()
2013-12-06 23:43:07 +00:00
// Initialize the wan Serf
2013-12-07 01:18:09 +00:00
s.serfWAN, err = s.setupSerf(config.SerfWANConfig,
s.eventChWAN, serfWANSnapshot, true)
2013-12-06 23:43:07 +00:00
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start wan serf: %v", err)
2013-12-06 23:43:07 +00:00
}
go s.wanEventHandler()
2013-12-06 23:43:07 +00:00
// Start listening for RPC requests
go s.listen()
2014-12-19 00:57:49 +00:00
// Start the metrics handlers
go s.sessionStats()
2013-12-06 23:43:07 +00:00
return s, nil
}
// setupSerf is used to setup and initialize a Serf
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, wan bool) (*serf.Serf, error) {
addr := s.rpcListener.Addr().(*net.TCPAddr)
2014-01-30 21:13:29 +00:00
conf.Init()
if wan {
conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter)
} else {
conf.NodeName = s.config.NodeName
}
2014-01-30 21:13:29 +00:00
conf.Tags["role"] = "consul"
conf.Tags["dc"] = s.config.Datacenter
conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion)
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
2014-06-06 22:36:40 +00:00
conf.Tags["build"] = s.config.Build
2014-01-30 21:13:29 +00:00
conf.Tags["port"] = fmt.Sprintf("%d", addr.Port)
2014-01-20 23:39:07 +00:00
if s.config.Bootstrap {
2014-01-30 21:13:29 +00:00
conf.Tags["bootstrap"] = "1"
2014-01-20 23:39:07 +00:00
}
if s.config.BootstrapExpect != 0 {
conf.Tags["expect"] = fmt.Sprintf("%d", s.config.BootstrapExpect)
}
2013-12-06 23:43:07 +00:00
conf.MemberlistConfig.LogOutput = s.config.LogOutput
conf.LogOutput = s.config.LogOutput
conf.EventCh = ch
if !s.config.DevMode {
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
}
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
conf.RejoinAfterLeave = s.config.RejoinAfterLeave
if wan {
conf.Merge = &wanMergeDelegate{}
} else {
conf.Merge = &lanMergeDelegate{dc: s.config.Datacenter}
}
// Until Consul supports this fully, we disable automatic resolution.
// When enabled, the Serf gossip may just turn off if we are the minority
// node which is rather unexpected.
conf.EnableNameConflictResolution = false
2013-12-19 22:18:55 +00:00
if err := ensurePath(conf.SnapshotPath, false); err != nil {
2013-12-06 23:43:07 +00:00
return nil, err
}
2015-05-08 08:31:34 +00:00
// Plumb down the enable coordinates flag.
conf.DisableCoordinates = s.config.DisableCoordinates
2015-05-08 08:31:34 +00:00
2013-12-06 23:43:07 +00:00
return serf.Create(conf)
}
// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
// If we are in bootstrap mode, enable a single node cluster
2015-11-29 04:40:05 +00:00
if s.config.Bootstrap || s.config.DevMode {
s.config.RaftConfig.EnableSingleNode = true
}
2013-12-11 01:00:48 +00:00
// Create the FSM
var err error
s.fsm, err = NewFSM(s.tombstoneGC, s.config.LogOutput)
2013-12-11 01:00:48 +00:00
if err != nil {
return err
}
2015-11-29 04:40:05 +00:00
// Create a transport layer
trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second, s.config.LogOutput)
s.raftTransport = trans
2015-11-29 04:40:05 +00:00
var log raft.LogStore
var stable raft.StableStore
var snap raft.SnapshotStore
if s.config.DevMode {
store := raft.NewInmemStore()
s.raftInmem = store
stable = store
log = store
snap = raft.NewDiscardSnapshotStore()
s.raftPeers = &raft.StaticPeers{}
2015-11-29 04:40:05 +00:00
} else {
// Create the base raft path
path := filepath.Join(s.config.DataDir, raftState)
if err := ensurePath(path, true); err != nil {
return err
}
2013-12-06 23:43:07 +00:00
2015-11-29 04:40:05 +00:00
// Create the backend raft store for logs and stable storage
store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
if err != nil {
return err
}
s.raftStore = store
stable = store
2015-11-29 04:40:05 +00:00
// Wrap the store in a LogCache to improve performance
cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
if err != nil {
store.Close()
return err
}
log = cacheStore
2013-12-06 23:43:07 +00:00
2015-11-29 04:40:05 +00:00
// Create the snapshot store
snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
if err != nil {
store.Close()
return err
}
snap = snapshots
2013-12-06 23:43:07 +00:00
2015-11-29 04:40:05 +00:00
// Setup the peer store
s.raftPeers = raft.NewJSONPeers(path, trans)
}
2013-12-06 23:43:07 +00:00
// Ensure local host is always included if we are in bootstrap mode
if s.config.Bootstrap {
peerAddrs, err := s.raftPeers.Peers()
if err != nil {
2015-11-29 04:40:05 +00:00
if s.raftStore != nil {
s.raftStore.Close()
}
return err
}
if !raft.PeerContained(peerAddrs, trans.LocalAddr()) {
s.raftPeers.SetPeers(raft.AddUniquePeer(peerAddrs, trans.LocalAddr()))
}
}
2013-12-23 23:30:45 +00:00
// Make sure we set the LogOutput
s.config.RaftConfig.LogOutput = s.config.LogOutput
2013-12-06 23:43:07 +00:00
// Setup the Raft store
2015-11-29 04:40:05 +00:00
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable,
snap, s.raftPeers, trans)
2013-12-06 23:43:07 +00:00
if err != nil {
2016-01-05 17:45:36 +00:00
if s.raftStore != nil {
s.raftStore.Close()
}
2013-12-06 23:43:07 +00:00
trans.Close()
return err
}
2014-01-09 23:49:09 +00:00
// Start monitoring leadership
go s.monitorLeadership()
2013-12-06 23:43:07 +00:00
return nil
}
2013-12-07 00:35:13 +00:00
// setupRPC is used to setup the RPC listener
2015-05-11 22:15:36 +00:00
func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
2014-01-09 23:30:36 +00:00
// Create endpoints
s.endpoints.Status = &Status{s}
s.endpoints.Catalog = &Catalog{s}
s.endpoints.Health = &Health{s}
2014-03-31 21:15:49 +00:00
s.endpoints.KVS = &KVS{s}
s.endpoints.Session = &Session{s}
2014-04-28 21:44:36 +00:00
s.endpoints.Internal = &Internal{s}
2014-08-06 00:05:59 +00:00
s.endpoints.ACL = &ACL{s}
s.endpoints.Coordinate = NewCoordinate(s)
s.endpoints.PreparedQuery = &PreparedQuery{s}
s.endpoints.Txn = &Txn{s}
2014-01-09 23:30:36 +00:00
2013-12-09 22:49:07 +00:00
// Register the handlers
2014-01-09 23:30:36 +00:00
s.rpcServer.Register(s.endpoints.Status)
s.rpcServer.Register(s.endpoints.Catalog)
s.rpcServer.Register(s.endpoints.Health)
2014-03-31 21:15:49 +00:00
s.rpcServer.Register(s.endpoints.KVS)
s.rpcServer.Register(s.endpoints.Session)
2014-04-28 21:44:36 +00:00
s.rpcServer.Register(s.endpoints.Internal)
2014-08-06 00:05:59 +00:00
s.rpcServer.Register(s.endpoints.ACL)
2015-05-14 21:37:13 +00:00
s.rpcServer.Register(s.endpoints.Coordinate)
s.rpcServer.Register(s.endpoints.PreparedQuery)
s.rpcServer.Register(s.endpoints.Txn)
2013-12-09 22:49:07 +00:00
2014-01-01 00:45:13 +00:00
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
2013-12-07 00:35:13 +00:00
if err != nil {
return err
}
s.rpcListener = list
2013-12-31 22:00:25 +00:00
var advertise net.Addr
if s.config.RPCAdvertise != nil {
advertise = s.config.RPCAdvertise
} else {
advertise = s.rpcListener.Addr()
}
// Verify that we have a usable advertise address
addr, ok := advertise.(*net.TCPAddr)
if !ok {
list.Close()
return fmt.Errorf("RPC advertise address is not a TCP Address: %v", addr)
}
if addr.IP.IsUnspecified() {
list.Close()
return fmt.Errorf("RPC advertise address is not advertisable: %v", addr)
}
2015-05-11 22:15:36 +00:00
// Provide a DC specific wrapper. Raft replication is only
// ever done in the same datacenter, so we can provide it as a constant.
wrapper := tlsutil.SpecificDC(s.config.Datacenter, tlsWrap)
s.raftLayer = NewRaftLayer(advertise, wrapper)
2013-12-07 00:35:13 +00:00
return nil
}
2013-12-06 23:43:07 +00:00
// Shutdown is used to shutdown the server
func (s *Server) Shutdown() error {
2014-01-10 19:06:11 +00:00
s.logger.Printf("[INFO] consul: shutting down server")
2013-12-06 23:43:07 +00:00
s.shutdownLock.Lock()
defer s.shutdownLock.Unlock()
if s.shutdown {
return nil
}
s.shutdown = true
close(s.shutdownCh)
if s.serfLAN != nil {
s.serfLAN.Shutdown()
2013-12-06 23:43:07 +00:00
}
if s.serfWAN != nil {
s.serfWAN.Shutdown()
2013-12-06 23:43:07 +00:00
}
if s.raft != nil {
s.raftTransport.Close()
s.raftLayer.Close()
future := s.raft.Shutdown()
if err := future.Error(); err != nil {
2013-12-24 00:20:51 +00:00
s.logger.Printf("[WARN] consul: Error shutting down raft: %s", err)
}
2015-11-29 04:40:05 +00:00
if s.raftStore != nil {
s.raftStore.Close()
}
// Clear the peer set on a graceful leave to avoid
// triggering elections on a rejoin.
if s.left {
s.raftPeers.SetPeers(nil)
}
2013-12-06 23:43:07 +00:00
}
2013-12-07 00:35:13 +00:00
if s.rpcListener != nil {
s.rpcListener.Close()
}
2013-12-09 20:09:57 +00:00
// Close the connection pool
s.connPool.Shutdown()
2013-12-06 23:43:07 +00:00
return nil
}
2013-12-07 01:18:09 +00:00
2013-12-09 20:10:27 +00:00
// Leave is used to prepare for a graceful shutdown of the server
func (s *Server) Leave() error {
2014-01-10 19:06:11 +00:00
s.logger.Printf("[INFO] consul: server starting leave")
s.left = true
2013-12-09 20:10:27 +00:00
// Check the number of known peers
numPeers, err := s.numOtherPeers()
if err != nil {
s.logger.Printf("[ERR] consul: failed to check raft peers: %v", err)
return err
}
// If we are the current leader, and we have any other peers (cluster has multiple
// servers), we should do a RemovePeer to safely reduce the quorum size. If we are
// not the leader, then we should issue our leave intention and wait to be removed
// for some sane period of time.
isLeader := s.IsLeader()
if isLeader && numPeers > 0 {
future := s.raft.RemovePeer(s.raftTransport.LocalAddr())
if err := future.Error(); err != nil && err != raft.ErrUnknownPeer {
s.logger.Printf("[ERR] consul: failed to remove ourself as raft peer: %v", err)
}
}
2013-12-09 20:10:27 +00:00
// Leave the WAN pool
if s.serfWAN != nil {
if err := s.serfWAN.Leave(); err != nil {
2014-01-10 19:06:11 +00:00
s.logger.Printf("[ERR] consul: failed to leave WAN Serf cluster: %v", err)
2013-12-09 20:10:27 +00:00
}
}
// Leave the LAN pool
if s.serfLAN != nil {
if err := s.serfLAN.Leave(); err != nil {
2014-01-10 19:06:11 +00:00
s.logger.Printf("[ERR] consul: failed to leave LAN Serf cluster: %v", err)
2013-12-09 20:10:27 +00:00
}
}
// If we were not leader, wait to be safely removed from the cluster.
// We must wait to allow the raft replication to take place, otherwise
// an immediate shutdown could cause a loss of quorum.
if !isLeader {
limit := time.Now().Add(raftRemoveGracePeriod)
for numPeers > 0 && time.Now().Before(limit) {
// Update the number of peers
numPeers, err = s.numOtherPeers()
if err != nil {
s.logger.Printf("[ERR] consul: failed to check raft peers: %v", err)
break
}
// Avoid the sleep if we are done
if numPeers == 0 {
break
}
// Sleep a while and check again
time.Sleep(50 * time.Millisecond)
}
if numPeers != 0 {
s.logger.Printf("[WARN] consul: failed to leave raft peer set gracefully, timeout")
}
}
2013-12-09 20:10:27 +00:00
return nil
}
// numOtherPeers is used to check on the number of known peers
2015-09-15 12:22:08 +00:00
// excluding the local node
func (s *Server) numOtherPeers() (int, error) {
peers, err := s.raftPeers.Peers()
if err != nil {
return 0, err
}
otherPeers := raft.ExcludePeer(peers, s.raftTransport.LocalAddr())
return len(otherPeers), nil
}
2013-12-07 01:18:09 +00:00
// JoinLAN is used to have Consul join the inner-DC pool
// The target address should be another node inside the DC
// listening on the Serf LAN address
func (s *Server) JoinLAN(addrs []string) (int, error) {
return s.serfLAN.Join(addrs, true)
2013-12-07 01:18:09 +00:00
}
// JoinWAN is used to have Consul join the cross-WAN Consul ring
// The target address should be another node listening on the
// Serf WAN address
func (s *Server) JoinWAN(addrs []string) (int, error) {
return s.serfWAN.Join(addrs, true)
2013-12-07 01:18:09 +00:00
}
// LocalMember is used to return the local node, as seen by the LAN Serf
// pool.
func (s *Server) LocalMember() serf.Member {
	return s.serfLAN.LocalMember()
}
// LANMembers reports every member currently known to the LAN Serf pool.
func (s *Server) LANMembers() []serf.Member {
	members := s.serfLAN.Members()
	return members
}
// WANMembers reports every member currently known to the WAN Serf pool.
func (s *Server) WANMembers() []serf.Member {
	members := s.serfWAN.Members()
	return members
}
// RemoveFailedNode drops a failed node from the cluster. The node is removed
// from the LAN pool first; the WAN pool is only attempted when that succeeds,
// and the first error encountered is returned.
func (s *Server) RemoveFailedNode(node string) error {
	err := s.serfLAN.RemoveFailedNode(node)
	if err == nil {
		err = s.serfWAN.RemoveFailedNode(node)
	}
	return err
}
// IsLeader reports whether the local Raft instance is currently in the
// leader state.
func (s *Server) IsLeader() bool {
	current := s.raft.State()
	return current == raft.Leader
}
// KeyManagerLAN hands back the keyring manager of the LAN Serf pool.
func (s *Server) KeyManagerLAN() *serf.KeyManager {
	manager := s.serfLAN.KeyManager()
	return manager
}
// KeyManagerWAN hands back the keyring manager of the WAN Serf pool.
func (s *Server) KeyManagerWAN() *serf.KeyManager {
	manager := s.serfWAN.KeyManager()
	return manager
}
// Encrypted reports whether gossip is encrypted, which requires encryption
// to be enabled on both the LAN and the WAN Serf pools. The WAN pool is only
// consulted when the LAN pool is encrypted.
func (s *Server) Encrypted() bool {
	if !s.serfLAN.EncryptionEnabled() {
		return false
	}
	return s.serfWAN.EncryptionEnabled()
}
// inmemCodec is used to do an RPC call without going over a network. It
// provides the codec methods net/rpc needs to serve a single request: the
// request is described by method and args, the response is copied into reply,
// and an error reported by the RPC server is stored in err.
type inmemCodec struct {
	method string
	args   interface{}
	reply  interface{}
	err    error
}

// ReadRequestHeader fills in the service method of the pending request.
func (i *inmemCodec) ReadRequestHeader(req *rpc.Request) error {
	req.ServiceMethod = i.method
	return nil
}

// ReadRequestBody copies the caller's arguments into args, the value the RPC
// server allocated for the method. The server passes nil when it only wants
// the body discarded (for example after failing to resolve the method); that
// is a no-op here. An error is returned when the caller's arguments cannot
// be assigned to the method's argument type.
func (i *inmemCodec) ReadRequestBody(args interface{}) error {
	if args == nil {
		return nil
	}
	return inmemCodecCopy(args, i.args)
}

// WriteResponse records the outcome of the call. A server-side error is
// stored in err; otherwise the method's reply is copied into the caller's
// reply value. When that copy is impossible the failure is stored in err as
// well, so that it is not lost if the RPC server ignores the returned error.
func (i *inmemCodec) WriteResponse(resp *rpc.Response, reply interface{}) error {
	if resp.Error != "" {
		i.err = errors.New(resp.Error)
		return nil
	}
	if err := inmemCodecCopy(i.reply, reply); err != nil {
		i.err = err
		return err
	}
	return nil
}

// Close is a no-op since there is no underlying connection.
func (i *inmemCodec) Close() error {
	return nil
}

// inmemCodecCopy assigns the value behind src to the value behind dst,
// following up to two levels of pointers on each side. It returns an error,
// instead of panicking, when src holds no value, dst cannot be set, or the
// types are not assignable.
func inmemCodecCopy(dst, src interface{}) error {
	srcVal := reflect.Indirect(reflect.Indirect(reflect.ValueOf(src)))
	dstVal := reflect.Indirect(reflect.Indirect(reflect.ValueOf(dst)))
	if !srcVal.IsValid() || !dstVal.CanSet() {
		return fmt.Errorf("inmem rpc: cannot copy %T into %T", src, dst)
	}
	if !srcVal.Type().AssignableTo(dstVal.Type()) {
		return fmt.Errorf("inmem rpc: cannot assign %s to %s", srcVal.Type(), dstVal.Type())
	}
	dstVal.Set(srcVal)
	return nil
}
// RPC performs an RPC call against the local server without touching the
// network: the request is served in-process through an in-memory codec. A
// failure to serve the request takes precedence over an error produced by
// the called method.
func (s *Server) RPC(method string, args interface{}, reply interface{}) error {
	codec := &inmemCodec{method: method, args: args, reply: reply}
	err := s.rpcServer.ServeRequest(codec)
	if err == nil {
		err = codec.err
	}
	return err
}
// InjectEndpoint registers an additional endpoint with the RPC server so
// that tests can substitute their own implementation. A warning is logged
// on every call.
func (s *Server) InjectEndpoint(endpoint interface{}) error {
	const warning = "[WARN] consul: endpoint injected; this should only be used for testing"
	s.logger.Printf(warning)
	err := s.rpcServer.Register(endpoint)
	return err
}
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (s *Server) Stats() map[string]map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
s.remoteLock.RLock()
numKnownDCs := len(s.remoteConsuls)
s.remoteLock.RUnlock()
stats := map[string]map[string]string{
"consul": map[string]string{
2014-02-24 02:08:58 +00:00
"server": "true",
"leader": fmt.Sprintf("%v", s.IsLeader()),
"leader_addr": s.raft.Leader(),
2014-02-24 02:08:58 +00:00
"bootstrap": fmt.Sprintf("%v", s.config.Bootstrap),
"known_datacenters": toString(uint64(numKnownDCs)),
},
2014-02-24 02:08:58 +00:00
"raft": s.raft.Stats(),
"serf_lan": s.serfLAN.Stats(),
"serf_wan": s.serfWAN.Stats(),
"runtime": runtimeStats(),
}
if peers, err := s.raftPeers.Peers(); err == nil {
stats["raft"]["raft_peers"] = strings.Join(peers, ",")
} else {
s.logger.Printf("[DEBUG] server: error getting raft peers: %v", err)
}
return stats
}
2015-04-15 23:12:45 +00:00
// GetLANCoordinate returns the coordinate of the server in the LAN gossip pool.
func (s *Server) GetLANCoordinate() (*coordinate.Coordinate, error) {
2015-04-15 23:12:45 +00:00
return s.serfLAN.GetCoordinate()
}
2015-05-08 08:31:34 +00:00
// GetWANCoordinate returns the coordinate of the server in the WAN gossip pool.
func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
2015-05-08 08:31:34 +00:00
return s.serfWAN.GetCoordinate()
}