open-nomad/nomad/server.go

421 lines
10 KiB
Go
Raw Normal View History

2015-06-01 15:49:10 +00:00
package nomad
import (
2015-06-03 10:26:50 +00:00
"crypto/tls"
"errors"
2015-06-01 15:49:10 +00:00
"fmt"
"log"
2015-06-03 10:26:50 +00:00
"net"
"net/rpc"
2015-06-01 15:49:10 +00:00
"os"
"path/filepath"
2015-06-03 10:26:50 +00:00
"reflect"
"strconv"
2015-06-01 15:49:10 +00:00
"sync"
"time"
2015-06-03 10:26:50 +00:00
"github.com/hashicorp/consul/tlsutil"
2015-06-01 15:49:10 +00:00
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
2015-06-03 10:58:00 +00:00
"github.com/hashicorp/serf/serf"
2015-06-01 15:49:10 +00:00
)
const (
	// raftState is the directory, relative to the configured data
	// directory, under which the Raft store, snapshots and peer file live.
	raftState = "raft/"

	// serfSnapshot is the path, relative to the configured data
	// directory, used for the Serf snapshot.
	serfSnapshot = "serf/snapshot"

	// snapshotsRetained is the retention count handed to the Raft
	// file snapshot store.
	snapshotsRetained = 2

	// raftLogCacheSize is the maximum number of logs to cache in-memory.
	// This is used to reduce disk I/O for the recently committed entries.
	raftLogCacheSize = 512
)
// Server is Nomad server which manages the job queues,
// schedulers, and notification bus for agents.
type Server struct {
config *Config
logger *log.Logger
2015-06-03 10:26:50 +00:00
// Endpoints holds our RPC endpoints
endpoints endpoints
2015-06-01 15:49:10 +00:00
// The raft instance is used among Consul nodes within the
// DC to protect operations that require strong consistency
raft *raft.Raft
raftLayer *RaftLayer
raftPeers raft.PeerStore
raftStore *raftboltdb.BoltStore
2015-06-01 19:11:40 +00:00
raftInmem *raft.InmemStore
2015-06-01 15:49:10 +00:00
raftTransport *raft.NetworkTransport
// fsm is the state machine used with Raft
fsm *nomadFSM
2015-06-03 10:26:50 +00:00
// rpcListener is used to listen for incoming connections
2015-06-03 10:58:00 +00:00
rpcListener net.Listener
rpcServer *rpc.Server
rpcAdvertise net.Addr
2015-06-03 10:26:50 +00:00
// rpcTLS is the TLS config for incoming TLS requests
rpcTLS *tls.Config
2015-06-03 10:58:00 +00:00
// serf is the Serf cluster containing only Nomad
// servers. This is used for multi-region federation
// and automatic clustering within regions.
serf *serf.Serf
// eventCh is used to receive events from the serf cluster
eventCh chan serf.Event
2015-06-01 15:49:10 +00:00
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
}
2015-06-03 10:26:50 +00:00
// endpoints groups the RPC endpoint receivers that are registered
// with the server's RPC server.
type endpoints struct {
	Status *Status
}
2015-06-01 15:49:10 +00:00
// NewServer is used to construct a new Nomad server from the
// configuration, potentially returning an error
func NewServer(config *Config) (*Server, error) {
// Check the protocol version
if err := config.CheckVersion(); err != nil {
return nil, err
}
// Ensure we have a log output
if config.LogOutput == nil {
config.LogOutput = os.Stderr
}
// Create a logger
logger := log.New(config.LogOutput, "", log.LstdFlags)
// Create the server
s := &Server{
config: config,
logger: logger,
2015-06-03 10:58:00 +00:00
rpcServer: rpc.NewServer(),
eventCh: make(chan serf.Event, 256),
2015-06-01 15:49:10 +00:00
shutdownCh: make(chan struct{}),
}
2015-06-03 10:26:50 +00:00
// Initialize the RPC layer
// TODO: TLS...
if err := s.setupRPC(nil); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
}
2015-06-01 15:49:10 +00:00
// Initialize the Raft server
if err := s.setupRaft(); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start Raft: %v", err)
}
2015-06-03 10:58:00 +00:00
// Initialize the wan Serf
var err error
s.serf, err = s.setupSerf(config.SerfConfig, s.eventCh, serfSnapshot)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start serf: %v", err)
}
go s.serfEventHandler()
2015-06-03 10:26:50 +00:00
// Start the RPC listeners
go s.listen()
2015-06-01 15:49:10 +00:00
// Done
return s, nil
}
// Shutdown is used to shutdown the server
func (s *Server) Shutdown() error {
s.logger.Printf("[INFO] nomad: shutting down server")
s.shutdownLock.Lock()
defer s.shutdownLock.Unlock()
if s.shutdown {
return nil
}
s.shutdown = true
close(s.shutdownCh)
if s.raft != nil {
s.raftTransport.Close()
s.raftLayer.Close()
future := s.raft.Shutdown()
if err := future.Error(); err != nil {
s.logger.Printf("[WARN] nomad: Error shutting down raft: %s", err)
}
if s.raftStore != nil {
s.raftStore.Close()
}
2015-06-01 15:49:10 +00:00
}
2015-06-03 10:26:50 +00:00
// Shutdown the RPC listener
if s.rpcListener != nil {
s.rpcListener.Close()
}
2015-06-01 15:49:10 +00:00
// Close the fsm
if s.fsm != nil {
s.fsm.Close()
}
return nil
}
2015-06-03 10:26:50 +00:00
// setupRPC is used to setup the RPC listener
func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
// Create endpoints
s.endpoints.Status = &Status{s}
// Register the handlers
s.rpcServer.Register(s.endpoints.Status)
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil {
return err
}
s.rpcListener = list
if s.config.RPCAdvertise != nil {
2015-06-03 10:58:00 +00:00
s.rpcAdvertise = s.config.RPCAdvertise
2015-06-03 10:26:50 +00:00
} else {
2015-06-03 10:58:00 +00:00
s.rpcAdvertise = s.rpcListener.Addr()
2015-06-03 10:26:50 +00:00
}
// Verify that we have a usable advertise address
2015-06-03 10:58:00 +00:00
addr, ok := s.rpcAdvertise.(*net.TCPAddr)
2015-06-03 10:26:50 +00:00
if !ok {
list.Close()
return fmt.Errorf("RPC advertise address is not a TCP Address: %v", addr)
}
if addr.IP.IsUnspecified() {
list.Close()
return fmt.Errorf("RPC advertise address is not advertisable: %v", addr)
}
// Provide a DC specific wrapper. Raft replication is only
// ever done in the same datacenter, so we can provide it as a constant.
// wrapper := tlsutil.SpecificDC(s.config.Datacenter, tlsWrap)
// TODO: TLS...
2015-06-03 10:58:00 +00:00
s.raftLayer = NewRaftLayer(s.rpcAdvertise, nil)
2015-06-03 10:26:50 +00:00
return nil
}
2015-06-01 15:49:10 +00:00
// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
// If we are in bootstrap mode, enable a single node cluster
2015-06-01 19:11:40 +00:00
if s.config.Bootstrap || s.config.DevMode {
2015-06-01 15:49:10 +00:00
s.config.RaftConfig.EnableSingleNode = true
}
// Create the FSM
var err error
2015-06-01 19:11:40 +00:00
s.fsm, err = NewFSM(s.config.LogOutput)
2015-06-01 15:49:10 +00:00
if err != nil {
return err
}
2015-06-01 19:11:40 +00:00
// Create a transport layer
trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second,
s.config.LogOutput)
s.raftTransport = trans
2015-06-01 15:49:10 +00:00
// Create the backend raft store for logs and stable storage
2015-06-01 19:11:40 +00:00
var log raft.LogStore
var stable raft.StableStore
var snap raft.SnapshotStore
var peers raft.PeerStore
if s.config.DevMode {
store := raft.NewInmemStore()
s.raftInmem = store
stable = store
log = store
snap = raft.NewDiscardSnapshotStore()
peers = &raft.StaticPeers{}
} else {
// Create the base raft path
path := filepath.Join(s.config.DataDir, raftState)
if err := ensurePath(path, true); err != nil {
return err
}
2015-06-01 15:49:10 +00:00
2015-06-01 19:11:40 +00:00
// Create the BoltDB backend
store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
if err != nil {
return err
}
s.raftStore = store
stable = store
2015-06-01 15:49:10 +00:00
2015-06-01 19:11:40 +00:00
// Wrap the store in a LogCache to improve performance
cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
if err != nil {
store.Close()
return err
}
log = cacheStore
2015-06-01 15:49:10 +00:00
2015-06-01 19:11:40 +00:00
// Create the snapshot store
snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
if err != nil {
if s.raftStore != nil {
s.raftStore.Close()
}
return err
}
snap = snapshots
2015-06-01 15:49:10 +00:00
2015-06-01 19:11:40 +00:00
// Setup the peer store
s.raftPeers = raft.NewJSONPeers(path, trans)
peers = s.raftPeers
}
2015-06-01 15:49:10 +00:00
// Ensure local host is always included if we are in bootstrap mode
2015-06-01 19:11:40 +00:00
if s.config.RaftConfig.EnableSingleNode {
p, err := peers.Peers()
2015-06-01 15:49:10 +00:00
if err != nil {
2015-06-01 19:11:40 +00:00
if s.raftStore != nil {
s.raftStore.Close()
}
2015-06-01 15:49:10 +00:00
return err
}
2015-06-01 19:11:40 +00:00
if !raft.PeerContained(p, trans.LocalAddr()) {
peers.SetPeers(raft.AddUniquePeer(p, trans.LocalAddr()))
2015-06-01 15:49:10 +00:00
}
}
// Make sure we set the LogOutput
s.config.RaftConfig.LogOutput = s.config.LogOutput
// Setup the Raft store
2015-06-01 19:11:40 +00:00
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable,
snap, peers, trans)
2015-06-01 15:49:10 +00:00
if err != nil {
2015-06-01 19:11:40 +00:00
if s.raftStore != nil {
s.raftStore.Close()
}
2015-06-01 15:49:10 +00:00
trans.Close()
return err
}
// Start monitoring leadership
go s.monitorLeadership()
return nil
}
2015-06-03 10:26:50 +00:00
2015-06-03 10:58:00 +00:00
// setupSerf is used to setup and initialize a Serf. It fills in conf
// (node name, tags, logging, event channel, snapshot path, protocol
// version and merge delegate), makes sure the snapshot path can be
// created, and then creates the Serf instance. path is the snapshot
// location relative to the server's data directory.
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
	cfg := s.config

	conf.Init()
	conf.NodeName = fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Region)

	// Tags advertised to the other servers in the cluster.
	tags := conf.Tags
	tags["role"] = "nomad"
	tags["region"] = cfg.Region
	tags["dc"] = cfg.Datacenter
	tags["vsn"] = fmt.Sprintf("%d", cfg.ProtocolVersion)
	tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
	tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
	tags["build"] = cfg.Build
	tags["addr"] = s.rpcAdvertise.String()
	if cfg.Bootstrap {
		tags["bootstrap"] = "1"
	}
	if cfg.BootstrapExpect != 0 {
		tags["expect"] = fmt.Sprintf("%d", cfg.BootstrapExpect)
	}

	// Logging, events and persistence.
	conf.MemberlistConfig.LogOutput = cfg.LogOutput
	conf.LogOutput = cfg.LogOutput
	conf.EventCh = ch
	conf.SnapshotPath = filepath.Join(cfg.DataDir, path)
	conf.ProtocolVersion = protocolVersionMap[cfg.ProtocolVersion]
	conf.RejoinAfterLeave = true
	conf.Merge = &serfMergeDelegate{}

	// Until Nomad supports this fully, we disable automatic resolution.
	// When enabled, the Serf gossip may just turn off if we are the minority
	// node which is rather unexpected.
	conf.EnableNameConflictResolution = false

	err := ensurePath(conf.SnapshotPath, false)
	if err != nil {
		return nil, err
	}
	return serf.Create(conf)
}
2015-06-03 10:26:50 +00:00
// IsLeader reports whether the local Raft instance is currently in
// the leader state.
func (s *Server) IsLeader() bool {
	state := s.raft.State()
	return state == raft.Leader
}
// inmemCodec is used to do an RPC call without going over a network.
// It serves exactly one request: method names the service method,
// args is copied into the argument value the RPC server allocates, and
// the handler's reply is copied back into reply. Any error reported
// by the RPC server, or hit while copying the reply, is stored in err.
type inmemCodec struct {
	method string
	args   interface{}
	reply  interface{}
	err    error
}

