Fixing race issues in the log rotator

This commit is contained in:
Diptanu Choudhury 2016-04-06 18:35:08 -07:00
parent 58f13db9f2
commit 47e2c6bbfc

View file

@ -14,7 +14,8 @@ import (
"time"
)
var (
const (
bufSize = 32768
flushDur = 100 * time.Millisecond
)
@ -31,6 +32,7 @@ type FileRotator struct {
currentFile *os.File // currentFile is the file that is currently getting written
currentWr int64 // currentWr is the number of bytes written to the current file
bufw *bufio.Writer
bufLock sync.Mutex
flushTicker *time.Ticker
logger *log.Logger
@ -74,7 +76,7 @@ func (f *FileRotator) Write(p []byte) (n int, err error) {
// Check if we still have space in the current file, otherwise close and
// open the next file
if f.currentWr >= f.FileSize {
f.bufw.Flush()
f.flushBuffer()
f.currentFile.Close()
if err := f.nextFile(); err != nil {
f.logger.Printf("[ERROR] driver.rotator: error creating next file: %v", err)
@ -89,12 +91,10 @@ func (f *FileRotator) Write(p []byte) (n int, err error) {
if remainingSize < int64(len(p[n:])) {
// Write the number of bytes that we can write on the current file
li := int64(n) + remainingSize
nw, err = f.bufw.Write(p[n:li])
//nw, err = f.currentFile.Write(p[n:li])
nw, err = f.writeToBuffer(p[n:li])
} else {
// Write all the bytes in the current file
nw, err = f.bufw.Write(p[n:])
//nw, err = f.currentFile.Write(p[n:])
nw, err = f.writeToBuffer(p[n:])
}
// Increment the number of bytes written so far in this method
@ -181,11 +181,7 @@ func (f *FileRotator) createFile() error {
return err
}
f.currentWr = fi.Size()
if f.bufw == nil {
f.bufw = bufio.NewWriter(f.currentFile)
} else {
f.bufw.Reset(f.currentFile)
}
f.createOrResetBuffer()
return nil
}
@ -193,9 +189,7 @@ func (f *FileRotator) createFile() error {
// file
func (f *FileRotator) flushPeriodically() {
for _ = range f.flushTicker.C {
if f.bufw != nil {
f.bufw.Flush()
}
f.flushBuffer()
}
}
@ -205,9 +199,7 @@ func (f *FileRotator) Close() {
// Stop the ticker and flush for one last time
f.flushTicker.Stop()
if f.bufw != nil {
f.bufw.Flush()
}
f.flushBuffer()
// Stop the purge go routine
if !f.closed {
@ -260,3 +252,32 @@ func (f *FileRotator) purgeOldFiles() {
}
}
}
// flushBuffer flushes the buffer
func (f *FileRotator) flushBuffer() error {
f.bufLock.Lock()
defer f.bufLock.Unlock()
if f.bufw != nil {
return f.bufw.Flush()
}
return nil
}
// writeToBuffer writes the byte array to buffer
func (f *FileRotator) writeToBuffer(p []byte) (int, error) {
f.bufLock.Lock()
defer f.bufLock.Unlock()
return f.bufw.Write(p)
}
// createOrResetBuffer creates a new buffer if we don't have one otherwise
// resets the buffer
func (f *FileRotator) createOrResetBuffer() {
f.bufLock.Lock()
defer f.bufLock.Unlock()
if f.bufw == nil {
f.bufw = bufio.NewWriterSize(f.currentFile, bufSize)
} else {
f.bufw.Reset(f.currentFile)
}
}