From c7b633b6c150feff46ee6ed5957244e0ddc96818 Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Thu, 31 Oct 2019 09:59:24 -0400 Subject: [PATCH] lock in sub select rm redundant lock wip to use framing wip switch to stream frames --- api/agent.go | 41 +++++++++++--------- client/agent_endpoint.go | 72 +++++++++++++++++++++++++++++++---- client/agent_endpoint_test.go | 12 +++++- client/structs/structs.go | 3 ++ command/agent_monitor.go | 45 ++++++++++++---------- 5 files changed, 124 insertions(+), 49 deletions(-) diff --git a/api/agent.go b/api/agent.go index d3622acbc..bccb2c19f 100644 --- a/api/agent.go +++ b/api/agent.go @@ -1,7 +1,6 @@ package api import ( - "bufio" "encoding/json" "fmt" "net/url" @@ -244,25 +243,28 @@ type MonitorFrame struct { // Monitor returns a channel which will receive streaming logs from the agent // Providing a non-nil stopCh can be used to close the connection and stop log streaming -func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *MonitorFrame, error) { +func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { + + errCh := make(chan error, 1) r, err := a.client.newRequest("GET", "/v1/agent/monitor") if err != nil { - return nil, err + errCh <- err + return nil, errCh } r.setQueryOptions(q) _, resp, err := requireOK(a.client.doRequest(r)) if err != nil { - return nil, err + errCh <- err + return nil, errCh } - frames := make(chan *MonitorFrame, 10) + frames := make(chan *StreamFrame, 10) go func() { defer resp.Body.Close() - defer close(frames) - scanner := bufio.NewScanner(resp.Body) - LOOP: + dec := json.NewDecoder(resp.Body) + for { select { case <-stopCh: @@ -270,17 +272,20 @@ func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *Monito default: } - if scanner.Scan() { - var frame MonitorFrame - if bytes := scanner.Bytes(); len(bytes) > 0 { - frame.Data = bytes - frames <- &frame - } else { - frames <- &frame - } - } else { - break LOOP + // Decode the next frame + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + close(frames) + errCh <- err + return } + + // Discard heartbeat frame + if frame.IsHeartbeat() { + continue + } + + frames <- &frame } }() diff --git a/client/agent_endpoint.go b/client/agent_endpoint.go index e1b6e31ec..e15e74da4 100644 --- a/client/agent_endpoint.go +++ b/client/agent_endpoint.go @@ -1,6 +1,7 @@ package client import ( + "bytes" "context" "errors" "io" @@ -14,6 +15,7 @@ import ( metrics "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" ) @@ -27,6 +29,10 @@ func NewAgentEndpoint(c *Client) *Agent { return m } +type monitorFrame struct { + Data []byte `json:",omitempty"` +} + func (m *Agent) monitor(conn io.ReadWriteCloser) { defer metrics.MeasureSince([]string{"client", "monitor", "monitor"}, time.Now()) defer conn.Close() @@ -70,6 +76,17 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { Level: logLevel, }) + frames := make(chan *sframer.StreamFrame, streamFramesBuffer) + errCh := make(chan error) + var buf bytes.Buffer + frameCodec := codec.NewEncoder(&buf, structs.JsonHandle) + + // framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 64*1024) + framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024) + framer.Run() + defer framer.Destroy() + + // goroutine to detect remote side closing go func() { if _, err := conn.Read(nil); err != nil { // One end of the pipe explicitly closed, exit @@ -83,14 +100,59 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { }() logCh := monitor.Start(stopCh) + initialOffset := int64(0) + + // receive logs and build frames + go func() { + defer framer.Destroy() + LOOP: + for { + select { + case log := <-logCh: + if err := framer.Send("", "log", log, initialOffset); err != nil { + select { + case errCh <- err: + case <-ctx.Done(): + } + break LOOP + } + case <-ctx.Done(): + break LOOP + } + } + }() var streamErr error OUTER: for { select { - case log := <-logCh: + case frame, ok := <-frames: + if !ok { + // frame may have been closed when an error + // occurred. Check once more for an error. + select { + case streamErr = <-errCh: + // There was a pending error! + default: + // No error, continue on + } + + break OUTER + } + var resp cstructs.StreamErrWrapper - resp.Payload = log + if args.PlainText { + resp.Payload = frame.Data + } else { + if err := frameCodec.Encode(frame); err != nil { + streamErr = err + break OUTER + } + + resp.Payload = buf.Bytes() + buf.Reset() + } + if err := encoder.Encode(resp); err != nil { streamErr = err break OUTER @@ -106,11 +168,5 @@ OUTER: if streamErr == io.EOF || strings.Contains(streamErr.Error(), "closed") { return } - - // Attempt to send the error - encoder.Encode(&cstructs.StreamErrWrapper{ - Error: cstructs.NewRpcError(streamErr, helper.Int64ToPtr(500)), - }) - return } } diff --git a/client/agent_endpoint_test.go b/client/agent_endpoint_test.go index 38acd6fe6..14f53cc6a 100644 --- a/client/agent_endpoint_test.go +++ b/client/agent_endpoint_test.go @@ -1,6 +1,7 @@ package client import ( + "encoding/json" "fmt" "io" "net" @@ -10,11 +11,13 @@ import ( "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/client/config" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/ugorji/go/codec" ) @@ -71,7 +74,7 @@ func TestMonitor_Monitor(t *testing.T) { encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(encoder.Encode(req)) - timeout := time.After(1 * time.Second) + timeout := time.After(5 * time.Second) expected := "[DEBUG]" received := "" @@ -86,7 +89,12 @@ OUTER: if msg.Error != nil { t.Fatalf("Got error: %v", msg.Error.Error()) } - received += string(msg.Payload) + + var frame sframer.StreamFrame + err := json.Unmarshal(msg.Payload, &frame) + assert.NoError(t, err) + + received += string(frame.Data) if strings.Contains(received, expected) { require.Nil(p2.Close()) break OUTER diff --git a/client/structs/structs.go b/client/structs/structs.go index 350e9de69..6a84e2d63 100644 --- a/client/structs/structs.go +++ b/client/structs/structs.go @@ -44,6 +44,9 @@ type MonitorRequest struct { // NodeID is the node we want to track the logs of NodeID string + // PlainText disables base64 encoding. + PlainText bool + structs.QueryOptions } diff --git a/command/agent_monitor.go b/command/agent_monitor.go index a369fbb48..ba5dbd0df 100644 --- a/command/agent_monitor.go +++ b/command/agent_monitor.go @@ -2,11 +2,13 @@ package command import ( "fmt" + "io" "os" "os/signal" "strconv" "strings" "syscall" + "time" "github.com/hashicorp/nomad/api" "github.com/mitchellh/cli" @@ -88,38 +90,39 @@ func (c *MonitorCommand) Run(args []string) int { query := &api.QueryOptions{ Params: params, } + eventDoneCh := make(chan struct{}) - frames, err := client.Agent().Monitor(eventDoneCh, query) - if err != nil { + frames, errCh := client.Agent().Monitor(eventDoneCh, query) + select { + case err := <-errCh: c.Ui.Error(fmt.Sprintf("Error starting monitor: %s", err)) c.Ui.Error(commandErrorText(c)) return 1 + default: } - go func() { - defer close(eventDoneCh) - OUTER: - for { - select { - case frame, ok := <-frames: - if !ok { - break OUTER - } - c.Ui.Output(string(frame.Data)) - } - } + // Create a reader + var r io.ReadCloser + frameReader := api.NewFrameReader(frames, errCh, eventDoneCh) + frameReader.SetUnblockTime(500 * time.Millisecond) + r = frameReader - }() + defer r.Close() signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) - select { - case <-eventDoneCh: - c.Ui.Error("Remote side ended the monitor! This usually means that the\n" + - "remote side has exited or crashed.") + go func() { + <-signalCh + // End the streaming + r.Close() + }() + + _, err = io.Copy(os.Stdout, r) + if err != nil { + c.Ui.Error(fmt.Sprintf("error monitoring logs: %s", err)) return 1 - case <-signalCh: - return 0 } + + return 0 }