open-nomad/client/logmon/logging/rotator.go

348 lines
9.2 KiB
Go
Raw Normal View History

2016-02-19 20:55:15 +00:00
package logging
import (
2016-02-23 16:51:21 +00:00
"bufio"
"bytes"
2016-02-19 20:55:15 +00:00
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
2016-03-22 05:05:06 +00:00
"sync"
2016-02-23 16:51:21 +00:00
"time"
hclog "github.com/hashicorp/go-hclog"
2016-02-19 20:55:15 +00:00
)
2016-04-07 01:35:08 +00:00
const (
	// logBufferSize is the capacity, in bytes, of the buffered writer that
	// sits in front of the current log file (64 KiB).
	logBufferSize = 64 << 10

	// bufferFlushDuration is the interval between periodic flushes of the
	// buffered writer.
	bufferFlushDuration = 100 * time.Millisecond

	// lineScanLimit is how many bytes before the end of a file (32 KiB) we
	// start looking for a new line so that a log line is not split across two
	// files. A single line longer than this limit may still be split.
	lineScanLimit = 32 << 10

	// newLineDelimiter is the byte that terminates a log line.
	newLineDelimiter = '\n'
)
2016-02-19 22:11:31 +00:00
// FileRotator writes bytes to a rotated set of files
2016-02-19 20:55:15 +00:00
type FileRotator struct {
2016-02-20 05:58:44 +00:00
MaxFiles int // MaxFiles is the maximum number of rotated files allowed in a path
FileSize int64 // FileSize is the size a rotated file is allowed to grow
path string // path is the path on the file system where the rotated set of files are opened
baseFileName string // baseFileName is the base file name of the rotated files
logFileIdx int // logFileIdx is the current index of the rotated files
oldestLogFileIdx int // oldestLogFileIdx is the index of the oldest log file in a path
currentFile *os.File // currentFile is the file that is currently getting written
currentWr int64 // currentWr is the number of bytes written to the current file
2016-02-23 17:46:10 +00:00
bufw *bufio.Writer
2016-04-07 01:35:08 +00:00
bufLock sync.Mutex
2016-02-19 20:55:15 +00:00
2016-02-23 16:51:21 +00:00
flushTicker *time.Ticker
logger hclog.Logger
2016-02-23 16:51:21 +00:00
purgeCh chan struct{}
doneCh chan struct{}
2016-03-22 05:05:06 +00:00
closed bool
closedLock sync.Mutex
2016-02-19 20:55:15 +00:00
}
2016-02-19 22:11:31 +00:00
// NewFileRotator returns a new file rotator
2016-02-19 22:01:07 +00:00
func NewFileRotator(path string, baseFile string, maxFiles int,
fileSize int64, logger hclog.Logger) (*FileRotator, error) {
logger = logger.Named("rotator")
2016-02-19 20:55:15 +00:00
rotator := &FileRotator{
MaxFiles: maxFiles,
FileSize: fileSize,
path: path,
baseFileName: baseFile,
2018-05-09 23:46:52 +00:00
flushTicker: time.NewTicker(bufferFlushDuration),
2016-02-23 16:51:21 +00:00
logger: logger,
purgeCh: make(chan struct{}, 1),
doneCh: make(chan struct{}),
2016-02-19 20:55:15 +00:00
}
2016-02-19 20:55:15 +00:00
if err := rotator.lastFile(); err != nil {
return nil, err
}
go rotator.purgeOldFiles()
2016-02-23 16:51:21 +00:00
go rotator.flushPeriodically()
2016-02-19 20:55:15 +00:00
return rotator, nil
}
2016-02-19 22:11:31 +00:00
// Write writes a byte array to a file and rotates the file if it's size becomes
// equal to the maximum size the user has defined.
2016-02-19 20:55:15 +00:00
func (f *FileRotator) Write(p []byte) (n int, err error) {
n = 0
var forceRotate bool
2016-02-19 20:55:15 +00:00
for n < len(p) {
2016-02-20 05:58:44 +00:00
// Check if we still have space in the current file, otherwise close and
2016-02-19 22:11:31 +00:00
// open the next file
if forceRotate || f.currentWr >= f.FileSize {
forceRotate = false
2016-04-07 01:35:08 +00:00
f.flushBuffer()
2016-02-19 20:55:15 +00:00
f.currentFile.Close()
if err := f.nextFile(); err != nil {
f.logger.Error("error creating next file", "err", err)
2016-02-19 20:55:15 +00:00
return 0, err
}
}
// Calculate the remaining size on this file and how much we have left
// to write
remainingSpace := f.FileSize - f.currentWr
remainingToWrite := int64(len(p[n:]))
// Check if we are near the end of the file. If we are we attempt to
// avoid a log line being split between two files.
var nw int
if (remainingSpace - lineScanLimit) < remainingToWrite {
// Scan for new line and if the data up to new line fits in current
// file, write to buffer
idx := bytes.IndexByte(p[n:], newLineDelimiter)
if idx >= 0 && (remainingSpace-int64(idx)-1) >= 0 {
// We have space so write it to buffer
nw, err = f.writeToBuffer(p[n : n+idx+1])
} else if idx >= 0 {
// We found a new line but don't have space so just force rotate
forceRotate = true
} else if remainingToWrite > f.FileSize || f.FileSize-lineScanLimit < 0 {
// There is no new line remaining but there is no point in
// rotating since the remaining data will not even fit in the
// next file either so just fill this one up.
li := int64(n) + remainingSpace
if remainingSpace > remainingToWrite {
li = int64(n) + remainingToWrite
}
nw, err = f.writeToBuffer(p[n:li])
} else {
// There is no new line in the data remaining for us to write
// and it will fit in the next file so rotate.
forceRotate = true
}
2016-02-19 20:55:15 +00:00
} else {
2016-02-20 05:58:44 +00:00
// Write all the bytes in the current file
2016-04-07 01:35:08 +00:00
nw, err = f.writeToBuffer(p[n:])
2016-02-19 20:55:15 +00:00
}
2016-02-19 22:11:31 +00:00
2016-02-20 05:58:44 +00:00
// Increment the number of bytes written so far in this method
// invocation
2016-02-19 20:55:15 +00:00
n += nw
2016-02-20 05:58:44 +00:00
// Increment the total number of bytes in the file
2016-02-19 20:55:15 +00:00
f.currentWr += int64(n)
if err != nil {
f.logger.Error("error writing to file", "err", err)
// As bufio writer does not automatically recover in case of any
// io error, we need to recover from it manually resetting the
// writter.
f.createOrResetBuffer()
2016-02-19 20:55:15 +00:00
return
}
}
return
}
2016-02-19 22:11:31 +00:00
// nextFile opens the next file and purges older files if the number of rotated
// files is larger than the maximum files configured by the user
2016-02-19 20:55:15 +00:00
func (f *FileRotator) nextFile() error {
nextFileIdx := f.logFileIdx
for {
nextFileIdx += 1
logFileName := filepath.Join(f.path, fmt.Sprintf("%s.%d", f.baseFileName, nextFileIdx))
if fi, err := os.Stat(logFileName); err == nil {
2016-02-20 05:58:44 +00:00
if fi.IsDir() || fi.Size() >= f.FileSize {
2016-02-19 20:55:15 +00:00
continue
}
}
f.logFileIdx = nextFileIdx
if err := f.createFile(); err != nil {
return err
}
break
}
// Purge old files if we have more files than MaxFiles
2016-04-18 18:46:57 +00:00
f.closedLock.Lock()
defer f.closedLock.Unlock()
if f.logFileIdx-f.oldestLogFileIdx >= f.MaxFiles && !f.closed {
2016-02-19 20:55:15 +00:00
select {
2016-02-19 23:15:59 +00:00
case f.purgeCh <- struct{}{}:
2016-02-19 20:55:15 +00:00
default:
}
}
return nil
}
2016-02-20 05:58:44 +00:00
// lastFile finds out the rotated file with the largest index in a path.
2016-02-19 20:55:15 +00:00
func (f *FileRotator) lastFile() error {
finfos, err := ioutil.ReadDir(f.path)
if err != nil {
return err
}
prefix := fmt.Sprintf("%s.", f.baseFileName)
for _, fi := range finfos {
if fi.IsDir() {
continue
}
if strings.HasPrefix(fi.Name(), prefix) {
fileIdx := strings.TrimPrefix(fi.Name(), prefix)
n, err := strconv.Atoi(fileIdx)
if err != nil {
continue
}
if n > f.logFileIdx {
f.logFileIdx = n
}
}
}
if err := f.createFile(); err != nil {
return err
}
return nil
}
2016-02-19 22:11:31 +00:00
// createFile opens a new or existing file for writing
2016-02-19 20:55:15 +00:00
func (f *FileRotator) createFile() error {
logFileName := filepath.Join(f.path, fmt.Sprintf("%s.%d", f.baseFileName, f.logFileIdx))
cFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
2016-02-19 20:55:15 +00:00
if err != nil {
return err
}
2016-02-19 20:55:15 +00:00
f.currentFile = cFile
fi, err := f.currentFile.Stat()
if err != nil {
return err
}
f.currentWr = fi.Size()
2016-04-07 01:35:08 +00:00
f.createOrResetBuffer()
2016-02-19 20:55:15 +00:00
return nil
}
// flushPeriodically flushes the buffered writer every 100ms to the underlying
// file
2016-02-23 16:51:21 +00:00
func (f *FileRotator) flushPeriodically() {
for {
select {
case <-f.flushTicker.C:
f.flushBuffer()
case <-f.doneCh:
return
}
2016-02-23 16:51:21 +00:00
}
}
// Close flushes and closes the rotator. It never returns an error.
func (f *FileRotator) Close() error {
2016-03-22 05:05:06 +00:00
f.closedLock.Lock()
defer f.closedLock.Unlock()
// Stop the ticker and flush for one last time
f.flushTicker.Stop()
2016-04-07 01:35:08 +00:00
f.flushBuffer()
// Stop the go routines
2016-03-22 05:05:06 +00:00
if !f.closed {
close(f.doneCh)
2016-03-22 05:05:06 +00:00
close(f.purgeCh)
f.closed = true
f.currentFile.Close()
2016-03-22 05:05:06 +00:00
}
return nil
}
2016-02-19 22:11:31 +00:00
// purgeOldFiles removes older files and keeps only the last N files rotated for
2016-02-19 20:55:15 +00:00
// a file
func (f *FileRotator) purgeOldFiles() {
for {
select {
case <-f.purgeCh:
var fIndexes []int
files, err := ioutil.ReadDir(f.path)
if err != nil {
f.logger.Error("error getting directory listing", "err", err)
2016-02-19 20:55:15 +00:00
return
}
// Inserting all the rotated files in a slice
for _, fi := range files {
if strings.HasPrefix(fi.Name(), f.baseFileName) {
fileIdx := strings.TrimPrefix(fi.Name(), fmt.Sprintf("%s.", f.baseFileName))
n, err := strconv.Atoi(fileIdx)
if err != nil {
f.logger.Error("error extracting file index", "err", err)
2016-02-19 20:55:15 +00:00
continue
}
fIndexes = append(fIndexes, n)
}
}
2016-03-30 00:30:43 +00:00
// Not continuing to delete files if the number of files is not more
// than MaxFiles
if len(fIndexes) <= f.MaxFiles {
continue
}
2016-02-19 20:55:15 +00:00
// Sorting the file indexes so that we can purge the older files and keep
// only the number of files as configured by the user
2020-12-09 19:05:18 +00:00
sort.Ints(fIndexes)
2016-03-30 00:30:43 +00:00
toDelete := fIndexes[0 : len(fIndexes)-f.MaxFiles]
2016-02-19 20:55:15 +00:00
for _, fIndex := range toDelete {
fname := filepath.Join(f.path, fmt.Sprintf("%s.%d", f.baseFileName, fIndex))
2017-05-25 18:49:33 +00:00
err := os.RemoveAll(fname)
if err != nil {
f.logger.Error("error removing file", "filename", fname, "err", err)
2017-05-25 18:49:33 +00:00
}
2016-02-19 20:55:15 +00:00
}
f.oldestLogFileIdx = fIndexes[0]
case <-f.doneCh:
return
2016-02-19 20:55:15 +00:00
}
}
}
2016-04-07 01:35:08 +00:00
// flushBuffer writes any buffered data to the underlying file while holding
// the buffer lock. It is a no-op returning nil when no buffer exists yet.
func (f *FileRotator) flushBuffer() error {
	f.bufLock.Lock()
	defer f.bufLock.Unlock()

	if f.bufw == nil {
		return nil
	}
	return f.bufw.Flush()
}
// writeToBuffer appends p to the buffered writer while holding the buffer
// lock, returning the byte count and error reported by the writer.
func (f *FileRotator) writeToBuffer(p []byte) (int, error) {
	f.bufLock.Lock()
	defer f.bufLock.Unlock()

	written, err := f.bufw.Write(p)
	return written, err
}
// createOrResetBuffer creates a new buffer if we don't have one otherwise
// resets the buffer
func (f *FileRotator) createOrResetBuffer() {
f.bufLock.Lock()
defer f.bufLock.Unlock()
if f.bufw == nil {
2018-05-09 23:46:52 +00:00
f.bufw = bufio.NewWriterSize(f.currentFile, logBufferSize)
2016-04-07 01:35:08 +00:00
} else {
f.bufw.Reset(f.currentFile)
}
}