yamux
This commit is contained in:
parent
9dc26699b4
commit
9971b3393f
|
@ -17,6 +17,7 @@ import (
|
|||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
consulapi "github.com/hashicorp/consul/api"
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
|
@ -26,7 +27,6 @@ import (
|
|||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/allocrunner"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
|
@ -212,16 +212,19 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
|||
}
|
||||
}
|
||||
|
||||
// Create the logger
|
||||
logger := cfg.Logger.ResetNamed("")
|
||||
|
||||
// Create the client
|
||||
c := &Client{
|
||||
config: cfg,
|
||||
consulCatalog: consulCatalog,
|
||||
consulService: consulService,
|
||||
start: time.Now(),
|
||||
connPool: pool.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
|
||||
connPool: pool.NewPool(logger, clientRPCCache, clientMaxStreams, tlsWrap),
|
||||
tlsWrap: tlsWrap,
|
||||
streamingRpcs: structs.NewStreamingRpcRegistry(),
|
||||
logger: cfg.Logger.ResetNamed("").StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||
logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||
allocs: make(map[string]*allocrunner.AllocRunner),
|
||||
allocUpdates: make(chan *structs.Allocation, 64),
|
||||
shutdownCh: make(chan struct{}),
|
||||
|
|
|
@ -4,12 +4,14 @@ import (
|
|||
"container/list"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/rpc"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/nomad/helper/tlsutil"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
|
@ -121,8 +123,8 @@ func (c *Conn) returnClient(client *StreamClient) {
|
|||
type ConnPool struct {
|
||||
sync.Mutex
|
||||
|
||||
// LogOutput is used to control logging
|
||||
logOutput io.Writer
|
||||
// logger is the logger to be used
|
||||
logger *log.Logger
|
||||
|
||||
// The maximum time to keep a connection open
|
||||
maxTime time.Duration
|
||||
|
@ -156,9 +158,9 @@ type ConnPool struct {
|
|||
// Set maxTime to 0 to disable reaping. maxStreams is used to control
|
||||
// the number of idle streams allowed.
|
||||
// If TLS settings are provided outgoing connections use TLS.
|
||||
func NewPool(logOutput io.Writer, maxTime time.Duration, maxStreams int, tlsWrap tlsutil.RegionWrapper) *ConnPool {
|
||||
func NewPool(logger hclog.Logger, maxTime time.Duration, maxStreams int, tlsWrap tlsutil.RegionWrapper) *ConnPool {
|
||||
pool := &ConnPool{
|
||||
logOutput: logOutput,
|
||||
logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||
maxTime: maxTime,
|
||||
maxStreams: maxStreams,
|
||||
pool: make(map[string]*Conn),
|
||||
|
@ -335,7 +337,8 @@ func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn,
|
|||
|
||||
// Setup the logger
|
||||
conf := yamux.DefaultConfig()
|
||||
conf.LogOutput = p.logOutput
|
||||
conf.LogOutput = nil
|
||||
conf.Logger = p.logger
|
||||
|
||||
// Create a multiplexed session
|
||||
session, err := yamux.Client(conn, conf)
|
||||
|
|
|
@ -14,8 +14,8 @@ import (
|
|||
)
|
||||
|
||||
func newTestPool(t *testing.T) *ConnPool {
|
||||
w := testlog.NewWriter(t)
|
||||
p := NewPool(w, 1*time.Minute, 10, nil)
|
||||
l := testlog.HCLogger(t)
|
||||
p := NewPool(l, 1*time.Minute, 10, nil)
|
||||
return p
|
||||
}
|
||||
|
||||
|
|
17
nomad/rpc.go
17
nomad/rpc.go
|
@ -13,6 +13,8 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
golog "log"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
|
@ -47,13 +49,16 @@ const (
|
|||
|
||||
type rpcHandler struct {
|
||||
*Server
|
||||
logger log.Logger
|
||||
logger log.Logger
|
||||
gologger *golog.Logger
|
||||
}
|
||||
|
||||
func newRpcHandler(s *Server) *rpcHandler {
|
||||
logger := s.logger.Named("rpc")
|
||||
return &rpcHandler{
|
||||
Server: s,
|
||||
logger: s.logger.Named("rpc"),
|
||||
Server: s,
|
||||
logger: logger,
|
||||
gologger: logger.StandardLogger(&log.StandardLoggerOptions{InferLevels: true}),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -207,7 +212,8 @@ func (r *rpcHandler) handleMultiplex(ctx context.Context, conn net.Conn, rpcCtx
|
|||
}()
|
||||
|
||||
conf := yamux.DefaultConfig()
|
||||
conf.LogOutput = r.config.LogOutput
|
||||
conf.LogOutput = nil
|
||||
conf.Logger = r.gologger
|
||||
server, err := yamux.Server(conn, conf)
|
||||
if err != nil {
|
||||
r.logger.Error("multiplex failed to create yamux server", "error", err)
|
||||
|
@ -315,7 +321,8 @@ func (r *rpcHandler) handleMultiplexV2(ctx context.Context, conn net.Conn, rpcCt
|
|||
}()
|
||||
|
||||
conf := yamux.DefaultConfig()
|
||||
conf.LogOutput = r.config.LogOutput
|
||||
conf.LogOutput = nil
|
||||
conf.Logger = r.gologger
|
||||
server, err := yamux.Server(conn, conf)
|
||||
if err != nil {
|
||||
r.logger.Error("multiplex_v2 failed to create yamux server", "error", err)
|
||||
|
|
|
@ -297,7 +297,7 @@ func TestRPC_handleMultiplexV2(t *testing.T) {
|
|||
|
||||
// Make two streams
|
||||
conf := yamux.DefaultConfig()
|
||||
conf.LogOutput = testlog.NewWriter(t)
|
||||
conf.Logger = testlog.Logger(t)
|
||||
session, err := yamux.Client(p1, conf)
|
||||
require.Nil(err)
|
||||
|
||||
|
|
|
@ -286,12 +286,15 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error)
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Create the logger
|
||||
logger := config.Logger.ResetNamed("nomad")
|
||||
|
||||
// Create the server
|
||||
s := &Server{
|
||||
config: config,
|
||||
consulCatalog: consulCatalog,
|
||||
connPool: pool.NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
|
||||
logger: config.Logger.ResetNamed("nomad"),
|
||||
connPool: pool.NewPool(logger, serverRPCCache, serverMaxStreams, tlsWrap),
|
||||
logger: logger,
|
||||
tlsWrap: tlsWrap,
|
||||
rpcServer: rpc.NewServer(),
|
||||
streamingRpcs: structs.NewStreamingRpcRegistry(),
|
||||
|
|
Loading…
Reference in New Issue