// ReadRequestHeader supplies the service method name for the request.
func (i *inmemCodec) ReadRequestHeader(req *rpc.Request) error {
	req.ServiceMethod = i.method
	return nil
}

// ReadRequestBody copies the caller's arguments into args.
//
// net/rpc calls this with a nil argument to discard the body when the
// header could not be resolved (e.g. unknown service or method); there
// is nothing to discard for an in-memory request, so that is a no-op.
// A nil or type-incompatible argument yields an error instead of a panic.
func (i *inmemCodec) ReadRequestBody(args interface{}) error {
	if args == nil {
		return nil
	}
	return inmemAssign(args, i.args)
}

// WriteResponse records the outcome of the call. When the RPC server
// reports an error it is stored in err and the reply is left untouched;
// otherwise the handler's reply is copied into the caller's reply.
func (i *inmemCodec) WriteResponse(resp *rpc.Response, reply interface{}) error {
	if resp.Error != "" {
		i.err = errors.New(resp.Error)
		return nil
	}
	if err := inmemAssign(i.reply, reply); err != nil {
		// net/rpc does not propagate WriteResponse errors to the
		// ServeRequest caller, so keep it where RPC can find it.
		i.err = err
		return err
	}
	return nil
}

// Close is a no-op; the codec holds no resources.
func (i *inmemCodec) Close() error {
	return nil
}

