This commit is contained in:
Alex Dadgar 2017-01-13 13:12:36 -08:00
parent 9154f8a8ce
commit 6478167e19

View file

@ -255,6 +255,8 @@ func (s *StreamFrame) IsCleared() bool {
// StreamFramer is used to buffer and send frames as well as heartbeat. // StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct { type StreamFramer struct {
plainTxt bool
out io.WriteCloser out io.WriteCloser
enc *codec.Encoder enc *codec.Encoder
encLock sync.Mutex encLock sync.Mutex
@ -282,15 +284,20 @@ type StreamFramer struct {
// NewStreamFramer creates a new stream framer that will output StreamFrames to // NewStreamFramer creates a new stream framer that will output StreamFrames to
// the passed output. // the passed output.
func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer { func NewStreamFramer(out io.WriteCloser, plainTxt bool, heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer {
// Create a JSON encoder // Create a JSON encoder
enc := codec.NewEncoder(out, jsonHandle) enc := codec.NewEncoder(out, jsonHandle)
// Create the heartbeat and flush ticker // Create the heartbeat and flush ticker
heartbeat := time.NewTicker(heartbeatRate) var heartbeat *time.Ticker
if !plainTxt {
heartbeat = time.NewTicker(heartbeatRate)
}
flusher := time.NewTicker(batchWindow) flusher := time.NewTicker(batchWindow)
return &StreamFramer{ return &StreamFramer{
plainTxt: plainTxt,
out: out, out: out,
enc: enc, enc: enc,
frameSize: frameSize, frameSize: frameSize,
@ -306,7 +313,9 @@ func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duratio
func (s *StreamFramer) Destroy() { func (s *StreamFramer) Destroy() {
s.l.Lock() s.l.Lock()
close(s.shutdownCh) close(s.shutdownCh)
s.heartbeat.Stop() if s.heartbeat != nil {
s.heartbeat.Stop()
}
s.flusher.Stop() s.flusher.Stop()
running := s.running running := s.running
s.l.Unlock() s.l.Unlock()
@ -348,6 +357,11 @@ func (s *StreamFramer) run() {
s.l.Unlock() s.l.Unlock()
}() }()
var heartbeat <-chan time.Time
if s.heartbeat != nil {
heartbeat = s.heartbeat.C
}
OUTER: OUTER:
for { for {
select { select {
@ -369,7 +383,7 @@ OUTER:
if err != nil { if err != nil {
return return
} }
case <-s.heartbeat.C: case <-heartbeat:
// Send a heartbeat frame // Send a heartbeat frame
if err = s.send(HeartbeatStreamFrame); err != nil { if err = s.send(HeartbeatStreamFrame); err != nil {
return return
@ -390,6 +404,10 @@ OUTER:
func (s *StreamFramer) send(f *StreamFrame) error { func (s *StreamFramer) send(f *StreamFrame) error {
s.encLock.Lock() s.encLock.Lock()
defer s.encLock.Unlock() defer s.encLock.Unlock()
if s.plainTxt {
_, err := io.Copy(s.out, bytes.NewReader(f.Data))
return err
}
return s.enc.Encode(f) return s.enc.Encode(f)
} }
@ -549,7 +567,7 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf
output := ioutils.NewWriteFlusher(resp) output := ioutils.NewWriteFlusher(resp)
// Create the framer // Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer := NewStreamFramer(output, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run() framer.Run()
defer framer.Destroy() defer framer.Destroy()
@ -697,7 +715,7 @@ OUTER:
// applied. Defaults to "start". // applied. Defaults to "start".
func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) { func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, task, logType string var allocID, task, logType string
var follow bool var plain, follow bool
var err error var err error
q := req.URL.Query() q := req.URL.Query()
@ -710,8 +728,18 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac
return nil, taskNotPresentErr return nil, taskNotPresentErr
} }
if follow, err = strconv.ParseBool(q.Get("follow")); err != nil { followStr := q.Get("follow")
return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err) if followStr != "" {
if follow, err = strconv.ParseBool(followStr); err != nil {
return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err)
}
}
plainStr := q.Get("plain")
if plainStr != "" {
if plain, err = strconv.ParseBool(plainStr); err != nil {
return nil, fmt.Errorf("Failed to parse plain field to boolean: %v", err)
}
} }
logType = q.Get("type") logType = q.Get("type")
@ -747,15 +775,15 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac
// Create an output that gets flushed on every write // Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp) output := ioutils.NewWriteFlusher(resp)
return nil, s.logs(follow, offset, origin, task, logType, fs, output) return nil, s.logs(follow, plain, offset, origin, task, logType, fs, output)
} }
func (s *HTTPServer) logs(follow bool, offset int64, func (s *HTTPServer) logs(follow, plain bool, offset int64,
origin, task, logType string, origin, task, logType string,
fs allocdir.AllocDirFS, output io.WriteCloser) error { fs allocdir.AllocDirFS, output io.WriteCloser) error {
// Create the framer // Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer := NewStreamFramer(output, plain, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run() framer.Run()
defer framer.Destroy() defer framer.Destroy()