open-nomad/nomad/server.go

213 lines
4.6 KiB
Go
Raw Normal View History

2015-06-01 15:49:10 +00:00
package nomad
import (
"fmt"
"log"
"os"
"path/filepath"
"sync"
"time"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
)
const (
// raftState is the sub-directory, joined onto the configured data
// directory, that holds the on-disk Raft files (BoltDB store, snapshots).
raftState = "raft/"
// snapshotsRetained is the retain count handed to the file snapshot store.
snapshotsRetained = 2
// raftLogCacheSize is the maximum number of logs to cache in-memory.
// This is used to reduce disk I/O for the recently committed entries.
raftLogCacheSize = 512
)
// Server is Nomad server which manages the job queues,
// schedulers, and notification bus for agents.
type Server struct {
config *Config
logger *log.Logger
// The raft instance is used among Consul nodes within the
// DC to protect operations that require strong consistency
raft *raft.Raft
raftLayer *RaftLayer
raftPeers raft.PeerStore
raftStore *raftboltdb.BoltStore
2015-06-01 19:11:40 +00:00
raftInmem *raft.InmemStore
2015-06-01 15:49:10 +00:00
raftTransport *raft.NetworkTransport
// fsm is the state machine used with Raft
fsm *nomadFSM
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
}
// NewServer builds a Nomad server from the supplied configuration and
// brings up its Raft subsystem. The configuration's protocol version is
// validated first, and a nil LogOutput on the config is replaced with
// os.Stderr. If Raft cannot be started the partially built server is shut
// down and an error is returned instead.
func NewServer(config *Config) (*Server, error) {
	// Refuse to go any further on an unsupported protocol version.
	versionErr := config.CheckVersion()
	if versionErr != nil {
		return nil, versionErr
	}

	// Fall back to stderr when the caller supplied no log destination.
	if config.LogOutput == nil {
		config.LogOutput = os.Stderr
	}

	// Assemble the server together with its logger.
	srv := &Server{
		config:     config,
		logger:     log.New(config.LogOutput, "", log.LstdFlags),
		shutdownCh: make(chan struct{}),
	}

	// Bring up Raft; on failure release whatever was already created.
	raftErr := srv.setupRaft()
	if raftErr != nil {
		srv.Shutdown()
		return nil, fmt.Errorf("Failed to start Raft: %v", raftErr)
	}

	return srv, nil
}
// Shutdown is used to shutdown the server. Only the first call does any
// work: it closes shutdownCh, stops Raft and its transport, closes the
// on-disk Raft store when there is one, and finally closes the FSM.
// Subsequent calls return immediately. A failure to stop Raft is logged
// rather than returned, so the result is always nil.
func (s *Server) Shutdown() error {
	s.logger.Printf("[INFO] nomad: shutting down server")
	s.shutdownLock.Lock()
	defer s.shutdownLock.Unlock()

	if s.shutdown {
		return nil
	}
	s.shutdown = true
	close(s.shutdownCh)

	if s.raft != nil {
		s.raftTransport.Close()
		// NOTE(review): raftLayer is not assigned anywhere in this file, so
		// guard against it being unset — confirm where it is initialized.
		if s.raftLayer != nil {
			s.raftLayer.Close()
		}
		future := s.raft.Shutdown()
		if err := future.Error(); err != nil {
			s.logger.Printf("[WARN] nomad: Error shutting down raft: %s", err)
		}
		// In DevMode Raft runs on an in-memory store and raftStore is never
		// set, so it must not be closed unconditionally.
		if s.raftStore != nil {
			s.raftStore.Close()
		}
	}

	// Close the fsm
	if s.fsm != nil {
		s.fsm.Close()
	}
	return nil
}
// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
// If we are in bootstrap mode, enable a single node cluster
2015-06-01 19:11:40 +00:00
if s.config.Bootstrap || s.config.DevMode {
2015-06-01 15:49:10 +00:00
s.config.RaftConfig.EnableSingleNode = true
}
// Create the FSM
var err error
2015-06-01 19:11:40 +00:00
s.fsm, err = NewFSM(s.config.LogOutput)
2015-06-01 15:49:10 +00:00
if err != nil {
return err
}
2015-06-01 19:11:40 +00:00
// Create a transport layer
trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second,
s.config.LogOutput)
s.raftTransport = trans
2015-06-01 15:49:10 +00:00
// Create the backend raft store for logs and stable storage
2015-06-01 19:11:40 +00:00
var log raft.LogStore
var stable raft.StableStore
var snap raft.SnapshotStore
var peers raft.PeerStore
if s.config.DevMode {
store := raft.NewInmemStore()
s.raftInmem = store
stable = store
log = store
snap = raft.NewDiscardSnapshotStore()
peers = &raft.StaticPeers{}
} else {
// Create the base raft path
path := filepath.Join(s.config.DataDir, raftState)
if err := ensurePath(path, true); err != nil {
return err
}
2015-06-01 15:49:10 +00:00
2015-06-01 19:11:40 +00:00
// Create the BoltDB backend
store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
if err != nil {
return err
}
s.raftStore = store
stable = store
2015-06-01 15:49:10 +00:00
2015-06-01 19:11:40 +00:00
// Wrap the store in a LogCache to improve performance
cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
if err != nil {
store.Close()
return err
}
log = cacheStore
2015-06-01 15:49:10 +00:00
2015-06-01 19:11:40 +00:00
// Create the snapshot store
snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
if err != nil {
if s.raftStore != nil {
s.raftStore.Close()
}
return err
}
snap = snapshots
2015-06-01 15:49:10 +00:00
2015-06-01 19:11:40 +00:00
// Setup the peer store
s.raftPeers = raft.NewJSONPeers(path, trans)
peers = s.raftPeers
}
2015-06-01 15:49:10 +00:00
// Ensure local host is always included if we are in bootstrap mode
2015-06-01 19:11:40 +00:00
if s.config.RaftConfig.EnableSingleNode {
p, err := peers.Peers()
2015-06-01 15:49:10 +00:00
if err != nil {
2015-06-01 19:11:40 +00:00
if s.raftStore != nil {
s.raftStore.Close()
}
2015-06-01 15:49:10 +00:00
return err
}
2015-06-01 19:11:40 +00:00
if !raft.PeerContained(p, trans.LocalAddr()) {
peers.SetPeers(raft.AddUniquePeer(p, trans.LocalAddr()))
2015-06-01 15:49:10 +00:00
}
}
// Make sure we set the LogOutput
s.config.RaftConfig.LogOutput = s.config.LogOutput
// Setup the Raft store
2015-06-01 19:11:40 +00:00
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable,
snap, peers, trans)
2015-06-01 15:49:10 +00:00
if err != nil {
2015-06-01 19:11:40 +00:00
if s.raftStore != nil {
s.raftStore.Close()
}
2015-06-01 15:49:10 +00:00
trans.Close()
return err
}
// Start monitoring leadership
go s.monitorLeadership()
return nil
}