// inmemAssign copies the value src refers to into the value dst refers
// to, following up to two levels of pointers on each side. It returns
// an error in the cases where reflect's Set would panic: a nil source,
// a destination that cannot be set, or incompatible types.
func inmemAssign(dst, src interface{}) error {
	srcVal := reflect.Indirect(reflect.Indirect(reflect.ValueOf(src)))
	dstVal := reflect.Indirect(reflect.Indirect(reflect.ValueOf(dst)))
	if !srcVal.IsValid() {
		return errors.New("rpc: source value is nil")
	}
	if !dstVal.IsValid() || !dstVal.CanSet() {
		return errors.New("rpc: destination value is not settable")
	}
	if !srcVal.Type().AssignableTo(dstVal.Type()) {
		return fmt.Errorf("rpc: cannot assign %s to %s", srcVal.Type(), dstVal.Type())
	}
	dstVal.Set(srcVal)
	return nil
}
// RPC is used to make a local RPC call. The request is served by the
// server's own RPC server through an in-memory codec. A failure to
// serve the request takes precedence; otherwise the error recorded by
// the codec (if any) is returned.
func (s *Server) RPC(method string, args interface{}, reply interface{}) error {
	local := &inmemCodec{method: method, args: args, reply: reply}
	err := s.rpcServer.ServeRequest(local)
	if err == nil {
		err = local.err
	}
	return err
}
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (s *Server) Stats() map[string]map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
stats := map[string]map[string]string{
"nomad": map[string]string{
"server": "true",
"leader": fmt.Sprintf("%v", s.IsLeader()),
"bootstrap": fmt.Sprintf("%v", s.config.Bootstrap),
"known_regions": toString(uint64(0)),
},
"raft": s.raft.Stats(),
2015-06-03 11:08:04 +00:00
"serf": s.serf.Stats(),
2015-06-03 10:26:50 +00:00
"runtime": runtimeStats(),
}
return stats
}