Merge pull request #1047 from hashicorp/b-log-rotator

Fixing race issues in log rotator
This commit is contained in:
Diptanu Choudhury 2016-04-06 19:26:53 -07:00
commit b51a44dde0

View file

@ -14,7 +14,8 @@ import (
"time"
)
var (
const (
bufSize = 32768
flushDur = 100 * time.Millisecond
)
@ -31,6 +32,7 @@ type FileRotator struct {
currentFile *os.File // currentFile is the file that is currently getting written
currentWr int64 // currentWr is the number of bytes written to the current file
bufw *bufio.Writer
bufLock sync.Mutex
flushTicker *time.Ticker
logger *log.Logger
@ -74,9 +76,10 @@ func (f *FileRotator) Write(p []byte) (n int, err error) {
// Check if we still have space in the current file, otherwise close and
// open the next file
if f.currentWr >= f.FileSize {
f.bufw.Flush()
f.flushBuffer()
f.currentFile.Close()
if err := f.nextFile(); err != nil {
f.logger.Printf("[ERROR] driver.rotator: error creating next file: %v", err)
return 0, err
}
}
@ -88,12 +91,10 @@ func (f *FileRotator) Write(p []byte) (n int, err error) {
if remainingSize < int64(len(p[n:])) {
// Write the number of bytes that we can write on the current file
li := int64(n) + remainingSize
nw, err = f.bufw.Write(p[n:li])
//nw, err = f.currentFile.Write(p[n:li])
nw, err = f.writeToBuffer(p[n:li])
} else {
// Write all the bytes in the current file
nw, err = f.bufw.Write(p[n:])
//nw, err = f.currentFile.Write(p[n:])
nw, err = f.writeToBuffer(p[n:])
}
// Increment the number of bytes written so far in this method
@ -103,6 +104,7 @@ func (f *FileRotator) Write(p []byte) (n int, err error) {
// Increment the total number of bytes in the file
f.currentWr += int64(n)
if err != nil {
f.logger.Printf("[ERROR] driver.rotator: error writing to file: %v", err)
return
}
}
@ -179,11 +181,7 @@ func (f *FileRotator) createFile() error {
return err
}
f.currentWr = fi.Size()
if f.bufw == nil {
f.bufw = bufio.NewWriter(f.currentFile)
} else {
f.bufw.Reset(f.currentFile)
}
f.createOrResetBuffer()
return nil
}
@ -191,9 +189,7 @@ func (f *FileRotator) createFile() error {
// file
func (f *FileRotator) flushPeriodically() {
for _ = range f.flushTicker.C {
if f.bufw != nil {
f.bufw.Flush()
}
f.flushBuffer()
}
}
@ -203,9 +199,7 @@ func (f *FileRotator) Close() {
// Stop the ticker and flush for one last time
f.flushTicker.Stop()
if f.bufw != nil {
f.bufw.Flush()
}
f.flushBuffer()
// Stop the purge go routine
if !f.closed {
@ -258,3 +252,32 @@ func (f *FileRotator) purgeOldFiles() {
}
}
}
// flushBuffer flushes the buffer
func (f *FileRotator) flushBuffer() error {
f.bufLock.Lock()
defer f.bufLock.Unlock()
if f.bufw != nil {
return f.bufw.Flush()
}
return nil
}
// writeToBuffer writes the byte array to buffer
func (f *FileRotator) writeToBuffer(p []byte) (int, error) {
f.bufLock.Lock()
defer f.bufLock.Unlock()
return f.bufw.Write(p)
}
// createOrResetBuffer creates a new buffer if we don't have one otherwise
// resets the buffer
func (f *FileRotator) createOrResetBuffer() {
f.bufLock.Lock()
defer f.bufLock.Unlock()
if f.bufw == nil {
f.bufw = bufio.NewWriterSize(f.currentFile, bufSize)
} else {
f.bufw.Reset(f.currentFile)
}
}