fixups from code review

Revert "close raft long-lived connections"

This reverts commit 3ffda28206fcb3d63ad117fd1d27ae6f832b6625.

reload raft connections on changing tls
This commit is contained in:
Chelsea Holland Komlo 2017-11-27 11:42:52 -05:00
parent dfb6a3d9a8
commit ae7fc4695e
5 changed files with 23 additions and 32 deletions

View File

@ -365,7 +365,7 @@ func (c *Client) init() error {
return nil
}
// ReloadTLSConnectoins allows a client to reload RPC connections if the
// ReloadTLSConnections allows a client to reload RPC connections if the
// client's TLS configuration changes from plaintext to TLS
func (c *Client) ReloadTLSConnections(newConfig *nconfig.TLSConfig) error {
c.configLock.Lock()

View File

@ -12,6 +12,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"syscall"
"time"
@ -46,6 +47,7 @@ type Command struct {
args []string
agent *Agent
httpServer *HTTPServer
httpServerLock sync.Mutex
logFilter *logutils.LevelFilter
logOutput io.Writer
retryJoinErrCh chan struct{}
@ -611,6 +613,8 @@ func (c *Command) reloadHTTPServerOnConfigChange(newConfig *Config) error {
if err != nil {
return err
}
c.httpServerLock.Lock()
defer c.httpServerLock.Unlock()
c.httpServer = http
return nil
@ -644,23 +648,14 @@ func (c *Command) handleReload() {
err := reloadFunc(newConf)
if err != nil {
c.agent.logger.Printf("[ERR] agent: failed to reload the config: %v", err)
}
err = c.httpServer.Shutdown()
if err != nil {
c.agent.logger.Printf("[ERR] agent: failed to stop HTTP server: %v", err)
return
}
// Wait some time to ensure a clean shutdown
time.Sleep(5 * time.Second)
http, err := NewHTTPServer(c.agent, c.agent.config)
err = c.reloadHTTPServerOnConfigChange(newConf)
if err != nil {
c.agent.logger.Printf("[ERR] agent: failed to reload http server: %v", err)
c.agent.logger.Printf("[ERR] agent: failed to reload the config: %v", err)
return
}
c.agent.logger.Println("[INFO] agent: successfully restarted the HTTP server")
c.httpServer = http
}
if s := c.agent.Server(); s != nil {

View File

@ -51,7 +51,7 @@ func (l *RaftLayer) Handoff(c net.Conn, ctx context.Context) error {
case <-l.closeCh:
return fmt.Errorf("Raft RPC layer closed")
case <-ctx.Done():
return fmt.Errorf("[INFO] nomad.rpc: Closing raft RPC connection")
return fmt.Errorf("[INFO] nomad.rpc: Closing server RPC connection")
}
}
@ -120,9 +120,11 @@ func (l *RaftLayer) ReloadTLS(tlsWrap tlsutil.Wrapper) {
defer l.closeLock.Unlock()
if !l.closed {
l.closed = true
close(l.closeCh)
}
l.tlsWrap = tlsWrap
l.closeCh = make(chan struct{})
l.closed = false
}

View File

@ -365,8 +365,9 @@ func (s *Server) ReloadTLSConnections(newTLSConfig *config.TLSConfig) error {
s.logger.Printf("[INFO] nomad: reloading server connections due to configuration changes")
s.configLock.Lock()
defer s.configLock.Unlock()
s.config.TLSConfig = newTLSConfig
s.configLock.Unlock()
var tlsWrap tlsutil.RegionWrapper
var incomingTLS *tls.Config
@ -393,6 +394,9 @@ func (s *Server) ReloadTLSConnections(newTLSConfig *config.TLSConfig) error {
s.rpcTLS = incomingTLS
s.rpcCancel()
s.raftTransport.Close()
s.raftLayer.Close()
s.connPool.ReloadTLS(tlsWrap)
// reinitialize our rpc listener
@ -405,16 +409,19 @@ func (s *Server) ReloadTLSConnections(newTLSConfig *config.TLSConfig) error {
}
s.rpcListener = list
wrapper := tlsutil.RegionSpecificWrapper(s.config.Region, tlsWrap)
s.raftLayer.ReloadTLS(wrapper)
s.raftTransport.Reload()
time.Sleep(500 * time.Millisecond)
// reinitialize the cancel context
ctx, cancel := context.WithCancel(context.Background())
s.rpcCancel = cancel
go s.listen(ctx)
wrapper := tlsutil.RegionSpecificWrapper(s.config.Region, tlsWrap)
s.raftLayer.ReloadTLS(wrapper)
// re-initialize the network transport with a re-initialized stream layer
trans := raft.NewNetworkTransport(s.raftLayer, 3, s.config.RaftTimeout,
s.config.LogOutput)
s.raftTransport = trans
s.logger.Printf("[INFO] nomad: finished reloading server connections")
return nil
}

View File

@ -177,19 +177,6 @@ func (n *NetworkTransport) Close() error {
return nil
}
func (n *NetworkTransport) Reload() {
n.shutdownLock.Lock()
defer n.shutdownLock.Unlock()
if !n.shutdown {
close(n.shutdownCh)
n.shutdown = true
}
time.Sleep(3 * time.Second)
n.shutdownCh = make(chan struct{})
}
// Consumer implements the Transport interface.
func (n *NetworkTransport) Consumer() <-chan RPC {
return n.consumeCh