Enhance API pkg to utilize Server's Client Tunnel
This PR enhances the API package by having client only RPCs route through the server when they are low cost and for filesystem access to first attempt a direct connection to the node and then falling back to a server routed request.
This commit is contained in:
parent
0e85ae77b4
commit
aa98f8ba7b
|
@ -48,13 +48,9 @@ func (a *Allocations) Info(allocID string, q *QueryOptions) (*Allocation, *Query
|
|||
}
|
||||
|
||||
func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (*AllocResourceUsage, error) {
|
||||
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var resp AllocResourceUsage
|
||||
_, err = nodeClient.query("/v1/client/allocation/"+alloc.ID+"/stats", &resp, nil)
|
||||
path := fmt.Sprintf("/v1/client/allocation/%s/stats", alloc.ID)
|
||||
_, err := a.client.query(path, &resp, q)
|
||||
return &resp, err
|
||||
}
|
||||
|
||||
|
|
38
api/api.go
38
api/api.go
|
@ -7,6 +7,7 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
|
@ -18,6 +19,13 @@ import (
|
|||
rootcerts "github.com/hashicorp/go-rootcerts"
|
||||
)
|
||||
|
||||
var (
|
||||
// ClientConnTimeout is the timeout applied when attempting to contact a
|
||||
// client directly before switching to a connection through the Nomad
|
||||
// server.
|
||||
ClientConnTimeout = 1 * time.Second
|
||||
)
|
||||
|
||||
// QueryOptions are used to parameterize a query
|
||||
type QueryOptions struct {
|
||||
// Providing a datacenter overwrites the region provided
|
||||
|
@ -128,8 +136,10 @@ type Config struct {
|
|||
}
|
||||
|
||||
// ClientConfig copies the configuration with a new client address, region, and
|
||||
// whether the client has TLS enabled.
|
||||
func (c *Config) ClientConfig(region, address string, tlsEnabled bool) *Config {
|
||||
// whether the client has TLS enabled. If a timeout is greater than or equal to
|
||||
// zero, the timeout will be applied on the HTTP client, otherwise the default
|
||||
// is used. A timeout of zero means the connection won't be timedout.
|
||||
func (c *Config) ClientConfig(region, address string, tlsEnabled bool, timeout time.Duration) *Config {
|
||||
scheme := "http"
|
||||
if tlsEnabled {
|
||||
scheme = "https"
|
||||
|
@ -145,6 +155,16 @@ func (c *Config) ClientConfig(region, address string, tlsEnabled bool) *Config {
|
|||
WaitTime: c.WaitTime,
|
||||
TLSConfig: c.TLSConfig.Copy(),
|
||||
}
|
||||
|
||||
// Apply a timeout.
|
||||
if timeout.Nanoseconds() >= 0 {
|
||||
transport := config.httpClient.Transport.(*http.Transport)
|
||||
transport.DialContext = (&net.Dialer{
|
||||
Timeout: timeout,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).DialContext
|
||||
}
|
||||
|
||||
if tlsEnabled && config.TLSConfig != nil {
|
||||
config.TLSConfig.TLSServerName = fmt.Sprintf("client.%s.nomad", region)
|
||||
}
|
||||
|
@ -343,7 +363,15 @@ func (c *Client) SetNamespace(namespace string) {
|
|||
// GetNodeClient returns a new Client that will dial the specified node. If the
|
||||
// QueryOptions is set, its region will be used.
|
||||
func (c *Client) GetNodeClient(nodeID string, q *QueryOptions) (*Client, error) {
|
||||
return c.getNodeClientImpl(nodeID, q, c.Nodes().Info)
|
||||
return c.getNodeClientImpl(nodeID, -1, q, c.Nodes().Info)
|
||||
}
|
||||
|
||||
// GetNodeClientWithTimeout returns a new Client that will dial the specified
|
||||
// node using the specified timeout. If the QueryOptions is set, its region will
|
||||
// be used.
|
||||
func (c *Client) GetNodeClientWithTimeout(
|
||||
nodeID string, timeout time.Duration, q *QueryOptions) (*Client, error) {
|
||||
return c.getNodeClientImpl(nodeID, timeout, q, c.Nodes().Info)
|
||||
}
|
||||
|
||||
// nodeLookup is the definition of a function used to lookup a node. This is
|
||||
|
@ -353,7 +381,7 @@ type nodeLookup func(nodeID string, q *QueryOptions) (*Node, *QueryMeta, error)
|
|||
// getNodeClientImpl is the implementation of creating a API client for
|
||||
// contacting a node. It takes a function to lookup the node such that it can be
|
||||
// mocked during tests.
|
||||
func (c *Client) getNodeClientImpl(nodeID string, q *QueryOptions, lookup nodeLookup) (*Client, error) {
|
||||
func (c *Client) getNodeClientImpl(nodeID string, timeout time.Duration, q *QueryOptions, lookup nodeLookup) (*Client, error) {
|
||||
node, _, err := lookup(nodeID, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -379,7 +407,7 @@ func (c *Client) getNodeClientImpl(nodeID string, q *QueryOptions, lookup nodeLo
|
|||
}
|
||||
|
||||
// Get an API client for the node
|
||||
conf := c.config.ClientConfig(region, node.HTTPAddr, node.TLSEnabled)
|
||||
conf := c.config.ClientConfig(region, node.HTTPAddr, node.TLSEnabled, timeout)
|
||||
return NewClient(conf)
|
||||
}
|
||||
|
||||
|
|
|
@ -426,7 +426,7 @@ func TestClient_NodeClient(t *testing.T) {
|
|||
name := fmt.Sprintf("%s__%s__%s", c.ExpectedAddr, c.ExpectedRegion, c.ExpectedTLSServerName)
|
||||
t.Run(name, func(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
nodeClient, err := c.Client.getNodeClientImpl("testID", c.QueryOptions, c.Node)
|
||||
nodeClient, err := c.Client.getNodeClientImpl("testID", -1, c.QueryOptions, c.Node)
|
||||
assert.Nil(err)
|
||||
assert.Equal(c.ExpectedRegion, nodeClient.config.Region)
|
||||
assert.Equal(c.ExpectedAddr, nodeClient.config.Address)
|
||||
|
|
89
api/fs.go
89
api/fs.go
|
@ -4,6 +4,7 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -51,22 +52,16 @@ func (c *Client) AllocFS() *AllocFS {
|
|||
|
||||
// List is used to list the files at a given path of an allocation directory
|
||||
func (a *AllocFS) List(alloc *Allocation, path string, q *QueryOptions) ([]*AllocFileInfo, *QueryMeta, error) {
|
||||
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if q == nil {
|
||||
q = &QueryOptions{}
|
||||
}
|
||||
if q.Params == nil {
|
||||
q.Params = make(map[string]string)
|
||||
}
|
||||
|
||||
q.Params["path"] = path
|
||||
|
||||
var resp []*AllocFileInfo
|
||||
qm, err := nodeClient.query(fmt.Sprintf("/v1/client/fs/ls/%s", alloc.ID), &resp, q)
|
||||
qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/ls/%s", alloc.ID), &resp, q)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -76,11 +71,6 @@ func (a *AllocFS) List(alloc *Allocation, path string, q *QueryOptions) ([]*Allo
|
|||
|
||||
// Stat is used to stat a file at a given path of an allocation directory
|
||||
func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocFileInfo, *QueryMeta, error) {
|
||||
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if q == nil {
|
||||
q = &QueryOptions{}
|
||||
}
|
||||
|
@ -91,7 +81,7 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
|
|||
q.Params["path"] = path
|
||||
|
||||
var resp AllocFileInfo
|
||||
qm, err := nodeClient.query(fmt.Sprintf("/v1/client/fs/stat/%s", alloc.ID), &resp, q)
|
||||
qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/stat/%s", alloc.ID), &resp, q)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -101,7 +91,7 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
|
|||
// ReadAt is used to read bytes at a given offset until limit at the given path
|
||||
// in an allocation directory. If limit is <= 0, there is no limit.
|
||||
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) {
|
||||
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
|
||||
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -117,17 +107,28 @@ func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int
|
|||
q.Params["offset"] = strconv.FormatInt(offset, 10)
|
||||
q.Params["limit"] = strconv.FormatInt(limit, 10)
|
||||
|
||||
r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID), q)
|
||||
reqPath := fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID)
|
||||
r, err := nodeClient.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// There was a networking error when talking directly to the client.
|
||||
if _, ok := err.(net.Error); !ok {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Try via the server
|
||||
r, err = a.client.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// Cat is used to read contents of a file at the given path in an allocation
|
||||
// directory
|
||||
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) {
|
||||
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
|
||||
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -140,11 +141,21 @@ func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadC
|
|||
}
|
||||
|
||||
q.Params["path"] = path
|
||||
|
||||
r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID), q)
|
||||
reqPath := fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID)
|
||||
r, err := nodeClient.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// There was a networking error when talking directly to the client.
|
||||
if _, ok := err.(net.Error); !ok {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Try via the server
|
||||
r, err = a.client.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
|
@ -160,7 +171,7 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
|
|||
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
|
||||
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return nil, errCh
|
||||
|
@ -177,10 +188,21 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
|
|||
q.Params["offset"] = strconv.FormatInt(offset, 10)
|
||||
q.Params["origin"] = origin
|
||||
|
||||
r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID), q)
|
||||
reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID)
|
||||
r, err := nodeClient.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return nil, errCh
|
||||
// There was a networking error when talking directly to the client.
|
||||
if _, ok := err.(net.Error); !ok {
|
||||
errCh <- err
|
||||
return nil, errCh
|
||||
}
|
||||
|
||||
// Try via the server
|
||||
r, err = a.client.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return nil, errCh
|
||||
}
|
||||
}
|
||||
|
||||
// Create the output channel
|
||||
|
@ -236,7 +258,7 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
|
|||
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
|
||||
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return nil, errCh
|
||||
|
@ -255,10 +277,21 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
|
|||
q.Params["origin"] = origin
|
||||
q.Params["offset"] = strconv.FormatInt(offset, 10)
|
||||
|
||||
r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID), q)
|
||||
reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
|
||||
r, err := nodeClient.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return nil, errCh
|
||||
// There was a networking error when talking directly to the client.
|
||||
if _, ok := err.(net.Error); !ok {
|
||||
errCh <- err
|
||||
return nil, errCh
|
||||
}
|
||||
|
||||
// Try via the server
|
||||
r, err = a.client.rawQuery(reqPath, q)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return nil, errCh
|
||||
}
|
||||
}
|
||||
|
||||
// Create the output channel
|
||||
|
|
24
api/nodes.go
24
api/nodes.go
|
@ -1,6 +1,7 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
)
|
||||
|
@ -72,25 +73,26 @@ func (n *Nodes) ForceEvaluate(nodeID string, q *WriteOptions) (string, *WriteMet
|
|||
}
|
||||
|
||||
func (n *Nodes) Stats(nodeID string, q *QueryOptions) (*HostStats, error) {
|
||||
nodeClient, err := n.client.GetNodeClient(nodeID, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var resp HostStats
|
||||
if _, err := nodeClient.query("/v1/client/stats", &resp, nil); err != nil {
|
||||
path := fmt.Sprintf("/v1/client/stats?node_id=%s", nodeID)
|
||||
if _, err := n.client.query(path, &resp, q); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
func (n *Nodes) GC(nodeID string, q *QueryOptions) error {
|
||||
nodeClient, err := n.client.GetNodeClient(nodeID, q)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var resp struct{}
|
||||
_, err = nodeClient.query("/v1/client/gc", &resp, nil)
|
||||
path := fmt.Sprintf("/v1/client/gc?node_id=%s", nodeID)
|
||||
_, err := n.client.query(path, &resp, q)
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO Add tests
|
||||
func (n *Nodes) GcAlloc(allocID string, q *QueryOptions) error {
|
||||
var resp struct{}
|
||||
path := fmt.Sprintf("/v1/client/allocation/%s/gc", allocID)
|
||||
_, err := n.client.query(path, &resp, q)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -8,7 +8,9 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNodes_List(t *testing.T) {
|
||||
|
@ -275,3 +277,27 @@ func TestNodes_Sort(t *testing.T) {
|
|||
t.Fatalf("\n\n%#v\n\n%#v", nodes, expect)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodes_GC(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
c, s := makeClient(t, nil, nil)
|
||||
defer s.Stop()
|
||||
nodes := c.Nodes()
|
||||
|
||||
err := nodes.GC(uuid.Generate(), nil)
|
||||
require.NotNil(err)
|
||||
require.Contains(err.Error(), "Unknown node")
|
||||
}
|
||||
|
||||
func TestNodes_GcAlloc(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
c, s := makeClient(t, nil, nil)
|
||||
defer s.Stop()
|
||||
nodes := c.Nodes()
|
||||
|
||||
err := nodes.GcAlloc(uuid.Generate(), nil)
|
||||
require.NotNil(err)
|
||||
require.Contains(err.Error(), "unknown allocation")
|
||||
}
|
||||
|
|
|
@ -234,6 +234,7 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf
|
|||
Path: path,
|
||||
Origin: origin,
|
||||
Offset: offset,
|
||||
Follow: true,
|
||||
}
|
||||
s.parse(resp, req, &fsReq.QueryOptions.Region, &fsReq.QueryOptions)
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
|
@ -348,12 +349,15 @@ func TestHTTP_FS_Stream(t *testing.T) {
|
|||
a.ID, offset)
|
||||
|
||||
p, _ := io.Pipe()
|
||||
|
||||
req, err := http.NewRequest("GET", path, p)
|
||||
require.Nil(err)
|
||||
respW := httptest.NewRecorder()
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
_, err = s.Server.Stream(respW, req)
|
||||
require.Nil(err)
|
||||
close(doneCh)
|
||||
}()
|
||||
|
||||
out := ""
|
||||
|
@ -369,6 +373,12 @@ func TestHTTP_FS_Stream(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
})
|
||||
|
||||
select {
|
||||
case <-doneCh:
|
||||
t.Fatal("shouldn't close")
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
|
||||
p.Close()
|
||||
})
|
||||
}
|
||||
|
@ -410,3 +420,49 @@ func TestHTTP_FS_Logs(t *testing.T) {
|
|||
p.Close()
|
||||
})
|
||||
}
|
||||
|
||||
func TestHTTP_FS_Logs_Follow(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
httpTest(t, nil, func(s *TestAgent) {
|
||||
a := mockFSAlloc(s.client.NodeID(), nil)
|
||||
addAllocToClient(s, a, terminalClientAlloc)
|
||||
|
||||
offset := 4
|
||||
expectation := defaultLoggerMockDriverStdout[len(defaultLoggerMockDriverStdout)-offset:]
|
||||
path := fmt.Sprintf("/v1/client/fs/logs/%s?type=stdout&task=web&offset=%d&origin=end&plain=true&follow=true",
|
||||
a.ID, offset)
|
||||
|
||||
p, _ := io.Pipe()
|
||||
req, err := http.NewRequest("GET", path, p)
|
||||
require.Nil(err)
|
||||
respW := httptest.NewRecorder()
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
_, err = s.Server.Logs(respW, req)
|
||||
require.Nil(err)
|
||||
close(doneCh)
|
||||
}()
|
||||
|
||||
out := ""
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
output, err := ioutil.ReadAll(respW.Body)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
out += string(output)
|
||||
return out == expectation, fmt.Errorf("%q != %q", out, expectation)
|
||||
}, func(err error) {
|
||||
t.Fatal(err)
|
||||
})
|
||||
|
||||
select {
|
||||
case <-doneCh:
|
||||
t.Fatal("shouldn't close")
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
|
||||
p.Close()
|
||||
})
|
||||
}
|
||||
|
|
|
@ -12,9 +12,17 @@ client {
|
|||
# this should be like "nomad.service.consul:4647" and a system
|
||||
# like Consul used for service discovery.
|
||||
servers = ["127.0.0.1:4647"]
|
||||
|
||||
options {
|
||||
"driver.raw_exec.enable" = "1"
|
||||
}
|
||||
}
|
||||
|
||||
# Modify our port to avoid a collision with server1 and client1
|
||||
ports {
|
||||
http = 5657
|
||||
}
|
||||
|
||||
advertise {
|
||||
http = "127.0.1.1"
|
||||
}
|
||||
|
|
|
@ -92,7 +92,7 @@ func (c *Conn) getClient() (*StreamClient, error) {
|
|||
return sc, nil
|
||||
}
|
||||
|
||||
// returnStream is used when done with a stream
|
||||
// returnClient is used when done with a stream
|
||||
// to allow re-use by a future RPC
|
||||
func (c *Conn) returnClient(client *StreamClient) {
|
||||
didSave := false
|
||||
|
@ -357,7 +357,8 @@ func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn,
|
|||
return c, nil
|
||||
}
|
||||
|
||||
// clearConn is used to clear any cached connection, potentially in response to an erro
|
||||
// clearConn is used to clear any cached connection, potentially in response to
|
||||
// an error
|
||||
func (p *ConnPool) clearConn(conn *Conn) {
|
||||
// Ensure returned streams are closed
|
||||
atomic.StoreInt32(&conn.shouldClose, 1)
|
||||
|
|
Loading…
Reference in New Issue