diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index a70546e60..48cc5a16e 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -227,16 +227,18 @@ func (s *StreamFrame) IsHeartbeat() bool { // StreamFramer is used to buffer and send frames as well as heartbeat. type StreamFramer struct { - out io.WriteCloser - enc *codec.Encoder - frameSize int - heartbeat *time.Ticker - flusher *time.Ticker + out io.WriteCloser + enc *codec.Encoder + encLock sync.Mutex + + frameSize int + + heartbeat *time.Ticker + flusher *time.Ticker + shutdownCh chan struct{} exitCh chan struct{} - outbound chan *StreamFrame - // The mutex protects everything below l sync.Mutex @@ -266,7 +268,6 @@ func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duratio frameSize: frameSize, heartbeat: heartbeat, flusher: flusher, - outbound: make(chan *StreamFrame), data: bytes.NewBuffer(make([]byte, 0, 2*frameSize)), shutdownCh: make(chan struct{}), exitCh: make(chan struct{}), @@ -279,10 +280,11 @@ func (s *StreamFramer) Destroy() { close(s.shutdownCh) s.heartbeat.Stop() s.flusher.Stop() + running := s.running s.l.Unlock() // Ensure things were flushed - if s.running { + if running { <-s.exitCh } s.out.Close() @@ -309,90 +311,60 @@ func (s *StreamFramer) ExitCh() <-chan struct{} { // run is the internal run method. It exits if Destroy is called or an error // occurs, in which case the exit channel is closed. func (s *StreamFramer) run() { - // Store any error and mark it as not running var err error defer func() { close(s.exitCh) - s.l.Lock() - close(s.outbound) - s.Err = err s.running = false + s.Err = err s.l.Unlock() }() - // Start a heartbeat/flusher go-routine. This is done seprately to avoid blocking - // the outbound channel. - go func() { - for { - select { - case <-s.exitCh: - return - case <-s.shutdownCh: - return - case <-s.flusher.C: - // Skip if there is nothing to flush - s.l.Lock() - if s.f == nil { - s.l.Unlock() - continue - } - - // Read the data for the frame, and send it - s.f.Data = s.readData() - select { - case s.outbound <- s.f: - s.f = nil - case <-s.exitCh: - } - s.l.Unlock() - case <-s.heartbeat.C: - // Send a heartbeat frame - s.l.Lock() - select { - case s.outbound <- &StreamFrame{}: - default: - } - s.l.Unlock() - } - } - }() - OUTER: for { select { case <-s.shutdownCh: break OUTER - case o := <-s.outbound: - // Send the frame - if err = s.enc.Encode(o); err != nil { - return + case <-s.flusher.C: + // Skip if there is nothing to flush + s.l.Lock() + if s.f == nil { + s.l.Unlock() + continue } - } - } - // Flush any existing frames -FLUSH: - for { - select { - case o := <-s.outbound: - // Send the frame and then clear the current working frame - if err = s.enc.Encode(o); err != nil { + // Read the data for the frame, and send it + s.f.Data = s.readData() + err = s.send(s.f) + s.f = nil + s.l.Unlock() + if err != nil { + return + } + case <-s.heartbeat.C: + // Send a heartbeat frame + if err = s.send(&StreamFrame{}); err != nil { return } - default: - break FLUSH } } s.l.Lock() if s.f != nil { s.f.Data = s.readData() - s.enc.Encode(s.f) + err = s.send(s.f) + s.f = nil } s.l.Unlock() } +// send takes a StreamFrame, encodes and sends it +func (s *StreamFramer) send(f *StreamFrame) error { + s.encLock.Lock() + defer s.encLock.Unlock() + return s.enc.Encode(f) +} + // readData is a helper which reads the buffered data returning up to the frame // size of data. Must be called with the lock held. The returned value is // invalid on the next read or write into the StreamFramer buffer @@ -424,6 +396,7 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e if s.Err != nil { return s.Err } + return fmt.Errorf("StreamFramer not running") } @@ -435,8 +408,12 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e select { case <-s.exitCh: return nil - case s.outbound <- &f: - s.f = nil + default: + } + err := s.send(&f) + s.f = nil + if err != nil { + return err } } @@ -457,11 +434,16 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e select { case <-s.exitCh: return nil - case s.outbound <- &StreamFrame{ + default: + } + + f := &StreamFrame{ Offset: s.f.Offset, File: s.f.File, FileEvent: s.f.FileEvent, - }: + } + if err := s.send(f); err != nil { + return err } } @@ -472,12 +454,17 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e select { case <-s.exitCh: return nil - case s.outbound <- &StreamFrame{ + default: + } + + f := &StreamFrame{ Offset: s.f.Offset, File: s.f.File, FileEvent: s.f.FileEvent, Data: d, - }: + } + if err := s.send(f); err != nil { + return err } } @@ -866,6 +853,10 @@ func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logT scanCh := time.Tick(nextLogCheckRate) for { select { + case <-t.Dead(): + next <- fmt.Errorf("shutdown triggered") + close(next) + return case err := <-eofCancelCh: next <- err close(next)