Conn Pool can emit new connections to listeners.

This PR enables the client to listen for new connections and then bind
an RPC listener to each connection in an upcoming PR.
This commit is contained in:
Alex Dadgar 2018-01-10 17:21:28 -08:00
parent 8058ab039f
commit 7a23659ea8
2 changed files with 78 additions and 0 deletions

View File

@ -134,6 +134,10 @@ type ConnPool struct {
// Used to indicate the pool is shutdown
shutdown bool
shutdownCh chan struct{}
// connListener is used to notify a potential listener of a new connection
// being made.
connListener chan<- *yamux.Session
}
// NewPool is used to make a new connection pool
@ -170,6 +174,12 @@ func (p *ConnPool) Shutdown() error {
if p.shutdown {
return nil
}
if p.connListener != nil {
close(p.connListener)
p.connListener = nil
}
p.shutdown = true
close(p.shutdownCh)
return nil
@ -188,6 +198,21 @@ func (p *ConnPool) ReloadTLS(tlsWrap tlsutil.RegionWrapper) {
p.tlsWrap = tlsWrap
}
// SetConnListener is used to listen to new connections being made. The
// channel will be closed when the conn pool is closed or a new listener is set.
func (p *ConnPool) SetConnListener(l chan<- *yamux.Session) {
p.Lock()
defer p.Unlock()
// Close the old listener
if p.connListener != nil {
close(p.connListener)
}
// Store the new listener
p.connListener = l
}
// Acquire is used to get a connection that is
// pooled or to return a new connection
func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, error) {
@ -227,6 +252,15 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er
}
p.pool[addr.String()] = c
// If there is a connection listener, notify them of the new connection.
if p.connListener != nil {
select {
case p.connListener <- c.session:
default:
}
}
p.Unlock()
return c, nil
}

44
nomad/pool_test.go Normal file
View File

@ -0,0 +1,44 @@
package nomad
import (
"testing"
"time"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/yamux"
"github.com/stretchr/testify/require"
)
func newTestPool(t *testing.T) *ConnPool {
w := testlog.NewWriter(t)
p := NewPool(w, 1*time.Minute, 10, nil)
return p
}
func TestConnPool_ConnListener(t *testing.T) {
// Create a server and test pool
s := testServer(t, nil)
pool := newTestPool(t)
// Setup a listener
c := make(chan *yamux.Session, 1)
pool.SetConnListener(c)
// Make an RPC
var out struct{}
err := pool.RPC(s.Region(), s.config.RPCAddr, structs.ApiMajorVersion, "Status.Ping", struct{}{}, &out)
require.Nil(t, err)
// Assert we get a connection.
select {
case <-c:
case <-time.After(100 * time.Millisecond):
t.Fatalf("timeout")
}
// Test that the channel is closed when the pool shuts down.
require.Nil(t, pool.Shutdown())
_, ok := <-c
require.False(t, ok)
}