package nomad import ( "crypto/tls" "errors" "fmt" "log" "net" "net/rpc" "path/filepath" "reflect" "sort" "strconv" "strings" "sync" "sync/atomic" "time" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/raft" "github.com/hashicorp/raft-boltdb" "github.com/hashicorp/serf/serf" ) const ( // datacenterQueryLimit sets the max number of DCs that a Nomad // Server will query to find bootstrap_expect servers. datacenterQueryLimit = 25 // maxStaleLeadership is the maximum time we will permit this Nomad // Server to go without seeing a valid Raft leader. maxStaleLeadership = 15 * time.Second // peersPollInterval is used as the polling interval between attempts // to query Consul for Nomad Servers. peersPollInterval = 45 * time.Second // peersPollJitter is used to provide a slight amount of variance to // the retry interval when querying Consul Servers peersPollJitterFactor = 2 raftState = "raft/" serfSnapshot = "serf/snapshot" snapshotsRetained = 2 // serverRPCCache controls how long we keep an idle connection open to a server serverRPCCache = 2 * time.Minute // serverMaxStreams controsl how many idle streams we keep open to a server serverMaxStreams = 64 // raftLogCacheSize is the maximum number of logs to cache in-memory. // This is used to reduce disk I/O for the recently committed entries. raftLogCacheSize = 512 // raftRemoveGracePeriod is how long we wait to allow a RemovePeer // to replicate to gracefully leave the cluster. raftRemoveGracePeriod = 5 * time.Second ) // Server is Nomad server which manages the job queues, // schedulers, and notification bus for agents. type Server struct { config *Config logger *log.Logger // Connection pool to other Nomad servers connPool *ConnPool // Endpoints holds our RPC endpoints endpoints endpoints // The raft instance is used among Nomad nodes within the // region to protect operations that require strong consistency leaderCh <-chan bool raft *raft.Raft raftLayer *RaftLayer raftPeers raft.PeerStore raftStore *raftboltdb.BoltStore raftInmem *raft.InmemStore raftTransport *raft.NetworkTransport // fsm is the state machine used with Raft fsm *nomadFSM // rpcListener is used to listen for incoming connections rpcListener net.Listener rpcServer *rpc.Server rpcAdvertise net.Addr // rpcTLS is the TLS config for incoming TLS requests rpcTLS *tls.Config // peers is used to track the known Nomad servers. This is // used for region forwarding and clustering. peers map[string][]*serverParts localPeers map[string]*serverParts peerLock sync.RWMutex // serf is the Serf cluster containing only Nomad // servers. This is used for multi-region federation // and automatic clustering within regions. serf *serf.Serf // reconcileCh is used to pass events from the serf handler // into the leader manager. Mostly used to handle when servers // join/leave from the region. reconcileCh chan serf.Member // eventCh is used to receive events from the serf cluster eventCh chan serf.Event // evalBroker is used to manage the in-progress evaluations // that are waiting to be brokered to a sub-scheduler evalBroker *EvalBroker // BlockedEvals is used to manage evaluations that are blocked on node // capacity changes. blockedEvals *BlockedEvals // planQueue is used to manage the submitted allocation // plans that are waiting to be assessed by the leader planQueue *PlanQueue // periodicDispatcher is used to track and create evaluations for periodic jobs. periodicDispatcher *PeriodicDispatch // heartbeatTimers track the expiration time of each heartbeat that has // a TTL. On expiration, the node status is updated to be 'down'. heartbeatTimers map[string]*time.Timer heartbeatTimersLock sync.Mutex // consulSyncer advertises this Nomad Agent with Consul consulSyncer *consul.Syncer // vault is the client for communicating with Vault. vault VaultClient // Worker used for processing workers []*Worker left bool shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex } // Holds the RPC endpoints type endpoints struct { Status *Status Node *Node Job *Job Eval *Eval Plan *Plan Alloc *Alloc Region *Region Periodic *Periodic System *System } // NewServer is used to construct a new Nomad server from the // configuration, potentially returning an error func NewServer(config *Config, consulSyncer *consul.Syncer, logger *log.Logger) (*Server, error) { // Check the protocol version if err := config.CheckVersion(); err != nil { return nil, err } // Create an eval broker evalBroker, err := NewEvalBroker(config.EvalNackTimeout, config.EvalDeliveryLimit) if err != nil { return nil, err } // Create a new blocked eval tracker. blockedEvals := NewBlockedEvals(evalBroker) // Create a plan queue planQueue, err := NewPlanQueue() if err != nil { return nil, err } // Create the server s := &Server{ config: config, consulSyncer: consulSyncer, connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, nil), logger: logger, rpcServer: rpc.NewServer(), peers: make(map[string][]*serverParts), localPeers: make(map[string]*serverParts), reconcileCh: make(chan serf.Member, 32), eventCh: make(chan serf.Event, 256), evalBroker: evalBroker, blockedEvals: blockedEvals, planQueue: planQueue, shutdownCh: make(chan struct{}), } // Create the periodic dispatcher for launching periodic jobs. s.periodicDispatcher = NewPeriodicDispatch(s.logger, s) // Setup Vault if err := s.setupVaultClient(); err != nil { s.Shutdown() s.logger.Printf("[ERR] nomad: failed to setup Vault client: %v", err) return nil, fmt.Errorf("Failed to setup Vault client: %v", err) } // Initialize the RPC layer // TODO: TLS... if err := s.setupRPC(nil); err != nil { s.Shutdown() s.logger.Printf("[ERR] nomad: failed to start RPC layer: %s", err) return nil, fmt.Errorf("Failed to start RPC layer: %v", err) } // Initialize the Raft server if err := s.setupRaft(); err != nil { s.Shutdown() s.logger.Printf("[ERR] nomad: failed to start Raft: %s", err) return nil, fmt.Errorf("Failed to start Raft: %v", err) } // Initialize the wan Serf s.serf, err = s.setupSerf(config.SerfConfig, s.eventCh, serfSnapshot) if err != nil { s.Shutdown() s.logger.Printf("[ERR] nomad: failed to start serf WAN: %s", err) return nil, fmt.Errorf("Failed to start serf: %v", err) } // Initialize the scheduling workers if err := s.setupWorkers(); err != nil { s.Shutdown() s.logger.Printf("[ERR] nomad: failed to start workers: %s", err) return nil, fmt.Errorf("Failed to start workers: %v", err) } // Setup the Consul syncer if err := s.setupConsulSyncer(); err != nil { return nil, fmt.Errorf("failed to create server Consul syncer: %v") } // Monitor leadership changes go s.monitorLeadership() // Start ingesting events for Serf go s.serfEventHandler() // Start the RPC listeners go s.listen() // Emit metrics for the eval broker go evalBroker.EmitStats(time.Second, s.shutdownCh) // Emit metrics for the plan queue go planQueue.EmitStats(time.Second, s.shutdownCh) // Emit metrics for the blocked eval tracker. go blockedEvals.EmitStats(time.Second, s.shutdownCh) // Emit metrics go s.heartbeatStats() // Done return s, nil } // Shutdown is used to shutdown the server func (s *Server) Shutdown() error { s.logger.Printf("[INFO] nomad: shutting down server") s.shutdownLock.Lock() defer s.shutdownLock.Unlock() if s.shutdown { return nil } s.shutdown = true close(s.shutdownCh) if s.serf != nil { s.serf.Shutdown() } if s.raft != nil { s.raftTransport.Close() s.raftLayer.Close() future := s.raft.Shutdown() if err := future.Error(); err != nil { s.logger.Printf("[WARN] nomad: Error shutting down raft: %s", err) } if s.raftStore != nil { s.raftStore.Close() } } // Shutdown the RPC listener if s.rpcListener != nil { s.rpcListener.Close() } // Close the connection pool s.connPool.Shutdown() // Close the fsm if s.fsm != nil { s.fsm.Close() } // Stop Vault token renewal if s.vault != nil { s.vault.Stop() } return nil } // IsShutdown checks if the server is shutdown func (s *Server) IsShutdown() bool { select { case <-s.shutdownCh: return true default: return false } } // Leave is used to prepare for a graceful shutdown of the server func (s *Server) Leave() error { s.logger.Printf("[INFO] nomad: server starting leave") s.left = true // Check the number of known peers numPeers, err := s.numOtherPeers() if err != nil { s.logger.Printf("[ERR] nomad: failed to check raft peers: %v", err) return err } // If we are the current leader, and we have any other peers (cluster has multiple // servers), we should do a RemovePeer to safely reduce the quorum size. If we are // not the leader, then we should issue our leave intention and wait to be removed // for some sane period of time. isLeader := s.IsLeader() if isLeader && numPeers > 0 { future := s.raft.RemovePeer(s.raftTransport.LocalAddr()) if err := future.Error(); err != nil && err != raft.ErrUnknownPeer { s.logger.Printf("[ERR] nomad: failed to remove ourself as raft peer: %v", err) } } // Leave the gossip pool if s.serf != nil { if err := s.serf.Leave(); err != nil { s.logger.Printf("[ERR] nomad: failed to leave Serf cluster: %v", err) } } // If we were not leader, wait to be safely removed from the cluster. // We must wait to allow the raft replication to take place, otherwise // an immediate shutdown could cause a loss of quorum. if !isLeader { limit := time.Now().Add(raftRemoveGracePeriod) for numPeers > 0 && time.Now().Before(limit) { // Update the number of peers numPeers, err = s.numOtherPeers() if err != nil { s.logger.Printf("[ERR] nomad: failed to check raft peers: %v", err) break } // Avoid the sleep if we are done if numPeers == 0 { break } // Sleep a while and check again time.Sleep(50 * time.Millisecond) } if numPeers != 0 { s.logger.Printf("[WARN] nomad: failed to leave raft peer set gracefully, timeout") } } return nil } // setupBootstrapHandler() creates the closure necessary to support a Consul // fallback handler. func (s *Server) setupBootstrapHandler() error { // peersTimeout is used to indicate to the Consul Syncer that the // current Nomad Server has a stale peer set. peersTimeout will time // out if the Consul Syncer bootstrapFn has not observed a Raft // leader in maxStaleLeadership. If peersTimeout has been triggered, // the Consul Syncer will begin querying Consul for other Nomad // Servers. // // NOTE: time.Timer is used vs time.Time in order to handle clock // drift because time.Timer is implemented as a monotonic clock. var peersTimeout *time.Timer = time.NewTimer(0) // consulQueryCount is the number of times the bootstrapFn has been // called, regardless of success. var consulQueryCount uint64 // leadershipTimedOut is a helper method that returns true if the // peersTimeout timer has expired. leadershipTimedOut := func() bool { select { case <-peersTimeout.C: return true default: return false } } // The bootstrapFn callback handler is used to periodically poll // Consul to look up the Nomad Servers in Consul. In the event the // server has been brought up without a `retry-join` configuration // and this Server is partitioned from the rest of the cluster, // periodically poll Consul to reattach this Server to other servers // in the same region and automatically reform a quorum (assuming the // correct number of servers required for quorum are present). bootstrapFn := func() error { // If there is a raft leader, do nothing if s.raft.Leader() != "" { peersTimeout.Reset(maxStaleLeadership) return nil } // (ab)use serf.go's behavior of setting BootstrapExpect to // zero if we have bootstrapped. If we have bootstrapped bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect) if bootstrapExpect == 0 { // This Nomad Server has been bootstrapped. Rely on // the peersTimeout firing as a guard to prevent // aggressive querying of Consul. if !leadershipTimedOut() { return nil } } else { if consulQueryCount > 0 && !leadershipTimedOut() { return nil } // This Nomad Server has not been bootstrapped, reach // out to Consul if our peer list is less than // `bootstrap_expect`. raftPeers, err := s.raftPeers.Peers() if err != nil { peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) return nil } // The necessary number of Nomad Servers required for // quorum has been reached, we do not need to poll // Consul. Let the normal timeout-based strategy // take over. if len(raftPeers) >= int(bootstrapExpect) { peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) return nil } } consulQueryCount++ s.logger.Printf("[DEBUG] server.consul: lost contact with Nomad quorum, falling back to Consul for server list") consulCatalog := s.consulSyncer.ConsulClient().Catalog() dcs, err := consulCatalog.Datacenters() if err != nil { peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) return fmt.Errorf("server.consul: unable to query Consul datacenters: %v", err) } if len(dcs) > 2 { // Query the local DC first, then shuffle the // remaining DCs. If additional calls to bootstrapFn // are necessary, this Nomad Server will eventually // walk all datacenter until it finds enough hosts to // form a quorum. shuffleStrings(dcs[1:]) dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)] } nomadServerServiceName := s.config.ConsulConfig.ServerServiceName var mErr multierror.Error const defaultMaxNumNomadServers = 8 nomadServerServices := make([]string, 0, defaultMaxNumNomadServers) localNode := s.serf.Memberlist().LocalNode() for _, dc := range dcs { consulOpts := &consulapi.QueryOptions{ AllowStale: true, Datacenter: dc, Near: "_agent", WaitTime: consul.DefaultQueryWaitDuration, } consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, consulOpts) if err != nil { err := fmt.Errorf("failed to query service %q in Consul datacenter %q: %v", nomadServerServiceName, dc, err) s.logger.Printf("[WARN] server.consul: %v", err) mErr.Errors = append(mErr.Errors, err) continue } for _, cs := range consulServices { port := strconv.FormatInt(int64(cs.ServicePort), 10) addr := cs.ServiceAddress if addr == "" { addr = cs.Address } if localNode.Addr.String() == addr && int(localNode.Port) == cs.ServicePort { continue } serverAddr := net.JoinHostPort(addr, port) nomadServerServices = append(nomadServerServices, serverAddr) } } if len(nomadServerServices) == 0 { if len(mErr.Errors) > 0 { peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) return mErr.ErrorOrNil() } // Log the error and return nil so future handlers // can attempt to register the `nomad` service. pollInterval := peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor) s.logger.Printf("[TRACE] server.consul: no Nomad Servers advertising service %+q in Consul datacenters %+q, sleeping for %v", nomadServerServiceName, dcs, pollInterval) peersTimeout.Reset(pollInterval) return nil } numServersContacted, err := s.Join(nomadServerServices) if err != nil { peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err) } peersTimeout.Reset(maxStaleLeadership) s.logger.Printf("[INFO] server.consul: successfully contacted %d Nomad Servers", numServersContacted) return nil } s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn) return nil } // setupConsulSyncer creates Server-mode consul.Syncer which periodically // executes callbacks on a fixed interval. func (s *Server) setupConsulSyncer() error { if s.config.ConsulConfig.ServerAutoJoin { if err := s.setupBootstrapHandler(); err != nil { return err } } return nil } // setupVaultClient is used to set up the Vault API client. func (s *Server) setupVaultClient() error { v, err := NewVaultClient(s.config.VaultConfig, s.logger, s.purgeVaultAccessors) if err != nil { return err } s.vault = v return nil } // setupRPC is used to setup the RPC listener func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { // Create endpoints s.endpoints.Status = &Status{s} s.endpoints.Node = &Node{srv: s} s.endpoints.Job = &Job{s} s.endpoints.Eval = &Eval{s} s.endpoints.Plan = &Plan{s} s.endpoints.Alloc = &Alloc{s} s.endpoints.Region = &Region{s} s.endpoints.Periodic = &Periodic{s} s.endpoints.System = &System{s} // Register the handlers s.rpcServer.Register(s.endpoints.Status) s.rpcServer.Register(s.endpoints.Node) s.rpcServer.Register(s.endpoints.Job) s.rpcServer.Register(s.endpoints.Eval) s.rpcServer.Register(s.endpoints.Plan) s.rpcServer.Register(s.endpoints.Alloc) s.rpcServer.Register(s.endpoints.Region) s.rpcServer.Register(s.endpoints.Periodic) s.rpcServer.Register(s.endpoints.System) list, err := net.ListenTCP("tcp", s.config.RPCAddr) if err != nil { return err } s.rpcListener = list if s.config.RPCAdvertise != nil { s.rpcAdvertise = s.config.RPCAdvertise } else { s.rpcAdvertise = s.rpcListener.Addr() } // Verify that we have a usable advertise address addr, ok := s.rpcAdvertise.(*net.TCPAddr) if !ok { list.Close() return fmt.Errorf("RPC advertise address is not a TCP Address: %v", addr) } if addr.IP.IsUnspecified() { list.Close() return fmt.Errorf("RPC advertise address is not advertisable: %v", addr) } // Provide a DC specific wrapper. Raft replication is only // ever done in the same datacenter, so we can provide it as a constant. // wrapper := tlsutil.SpecificDC(s.config.Datacenter, tlsWrap) // TODO: TLS... s.raftLayer = NewRaftLayer(s.rpcAdvertise, nil) return nil } // setupRaft is used to setup and initialize Raft func (s *Server) setupRaft() error { // If we are in bootstrap mode, enable a single node cluster if s.config.Bootstrap || (s.config.DevMode && !s.config.DevDisableBootstrap) { s.config.RaftConfig.EnableSingleNode = true } // Create the FSM var err error s.fsm, err = NewFSM(s.evalBroker, s.periodicDispatcher, s.blockedEvals, s.config.LogOutput) if err != nil { return err } // Create a transport layer trans := raft.NewNetworkTransport(s.raftLayer, 3, s.config.RaftTimeout, s.config.LogOutput) s.raftTransport = trans // Create the backend raft store for logs and stable storage var log raft.LogStore var stable raft.StableStore var snap raft.SnapshotStore var peers raft.PeerStore if s.config.DevMode { store := raft.NewInmemStore() s.raftInmem = store stable = store log = store snap = raft.NewDiscardSnapshotStore() peers = &raft.StaticPeers{} s.raftPeers = peers } else { // Create the base raft path path := filepath.Join(s.config.DataDir, raftState) if err := ensurePath(path, true); err != nil { return err } // Create the BoltDB backend store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db")) if err != nil { return err } s.raftStore = store stable = store // Wrap the store in a LogCache to improve performance cacheStore, err := raft.NewLogCache(raftLogCacheSize, store) if err != nil { store.Close() return err } log = cacheStore // Create the snapshot store snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput) if err != nil { if s.raftStore != nil { s.raftStore.Close() } return err } snap = snapshots // Setup the peer store s.raftPeers = raft.NewJSONPeers(path, trans) peers = s.raftPeers } // Ensure local host is always included if we are in bootstrap mode if s.config.RaftConfig.EnableSingleNode { p, err := peers.Peers() if err != nil { if s.raftStore != nil { s.raftStore.Close() } return err } if !raft.PeerContained(p, trans.LocalAddr()) { peers.SetPeers(raft.AddUniquePeer(p, trans.LocalAddr())) } } // Make sure we set the LogOutput s.config.RaftConfig.LogOutput = s.config.LogOutput // Setup the leader channel leaderCh := make(chan bool, 1) s.config.RaftConfig.NotifyCh = leaderCh s.leaderCh = leaderCh // Setup the Raft store s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, peers, trans) if err != nil { if s.raftStore != nil { s.raftStore.Close() } trans.Close() return err } return nil } // setupSerf is used to setup and initialize a Serf func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) { conf.Init() conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Region) conf.Tags["role"] = "nomad" conf.Tags["region"] = s.config.Region conf.Tags["dc"] = s.config.Datacenter conf.Tags["vsn"] = fmt.Sprintf("%d", structs.ApiMajorVersion) conf.Tags["mvn"] = fmt.Sprintf("%d", structs.ApiMinorVersion) conf.Tags["build"] = s.config.Build conf.Tags["port"] = fmt.Sprintf("%d", s.rpcAdvertise.(*net.TCPAddr).Port) if s.config.Bootstrap || (s.config.DevMode && !s.config.DevDisableBootstrap) { conf.Tags["bootstrap"] = "1" } bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect) if bootstrapExpect != 0 { conf.Tags["expect"] = fmt.Sprintf("%d", bootstrapExpect) } conf.MemberlistConfig.LogOutput = s.config.LogOutput conf.LogOutput = s.config.LogOutput conf.EventCh = ch if !s.config.DevMode { conf.SnapshotPath = filepath.Join(s.config.DataDir, path) if err := ensurePath(conf.SnapshotPath, false); err != nil { return nil, err } } conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion] conf.RejoinAfterLeave = true conf.Merge = &serfMergeDelegate{} // Until Nomad supports this fully, we disable automatic resolution. // When enabled, the Serf gossip may just turn off if we are the minority // node which is rather unexpected. conf.EnableNameConflictResolution = false return serf.Create(conf) } // setupWorkers is used to start the scheduling workers func (s *Server) setupWorkers() error { // Check if all the schedulers are disabled if len(s.config.EnabledSchedulers) == 0 || s.config.NumSchedulers == 0 { s.logger.Printf("[WARN] nomad: no enabled schedulers") return nil } // Start the workers for i := 0; i < s.config.NumSchedulers; i++ { if w, err := NewWorker(s); err != nil { return err } else { s.workers = append(s.workers, w) } } s.logger.Printf("[INFO] nomad: starting %d scheduling worker(s) for %v", s.config.NumSchedulers, s.config.EnabledSchedulers) return nil } // numOtherPeers is used to check on the number of known peers // excluding the local node func (s *Server) numOtherPeers() (int, error) { peers, err := s.raftPeers.Peers() if err != nil { return 0, err } otherPeers := raft.ExcludePeer(peers, s.raftTransport.LocalAddr()) return len(otherPeers), nil } // IsLeader checks if this server is the cluster leader func (s *Server) IsLeader() bool { return s.raft.State() == raft.Leader } // Join is used to have Nomad join the gossip ring // The target address should be another node listening on the // Serf address func (s *Server) Join(addrs []string) (int, error) { return s.serf.Join(addrs, true) } // LocalMember is used to return the local node func (c *Server) LocalMember() serf.Member { return c.serf.LocalMember() } // Members is used to return the members of the serf cluster func (s *Server) Members() []serf.Member { return s.serf.Members() } // RemoveFailedNode is used to remove a failed node from the cluster func (s *Server) RemoveFailedNode(node string) error { return s.serf.RemoveFailedNode(node) } // KeyManager returns the Serf keyring manager func (s *Server) KeyManager() *serf.KeyManager { return s.serf.KeyManager() } // Encrypted determines if gossip is encrypted func (s *Server) Encrypted() bool { return s.serf.EncryptionEnabled() } // State returns the underlying state store. This should *not* // be used to modify state directly. func (s *Server) State() *state.StateStore { return s.fsm.State() } // Regions returns the known regions in the cluster. func (s *Server) Regions() []string { s.peerLock.RLock() defer s.peerLock.RUnlock() regions := make([]string, 0, len(s.peers)) for region, _ := range s.peers { regions = append(regions, region) } sort.Strings(regions) return regions } // inmemCodec is used to do an RPC call without going over a network type inmemCodec struct { method string args interface{} reply interface{} err error } func (i *inmemCodec) ReadRequestHeader(req *rpc.Request) error { req.ServiceMethod = i.method return nil } func (i *inmemCodec) ReadRequestBody(args interface{}) error { sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.args))) dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(args))) dst.Set(sourceValue) return nil } func (i *inmemCodec) WriteResponse(resp *rpc.Response, reply interface{}) error { if resp.Error != "" { i.err = errors.New(resp.Error) return nil } sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(reply))) dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.reply))) dst.Set(sourceValue) return nil } func (i *inmemCodec) Close() error { return nil } // RPC is used to make a local RPC call func (s *Server) RPC(method string, args interface{}, reply interface{}) error { codec := &inmemCodec{ method: method, args: args, reply: reply, } if err := s.rpcServer.ServeRequest(codec); err != nil { return err } return codec.err } // Stats is used to return statistics for debugging and insight // for various sub-systems func (s *Server) Stats() map[string]map[string]string { toString := func(v uint64) string { return strconv.FormatUint(v, 10) } stats := map[string]map[string]string{ "nomad": map[string]string{ "server": "true", "leader": fmt.Sprintf("%v", s.IsLeader()), "leader_addr": s.raft.Leader(), "bootstrap": fmt.Sprintf("%v", s.config.Bootstrap), "known_regions": toString(uint64(len(s.peers))), }, "raft": s.raft.Stats(), "serf": s.serf.Stats(), "runtime": RuntimeStats(), } if peers, err := s.raftPeers.Peers(); err == nil { stats["raft"]["raft_peers"] = strings.Join(peers, ",") } else { s.logger.Printf("[DEBUG] server: error getting raft peers: %v", err) } return stats } // Region retuns the region of the server func (s *Server) Region() string { return s.config.Region } // Datacenter returns the data center of the server func (s *Server) Datacenter() string { return s.config.Datacenter } // GetConfig returns the config of the server for testing purposes only func (s *Server) GetConfig() *Config { return s.config }