Cleans up locking and factors markForUse into a Conn method.

This commit is contained in:
James Phillips 2015-08-13 10:01:05 -07:00
parent 18711b40b8
commit 1c6414e806
1 changed files with 12 additions and 18 deletions

View File

@ -114,6 +114,12 @@ func (c *Conn) returnClient(client *StreamClient) {
} }
} }
// markForUse does all the bookkeeping required to ready a connection for use.
func (c *Conn) markForUse() {
c.lastUsed = time.Now()
atomic.AddInt32(&c.refCount, 1)
}
// ConnPool is used to maintain a connection pool to other // ConnPool is used to maintain a connection pool to other
// Consul servers. This is used to reduce the latency of // Consul servers. This is used to reduce the latency of
// RPC requests between servers. It is only used to pool // RPC requests between servers. It is only used to pool
@ -192,20 +198,13 @@ func (p *ConnPool) Shutdown() error {
// and will return that one if it succeeds. If all else fails, it will return a // and will return that one if it succeeds. If all else fails, it will return a
// newly-created connection and add it to the pool. // newly-created connection and add it to the pool.
func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) { func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) {
// markforUse does all the bookkeeping for an existing connection we wish
// to use from the pool.
markForUse := func(c *Conn) {
c.lastUsed = time.Now()
atomic.AddInt32(&c.refCount, 1)
}
// Check to see if there's a pooled connection available. This is up // Check to see if there's a pooled connection available. This is up
// here since it should the the vastly more common case than the rest // here since it should the the vastly more common case than the rest
// of the code here. // of the code here.
p.Lock() p.Lock()
c := p.pool[addr.String()] c := p.pool[addr.String()]
if c != nil { if c != nil {
markForUse(c) c.markForUse()
p.Unlock() p.Unlock()
return c, nil return c, nil
} }
@ -225,20 +224,15 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error)
// If we are the lead thread, make the new connection and then wake // If we are the lead thread, make the new connection and then wake
// everybody else up to see if we got it. // everybody else up to see if we got it.
if isLeadThread { if isLeadThread {
defer func() {
p.Lock()
delete(p.limiter, addr.String())
p.Unlock()
close(wait)
}()
c, err := p.getNewConn(dc, addr, version) c, err := p.getNewConn(dc, addr, version)
p.Lock()
delete(p.limiter, addr.String())
close(wait)
if err != nil { if err != nil {
p.Unlock()
return nil, err return nil, err
} }
p.Lock()
p.pool[addr.String()] = c p.pool[addr.String()] = c
p.Unlock() p.Unlock()
return c, nil return c, nil
@ -255,7 +249,7 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error)
// See if the lead thread was able to get us a connection. // See if the lead thread was able to get us a connection.
p.Lock() p.Lock()
if c := p.pool[addr.String()]; c != nil { if c := p.pool[addr.String()]; c != nil {
markForUse(c) c.markForUse()
p.Unlock() p.Unlock()
return c, nil return c, nil
} }