commit
33d2b2c8ee
|
@ -0,0 +1,155 @@
|
|||
package driver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
bufSize = 32 * 1024 // Max number of bytes read from a buffer
|
||||
)
|
||||
|
||||
// LogRotator ingests data and writes out to a rotated set of files
|
||||
type LogRotator struct {
|
||||
maxFiles int // maximum number of rotated files retained by the log rotator
|
||||
fileSize int64 // maximum file size of a rotated file
|
||||
path string // path where the rotated files are created
|
||||
fileName string // base file name of the rotated files
|
||||
|
||||
logFileIdx int // index to the current file
|
||||
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
// NewLogRotator configures and returns a new LogRotator
|
||||
func NewLogRotator(path string, fileName string, maxFiles int, fileSize int64, logger *log.Logger) (*LogRotator, error) {
|
||||
files, err := ioutil.ReadDir(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Finding out the log file with the largest index
|
||||
logFileIdx := 0
|
||||
prefix := fmt.Sprintf("%s.", fileName)
|
||||
for _, f := range files {
|
||||
if strings.HasPrefix(f.Name(), prefix) {
|
||||
fileIdx := strings.TrimPrefix(f.Name(), prefix)
|
||||
n, err := strconv.Atoi(fileIdx)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if n > logFileIdx {
|
||||
logFileIdx = n
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &LogRotator{
|
||||
maxFiles: maxFiles,
|
||||
fileSize: fileSize,
|
||||
path: path,
|
||||
fileName: fileName,
|
||||
logFileIdx: logFileIdx,
|
||||
logger: logger,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Start reads from a Reader and writes them to files and rotates them when the
|
||||
// size of the file becomes equal to the max size configured
|
||||
func (l *LogRotator) Start(r io.Reader) error {
|
||||
buf := make([]byte, bufSize)
|
||||
for {
|
||||
logFileName := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, l.logFileIdx))
|
||||
remainingSize := l.fileSize
|
||||
if f, err := os.Stat(logFileName); err == nil {
|
||||
// Skipping the current file if it happens to be a directory
|
||||
if f.IsDir() {
|
||||
l.logFileIdx += 1
|
||||
continue
|
||||
}
|
||||
// Calculating the remaining capacity of the log file
|
||||
remainingSize = l.fileSize - f.Size()
|
||||
}
|
||||
f, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
l.logger.Printf("[DEBUG] client.logrotator: opened a new file: %s", logFileName)
|
||||
|
||||
// Closing the current log file if it doesn't have any more capacity
|
||||
if remainingSize <= 0 {
|
||||
l.logFileIdx = l.logFileIdx + 1
|
||||
f.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
// Reading from the reader and writing into the current log file as long
|
||||
// as it has capacity or the reader closes
|
||||
for {
|
||||
var nr int
|
||||
var err error
|
||||
if remainingSize < bufSize {
|
||||
nr, err = r.Read(buf[0:remainingSize])
|
||||
} else {
|
||||
nr, err = r.Read(buf)
|
||||
}
|
||||
if err != nil {
|
||||
f.Close()
|
||||
return err
|
||||
}
|
||||
nw, err := f.Write(buf[:nr])
|
||||
if err != nil {
|
||||
f.Close()
|
||||
return err
|
||||
}
|
||||
if nr != nw {
|
||||
f.Close()
|
||||
return fmt.Errorf("failed to write data read from the reader into file, R: %d W: %d", nr, nw)
|
||||
}
|
||||
remainingSize -= int64(nr)
|
||||
if remainingSize < 1 {
|
||||
f.Close()
|
||||
break
|
||||
}
|
||||
}
|
||||
l.logFileIdx = l.logFileIdx + 1
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// PurgeOldFiles removes older files and keeps only the last N files rotated for
|
||||
// a file
|
||||
func (l *LogRotator) PurgeOldFiles() {
|
||||
var fIndexes []int
|
||||
files, err := ioutil.ReadDir(l.path)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Inserting all the rotated files in a slice
|
||||
for _, f := range files {
|
||||
if strings.HasPrefix(f.Name(), l.fileName) {
|
||||
fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName))
|
||||
n, err := strconv.Atoi(fileIdx)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
fIndexes = append(fIndexes, n)
|
||||
}
|
||||
}
|
||||
|
||||
// Sorting the file indexes so that we can purge the older files and keep
|
||||
// only the number of files as configured by the user
|
||||
sort.Sort(sort.IntSlice(fIndexes))
|
||||
toDelete := fIndexes[l.maxFiles-1 : len(fIndexes)-1]
|
||||
for _, fIndex := range toDelete {
|
||||
fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex))
|
||||
os.RemoveAll(fname)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,248 @@
|
|||
package driver
|
||||
|
||||
import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var (
|
||||
logger = log.New(os.Stdout, "", log.LstdFlags)
|
||||
pathPrefix = "logrotator"
|
||||
)
|
||||
|
||||
func TestLogRotator_InvalidPath(t *testing.T) {
|
||||
invalidPath := "/foo"
|
||||
|
||||
if _, err := NewLogRotator(invalidPath, "redis.stdout", 10, 10, logger); err == nil {
|
||||
t.Fatal("expected err")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogRotator_FindCorrectIndex(t *testing.T) {
|
||||
var path string
|
||||
var err error
|
||||
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
|
||||
t.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(path)
|
||||
|
||||
fname := filepath.Join(path, "redis.stdout.1")
|
||||
if f, err := os.Create(fname); err == nil {
|
||||
f.Close()
|
||||
}
|
||||
|
||||
fname = filepath.Join(path, "redis.stdout.2")
|
||||
if f, err := os.Create(fname); err == nil {
|
||||
f.Close()
|
||||
}
|
||||
|
||||
r, err := NewLogRotator(path, "redis.stdout", 10, 10, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
if r.logFileIdx != 2 {
|
||||
t.Fatalf("Expected log file idx: %v, actual: %v", 2, r.logFileIdx)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogRotator_AppendToCurrentFile(t *testing.T) {
|
||||
var path string
|
||||
var err error
|
||||
defer os.RemoveAll(path)
|
||||
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
|
||||
t.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
fname := filepath.Join(path, "redis.stdout.0")
|
||||
if f, err := os.Create(fname); err == nil {
|
||||
f.WriteString("abcde")
|
||||
f.Close()
|
||||
}
|
||||
|
||||
l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger)
|
||||
if err != nil && err != io.EOF {
|
||||
t.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
|
||||
r, w := io.Pipe()
|
||||
go func() {
|
||||
w.Write([]byte("fg"))
|
||||
w.Close()
|
||||
}()
|
||||
err = l.Start(r)
|
||||
if err != nil && err != io.EOF {
|
||||
t.Fatal(err)
|
||||
}
|
||||
finfo, err := os.Stat(fname)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if finfo.Size() != 6 {
|
||||
t.Fatalf("Expected size of file: %v, actual: %v", 6, finfo.Size())
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogRotator_RotateFiles(t *testing.T) {
|
||||
var path string
|
||||
var err error
|
||||
defer os.RemoveAll(path)
|
||||
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
|
||||
t.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
fname := filepath.Join(path, "redis.stdout.0")
|
||||
if f, err := os.Create(fname); err == nil {
|
||||
f.WriteString("abcde")
|
||||
f.Close()
|
||||
}
|
||||
|
||||
l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
|
||||
r, w := io.Pipe()
|
||||
go func() {
|
||||
// This should make the current log file rotate
|
||||
w.Write([]byte("fg"))
|
||||
w.Close()
|
||||
}()
|
||||
err = l.Start(r)
|
||||
if err != nil && err != io.EOF {
|
||||
t.Fatalf("Failure in logrotator start %v", err)
|
||||
}
|
||||
|
||||
if finfo, err := os.Stat(filepath.Join(path, "redis.stdout.1")); err == nil {
|
||||
if finfo.Size() != 1 {
|
||||
t.Fatalf("expected number of bytes: %v, actual: %v", 1, finfo.Size())
|
||||
}
|
||||
} else {
|
||||
t.Fatal("expected file redis.stdout.1")
|
||||
}
|
||||
|
||||
if finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0")); err == nil {
|
||||
if finfo.Size() != 6 {
|
||||
t.Fatalf("expected number of bytes: %v, actual: %v", 1, finfo.Size())
|
||||
}
|
||||
} else {
|
||||
t.Fatal("expected file redis.stdout.0")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogRotator_StartFromEmptyDir(t *testing.T) {
|
||||
var path string
|
||||
var err error
|
||||
defer os.RemoveAll(path)
|
||||
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
|
||||
t.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
|
||||
l, err := NewLogRotator(path, "redis.stdout", 10, 10, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
|
||||
r, w := io.Pipe()
|
||||
go func() {
|
||||
w.Write([]byte("abcdefg"))
|
||||
w.Close()
|
||||
}()
|
||||
err = l.Start(r)
|
||||
if err != nil && err != io.EOF {
|
||||
t.Fatalf("Failure in logrotator start %v", err)
|
||||
}
|
||||
|
||||
finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if finfo.Size() != 7 {
|
||||
t.Fatalf("expected size of file: %v, actual: %v", 7, finfo.Size())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestLogRotator_SetPathAsFile(t *testing.T) {
|
||||
var f *os.File
|
||||
var err error
|
||||
var path string
|
||||
defer os.RemoveAll(path)
|
||||
if f, err = ioutil.TempFile("", pathPrefix); err != nil {
|
||||
t.Fatalf("test setup problem: %v", err)
|
||||
}
|
||||
path = f.Name()
|
||||
if _, err = NewLogRotator(f.Name(), "redis.stdout", 10, 10, logger); err == nil {
|
||||
t.Fatal("expected error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogRotator_ExcludeDirs(t *testing.T) {
|
||||
var path string
|
||||
var err error
|
||||
defer os.RemoveAll(path)
|
||||
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
|
||||
t.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
if err := os.Mkdir(filepath.Join(path, "redis.stdout.0"), os.ModeDir|os.ModePerm); err != nil {
|
||||
t.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
|
||||
l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
|
||||
r, w := io.Pipe()
|
||||
go func() {
|
||||
w.Write([]byte("fg"))
|
||||
w.Close()
|
||||
}()
|
||||
err = l.Start(r)
|
||||
if err != nil && err != io.EOF {
|
||||
t.Fatalf("Failure in logrotator start %v", err)
|
||||
}
|
||||
|
||||
finfo, err := os.Stat(filepath.Join(path, "redis.stdout.1"))
|
||||
if err != nil {
|
||||
t.Fatal("expected rotator to create redis.stdout.1")
|
||||
}
|
||||
if finfo.Size() != 2 {
|
||||
t.Fatalf("expected size: %v, actual: %v", 2, finfo.Size())
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogRotator_PurgeDirs(t *testing.T) {
|
||||
var path string
|
||||
var err error
|
||||
defer os.RemoveAll(path)
|
||||
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
|
||||
t.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
|
||||
l, err := NewLogRotator(path, "redis.stdout", 2, 4, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("test setup err: %v", err)
|
||||
}
|
||||
|
||||
r, w := io.Pipe()
|
||||
go func() {
|
||||
w.Write([]byte("abcdefghijklmno"))
|
||||
w.Close()
|
||||
}()
|
||||
|
||||
err = l.Start(r)
|
||||
if err != nil && err != io.EOF {
|
||||
t.Fatalf("failure in logrotator start: %v", err)
|
||||
}
|
||||
l.PurgeOldFiles()
|
||||
|
||||
files, err := ioutil.ReadDir(path)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(files) != 2 {
|
||||
t.Fatalf("expected number of files: %v, actual: %v", 2, len(files))
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue