27bb03bbc0
* adding copyright header * fix fmt and a test
170 lines
4 KiB
Go
170 lines
4 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package monitor
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
log "github.com/hashicorp/go-hclog"
|
|
"go.uber.org/atomic"
|
|
)
|
|
|
|
// Monitor streams log output captured from a go-hclog InterceptLogger
// by way of a SinkAdapter. Because the sink is configured separately,
// logs can be streamed at a different level than the one set on the
// logger itself.
type Monitor interface {
	// Start begins streaming and returns a receive-only channel on
	// which each log message is delivered as it occurs.
	Start() <-chan []byte

	// Stop removes the sink from the InterceptLogger and shuts down
	// the log channels.
	Stop()
}
|
|
|
|
// monitor implements the Monitor interface. Note that this
|
|
// struct is not threadsafe.
|
|
type monitor struct {
|
|
sink log.SinkAdapter
|
|
|
|
// logger is the logger we will be monitoring
|
|
logger log.InterceptLogger
|
|
|
|
// logCh is a buffered chan where we send logs when streaming
|
|
logCh chan []byte
|
|
|
|
// doneCh coordinates the shutdown of logCh
|
|
doneCh chan struct{}
|
|
|
|
// droppedCount is the current count of messages
|
|
// that were dropped from the logCh buffer.
|
|
droppedCount *atomic.Uint32
|
|
bufSize int
|
|
|
|
// dropCheckInterval is the amount of time we should
|
|
// wait to check for dropped messages. Defaults
|
|
// to 3 seconds
|
|
dropCheckInterval time.Duration
|
|
|
|
// started is whether the monitor has been started or not.
|
|
// This is to ensure that we don't start it again until
|
|
// it has been shut down.
|
|
started *atomic.Bool
|
|
}
|
|
|
|
// NewMonitor creates a new Monitor. Start must be called in order to actually start
|
|
// streaming logs. buf is the buffer size of the channel that sends log messages.
|
|
func NewMonitor(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) (Monitor, error) {
|
|
return newMonitor(buf, logger, opts)
|
|
}
|
|
|
|
func newMonitor(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) (*monitor, error) {
|
|
if buf <= 0 {
|
|
return nil, fmt.Errorf("buf must be greater than zero")
|
|
}
|
|
|
|
sw := &monitor{
|
|
logger: logger,
|
|
logCh: make(chan []byte, buf),
|
|
doneCh: make(chan struct{}),
|
|
bufSize: buf,
|
|
dropCheckInterval: 3 * time.Second,
|
|
droppedCount: atomic.NewUint32(0),
|
|
started: atomic.NewBool(false),
|
|
}
|
|
|
|
opts.Output = sw
|
|
sink := log.NewSinkAdapter(opts)
|
|
sw.sink = sink
|
|
|
|
return sw, nil
|
|
}
|
|
|
|
// Stop deregisters the sink and stops the monitoring process
|
|
func (d *monitor) Stop() {
|
|
d.logger.DeregisterSink(d.sink)
|
|
close(d.doneCh)
|
|
d.started.Store(false)
|
|
}
|
|
|
|
// Start registers a sink on the monitor's logger and starts sending
|
|
// received log messages over the returned channel.
|
|
func (d *monitor) Start() <-chan []byte {
|
|
// Check to see if this has already been started. If not, flag
|
|
// it and proceed. If so, bail out early.
|
|
if !d.started.CAS(false, true) {
|
|
return nil
|
|
}
|
|
|
|
// register our sink with the logger
|
|
d.logger.RegisterSink(d.sink)
|
|
|
|
streamCh := make(chan []byte, d.bufSize)
|
|
|
|
// Run a go routine that listens for streamed
|
|
// log messages and sends them to streamCh.
|
|
//
|
|
// It also periodically checks for dropped
|
|
// messages and makes room on the logCh to add
|
|
// a dropped message count warning
|
|
go func() {
|
|
defer close(streamCh)
|
|
|
|
ticker := time.NewTicker(d.dropCheckInterval)
|
|
defer ticker.Stop()
|
|
|
|
var logMessage []byte
|
|
for {
|
|
logMessage = nil
|
|
|
|
select {
|
|
case <-ticker.C:
|
|
// Check if there have been any dropped messages.
|
|
dc := d.droppedCount.Load()
|
|
|
|
if dc > 0 {
|
|
logMessage = []byte(fmt.Sprintf("Monitor dropped %d logs during monitor request\n", dc))
|
|
d.droppedCount.Swap(0)
|
|
}
|
|
case logMessage = <-d.logCh:
|
|
case <-d.doneCh:
|
|
return
|
|
}
|
|
|
|
if len(logMessage) > 0 {
|
|
select {
|
|
case <-d.doneCh:
|
|
return
|
|
case streamCh <- logMessage:
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return streamCh
|
|
}
|
|
|
|
// Write attempts to send latest log to logCh
|
|
// it drops the log if channel is unavailable to receive
|
|
func (d *monitor) Write(p []byte) (n int, err error) {
|
|
// ensure logCh is still open
|
|
select {
|
|
case <-d.doneCh:
|
|
return
|
|
default:
|
|
}
|
|
|
|
bytes := make([]byte, len(p))
|
|
copy(bytes, p)
|
|
|
|
select {
|
|
case d.logCh <- bytes:
|
|
default:
|
|
d.droppedCount.Add(1)
|
|
}
|
|
|
|
return len(p), nil
|
|
}
|