refactor rpc listener methods, wait for proper shutdown

This commit is contained in:
Chelsea Holland Komlo 2017-11-30 14:52:13 -05:00
parent 6a2432659a
commit 909bb0af07

View file

@ -113,6 +113,7 @@ type Server struct {
// rpcListener is used to listen for incoming connections // rpcListener is used to listen for incoming connections
rpcListener net.Listener rpcListener net.Listener
rpcListenerLock sync.Mutex rpcListenerLock sync.Mutex
listenerCh chan struct{}
rpcServer *rpc.Server rpcServer *rpc.Server
rpcAdvertise net.Addr rpcAdvertise net.Addr
@ -328,10 +329,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg
// Start ingesting events for Serf // Start ingesting events for Serf
go s.serfEventHandler() go s.serfEventHandler()
// Start the RPC listeners s.startRPCListener()
ctx, cancel := context.WithCancel(context.Background())
s.rpcCancel = cancel
go s.listen(ctx)
// Emit metrics for the eval broker // Emit metrics for the eval broker
go evalBroker.EmitStats(time.Second, s.shutdownCh) go evalBroker.EmitStats(time.Second, s.shutdownCh)
@ -355,6 +353,33 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg
return s, nil return s, nil
} }
// Start the RPC listeners
func (s *Server) startRPCListener() {
s.rpcListenerLock.Lock()
defer s.rpcListenerLock.Unlock()
ctx, cancel := context.WithCancel(context.Background())
s.rpcCancel = cancel
go func() {
defer close(s.listenerCh)
s.listen(ctx)
}()
}
func (s *Server) createRPCListener() error {
s.rpcListenerLock.Lock()
defer s.rpcListenerLock.Unlock()
s.listenerCh = make(chan struct{})
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil || list == nil {
s.logger.Printf("[ERR] nomad: No TLS listener to reload")
return err
}
s.rpcListener = list
return nil
}
func getTLSConf(enableRPC bool, tlsConf *tlsutil.Config) (*tls.Config, tlsutil.RegionWrapper, error) { func getTLSConf(enableRPC bool, tlsConf *tlsutil.Config) (*tls.Config, tlsutil.RegionWrapper, error) {
var tlsWrap tlsutil.RegionWrapper var tlsWrap tlsutil.RegionWrapper
var incomingTLS *tls.Config var incomingTLS *tls.Config
@ -410,21 +435,14 @@ func (s *Server) ReloadTLSConnections(newTLSConfig *config.TLSConfig) error {
// reinitialize our rpc listener // reinitialize our rpc listener
s.rpcListenerLock.Lock() s.rpcListenerLock.Lock()
s.rpcListener.Close() s.rpcListener.Close()
time.Sleep(500 * time.Millisecond) <-s.listenerCh
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil || list == nil {
s.logger.Printf("[ERR] nomad: No TLS listener to reload")
return err
}
s.rpcListener = list
// reinitialize the cancel context
ctx, cancel := context.WithCancel(context.Background())
s.rpcCancel = cancel
s.rpcListenerLock.Unlock() s.rpcListenerLock.Unlock()
go s.listen(ctx) err = s.createRPCListener()
if err != nil {
return err
}
s.startRPCListener()
s.raftLayerLock.Lock() s.raftLayerLock.Lock()
s.raftLayer.Close() s.raftLayer.Close()
@ -865,6 +883,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error {
return err return err
} }
s.rpcListener = list s.rpcListener = list
s.listenerCh = make(chan struct{})
if s.config.RPCAdvertise != nil { if s.config.RPCAdvertise != nil {
s.rpcAdvertise = s.config.RPCAdvertise s.rpcAdvertise = s.config.RPCAdvertise