From 2c47bc5d5b6e1c4a3e0c92d665471f6d4052031f Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Thu, 15 Jun 2017 15:16:16 +0200 Subject: [PATCH] agent: move conn pool for muxed connections into separate pkg --- agent/consul/client.go | 14 +- agent/consul/client_test.go | 2 +- agent/consul/raft_rpc.go | 5 +- agent/consul/rpc.go | 30 ++--- agent/consul/server.go | 14 +- agent/consul/server_test.go | 2 +- agent/consul/servers/manager.go | 9 +- agent/consul/servers/manager_internal_test.go | 5 +- agent/consul/servers/manager_test.go | 3 +- agent/consul/snapshot_endpoint.go | 7 +- agent/consul/stats_fetcher.go | 5 +- agent/consul/status_endpoint_test.go | 3 +- agent/pool/conn.go | 15 +++ agent/{consul => pool}/pool.go | 120 ++++++++---------- 14 files changed, 125 insertions(+), 109 deletions(-) create mode 100644 agent/pool/conn.go rename agent/{consul => pool}/pool.go (81%) diff --git a/agent/consul/client.go b/agent/consul/client.go index 3ea1e19f0..6ee73e12b 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/consul/agent/consul/agent" "github.com/hashicorp/consul/agent/consul/servers" "github.com/hashicorp/consul/agent/consul/structs" + "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/lib" "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" @@ -49,7 +50,7 @@ type Client struct { config *Config // Connection pool to consul servers - connPool *ConnPool + connPool *pool.ConnPool // servers is responsible for the selection and maintenance of // Consul servers this agent uses for RPC requests @@ -109,10 +110,19 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) { logger = log.New(config.LogOutput, "", log.LstdFlags) } + connPool := &pool.ConnPool{ + SrcAddr: config.RPCSrcAddr, + LogOutput: config.LogOutput, + MaxTime: clientRPCConnMaxIdle, + MaxStreams: clientMaxStreams, + TLSWrapper: tlsWrap, + ForceTLS: config.VerifyOutgoing, + } + // Create server c := &Client{ config: config, - connPool: NewPool(config.RPCSrcAddr, config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap, config.VerifyOutgoing), + connPool: connPool, eventCh: make(chan serf.Event, serfEventBacklog), logger: logger, shutdownCh: make(chan struct{}), diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index 3560d839b..dd453eb2b 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -265,7 +265,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) { for range servers { time.Sleep(100 * time.Millisecond) s := c.servers.FindServer() - ok, err := c.connPool.PingConsulServer(s) + ok, err := c.connPool.Ping(s.Datacenter, s.Addr, s.Version, s.UseTLS) if !ok { t.Errorf("Unable to ping server %v: %s", s.String(), err) } diff --git a/agent/consul/raft_rpc.go b/agent/consul/raft_rpc.go index 461a21411..9fb236a64 100644 --- a/agent/consul/raft_rpc.go +++ b/agent/consul/raft_rpc.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/raft" ) @@ -100,7 +101,7 @@ func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net // Check for tls mode if l.tlsFunc(address) && l.tlsWrap != nil { // Switch the connection into TLS mode - if _, err := conn.Write([]byte{byte(rpcTLS)}); err != nil { + if _, err := conn.Write([]byte{byte(pool.RPCTLS)}); err != nil { conn.Close() return nil, err } @@ -113,7 +114,7 @@ func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net } // Write the Raft byte to set the mode - _, err = conn.Write([]byte{byte(rpcRaft)}) + _, err = conn.Write([]byte{byte(pool.RPCRaft)}) if err != nil { conn.Close() return nil, err diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 3f8c97656..55b330c2c 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/consul/agent/consul/agent" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/structs" + "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/lib" "github.com/hashicorp/go-memdb" "github.com/hashicorp/memberlist" @@ -19,18 +20,6 @@ import ( "github.com/hashicorp/yamux" ) -type RPCType byte - -const ( - rpcConsul RPCType = iota - rpcRaft - rpcMultiplex // Old Muxado byte, no longer supported. - rpcTLS - rpcMultiplexV2 - rpcSnapshot - rpcGossip -) - const ( // maxQueryTime is used to bound the limit of a blocking query maxQueryTime = 600 * time.Second @@ -92,24 +81,25 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) { conn.Close() return } + typ := pool.RPCType(buf[0]) // Enforce TLS if VerifyIncoming is set - if s.config.VerifyIncoming && !isTLS && RPCType(buf[0]) != rpcTLS { + if s.config.VerifyIncoming && !isTLS && typ != pool.RPCTLS { s.logger.Printf("[WARN] consul.rpc: Non-TLS connection attempted with VerifyIncoming set %s", logConn(conn)) conn.Close() return } // Switch on the byte - switch RPCType(buf[0]) { - case rpcConsul: + switch typ { + case pool.RPCConsul: s.handleConsulConn(conn) - case rpcRaft: + case pool.RPCRaft: metrics.IncrCounter([]string{"consul", "rpc", "raft_handoff"}, 1) s.raftLayer.Handoff(conn) - case rpcTLS: + case pool.RPCTLS: if s.rpcTLS == nil { s.logger.Printf("[WARN] consul.rpc: TLS connection attempted, server not configured for TLS %s", logConn(conn)) conn.Close() @@ -118,14 +108,14 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) { conn = tls.Server(conn, s.rpcTLS) s.handleConn(conn, true) - case rpcMultiplexV2: + case pool.RPCMultiplexV2: s.handleMultiplexV2(conn) - case rpcSnapshot: + case pool.RPCSnapshot: s.handleSnapshotConn(conn) default: - s.logger.Printf("[ERR] consul.rpc: unrecognized RPC byte: %v %s", buf[0], logConn(conn)) + s.logger.Printf("[ERR] consul.rpc: unrecognized RPC byte: %v %s", typ, logConn(conn)) conn.Close() return } diff --git a/agent/consul/server.go b/agent/consul/server.go index 3b319000e..6047689b2 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -22,6 +22,7 @@ import ( "github.com/hashicorp/consul/agent/consul/servers" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/structs" + "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" @@ -99,7 +100,7 @@ type Server struct { config *Config // Connection pool to other consul servers - connPool *ConnPool + connPool *pool.ConnPool // Endpoints holds our RPC endpoints endpoints endpoints @@ -271,12 +272,21 @@ func NewServerLogger(config *Config, logger *log.Logger) (*Server, error) { // Create the shutdown channel - this is closed but never written to. shutdownCh := make(chan struct{}) + connPool := &pool.ConnPool{ + SrcAddr: config.RPCSrcAddr, + LogOutput: config.LogOutput, + MaxTime: serverRPCCache, + MaxStreams: serverMaxStreams, + TLSWrapper: tlsWrap, + ForceTLS: config.VerifyOutgoing, + } + // Create server. s := &Server{ autopilotRemoveDeadCh: make(chan struct{}), autopilotShutdownCh: make(chan struct{}), config: config, - connPool: NewPool(config.RPCSrcAddr, config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap, config.VerifyOutgoing), + connPool: connPool, eventChLAN: make(chan serf.Event, 256), eventChWAN: make(chan serf.Event, 256), localConsuls: make(map[raft.ServerAddress]*agent.Server), diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index cdb6e3bb4..c2a733071 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -593,7 +593,7 @@ func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) { if leader == nil { t.Fatal("no leader") } - return s2.connPool.PingConsulServer(leader) + return s2.connPool.Ping(leader.Datacenter, leader.Addr, leader.Version, leader.UseTLS) } func TestServer_TLSToNoTLS(t *testing.T) { diff --git a/agent/consul/servers/manager.go b/agent/consul/servers/manager.go index 263b796ec..ef149d008 100644 --- a/agent/consul/servers/manager.go +++ b/agent/consul/servers/manager.go @@ -8,6 +8,7 @@ package servers import ( "log" "math/rand" + "net" "sync" "sync/atomic" "time" @@ -59,7 +60,7 @@ type ManagerSerfCluster interface { // Pinger is an interface wrapping client.ConnPool to prevent a cyclic import // dependency. type Pinger interface { - PingConsulServer(s *agent.Server) (bool, error) + Ping(dc string, addr net.Addr, version int, useTLS bool) (bool, error) } // serverList is a local copy of the struct used to maintain the list of @@ -306,14 +307,14 @@ func (m *Manager) RebalanceServers() { for i := 0; i < len(l.servers); i++ { // Always test the first server. Failed servers are cycled // while Serf detects the node has failed. - selectedServer := l.servers[0] + srv := l.servers[0] - ok, err := m.connPoolPinger.PingConsulServer(selectedServer) + ok, err := m.connPoolPinger.Ping(srv.Datacenter, srv.Addr, srv.Version, srv.UseTLS) if ok { foundHealthyServer = true break } - m.logger.Printf(`[DEBUG] manager: pinging server "%s" failed: %s`, selectedServer.String(), err) + m.logger.Printf(`[DEBUG] manager: pinging server "%s" failed: %s`, srv, err) l.cycleServer() } diff --git a/agent/consul/servers/manager_internal_test.go b/agent/consul/servers/manager_internal_test.go index 5a5d76a58..c6c993697 100644 --- a/agent/consul/servers/manager_internal_test.go +++ b/agent/consul/servers/manager_internal_test.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "math/rand" + "net" "os" "testing" "time" @@ -31,7 +32,7 @@ type fauxConnPool struct { failPct float64 } -func (cp *fauxConnPool) PingConsulServer(server *agent.Server) (bool, error) { +func (cp *fauxConnPool) Ping(string, net.Addr, int, bool) (bool, error) { var success bool successProb := rand.Float64() if successProb > cp.failPct { @@ -179,7 +180,7 @@ func test_reconcileServerList(maxServers int) (bool, error) { // failPct of the servers for the reconcile. This // allows for the selected server to no longer be // healthy for the reconcile below. - if ok, _ := m.connPoolPinger.PingConsulServer(node); ok { + if ok, _ := m.connPoolPinger.Ping(node.Datacenter, node.Addr, node.Version, node.UseTLS); ok { // Will still be present healthyServers = append(healthyServers, node) } else { diff --git a/agent/consul/servers/manager_test.go b/agent/consul/servers/manager_test.go index 2ed015438..8c8041bd5 100644 --- a/agent/consul/servers/manager_test.go +++ b/agent/consul/servers/manager_test.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "math/rand" + "net" "os" "strings" "testing" @@ -17,7 +18,7 @@ type fauxConnPool struct { failPct float64 } -func (cp *fauxConnPool) PingConsulServer(server *agent.Server) (bool, error) { +func (cp *fauxConnPool) Ping(string, net.Addr, int, bool) (bool, error) { var success bool successProb := rand.Float64() if successProb > cp.failPct { diff --git a/agent/consul/snapshot_endpoint.go b/agent/consul/snapshot_endpoint.go index 800517263..227a872fd 100644 --- a/agent/consul/snapshot_endpoint.go +++ b/agent/consul/snapshot_endpoint.go @@ -17,6 +17,7 @@ import ( "time" "github.com/hashicorp/consul/agent/consul/structs" + "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/snapshot" "github.com/hashicorp/go-msgpack/codec" ) @@ -187,10 +188,10 @@ RESPOND: // the streaming output (for a snapshot). If the reply contains an error, this // will always return an error as well, so you don't need to check the error // inside the filled-in reply. -func SnapshotRPC(pool *ConnPool, dc string, addr net.Addr, useTLS bool, +func SnapshotRPC(connPool *pool.ConnPool, dc string, addr net.Addr, useTLS bool, args *structs.SnapshotRequest, in io.Reader, reply *structs.SnapshotResponse) (io.ReadCloser, error) { - conn, hc, err := pool.DialTimeout(dc, addr, 10*time.Second, useTLS) + conn, hc, err := connPool.DialTimeout(dc, addr, 10*time.Second, useTLS) if err != nil { return nil, err } @@ -206,7 +207,7 @@ func SnapshotRPC(pool *ConnPool, dc string, addr net.Addr, useTLS bool, // Write the snapshot RPC byte to set the mode, then perform the // request. - if _, err := conn.Write([]byte{byte(rpcSnapshot)}); err != nil { + if _, err := conn.Write([]byte{byte(pool.RPCSnapshot)}); err != nil { return nil, fmt.Errorf("failed to write stream type: %v", err) } diff --git a/agent/consul/stats_fetcher.go b/agent/consul/stats_fetcher.go index acbf69eb4..291933f70 100644 --- a/agent/consul/stats_fetcher.go +++ b/agent/consul/stats_fetcher.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/consul/agent/consul/agent" "github.com/hashicorp/consul/agent/consul/structs" + "github.com/hashicorp/consul/agent/pool" ) // StatsFetcher has two functions for autopilot. First, lets us fetch all the @@ -18,14 +19,14 @@ import ( // as we run the health check fairly frequently. type StatsFetcher struct { logger *log.Logger - pool *ConnPool + pool *pool.ConnPool datacenter string inflight map[string]struct{} inflightLock sync.Mutex } // NewStatsFetcher returns a stats fetcher. -func NewStatsFetcher(logger *log.Logger, pool *ConnPool, datacenter string) *StatsFetcher { +func NewStatsFetcher(logger *log.Logger, pool *pool.ConnPool, datacenter string) *StatsFetcher { return &StatsFetcher{ logger: logger, pool: pool, diff --git a/agent/consul/status_endpoint_test.go b/agent/consul/status_endpoint_test.go index 9f74ac6b4..776d91b77 100644 --- a/agent/consul/status_endpoint_test.go +++ b/agent/consul/status_endpoint_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/net-rpc-msgpackrpc" ) @@ -19,7 +20,7 @@ func rpcClient(t *testing.T, s *Server) rpc.ClientCodec { } // Write the Consul RPC byte to set the mode - conn.Write([]byte{byte(rpcConsul)}) + conn.Write([]byte{byte(pool.RPCConsul)}) return msgpackrpc.NewClientCodec(conn) } diff --git a/agent/pool/conn.go b/agent/pool/conn.go new file mode 100644 index 000000000..1cccb2e02 --- /dev/null +++ b/agent/pool/conn.go @@ -0,0 +1,15 @@ +package pool + +type RPCType byte + +const ( + // keep numbers unique. + // iota depends on order + RPCConsul RPCType = 0 + RPCRaft = 1 + RPCMultiplex = 2 // Old Muxado byte, no longer supported. + RPCTLS = 3 + RPCMultiplexV2 = 4 + RPCSnapshot = 5 + RPCGossip = 6 +) diff --git a/agent/consul/pool.go b/agent/pool/pool.go similarity index 81% rename from agent/consul/pool.go rename to agent/pool/pool.go index 67f1b39b4..4be5b72ca 100644 --- a/agent/consul/pool.go +++ b/agent/pool/pool.go @@ -1,4 +1,4 @@ -package consul +package pool import ( "container/list" @@ -10,7 +10,6 @@ import ( "sync/atomic" "time" - "github.com/hashicorp/consul/agent/consul/agent" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/yamux" @@ -90,7 +89,7 @@ func (c *Conn) getClient() (*StreamClient, error) { func (c *Conn) returnClient(client *StreamClient) { didSave := false c.clientLock.Lock() - if c.clients.Len() < c.pool.maxStreams && atomic.LoadInt32(&c.shouldClose) == 0 { + if c.clients.Len() < c.pool.MaxStreams && atomic.LoadInt32(&c.shouldClose) == 0 { c.clients.PushFront(client) didSave = true @@ -112,25 +111,34 @@ func (c *Conn) markForUse() { atomic.AddInt32(&c.refCount, 1) } -// ConnPool is used to maintain a connection pool to other -// Consul servers. This is used to reduce the latency of -// RPC requests between servers. It is only used to pool -// connections in the rpcConsul mode. Raft connections -// are pooled separately. +// ConnPool is used to maintain a connection pool to other Consul +// servers. This is used to reduce the latency of RPC requests between +// servers. It is only used to pool connections in the rpcConsul mode. +// Raft connections are pooled separately. Maintain at most one +// connection per host, for up to MaxTime. When MaxTime connection +// reaping is disabled. MaxStreams is used to control the number of idle +// streams allowed. If TLS settings are provided outgoing connections +// use TLS. type ConnPool struct { - sync.Mutex - - // src is the source address for outgoing connections. - src *net.TCPAddr + // SrcAddr is the source address for outgoing connections. + SrcAddr *net.TCPAddr // LogOutput is used to control logging - logOutput io.Writer + LogOutput io.Writer // The maximum time to keep a connection open - maxTime time.Duration + MaxTime time.Duration // The maximum number of open streams to keep - maxStreams int + MaxStreams int + + // TLS wrapper + TLSWrapper tlsutil.DCWrapper + + // ForceTLS is used to enforce outgoing TLS verification + ForceTLS bool + + sync.Mutex // pool maps an address to a open connection pool map[string]*Conn @@ -141,42 +149,30 @@ type ConnPool struct { // on to close. limiter map[string]chan struct{} - // TLS wrapper - tlsWrap tlsutil.DCWrapper - - // forceTLS is used to enforce outgoing TLS verification - forceTLS bool - // Used to indicate the pool is shutdown shutdown bool shutdownCh chan struct{} + + // once initializes the internal data structures and connection + // reaping on first use. + once sync.Once } -// NewPool is used to make a new connection pool -// Maintain at most one connection per host, for up to maxTime. -// Set maxTime to 0 to disable reaping. maxStreams is used to control -// the number of idle streams allowed. -// If TLS settings are provided outgoing connections use TLS. -func NewPool(src *net.TCPAddr, logOutput io.Writer, maxTime time.Duration, maxStreams int, tlsWrap tlsutil.DCWrapper, forceTLS bool) *ConnPool { - pool := &ConnPool{ - src: src, - logOutput: logOutput, - maxTime: maxTime, - maxStreams: maxStreams, - pool: make(map[string]*Conn), - limiter: make(map[string]chan struct{}), - tlsWrap: tlsWrap, - forceTLS: forceTLS, - shutdownCh: make(chan struct{}), +// init configures the initial data structures. It should be called +// by p.once.Do(p.init) in all public methods. +func (p *ConnPool) init() { + p.pool = make(map[string]*Conn) + p.limiter = make(map[string]chan struct{}) + p.shutdownCh = make(chan struct{}) + if p.MaxTime > 0 { + go p.reap() } - if maxTime > 0 { - go pool.reap() - } - return pool } // Shutdown is used to close the connection pool func (p *ConnPool) Shutdown() error { + p.once.Do(p.init) + p.Lock() defer p.Unlock() @@ -272,8 +268,10 @@ type HalfCloser interface { // DialTimeout is used to establish a raw connection to the given server, with a // given connection timeout. func (p *ConnPool) DialTimeout(dc string, addr net.Addr, timeout time.Duration, useTLS bool) (net.Conn, HalfCloser, error) { + p.once.Do(p.init) + // Try to dial the conn - d := &net.Dialer{LocalAddr: p.src, Timeout: timeout} + d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: timeout} conn, err := d.Dial("tcp", addr.String()) if err != nil { return nil, nil, err @@ -288,15 +286,15 @@ func (p *ConnPool) DialTimeout(dc string, addr net.Addr, timeout time.Duration, } // Check if TLS is enabled - if (useTLS || p.forceTLS) && p.tlsWrap != nil { + if (useTLS || p.ForceTLS) && p.TLSWrapper != nil { // Switch the connection into TLS mode - if _, err := conn.Write([]byte{byte(rpcTLS)}); err != nil { + if _, err := conn.Write([]byte{byte(RPCTLS)}); err != nil { conn.Close() return nil, nil, err } // Wrap the connection in a TLS client - tlsConn, err := p.tlsWrap(dc, conn) + tlsConn, err := p.TLSWrapper(dc, conn) if err != nil { conn.Close() return nil, nil, err @@ -323,14 +321,14 @@ func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int, useTLS bool } // Write the Consul multiplex byte to set the mode - if _, err := conn.Write([]byte{byte(rpcMultiplexV2)}); err != nil { + if _, err := conn.Write([]byte{byte(RPCMultiplexV2)}); err != nil { conn.Close() return nil, err } // Setup the logger conf := yamux.DefaultConfig() - conf.LogOutput = p.logOutput + conf.LogOutput = p.LogOutput // Create a multiplexed session session, _ = yamux.Client(conn, conf) @@ -403,6 +401,8 @@ START: // RPC is used to make an RPC call to a remote host func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, useTLS bool, args interface{}, reply interface{}) error { + p.once.Do(p.init) + // Get a usable client conn, sc, err := p.getClient(dc, addr, version, useTLS) if err != nil { @@ -423,28 +423,12 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, use return nil } -// PingConsulServer sends a Status.Ping message to the specified server and +// Ping sends a Status.Ping message to the specified server and // returns true if healthy, false if an error occurred -func (p *ConnPool) PingConsulServer(s *agent.Server) (bool, error) { - // Get a usable client - conn, sc, err := p.getClient(s.Datacenter, s.Addr, s.Version, s.UseTLS) - if err != nil { - return false, err - } - - // Make the RPC call +func (p *ConnPool) Ping(dc string, addr net.Addr, version int, useTLS bool) (bool, error) { var out struct{} - err = msgpackrpc.CallWithCodec(sc.codec, "Status.Ping", struct{}{}, &out) - if err != nil { - sc.Close() - p.releaseConn(conn) - return false, err - } - - // Done with the connection - conn.returnClient(sc) - p.releaseConn(conn) - return true, nil + err := p.RPC(dc, addr, version, "Status.Ping", useTLS, struct{}{}, &out) + return err == nil, err } // Reap is used to close conns open over maxTime @@ -463,7 +447,7 @@ func (p *ConnPool) reap() { now := time.Now() for host, conn := range p.pool { // Skip recently used connections - if now.Sub(conn.lastUsed) < p.maxTime { + if now.Sub(conn.lastUsed) < p.MaxTime { continue }