Fixes race on StreamFramer Destroy
This PR: * Fixes a race in which StreamFramer could panic while closing. * Simplifies the logic of the StreamFramer * Removes a potentially leaked goroutine
This commit is contained in:
parent
c8ddd98e3b
commit
7ead95c333
|
@ -227,16 +227,18 @@ func (s *StreamFrame) IsHeartbeat() bool {
|
|||
|
||||
// StreamFramer is used to buffer and send frames as well as heartbeat.
|
||||
type StreamFramer struct {
|
||||
out io.WriteCloser
|
||||
enc *codec.Encoder
|
||||
frameSize int
|
||||
heartbeat *time.Ticker
|
||||
flusher *time.Ticker
|
||||
out io.WriteCloser
|
||||
enc *codec.Encoder
|
||||
encLock sync.Mutex
|
||||
|
||||
frameSize int
|
||||
|
||||
heartbeat *time.Ticker
|
||||
flusher *time.Ticker
|
||||
|
||||
shutdownCh chan struct{}
|
||||
exitCh chan struct{}
|
||||
|
||||
outbound chan *StreamFrame
|
||||
|
||||
// The mutex protects everything below
|
||||
l sync.Mutex
|
||||
|
||||
|
@ -266,7 +268,6 @@ func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duratio
|
|||
frameSize: frameSize,
|
||||
heartbeat: heartbeat,
|
||||
flusher: flusher,
|
||||
outbound: make(chan *StreamFrame),
|
||||
data: bytes.NewBuffer(make([]byte, 0, 2*frameSize)),
|
||||
shutdownCh: make(chan struct{}),
|
||||
exitCh: make(chan struct{}),
|
||||
|
@ -279,10 +280,11 @@ func (s *StreamFramer) Destroy() {
|
|||
close(s.shutdownCh)
|
||||
s.heartbeat.Stop()
|
||||
s.flusher.Stop()
|
||||
running := s.running
|
||||
s.l.Unlock()
|
||||
|
||||
// Ensure things were flushed
|
||||
if s.running {
|
||||
if running {
|
||||
<-s.exitCh
|
||||
}
|
||||
s.out.Close()
|
||||
|
@ -309,90 +311,60 @@ func (s *StreamFramer) ExitCh() <-chan struct{} {
|
|||
// run is the internal run method. It exits if Destroy is called or an error
|
||||
// occurs, in which case the exit channel is closed.
|
||||
func (s *StreamFramer) run() {
|
||||
// Store any error and mark it as not running
|
||||
var err error
|
||||
defer func() {
|
||||
close(s.exitCh)
|
||||
|
||||
s.l.Lock()
|
||||
close(s.outbound)
|
||||
s.Err = err
|
||||
s.running = false
|
||||
s.Err = err
|
||||
s.l.Unlock()
|
||||
}()
|
||||
|
||||
// Start a heartbeat/flusher go-routine. This is done seprately to avoid blocking
|
||||
// the outbound channel.
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-s.exitCh:
|
||||
return
|
||||
case <-s.shutdownCh:
|
||||
return
|
||||
case <-s.flusher.C:
|
||||
// Skip if there is nothing to flush
|
||||
s.l.Lock()
|
||||
if s.f == nil {
|
||||
s.l.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
// Read the data for the frame, and send it
|
||||
s.f.Data = s.readData()
|
||||
select {
|
||||
case s.outbound <- s.f:
|
||||
s.f = nil
|
||||
case <-s.exitCh:
|
||||
}
|
||||
s.l.Unlock()
|
||||
case <-s.heartbeat.C:
|
||||
// Send a heartbeat frame
|
||||
s.l.Lock()
|
||||
select {
|
||||
case s.outbound <- &StreamFrame{}:
|
||||
default:
|
||||
}
|
||||
s.l.Unlock()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
OUTER:
|
||||
for {
|
||||
select {
|
||||
case <-s.shutdownCh:
|
||||
break OUTER
|
||||
case o := <-s.outbound:
|
||||
// Send the frame
|
||||
if err = s.enc.Encode(o); err != nil {
|
||||
return
|
||||
case <-s.flusher.C:
|
||||
// Skip if there is nothing to flush
|
||||
s.l.Lock()
|
||||
if s.f == nil {
|
||||
s.l.Unlock()
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Flush any existing frames
|
||||
FLUSH:
|
||||
for {
|
||||
select {
|
||||
case o := <-s.outbound:
|
||||
// Send the frame and then clear the current working frame
|
||||
if err = s.enc.Encode(o); err != nil {
|
||||
// Read the data for the frame, and send it
|
||||
s.f.Data = s.readData()
|
||||
err = s.send(s.f)
|
||||
s.f = nil
|
||||
s.l.Unlock()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
case <-s.heartbeat.C:
|
||||
// Send a heartbeat frame
|
||||
if err = s.send(&StreamFrame{}); err != nil {
|
||||
return
|
||||
}
|
||||
default:
|
||||
break FLUSH
|
||||
}
|
||||
}
|
||||
|
||||
s.l.Lock()
|
||||
if s.f != nil {
|
||||
s.f.Data = s.readData()
|
||||
s.enc.Encode(s.f)
|
||||
err = s.send(s.f)
|
||||
s.f = nil
|
||||
}
|
||||
s.l.Unlock()
|
||||
}
|
||||
|
||||
// send takes a StreamFrame, encodes and sends it
|
||||
func (s *StreamFramer) send(f *StreamFrame) error {
|
||||
s.encLock.Lock()
|
||||
defer s.encLock.Unlock()
|
||||
return s.enc.Encode(f)
|
||||
}
|
||||
|
||||
// readData is a helper which reads the buffered data returning up to the frame
|
||||
// size of data. Must be called with the lock held. The returned value is
|
||||
// invalid on the next read or write into the StreamFramer buffer
|
||||
|
@ -424,6 +396,7 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
|
|||
if s.Err != nil {
|
||||
return s.Err
|
||||
}
|
||||
|
||||
return fmt.Errorf("StreamFramer not running")
|
||||
}
|
||||
|
||||
|
@ -435,8 +408,12 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
|
|||
select {
|
||||
case <-s.exitCh:
|
||||
return nil
|
||||
case s.outbound <- &f:
|
||||
s.f = nil
|
||||
default:
|
||||
}
|
||||
err := s.send(&f)
|
||||
s.f = nil
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -457,11 +434,16 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
|
|||
select {
|
||||
case <-s.exitCh:
|
||||
return nil
|
||||
case s.outbound <- &StreamFrame{
|
||||
default:
|
||||
}
|
||||
|
||||
f := &StreamFrame{
|
||||
Offset: s.f.Offset,
|
||||
File: s.f.File,
|
||||
FileEvent: s.f.FileEvent,
|
||||
}:
|
||||
}
|
||||
if err := s.send(f); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -472,12 +454,17 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
|
|||
select {
|
||||
case <-s.exitCh:
|
||||
return nil
|
||||
case s.outbound <- &StreamFrame{
|
||||
default:
|
||||
}
|
||||
|
||||
f := &StreamFrame{
|
||||
Offset: s.f.Offset,
|
||||
File: s.f.File,
|
||||
FileEvent: s.f.FileEvent,
|
||||
Data: d,
|
||||
}:
|
||||
}
|
||||
if err := s.send(f); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -866,6 +853,10 @@ func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logT
|
|||
scanCh := time.Tick(nextLogCheckRate)
|
||||
for {
|
||||
select {
|
||||
case <-t.Dead():
|
||||
next <- fmt.Errorf("shutdown triggered")
|
||||
close(next)
|
||||
return
|
||||
case err := <-eofCancelCh:
|
||||
next <- err
|
||||
close(next)
|
||||
|
|
Loading…
Reference in New Issue