From 7a23659ea82db3a7326c83e40aa2444f0c3a055d Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 10 Jan 2018 17:21:28 -0800 Subject: [PATCH] Conn Pool can emit new connections to listeners. This PR enables the client to listen for new connections and then bind an RPC listener to each connection in an upcoming PR. --- nomad/pool.go | 34 ++++++++++++++++++++++++++++++++++ nomad/pool_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) create mode 100644 nomad/pool_test.go diff --git a/nomad/pool.go b/nomad/pool.go index 017621c99..185bd73ac 100644 --- a/nomad/pool.go +++ b/nomad/pool.go @@ -134,6 +134,10 @@ type ConnPool struct { // Used to indicate the pool is shutdown shutdown bool shutdownCh chan struct{} + + // connListener is used to notify a potential listener of a new connection + // being made. + connListener chan<- *yamux.Session } // NewPool is used to make a new connection pool @@ -170,6 +174,12 @@ func (p *ConnPool) Shutdown() error { if p.shutdown { return nil } + + if p.connListener != nil { + close(p.connListener) + p.connListener = nil + } + p.shutdown = true close(p.shutdownCh) return nil @@ -188,6 +198,21 @@ func (p *ConnPool) ReloadTLS(tlsWrap tlsutil.RegionWrapper) { p.tlsWrap = tlsWrap } +// SetConnListener is used to listen to new connections being made. The +// channel will be closed when the conn pool is closed or a new listener is set. +func (p *ConnPool) SetConnListener(l chan<- *yamux.Session) { + p.Lock() + defer p.Unlock() + + // Close the old listener + if p.connListener != nil { + close(p.connListener) + } + + // Store the new listener + p.connListener = l +} + // Acquire is used to get a connection that is // pooled or to return a new connection func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, error) { @@ -227,6 +252,15 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er } p.pool[addr.String()] = c + + // If there is a connection listener, notify them of the new connection. + if p.connListener != nil { + select { + case p.connListener <- c.session: + default: + } + } + p.Unlock() return c, nil } diff --git a/nomad/pool_test.go b/nomad/pool_test.go new file mode 100644 index 000000000..ef9158bf5 --- /dev/null +++ b/nomad/pool_test.go @@ -0,0 +1,44 @@ +package nomad + +import ( + "testing" + "time" + + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/yamux" + "github.com/stretchr/testify/require" +) + +func newTestPool(t *testing.T) *ConnPool { + w := testlog.NewWriter(t) + p := NewPool(w, 1*time.Minute, 10, nil) + return p +} + +func TestConnPool_ConnListener(t *testing.T) { + // Create a server and test pool + s := testServer(t, nil) + pool := newTestPool(t) + + // Setup a listener + c := make(chan *yamux.Session, 1) + pool.SetConnListener(c) + + // Make an RPC + var out struct{} + err := pool.RPC(s.Region(), s.config.RPCAddr, structs.ApiMajorVersion, "Status.Ping", struct{}{}, &out) + require.Nil(t, err) + + // Assert we get a connection. + select { + case <-c: + case <-time.After(100 * time.Millisecond): + t.Fatalf("timeout") + } + + // Test that the channel is closed when the pool shuts down. + require.Nil(t, pool.Shutdown()) + _, ok := <-c + require.False(t, ok) +}