Merge pull request #4686 from hashicorp/f-logger-deps
Use StandardLogger for Raft/Serf/Memberlist/Yamux
This commit is contained in:
commit
a7c1dbb152
|
@ -17,6 +17,7 @@ import (
|
||||||
|
|
||||||
metrics "github.com/armon/go-metrics"
|
metrics "github.com/armon/go-metrics"
|
||||||
consulapi "github.com/hashicorp/consul/api"
|
consulapi "github.com/hashicorp/consul/api"
|
||||||
|
hclog "github.com/hashicorp/go-hclog"
|
||||||
multierror "github.com/hashicorp/go-multierror"
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||||
|
@ -26,7 +27,6 @@ import (
|
||||||
|
|
||||||
"github.com/boltdb/bolt"
|
"github.com/boltdb/bolt"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib"
|
||||||
"github.com/hashicorp/go-hclog"
|
|
||||||
"github.com/hashicorp/nomad/client/allocdir"
|
"github.com/hashicorp/nomad/client/allocdir"
|
||||||
"github.com/hashicorp/nomad/client/allocrunner"
|
"github.com/hashicorp/nomad/client/allocrunner"
|
||||||
"github.com/hashicorp/nomad/client/config"
|
"github.com/hashicorp/nomad/client/config"
|
||||||
|
@ -212,16 +212,19 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create the logger
|
||||||
|
logger := cfg.Logger.ResetNamed("")
|
||||||
|
|
||||||
// Create the client
|
// Create the client
|
||||||
c := &Client{
|
c := &Client{
|
||||||
config: cfg,
|
config: cfg,
|
||||||
consulCatalog: consulCatalog,
|
consulCatalog: consulCatalog,
|
||||||
consulService: consulService,
|
consulService: consulService,
|
||||||
start: time.Now(),
|
start: time.Now(),
|
||||||
connPool: pool.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
|
connPool: pool.NewPool(logger, clientRPCCache, clientMaxStreams, tlsWrap),
|
||||||
tlsWrap: tlsWrap,
|
tlsWrap: tlsWrap,
|
||||||
streamingRpcs: structs.NewStreamingRpcRegistry(),
|
streamingRpcs: structs.NewStreamingRpcRegistry(),
|
||||||
logger: cfg.Logger.ResetNamed("").StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||||
allocs: make(map[string]*allocrunner.AllocRunner),
|
allocs: make(map[string]*allocrunner.AllocRunner),
|
||||||
allocUpdates: make(chan *structs.Allocation, 64),
|
allocUpdates: make(chan *structs.Allocation, 64),
|
||||||
shutdownCh: make(chan struct{}),
|
shutdownCh: make(chan struct{}),
|
||||||
|
|
|
@ -4,12 +4,14 @@ import (
|
||||||
"container/list"
|
"container/list"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"net/rpc"
|
"net/rpc"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
hclog "github.com/hashicorp/go-hclog"
|
||||||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||||
"github.com/hashicorp/nomad/helper/tlsutil"
|
"github.com/hashicorp/nomad/helper/tlsutil"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
|
@ -121,8 +123,8 @@ func (c *Conn) returnClient(client *StreamClient) {
|
||||||
type ConnPool struct {
|
type ConnPool struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
|
||||||
// LogOutput is used to control logging
|
// logger is the logger to be used
|
||||||
logOutput io.Writer
|
logger *log.Logger
|
||||||
|
|
||||||
// The maximum time to keep a connection open
|
// The maximum time to keep a connection open
|
||||||
maxTime time.Duration
|
maxTime time.Duration
|
||||||
|
@ -156,9 +158,9 @@ type ConnPool struct {
|
||||||
// Set maxTime to 0 to disable reaping. maxStreams is used to control
|
// Set maxTime to 0 to disable reaping. maxStreams is used to control
|
||||||
// the number of idle streams allowed.
|
// the number of idle streams allowed.
|
||||||
// If TLS settings are provided outgoing connections use TLS.
|
// If TLS settings are provided outgoing connections use TLS.
|
||||||
func NewPool(logOutput io.Writer, maxTime time.Duration, maxStreams int, tlsWrap tlsutil.RegionWrapper) *ConnPool {
|
func NewPool(logger hclog.Logger, maxTime time.Duration, maxStreams int, tlsWrap tlsutil.RegionWrapper) *ConnPool {
|
||||||
pool := &ConnPool{
|
pool := &ConnPool{
|
||||||
logOutput: logOutput,
|
logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||||
maxTime: maxTime,
|
maxTime: maxTime,
|
||||||
maxStreams: maxStreams,
|
maxStreams: maxStreams,
|
||||||
pool: make(map[string]*Conn),
|
pool: make(map[string]*Conn),
|
||||||
|
@ -335,7 +337,8 @@ func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn,
|
||||||
|
|
||||||
// Setup the logger
|
// Setup the logger
|
||||||
conf := yamux.DefaultConfig()
|
conf := yamux.DefaultConfig()
|
||||||
conf.LogOutput = p.logOutput
|
conf.LogOutput = nil
|
||||||
|
conf.Logger = p.logger
|
||||||
|
|
||||||
// Create a multiplexed session
|
// Create a multiplexed session
|
||||||
session, err := yamux.Client(conn, conf)
|
session, err := yamux.Client(conn, conf)
|
||||||
|
|
|
@ -14,8 +14,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func newTestPool(t *testing.T) *ConnPool {
|
func newTestPool(t *testing.T) *ConnPool {
|
||||||
w := testlog.NewWriter(t)
|
l := testlog.HCLogger(t)
|
||||||
p := NewPool(w, 1*time.Minute, 10, nil)
|
p := NewPool(l, 1*time.Minute, 10, nil)
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
17
nomad/rpc.go
17
nomad/rpc.go
|
@ -13,6 +13,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
golog "log"
|
||||||
|
|
||||||
metrics "github.com/armon/go-metrics"
|
metrics "github.com/armon/go-metrics"
|
||||||
log "github.com/hashicorp/go-hclog"
|
log "github.com/hashicorp/go-hclog"
|
||||||
memdb "github.com/hashicorp/go-memdb"
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
|
@ -47,13 +49,16 @@ const (
|
||||||
|
|
||||||
type rpcHandler struct {
|
type rpcHandler struct {
|
||||||
*Server
|
*Server
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
|
gologger *golog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRpcHandler(s *Server) *rpcHandler {
|
func newRpcHandler(s *Server) *rpcHandler {
|
||||||
|
logger := s.logger.Named("rpc")
|
||||||
return &rpcHandler{
|
return &rpcHandler{
|
||||||
Server: s,
|
Server: s,
|
||||||
logger: s.logger.Named("rpc"),
|
logger: logger,
|
||||||
|
gologger: logger.StandardLogger(&log.StandardLoggerOptions{InferLevels: true}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -207,7 +212,8 @@ func (r *rpcHandler) handleMultiplex(ctx context.Context, conn net.Conn, rpcCtx
|
||||||
}()
|
}()
|
||||||
|
|
||||||
conf := yamux.DefaultConfig()
|
conf := yamux.DefaultConfig()
|
||||||
conf.LogOutput = r.config.LogOutput
|
conf.LogOutput = nil
|
||||||
|
conf.Logger = r.gologger
|
||||||
server, err := yamux.Server(conn, conf)
|
server, err := yamux.Server(conn, conf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.logger.Error("multiplex failed to create yamux server", "error", err)
|
r.logger.Error("multiplex failed to create yamux server", "error", err)
|
||||||
|
@ -315,7 +321,8 @@ func (r *rpcHandler) handleMultiplexV2(ctx context.Context, conn net.Conn, rpcCt
|
||||||
}()
|
}()
|
||||||
|
|
||||||
conf := yamux.DefaultConfig()
|
conf := yamux.DefaultConfig()
|
||||||
conf.LogOutput = r.config.LogOutput
|
conf.LogOutput = nil
|
||||||
|
conf.Logger = r.gologger
|
||||||
server, err := yamux.Server(conn, conf)
|
server, err := yamux.Server(conn, conf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.logger.Error("multiplex_v2 failed to create yamux server", "error", err)
|
r.logger.Error("multiplex_v2 failed to create yamux server", "error", err)
|
||||||
|
|
|
@ -297,7 +297,7 @@ func TestRPC_handleMultiplexV2(t *testing.T) {
|
||||||
|
|
||||||
// Make two streams
|
// Make two streams
|
||||||
conf := yamux.DefaultConfig()
|
conf := yamux.DefaultConfig()
|
||||||
conf.LogOutput = testlog.NewWriter(t)
|
conf.Logger = testlog.Logger(t)
|
||||||
session, err := yamux.Client(p1, conf)
|
session, err := yamux.Client(p1, conf)
|
||||||
require.Nil(err)
|
require.Nil(err)
|
||||||
|
|
||||||
|
|
|
@ -286,12 +286,15 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create the logger
|
||||||
|
logger := config.Logger.ResetNamed("nomad")
|
||||||
|
|
||||||
// Create the server
|
// Create the server
|
||||||
s := &Server{
|
s := &Server{
|
||||||
config: config,
|
config: config,
|
||||||
consulCatalog: consulCatalog,
|
consulCatalog: consulCatalog,
|
||||||
connPool: pool.NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
|
connPool: pool.NewPool(logger, serverRPCCache, serverMaxStreams, tlsWrap),
|
||||||
logger: config.Logger.ResetNamed("nomad"),
|
logger: logger,
|
||||||
tlsWrap: tlsWrap,
|
tlsWrap: tlsWrap,
|
||||||
rpcServer: rpc.NewServer(),
|
rpcServer: rpc.NewServer(),
|
||||||
streamingRpcs: structs.NewStreamingRpcRegistry(),
|
streamingRpcs: structs.NewStreamingRpcRegistry(),
|
||||||
|
@ -1089,8 +1092,10 @@ func (s *Server) setupRaft() error {
|
||||||
s.config.LogOutput)
|
s.config.LogOutput)
|
||||||
s.raftTransport = trans
|
s.raftTransport = trans
|
||||||
|
|
||||||
// Make sure we set the LogOutput.
|
// Make sure we set the Logger.
|
||||||
s.config.RaftConfig.LogOutput = s.config.LogOutput
|
logger := s.logger.StandardLogger(&log.StandardLoggerOptions{InferLevels: true})
|
||||||
|
s.config.RaftConfig.Logger = logger
|
||||||
|
s.config.RaftConfig.LogOutput = nil
|
||||||
|
|
||||||
// Our version of Raft protocol requires the LocalID to match the network
|
// Our version of Raft protocol requires the LocalID to match the network
|
||||||
// address of the transport.
|
// address of the transport.
|
||||||
|
@ -1253,8 +1258,11 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
|
||||||
if s.config.UpgradeVersion != "" {
|
if s.config.UpgradeVersion != "" {
|
||||||
conf.Tags[AutopilotVersionTag] = s.config.UpgradeVersion
|
conf.Tags[AutopilotVersionTag] = s.config.UpgradeVersion
|
||||||
}
|
}
|
||||||
conf.MemberlistConfig.LogOutput = s.config.LogOutput
|
logger := s.logger.StandardLogger(&log.StandardLoggerOptions{InferLevels: true})
|
||||||
conf.LogOutput = s.config.LogOutput
|
conf.MemberlistConfig.Logger = logger
|
||||||
|
conf.Logger = logger
|
||||||
|
conf.MemberlistConfig.LogOutput = nil
|
||||||
|
conf.LogOutput = nil
|
||||||
conf.EventCh = ch
|
conf.EventCh = ch
|
||||||
if !s.config.DevMode {
|
if !s.config.DevMode {
|
||||||
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
|
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
|
||||||
|
|
13
vendor/github.com/hashicorp/yamux/mux.go
generated
vendored
13
vendor/github.com/hashicorp/yamux/mux.go
generated
vendored
|
@ -3,6 +3,7 @@ package yamux
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -30,8 +31,13 @@ type Config struct {
|
||||||
// window size that we allow for a stream.
|
// window size that we allow for a stream.
|
||||||
MaxStreamWindowSize uint32
|
MaxStreamWindowSize uint32
|
||||||
|
|
||||||
// LogOutput is used to control the log destination
|
// LogOutput is used to control the log destination. Either Logger or
|
||||||
|
// LogOutput can be set, not both.
|
||||||
LogOutput io.Writer
|
LogOutput io.Writer
|
||||||
|
|
||||||
|
// Logger is used to pass in the logger to be used. Either Logger or
|
||||||
|
// LogOutput can be set, not both.
|
||||||
|
Logger *log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultConfig is used to return a default configuration
|
// DefaultConfig is used to return a default configuration
|
||||||
|
@ -57,6 +63,11 @@ func VerifyConfig(config *Config) error {
|
||||||
if config.MaxStreamWindowSize < initialStreamWindow {
|
if config.MaxStreamWindowSize < initialStreamWindow {
|
||||||
return fmt.Errorf("MaxStreamWindowSize must be larger than %d", initialStreamWindow)
|
return fmt.Errorf("MaxStreamWindowSize must be larger than %d", initialStreamWindow)
|
||||||
}
|
}
|
||||||
|
if config.LogOutput != nil && config.Logger != nil {
|
||||||
|
return fmt.Errorf("both Logger and LogOutput may not be set, select one")
|
||||||
|
} else if config.LogOutput == nil && config.Logger == nil {
|
||||||
|
return fmt.Errorf("one of Logger or LogOutput must be set, select one")
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
13
vendor/github.com/hashicorp/yamux/session.go
generated
vendored
13
vendor/github.com/hashicorp/yamux/session.go
generated
vendored
|
@ -86,9 +86,14 @@ type sendReady struct {
|
||||||
|
|
||||||
// newSession is used to construct a new session
|
// newSession is used to construct a new session
|
||||||
func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {
|
func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {
|
||||||
|
logger := config.Logger
|
||||||
|
if logger == nil {
|
||||||
|
logger = log.New(config.LogOutput, "", log.LstdFlags)
|
||||||
|
}
|
||||||
|
|
||||||
s := &Session{
|
s := &Session{
|
||||||
config: config,
|
config: config,
|
||||||
logger: log.New(config.LogOutput, "", log.LstdFlags),
|
logger: logger,
|
||||||
conn: conn,
|
conn: conn,
|
||||||
bufRead: bufio.NewReader(conn),
|
bufRead: bufio.NewReader(conn),
|
||||||
pings: make(map[uint32]chan struct{}),
|
pings: make(map[uint32]chan struct{}),
|
||||||
|
@ -309,8 +314,10 @@ func (s *Session) keepalive() {
|
||||||
case <-time.After(s.config.KeepAliveInterval):
|
case <-time.After(s.config.KeepAliveInterval):
|
||||||
_, err := s.Ping()
|
_, err := s.Ping()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
|
if err != ErrSessionShutdown {
|
||||||
s.exitErr(ErrKeepAliveTimeout)
|
s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
|
||||||
|
s.exitErr(ErrKeepAliveTimeout)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case <-s.shutdownCh:
|
case <-s.shutdownCh:
|
||||||
|
|
2
vendor/vendor.json
vendored
2
vendor/vendor.json
vendored
|
@ -202,7 +202,7 @@
|
||||||
{"path":"github.com/hashicorp/vault/helper/jsonutil","checksumSHA1":"POgkM3GrjRFw6H3sw95YNEs552A=","revision":"8575f8fedcf8f5a6eb2b4701cb527b99574b5286","revisionTime":"2018-09-06T17:45:45Z"},
|
{"path":"github.com/hashicorp/vault/helper/jsonutil","checksumSHA1":"POgkM3GrjRFw6H3sw95YNEs552A=","revision":"8575f8fedcf8f5a6eb2b4701cb527b99574b5286","revisionTime":"2018-09-06T17:45:45Z"},
|
||||||
{"path":"github.com/hashicorp/vault/helper/parseutil","checksumSHA1":"HA2MV/2XI0HcoThSRxQCaBZR2ps=","revision":"8575f8fedcf8f5a6eb2b4701cb527b99574b5286","revisionTime":"2018-09-06T17:45:45Z"},
|
{"path":"github.com/hashicorp/vault/helper/parseutil","checksumSHA1":"HA2MV/2XI0HcoThSRxQCaBZR2ps=","revision":"8575f8fedcf8f5a6eb2b4701cb527b99574b5286","revisionTime":"2018-09-06T17:45:45Z"},
|
||||||
{"path":"github.com/hashicorp/vault/helper/strutil","checksumSHA1":"HdVuYhZ5TuxeIFqi0jy2GHW7a4o=","revision":"8575f8fedcf8f5a6eb2b4701cb527b99574b5286","revisionTime":"2018-09-06T17:45:45Z"},
|
{"path":"github.com/hashicorp/vault/helper/strutil","checksumSHA1":"HdVuYhZ5TuxeIFqi0jy2GHW7a4o=","revision":"8575f8fedcf8f5a6eb2b4701cb527b99574b5286","revisionTime":"2018-09-06T17:45:45Z"},
|
||||||
{"path":"github.com/hashicorp/yamux","checksumSHA1":"NnWv17i1tpvBNJtpdRRWpE6j4LY=","revision":"2658be15c5f05e76244154714161f17e3e77de2e","revisionTime":"2018-03-14T20:07:45Z"},
|
{"path":"github.com/hashicorp/yamux","checksumSHA1":"m9OKUPd/iliwKxs+LCSmAGpDJOs=","revision":"7221087c3d281fda5f794e28c2ea4c6e4d5c4558","revisionTime":"2018-09-17T20:50:41Z"},
|
||||||
{"path":"github.com/hpcloud/tail/util","checksumSHA1":"0xM336Lb25URO/1W1/CtGoRygVU=","revision":"37f4271387456dd1bf82ab1ad9229f060cc45386","revisionTime":"2017-08-14T16:06:53Z"},
|
{"path":"github.com/hpcloud/tail/util","checksumSHA1":"0xM336Lb25URO/1W1/CtGoRygVU=","revision":"37f4271387456dd1bf82ab1ad9229f060cc45386","revisionTime":"2017-08-14T16:06:53Z"},
|
||||||
{"path":"github.com/hpcloud/tail/watch","checksumSHA1":"TP4OAv5JMtzj2TB6OQBKqauaKDc=","revision":"37f4271387456dd1bf82ab1ad9229f060cc45386","revisionTime":"2017-08-14T16:06:53Z"},
|
{"path":"github.com/hpcloud/tail/watch","checksumSHA1":"TP4OAv5JMtzj2TB6OQBKqauaKDc=","revision":"37f4271387456dd1bf82ab1ad9229f060cc45386","revisionTime":"2017-08-14T16:06:53Z"},
|
||||||
{"path":"github.com/jmespath/go-jmespath","comment":"0.2.2-2-gc01cf91","revision":"c01cf91b011868172fdcd9f41838e80c9d716264"},
|
{"path":"github.com/jmespath/go-jmespath","comment":"0.2.2-2-gc01cf91","revision":"c01cf91b011868172fdcd9f41838e80c9d716264"},
|
||||||
|
|
Loading…
Reference in a new issue