From ffb174b596078247c5fe067acf57363af19a7bfa Mon Sep 17 00:00:00 2001 From: Arkadiusz Date: Tue, 4 Jan 2022 20:07:16 +0100 Subject: [PATCH] Fix log streaming missing frames (#11721) Perform one more read after receiving cancel when streaming file from the allocation API --- .changelog/11721.txt | 3 +++ client/fs_endpoint.go | 43 +++++++++++++++++++++++++------------- client/fs_endpoint_test.go | 34 +++++++++++++++++++++++------- 3 files changed, 58 insertions(+), 22 deletions(-) create mode 100644 .changelog/11721.txt diff --git a/.changelog/11721.txt b/.changelog/11721.txt new file mode 100644 index 000000000..f6a665305 --- /dev/null +++ b/.changelog/11721.txt @@ -0,0 +1,3 @@ +```release-note:bug +client: Fixed a bug where the allocation log streaming API was missing log frames that spanned log file rotation +``` diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 87644bd0a..2796c45c2 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -246,18 +246,14 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { defer framer.Destroy() // If we aren't following end as soon as we hit EOF - var eofCancelCh chan error - if !req.Follow { - eofCancelCh = make(chan error) - close(eofCancelCh) - } + cancelAfterFirstEof := !req.Follow ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Start streaming go func() { - if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, eofCancelCh); err != nil { + if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, nil, cancelAfterFirstEof); err != nil { select { case errCh <- err: case <-ctx.Done(): @@ -578,21 +574,21 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in } var eofCancelCh chan error + cancelAfterFirstEof := false exitAfter := false if !follow && idx > maxIndex { // Exceeded what was there initially so return return nil } else if !follow && idx == maxIndex { // At the end - eofCancelCh = make(chan error) - 
close(eofCancelCh) + cancelAfterFirstEof = true exitAfter = true } else { eofCancelCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1) } p := filepath.Join(logPath, logEntry.Name) - err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh) + err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh, cancelAfterFirstEof) // Check if the context is cancelled select { @@ -637,10 +633,11 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in // streamFile is the internal method to stream the content of a file. If limit // is greater than zero, the stream will end once that many bytes have been -// read. eofCancelCh is used to cancel the stream if triggered while at EOF. If -// the connection is broken an EPIPE error is returned +// read. If eofCancelCh is triggered while at EOF, read one more frame and +// cancel the stream on the next EOF. If the connection is broken an EPIPE +// error is returned. func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, limit int64, - fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error) error { + fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error, cancelAfterFirstEof bool) error { // Get the reader file, err := fs.ReadAt(path, offset) @@ -667,6 +664,9 @@ func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, // read and reach EOF. 
var changes *watch.FileChanges + // Only watch file when there is a need for it + cancelReceived := cancelAfterFirstEof + // Start streaming the data bufSize := int64(streamFrameSize) if limit > 0 && limit < streamFrameSize { @@ -704,6 +704,14 @@ OUTER: continue } + // At this point we can stop without waiting for more changes, + // because we have EOF and either we're not following at all, + // or we received an event from the eofCancelCh channel + // and last read was executed + if cancelReceived { + return nil + } + // If EOF is hit, wait for a change to the file if changes == nil { changes, err = fs.ChangeEvents(waitCtx, path, offset) @@ -752,12 +760,19 @@ OUTER: return nil case <-ctx.Done(): return nil - case err, ok := <-eofCancelCh: + case _, ok := <-eofCancelCh: if !ok { return nil } - return err + if err != nil { + return err + } + + // try to read one more frame to avoid dropped entries + // during log rotation + cancelReceived = true + continue OUTER } } } diff --git a/client/fs_endpoint_test.go b/client/fs_endpoint_test.go index fa650df27..76fad1847 100644 --- a/client/fs_endpoint_test.go +++ b/client/fs_endpoint_test.go @@ -1568,7 +1568,7 @@ func TestFS_streamFile_NoFile(t *testing.T) { defer framer.Destroy() err := c.endpoints.FileSystem.streamFile( - context.Background(), 0, "foo", 0, ad, framer, nil) + context.Background(), 0, "foo", 0, ad, framer, nil, false) require.Error(t, err) if runtime.GOOS == "windows" { require.Contains(t, err.Error(), "cannot find the file") @@ -1629,7 +1629,7 @@ func TestFS_streamFile_Modify(t *testing.T) { // Start streaming go func() { if err := c.endpoints.FileSystem.streamFile( - context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil { + context.Background(), 0, streamFile, 0, ad, framer, nil, false); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -1704,7 +1704,7 @@ func TestFS_streamFile_Truncate(t *testing.T) { // Start streaming go func() { if err := c.endpoints.FileSystem.streamFile( - 
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil { + context.Background(), 0, streamFile, 0, ad, framer, nil, false); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -1808,7 +1808,7 @@ func TestFS_streamImpl_Delete(t *testing.T) { // Start streaming go func() { if err := c.endpoints.FileSystem.streamFile( - context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil { + context.Background(), 0, streamFile, 0, ad, framer, nil, false); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -1918,14 +1918,30 @@ func TestFS_logsImpl_Follow(t *testing.T) { expected := []byte("012345") initialWrites := 3 - writeToFile := func(index int, data []byte) { + filePath := func(index int) string { logFile := fmt.Sprintf("%s.%s.%d", task, logType, index) - logFilePath := filepath.Join(logDir, logFile) + return filepath.Join(logDir, logFile) + } + writeToFile := func(index int, data []byte) { + logFilePath := filePath(index) err := ioutil.WriteFile(logFilePath, data, 0777) if err != nil { t.Fatalf("Failed to create file: %v", err) } } + appendToFile := func(index int, data []byte) { + logFilePath := filePath(index) + f, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_WRONLY, 0600) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + + defer f.Close() + + if _, err = f.Write(data); err != nil { + t.Fatalf("Failed to write file: %v", err) + } + } for i := 0; i < initialWrites; i++ { writeToFile(i, expected[i:i+1]) } @@ -1967,11 +1983,13 @@ func TestFS_logsImpl_Follow(t *testing.T) { t.Fatalf("did not receive data: got %q", string(received)) } - // We got the first chunk of data, write out the rest to the next file + // We got the first chunk of data, write out the rest split + // between the last file and the next file // at an index much ahead to check that it is following and detecting // skips skipTo := initialWrites + 10 - writeToFile(skipTo, expected[initialWrites:]) + appendToFile(initialWrites-1, 
expected[initialWrites:initialWrites+1]) + writeToFile(skipTo, expected[initialWrites+1:]) select { case <-fullResultCh: