return 400 if invalid log_json param is given

Addresses feedback around monitor implementation

subselect on stopCh to prevent blocking forever.

Set up a separate goroutine to check every 3 seconds for dropped
messages.

rename returned ch to avoid confusion
This commit is contained in:
Drew Bailey 2019-10-30 09:55:41 -04:00
parent 17d876d5ef
commit 32f62edbb0
No known key found for this signature in database
GPG key ID: FBA61B9FB7CCE1A7
4 changed files with 113 additions and 26 deletions

View file

@ -178,10 +178,14 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
// Determine if we are targeting a server or client
nodeID := req.URL.Query().Get("node_id")
logJSON := false
logJSONStr := req.URL.Query().Get("log_json")
logJSON, err := strconv.ParseBool(logJSONStr)
if logJSONStr != "" {
parsed, err := strconv.ParseBool(logJSONStr)
if err != nil {
logJSON = false
return nil, CodedError(400, fmt.Sprintf("Unknown option for log json: %v", err))
}
logJSON = parsed
}
// Build the request and parse the ACL token

View file

@ -255,6 +255,20 @@ func TestHTTP_AgentMonitor(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
// invalid log_json
{
req, err := http.NewRequest("GET", "/v1/agent/monitor?log_json=no", nil)
require.Nil(t, err)
resp := newClosableRecorder()
// Make the request
_, err = s.Server.AgentMonitor(resp, req)
if err.(HTTPCodedError).Code() != 400 {
t.Fatalf("expected 400 response, got: %v", resp.Code)
}
}
// unknown log_level
{
req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=unknown", nil)
require.Nil(t, err)

View file

@ -1,27 +1,34 @@
package monitor
import (
"fmt"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
)
// Monitor provides a mechanism to stream logs using go-hclog
// InterceptLogger and SinkAdapter. It allows streaming of logs
// at a different log level than what is set on the logger.
type Monitor struct {
sync.Mutex
sink log.SinkAdapter
logger log.InterceptLogger
logCh chan []byte
index int
droppedCount int
bufSize int
droppedDuration time.Duration
}
// New creates a new Monitor. Start must be called in order to actually start
// streaming logs
func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor {
sw := &Monitor{
logger: logger,
logCh: make(chan []byte, buf),
index: 0,
bufSize: buf,
droppedDuration: 3 * time.Second,
}
opts.Output = sw
@ -31,15 +38,25 @@ func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor
return sw
}
// Start registers a sink on the monitors logger and starts sending
// received log messages over the returned channel. A non-nil
// sopCh can be used to deregister the sink and stop log streaming
func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
d.logger.RegisterSink(d.sink)
logCh := make(chan []byte, d.bufSize)
streamCh := make(chan []byte, d.bufSize)
go func() {
defer close(streamCh)
for {
select {
case log := <-d.logCh:
logCh <- log
select {
case <-stopCh:
d.logger.DeregisterSink(d.sink)
close(d.logCh)
return
case streamCh <- log:
}
case <-stopCh:
d.Lock()
defer d.Unlock()
@ -51,7 +68,38 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
}
}()
return logCh
go func() {
// loop and check for dropped messages
LOOP:
for {
select {
case <-stopCh:
break LOOP
case <-time.After(d.droppedDuration):
d.Lock()
defer d.Unlock()
if d.droppedCount > 0 {
dropped := fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount)
select {
case d.logCh <- []byte(dropped):
default:
// Make room for dropped message
select {
case <-d.logCh:
d.droppedCount++
dropped = fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount)
default:
}
d.logCh <- []byte(dropped)
}
d.droppedCount = 0
}
}
}
}()
return streamCh
}
// Write attempts to send latest log to logCh
@ -67,10 +115,6 @@ func (d *Monitor) Write(p []byte) (n int, err error) {
case d.logCh <- bytes:
default:
d.droppedCount++
if d.droppedCount > 10 {
d.logger.Warn("Monitor dropped %d logs during monitor request", d.droppedCount)
d.droppedCount = 0
}
}
return
}

View file

@ -1,11 +1,11 @@
package monitor
import (
"fmt"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
log "github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
)
@ -38,6 +38,7 @@ func TestMonitor_Start(t *testing.T) {
logger.Debug("test log")
}
// Ensure number of dropped messages are logged
func TestMonitor_DroppedMessages(t *testing.T) {
t.Parallel()
@ -48,15 +49,39 @@ func TestMonitor_DroppedMessages(t *testing.T) {
m := New(5, logger, &log.LoggerOptions{
Level: log.Debug,
})
m.droppedDuration = 5 * time.Millisecond
doneCh := make(chan struct{})
defer close(doneCh)
m.Start(doneCh)
logCh := m.Start(doneCh)
for i := 0; i <= 9; i++ {
logger.Debug("test message")
for i := 0; i <= 100; i++ {
logger.Debug(fmt.Sprintf("test message %d", i))
}
assert.Greater(t, m.droppedCount, 0)
received := ""
passed := make(chan struct{})
go func() {
for {
select {
case recv := <-logCh:
received += string(recv)
if strings.Contains(received, "[WARN] Monitor dropped 90 logs during monitor request") {
close(passed)
}
}
}
}()
TEST:
for {
select {
case <-passed:
break TEST
case <-time.After(1 * time.Second):
require.Fail(t, "expected to see warn dropped messages")
}
}
}