Fix log streaming missing frames (#11721)

Perform one more read after receiving cancel when streaming file from the allocation API
This commit is contained in:
Arkadiusz 2022-01-04 20:07:16 +01:00 committed by GitHub
parent 1f4e100edc
commit ffb174b596
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 22 deletions

3
.changelog/11721.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
client: Fixed a bug where the allocation log streaming API was missing log frames that spanned log file rotation
```

View File

@ -246,18 +246,14 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
defer framer.Destroy()
// If we aren't following, end as soon as we hit EOF
var eofCancelCh chan error
if !req.Follow {
eofCancelCh = make(chan error)
close(eofCancelCh)
}
cancelAfterFirstEof := !req.Follow
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start streaming
go func() {
if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, eofCancelCh); err != nil {
if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, nil, cancelAfterFirstEof); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
@ -578,21 +574,21 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in
}
var eofCancelCh chan error
cancelAfterFirstEof := false
exitAfter := false
if !follow && idx > maxIndex {
// Exceeded what was there initially so return
return nil
} else if !follow && idx == maxIndex {
// At the end
eofCancelCh = make(chan error)
close(eofCancelCh)
cancelAfterFirstEof = true
exitAfter = true
} else {
eofCancelCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1)
}
p := filepath.Join(logPath, logEntry.Name)
err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh)
err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh, cancelAfterFirstEof)
// Check if the context is cancelled
select {
@ -637,10 +633,11 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in
// streamFile is the internal method to stream the content of a file. If limit
// is greater than zero, the stream will end once that many bytes have been
// read. eofCancelCh is used to cancel the stream if triggered while at EOF. If
// the connection is broken an EPIPE error is returned
// read. If eofCancelCh is triggered while at EOF, read one more frame and
// cancel the stream on the next EOF. If the connection is broken an EPIPE
// error is returned.
func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, limit int64,
fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error) error {
fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error, cancelAfterFirstEof bool) error {
// Get the reader
file, err := fs.ReadAt(path, offset)
@ -667,6 +664,9 @@ func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string,
// read and reach EOF.
var changes *watch.FileChanges
// Only watch file when there is a need for it
cancelReceived := cancelAfterFirstEof
// Start streaming the data
bufSize := int64(streamFrameSize)
if limit > 0 && limit < streamFrameSize {
@ -704,6 +704,14 @@ OUTER:
continue
}
// At this point we can stop without waiting for more changes,
// because we have reached EOF and either we're not following at all,
// or we received an event from the eofCancelCh channel
// and the last read was executed
if cancelReceived {
return nil
}
// If EOF is hit, wait for a change to the file
if changes == nil {
changes, err = fs.ChangeEvents(waitCtx, path, offset)
@ -752,12 +760,19 @@ OUTER:
return nil
case <-ctx.Done():
return nil
case err, ok := <-eofCancelCh:
case _, ok := <-eofCancelCh:
if !ok {
return nil
}
return err
if err != nil {
return err
}
// try to read one more frame to avoid dropped entries
// during log rotation
cancelReceived = true
continue OUTER
}
}
}

View File

@ -1568,7 +1568,7 @@ func TestFS_streamFile_NoFile(t *testing.T) {
defer framer.Destroy()
err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, "foo", 0, ad, framer, nil)
context.Background(), 0, "foo", 0, ad, framer, nil, false)
require.Error(t, err)
if runtime.GOOS == "windows" {
require.Contains(t, err.Error(), "cannot find the file")
@ -1629,7 +1629,7 @@ func TestFS_streamFile_Modify(t *testing.T) {
// Start streaming
go func() {
if err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
context.Background(), 0, streamFile, 0, ad, framer, nil, false); err != nil {
t.Fatalf("stream() failed: %v", err)
}
}()
@ -1704,7 +1704,7 @@ func TestFS_streamFile_Truncate(t *testing.T) {
// Start streaming
go func() {
if err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
context.Background(), 0, streamFile, 0, ad, framer, nil, false); err != nil {
t.Fatalf("stream() failed: %v", err)
}
}()
@ -1808,7 +1808,7 @@ func TestFS_streamImpl_Delete(t *testing.T) {
// Start streaming
go func() {
if err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
context.Background(), 0, streamFile, 0, ad, framer, nil, false); err != nil {
t.Fatalf("stream() failed: %v", err)
}
}()
@ -1918,14 +1918,30 @@ func TestFS_logsImpl_Follow(t *testing.T) {
expected := []byte("012345")
initialWrites := 3
writeToFile := func(index int, data []byte) {
filePath := func(index int) string {
logFile := fmt.Sprintf("%s.%s.%d", task, logType, index)
logFilePath := filepath.Join(logDir, logFile)
return filepath.Join(logDir, logFile)
}
writeToFile := func(index int, data []byte) {
logFilePath := filePath(index)
err := ioutil.WriteFile(logFilePath, data, 0777)
if err != nil {
t.Fatalf("Failed to create file: %v", err)
}
}
appendToFile := func(index int, data []byte) {
logFilePath := filePath(index)
f, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
t.Fatalf("Failed to create file: %v", err)
}
defer f.Close()
if _, err = f.Write(data); err != nil {
t.Fatalf("Failed to write file: %v", err)
}
}
for i := 0; i < initialWrites; i++ {
writeToFile(i, expected[i:i+1])
}
@ -1967,11 +1983,13 @@ func TestFS_logsImpl_Follow(t *testing.T) {
t.Fatalf("did not receive data: got %q", string(received))
}
// We got the first chunk of data, write out the rest to the next file
// We got the first chunk of data, write out the rest split
// between the last file and the next file
// at an index much ahead to check that it is following and detecting
// skips
skipTo := initialWrites + 10
writeToFile(skipTo, expected[initialWrites:])
appendToFile(initialWrites-1, expected[initialWrites:initialWrites+1])
writeToFile(skipTo, expected[initialWrites+1:])
select {
case <-fullResultCh: