Adding time based reaping to ConnPool

This commit is contained in:
Armon Dadgar 2013-12-19 15:42:17 -08:00
parent 5f0bc8c91d
commit 32c822db1b
3 changed files with 52 additions and 7 deletions

View File

@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"sync"
"time"
)
// Interface is used to provide either a Client or Server,
@ -66,7 +67,7 @@ func NewClient(config *Config) (*Client, error) {
// Create server
c := &Client{
config: config,
connPool: NewPool(1),
connPool: NewPool(3, 30*time.Second),
eventCh: make(chan serf.Event, 256),
logger: logger,
shutdownCh: make(chan struct{}),

View File

@ -10,9 +10,10 @@ import (
// Conn is a pooled connection to a Consul server
type Conn struct {
addr net.Addr
conn *net.TCPConn
client *rpc.Client
addr net.Addr
conn *net.TCPConn
client *rpc.Client
lastUsed time.Time
}
func (c *Conn) Close() error {
@ -30,6 +31,9 @@ type ConnPool struct {
// The maximum connectsion to maintain per server
maxConns int
// The maximum time to keep a connection open
maxTime time.Duration
// Pool maps an address to a list of connections
pool map[string][]*Conn
@ -38,12 +42,17 @@ type ConnPool struct {
}
// NewPool is used to make a new connection pool
// Maintain at most maxConns per host
func NewPool(maxConns int) *ConnPool {
// Maintain at most maxConns per host, for up to maxTime.
// Set maxTime to 0 to disable reaping.
func NewPool(maxConns int, maxTime time.Duration) *ConnPool {
pool := &ConnPool{
maxConns: maxConns,
maxTime: maxTime,
pool: make(map[string][]*Conn),
}
if maxTime > 0 {
go pool.reap()
}
return pool
}
@ -130,6 +139,9 @@ func (p *ConnPool) Return(conn *Conn) {
p.Lock()
defer p.Unlock()
// Set the last used time
conn.lastUsed = time.Now()
// Look for existing connections
conns := p.pool[conn.addr.String()]
@ -169,3 +181,35 @@ func (p *ConnPool) RPC(addr net.Addr, method string, args interface{}, reply int
}
return err
}
// Reap is used to close conns open over maxTime
func (p *ConnPool) reap() {
for !p.shutdown {
// Sleep for a while
time.Sleep(time.Second)
// Reap all old conns
p.Lock()
now := time.Now()
for host, conns := range p.pool {
n := len(conns)
for i := 0; i < n; i++ {
// Skip new connections
conn := conns[i]
if now.Sub(conn.lastUsed) < p.maxTime {
continue
}
// Close the conn
conn.Close()
// Remove from pool
conns[i], conns[n-1] = conns[n-1], nil
conns = conns[:n-1]
p.pool[host] = conns
n--
}
}
p.Unlock()
}
}

View File

@ -95,7 +95,7 @@ func NewServer(config *Config) (*Server, error) {
// Create server
s := &Server{
config: config,
connPool: NewPool(5),
connPool: NewPool(5, 0),
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
logger: logger,