open-vault/helper/monitor/monitor.go

170 lines
4.0 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package monitor
import (
"fmt"
"time"
log "github.com/hashicorp/go-hclog"
"go.uber.org/atomic"
)
// Monitor provides a mechanism to stream logs using go-hclog
// InterceptLogger and SinkAdapter. It allows streaming of logs
// at a different log level than what is set on the logger.
type Monitor interface {
// Start returns a channel of log messages which are sent
// every time a log message occurs
Start() <-chan []byte
// Stop de-registers the sink from the InterceptLogger
// and closes the log channels
Stop()
}
// monitor implements the Monitor interface. Note that this
// struct is not threadsafe.
type monitor struct {
sink log.SinkAdapter
// logger is the logger we will be monitoring
logger log.InterceptLogger
// logCh is a buffered chan where we send logs when streaming
logCh chan []byte
// doneCh coordinates the shutdown of logCh
doneCh chan struct{}
// droppedCount is the current count of messages
// that were dropped from the logCh buffer.
droppedCount *atomic.Uint32
bufSize int
// dropCheckInterval is the amount of time we should
// wait to check for dropped messages. Defaults
// to 3 seconds
dropCheckInterval time.Duration
// started is whether the monitor has been started or not.
// This is to ensure that we don't start it again until
// it has been shut down.
started *atomic.Bool
}
// NewMonitor creates a new Monitor. Start must be called in order to actually start
// streaming logs. buf is the buffer size of the channel that sends log messages.
func NewMonitor(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) (Monitor, error) {
return newMonitor(buf, logger, opts)
}
func newMonitor(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) (*monitor, error) {
if buf <= 0 {
return nil, fmt.Errorf("buf must be greater than zero")
}
sw := &monitor{
logger: logger,
logCh: make(chan []byte, buf),
doneCh: make(chan struct{}),
bufSize: buf,
dropCheckInterval: 3 * time.Second,
droppedCount: atomic.NewUint32(0),
started: atomic.NewBool(false),
}
opts.Output = sw
sink := log.NewSinkAdapter(opts)
sw.sink = sink
return sw, nil
}
// Stop deregisters the sink and stops the monitoring process
func (d *monitor) Stop() {
d.logger.DeregisterSink(d.sink)
close(d.doneCh)
d.started.Store(false)
}
// Start registers a sink on the monitor's logger and starts sending
// received log messages over the returned channel.
func (d *monitor) Start() <-chan []byte {
// Check to see if this has already been started. If not, flag
// it and proceed. If so, bail out early.
if !d.started.CAS(false, true) {
return nil
}
// register our sink with the logger
d.logger.RegisterSink(d.sink)
streamCh := make(chan []byte, d.bufSize)
// Run a go routine that listens for streamed
// log messages and sends them to streamCh.
//
// It also periodically checks for dropped
// messages and makes room on the logCh to add
// a dropped message count warning
go func() {
defer close(streamCh)
ticker := time.NewTicker(d.dropCheckInterval)
defer ticker.Stop()
var logMessage []byte
for {
logMessage = nil
select {
case <-ticker.C:
// Check if there have been any dropped messages.
dc := d.droppedCount.Load()
if dc > 0 {
logMessage = []byte(fmt.Sprintf("Monitor dropped %d logs during monitor request\n", dc))
d.droppedCount.Swap(0)
}
case logMessage = <-d.logCh:
case <-d.doneCh:
return
}
if len(logMessage) > 0 {
select {
case <-d.doneCh:
return
case streamCh <- logMessage:
}
}
}
}()
return streamCh
}
// Write attempts to send latest log to logCh
// it drops the log if channel is unavailable to receive
func (d *monitor) Write(p []byte) (n int, err error) {
// ensure logCh is still open
select {
case <-d.doneCh:
return
default:
}
bytes := make([]byte, len(p))
copy(bytes, p)
select {
case d.logCh <- bytes:
default:
d.droppedCount.Add(1)
}
return len(p), nil
}