diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index ceb16d8eb..8d19d3c2d 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -178,10 +178,14 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( // Determine if we are targeting a server or client nodeID := req.URL.Query().Get("node_id") + logJSON := false logJSONStr := req.URL.Query().Get("log_json") - logJSON, err := strconv.ParseBool(logJSONStr) - if err != nil { - logJSON = false + if logJSONStr != "" { + parsed, err := strconv.ParseBool(logJSONStr) + if err != nil { + return nil, CodedError(400, fmt.Sprintf("Unknown option for log json: %v", err)) + } + logJSON = parsed } // Build the request and parse the ACL token diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index 767801048..feeb8a828 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -255,6 +255,20 @@ func TestHTTP_AgentMonitor(t *testing.T) { t.Parallel() httpTest(t, nil, func(s *TestAgent) { + // invalid log_json + { + req, err := http.NewRequest("GET", "/v1/agent/monitor?log_json=no", nil) + require.Nil(t, err) + resp := newClosableRecorder() + + // Make the request + _, err = s.Server.AgentMonitor(resp, req) + if err.(HTTPCodedError).Code() != 400 { + t.Fatalf("expected 400 response, got: %v", resp.Code) + } + } + + // unknown log_level { req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=unknown", nil) require.Nil(t, err) diff --git a/command/agent/monitor/monitor.go b/command/agent/monitor/monitor.go index 1d063ae1d..3605bb992 100644 --- a/command/agent/monitor/monitor.go +++ b/command/agent/monitor/monitor.go @@ -1,27 +1,34 @@ package monitor import ( + "fmt" "sync" + "time" log "github.com/hashicorp/go-hclog" ) +// Monitor provides a mechanism to stream logs using go-hclog +// InterceptLogger and SinkAdapter. It allows streaming of logs +// at a different log level than what is set on the logger. type Monitor struct { sync.Mutex - sink log.SinkAdapter - logger log.InterceptLogger - logCh chan []byte - index int - droppedCount int - bufSize int + sink log.SinkAdapter + logger log.InterceptLogger + logCh chan []byte + droppedCount int + bufSize int + droppedDuration time.Duration } +// New creates a new Monitor. Start must be called in order to actually start +// streaming logs func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor { sw := &Monitor{ - logger: logger, - logCh: make(chan []byte, buf), - index: 0, - bufSize: buf, + logger: logger, + logCh: make(chan []byte, buf), + bufSize: buf, + droppedDuration: 3 * time.Second, } opts.Output = sw @@ -31,15 +38,25 @@ func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor return sw } +// Start registers a sink on the monitors logger and starts sending +// received log messages over the returned channel. A non-nil +// sopCh can be used to deregister the sink and stop log streaming func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte { d.logger.RegisterSink(d.sink) - logCh := make(chan []byte, d.bufSize) + streamCh := make(chan []byte, d.bufSize) go func() { + defer close(streamCh) for { select { case log := <-d.logCh: - logCh <- log + select { + case <-stopCh: + d.logger.DeregisterSink(d.sink) + close(d.logCh) + return + case streamCh <- log: + } case <-stopCh: d.Lock() defer d.Unlock() @@ -51,7 +68,38 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte { } }() - return logCh + go func() { + // loop and check for dropped messages + LOOP: + for { + select { + case <-stopCh: + break LOOP + case <-time.After(d.droppedDuration): + d.Lock() + defer d.Unlock() + + if d.droppedCount > 0 { + dropped := fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount) + select { + case d.logCh <- []byte(dropped): + default: + // Make room for dropped message + select { + case <-d.logCh: + d.droppedCount++ + dropped = fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount) + default: + } + d.logCh <- []byte(dropped) + } + d.droppedCount = 0 + } + } + } + }() + + return streamCh } // Write attempts to send latest log to logCh @@ -67,10 +115,6 @@ func (d *Monitor) Write(p []byte) (n int, err error) { case d.logCh <- bytes: default: d.droppedCount++ - if d.droppedCount > 10 { - d.logger.Warn("Monitor dropped %d logs during monitor request", d.droppedCount) - d.droppedCount = 0 - } } return } diff --git a/command/agent/monitor/monitor_test.go b/command/agent/monitor/monitor_test.go index b513db44e..bb6cedaf2 100644 --- a/command/agent/monitor/monitor_test.go +++ b/command/agent/monitor/monitor_test.go @@ -1,11 +1,11 @@ package monitor import ( + "fmt" + "strings" "testing" "time" - "github.com/stretchr/testify/assert" - log "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" ) @@ -38,6 +38,7 @@ func TestMonitor_Start(t *testing.T) { logger.Debug("test log") } +// Ensure number of dropped messages are logged func TestMonitor_DroppedMessages(t *testing.T) { t.Parallel() @@ -48,15 +49,39 @@ func TestMonitor_DroppedMessages(t *testing.T) { m := New(5, logger, &log.LoggerOptions{ Level: log.Debug, }) + m.droppedDuration = 5 * time.Millisecond doneCh := make(chan struct{}) defer close(doneCh) - m.Start(doneCh) + logCh := m.Start(doneCh) - for i := 0; i <= 9; i++ { - logger.Debug("test message") + for i := 0; i <= 100; i++ { + logger.Debug(fmt.Sprintf("test message %d", i)) } - assert.Greater(t, m.droppedCount, 0) + received := "" + + passed := make(chan struct{}) + go func() { + for { + select { + case recv := <-logCh: + received += string(recv) + if strings.Contains(received, "[WARN] Monitor dropped 90 logs during monitor request") { + close(passed) + } + } + } + }() + +TEST: + for { + select { + case <-passed: + break TEST + case <-time.After(1 * time.Second): + require.Fail(t, "expected to see warn dropped messages") + } + } }