agent: move conn pool for muxed connections into separate pkg

Frank Schroeder 2017-06-15 15:16:16 +02:00 committed by Frank Schröder
parent e930b55f71
commit 2c47bc5d5b
14 changed files with 125 additions and 109 deletions
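In short, every call site switches from the removed NewPool constructor to a plain struct literal on the new pool package; the pool's internal maps and reaper goroutine are now created lazily on first use. A condensed before/after, lifted from the NewClientLogger hunk below (the identifiers are the ones used in that file):

// Before: positional constructor, removed in this commit.
connPool := NewPool(config.RPCSrcAddr, config.LogOutput, clientRPCConnMaxIdle,
	clientMaxStreams, tlsWrap, config.VerifyOutgoing)

// After: exported configuration fields on pool.ConnPool; no constructor needed,
// since the pool initializes itself via sync.Once in its public methods.
connPool := &pool.ConnPool{
	SrcAddr:    config.RPCSrcAddr,
	LogOutput:  config.LogOutput,
	MaxTime:    clientRPCConnMaxIdle,
	MaxStreams: clientMaxStreams,
	TLSWrapper: tlsWrap,
	ForceTLS:   config.VerifyOutgoing,
}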

View File

@ -14,6 +14,7 @@ import (
"github.com/hashicorp/consul/agent/consul/agent"
"github.com/hashicorp/consul/agent/consul/servers"
"github.com/hashicorp/consul/agent/consul/structs"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
@ -49,7 +50,7 @@ type Client struct {
config *Config
// Connection pool to consul servers
connPool *ConnPool
connPool *pool.ConnPool
// servers is responsible for the selection and maintenance of
// Consul servers this agent uses for RPC requests
@ -109,10 +110,19 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) {
logger = log.New(config.LogOutput, "", log.LstdFlags)
}
connPool := &pool.ConnPool{
SrcAddr: config.RPCSrcAddr,
LogOutput: config.LogOutput,
MaxTime: clientRPCConnMaxIdle,
MaxStreams: clientMaxStreams,
TLSWrapper: tlsWrap,
ForceTLS: config.VerifyOutgoing,
}
// Create server
c := &Client{
config: config,
connPool: NewPool(config.RPCSrcAddr, config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap, config.VerifyOutgoing),
connPool: connPool,
eventCh: make(chan serf.Event, serfEventBacklog),
logger: logger,
shutdownCh: make(chan struct{}),

View File

@ -265,7 +265,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
for range servers {
time.Sleep(100 * time.Millisecond)
s := c.servers.FindServer()
ok, err := c.connPool.PingConsulServer(s)
ok, err := c.connPool.Ping(s.Datacenter, s.Addr, s.Version, s.UseTLS)
if !ok {
t.Errorf("Unable to ping server %v: %s", s.String(), err)
}

View File

@ -6,6 +6,7 @@ import (
"sync"
"time"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/raft"
)
@ -100,7 +101,7 @@ func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net
// Check for tls mode
if l.tlsFunc(address) && l.tlsWrap != nil {
// Switch the connection into TLS mode
if _, err := conn.Write([]byte{byte(rpcTLS)}); err != nil {
if _, err := conn.Write([]byte{byte(pool.RPCTLS)}); err != nil {
conn.Close()
return nil, err
}
@ -113,7 +114,7 @@ func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net
}
// Write the Raft byte to set the mode
_, err = conn.Write([]byte{byte(rpcRaft)})
_, err = conn.Write([]byte{byte(pool.RPCRaft)})
if err != nil {
conn.Close()
return nil, err

View File

@ -12,6 +12,7 @@ import (
"github.com/hashicorp/consul/agent/consul/agent"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/structs"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/memberlist"
@ -19,18 +20,6 @@ import (
"github.com/hashicorp/yamux"
)
type RPCType byte
const (
rpcConsul RPCType = iota
rpcRaft
rpcMultiplex // Old Muxado byte, no longer supported.
rpcTLS
rpcMultiplexV2
rpcSnapshot
rpcGossip
)
const (
// maxQueryTime is used to bound the limit of a blocking query
maxQueryTime = 600 * time.Second
@ -92,24 +81,25 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
conn.Close()
return
}
typ := pool.RPCType(buf[0])
// Enforce TLS if VerifyIncoming is set
if s.config.VerifyIncoming && !isTLS && RPCType(buf[0]) != rpcTLS {
if s.config.VerifyIncoming && !isTLS && typ != pool.RPCTLS {
s.logger.Printf("[WARN] consul.rpc: Non-TLS connection attempted with VerifyIncoming set %s", logConn(conn))
conn.Close()
return
}
// Switch on the byte
switch RPCType(buf[0]) {
case rpcConsul:
switch typ {
case pool.RPCConsul:
s.handleConsulConn(conn)
case rpcRaft:
case pool.RPCRaft:
metrics.IncrCounter([]string{"consul", "rpc", "raft_handoff"}, 1)
s.raftLayer.Handoff(conn)
case rpcTLS:
case pool.RPCTLS:
if s.rpcTLS == nil {
s.logger.Printf("[WARN] consul.rpc: TLS connection attempted, server not configured for TLS %s", logConn(conn))
conn.Close()
@ -118,14 +108,14 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
conn = tls.Server(conn, s.rpcTLS)
s.handleConn(conn, true)
case rpcMultiplexV2:
case pool.RPCMultiplexV2:
s.handleMultiplexV2(conn)
case rpcSnapshot:
case pool.RPCSnapshot:
s.handleSnapshotConn(conn)
default:
s.logger.Printf("[ERR] consul.rpc: unrecognized RPC byte: %v %s", buf[0], logConn(conn))
s.logger.Printf("[ERR] consul.rpc: unrecognized RPC byte: %v %s", typ, logConn(conn))
conn.Close()
return
}
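The pool.RPCTLS case above is the server half of a byte-level upgrade: a client first writes RPCTLS in the clear, the connection is wrapped in TLS, and the real mode byte follows inside the tunnel (this is what RaftLayer.Dial and ConnPool.DialTimeout do elsewhere in this commit). A minimal client-side sketch, assuming a wrap function standing in for a tlsutil.DCWrapper already bound to a datacenter; the helper itself is hypothetical, not part of this commit:

package example

import (
	"net"

	"github.com/hashicorp/consul/agent/pool"
)

// dialTLSRaft is an illustration of the client side of the handshake.
func dialTLSRaft(addr string, wrap func(net.Conn) (net.Conn, error)) (net.Conn, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	// Written in plaintext; tells handleConn to wrap this connection in TLS.
	if _, err := conn.Write([]byte{byte(pool.RPCTLS)}); err != nil {
		conn.Close()
		return nil, err
	}
	tlsConn, err := wrap(conn)
	if err != nil {
		conn.Close()
		return nil, err
	}
	// The real mode byte travels inside the TLS session.
	if _, err := tlsConn.Write([]byte{byte(pool.RPCRaft)}); err != nil {
		tlsConn.Close()
		return nil, err
	}
	return tlsConn, nil
}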

View File

@ -22,6 +22,7 @@ import (
"github.com/hashicorp/consul/agent/consul/servers"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/structs"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
@ -99,7 +100,7 @@ type Server struct {
config *Config
// Connection pool to other consul servers
connPool *ConnPool
connPool *pool.ConnPool
// Endpoints holds our RPC endpoints
endpoints endpoints
@ -271,12 +272,21 @@ func NewServerLogger(config *Config, logger *log.Logger) (*Server, error) {
// Create the shutdown channel - this is closed but never written to.
shutdownCh := make(chan struct{})
connPool := &pool.ConnPool{
SrcAddr: config.RPCSrcAddr,
LogOutput: config.LogOutput,
MaxTime: serverRPCCache,
MaxStreams: serverMaxStreams,
TLSWrapper: tlsWrap,
ForceTLS: config.VerifyOutgoing,
}
// Create server.
s := &Server{
autopilotRemoveDeadCh: make(chan struct{}),
autopilotShutdownCh: make(chan struct{}),
config: config,
connPool: NewPool(config.RPCSrcAddr, config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap, config.VerifyOutgoing),
connPool: connPool,
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
localConsuls: make(map[raft.ServerAddress]*agent.Server),

View File

@ -593,7 +593,7 @@ func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) {
if leader == nil {
t.Fatal("no leader")
}
return s2.connPool.PingConsulServer(leader)
return s2.connPool.Ping(leader.Datacenter, leader.Addr, leader.Version, leader.UseTLS)
}
func TestServer_TLSToNoTLS(t *testing.T) {

View File

@ -8,6 +8,7 @@ package servers
import (
"log"
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
@ -59,7 +60,7 @@ type ManagerSerfCluster interface {
// Pinger is an interface wrapping client.ConnPool to prevent a cyclic import
// dependency.
type Pinger interface {
PingConsulServer(s *agent.Server) (bool, error)
Ping(dc string, addr net.Addr, version int, useTLS bool) (bool, error)
}
// serverList is a local copy of the struct used to maintain the list of
@ -306,14 +307,14 @@ func (m *Manager) RebalanceServers() {
for i := 0; i < len(l.servers); i++ {
// Always test the first server. Failed servers are cycled
// while Serf detects the node has failed.
selectedServer := l.servers[0]
srv := l.servers[0]
ok, err := m.connPoolPinger.PingConsulServer(selectedServer)
ok, err := m.connPoolPinger.Ping(srv.Datacenter, srv.Addr, srv.Version, srv.UseTLS)
if ok {
foundHealthyServer = true
break
}
m.logger.Printf(`[DEBUG] manager: pinging server "%s" failed: %s`, selectedServer.String(), err)
m.logger.Printf(`[DEBUG] manager: pinging server "%s" failed: %s`, srv, err)
l.cycleServer()
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"log"
"math/rand"
"net"
"os"
"testing"
"time"
@ -31,7 +32,7 @@ type fauxConnPool struct {
failPct float64
}
func (cp *fauxConnPool) PingConsulServer(server *agent.Server) (bool, error) {
func (cp *fauxConnPool) Ping(string, net.Addr, int, bool) (bool, error) {
var success bool
successProb := rand.Float64()
if successProb > cp.failPct {
@ -179,7 +180,7 @@ func test_reconcileServerList(maxServers int) (bool, error) {
// failPct of the servers for the reconcile. This
// allows for the selected server to no longer be
// healthy for the reconcile below.
if ok, _ := m.connPoolPinger.PingConsulServer(node); ok {
if ok, _ := m.connPoolPinger.Ping(node.Datacenter, node.Addr, node.Version, node.UseTLS); ok {
// Will still be present
healthyServers = append(healthyServers, node)
} else {

View File

@ -4,6 +4,7 @@ import (
"fmt"
"log"
"math/rand"
"net"
"os"
"strings"
"testing"
@ -17,7 +18,7 @@ type fauxConnPool struct {
failPct float64
}
func (cp *fauxConnPool) PingConsulServer(server *agent.Server) (bool, error) {
func (cp *fauxConnPool) Ping(string, net.Addr, int, bool) (bool, error) {
var success bool
successProb := rand.Float64()
if successProb > cp.failPct {

View File

@ -17,6 +17,7 @@ import (
"time"
"github.com/hashicorp/consul/agent/consul/structs"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/snapshot"
"github.com/hashicorp/go-msgpack/codec"
)
@ -187,10 +188,10 @@ RESPOND:
// the streaming output (for a snapshot). If the reply contains an error, this
// will always return an error as well, so you don't need to check the error
// inside the filled-in reply.
func SnapshotRPC(pool *ConnPool, dc string, addr net.Addr, useTLS bool,
func SnapshotRPC(connPool *pool.ConnPool, dc string, addr net.Addr, useTLS bool,
args *structs.SnapshotRequest, in io.Reader, reply *structs.SnapshotResponse) (io.ReadCloser, error) {
conn, hc, err := pool.DialTimeout(dc, addr, 10*time.Second, useTLS)
conn, hc, err := connPool.DialTimeout(dc, addr, 10*time.Second, useTLS)
if err != nil {
return nil, err
}
@ -206,7 +207,7 @@ func SnapshotRPC(pool *ConnPool, dc string, addr net.Addr, useTLS bool,
// Write the snapshot RPC byte to set the mode, then perform the
// request.
if _, err := conn.Write([]byte{byte(rpcSnapshot)}); err != nil {
if _, err := conn.Write([]byte{byte(pool.RPCSnapshot)}); err != nil {
return nil, fmt.Errorf("failed to write stream type: %v", err)
}

View File

@ -7,6 +7,7 @@ import (
"github.com/hashicorp/consul/agent/consul/agent"
"github.com/hashicorp/consul/agent/consul/structs"
"github.com/hashicorp/consul/agent/pool"
)
// StatsFetcher has two functions for autopilot. First, lets us fetch all the
@ -18,14 +19,14 @@ import (
// as we run the health check fairly frequently.
type StatsFetcher struct {
logger *log.Logger
pool *ConnPool
pool *pool.ConnPool
datacenter string
inflight map[string]struct{}
inflightLock sync.Mutex
}
// NewStatsFetcher returns a stats fetcher.
func NewStatsFetcher(logger *log.Logger, pool *ConnPool, datacenter string) *StatsFetcher {
func NewStatsFetcher(logger *log.Logger, pool *pool.ConnPool, datacenter string) *StatsFetcher {
return &StatsFetcher{
logger: logger,
pool: pool,

View File

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
@ -19,7 +20,7 @@ func rpcClient(t *testing.T, s *Server) rpc.ClientCodec {
}
// Write the Consul RPC byte to set the mode
conn.Write([]byte{byte(rpcConsul)})
conn.Write([]byte{byte(pool.RPCConsul)})
return msgpackrpc.NewClientCodec(conn)
}

agent/pool/conn.go (new file, +15 lines)
View File

@ -0,0 +1,15 @@
package pool
type RPCType byte
const (
// Keep these values explicit and unique: they are written on the wire,
// so iota is avoided because its value depends on declaration order.
RPCConsul RPCType = 0
RPCRaft = 1
RPCMultiplex = 2 // Old Muxado byte, no longer supported.
RPCTLS = 3
RPCMultiplexV2 = 4
RPCSnapshot = 5
RPCGossip = 6
)
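These constants are the first byte written on every raw RPC connection, which is why they are pinned to explicit values. A minimal sketch of the consumer side, modeled on the rpcClient test helper in this commit (the helper name and package are placeholders):

package example

import (
	"net"
	"net/rpc"

	"github.com/hashicorp/consul/agent/pool"
	"github.com/hashicorp/net-rpc-msgpackrpc"
)

// dialConsulRPC is a hypothetical helper: dial, write the mode byte,
// then speak msgpack-rpc on the same stream.
func dialConsulRPC(addr string) (rpc.ClientCodec, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	// pool.RPCConsul selects the plain Consul RPC handler on the server.
	if _, err := conn.Write([]byte{byte(pool.RPCConsul)}); err != nil {
		conn.Close()
		return nil, err
	}
	return msgpackrpc.NewClientCodec(conn), nil
}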

View File

@ -1,4 +1,4 @@
package consul
package pool
import (
"container/list"
@ -10,7 +10,6 @@ import (
"sync/atomic"
"time"
"github.com/hashicorp/consul/agent/consul/agent"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/yamux"
@ -90,7 +89,7 @@ func (c *Conn) getClient() (*StreamClient, error) {
func (c *Conn) returnClient(client *StreamClient) {
didSave := false
c.clientLock.Lock()
if c.clients.Len() < c.pool.maxStreams && atomic.LoadInt32(&c.shouldClose) == 0 {
if c.clients.Len() < c.pool.MaxStreams && atomic.LoadInt32(&c.shouldClose) == 0 {
c.clients.PushFront(client)
didSave = true
@ -112,25 +111,34 @@ func (c *Conn) markForUse() {
atomic.AddInt32(&c.refCount, 1)
}
// ConnPool is used to maintain a connection pool to other
// Consul servers. This is used to reduce the latency of
// RPC requests between servers. It is only used to pool
// connections in the rpcConsul mode. Raft connections
// are pooled separately.
// ConnPool is used to maintain a connection pool to other Consul
// servers. This is used to reduce the latency of RPC requests between
// servers. It is only used to pool connections in the RPCConsul mode.
// Raft connections are pooled separately. The pool maintains at most
// one connection per host, for up to MaxTime; when MaxTime is zero,
// connection reaping is disabled. MaxStreams controls the number of
// idle streams allowed. If TLS settings are provided, outgoing
// connections use TLS.
type ConnPool struct {
sync.Mutex
// src is the source address for outgoing connections.
src *net.TCPAddr
// SrcAddr is the source address for outgoing connections.
SrcAddr *net.TCPAddr
// LogOutput is used to control logging
logOutput io.Writer
LogOutput io.Writer
// The maximum time to keep a connection open
maxTime time.Duration
MaxTime time.Duration
// The maximum number of open streams to keep
maxStreams int
MaxStreams int
// TLS wrapper
TLSWrapper tlsutil.DCWrapper
// ForceTLS is used to enforce outgoing TLS verification
ForceTLS bool
sync.Mutex
// pool maps an address to a open connection
pool map[string]*Conn
@ -141,42 +149,30 @@ type ConnPool struct {
// on to close.
limiter map[string]chan struct{}
// TLS wrapper
tlsWrap tlsutil.DCWrapper
// forceTLS is used to enforce outgoing TLS verification
forceTLS bool
// Used to indicate the pool is shutdown
shutdown bool
shutdownCh chan struct{}
// once initializes the internal data structures and connection
// reaping on first use.
once sync.Once
}
// NewPool is used to make a new connection pool
// Maintain at most one connection per host, for up to maxTime.
// Set maxTime to 0 to disable reaping. maxStreams is used to control
// the number of idle streams allowed.
// If TLS settings are provided outgoing connections use TLS.
func NewPool(src *net.TCPAddr, logOutput io.Writer, maxTime time.Duration, maxStreams int, tlsWrap tlsutil.DCWrapper, forceTLS bool) *ConnPool {
pool := &ConnPool{
src: src,
logOutput: logOutput,
maxTime: maxTime,
maxStreams: maxStreams,
pool: make(map[string]*Conn),
limiter: make(map[string]chan struct{}),
tlsWrap: tlsWrap,
forceTLS: forceTLS,
shutdownCh: make(chan struct{}),
// init configures the initial data structures. It should be called
// by p.once.Do(p.init) in all public methods.
func (p *ConnPool) init() {
p.pool = make(map[string]*Conn)
p.limiter = make(map[string]chan struct{})
p.shutdownCh = make(chan struct{})
if p.MaxTime > 0 {
go p.reap()
}
if maxTime > 0 {
go pool.reap()
}
return pool
}
// Shutdown is used to close the connection pool
func (p *ConnPool) Shutdown() error {
p.once.Do(p.init)
p.Lock()
defer p.Unlock()
@ -272,8 +268,10 @@ type HalfCloser interface {
// DialTimeout is used to establish a raw connection to the given server, with a
// given connection timeout.
func (p *ConnPool) DialTimeout(dc string, addr net.Addr, timeout time.Duration, useTLS bool) (net.Conn, HalfCloser, error) {
p.once.Do(p.init)
// Try to dial the conn
d := &net.Dialer{LocalAddr: p.src, Timeout: timeout}
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: timeout}
conn, err := d.Dial("tcp", addr.String())
if err != nil {
return nil, nil, err
@ -288,15 +286,15 @@ func (p *ConnPool) DialTimeout(dc string, addr net.Addr, timeout time.Duration,
}
// Check if TLS is enabled
if (useTLS || p.forceTLS) && p.tlsWrap != nil {
if (useTLS || p.ForceTLS) && p.TLSWrapper != nil {
// Switch the connection into TLS mode
if _, err := conn.Write([]byte{byte(rpcTLS)}); err != nil {
if _, err := conn.Write([]byte{byte(RPCTLS)}); err != nil {
conn.Close()
return nil, nil, err
}
// Wrap the connection in a TLS client
tlsConn, err := p.tlsWrap(dc, conn)
tlsConn, err := p.TLSWrapper(dc, conn)
if err != nil {
conn.Close()
return nil, nil, err
@ -323,14 +321,14 @@ func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int, useTLS bool
}
// Write the Consul multiplex byte to set the mode
if _, err := conn.Write([]byte{byte(rpcMultiplexV2)}); err != nil {
if _, err := conn.Write([]byte{byte(RPCMultiplexV2)}); err != nil {
conn.Close()
return nil, err
}
// Setup the logger
conf := yamux.DefaultConfig()
conf.LogOutput = p.logOutput
conf.LogOutput = p.LogOutput
// Create a multiplexed session
session, _ = yamux.Client(conn, conf)
@ -403,6 +401,8 @@ START:
// RPC is used to make an RPC call to a remote host
func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, useTLS bool, args interface{}, reply interface{}) error {
p.once.Do(p.init)
// Get a usable client
conn, sc, err := p.getClient(dc, addr, version, useTLS)
if err != nil {
@ -423,28 +423,12 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, use
return nil
}
// PingConsulServer sends a Status.Ping message to the specified server and
// Ping sends a Status.Ping message to the specified server and
// returns true if healthy, false if an error occurred
func (p *ConnPool) PingConsulServer(s *agent.Server) (bool, error) {
// Get a usable client
conn, sc, err := p.getClient(s.Datacenter, s.Addr, s.Version, s.UseTLS)
if err != nil {
return false, err
}
// Make the RPC call
func (p *ConnPool) Ping(dc string, addr net.Addr, version int, useTLS bool) (bool, error) {
var out struct{}
err = msgpackrpc.CallWithCodec(sc.codec, "Status.Ping", struct{}{}, &out)
if err != nil {
sc.Close()
p.releaseConn(conn)
return false, err
}
// Done with the connection
conn.returnClient(sc)
p.releaseConn(conn)
return true, nil
err := p.RPC(dc, addr, version, "Status.Ping", useTLS, struct{}{}, &out)
return err == nil, err
}
// Reap is used to close conns open over maxTime
@ -463,7 +447,7 @@ func (p *ConnPool) reap() {
now := time.Now()
for host, conn := range p.pool {
// Skip recently used connections
if now.Sub(conn.lastUsed) < p.maxTime {
if now.Sub(conn.lastUsed) < p.MaxTime {
continue
}
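Putting the new package together: callers build the pool as a zero-value-friendly struct and call its methods directly; p.once.Do(p.init) inside Shutdown, DialTimeout, and RPC creates the connection map, the limiter, and the reaper goroutine on first use. A rough usage sketch; the address, datacenter, and protocol version below are placeholders:

package main

import (
	"log"
	"net"
	"os"
	"time"

	"github.com/hashicorp/consul/agent/pool"
)

func main() {
	// No constructor: exported fields configure the pool, init runs lazily.
	p := &pool.ConnPool{
		LogOutput:  os.Stderr,
		MaxTime:    time.Minute, // 0 would disable connection reaping
		MaxStreams: 4,
	}
	defer p.Shutdown()

	addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:8300")
	if err != nil {
		log.Fatal(err)
	}
	ok, err := p.Ping("dc1", addr, 2, false)
	log.Printf("ping ok=%v err=%v", ok, err)
}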