api: never return EOF from Logs error chan
Closing the frames chan is the only race-free way to signal to receivers that all frames have been sent and no errors have occurred. If EOF is sent on error chan receivers may not receive the last frame (or frames since the chan is buffered) before receiving the error. Closing frames is the idiomatic way of signaling there is no more data to be read from a chan.
This commit is contained in:
parent
6f0e2e808b
commit
949938534b
12
api/fs.go
12
api/fs.go
|
@ -254,10 +254,15 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
|
|||
// * cancel: A channel that when closed, streaming will end.
|
||||
//
|
||||
// The return value is a channel that will emit StreamFrames as they are read.
|
||||
// The chan will be closed when follow=false and the end of the file is
|
||||
// reached.
|
||||
//
|
||||
// Unexpected (non-EOF) errors will be sent on the error chan.
|
||||
func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string,
|
||||
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
|
||||
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
|
@ -315,8 +320,11 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
|
|||
// Decode the next frame
|
||||
var frame StreamFrame
|
||||
if err := dec.Decode(&frame); err != nil {
|
||||
errCh <- err
|
||||
close(frames)
|
||||
if err == io.EOF || err == io.ErrClosedPipe {
|
||||
close(frames)
|
||||
} else {
|
||||
errCh <- err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue