improve monitor performance (#10368)

* remove flush for each write to http response in the agent monitor endpoint

* fix race condition when we stop and start monitor multiple times, the doneCh is closed and never recover.

* start log reading goroutine before adding the sink to avoid filling the log channel before getting a chance of reading from it

* flush every 500ms to optimize log writing in the http server side.

* add changelog file

* add issue url to changelog

* fix changelog url

* Update changelog

Co-authored-by: Daniel Nephin <dnephin@hashicorp.com>

* use ticker to flush and avoid race condition when flushing in a different goroutine

* stop the ticker when done

Co-authored-by: Daniel Nephin <dnephin@hashicorp.com>

* Revert "fix race condition when we stop and start monitor multiple times, the doneCh is closed and never recover."

This reverts commit 1eeddf7a

* wait for log consumer loop to start before registering the sink

Co-authored-by: Daniel Nephin <dnephin@hashicorp.com>
This commit is contained in:
Dhia Ayachi 2021-06-15 12:05:52 -04:00 committed by GitHub
parent 82862e2a89
commit 4b75f15fb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 21 additions and 5 deletions

3
.changelog/10368.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
monitoring: optimize the monitoring endpoint to avoid losing logs when under high load.
```

View File

@ -5,6 +5,7 @@ import (
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
@ -1204,6 +1205,9 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request)
// a gzip stream it will go ahead and write out the HTTP response header // a gzip stream it will go ahead and write out the HTTP response header
resp.Write([]byte("")) resp.Write([]byte(""))
flusher.Flush() flusher.Flush()
const flushDelay = 200 * time.Millisecond
flushTicker := time.NewTicker(flushDelay)
defer flushTicker.Stop()
// Stream logs until the connection is closed. // Stream logs until the connection is closed.
for { for {
@ -1213,9 +1217,13 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request)
if droppedCount > 0 { if droppedCount > 0 {
s.agent.logger.Warn("Dropped logs during monitor request", "dropped_count", droppedCount) s.agent.logger.Warn("Dropped logs during monitor request", "dropped_count", droppedCount)
} }
flusher.Flush()
return nil, nil return nil, nil
case log := <-logsCh: case log := <-logsCh:
fmt.Fprint(resp, string(log)) fmt.Fprint(resp, string(log))
case <-flushTicker.C:
flusher.Flush() flusher.Flush()
} }
} }

View File

@ -2,6 +2,7 @@ package monitor
import ( import (
"errors" "errors"
"sync"
log "github.com/hashicorp/go-hclog" log "github.com/hashicorp/go-hclog"
) )
@ -80,21 +81,22 @@ func (d *monitor) Stop() int {
// received log messages over the returned channel. // received log messages over the returned channel.
func (d *monitor) Start() <-chan []byte { func (d *monitor) Start() <-chan []byte {
// register our sink with the logger // register our sink with the logger
d.logger.RegisterSink(d.sink)
streamCh := make(chan []byte, d.bufSize) streamCh := make(chan []byte, d.bufSize)
// run a go routine that listens for streamed // run a go routine that listens for streamed
// log messages and sends them to streamCh // log messages and sends them to streamCh
wg := new(sync.WaitGroup)
wg.Add(1)
go func() { go func() {
defer close(streamCh) defer close(streamCh)
wg.Done()
for { for {
select { select {
case log := <-d.logCh: case logLine := <-d.logCh:
select { select {
case <-d.doneCh: case <-d.doneCh:
return return
case streamCh <- log: case streamCh <- logLine:
} }
case <-d.doneCh: case <-d.doneCh:
return return
@ -102,6 +104,9 @@ func (d *monitor) Start() <-chan []byte {
} }
}() }()
//wait for the consumer loop to start before registering the sink to avoid filling the log channel
wg.Wait()
d.logger.RegisterSink(d.sink)
return streamCh return streamCh
} }