diff --git a/command/agent/agent.go b/command/agent/agent.go new file mode 100644 index 000000000..7bb1c3157 --- /dev/null +++ b/command/agent/agent.go @@ -0,0 +1,12 @@ +package agent + +type Agent struct { +} + +func (a *Agent) Leave() error { + return nil +} + +func (a *Agent) Shutdown() error { + return nil +} diff --git a/command/agent/command.go b/command/agent/command.go index 8b923941d..3c1212fca 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -3,14 +3,23 @@ package agent import ( "flag" "fmt" + "io" + "os" + "os/signal" "strings" + "syscall" "time" "github.com/armon/go-metrics" + "github.com/hashicorp/go-syslog" + "github.com/hashicorp/logutils" "github.com/hashicorp/vault/helper/flag-slice" "github.com/mitchellh/cli" ) +// gracefulTimeout controls how long we wait before forcefully terminating +const gracefulTimeout = 5 * time.Second + // Command is a Command implementation that runs a Consul agent. // The command will not end unless a shutdown message is sent on the // ShutdownCh. If two messages are sent on the ShutdownCh it will forcibly @@ -22,7 +31,10 @@ type Command struct { Ui cli.Ui ShutdownCh <-chan struct{} - args []string + args []string + agent *Agent + logFilter *logutils.LevelFilter + logOutput io.Writer } func (c *Command) readConfig() *Config { @@ -42,6 +54,8 @@ func (c *Command) readConfig() *Config { var config *Config if dev { config = DevConfig() + } else { + config = DefaultConfig() } for _, path := range configPath { current, err := LoadConfig(path) @@ -61,6 +75,48 @@ func (c *Command) readConfig() *Config { return config } +// setupLoggers is used to setup the logGate, logWriter, and our logOutput +func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Writer) { + // Setup logging. First create the gated log writer, which will + // store logs until we're ready to show them. Then create the level + // filter, filtering logs of the specified level. + logGate := &GatedWriter{ + Writer: &cli.UiWriter{Ui: c.Ui}, + } + + c.logFilter = LevelFilter() + c.logFilter.MinLevel = logutils.LogLevel(strings.ToUpper(config.LogLevel)) + c.logFilter.Writer = logGate + if !ValidateLevelFilter(c.logFilter.MinLevel, c.logFilter) { + c.Ui.Error(fmt.Sprintf( + "Invalid log level: %s. Valid log levels are: %v", + c.logFilter.MinLevel, c.logFilter.Levels)) + return nil, nil, nil + } + + // Check if syslog is enabled + var syslog io.Writer + if config.EnableSyslog { + l, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, config.SyslogFacility, "consul") + if err != nil { + c.Ui.Error(fmt.Sprintf("Syslog setup failed: %v", err)) + return nil, nil, nil + } + syslog = &SyslogWrapper{l, c.logFilter} + } + + // Create a log writer, and wrap a logOutput around it + logWriter := NewLogWriter(512) + var logOutput io.Writer + if syslog != nil { + logOutput = io.MultiWriter(c.logFilter, logWriter, syslog) + } else { + logOutput = io.MultiWriter(c.logFilter, logWriter) + } + c.logOutput = logOutput + return logGate, logWriter, logOutput +} + func (c *Command) Run(args []string) int { c.Ui = &cli.PrefixedUi{ OutputPrefix: "==> ", @@ -76,13 +132,111 @@ func (c *Command) Run(args []string) int { return 1 } + // Setup the log outputs + logGate, _, _ := c.setupLoggers(config) + if logGate == nil { + return 1 + } + // Initialize the telemetry if err := c.setupTelementry(config); err != nil { c.Ui.Error(fmt.Sprintf("Error initializing telemetry: %s", err)) return 1 } - return 0 + // Let the user know things are running + c.Ui.Output("Nomad agent running!") + + // Enable log streaming + c.Ui.Info("") + c.Ui.Output("Log data will now stream in as it occurs:\n") + logGate.Flush() + + // Wait for exit + return c.handleSignals(config) +} + +// handleSignals blocks until we get an exit-causing signal +func (c *Command) handleSignals(config *Config) int { + signalCh := make(chan os.Signal, 4) + signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) + + // Wait for a signal +WAIT: + var sig os.Signal + select { + case s := <-signalCh: + sig = s + case <-c.ShutdownCh: + sig = os.Interrupt + } + c.Ui.Output(fmt.Sprintf("Caught signal: %v", sig)) + + // Check if this is a SIGHUP + if sig == syscall.SIGHUP { + if conf := c.handleReload(config); conf != nil { + config = conf + } + goto WAIT + } + + // Check if we should do a graceful leave + graceful := false + if sig == os.Interrupt && !config.LeaveOnInt { + graceful = true + } else if sig == syscall.SIGTERM && config.LeaveOnTerm { + graceful = true + } + + // Bail fast if not doing a graceful leave + if !graceful { + return 1 + } + + // Attempt a graceful leave + gracefulCh := make(chan struct{}) + c.Ui.Output("Gracefully shutting down agent...") + go func() { + if err := c.agent.Leave(); err != nil { + c.Ui.Error(fmt.Sprintf("Error: %s", err)) + return + } + close(gracefulCh) + }() + + // Wait for leave or another signal + select { + case <-signalCh: + return 1 + case <-time.After(gracefulTimeout): + return 1 + case <-gracefulCh: + return 0 + } +} + +// handleReload is invoked when we should reload our configs, e.g. SIGHUP +func (c *Command) handleReload(config *Config) *Config { + c.Ui.Output("Reloading configuration...") + newConf := c.readConfig() + if newConf == nil { + c.Ui.Error(fmt.Sprintf("Failed to reload configs")) + return config + } + + // Change the log level + minLevel := logutils.LogLevel(strings.ToUpper(newConf.LogLevel)) + if ValidateLevelFilter(minLevel, c.logFilter) { + c.logFilter.SetMinLevel(minLevel) + } else { + c.Ui.Error(fmt.Sprintf( + "Invalid log level: %s. Valid log levels are: %v", + minLevel, c.logFilter.Levels)) + + // Keep the current log level + newConf.LogLevel = config.LogLevel + } + return newConf } // setupTelementry is used ot setup the telemetry sub-systems @@ -101,7 +255,7 @@ func (c *Command) setupTelementry(config *Config) error { telConfig = config.Telemetry } - metricsConf := metrics.DefaultConfig("vault") + metricsConf := metrics.DefaultConfig("nomad") metricsConf.EnableHostname = !telConfig.DisableHostname // Configure the statsite sink diff --git a/command/agent/config.go b/command/agent/config.go index 88997c68f..a1df75d78 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -36,6 +36,11 @@ type Config struct { Server *ServerConfig `hcl:"server"` Telemetry *Telemetry `hcl:"telemetry"` + + LeaveOnInt bool + LeaveOnTerm bool + EnableSyslog bool + SyslogFacility string } type ClientConfig struct { @@ -79,7 +84,16 @@ type Telemetry struct { // DevConfig is a Config that is used for dev mode of Nomad. func DevConfig() *Config { - return &Config{} + return &Config{ + LogLevel: "DEBUG", + } +} + +// DefaultConfig is a the baseline configuration for Nomad +func DefaultConfig() *Config { + return &Config{ + LogLevel: "INFO", + } } // Merge merges two configurations. diff --git a/command/agent/gated_writer.go b/command/agent/gated_writer.go new file mode 100644 index 000000000..e9417c4b0 --- /dev/null +++ b/command/agent/gated_writer.go @@ -0,0 +1,43 @@ +package agent + +import ( + "io" + "sync" +) + +// GatedWriter is an io.Writer implementation that buffers all of its +// data into an internal buffer until it is told to let data through. +type GatedWriter struct { + Writer io.Writer + + buf [][]byte + flush bool + lock sync.RWMutex +} + +// Flush tells the GatedWriter to flush any buffered data and to stop +// buffering. +func (w *GatedWriter) Flush() { + w.lock.Lock() + w.flush = true + w.lock.Unlock() + + for _, p := range w.buf { + w.Write(p) + } + w.buf = nil +} + +func (w *GatedWriter) Write(p []byte) (n int, err error) { + w.lock.RLock() + defer w.lock.RUnlock() + + if w.flush { + return w.Writer.Write(p) + } + + p2 := make([]byte, len(p)) + copy(p2, p) + w.buf = append(w.buf, p2) + return len(p), nil +} diff --git a/command/agent/gated_writer_test.go b/command/agent/gated_writer_test.go new file mode 100644 index 000000000..5327bad6a --- /dev/null +++ b/command/agent/gated_writer_test.go @@ -0,0 +1,34 @@ +package agent + +import ( + "bytes" + "io" + "testing" +) + +func TestGatedWriter_impl(t *testing.T) { + var _ io.Writer = new(GatedWriter) +} + +func TestGatedWriter(t *testing.T) { + buf := new(bytes.Buffer) + w := &GatedWriter{Writer: buf} + w.Write([]byte("foo\n")) + w.Write([]byte("bar\n")) + + if buf.String() != "" { + t.Fatalf("bad: %s", buf.String()) + } + + w.Flush() + + if buf.String() != "foo\nbar\n" { + t.Fatalf("bad: %s", buf.String()) + } + + w.Write([]byte("baz\n")) + + if buf.String() != "foo\nbar\nbaz\n" { + t.Fatalf("bad: %s", buf.String()) + } +} diff --git a/command/agent/log_levels.go b/command/agent/log_levels.go new file mode 100644 index 000000000..a0f29c622 --- /dev/null +++ b/command/agent/log_levels.go @@ -0,0 +1,28 @@ +package agent + +import ( + "io/ioutil" + + "github.com/hashicorp/logutils" +) + +// LevelFilter returns a LevelFilter that is configured with the log +// levels that we use. +func LevelFilter() *logutils.LevelFilter { + return &logutils.LevelFilter{ + Levels: []logutils.LogLevel{"TRACE", "DEBUG", "INFO", "WARN", "ERR"}, + MinLevel: "INFO", + Writer: ioutil.Discard, + } +} + +// ValidateLevelFilter verifies that the log levels within the filter +// are valid. +func ValidateLevelFilter(minLevel logutils.LogLevel, filter *logutils.LevelFilter) bool { + for _, level := range filter.Levels { + if level == minLevel { + return true + } + } + return false +} diff --git a/command/agent/log_writer.go b/command/agent/log_writer.go new file mode 100644 index 000000000..e6e76d0e8 --- /dev/null +++ b/command/agent/log_writer.go @@ -0,0 +1,83 @@ +package agent + +import ( + "sync" +) + +// LogHandler interface is used for clients that want to subscribe +// to logs, for example to stream them over an IPC mechanism +type LogHandler interface { + HandleLog(string) +} + +// logWriter implements io.Writer so it can be used as a log sink. +// It maintains a circular buffer of logs, and a set of handlers to +// which it can stream the logs to. +type logWriter struct { + sync.Mutex + logs []string + index int + handlers map[LogHandler]struct{} +} + +// NewLogWriter creates a logWriter with the given buffer capacity +func NewLogWriter(buf int) *logWriter { + return &logWriter{ + logs: make([]string, buf), + index: 0, + handlers: make(map[LogHandler]struct{}), + } +} + +// RegisterHandler adds a log handler to receive logs, and sends +// the last buffered logs to the handler +func (l *logWriter) RegisterHandler(lh LogHandler) { + l.Lock() + defer l.Unlock() + + // Do nothing if already registered + if _, ok := l.handlers[lh]; ok { + return + } + + // Register + l.handlers[lh] = struct{}{} + + // Send the old logs + if l.logs[l.index] != "" { + for i := l.index; i < len(l.logs); i++ { + lh.HandleLog(l.logs[i]) + } + } + for i := 0; i < l.index; i++ { + lh.HandleLog(l.logs[i]) + } +} + +// DeregisterHandler removes a LogHandler and prevents more invocations +func (l *logWriter) DeregisterHandler(lh LogHandler) { + l.Lock() + defer l.Unlock() + delete(l.handlers, lh) +} + +// Write is used to accumulate new logs +func (l *logWriter) Write(p []byte) (n int, err error) { + l.Lock() + defer l.Unlock() + + // Strip off newlines at the end if there are any since we store + // individual log lines in the agent. + n = len(p) + if p[n-1] == '\n' { + p = p[:n-1] + } + + l.logs[l.index] = string(p) + l.index = (l.index + 1) % len(l.logs) + + for lh, _ := range l.handlers { + lh.HandleLog(string(p)) + } + return +} diff --git a/command/agent/log_writer_test.go b/command/agent/log_writer_test.go new file mode 100644 index 000000000..47e446d22 --- /dev/null +++ b/command/agent/log_writer_test.go @@ -0,0 +1,51 @@ +package agent + +import ( + "testing" +) + +type MockLogHandler struct { + logs []string +} + +func (m *MockLogHandler) HandleLog(l string) { + m.logs = append(m.logs, l) +} + +func TestLogWriter(t *testing.T) { + h := &MockLogHandler{} + w := NewLogWriter(4) + + // Write some logs + w.Write([]byte("one")) // Gets dropped! + w.Write([]byte("two")) + w.Write([]byte("three")) + w.Write([]byte("four")) + w.Write([]byte("five")) + + // Register a handler, sends old! + w.RegisterHandler(h) + + w.Write([]byte("six")) + w.Write([]byte("seven")) + + // Deregister + w.DeregisterHandler(h) + + w.Write([]byte("eight")) + w.Write([]byte("nine")) + + out := []string{ + "two", + "three", + "four", + "five", + "six", + "seven", + } + for idx := range out { + if out[idx] != h.logs[idx] { + t.Fatalf("mismatch %v", h.logs) + } + } +} diff --git a/command/agent/syslog.go b/command/agent/syslog.go new file mode 100644 index 000000000..b7aed636d --- /dev/null +++ b/command/agent/syslog.go @@ -0,0 +1,56 @@ +package agent + +import ( + "bytes" + "github.com/hashicorp/go-syslog" + "github.com/hashicorp/logutils" +) + +// levelPriority is used to map a log level to a +// syslog priority level +var levelPriority = map[string]gsyslog.Priority{ + "TRACE": gsyslog.LOG_DEBUG, + "DEBUG": gsyslog.LOG_INFO, + "INFO": gsyslog.LOG_NOTICE, + "WARN": gsyslog.LOG_WARNING, + "ERR": gsyslog.LOG_ERR, + "CRIT": gsyslog.LOG_CRIT, +} + +// SyslogWrapper is used to cleaup log messages before +// writing them to a Syslogger. Implements the io.Writer +// interface. +type SyslogWrapper struct { + l gsyslog.Syslogger + filt *logutils.LevelFilter +} + +// Write is used to implement io.Writer +func (s *SyslogWrapper) Write(p []byte) (int, error) { + // Skip syslog if the log level doesn't apply + if !s.filt.Check(p) { + return 0, nil + } + + // Extract log level + var level string + afterLevel := p + x := bytes.IndexByte(p, '[') + if x >= 0 { + y := bytes.IndexByte(p[x:], ']') + if y >= 0 { + level = string(p[x+1 : x+y]) + afterLevel = p[x+y+2:] + } + } + + // Each log level will be handled by a specific syslog priority + priority, ok := levelPriority[level] + if !ok { + priority = gsyslog.LOG_NOTICE + } + + // Attempt the write + err := s.l.WriteLevel(priority, afterLevel) + return len(p), err +} diff --git a/command/agent/syslog_test.go b/command/agent/syslog_test.go new file mode 100644 index 000000000..0990a081d --- /dev/null +++ b/command/agent/syslog_test.go @@ -0,0 +1,44 @@ +package agent + +import ( + "os" + "runtime" + "testing" + + "github.com/hashicorp/go-syslog" + "github.com/hashicorp/logutils" +) + +func TestSyslogFilter(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Syslog not supported on Windows") + } + if os.Getenv("TRAVIS") == "true" { + t.Skip("Syslog not supported on travis-ci") + } + + l, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, "LOCAL0", "consul") + if err != nil { + t.Fatalf("err: %s", err) + } + + filt := LevelFilter() + filt.MinLevel = logutils.LogLevel("INFO") + + s := &SyslogWrapper{l, filt} + n, err := s.Write([]byte("[INFO] test")) + if err != nil { + t.Fatalf("err: %s", err) + } + if n == 0 { + t.Fatalf("should have logged") + } + + n, err = s.Write([]byte("[DEBUG] test")) + if err != nil { + t.Fatalf("err: %s", err) + } + if n != 0 { + t.Fatalf("should not have logged") + } +}