open-nomad/command/agent/monitor/monitor.go

134 lines
3 KiB
Go
Raw Normal View History

package monitor
import (
"fmt"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
)
// Monitor provides a mechanism to stream logs using go-hclog
// InterceptLogger and SinkAdapter. It allows streaming of logs
// at a different log level than what is set on the logger.
type Monitor struct {
// sync.Mutex protects droppedCount and logCh
sync.Mutex
sink log.SinkAdapter
// logger is the logger we will be monitoring
logger log.InterceptLogger
logCh chan []byte
// droppedCount is the current count of messages
// that were dropped from the logCh buffer.
// only access under lock
droppedCount int
bufSize int
// droppedDuration is the amount of time we should
// wait to check for dropped messages. Defaults
// to 3 seconds
droppedDuration time.Duration
}
// New creates a new Monitor. Start must be called in order to actually start
// streaming logs
func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor {
sw := &Monitor{
logger: logger,
logCh: make(chan []byte, buf),
bufSize: buf,
droppedDuration: 3 * time.Second,
}
opts.Output = sw
sink := log.NewSinkAdapter(opts)
sw.sink = sink
return sw
}
// Start registers a sink on the monitor's logger and starts sending
// received log messages over the returned channel. A non-nil
// stopCh can be used to de-register the sink and stop log streaming
func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
d.logger.RegisterSink(d.sink)
streamCh := make(chan []byte, d.bufSize)
go func() {
defer close(streamCh)
for {
select {
case log := <-d.logCh:
select {
case <-stopCh:
d.logger.DeregisterSink(d.sink)
close(d.logCh)
return
case streamCh <- log:
}
case <-stopCh:
d.Lock()
defer d.Unlock()
d.logger.DeregisterSink(d.sink)
close(d.logCh)
return
}
}
}()
go func() {
// loop and check for dropped messages
LOOP:
for {
select {
case <-stopCh:
break LOOP
case <-time.After(d.droppedDuration):
d.Lock()
// Check if there have been any dropped messages.
if d.droppedCount > 0 {
dropped := fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount)
select {
// Try sending dropped message count to logCh in case
// there is room in the buffer now.
case d.logCh <- []byte(dropped):
default:
// Drop a log message to make room for "Monitor dropped.." message
select {
case <-d.logCh:
d.droppedCount++
dropped = fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount)
default:
}
d.logCh <- []byte(dropped)
}
d.droppedCount = 0
}
d.Unlock()
}
}
}()
return streamCh
}
2019-10-25 18:25:19 +00:00
// Write attempts to send latest log to logCh
// it drops the log if channel is unavailable to receive
func (d *Monitor) Write(p []byte) (n int, err error) {
d.Lock()
defer d.Unlock()
bytes := make([]byte, len(p))
copy(bytes, p)
select {
case d.logCh <- bytes:
default:
d.droppedCount++
}
return
}