From 6478167e19d5d282b706484913467e2cfb454e3a Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 13 Jan 2017 13:12:36 -0800 Subject: [PATCH] plain w --- command/agent/fs_endpoint.go | 50 ++++++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index d84411e2b..101115534 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -255,6 +255,8 @@ func (s *StreamFrame) IsCleared() bool { // StreamFramer is used to buffer and send frames as well as heartbeat. type StreamFramer struct { + plainTxt bool + out io.WriteCloser enc *codec.Encoder encLock sync.Mutex @@ -282,15 +284,20 @@ type StreamFramer struct { // NewStreamFramer creates a new stream framer that will output StreamFrames to // the passed output. -func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer { +func NewStreamFramer(out io.WriteCloser, plainTxt bool, heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer { // Create a JSON encoder enc := codec.NewEncoder(out, jsonHandle) // Create the heartbeat and flush ticker - heartbeat := time.NewTicker(heartbeatRate) + var heartbeat *time.Ticker + if !plainTxt { + heartbeat = time.NewTicker(heartbeatRate) + } + flusher := time.NewTicker(batchWindow) return &StreamFramer{ + plainTxt: plainTxt, out: out, enc: enc, frameSize: frameSize, @@ -306,7 +313,9 @@ func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duratio func (s *StreamFramer) Destroy() { s.l.Lock() close(s.shutdownCh) - s.heartbeat.Stop() + if s.heartbeat != nil { + s.heartbeat.Stop() + } s.flusher.Stop() running := s.running s.l.Unlock() @@ -348,6 +357,11 @@ func (s *StreamFramer) run() { s.l.Unlock() }() + var heartbeat <-chan time.Time + if s.heartbeat != nil { + heartbeat = s.heartbeat.C + } + OUTER: for { select { @@ -369,7 +383,7 @@ OUTER: if err != nil { return } - case <-s.heartbeat.C: + case <-heartbeat: // Send a heartbeat frame if err = s.send(HeartbeatStreamFrame); err != nil { return @@ -390,6 +404,10 @@ OUTER: func (s *StreamFramer) send(f *StreamFrame) error { s.encLock.Lock() defer s.encLock.Unlock() + if s.plainTxt { + _, err := io.Copy(s.out, bytes.NewReader(f.Data)) + return err + } return s.enc.Encode(f) } @@ -549,7 +567,7 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf output := ioutils.NewWriteFlusher(resp) // Create the framer - framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(output, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() @@ -697,7 +715,7 @@ OUTER: // applied. Defaults to "start". func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var allocID, task, logType string - var follow bool + var plain, follow bool var err error q := req.URL.Query() @@ -710,8 +728,18 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac return nil, taskNotPresentErr } - if follow, err = strconv.ParseBool(q.Get("follow")); err != nil { - return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err) + followStr := q.Get("follow") + if followStr != "" { + if follow, err = strconv.ParseBool(followStr); err != nil { + return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err) + } + } + + plainStr := q.Get("plain") + if plainStr != "" { + if plain, err = strconv.ParseBool(plainStr); err != nil { + return nil, fmt.Errorf("Failed to parse plain field to boolean: %v", err) + } } logType = q.Get("type") @@ -747,15 +775,15 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac // Create an output that gets flushed on every write output := ioutils.NewWriteFlusher(resp) - return nil, s.logs(follow, offset, origin, task, logType, fs, output) + return nil, s.logs(follow, plain, offset, origin, task, logType, fs, output) } -func (s *HTTPServer) logs(follow bool, offset int64, +func (s *HTTPServer) logs(follow, plain bool, offset int64, origin, task, logType string, fs allocdir.AllocDirFS, output io.WriteCloser) error { // Create the framer - framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(output, plain, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy()