agent + consul
This commit is contained in:
parent
bb0f869eb2
commit
7739ef51ce
|
@ -51,7 +51,7 @@ func TestAllocRunnerFromAlloc(t *testing.T, alloc *structs.Allocation, restarts
|
|||
alloc.Job.Type = structs.JobTypeBatch
|
||||
}
|
||||
vclient := vaultclient.NewMockVaultClient()
|
||||
ar := NewAllocRunner(testlog.Logger(t), conf, db, upd.Update, alloc, vclient, consulApi.NewMockConsulServiceClient(t), NoopPrevAlloc{})
|
||||
ar := NewAllocRunner(testlog.Logger(t), conf, db, upd.Update, alloc, vclient, consulApi.NewMockConsulServiceClient(t, testlog.HCLogger(t)), NoopPrevAlloc{})
|
||||
return upd, ar
|
||||
}
|
||||
|
||||
|
|
|
@ -16,28 +16,30 @@ import (
|
|||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/boltdb/bolt"
|
||||
consulapi "github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
hstats "github.com/hashicorp/nomad/helper/stats"
|
||||
nconfig "github.com/hashicorp/nomad/nomad/structs/config"
|
||||
vaultapi "github.com/hashicorp/vault/api"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/nomad/client/allocdir"
|
||||
"github.com/hashicorp/nomad/client/allocrunner"
|
||||
"github.com/hashicorp/nomad/client/config"
|
||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||
"github.com/hashicorp/nomad/client/servers"
|
||||
"github.com/hashicorp/nomad/client/state"
|
||||
"github.com/hashicorp/nomad/client/stats"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
"github.com/hashicorp/nomad/client/vaultclient"
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/helper/pool"
|
||||
hstats "github.com/hashicorp/nomad/helper/stats"
|
||||
"github.com/hashicorp/nomad/helper/tlsutil"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
nconfig "github.com/hashicorp/nomad/nomad/structs/config"
|
||||
vaultapi "github.com/hashicorp/vault/api"
|
||||
"github.com/shirou/gopsutil/host"
|
||||
)
|
||||
|
||||
|
@ -196,7 +198,7 @@ var (
|
|||
)
|
||||
|
||||
// NewClient is used to create a new client from the given configuration
|
||||
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulService consulApi.ConsulServiceAPI, logger *log.Logger) (*Client, error) {
|
||||
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulService consulApi.ConsulServiceAPI) (*Client, error) {
|
||||
// Create the tls wrapper
|
||||
var tlsWrap tlsutil.RegionWrapper
|
||||
if cfg.TLSConfig.EnableRPC {
|
||||
|
@ -219,7 +221,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
|||
connPool: pool.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
|
||||
tlsWrap: tlsWrap,
|
||||
streamingRpcs: structs.NewStreamingRpcRegistry(),
|
||||
logger: logger,
|
||||
logger: cfg.Logger.ResetNamed("").StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||
allocs: make(map[string]*allocrunner.AllocRunner),
|
||||
allocUpdates: make(chan *structs.Allocation, 64),
|
||||
shutdownCh: make(chan struct{}),
|
||||
|
@ -245,7 +247,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
|||
}
|
||||
|
||||
// Add the stats collector
|
||||
statsCollector := stats.NewHostStatsCollector(logger, c.config.AllocDir)
|
||||
statsCollector := stats.NewHostStatsCollector(c.logger, c.config.AllocDir)
|
||||
c.hostStatsCollector = statsCollector
|
||||
|
||||
// Add the garbage collector
|
||||
|
@ -257,7 +259,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
|||
ParallelDestroys: cfg.GCParallelDestroys,
|
||||
ReservedDiskMB: cfg.Node.Reserved.DiskMB,
|
||||
}
|
||||
c.garbageCollector = NewAllocGarbageCollector(logger, statsCollector, c, gcConfig)
|
||||
c.garbageCollector = NewAllocGarbageCollector(c.logger, statsCollector, c, gcConfig)
|
||||
go c.garbageCollector.Run()
|
||||
|
||||
// Setup the node
|
||||
|
@ -287,7 +289,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
|||
c.configLock.RLock()
|
||||
if len(c.configCopy.Servers) > 0 {
|
||||
if _, err := c.setServersImpl(c.configCopy.Servers, true); err != nil {
|
||||
logger.Printf("[WARN] client: None of the configured servers are valid: %v", err)
|
||||
c.logger.Printf("[WARN] client: None of the configured servers are valid: %v", err)
|
||||
}
|
||||
}
|
||||
c.configLock.RUnlock()
|
||||
|
@ -308,13 +310,13 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
|||
|
||||
// Restore the state
|
||||
if err := c.restoreState(); err != nil {
|
||||
logger.Printf("[ERR] client: failed to restore state: %v", err)
|
||||
logger.Printf("[ERR] client: Nomad is unable to start due to corrupt state. "+
|
||||
c.logger.Printf("[ERR] client: failed to restore state: %v", err)
|
||||
c.logger.Printf("[ERR] client: Nomad is unable to start due to corrupt state. "+
|
||||
"The safest way to proceed is to manually stop running task processes "+
|
||||
"and remove Nomad's state (%q) and alloc (%q) directories before "+
|
||||
"restarting. Lost allocations will be rescheduled.",
|
||||
c.config.StateDir, c.config.AllocDir)
|
||||
logger.Printf("[ERR] client: Corrupt state is often caused by a bug. Please " +
|
||||
c.logger.Printf("[ERR] client: Corrupt state is often caused by a bug. Please " +
|
||||
"report as much information as possible to " +
|
||||
"https://github.com/hashicorp/nomad/issues")
|
||||
return nil, fmt.Errorf("failed to restore state")
|
||||
|
|
|
@ -602,11 +602,11 @@ func TestClient_SaveRestoreState(t *testing.T) {
|
|||
}
|
||||
|
||||
// Create a new client
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
c1.config.Logger = logger
|
||||
catalog := consul.NewMockCatalog(logger)
|
||||
mockService := consulApi.NewMockConsulServiceClient(t)
|
||||
mockService.Logger = logger
|
||||
c2, err := NewClient(c1.config, catalog, mockService, logger)
|
||||
mockService := consulApi.NewMockConsulServiceClient(t, logger)
|
||||
c2, err := NewClient(c1.config, catalog, mockService)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
|
|
@ -8,6 +8,8 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/nomad/structs/config"
|
||||
|
@ -76,6 +78,9 @@ type Config struct {
|
|||
// LogOutput is the destination for logs
|
||||
LogOutput io.Writer
|
||||
|
||||
// Logger provides a logger to thhe client
|
||||
Logger log.Logger
|
||||
|
||||
// Region is the clients region
|
||||
Region string
|
||||
|
||||
|
|
|
@ -2,11 +2,11 @@ package consul
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/mitchellh/go-testing-interface"
|
||||
)
|
||||
|
||||
|
@ -34,17 +34,18 @@ type MockConsulServiceClient struct {
|
|||
Ops []MockConsulOp
|
||||
mu sync.Mutex
|
||||
|
||||
Logger *log.Logger
|
||||
logger log.Logger
|
||||
|
||||
// AllocRegistrationsFn allows injecting return values for the
|
||||
// AllocRegistrations function.
|
||||
AllocRegistrationsFn func(allocID string) (*consul.AllocRegistration, error)
|
||||
}
|
||||
|
||||
func NewMockConsulServiceClient(t testing.T) *MockConsulServiceClient {
|
||||
func NewMockConsulServiceClient(t testing.T, logger log.Logger) *MockConsulServiceClient {
|
||||
logger = logger.Named("mock_consul")
|
||||
m := MockConsulServiceClient{
|
||||
Ops: make([]MockConsulOp, 0, 20),
|
||||
Logger: testlog.Logger(t),
|
||||
logger: logger,
|
||||
}
|
||||
return &m
|
||||
}
|
||||
|
@ -52,7 +53,7 @@ func NewMockConsulServiceClient(t testing.T) *MockConsulServiceClient {
|
|||
func (m *MockConsulServiceClient) UpdateTask(old, new *consul.TaskServices) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.Logger.Printf("[TEST] mock_consul: UpdateTask(alloc: %s, task: %s)", new.AllocID[:6], new.Name)
|
||||
m.logger.Trace("UpdateTask", "alloc_id", new.AllocID, "task", new.Name)
|
||||
m.Ops = append(m.Ops, NewMockConsulOp("update", new.AllocID, new.Name))
|
||||
return nil
|
||||
}
|
||||
|
@ -60,7 +61,7 @@ func (m *MockConsulServiceClient) UpdateTask(old, new *consul.TaskServices) erro
|
|||
func (m *MockConsulServiceClient) RegisterTask(task *consul.TaskServices) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.Logger.Printf("[TEST] mock_consul: RegisterTask(alloc: %s, task: %s)", task.AllocID, task.Name)
|
||||
m.logger.Trace("RegisterTask", "alloc_id", task.AllocID, "task", task.Name)
|
||||
m.Ops = append(m.Ops, NewMockConsulOp("add", task.AllocID, task.Name))
|
||||
return nil
|
||||
}
|
||||
|
@ -68,14 +69,14 @@ func (m *MockConsulServiceClient) RegisterTask(task *consul.TaskServices) error
|
|||
func (m *MockConsulServiceClient) RemoveTask(task *consul.TaskServices) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.Logger.Printf("[TEST] mock_consul: RemoveTask(%q, %q)", task.AllocID, task.Name)
|
||||
m.logger.Trace("RemoveTask", "alloc_id", task.AllocID, "task", task.Name)
|
||||
m.Ops = append(m.Ops, NewMockConsulOp("remove", task.AllocID, task.Name))
|
||||
}
|
||||
|
||||
func (m *MockConsulServiceClient) AllocRegistrations(allocID string) (*consul.AllocRegistration, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.Logger.Printf("[TEST] mock_consul: AllocRegistrations(%q)", allocID)
|
||||
m.logger.Trace("AllocRegistrations", "alloc_id", allocID)
|
||||
m.Ops = append(m.Ops, NewMockConsulOp("alloc_registrations", allocID, ""))
|
||||
|
||||
if m.AllocRegistrationsFn != nil {
|
||||
|
|
|
@ -36,11 +36,11 @@ func TestClient(t testing.T, cb func(c *config.Config)) *Client {
|
|||
cb(conf)
|
||||
}
|
||||
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
conf.Logger = logger
|
||||
catalog := consul.NewMockCatalog(logger)
|
||||
mockService := consulApi.NewMockConsulServiceClient(t)
|
||||
mockService.Logger = logger
|
||||
client, err := NewClient(conf, catalog, mockService, logger)
|
||||
mockService := consulApi.NewMockConsulServiceClient(t, logger)
|
||||
client, err := NewClient(conf, catalog, mockService)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -14,12 +13,16 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
golog "log"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
uuidparse "github.com/hashicorp/go-uuid"
|
||||
clientconfig "github.com/hashicorp/nomad/client/config"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
uuidparse "github.com/hashicorp/go-uuid"
|
||||
"github.com/hashicorp/nomad/client"
|
||||
clientconfig "github.com/hashicorp/nomad/client/config"
|
||||
"github.com/hashicorp/nomad/command/agent/consul"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/nomad"
|
||||
|
@ -50,8 +53,9 @@ type Agent struct {
|
|||
config *Config
|
||||
configLock sync.Mutex
|
||||
|
||||
logger *log.Logger
|
||||
logOutput io.Writer
|
||||
logger log.Logger
|
||||
httpLogger log.Logger
|
||||
logOutput io.Writer
|
||||
|
||||
// consulService is Nomad's custom Consul client for managing services
|
||||
// and checks.
|
||||
|
@ -75,14 +79,22 @@ type Agent struct {
|
|||
func NewAgent(config *Config, logOutput io.Writer, inmem *metrics.InmemSink) (*Agent, error) {
|
||||
a := &Agent{
|
||||
config: config,
|
||||
logger: log.New(logOutput, "", log.LstdFlags|log.Lmicroseconds),
|
||||
logOutput: logOutput,
|
||||
shutdownCh: make(chan struct{}),
|
||||
InmemSink: inmem,
|
||||
}
|
||||
|
||||
// Create the loggers
|
||||
a.logger = log.New(&log.LoggerOptions{
|
||||
Name: "agent",
|
||||
Level: log.LevelFromString(config.LogLevel),
|
||||
Output: logOutput,
|
||||
JSONFormat: false, // TODO(alex,hclog) Add a config option
|
||||
})
|
||||
a.httpLogger = a.logger.ResetNamed("http")
|
||||
|
||||
// Global logger should match internal logger as much as possible
|
||||
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
|
||||
golog.SetFlags(golog.LstdFlags | golog.Lmicroseconds)
|
||||
|
||||
if err := a.setupConsul(config.Consul); err != nil {
|
||||
return nil, fmt.Errorf("Failed to initialize Consul client: %v", err)
|
||||
|
@ -338,6 +350,7 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) {
|
|||
a.config.AdvertiseAddrs.RPC)
|
||||
}
|
||||
|
||||
conf.Logger = a.logger
|
||||
conf.LogOutput = a.logOutput
|
||||
conf.LogLevel = a.config.LogLevel
|
||||
conf.DevMode = a.config.DevMode
|
||||
|
@ -368,8 +381,8 @@ func (a *Agent) clientConfig() (*clientconfig.Config, error) {
|
|||
}
|
||||
}
|
||||
if len(invalidConsulKeys) > 0 {
|
||||
a.logger.Printf("[WARN] agent: Invalid keys: %v", strings.Join(invalidConsulKeys, ","))
|
||||
a.logger.Printf(`Nomad client ignores consul related configuration in client options.
|
||||
a.logger.Warn("invalid consul keys", "keys", strings.Join(invalidConsulKeys, ","))
|
||||
a.logger.Warn(`Nomad client ignores consul related configuration in client options.
|
||||
Please refer to the guide https://www.nomadproject.io/docs/agent/configuration/consul.html
|
||||
to configure Nomad to work with Consul.`)
|
||||
}
|
||||
|
@ -480,7 +493,8 @@ func (a *Agent) setupServer() error {
|
|||
}
|
||||
|
||||
// Create the server
|
||||
server, err := nomad.NewServer(conf, a.consulCatalog, a.logger)
|
||||
// TODO(alex,hclog)
|
||||
server, err := nomad.NewServer(conf, a.consulCatalog, a.logger.ResetNamed("").StandardLogger(&log.StandardLoggerOptions{InferLevels: true}))
|
||||
if err != nil {
|
||||
return fmt.Errorf("server setup failed: %v", err)
|
||||
}
|
||||
|
@ -650,7 +664,7 @@ func (a *Agent) setupClient() error {
|
|||
}
|
||||
}
|
||||
|
||||
client, err := client.NewClient(conf, a.consulCatalog, a.consulService, a.logger)
|
||||
client, err := client.NewClient(conf, a.consulCatalog, a.consulService)
|
||||
if err != nil {
|
||||
return fmt.Errorf("client setup failed: %v", err)
|
||||
}
|
||||
|
@ -702,7 +716,7 @@ func (a *Agent) agentHTTPCheck(server bool) *structs.ServiceCheck {
|
|||
return &check
|
||||
}
|
||||
if a.config.TLSConfig.VerifyHTTPSClient {
|
||||
a.logger.Printf("[WARN] agent: not registering Nomad HTTPS Health Check because verify_https_client enabled")
|
||||
a.logger.Warn("not registering Nomad HTTPS Health Check because verify_https_client enabled")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -788,12 +802,12 @@ func (a *Agent) findLoopbackDevice() (string, string, string, error) {
|
|||
func (a *Agent) Leave() error {
|
||||
if a.client != nil {
|
||||
if err := a.client.Leave(); err != nil {
|
||||
a.logger.Printf("[ERR] agent: client leave failed: %v", err)
|
||||
a.logger.Error("client leave failed", "error", err)
|
||||
}
|
||||
}
|
||||
if a.server != nil {
|
||||
if err := a.server.Leave(); err != nil {
|
||||
a.logger.Printf("[ERR] agent: server leave failed: %v", err)
|
||||
a.logger.Error("server leave failed", "error", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -808,23 +822,23 @@ func (a *Agent) Shutdown() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
a.logger.Println("[INFO] agent: requesting shutdown")
|
||||
a.logger.Info("requesting shutdown")
|
||||
if a.client != nil {
|
||||
if err := a.client.Shutdown(); err != nil {
|
||||
a.logger.Printf("[ERR] agent: client shutdown failed: %v", err)
|
||||
a.logger.Error("client shutdown failed", "error", err)
|
||||
}
|
||||
}
|
||||
if a.server != nil {
|
||||
if err := a.server.Shutdown(); err != nil {
|
||||
a.logger.Printf("[ERR] agent: server shutdown failed: %v", err)
|
||||
a.logger.Error("server shutdown failed", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := a.consulService.Shutdown(); err != nil {
|
||||
a.logger.Printf("[ERR] agent: shutting down Consul client failed: %v", err)
|
||||
a.logger.Error("shutting down Consul client failed", "error", err)
|
||||
}
|
||||
|
||||
a.logger.Println("[INFO] agent: shutdown complete")
|
||||
a.logger.Info("shutdown complete")
|
||||
a.shutdown = true
|
||||
close(a.shutdownCh)
|
||||
return nil
|
||||
|
@ -875,7 +889,7 @@ func (a *Agent) ShouldReload(newConfig *Config) (agent, http bool) {
|
|||
|
||||
isEqual, err := a.config.TLSConfig.CertificateInfoIsEqual(newConfig.TLSConfig)
|
||||
if err != nil {
|
||||
a.logger.Printf("[INFO] agent: error when parsing TLS certificate %v", err)
|
||||
a.logger.Error("parsing TLS certificate", "error", err)
|
||||
return false, false
|
||||
} else if !isEqual {
|
||||
return true, true
|
||||
|
@ -930,9 +944,9 @@ func (a *Agent) Reload(newConfig *Config) error {
|
|||
a.config.TLSConfig = newConfig.TLSConfig.Copy()
|
||||
|
||||
if newConfig.TLSConfig.IsEmpty() {
|
||||
a.logger.Println("[WARN] agent: Downgrading agent's existing TLS configuration to plaintext")
|
||||
a.logger.Warn("downgrading agent's existing TLS configuration to plaintext")
|
||||
} else {
|
||||
a.logger.Println("[INFO] agent: Upgrading from plaintext configuration to TLS")
|
||||
a.logger.Info("upgrading from plaintext configuration to TLS")
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -221,9 +221,9 @@ func (s *HTTPServer) updateServers(resp http.ResponseWriter, req *http.Request)
|
|||
}
|
||||
|
||||
// Set the servers list into the client
|
||||
s.agent.logger.Printf("[TRACE] Adding servers %+q to the client's primary server list", servers)
|
||||
s.agent.logger.Trace("adding servers to the client's primary server list", "servers", servers, "path", "/v1/agent/servers", "method", "PUT")
|
||||
if _, err := client.SetServers(servers); err != nil {
|
||||
s.agent.logger.Printf("[ERR] Attempt to add servers %q to client failed: %v", servers, err)
|
||||
s.agent.logger.Error("failed adding servers to client's server list", "servers", servers, "error", err, "path", "/v1/agent/servers", "method", "PUT")
|
||||
//TODO is this the right error to return?
|
||||
return nil, CodedError(400, err.Error())
|
||||
}
|
||||
|
|
|
@ -333,7 +333,7 @@ func TestAget_Client_TelemetryConfiguration(t *testing.T) {
|
|||
// API health check depending on configuration.
|
||||
func TestAgent_HTTPCheck(t *testing.T) {
|
||||
t.Parallel()
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
agent := func() *Agent {
|
||||
return &Agent{
|
||||
logger: logger,
|
||||
|
@ -414,7 +414,7 @@ func TestAgent_HTTPCheckPath(t *testing.T) {
|
|||
// Agent.agentHTTPCheck only needs a config and logger
|
||||
a := &Agent{
|
||||
config: DevConfig(),
|
||||
logger: testlog.Logger(t),
|
||||
logger: testlog.HCLogger(t),
|
||||
}
|
||||
if err := a.config.normalizeAddrs(); err != nil {
|
||||
t.Fatalf("error normalizing config: %v", err)
|
||||
|
@ -632,7 +632,7 @@ func TestServer_Reload_TLS_WithNilConfiguration(t *testing.T) {
|
|||
t.Parallel()
|
||||
assert := assert.New(t)
|
||||
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
|
||||
agent := &Agent{
|
||||
logger: logger,
|
||||
|
@ -656,7 +656,7 @@ func TestServer_Reload_TLS_UpgradeToTLS(t *testing.T) {
|
|||
dir := tmpDir(t)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
|
||||
agentConfig := &Config{
|
||||
TLSConfig: &sconfig.TLSConfig{},
|
||||
|
@ -698,7 +698,7 @@ func TestServer_Reload_TLS_DowngradeFromTLS(t *testing.T) {
|
|||
dir := tmpDir(t)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
|
||||
agentConfig := &Config{
|
||||
TLSConfig: &sconfig.TLSConfig{
|
||||
|
@ -930,7 +930,7 @@ func TestServer_ShouldReload_ReturnTrueForFileChanges(t *testing.T) {
|
|||
key = "../../helper/tlsutil/testdata/nomad-foo-key.pem"
|
||||
)
|
||||
|
||||
logger := testlog.Logger(t)
|
||||
logger := testlog.HCLogger(t)
|
||||
|
||||
agentConfig := &Config{
|
||||
TLSConfig: &sconfig.TLSConfig{
|
||||
|
|
|
@ -479,7 +479,7 @@ func TestHTTP_AllocSnapshot_Atomic(t *testing.T) {
|
|||
|
||||
// require Snapshot fails
|
||||
if err := allocDir.Snapshot(ioutil.Discard); err != nil {
|
||||
s.logger.Printf("[DEBUG] agent.test: snapshot returned error: %v", err)
|
||||
t.Logf("[DEBUG] agent.test: snapshot returned error: %v", err)
|
||||
} else {
|
||||
t.Errorf("expected Snapshot() to fail but it did not")
|
||||
}
|
||||
|
|
|
@ -570,7 +570,7 @@ func (c *Command) handleRetryJoin(config *Config) error {
|
|||
joiner := retryJoiner{
|
||||
discover: &discover.Discover{},
|
||||
errCh: c.retryJoinErrCh,
|
||||
logger: c.agent.logger,
|
||||
logger: c.agent.logger.Named("joiner"),
|
||||
serverJoin: c.agent.server.Join,
|
||||
serverEnabled: true,
|
||||
}
|
||||
|
@ -593,7 +593,7 @@ func (c *Command) handleRetryJoin(config *Config) error {
|
|||
config.Server.RetryInterval = 0
|
||||
}
|
||||
|
||||
c.agent.logger.Printf("[WARN] agent: Using deprecated retry_join fields. Upgrade configuration to use server_join")
|
||||
c.agent.logger.Warn("using deprecated retry_join fields. Upgrade configuration to use server_join")
|
||||
}
|
||||
|
||||
if config.Server.Enabled &&
|
||||
|
@ -603,7 +603,7 @@ func (c *Command) handleRetryJoin(config *Config) error {
|
|||
joiner := retryJoiner{
|
||||
discover: &discover.Discover{},
|
||||
errCh: c.retryJoinErrCh,
|
||||
logger: c.agent.logger,
|
||||
logger: c.agent.logger.Named("joiner"),
|
||||
serverJoin: c.agent.server.Join,
|
||||
serverEnabled: true,
|
||||
}
|
||||
|
@ -621,7 +621,7 @@ func (c *Command) handleRetryJoin(config *Config) error {
|
|||
joiner := retryJoiner{
|
||||
discover: &discover.Discover{},
|
||||
errCh: c.retryJoinErrCh,
|
||||
logger: c.agent.logger,
|
||||
logger: c.agent.logger.Named("joiner"),
|
||||
clientJoin: c.agent.client.SetServers,
|
||||
clientEnabled: true,
|
||||
}
|
||||
|
@ -704,7 +704,7 @@ WAIT:
|
|||
// reloadHTTPServer shuts down the existing HTTP server and restarts it. This
|
||||
// is helpful when reloading the agent configuration.
|
||||
func (c *Command) reloadHTTPServer() error {
|
||||
c.agent.logger.Println("[INFO] agent: Reloading HTTP server with new TLS configuration")
|
||||
c.agent.logger.Info("reloading HTTP server with new TLS configuration")
|
||||
|
||||
c.httpServer.Shutdown()
|
||||
|
||||
|
@ -741,23 +741,23 @@ func (c *Command) handleReload() {
|
|||
|
||||
shouldReloadAgent, shouldReloadHTTP := c.agent.ShouldReload(newConf)
|
||||
if shouldReloadAgent {
|
||||
c.agent.logger.Printf("[DEBUG] agent: starting reload of agent config")
|
||||
c.agent.logger.Debug("starting reload of agent config")
|
||||
err := c.agent.Reload(newConf)
|
||||
if err != nil {
|
||||
c.agent.logger.Printf("[ERR] agent: failed to reload the config: %v", err)
|
||||
c.agent.logger.Error("failed to reload the config", "error", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if s := c.agent.Server(); s != nil {
|
||||
c.agent.logger.Printf("[DEBUG] agent: starting reload of server config")
|
||||
c.agent.logger.Debug("starting reload of server config")
|
||||
sconf, err := convertServerConfig(newConf, c.logOutput)
|
||||
if err != nil {
|
||||
c.agent.logger.Printf("[ERR] agent: failed to convert server config: %v", err)
|
||||
c.agent.logger.Error("failed to convert server config", "error", err)
|
||||
return
|
||||
} else {
|
||||
if err := s.Reload(sconf); err != nil {
|
||||
c.agent.logger.Printf("[ERR] agent: reloading server config failed: %v", err)
|
||||
c.agent.logger.Error("reloading server config failed", "error", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -765,13 +765,13 @@ func (c *Command) handleReload() {
|
|||
|
||||
if s := c.agent.Client(); s != nil {
|
||||
clientConfig, err := c.agent.clientConfig()
|
||||
c.agent.logger.Printf("[DEBUG] agent: starting reload of client config")
|
||||
c.agent.logger.Debug("starting reload of client config")
|
||||
if err != nil {
|
||||
c.agent.logger.Printf("[ERR] agent: reloading client config failed: %v", err)
|
||||
c.agent.logger.Error("reloading client config failed", "error", err)
|
||||
return
|
||||
}
|
||||
if err := c.agent.Client().Reload(clientConfig); err != nil {
|
||||
c.agent.logger.Printf("[ERR] agent: reloading client config failed: %v", err)
|
||||
c.agent.logger.Error("reloading client config failed", "error", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -783,7 +783,7 @@ func (c *Command) handleReload() {
|
|||
if shouldReloadHTTP {
|
||||
err := c.reloadHTTPServer()
|
||||
if err != nil {
|
||||
c.agent.logger.Printf("[ERR] http: failed to reload the config: %v", err)
|
||||
c.agent.httpLogger.Error("reloading config failed", "error", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,29 +2,31 @@ package consul
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
)
|
||||
|
||||
// MockCatalog can be used for testing where the CatalogAPI is needed.
|
||||
type MockCatalog struct {
|
||||
logger *log.Logger
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
func NewMockCatalog(l *log.Logger) *MockCatalog {
|
||||
func NewMockCatalog(l log.Logger) *MockCatalog {
|
||||
l = l.Named("mock_consul")
|
||||
return &MockCatalog{logger: l}
|
||||
}
|
||||
|
||||
func (m *MockCatalog) Datacenters() ([]string, error) {
|
||||
dcs := []string{"dc1"}
|
||||
m.logger.Printf("[DEBUG] mock_consul: Datacenters() -> (%q, nil)", dcs)
|
||||
m.logger.Trace("Datacenters()", "dcs", dcs, "error", "nil")
|
||||
return dcs, nil
|
||||
}
|
||||
|
||||
func (m *MockCatalog) Service(service, tag string, q *api.QueryOptions) ([]*api.CatalogService, *api.QueryMeta, error) {
|
||||
m.logger.Printf("[DEBUG] mock_consul: Service(%q, %q, %#v) -> (nil, nil, nil)", service, tag, q)
|
||||
m.logger.Trace("Services()", "service", service, "tag", tag, "query_options", q)
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -3,9 +3,10 @@ package consul
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
@ -50,7 +51,7 @@ type checkRestart struct {
|
|||
// checks should be counted.
|
||||
graceUntil time.Time
|
||||
|
||||
logger *log.Logger
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
// apply restart state for check and restart task if necessary. Current
|
||||
|
@ -62,8 +63,7 @@ type checkRestart struct {
|
|||
func (c *checkRestart) apply(now time.Time, status string) bool {
|
||||
healthy := func() {
|
||||
if !c.unhealthyState.IsZero() {
|
||||
c.logger.Printf("[DEBUG] consul.health: alloc %q task %q check %q became healthy; canceling restart",
|
||||
c.allocID, c.taskName, c.checkName)
|
||||
c.logger.Debug("canceling restart because check became healthy")
|
||||
c.unhealthyState = time.Time{}
|
||||
}
|
||||
}
|
||||
|
@ -89,8 +89,7 @@ func (c *checkRestart) apply(now time.Time, status string) bool {
|
|||
if c.unhealthyState.IsZero() {
|
||||
// First failure, set restart deadline
|
||||
if c.timeLimit != 0 {
|
||||
c.logger.Printf("[DEBUG] consul.health: alloc %q task %q check %q became unhealthy. Restarting in %s if not healthy",
|
||||
c.allocID, c.taskName, c.checkName, c.timeLimit)
|
||||
c.logger.Debug("check became unhealthy. Will restart if check doesn't become healthy", "time_limit", c.timeLimit)
|
||||
}
|
||||
c.unhealthyState = now
|
||||
}
|
||||
|
@ -101,7 +100,7 @@ func (c *checkRestart) apply(now time.Time, status string) bool {
|
|||
// Must test >= because if limit=1, restartAt == first failure
|
||||
if now.Equal(restartAt) || now.After(restartAt) {
|
||||
// hasn't become healthy by deadline, restart!
|
||||
c.logger.Printf("[DEBUG] consul.health: restarting alloc %q task %q due to unhealthy check %q", c.allocID, c.taskName, c.checkName)
|
||||
c.logger.Debug("restarting due to unhealthy check")
|
||||
|
||||
// Tell TaskRunner to restart due to failure
|
||||
const failure = true
|
||||
|
@ -139,17 +138,17 @@ type checkWatcher struct {
|
|||
// squelch repeated error messages.
|
||||
lastErr bool
|
||||
|
||||
logger *log.Logger
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
// newCheckWatcher creates a new checkWatcher but does not call its Run method.
|
||||
func newCheckWatcher(logger *log.Logger, consul ChecksAPI) *checkWatcher {
|
||||
func newCheckWatcher(logger log.Logger, consul ChecksAPI) *checkWatcher {
|
||||
return &checkWatcher{
|
||||
consul: consul,
|
||||
pollFreq: defaultPollFreq,
|
||||
checkUpdateCh: make(chan checkWatchUpdate, 8),
|
||||
done: make(chan struct{}),
|
||||
logger: logger,
|
||||
logger: logger.ResetNamed("consul.health"),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -193,8 +192,8 @@ func (w *checkWatcher) Run(ctx context.Context) {
|
|||
|
||||
// Add/update a check
|
||||
checks[update.checkID] = update.checkRestart
|
||||
w.logger.Printf("[DEBUG] consul.health: watching alloc %q task %q check %q",
|
||||
update.checkRestart.allocID, update.checkRestart.taskName, update.checkRestart.checkName)
|
||||
w.logger.Debug("watching check", "alloc_id", update.checkRestart.allocID,
|
||||
"task", update.checkRestart.taskName, "check", update.checkRestart.checkName)
|
||||
|
||||
// if first check was added make sure polling is enabled
|
||||
if len(checks) == 1 {
|
||||
|
@ -215,7 +214,7 @@ func (w *checkWatcher) Run(ctx context.Context) {
|
|||
if err != nil {
|
||||
if !w.lastErr {
|
||||
w.lastErr = true
|
||||
w.logger.Printf("[ERR] consul.health: error retrieving health checks: %q", err)
|
||||
w.logger.Error("failed retrieving health checks", "error", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
@ -239,7 +238,7 @@ func (w *checkWatcher) Run(ctx context.Context) {
|
|||
if !ok {
|
||||
// Only warn if outside grace period to avoid races with check registration
|
||||
if now.After(check.graceUntil) {
|
||||
w.logger.Printf("[WARN] consul.health: watched check %q (%s) not found in Consul", check.checkName, cid)
|
||||
w.logger.Warn("watched check not found in Consul", "check", check.checkName, "check_id", cid)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
@ -286,7 +285,7 @@ func (w *checkWatcher) Watch(allocID, taskName, checkID string, check *structs.S
|
|||
graceUntil: time.Now().Add(check.CheckRestart.Grace),
|
||||
timeLimit: check.Interval * time.Duration(check.CheckRestart.Limit-1),
|
||||
ignoreWarnings: check.CheckRestart.IgnoreWarnings,
|
||||
logger: w.logger,
|
||||
logger: w.logger.With("alloc_id", allocID, "task", taskName, "check", check.Name),
|
||||
}
|
||||
|
||||
update := checkWatchUpdate{
|
||||
|
|
|
@ -116,7 +116,7 @@ func (c *fakeChecksAPI) Checks() (map[string]*api.AgentCheck, error) {
|
|||
// logger and faster poll frequency.
|
||||
func testWatcherSetup(t *testing.T) (*fakeChecksAPI, *checkWatcher) {
|
||||
fakeAPI := newFakeChecksAPI()
|
||||
cw := newCheckWatcher(testlog.Logger(t), fakeAPI)
|
||||
cw := newCheckWatcher(testlog.HCLogger(t), fakeAPI)
|
||||
cw.pollFreq = 10 * time.Millisecond
|
||||
return fakeAPI, cw
|
||||
}
|
||||
|
@ -142,7 +142,7 @@ func TestCheckWatcher_Skip(t *testing.T) {
|
|||
check := testCheck()
|
||||
check.CheckRestart = nil
|
||||
|
||||
cw := newCheckWatcher(testlog.Logger(t), newFakeChecksAPI())
|
||||
cw := newCheckWatcher(testlog.HCLogger(t), newFakeChecksAPI())
|
||||
restarter1 := newFakeCheckRestarter(cw, "testalloc1", "testtask1", "testcheck1", check)
|
||||
cw.Watch("testalloc1", "testtask1", "testcheck1", check, restarter1)
|
||||
|
||||
|
|
|
@ -3,7 +3,6 @@ package consul
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
@ -13,8 +12,10 @@ import (
|
|||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/api"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
cstructs "github.com/hashicorp/nomad/client/structs"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
@ -193,7 +194,7 @@ func (s *ServiceRegistration) copy() *ServiceRegistration {
|
|||
// ServiceClient handles task and agent service registration with Consul.
|
||||
type ServiceClient struct {
|
||||
client AgentAPI
|
||||
logger *log.Logger
|
||||
logger log.Logger
|
||||
retryInterval time.Duration
|
||||
maxRetryInterval time.Duration
|
||||
periodicInterval time.Duration
|
||||
|
@ -242,7 +243,8 @@ type ServiceClient struct {
|
|||
// Client, logger and takes whether the client is being used by a Nomad Client agent.
|
||||
// When being used by a Nomad client, this Consul client reconciles all services and
|
||||
// checks created by Nomad on behalf of running tasks.
|
||||
func NewServiceClient(consulClient AgentAPI, logger *log.Logger, isNomadClient bool) *ServiceClient {
|
||||
func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bool) *ServiceClient {
|
||||
logger = logger.ResetNamed("consul.sync")
|
||||
return &ServiceClient{
|
||||
client: consulClient,
|
||||
logger: logger,
|
||||
|
@ -306,7 +308,7 @@ INIT:
|
|||
c.merge(ops)
|
||||
}
|
||||
}
|
||||
c.logger.Printf("[TRACE] consul.sync: able to contact Consul")
|
||||
c.logger.Trace("able to contact Consul")
|
||||
|
||||
// Block until contact with Consul has been established
|
||||
// Start checkWatcher
|
||||
|
@ -329,10 +331,10 @@ INIT:
|
|||
if err := c.sync(); err != nil {
|
||||
if failures == 0 {
|
||||
// Log on the first failure
|
||||
c.logger.Printf("[WARN] consul.sync: failed to update services in Consul: %v", err)
|
||||
c.logger.Warn("failed to update services in Consul", "error", err)
|
||||
} else if failures%10 == 0 {
|
||||
// Log every 10th consecutive failure
|
||||
c.logger.Printf("[ERR] consul.sync: still unable to update services in Consul after %d failures; latest error: %v", failures, err)
|
||||
c.logger.Error("still unable to update services in Consul", "failures", failures, "error", err)
|
||||
}
|
||||
|
||||
failures++
|
||||
|
@ -352,7 +354,7 @@ INIT:
|
|||
retryTimer.Reset(backoff)
|
||||
} else {
|
||||
if failures > 0 {
|
||||
c.logger.Printf("[INFO] consul.sync: successfully updated services in Consul")
|
||||
c.logger.Info("successfully updated services in Consul")
|
||||
failures = 0
|
||||
}
|
||||
|
||||
|
@ -531,8 +533,8 @@ func (c *ServiceClient) sync() error {
|
|||
}
|
||||
}
|
||||
|
||||
c.logger.Printf("[DEBUG] consul.sync: registered %d services, %d checks; deregistered %d services, %d checks",
|
||||
sreg, creg, sdereg, cdereg)
|
||||
c.logger.Debug("sync complete", "registed_services", sreg, "deregistered_services", sdereg,
|
||||
"registered_checks", creg, "deregistered_checks", cdereg)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -999,12 +1001,12 @@ func (c *ServiceClient) Shutdown() error {
|
|||
// deadline was reached
|
||||
for id := range c.agentServices {
|
||||
if err := c.client.ServiceDeregister(id); err != nil {
|
||||
c.logger.Printf("[ERR] consul.sync: error deregistering agent service (id: %q): %v", id, err)
|
||||
c.logger.Error("failed deregistering agent service", "service_id", id, "error", err)
|
||||
}
|
||||
}
|
||||
for id := range c.agentChecks {
|
||||
if err := c.client.CheckDeregister(id); err != nil {
|
||||
c.logger.Printf("[ERR] consul.sync: error deregistering agent service (id: %q): %v", id, err)
|
||||
c.logger.Error("failed deregistering agent check", "check_id", id, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -135,7 +135,7 @@ func TestConsul_Integration(t *testing.T) {
|
|||
consulClient, err := consulapi.NewClient(consulConfig)
|
||||
assert.Nil(err)
|
||||
|
||||
serviceClient := consul.NewServiceClient(consulClient.Agent(), logger, true)
|
||||
serviceClient := consul.NewServiceClient(consulClient.Agent(), testlog.HCLogger(t), true)
|
||||
defer serviceClient.Shutdown() // just-in-case cleanup
|
||||
consulRan := make(chan struct{})
|
||||
go func() {
|
||||
|
|
|
@ -2,10 +2,11 @@ package consul
|
|||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/nomad/client/driver"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
|
@ -44,16 +45,17 @@ type scriptCheck struct {
|
|||
// lastCheckOk is true if the last check was ok; otherwise false
|
||||
lastCheckOk bool
|
||||
|
||||
logger *log.Logger
|
||||
logger log.Logger
|
||||
shutdownCh <-chan struct{}
|
||||
}
|
||||
|
||||
// newScriptCheck creates a new scriptCheck. run() should be called once the
|
||||
// initial check is registered with Consul.
|
||||
func newScriptCheck(allocID, taskName, checkID string, check *structs.ServiceCheck,
|
||||
exec driver.ScriptExecutor, agent heartbeater, logger *log.Logger,
|
||||
exec driver.ScriptExecutor, agent heartbeater, logger log.Logger,
|
||||
shutdownCh <-chan struct{}) *scriptCheck {
|
||||
|
||||
logger = logger.ResetNamed("consul.checks").With("task", taskName, "alloc_id", allocID, "check", check.Name)
|
||||
return &scriptCheck{
|
||||
allocID: allocID,
|
||||
taskName: taskName,
|
||||
|
@ -108,8 +110,7 @@ func (s *scriptCheck) run() *scriptHandle {
|
|||
// Log deadline exceeded every time as it's a
|
||||
// distinct issue from checks returning
|
||||
// failures
|
||||
s.logger.Printf("[WARN] consul.checks: check %q for task %q alloc %q timed out (%s)",
|
||||
s.check.Name, s.taskName, s.allocID, s.check.Timeout)
|
||||
s.logger.Warn("check timed out", "timeout", s.check.Timeout)
|
||||
}
|
||||
|
||||
// cleanup context
|
||||
|
@ -143,18 +144,15 @@ func (s *scriptCheck) run() *scriptHandle {
|
|||
if err != nil {
|
||||
if s.lastCheckOk {
|
||||
s.lastCheckOk = false
|
||||
s.logger.Printf("[WARN] consul.checks: update for task %q alloc %q check %q failed: %v",
|
||||
s.taskName, s.allocID, s.check.Name, err)
|
||||
s.logger.Warn("updating check failed", "error", err)
|
||||
} else {
|
||||
s.logger.Printf("[DEBUG] consul.checks: update for task %q alloc %q check %q still failing: %v",
|
||||
s.taskName, s.allocID, s.check.Name, err)
|
||||
s.logger.Debug("updating check still failing", "error", err)
|
||||
}
|
||||
|
||||
} else if !s.lastCheckOk {
|
||||
// Succeeded for the first time or after failing; log
|
||||
s.lastCheckOk = true
|
||||
s.logger.Printf("[INFO] consul.checks: update for task %q alloc %q check %q succeeded",
|
||||
s.taskName, s.allocID, s.check.Name)
|
||||
s.logger.Info("updating check succeeded")
|
||||
}
|
||||
|
||||
select {
|
||||
|
|
|
@ -60,7 +60,7 @@ func TestConsulScript_Exec_Cancel(t *testing.T) {
|
|||
exec := newBlockingScriptExec()
|
||||
|
||||
// pass nil for heartbeater as it shouldn't be called
|
||||
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testlog.Logger(t), nil)
|
||||
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testlog.HCLogger(t), nil)
|
||||
handle := check.run()
|
||||
|
||||
// wait until Exec is called
|
||||
|
@ -112,7 +112,7 @@ func TestConsulScript_Exec_Timeout(t *testing.T) {
|
|||
exec := newBlockingScriptExec()
|
||||
|
||||
hb := newFakeHeartbeater()
|
||||
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), nil)
|
||||
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), nil)
|
||||
handle := check.run()
|
||||
defer handle.cancel() // just-in-case cleanup
|
||||
<-exec.running
|
||||
|
@ -161,7 +161,7 @@ func TestConsulScript_Exec_TimeoutCritical(t *testing.T) {
|
|||
Timeout: time.Nanosecond,
|
||||
}
|
||||
hb := newFakeHeartbeater()
|
||||
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testlog.Logger(t), nil)
|
||||
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testlog.HCLogger(t), nil)
|
||||
handle := check.run()
|
||||
defer handle.cancel() // just-in-case cleanup
|
||||
|
||||
|
@ -206,7 +206,7 @@ func TestConsulScript_Exec_Shutdown(t *testing.T) {
|
|||
hb := newFakeHeartbeater()
|
||||
shutdown := make(chan struct{})
|
||||
exec := newSimpleExec(0, nil)
|
||||
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), shutdown)
|
||||
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), shutdown)
|
||||
handle := check.run()
|
||||
defer handle.cancel() // just-in-case cleanup
|
||||
|
||||
|
@ -243,7 +243,7 @@ func TestConsulScript_Exec_Codes(t *testing.T) {
|
|||
hb := newFakeHeartbeater()
|
||||
shutdown := make(chan struct{})
|
||||
exec := newSimpleExec(code, err)
|
||||
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.Logger(t), shutdown)
|
||||
check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), shutdown)
|
||||
handle := check.run()
|
||||
defer handle.cancel()
|
||||
|
||||
|
|
|
@ -117,7 +117,7 @@ func setupFake(t *testing.T) *testFakeCtx {
|
|||
fc := NewMockAgent()
|
||||
tt := testTask()
|
||||
return &testFakeCtx{
|
||||
ServiceClient: NewServiceClient(fc, testlog.Logger(t), true),
|
||||
ServiceClient: NewServiceClient(fc, testlog.HCLogger(t), true),
|
||||
FakeConsul: fc,
|
||||
Task: tt,
|
||||
MockExec: tt.DriverExec.(*mockExec),
|
||||
|
|
|
@ -2,16 +2,16 @@ package consul
|
|||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
version "github.com/hashicorp/go-version"
|
||||
)
|
||||
|
||||
// checkConsulTLSSkipVerify logs if Consul does not support TLSSkipVerify on
|
||||
// checks and is intended to be run in a goroutine.
|
||||
func checkConsulTLSSkipVerify(ctx context.Context, logger *log.Logger, client AgentAPI, done chan struct{}) {
|
||||
func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client AgentAPI, done chan struct{}) {
|
||||
const (
|
||||
baseline = time.Second
|
||||
limit = 20 * time.Second
|
||||
|
@ -24,10 +24,10 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger *log.Logger, client Ag
|
|||
self, err := client.Self()
|
||||
if err == nil {
|
||||
if supportsTLSSkipVerify(self) {
|
||||
logger.Printf("[TRACE] consul.sync: supports TLSSkipVerify")
|
||||
logger.Trace("Consul supports TLSSkipVerify")
|
||||
} else {
|
||||
logger.Printf("[WARN] consul.sync: Consul does NOT support TLSSkipVerify; please upgrade to Consul %s or newer",
|
||||
consulTLSSkipVerifyMinVersion)
|
||||
logger.Warn("Consul does NOT support TLSSkipVerify; please upgrade Consul",
|
||||
"min_version", consulTLSSkipVerifyMinVersion)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/pprof"
|
||||
|
@ -14,8 +13,10 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/NYTimes/gziphandler"
|
||||
assetfs "github.com/elazarl/go-bindata-assetfs"
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/NYTimes/gziphandler"
|
||||
"github.com/hashicorp/nomad/helper/tlsutil"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/rs/cors"
|
||||
|
@ -52,7 +53,7 @@ type HTTPServer struct {
|
|||
mux *http.ServeMux
|
||||
listener net.Listener
|
||||
listenerCh chan struct{}
|
||||
logger *log.Logger
|
||||
logger log.Logger
|
||||
Addr string
|
||||
}
|
||||
|
||||
|
@ -91,7 +92,7 @@ func NewHTTPServer(agent *Agent, config *Config) (*HTTPServer, error) {
|
|||
mux: mux,
|
||||
listener: ln,
|
||||
listenerCh: make(chan struct{}),
|
||||
logger: agent.logger,
|
||||
logger: agent.httpLogger,
|
||||
Addr: ln.Addr().String(),
|
||||
}
|
||||
srv.registerHandlers(config.EnableDebug)
|
||||
|
@ -130,7 +131,7 @@ func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
|
|||
// Shutdown is used to shutdown the HTTP server
|
||||
func (s *HTTPServer) Shutdown() {
|
||||
if s != nil {
|
||||
s.logger.Printf("[DEBUG] http: Shutting down http server")
|
||||
s.logger.Debug("shutting down http server")
|
||||
s.listener.Close()
|
||||
<-s.listenerCh // block until http.Serve has returned.
|
||||
}
|
||||
|
@ -278,14 +279,13 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
|
|||
reqURL := req.URL.String()
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
s.logger.Printf("[DEBUG] http: Request %v %v (%v)", req.Method, reqURL, time.Now().Sub(start))
|
||||
s.logger.Debug("request complete", "method", req.Method, "path", reqURL, "duration", time.Now().Sub(start))
|
||||
}()
|
||||
obj, err := handler(resp, req)
|
||||
|
||||
// Check for an error
|
||||
HAS_ERR:
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] http: Request %v, error: %v", reqURL, err)
|
||||
code := 500
|
||||
errMsg := err.Error()
|
||||
if http, ok := err.(HTTPCodedError); ok {
|
||||
|
@ -303,6 +303,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
|
|||
|
||||
resp.WriteHeader(code)
|
||||
resp.Write([]byte(errMsg))
|
||||
s.logger.Error("request failed", "method", req.Method, "path", reqURL, "error", err, "code", code)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -2,11 +2,18 @@ package agent
|
|||
|
||||
import (
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
||||
var (
|
||||
// Only create the prometheus handler once
|
||||
promHandler http.Handler
|
||||
promOnce sync.Once
|
||||
)
|
||||
|
||||
// MetricsRequest returns metrics for the agent. Metrics are JSON by default
|
||||
// but Prometheus is an optional format.
|
||||
func (s *HTTPServer) MetricsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
|
@ -15,16 +22,22 @@ func (s *HTTPServer) MetricsRequest(resp http.ResponseWriter, req *http.Request)
|
|||
}
|
||||
|
||||
if format := req.URL.Query().Get("format"); format == "prometheus" {
|
||||
handlerOptions := promhttp.HandlerOpts{
|
||||
ErrorLog: s.logger,
|
||||
ErrorHandling: promhttp.ContinueOnError,
|
||||
DisableCompression: true,
|
||||
}
|
||||
|
||||
handler := promhttp.HandlerFor(prometheus.DefaultGatherer, handlerOptions)
|
||||
handler.ServeHTTP(resp, req)
|
||||
s.prometheusHandler().ServeHTTP(resp, req)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return s.agent.InmemSink.DisplayMetrics(resp, req)
|
||||
}
|
||||
|
||||
func (s *HTTPServer) prometheusHandler() http.Handler {
|
||||
promOnce.Do(func() {
|
||||
handlerOptions := promhttp.HandlerOpts{
|
||||
ErrorLog: s.logger.Named("prometheus_handler").StandardLogger(nil),
|
||||
ErrorHandling: promhttp.ContinueOnError,
|
||||
DisableCompression: true,
|
||||
}
|
||||
|
||||
promHandler = promhttp.HandlerFor(prometheus.DefaultGatherer, handlerOptions)
|
||||
})
|
||||
return promHandler
|
||||
}
|
||||
|
|
|
@ -2,9 +2,12 @@ package agent
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
golog "log"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
// DiscoverInterface is an interface for the Discover type in the go-discover
|
||||
|
@ -15,7 +18,7 @@ type DiscoverInterface interface {
|
|||
// The config string must have the format 'provider=xxx key=val key=val ...'
|
||||
// where the keys and values are provider specific. The values are URL
|
||||
// encoded.
|
||||
Addrs(string, *log.Logger) ([]string, error)
|
||||
Addrs(string, *golog.Logger) ([]string, error)
|
||||
|
||||
// Help describes the format of the configuration string for address
|
||||
// discovery and the various provider specific options.
|
||||
|
@ -48,8 +51,8 @@ type retryJoiner struct {
|
|||
// limit has been reached
|
||||
errCh chan struct{}
|
||||
|
||||
// logger is the agent logger.
|
||||
logger *log.Logger
|
||||
// logger is the retry joiners logger
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
// Validate ensures that the configuration passes validity checks for the
|
||||
|
@ -100,8 +103,9 @@ func (r *retryJoiner) RetryJoin(serverJoin *ServerJoin) {
|
|||
attempt := 0
|
||||
|
||||
addrsToJoin := strings.Join(serverJoin.RetryJoin, " ")
|
||||
r.logger.Printf("[INFO] agent: Joining cluster... %s", addrsToJoin)
|
||||
r.logger.Info("starting retry join", "servers", addrsToJoin)
|
||||
|
||||
standardLogger := r.logger.StandardLogger(&log.StandardLoggerOptions{InferLevels: true})
|
||||
for {
|
||||
var addrs []string
|
||||
var n int
|
||||
|
@ -110,9 +114,9 @@ func (r *retryJoiner) RetryJoin(serverJoin *ServerJoin) {
|
|||
for _, addr := range serverJoin.RetryJoin {
|
||||
switch {
|
||||
case strings.HasPrefix(addr, "provider="):
|
||||
servers, err := r.discover.Addrs(addr, r.logger)
|
||||
servers, err := r.discover.Addrs(addr, standardLogger)
|
||||
if err != nil {
|
||||
r.logger.Printf("[ERR] agent: Join error %s", err)
|
||||
r.logger.Error("determining join addresses failed", "error", err)
|
||||
} else {
|
||||
addrs = append(addrs, servers...)
|
||||
}
|
||||
|
@ -125,14 +129,14 @@ func (r *retryJoiner) RetryJoin(serverJoin *ServerJoin) {
|
|||
if r.serverEnabled && r.serverJoin != nil {
|
||||
n, err = r.serverJoin(addrs)
|
||||
if err == nil {
|
||||
r.logger.Printf("[INFO] agent: Join completed. Server synced with %d initial servers", n)
|
||||
r.logger.Info("retry join completed", "initial_servers", n, "agent_mode", "server")
|
||||
return
|
||||
}
|
||||
}
|
||||
if r.clientEnabled && r.clientJoin != nil {
|
||||
n, err = r.clientJoin(addrs)
|
||||
if err == nil {
|
||||
r.logger.Printf("[INFO] agent: Join completed. Client synced with %d initial servers", n)
|
||||
r.logger.Info("retry join completed", "initial_servers", n, "agent_mode", "client")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -140,14 +144,13 @@ func (r *retryJoiner) RetryJoin(serverJoin *ServerJoin) {
|
|||
|
||||
attempt++
|
||||
if serverJoin.RetryMaxAttempts > 0 && attempt > serverJoin.RetryMaxAttempts {
|
||||
r.logger.Printf("[ERR] agent: max join retry exhausted, exiting")
|
||||
r.logger.Error("max join retry exhausted, exiting")
|
||||
close(r.errCh)
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
r.logger.Printf("[WARN] agent: Join failed: %q, retrying in %v", err,
|
||||
serverJoin.RetryInterval)
|
||||
r.logger.Warn("join failed", "error", err, "retry", serverJoin.RetryInterval)
|
||||
}
|
||||
time.Sleep(serverJoin.RetryInterval)
|
||||
}
|
||||
|
|
|
@ -92,7 +92,7 @@ func TestRetryJoin_Server_NonCloud(t *testing.T) {
|
|||
discover: &MockDiscover{},
|
||||
serverJoin: mockJoin,
|
||||
serverEnabled: true,
|
||||
logger: testlog.Logger(t),
|
||||
logger: testlog.HCLogger(t),
|
||||
errCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
|
@ -123,7 +123,7 @@ func TestRetryJoin_Server_Cloud(t *testing.T) {
|
|||
discover: mockDiscover,
|
||||
serverJoin: mockJoin,
|
||||
serverEnabled: true,
|
||||
logger: testlog.Logger(t),
|
||||
logger: testlog.HCLogger(t),
|
||||
errCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
|
@ -155,7 +155,7 @@ func TestRetryJoin_Server_MixedProvider(t *testing.T) {
|
|||
discover: mockDiscover,
|
||||
serverJoin: mockJoin,
|
||||
serverEnabled: true,
|
||||
logger: testlog.Logger(t),
|
||||
logger: testlog.HCLogger(t),
|
||||
errCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
|
@ -186,7 +186,7 @@ func TestRetryJoin_Client(t *testing.T) {
|
|||
discover: &MockDiscover{},
|
||||
clientJoin: mockJoin,
|
||||
clientEnabled: true,
|
||||
logger: testlog.Logger(t),
|
||||
logger: testlog.HCLogger(t),
|
||||
errCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,8 @@ import (
|
|||
"io"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
hclog "github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
// UseStdout returns true if NOMAD_TEST_STDOUT=1 and sends logs to stdout.
|
||||
|
@ -52,7 +54,18 @@ func WithPrefix(t LogPrinter, prefix string) *log.Logger {
|
|||
return New(t, prefix, log.Lmicroseconds)
|
||||
}
|
||||
|
||||
// NewLog logger with "TEST" prefix and the Lmicroseconds flag.
|
||||
// Logger returns a new test logger with the Lmicroseconds flag set and no
|
||||
// prefix.
|
||||
func Logger(t LogPrinter) *log.Logger {
|
||||
return WithPrefix(t, "")
|
||||
}
|
||||
|
||||
//HCLogger returns a new test hc-logger.
|
||||
func HCLogger(t LogPrinter) hclog.Logger {
|
||||
opts := &hclog.LoggerOptions{
|
||||
Level: hclog.Trace,
|
||||
Output: NewWriter(t),
|
||||
IncludeLocation: true,
|
||||
}
|
||||
return hclog.New(opts)
|
||||
}
|
||||
|
|
|
@ -77,7 +77,8 @@ func TestServer(t testing.T, cb func(*Config)) *Server {
|
|||
config.RaftConfig.StartAsLeader = !config.DevDisableBootstrap
|
||||
|
||||
logger := testlog.WithPrefix(t, fmt.Sprintf("[%s] ", config.NodeName))
|
||||
catalog := consul.NewMockCatalog(logger)
|
||||
hclogger := testlog.HCLogger(t)
|
||||
catalog := consul.NewMockCatalog(hclogger)
|
||||
|
||||
for i := 10; i >= 0; i-- {
|
||||
// Get random ports
|
||||
|
|
Loading…
Reference in New Issue