agent: start basic logging setup

This commit is contained in:
Armon Dadgar 2015-08-16 13:54:49 -07:00
parent c1aa76cc3b
commit 93332295db
10 changed files with 523 additions and 4 deletions

12
command/agent/agent.go Normal file
View File

@ -0,0 +1,12 @@
package agent
type Agent struct {
}
func (a *Agent) Leave() error {
return nil
}
func (a *Agent) Shutdown() error {
return nil
}

View File

@ -3,14 +3,23 @@ package agent
import (
"flag"
"fmt"
"io"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-syslog"
"github.com/hashicorp/logutils"
"github.com/hashicorp/vault/helper/flag-slice"
"github.com/mitchellh/cli"
)
// gracefulTimeout controls how long we wait before forcefully terminating
const gracefulTimeout = 5 * time.Second
// Command is a Command implementation that runs a Consul agent.
// The command will not end unless a shutdown message is sent on the
// ShutdownCh. If two messages are sent on the ShutdownCh it will forcibly
@ -23,6 +32,9 @@ type Command struct {
ShutdownCh <-chan struct{}
args []string
agent *Agent
logFilter *logutils.LevelFilter
logOutput io.Writer
}
func (c *Command) readConfig() *Config {
@ -42,6 +54,8 @@ func (c *Command) readConfig() *Config {
var config *Config
if dev {
config = DevConfig()
} else {
config = DefaultConfig()
}
for _, path := range configPath {
current, err := LoadConfig(path)
@ -61,6 +75,48 @@ func (c *Command) readConfig() *Config {
return config
}
// setupLoggers is used to setup the logGate, logWriter, and our logOutput
func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Writer) {
// Setup logging. First create the gated log writer, which will
// store logs until we're ready to show them. Then create the level
// filter, filtering logs of the specified level.
logGate := &GatedWriter{
Writer: &cli.UiWriter{Ui: c.Ui},
}
c.logFilter = LevelFilter()
c.logFilter.MinLevel = logutils.LogLevel(strings.ToUpper(config.LogLevel))
c.logFilter.Writer = logGate
if !ValidateLevelFilter(c.logFilter.MinLevel, c.logFilter) {
c.Ui.Error(fmt.Sprintf(
"Invalid log level: %s. Valid log levels are: %v",
c.logFilter.MinLevel, c.logFilter.Levels))
return nil, nil, nil
}
// Check if syslog is enabled
var syslog io.Writer
if config.EnableSyslog {
l, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, config.SyslogFacility, "consul")
if err != nil {
c.Ui.Error(fmt.Sprintf("Syslog setup failed: %v", err))
return nil, nil, nil
}
syslog = &SyslogWrapper{l, c.logFilter}
}
// Create a log writer, and wrap a logOutput around it
logWriter := NewLogWriter(512)
var logOutput io.Writer
if syslog != nil {
logOutput = io.MultiWriter(c.logFilter, logWriter, syslog)
} else {
logOutput = io.MultiWriter(c.logFilter, logWriter)
}
c.logOutput = logOutput
return logGate, logWriter, logOutput
}
func (c *Command) Run(args []string) int {
c.Ui = &cli.PrefixedUi{
OutputPrefix: "==> ",
@ -76,14 +132,112 @@ func (c *Command) Run(args []string) int {
return 1
}
// Setup the log outputs
logGate, _, _ := c.setupLoggers(config)
if logGate == nil {
return 1
}
// Initialize the telemetry
if err := c.setupTelementry(config); err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing telemetry: %s", err))
return 1
}
// Let the user know things are running
c.Ui.Output("Nomad agent running!")
// Enable log streaming
c.Ui.Info("")
c.Ui.Output("Log data will now stream in as it occurs:\n")
logGate.Flush()
// Wait for exit
return c.handleSignals(config)
}
// handleSignals blocks until we get an exit-causing signal
func (c *Command) handleSignals(config *Config) int {
signalCh := make(chan os.Signal, 4)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
// Wait for a signal
WAIT:
var sig os.Signal
select {
case s := <-signalCh:
sig = s
case <-c.ShutdownCh:
sig = os.Interrupt
}
c.Ui.Output(fmt.Sprintf("Caught signal: %v", sig))
// Check if this is a SIGHUP
if sig == syscall.SIGHUP {
if conf := c.handleReload(config); conf != nil {
config = conf
}
goto WAIT
}
// Check if we should do a graceful leave
graceful := false
if sig == os.Interrupt && !config.LeaveOnInt {
graceful = true
} else if sig == syscall.SIGTERM && config.LeaveOnTerm {
graceful = true
}
// Bail fast if not doing a graceful leave
if !graceful {
return 1
}
// Attempt a graceful leave
gracefulCh := make(chan struct{})
c.Ui.Output("Gracefully shutting down agent...")
go func() {
if err := c.agent.Leave(); err != nil {
c.Ui.Error(fmt.Sprintf("Error: %s", err))
return
}
close(gracefulCh)
}()
// Wait for leave or another signal
select {
case <-signalCh:
return 1
case <-time.After(gracefulTimeout):
return 1
case <-gracefulCh:
return 0
}
}
// handleReload is invoked when we should reload our configs, e.g. SIGHUP
func (c *Command) handleReload(config *Config) *Config {
c.Ui.Output("Reloading configuration...")
newConf := c.readConfig()
if newConf == nil {
c.Ui.Error(fmt.Sprintf("Failed to reload configs"))
return config
}
// Change the log level
minLevel := logutils.LogLevel(strings.ToUpper(newConf.LogLevel))
if ValidateLevelFilter(minLevel, c.logFilter) {
c.logFilter.SetMinLevel(minLevel)
} else {
c.Ui.Error(fmt.Sprintf(
"Invalid log level: %s. Valid log levels are: %v",
minLevel, c.logFilter.Levels))
// Keep the current log level
newConf.LogLevel = config.LogLevel
}
return newConf
}
// setupTelementry is used ot setup the telemetry sub-systems
func (c *Command) setupTelementry(config *Config) error {
@ -101,7 +255,7 @@ func (c *Command) setupTelementry(config *Config) error {
telConfig = config.Telemetry
}
metricsConf := metrics.DefaultConfig("vault")
metricsConf := metrics.DefaultConfig("nomad")
metricsConf.EnableHostname = !telConfig.DisableHostname
// Configure the statsite sink

View File

@ -36,6 +36,11 @@ type Config struct {
Server *ServerConfig `hcl:"server"`
Telemetry *Telemetry `hcl:"telemetry"`
LeaveOnInt bool
LeaveOnTerm bool
EnableSyslog bool
SyslogFacility string
}
type ClientConfig struct {
@ -79,7 +84,16 @@ type Telemetry struct {
// DevConfig is a Config that is used for dev mode of Nomad.
func DevConfig() *Config {
return &Config{}
return &Config{
LogLevel: "DEBUG",
}
}
// DefaultConfig is a the baseline configuration for Nomad
func DefaultConfig() *Config {
return &Config{
LogLevel: "INFO",
}
}
// Merge merges two configurations.

View File

@ -0,0 +1,43 @@
package agent
import (
"io"
"sync"
)
// GatedWriter is an io.Writer implementation that buffers all of its
// data into an internal buffer until it is told to let data through.
type GatedWriter struct {
Writer io.Writer
buf [][]byte
flush bool
lock sync.RWMutex
}
// Flush tells the GatedWriter to flush any buffered data and to stop
// buffering.
func (w *GatedWriter) Flush() {
w.lock.Lock()
w.flush = true
w.lock.Unlock()
for _, p := range w.buf {
w.Write(p)
}
w.buf = nil
}
func (w *GatedWriter) Write(p []byte) (n int, err error) {
w.lock.RLock()
defer w.lock.RUnlock()
if w.flush {
return w.Writer.Write(p)
}
p2 := make([]byte, len(p))
copy(p2, p)
w.buf = append(w.buf, p2)
return len(p), nil
}

View File

@ -0,0 +1,34 @@
package agent
import (
"bytes"
"io"
"testing"
)
func TestGatedWriter_impl(t *testing.T) {
var _ io.Writer = new(GatedWriter)
}
func TestGatedWriter(t *testing.T) {
buf := new(bytes.Buffer)
w := &GatedWriter{Writer: buf}
w.Write([]byte("foo\n"))
w.Write([]byte("bar\n"))
if buf.String() != "" {
t.Fatalf("bad: %s", buf.String())
}
w.Flush()
if buf.String() != "foo\nbar\n" {
t.Fatalf("bad: %s", buf.String())
}
w.Write([]byte("baz\n"))
if buf.String() != "foo\nbar\nbaz\n" {
t.Fatalf("bad: %s", buf.String())
}
}

View File

@ -0,0 +1,28 @@
package agent
import (
"io/ioutil"
"github.com/hashicorp/logutils"
)
// LevelFilter returns a LevelFilter that is configured with the log
// levels that we use.
func LevelFilter() *logutils.LevelFilter {
return &logutils.LevelFilter{
Levels: []logutils.LogLevel{"TRACE", "DEBUG", "INFO", "WARN", "ERR"},
MinLevel: "INFO",
Writer: ioutil.Discard,
}
}
// ValidateLevelFilter verifies that the log levels within the filter
// are valid.
func ValidateLevelFilter(minLevel logutils.LogLevel, filter *logutils.LevelFilter) bool {
for _, level := range filter.Levels {
if level == minLevel {
return true
}
}
return false
}

View File

@ -0,0 +1,83 @@
package agent
import (
"sync"
)
// LogHandler interface is used for clients that want to subscribe
// to logs, for example to stream them over an IPC mechanism
type LogHandler interface {
HandleLog(string)
}
// logWriter implements io.Writer so it can be used as a log sink.
// It maintains a circular buffer of logs, and a set of handlers to
// which it can stream the logs to.
type logWriter struct {
sync.Mutex
logs []string
index int
handlers map[LogHandler]struct{}
}
// NewLogWriter creates a logWriter with the given buffer capacity
func NewLogWriter(buf int) *logWriter {
return &logWriter{
logs: make([]string, buf),
index: 0,
handlers: make(map[LogHandler]struct{}),
}
}
// RegisterHandler adds a log handler to receive logs, and sends
// the last buffered logs to the handler
func (l *logWriter) RegisterHandler(lh LogHandler) {
l.Lock()
defer l.Unlock()
// Do nothing if already registered
if _, ok := l.handlers[lh]; ok {
return
}
// Register
l.handlers[lh] = struct{}{}
// Send the old logs
if l.logs[l.index] != "" {
for i := l.index; i < len(l.logs); i++ {
lh.HandleLog(l.logs[i])
}
}
for i := 0; i < l.index; i++ {
lh.HandleLog(l.logs[i])
}
}
// DeregisterHandler removes a LogHandler and prevents more invocations
func (l *logWriter) DeregisterHandler(lh LogHandler) {
l.Lock()
defer l.Unlock()
delete(l.handlers, lh)
}
// Write is used to accumulate new logs
func (l *logWriter) Write(p []byte) (n int, err error) {
l.Lock()
defer l.Unlock()
// Strip off newlines at the end if there are any since we store
// individual log lines in the agent.
n = len(p)
if p[n-1] == '\n' {
p = p[:n-1]
}
l.logs[l.index] = string(p)
l.index = (l.index + 1) % len(l.logs)
for lh, _ := range l.handlers {
lh.HandleLog(string(p))
}
return
}

View File

@ -0,0 +1,51 @@
package agent
import (
"testing"
)
type MockLogHandler struct {
logs []string
}
func (m *MockLogHandler) HandleLog(l string) {
m.logs = append(m.logs, l)
}
func TestLogWriter(t *testing.T) {
h := &MockLogHandler{}
w := NewLogWriter(4)
// Write some logs
w.Write([]byte("one")) // Gets dropped!
w.Write([]byte("two"))
w.Write([]byte("three"))
w.Write([]byte("four"))
w.Write([]byte("five"))
// Register a handler, sends old!
w.RegisterHandler(h)
w.Write([]byte("six"))
w.Write([]byte("seven"))
// Deregister
w.DeregisterHandler(h)
w.Write([]byte("eight"))
w.Write([]byte("nine"))
out := []string{
"two",
"three",
"four",
"five",
"six",
"seven",
}
for idx := range out {
if out[idx] != h.logs[idx] {
t.Fatalf("mismatch %v", h.logs)
}
}
}

56
command/agent/syslog.go Normal file
View File

@ -0,0 +1,56 @@
package agent
import (
"bytes"
"github.com/hashicorp/go-syslog"
"github.com/hashicorp/logutils"
)
// levelPriority is used to map a log level to a
// syslog priority level
var levelPriority = map[string]gsyslog.Priority{
"TRACE": gsyslog.LOG_DEBUG,
"DEBUG": gsyslog.LOG_INFO,
"INFO": gsyslog.LOG_NOTICE,
"WARN": gsyslog.LOG_WARNING,
"ERR": gsyslog.LOG_ERR,
"CRIT": gsyslog.LOG_CRIT,
}
// SyslogWrapper is used to cleaup log messages before
// writing them to a Syslogger. Implements the io.Writer
// interface.
type SyslogWrapper struct {
l gsyslog.Syslogger
filt *logutils.LevelFilter
}
// Write is used to implement io.Writer
func (s *SyslogWrapper) Write(p []byte) (int, error) {
// Skip syslog if the log level doesn't apply
if !s.filt.Check(p) {
return 0, nil
}
// Extract log level
var level string
afterLevel := p
x := bytes.IndexByte(p, '[')
if x >= 0 {
y := bytes.IndexByte(p[x:], ']')
if y >= 0 {
level = string(p[x+1 : x+y])
afterLevel = p[x+y+2:]
}
}
// Each log level will be handled by a specific syslog priority
priority, ok := levelPriority[level]
if !ok {
priority = gsyslog.LOG_NOTICE
}
// Attempt the write
err := s.l.WriteLevel(priority, afterLevel)
return len(p), err
}

View File

@ -0,0 +1,44 @@
package agent
import (
"os"
"runtime"
"testing"
"github.com/hashicorp/go-syslog"
"github.com/hashicorp/logutils"
)
func TestSyslogFilter(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Syslog not supported on Windows")
}
if os.Getenv("TRAVIS") == "true" {
t.Skip("Syslog not supported on travis-ci")
}
l, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, "LOCAL0", "consul")
if err != nil {
t.Fatalf("err: %s", err)
}
filt := LevelFilter()
filt.MinLevel = logutils.LogLevel("INFO")
s := &SyslogWrapper{l, filt}
n, err := s.Write([]byte("[INFO] test"))
if err != nil {
t.Fatalf("err: %s", err)
}
if n == 0 {
t.Fatalf("should have logged")
}
n, err = s.Write([]byte("[DEBUG] test"))
if err != nil {
t.Fatalf("err: %s", err)
}
if n != 0 {
t.Fatalf("should not have logged")
}
}