4b75f15fb7
* remove flush for each write to http response in the agent monitor endpoint * fix race condition when we stop and start monitor multiple times, the doneCh is closed and never recover. * start log reading goroutine before adding the sink to avoid filling the log channel before getting a chance of reading from it * flush every 500ms to optimize log writing in the http server side. * add changelog file * add issue url to changelog * fix changelog url * Update changelog Co-authored-by: Daniel Nephin <dnephin@hashicorp.com> * use ticker to flush and avoid race condition when flushing in a different goroutine * stop the ticker when done Co-authored-by: Daniel Nephin <dnephin@hashicorp.com> * Revert "fix race condition when we stop and start monitor multiple times, the doneCh is closed and never recover." This reverts commit 1eeddf7a * wait for log consumer loop to start before registering the sink Co-authored-by: Daniel Nephin <dnephin@hashicorp.com>
134 lines
2.9 KiB
Go
134 lines
2.9 KiB
Go
package monitor
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
|
|
log "github.com/hashicorp/go-hclog"
|
|
)
|
|
|
|
// Monitor provides a mechanism to stream logs using go-hclog
|
|
// InterceptLogger and SinkAdapter. It allows streaming of logs
|
|
// at a different log level than what is set on the logger.
|
|
type Monitor interface {
|
|
// Start returns a channel of log messages which are sent
|
|
// ever time a log message occurs
|
|
Start() <-chan []byte
|
|
|
|
// Stop deregisters the sink from the InterceptLogger and closes the log
|
|
// channels. This returns a count of the number of log messages that were
|
|
// dropped during streaming.
|
|
Stop() int
|
|
}
|
|
|
|
// monitor implements the Monitor interface
|
|
type monitor struct {
|
|
sink log.SinkAdapter
|
|
|
|
// logger is the logger we will be monitoring
|
|
logger log.InterceptLogger
|
|
|
|
// logCh is a buffered chan where we send logs when streaming
|
|
logCh chan []byte
|
|
|
|
// droppedCount is the current count of messages
|
|
// that were dropped from the logCh buffer.
|
|
droppedCount int
|
|
|
|
// doneCh coordinates the shutdown of logCh
|
|
doneCh chan struct{}
|
|
|
|
// Defaults to 512.
|
|
bufSize int
|
|
}
|
|
|
|
type Config struct {
|
|
BufferSize int
|
|
Logger log.InterceptLogger
|
|
LoggerOptions *log.LoggerOptions
|
|
}
|
|
|
|
// New creates a new Monitor. Start must be called in order to actually start
|
|
// streaming logs
|
|
func New(cfg Config) Monitor {
|
|
bufSize := cfg.BufferSize
|
|
if bufSize == 0 {
|
|
bufSize = 512
|
|
}
|
|
|
|
sw := &monitor{
|
|
logger: cfg.Logger,
|
|
logCh: make(chan []byte, bufSize),
|
|
doneCh: make(chan struct{}, 1),
|
|
bufSize: bufSize,
|
|
}
|
|
|
|
cfg.LoggerOptions.Output = sw
|
|
sink := log.NewSinkAdapter(cfg.LoggerOptions)
|
|
sw.sink = sink
|
|
|
|
return sw
|
|
}
|
|
|
|
// Stop deregisters the sink and stops the monitoring process
|
|
func (d *monitor) Stop() int {
|
|
d.logger.DeregisterSink(d.sink)
|
|
close(d.doneCh)
|
|
return d.droppedCount
|
|
}
|
|
|
|
// Start registers a sink on the monitor's logger and starts sending
|
|
// received log messages over the returned channel.
|
|
func (d *monitor) Start() <-chan []byte {
|
|
// register our sink with the logger
|
|
streamCh := make(chan []byte, d.bufSize)
|
|
// run a go routine that listens for streamed
|
|
// log messages and sends them to streamCh
|
|
|
|
wg := new(sync.WaitGroup)
|
|
wg.Add(1)
|
|
go func() {
|
|
defer close(streamCh)
|
|
wg.Done()
|
|
for {
|
|
select {
|
|
case logLine := <-d.logCh:
|
|
select {
|
|
case <-d.doneCh:
|
|
return
|
|
case streamCh <- logLine:
|
|
}
|
|
case <-d.doneCh:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
//wait for the consumer loop to start before registering the sink to avoid filling the log channel
|
|
wg.Wait()
|
|
d.logger.RegisterSink(d.sink)
|
|
return streamCh
|
|
}
|
|
|
|
// Write attempts to send latest log to logCh
|
|
// it drops the log if channel is unavailable to receive
|
|
func (d *monitor) Write(p []byte) (int, error) {
|
|
// Check whether we have been stopped
|
|
select {
|
|
case <-d.doneCh:
|
|
return 0, errors.New("monitor stopped")
|
|
default:
|
|
}
|
|
|
|
bytes := make([]byte, len(p))
|
|
copy(bytes, p)
|
|
|
|
select {
|
|
case d.logCh <- bytes:
|
|
default:
|
|
d.droppedCount++
|
|
}
|
|
|
|
return len(p), nil
|
|
}
|