lock in sub select

rm redundant lock

wip to use framing

wip switch to stream frames
This commit is contained in:
Drew Bailey 2019-10-31 09:59:24 -04:00
parent fb23c1325d
commit c7b633b6c1
No known key found for this signature in database
GPG Key ID: FBA61B9FB7CCE1A7
5 changed files with 124 additions and 49 deletions

View File

@ -1,7 +1,6 @@
package api
import (
"bufio"
"encoding/json"
"fmt"
"net/url"
@ -244,25 +243,28 @@ type MonitorFrame struct {
// Monitor returns a channel which will receive streaming logs from the agent
// Providing a non-nil stopCh can be used to close the connection and stop log streaming
func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *MonitorFrame, error) {
func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
errCh := make(chan error, 1)
r, err := a.client.newRequest("GET", "/v1/agent/monitor")
if err != nil {
return nil, err
errCh <- err
return nil, errCh
}
r.setQueryOptions(q)
_, resp, err := requireOK(a.client.doRequest(r))
if err != nil {
return nil, err
errCh <- err
return nil, errCh
}
frames := make(chan *MonitorFrame, 10)
frames := make(chan *StreamFrame, 10)
go func() {
defer resp.Body.Close()
defer close(frames)
scanner := bufio.NewScanner(resp.Body)
LOOP:
dec := json.NewDecoder(resp.Body)
for {
select {
case <-stopCh:
@ -270,17 +272,20 @@ func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *Monito
default:
}
if scanner.Scan() {
var frame MonitorFrame
if bytes := scanner.Bytes(); len(bytes) > 0 {
frame.Data = bytes
frames <- &frame
} else {
frames <- &frame
}
} else {
break LOOP
// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
close(frames)
errCh <- err
return
}
// Discard heartbeat frame
if frame.IsHeartbeat() {
continue
}
frames <- &frame
}
}()

View File

@ -1,6 +1,7 @@
package client
import (
"bytes"
"context"
"errors"
"io"
@ -14,6 +15,7 @@ import (
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
)
@ -27,6 +29,10 @@ func NewAgentEndpoint(c *Client) *Agent {
return m
}
type monitorFrame struct {
Data []byte `json:",omitempty"`
}
func (m *Agent) monitor(conn io.ReadWriteCloser) {
defer metrics.MeasureSince([]string{"client", "monitor", "monitor"}, time.Now())
defer conn.Close()
@ -70,6 +76,17 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
Level: logLevel,
})
frames := make(chan *sframer.StreamFrame, streamFramesBuffer)
errCh := make(chan error)
var buf bytes.Buffer
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)
// framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 64*1024)
framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024)
framer.Run()
defer framer.Destroy()
// goroutine to detect remote side closing
go func() {
if _, err := conn.Read(nil); err != nil {
// One end of the pipe explicitly closed, exit
@ -83,14 +100,59 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
}()
logCh := monitor.Start(stopCh)
initialOffset := int64(0)
// receive logs and build frames
go func() {
defer framer.Destroy()
LOOP:
for {
select {
case log := <-logCh:
if err := framer.Send("", "log", log, initialOffset); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
break LOOP
}
case <-ctx.Done():
break LOOP
}
}
}()
var streamErr error
OUTER:
for {
select {
case log := <-logCh:
case frame, ok := <-frames:
if !ok {
// frame may have been closed when an error
// occurred. Check once more for an error.
select {
case streamErr = <-errCh:
// There was a pending error!
default:
// No error, continue on
}
break OUTER
}
var resp cstructs.StreamErrWrapper
resp.Payload = log
if args.PlainText {
resp.Payload = frame.Data
} else {
if err := frameCodec.Encode(frame); err != nil {
streamErr = err
break OUTER
}
resp.Payload = buf.Bytes()
buf.Reset()
}
if err := encoder.Encode(resp); err != nil {
streamErr = err
break OUTER
@ -106,11 +168,5 @@ OUTER:
if streamErr == io.EOF || strings.Contains(streamErr.Error(), "closed") {
return
}
// Attempt to send the error
encoder.Encode(&cstructs.StreamErrWrapper{
Error: cstructs.NewRpcError(streamErr, helper.Int64ToPtr(500)),
})
return
}
}

View File

@ -1,6 +1,7 @@
package client
import (
"encoding/json"
"fmt"
"io"
"net"
@ -10,11 +11,13 @@ import (
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client/config"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/ugorji/go/codec"
)
@ -71,7 +74,7 @@ func TestMonitor_Monitor(t *testing.T) {
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))
timeout := time.After(1 * time.Second)
timeout := time.After(5 * time.Second)
expected := "[DEBUG]"
received := ""
@ -86,7 +89,12 @@ OUTER:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}
received += string(msg.Payload)
var frame sframer.StreamFrame
err := json.Unmarshal(msg.Payload, &frame)
assert.NoError(t, err)
received += string(frame.Data)
if strings.Contains(received, expected) {
require.Nil(p2.Close())
break OUTER

View File

@ -44,6 +44,9 @@ type MonitorRequest struct {
// NodeID is the node we want to track the logs of
NodeID string
// PlainText disables base64 encoding.
PlainText bool
structs.QueryOptions
}

View File

@ -2,11 +2,13 @@ package command
import (
"fmt"
"io"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/hashicorp/nomad/api"
"github.com/mitchellh/cli"
@ -88,38 +90,39 @@ func (c *MonitorCommand) Run(args []string) int {
query := &api.QueryOptions{
Params: params,
}
eventDoneCh := make(chan struct{})
frames, err := client.Agent().Monitor(eventDoneCh, query)
if err != nil {
frames, errCh := client.Agent().Monitor(eventDoneCh, query)
select {
case err := <-errCh:
c.Ui.Error(fmt.Sprintf("Error starting monitor: %s", err))
c.Ui.Error(commandErrorText(c))
return 1
default:
}
go func() {
defer close(eventDoneCh)
OUTER:
for {
select {
case frame, ok := <-frames:
if !ok {
break OUTER
}
c.Ui.Output(string(frame.Data))
}
}
// Create a reader
var r io.ReadCloser
frameReader := api.NewFrameReader(frames, errCh, eventDoneCh)
frameReader.SetUnblockTime(500 * time.Millisecond)
r = frameReader
}()
defer r.Close()
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
select {
case <-eventDoneCh:
c.Ui.Error("Remote side ended the monitor! This usually means that the\n" +
"remote side has exited or crashed.")
go func() {
<-signalCh
// End the streaming
r.Close()
}()
_, err = io.Copy(os.Stdout, r)
if err != nil {
c.Ui.Error(fmt.Sprintf("error monitoring logs: %s", err))
return 1
case <-signalCh:
return 0
}
return 0
}