Merge pull request #819 from hashicorp/r-syslog-collector

Refactored syslog server and log rotator
This commit is contained in:
Diptanu Choudhury 2016-02-23 10:15:14 -08:00
commit d6f09400f7
16 changed files with 660 additions and 608 deletions

View file

@ -19,7 +19,7 @@ import (
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/logcollector"
"github.com/hashicorp/nomad/client/driver/logging"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/helper/discover"
@ -82,7 +82,7 @@ type dockerPID struct {
type DockerHandle struct {
pluginClient *plugin.Client
logCollector logcollector.LogCollector
logCollector logging.LogCollector
client *docker.Client
logger *log.Logger
cleanupContainer bool
@ -507,7 +507,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
if err != nil {
return nil, err
}
logCollectorCtx := &logcollector.LogCollectorContext{
logCollectorCtx := &logging.LogCollectorContext{
TaskName: task.Name,
AllocDir: ctx.AllocDir,
LogConfig: task.LogConfig,

View file

@ -38,7 +38,7 @@ type ExecDriverConfig struct {
type execHandle struct {
pluginClient *plugin.Client
executor executor.Executor
isolationConfig *executor.IsolationConfig
isolationConfig *cstructs.IsolationConfig
userPid int
allocDir *allocdir.AllocDir
killTimeout time.Duration
@ -153,7 +153,7 @@ type execId struct {
UserPid int
TaskDir string
AllocDir *allocdir.AllocDir
IsolationConfig *executor.IsolationConfig
IsolationConfig *cstructs.IsolationConfig
PluginConfig *PluginReattachConfig
}

View file

@ -2,7 +2,6 @@ package executor
import (
"fmt"
"io"
"log"
"os"
"os/exec"
@ -18,7 +17,8 @@ import (
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/client/driver/logrotator"
"github.com/hashicorp/nomad/client/driver/logging"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -61,18 +61,12 @@ type ExecCommand struct {
Args []string
}
// IsolationConfig has information about the isolation mechanism the executor
// uses to put resource constraints and isolation on the user process
type IsolationConfig struct {
Cgroup *cgroupConfig.Cgroup
}
// ProcessState holds information about the state of a user process.
type ProcessState struct {
Pid int
ExitCode int
Signal int
IsolationConfig *IsolationConfig
IsolationConfig *cstructs.IsolationConfig
Time time.Time
}
@ -97,8 +91,8 @@ type UniversalExecutor struct {
groups *cgroupConfig.Cgroup
exitState *ProcessState
processExited chan interface{}
lre *logrotator.LogRotator
lro *logrotator.LogRotator
lre *logging.FileRotator
lro *logging.FileRotator
logger *log.Logger
lock sync.Mutex
@ -135,28 +129,23 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
}
logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024)
path := filepath.Join(e.taskDir, allocdir.TaskLocal)
lro, err := logging.NewFileRotator(path, fmt.Sprintf("%v.stdout", ctx.TaskName),
ctx.LogConfig.MaxFiles, logFileSize, e.logger)
stdor, stdow := io.Pipe()
lro, err := logrotator.NewLogRotator(filepath.Join(e.taskDir, allocdir.TaskLocal),
fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles,
logFileSize, e.logger)
if err != nil {
return nil, fmt.Errorf("error creating log rotator for stdout of task %v", err)
}
e.cmd.Stdout = stdow
e.cmd.Stdout = lro
e.lro = lro
go lro.Start(stdor)
stder, stdew := io.Pipe()
lre, err := logrotator.NewLogRotator(filepath.Join(e.taskDir, allocdir.TaskLocal),
fmt.Sprintf("%v.stderr", ctx.TaskName), ctx.LogConfig.MaxFiles,
logFileSize, e.logger)
lre, err := logging.NewFileRotator(path, fmt.Sprintf("%v.stderr", ctx.TaskName),
ctx.LogConfig.MaxFiles, logFileSize, e.logger)
if err != nil {
return nil, fmt.Errorf("error creating log rotator for stderr of task %v", err)
}
e.cmd.Stderr = stdew
e.cmd.Stderr = lre
e.lre = lre
go lre.Start(stder)
// setting the env, path and args for the command
e.ctx.TaskEnv.Build()
@ -175,7 +164,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
return nil, fmt.Errorf("error starting command: %v", err)
}
go e.wait()
ic := &IsolationConfig{Cgroup: e.groups}
ic := &cstructs.IsolationConfig{Cgroup: e.groups}
return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: ic, Time: time.Now()}, nil
}

View file

@ -45,7 +45,7 @@ type javaHandle struct {
pluginClient *plugin.Client
userPid int
executor executor.Executor
isolationConfig *executor.IsolationConfig
isolationConfig *cstructs.IsolationConfig
taskDir string
allocDir *allocdir.AllocDir
@ -208,7 +208,7 @@ func (d *JavaDriver) cgroupsMounted(node *structs.Node) bool {
type javaId struct {
KillTimeout time.Duration
PluginConfig *PluginReattachConfig
IsolationConfig *executor.IsolationConfig
IsolationConfig *cstructs.IsolationConfig
TaskDir string
AllocDir *allocdir.AllocDir
UserPid int

View file

@ -1,4 +1,4 @@
package logcollector
package logging
import (
"log"

View file

@ -0,0 +1,245 @@
package logging
import (
"bufio"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
)
var (
flushDur = 100 * time.Millisecond
)
// FileRotator writes bytes to a rotated set of files
type FileRotator struct {
MaxFiles int // MaxFiles is the maximum number of rotated files allowed in a path
FileSize int64 // FileSize is the size a rotated file is allowed to grow
path string // path is the path on the file system where the rotated set of files are opened
baseFileName string // baseFileName is the base file name of the rotated files
logFileIdx int // logFileIdx is the current index of the rotated files
oldestLogFileIdx int // oldestLogFileIdx is the index of the oldest log file in a path
currentFile *os.File // currentFile is the file that is currently getting written
currentWr int64 // currentWr is the number of bytes written to the current file
bufw *bufio.Writer
flushTicker *time.Ticker
logger *log.Logger
purgeCh chan struct{}
doneCh chan struct{}
}
// NewFileRotator returns a new file rotator
func NewFileRotator(path string, baseFile string, maxFiles int,
fileSize int64, logger *log.Logger) (*FileRotator, error) {
rotator := &FileRotator{
MaxFiles: maxFiles,
FileSize: fileSize,
path: path,
baseFileName: baseFile,
flushTicker: time.NewTicker(flushDur),
logger: logger,
purgeCh: make(chan struct{}, 1),
doneCh: make(chan struct{}, 1),
}
if err := rotator.lastFile(); err != nil {
return nil, err
}
go rotator.purgeOldFiles()
go rotator.flushPeriodically()
return rotator, nil
}
// Write writes a byte array to a file and rotates the file if it's size becomes
// equal to the maximum size the user has defined.
func (f *FileRotator) Write(p []byte) (n int, err error) {
n = 0
var nw int
for n < len(p) {
// Check if we still have space in the current file, otherwise close and
// open the next file
if f.currentWr >= f.FileSize {
f.bufw.Flush()
f.currentFile.Close()
if err := f.nextFile(); err != nil {
return 0, err
}
}
// Calculate the remaining size on this file
remainingSize := f.FileSize - f.currentWr
// Check if the number of bytes that we have to write is less than the
// remaining size of the file
if remainingSize < int64(len(p[n:])) {
// Write the number of bytes that we can write on the current file
li := int64(n) + remainingSize
nw, err = f.bufw.Write(p[n:li])
//nw, err = f.currentFile.Write(p[n:li])
} else {
// Write all the bytes in the current file
nw, err = f.bufw.Write(p[n:])
//nw, err = f.currentFile.Write(p[n:])
}
// Increment the number of bytes written so far in this method
// invocation
n += nw
// Increment the total number of bytes in the file
f.currentWr += int64(n)
if err != nil {
return
}
}
return
}
// nextFile opens the next file and purges older files if the number of rotated
// files is larger than the maximum files configured by the user
func (f *FileRotator) nextFile() error {
nextFileIdx := f.logFileIdx
for {
nextFileIdx += 1
logFileName := filepath.Join(f.path, fmt.Sprintf("%s.%d", f.baseFileName, nextFileIdx))
if fi, err := os.Stat(logFileName); err == nil {
if fi.IsDir() || fi.Size() >= f.FileSize {
continue
}
}
f.logFileIdx = nextFileIdx
if err := f.createFile(); err != nil {
return err
}
break
}
// Purge old files if we have more files than MaxFiles
if f.logFileIdx-f.oldestLogFileIdx >= f.MaxFiles {
select {
case f.purgeCh <- struct{}{}:
default:
}
}
return nil
}
// lastFile finds out the rotated file with the largest index in a path.
func (f *FileRotator) lastFile() error {
finfos, err := ioutil.ReadDir(f.path)
if err != nil {
return err
}
prefix := fmt.Sprintf("%s.", f.baseFileName)
for _, fi := range finfos {
if fi.IsDir() {
continue
}
if strings.HasPrefix(fi.Name(), prefix) {
fileIdx := strings.TrimPrefix(fi.Name(), prefix)
n, err := strconv.Atoi(fileIdx)
if err != nil {
continue
}
if n > f.logFileIdx {
f.logFileIdx = n
}
}
}
if err := f.createFile(); err != nil {
return err
}
return nil
}
// createFile opens a new or existing file for writing
func (f *FileRotator) createFile() error {
logFileName := filepath.Join(f.path, fmt.Sprintf("%s.%d", f.baseFileName, f.logFileIdx))
cFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return err
}
f.currentFile = cFile
fi, err := f.currentFile.Stat()
if err != nil {
return err
}
f.currentWr = fi.Size()
if f.bufw == nil {
f.bufw = bufio.NewWriter(f.currentFile)
} else {
f.bufw.Reset(f.currentFile)
}
return nil
}
// flushPeriodically flushes the buffered writer every 100ms to the underlying
// file
func (f *FileRotator) flushPeriodically() {
for _ = range f.flushTicker.C {
if f.bufw != nil {
f.bufw.Flush()
}
}
}
func (f *FileRotator) Close() {
// Stop the ticker and flush for one last time
f.flushTicker.Stop()
if f.bufw != nil {
f.bufw.Flush()
}
// Stop the purge go routine
f.doneCh <- struct{}{}
close(f.purgeCh)
}
// purgeOldFiles removes older files and keeps only the last N files rotated for
// a file
func (f *FileRotator) purgeOldFiles() {
for {
select {
case <-f.purgeCh:
var fIndexes []int
files, err := ioutil.ReadDir(f.path)
if err != nil {
return
}
// Inserting all the rotated files in a slice
for _, fi := range files {
if strings.HasPrefix(fi.Name(), f.baseFileName) {
fileIdx := strings.TrimPrefix(fi.Name(), fmt.Sprintf("%s.", f.baseFileName))
n, err := strconv.Atoi(fileIdx)
if err != nil {
continue
}
fIndexes = append(fIndexes, n)
}
}
// Sorting the file indexes so that we can purge the older files and keep
// only the number of files as configured by the user
sort.Sort(sort.IntSlice(fIndexes))
var toDelete []int
toDelete = fIndexes[0 : len(fIndexes)-f.MaxFiles]
for _, fIndex := range toDelete {
fname := filepath.Join(f.path, fmt.Sprintf("%s.%d", f.baseFileName, fIndex))
os.RemoveAll(fname)
}
f.oldestLogFileIdx = fIndexes[0]
case <-f.doneCh:
return
}
}
}

View file

@ -0,0 +1,239 @@
package logging
import (
"io/ioutil"
"log"
"os"
"path/filepath"
"testing"
"time"
)
var (
logger = log.New(os.Stdout, "", log.LstdFlags)
pathPrefix = "logrotator"
baseFileName = "redis.stdout"
)
func TestFileRotator_IncorrectPath(t *testing.T) {
if _, err := NewFileRotator("/foo", baseFileName, 10, 10, logger); err == nil {
t.Fatalf("expected error")
}
}
func TestFileRotator_CreateNewFile(t *testing.T) {
var path string
var err error
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
defer os.RemoveAll(path)
_, err = NewFileRotator(path, baseFileName, 10, 10, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
if _, err := os.Stat(filepath.Join(path, "redis.stdout.0")); err != nil {
t.Fatalf("expected file")
}
}
func TestFileRotator_OpenLastFile(t *testing.T) {
var path string
var err error
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
defer os.RemoveAll(path)
fname1 := filepath.Join(path, "redis.stdout.0")
fname2 := filepath.Join(path, "redis.stdout.2")
if _, err := os.Create(fname1); err != nil {
t.Fatalf("test setup failure: %v", err)
}
if _, err := os.Create(fname2); err != nil {
t.Fatalf("test setup failure: %v", err)
}
fr, err := NewFileRotator(path, baseFileName, 10, 10, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
if fr.currentFile.Name() != fname2 {
t.Fatalf("expected current file: %v, got: %v", fname2, fr.currentFile.Name())
}
}
func TestFileRotator_WriteToCurrentFile(t *testing.T) {
var path string
var err error
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
defer os.RemoveAll(path)
fname1 := filepath.Join(path, "redis.stdout.0")
if _, err := os.Create(fname1); err != nil {
t.Fatalf("test setup failure: %v", err)
}
fr, err := NewFileRotator(path, baseFileName, 10, 5, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
fr.Write([]byte("abcde"))
time.Sleep(200 * time.Millisecond)
fi, err := os.Stat(fname1)
if err != nil {
t.Fatalf("error getting the file info: %v", err)
}
if fi.Size() != 5 {
t.Fatalf("expected size: %v, actual: %v", 5, fi.Size())
}
}
func TestFileRotator_RotateFiles(t *testing.T) {
var path string
var err error
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
defer os.RemoveAll(path)
fr, err := NewFileRotator(path, baseFileName, 10, 5, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
str := "abcdefgh"
nw, err := fr.Write([]byte(str))
time.Sleep(200 * time.Millisecond)
if err != nil {
t.Fatalf("got error while writing: %v", err)
}
if nw != len(str) {
t.Fatalf("expected %v, got %v", len(str), nw)
}
fname1 := filepath.Join(path, "redis.stdout.0")
fi, err := os.Stat(fname1)
if err != nil {
t.Fatalf("error getting the file info: %v", err)
}
if fi.Size() != 5 {
t.Fatalf("expected size: %v, actual: %v", 5, fi.Size())
}
fname2 := filepath.Join(path, "redis.stdout.1")
if _, err := os.Stat(fname2); err != nil {
t.Fatalf("expected file %v to exist", fname2)
}
if fi2, err := os.Stat(fname2); err == nil {
if fi2.Size() != 3 {
t.Fatalf("expected size: %v, actual: %v", 3, fi2.Size())
}
} else {
t.Fatalf("error getting the file info: %v", err)
}
}
func TestFileRotator_WriteRemaining(t *testing.T) {
var path string
var err error
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
defer os.RemoveAll(path)
fname1 := filepath.Join(path, "redis.stdout.0")
if f, err := os.Create(fname1); err == nil {
f.Write([]byte("abcd"))
} else {
t.Fatalf("test setup failure: %v", err)
}
fr, err := NewFileRotator(path, baseFileName, 10, 5, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
str := "efghijkl"
nw, err := fr.Write([]byte(str))
if err != nil {
t.Fatalf("got error while writing: %v", err)
}
time.Sleep(200 * time.Millisecond)
if nw != len(str) {
t.Fatalf("expected %v, got %v", len(str), nw)
}
fi, err := os.Stat(fname1)
if err != nil {
t.Fatalf("error getting the file info: %v", err)
}
if fi.Size() != 5 {
t.Fatalf("expected size: %v, actual: %v", 5, fi.Size())
}
fname2 := filepath.Join(path, "redis.stdout.1")
if _, err := os.Stat(fname2); err != nil {
t.Fatalf("expected file %v to exist", fname2)
}
if fi2, err := os.Stat(fname2); err == nil {
if fi2.Size() != 5 {
t.Fatalf("expected size: %v, actual: %v", 5, fi2.Size())
}
} else {
t.Fatalf("error getting the file info: %v", err)
}
fname3 := filepath.Join(path, "redis.stdout.2")
if _, err := os.Stat(fname3); err != nil {
t.Fatalf("expected file %v to exist", fname3)
}
if fi3, err := os.Stat(fname3); err == nil {
if fi3.Size() != 2 {
t.Fatalf("expected size: %v, actual: %v", 2, fi3.Size())
}
} else {
t.Fatalf("error getting the file info: %v", err)
}
}
func TestFileRotator_PurgeOldFiles(t *testing.T) {
var path string
var err error
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
defer os.RemoveAll(path)
fr, err := NewFileRotator(path, baseFileName, 2, 2, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
str := "abcdeghijklmn"
nw, err := fr.Write([]byte(str))
if err != nil {
t.Fatalf("got error while writing: %v", err)
}
if nw != len(str) {
t.Fatalf("expected %v, got %v", len(str), nw)
}
time.Sleep(1 * time.Second)
f, err := ioutil.ReadDir(path)
if err != nil {
t.Fatalf("test error: %v", err)
}
if len(f) != 2 {
t.Fatalf("expected number of files: %v, got: %v", 2, len(f))
}
}

View file

@ -1,16 +1,12 @@
// +build !windows
package logcollector
package logging
import (
"bufio"
"fmt"
"log"
"log/syslog"
"strconv"
"time"
"github.com/jeromer/syslogparser"
)
// Errors related to parsing priority
@ -29,6 +25,12 @@ const (
PRI_PART_END = '>'
)
// SyslogMessage represents a log line received
type SyslogMessage struct {
Message []byte
Severity syslog.Priority
}
// Priority holds all the priority bits in a syslog log line
type Priority struct {
Pri int
@ -38,32 +40,21 @@ type Priority struct {
// DockerLogParser parses a line of log message that the docker daemon ships
type DockerLogParser struct {
line []byte
content []byte
severity Priority
log *log.Logger
logger *log.Logger
}
// NewDockerLogParser creates a new DockerLogParser
func NewDockerLogParser(line []byte) *DockerLogParser {
return &DockerLogParser{line: line}
func NewDockerLogParser(logger *log.Logger) *DockerLogParser {
return &DockerLogParser{logger: logger}
}
// Parse parses a syslog log line
func (d *DockerLogParser) Parse() error {
severity, _, _ := d.parsePriority(d.line)
msgIdx := d.logContentIndex(d.line)
d.severity = severity
d.content = d.line[msgIdx:]
return nil
}
// Dump creates a map of the parsed log line and severity
func (d *DockerLogParser) Dump() syslogparser.LogParts {
return map[string]interface{}{
"content": d.content,
"severity": d.severity,
func (d *DockerLogParser) Parse(line []byte) *SyslogMessage {
pri, _, _ := d.parsePriority(line)
msgIdx := d.logContentIndex(line)
return &SyslogMessage{
Severity: pri.Severity,
Message: line[msgIdx:],
}
}
@ -143,19 +134,3 @@ func (d *DockerLogParser) newPriority(p int) Priority {
Severity: syslog.Priority(p % 8),
}
}
func (d *DockerLogParser) Location(location *time.Location) {
}
// CustomParser is a parser to parse docker syslog lines
type CustomParser struct {
logger *log.Logger
}
func (c *CustomParser) GetParser(line []byte) syslogparser.LogParser {
return NewDockerLogParser(line)
}
func (c *CustomParser) GetSplitFunc() bufio.SplitFunc {
return nil
}

View file

@ -1,15 +1,17 @@
// +build !windows
package logcollector
package logging
import (
"log"
"log/syslog"
"os"
"testing"
)
func TestLogParser_Priority(t *testing.T) {
line := []byte("<30>2016-02-10T10:16:43-08:00 d-thinkpad docker/e2a1e3ebd3a3[22950]: 1:C 10 Feb 18:16:43.391 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf")
d := NewDockerLogParser(line)
d := NewDockerLogParser(log.New(os.Stdout, "", log.LstdFlags))
p, _, err := d.parsePriority(line)
if err != nil {
t.Fatalf("got an err: %v", err)

View file

@ -0,0 +1,74 @@
package logging
import (
"bufio"
"log"
"net"
)
// SyslogServer is a server which listens to syslog messages and parses them
type SyslogServer struct {
listener net.Listener
messages chan *SyslogMessage
parser *DockerLogParser
doneCh chan interface{}
logger *log.Logger
}
// NewSyslogServer creates a new syslog server
func NewSyslogServer(l net.Listener, messages chan *SyslogMessage, logger *log.Logger) *SyslogServer {
parser := NewDockerLogParser(logger)
return &SyslogServer{
listener: l,
messages: messages,
parser: parser,
logger: logger,
doneCh: make(chan interface{}),
}
}
// Start starts accepting syslog connections
func (s *SyslogServer) Start() {
for {
select {
case <-s.doneCh:
s.listener.Close()
return
default:
connection, err := s.listener.Accept()
if err != nil {
s.logger.Printf("[ERROR] logcollector.server: error in accepting connection: %v", err)
continue
}
go s.read(connection)
}
}
}
// read reads the bytes from a connection
func (s *SyslogServer) read(connection net.Conn) {
defer connection.Close()
scanner := bufio.NewScanner(bufio.NewReader(connection))
for {
select {
case <-s.doneCh:
return
default:
}
if scanner.Scan() {
b := scanner.Bytes()
msg := s.parser.Parse(b)
s.messages <- msg
} else {
return
}
}
}
// Shutdown shutsdown the syslog server
func (s *SyslogServer) Shutdown() {
close(s.doneCh)
close(s.messages)
}

View file

@ -1,20 +1,18 @@
// +build !windows
package logcollector
package logging
import (
"fmt"
"io"
"log"
s1 "log/syslog"
"log/syslog"
"net"
"path/filepath"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/driver/executor"
"github.com/hashicorp/nomad/client/driver/logrotator"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mcuadros/go-syslog"
)
// LogCollectorContext holds context to configure the syslog server
@ -41,7 +39,7 @@ type LogCollectorContext struct {
// SyslogCollectorState holds the address and islation information of a launched
// syslog server
type SyslogCollectorState struct {
IsolationConfig *executor.IsolationConfig
IsolationConfig *cstructs.IsolationConfig
Addr string
}
@ -60,85 +58,78 @@ type SyslogCollector struct {
logConfig *structs.LogConfig
ctx *LogCollectorContext
lro *logrotator.LogRotator
lre *logrotator.LogRotator
server *syslog.Server
taskDir string
lro *FileRotator
lre *FileRotator
server *SyslogServer
syslogChan chan *SyslogMessage
taskDir string
logger *log.Logger
}
// NewSyslogCollector returns an implementation of the SyslogCollector
func NewSyslogCollector(logger *log.Logger) *SyslogCollector {
return &SyslogCollector{logger: logger}
return &SyslogCollector{logger: logger, syslogChan: make(chan *SyslogMessage, 2048)}
}
// LaunchCollector launches a new syslog server and starts writing log lines to
// files and rotates them
func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) {
addr, err := s.getFreePort(ctx.PortLowerBound, ctx.PortUpperBound)
l, err := s.getListener(ctx.PortLowerBound, ctx.PortUpperBound)
if err != nil {
return nil, err
}
s.logger.Printf("[DEBUG] sylog-server: launching syslog server on addr: %v", addr)
s.logger.Printf("[DEBUG] sylog-server: launching syslog server on addr: %v", l.Addr().String())
s.ctx = ctx
// configuring the task dir
if err := s.configureTaskDir(); err != nil {
return nil, err
}
channel := make(syslog.LogPartsChannel)
handler := syslog.NewChannelHandler(channel)
s.server = syslog.NewServer()
s.server.SetFormat(&CustomParser{logger: s.logger})
s.server.SetHandler(handler)
s.server.ListenTCP(addr.String())
if err := s.server.Boot(); err != nil {
return nil, err
}
s.server = NewSyslogServer(l, s.syslogChan, s.logger)
go s.server.Start()
logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024)
ro, wo := io.Pipe()
lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal),
fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles,
logFileSize, s.logger)
path := filepath.Join(s.taskDir, allocdir.TaskLocal)
lro, err := NewFileRotator(path, fmt.Sprintf("%v.stdout", ctx.TaskName),
ctx.LogConfig.MaxFiles, logFileSize, s.logger)
if err != nil {
return nil, err
}
s.lro = lro
go lro.Start(ro)
re, we := io.Pipe()
lre, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal),
fmt.Sprintf("%v.stderr", ctx.TaskName), ctx.LogConfig.MaxFiles,
logFileSize, s.logger)
lre, err := NewFileRotator(path, fmt.Sprintf("%v.stderr", ctx.TaskName),
ctx.LogConfig.MaxFiles, logFileSize, s.logger)
if err != nil {
return nil, err
}
s.lre = lre
go lre.Start(re)
go func(channel syslog.LogPartsChannel) {
for logParts := range channel {
// If the severity of the log line is err then we write to stderr
// otherwise all messages go to stdout
s := logParts["severity"].(Priority)
if s.Severity == s1.LOG_ERR {
we.Write(logParts["content"].([]byte))
} else {
wo.Write(logParts["content"].([]byte))
}
wo.Write([]byte("\n"))
go s.collectLogs(lre, lro)
return &SyslogCollectorState{Addr: l.Addr().String()}, nil
}
func (s *SyslogCollector) collectLogs(we io.Writer, wo io.Writer) {
for logParts := range s.syslogChan {
// If the severity of the log line is err then we write to stderr
// otherwise all messages go to stdout
if logParts.Severity == syslog.LOG_ERR {
s.lre.Write(logParts.Message)
s.lre.Write([]byte{'\n'})
} else {
s.lro.Write(logParts.Message)
s.lro.Write([]byte{'\n'})
}
}(channel)
go s.server.Wait()
return &SyslogCollectorState{Addr: addr.String()}, nil
}
}
// Exit kills the syslog server
func (s *SyslogCollector) Exit() error {
return s.server.Kill()
s.server.Shutdown()
s.lre.Close()
s.lro.Close()
return nil
}
// UpdateLogConfig updates the log configuration
@ -170,7 +161,7 @@ func (s *SyslogCollector) configureTaskDir() error {
// getFreePort returns a free port ready to be listened on between upper and
// lower bounds
func (s *SyslogCollector) getFreePort(lowerBound uint, upperBound uint) (net.Addr, error) {
func (s *SyslogCollector) getListener(lowerBound uint, upperBound uint) (net.Listener, error) {
for i := lowerBound; i <= upperBound; i++ {
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i))
if err != nil {
@ -180,8 +171,7 @@ func (s *SyslogCollector) getFreePort(lowerBound uint, upperBound uint) (net.Add
if err != nil {
continue
}
defer l.Close()
return l.Addr(), nil
return l, nil
}
return nil, fmt.Errorf("No free port found")
}

View file

@ -1,176 +0,0 @@
package logrotator
import (
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
)
const (
bufSize = 32 * 1024 // Max number of bytes read from a buffer
)
// LogRotator ingests data and writes out to a rotated set of files
type LogRotator struct {
MaxFiles int // maximum number of rotated files retained by the log rotator
FileSize int64 // maximum file size of a rotated file
path string // path where the rotated files are created
fileName string // base file name of the rotated files
logFileIdx int // index to the current file
oldestLogFileIdx int // index to the oldest log file
logger *log.Logger
purgeCh chan struct{}
}
// NewLogRotator configures and returns a new LogRotator
func NewLogRotator(path string, fileName string, maxFiles int, fileSize int64, logger *log.Logger) (*LogRotator, error) {
files, err := ioutil.ReadDir(path)
if err != nil {
return nil, err
}
// Finding out the log file with the largest index
logFileIdx := 0
prefix := fmt.Sprintf("%s.", fileName)
for _, f := range files {
if strings.HasPrefix(f.Name(), prefix) {
fileIdx := strings.TrimPrefix(f.Name(), prefix)
n, err := strconv.Atoi(fileIdx)
if err != nil {
continue
}
if n > logFileIdx {
logFileIdx = n
}
}
}
lr := &LogRotator{
MaxFiles: maxFiles,
FileSize: fileSize,
path: path,
fileName: fileName,
logFileIdx: logFileIdx,
logger: logger,
purgeCh: make(chan struct{}, 1),
}
go lr.PurgeOldFiles()
return lr, nil
}
// Start reads from a Reader and writes them to files and rotates them when the
// size of the file becomes equal to the max size configured
func (l *LogRotator) Start(r io.Reader) error {
buf := make([]byte, bufSize)
for {
logFileName := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, l.logFileIdx))
var fileSize int64
if f, err := os.Stat(logFileName); err == nil {
// Skipping the current file if it happens to be a directory
if f.IsDir() {
l.logFileIdx += 1
continue
}
fileSize = f.Size()
// Calculating the remaining capacity of the log file
}
f, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return err
}
l.logger.Printf("[DEBUG] client.logrotator: opened a new file: %s", logFileName)
// Closing the current log file if it doesn't have any more capacity
if fileSize >= l.FileSize {
l.logFileIdx += 1
f.Close()
continue
}
// Reading from the reader and writing into the current log file as long
// as it has capacity or the reader closes
totalWritten := 0
for {
if l.FileSize-(fileSize+int64(totalWritten)) < 1 {
f.Close()
break
}
var nr int
var err error
remainingSize := l.FileSize - (int64(totalWritten) + fileSize)
if remainingSize < bufSize {
nr, err = r.Read(buf[0:remainingSize])
} else {
nr, err = r.Read(buf)
}
if err != nil {
f.Close()
return err
}
nw, err := f.Write(buf[:nr])
if err != nil {
f.Close()
return err
}
if nr != nw {
f.Close()
return fmt.Errorf("failed to write data read from the reader into file, R: %d W: %d", nr, nw)
}
totalWritten += nr
}
l.logFileIdx = l.logFileIdx + 1
// Purge old files if we have more files than MaxFiles
if l.logFileIdx-l.oldestLogFileIdx >= l.MaxFiles {
select {
case l.purgeCh <- struct{}{}:
default:
}
}
}
}
// PurgeOldFiles removes older files and keeps only the last N files rotated for
// a file
func (l *LogRotator) PurgeOldFiles() {
for {
select {
case <-l.purgeCh:
var fIndexes []int
files, err := ioutil.ReadDir(l.path)
if err != nil {
return
}
// Inserting all the rotated files in a slice
for _, f := range files {
if strings.HasPrefix(f.Name(), l.fileName) {
fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName))
n, err := strconv.Atoi(fileIdx)
if err != nil {
continue
}
fIndexes = append(fIndexes, n)
}
}
// Sorting the file indexes so that we can purge the older files and keep
// only the number of files as configured by the user
sort.Sort(sort.IntSlice(fIndexes))
var toDelete []int
toDelete = fIndexes[0 : len(fIndexes)-l.MaxFiles]
for _, fIndex := range toDelete {
fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex))
os.RemoveAll(fname)
}
l.oldestLogFileIdx = fIndexes[0]
}
}
}

View file

@ -1,295 +0,0 @@
package logrotator
import (
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
"testing"
"time"
)
var (
logger = log.New(os.Stdout, "", log.LstdFlags)
pathPrefix = "logrotator"
)
func TestLogRotator_InvalidPath(t *testing.T) {
invalidPath := "/foo"
if _, err := NewLogRotator(invalidPath, "redis.stdout", 10, 10, logger); err == nil {
t.Fatal("expected err")
}
}
func TestLogRotator_FindCorrectIndex(t *testing.T) {
var path string
var err error
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
defer os.RemoveAll(path)
fname := filepath.Join(path, "redis.stdout.1")
if f, err := os.Create(fname); err == nil {
f.Close()
}
fname = filepath.Join(path, "redis.stdout.2")
if f, err := os.Create(fname); err == nil {
f.Close()
}
r, err := NewLogRotator(path, "redis.stdout", 10, 10, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
if r.logFileIdx != 2 {
t.Fatalf("Expected log file idx: %v, actual: %v", 2, r.logFileIdx)
}
}
func TestLogRotator_AppendToCurrentFile(t *testing.T) {
var path string
var err error
defer os.RemoveAll(path)
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
fname := filepath.Join(path, "redis.stdout.0")
if f, err := os.Create(fname); err == nil {
f.WriteString("abcde")
f.Close()
}
l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger)
if err != nil && err != io.EOF {
t.Fatalf("test setup err: %v", err)
}
r, w := io.Pipe()
go func() {
w.Write([]byte("fg"))
w.Close()
}()
err = l.Start(r)
if err != nil && err != io.EOF {
t.Fatal(err)
}
finfo, err := os.Stat(fname)
if err != nil {
t.Fatal(err)
}
if finfo.Size() != 6 {
t.Fatalf("Expected size of file: %v, actual: %v", 6, finfo.Size())
}
}
func TestLogRotator_RotateFiles(t *testing.T) {
var path string
var err error
defer os.RemoveAll(path)
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
fname := filepath.Join(path, "redis.stdout.0")
if f, err := os.Create(fname); err == nil {
f.WriteString("abcde")
f.Close()
}
l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
r, w := io.Pipe()
go func() {
// This should make the current log file rotate
w.Write([]byte("fg"))
w.Close()
}()
err = l.Start(r)
if err != nil && err != io.EOF {
t.Fatalf("Failure in logrotator start %v", err)
}
if finfo, err := os.Stat(filepath.Join(path, "redis.stdout.1")); err == nil {
if finfo.Size() != 1 {
t.Fatalf("expected number of bytes: %v, actual: %v", 1, finfo.Size())
}
} else {
t.Fatal("expected file redis.stdout.1")
}
if finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0")); err == nil {
if finfo.Size() != 6 {
t.Fatalf("expected number of bytes: %v, actual: %v", 1, finfo.Size())
}
} else {
t.Fatal("expected file redis.stdout.0")
}
}
func TestLogRotator_StartFromEmptyDir(t *testing.T) {
var path string
var err error
defer os.RemoveAll(path)
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
l, err := NewLogRotator(path, "redis.stdout", 10, 10, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
r, w := io.Pipe()
go func() {
w.Write([]byte("abcdefg"))
w.Close()
}()
err = l.Start(r)
if err != nil && err != io.EOF {
t.Fatalf("Failure in logrotator start %v", err)
}
finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0"))
if err != nil {
t.Fatal(err)
}
if finfo.Size() != 7 {
t.Fatalf("expected size of file: %v, actual: %v", 7, finfo.Size())
}
}
func TestLogRotator_SetPathAsFile(t *testing.T) {
var f *os.File
var err error
var path string
defer os.RemoveAll(path)
if f, err = ioutil.TempFile("", pathPrefix); err != nil {
t.Fatalf("test setup problem: %v", err)
}
path = f.Name()
if _, err = NewLogRotator(f.Name(), "redis.stdout", 10, 10, logger); err == nil {
t.Fatal("expected error")
}
}
func TestLogRotator_ExcludeDirs(t *testing.T) {
var path string
var err error
defer os.RemoveAll(path)
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
if err := os.Mkdir(filepath.Join(path, "redis.stdout.0"), os.ModeDir|os.ModePerm); err != nil {
t.Fatalf("test setup err: %v", err)
}
l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
r, w := io.Pipe()
go func() {
w.Write([]byte("fg"))
w.Close()
}()
err = l.Start(r)
if err != nil && err != io.EOF {
t.Fatalf("Failure in logrotator start %v", err)
}
finfo, err := os.Stat(filepath.Join(path, "redis.stdout.1"))
if err != nil {
t.Fatal("expected rotator to create redis.stdout.1")
}
if finfo.Size() != 2 {
t.Fatalf("expected size: %v, actual: %v", 2, finfo.Size())
}
}
func TestLogRotator_PurgeDirs(t *testing.T) {
var path string
var err error
defer os.RemoveAll(path)
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
l, err := NewLogRotator(path, "redis.stdout", 2, 4, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
r, w := io.Pipe()
go func() {
w.Write([]byte("abcdefghijklmnopqrxyz"))
time.Sleep(1 * time.Second)
l.MaxFiles = 1
w.Write([]byte("abcdefghijklmnopqrxyz"))
w.Close()
}()
err = l.Start(r)
if err != nil && err != io.EOF {
t.Fatalf("failure in logrotator start: %v", err)
}
// sleeping for a second because purging is async
time.Sleep(1 * time.Second)
files, err := ioutil.ReadDir(path)
if err != nil {
t.Fatalf("err: %v", err)
}
expected := 1
if len(files) != expected {
t.Fatalf("expected number of files: %v, actual: %v", expected, len(files))
}
}
func TestLogRotator_UpdateConfig(t *testing.T) {
var path string
var err error
defer os.RemoveAll(path)
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
l, err := NewLogRotator(path, "redis.stdout", 10, 10, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}
r, w := io.Pipe()
go func() {
w.Write([]byte("abcdefg"))
l.FileSize = 5
w.Write([]byte("hijklmnojkp"))
w.Close()
}()
err = l.Start(r)
if err != nil && err != io.EOF {
t.Fatalf("Failure in logrotator start %v", err)
}
finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0"))
finfo1, err1 := os.Stat(filepath.Join(path, "redis.stdout.1"))
if err != nil {
t.Fatal(err)
}
if finfo.Size() != 10 {
t.Fatalf("expected size of file: %v, actual: %v", 7, finfo.Size())
}
if err1 != nil {
t.Fatal(err)
}
if finfo1.Size() != 5 {
t.Fatalf("expected size of file: %v, actual: %v", 5, finfo.Size())
}
}

View file

@ -1,6 +1,9 @@
package structs
import "fmt"
import (
"fmt"
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)
// WaitResult stores the result of a Wait operation.
type WaitResult struct {
@ -25,3 +28,9 @@ func (r *WaitResult) String() string {
return fmt.Sprintf("Wait returned exit code %v, signal %v, and error %v",
r.ExitCode, r.Signal, r.Err)
}
// IsolationConfig has information about the isolation mechanism the executor
// uses to put resource constraints and isolation on the user process
type IsolationConfig struct {
Cgroup *cgroupConfig.Cgroup
}

View file

@ -5,7 +5,7 @@ import (
"net/rpc"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/driver/logcollector"
"github.com/hashicorp/nomad/client/driver/logging"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -14,11 +14,11 @@ type SyslogCollectorRPC struct {
}
type LaunchCollectorArgs struct {
Ctx *logcollector.LogCollectorContext
Ctx *logging.LogCollectorContext
}
func (e *SyslogCollectorRPC) LaunchCollector(ctx *logcollector.LogCollectorContext) (*logcollector.SyslogCollectorState, error) {
var ss *logcollector.SyslogCollectorState
func (e *SyslogCollectorRPC) LaunchCollector(ctx *logging.LogCollectorContext) (*logging.SyslogCollectorState, error) {
var ss *logging.SyslogCollectorState
err := e.client.Call("Plugin.LaunchCollector", LaunchCollectorArgs{Ctx: ctx}, &ss)
return ss, err
}
@ -32,11 +32,11 @@ func (e *SyslogCollectorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error
}
type SyslogCollectorRPCServer struct {
Impl logcollector.LogCollector
Impl logging.LogCollector
}
func (s *SyslogCollectorRPCServer) LaunchCollector(args LaunchCollectorArgs,
resp *logcollector.SyslogCollectorState) error {
resp *logging.SyslogCollectorState) error {
ss, err := s.Impl.LaunchCollector(args.Ctx)
if ss != nil {
*resp = *ss
@ -59,7 +59,7 @@ type SyslogCollectorPlugin struct {
func (p *SyslogCollectorPlugin) Server(*plugin.MuxBroker) (interface{}, error) {
if p.Impl == nil {
p.Impl = &SyslogCollectorRPCServer{Impl: logcollector.NewSyslogCollector(p.logger)}
p.Impl = &SyslogCollectorRPCServer{Impl: logging.NewSyslogCollector(p.logger)}
}
return p.Impl, nil
}

View file

@ -9,7 +9,7 @@ import (
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
"github.com/hashicorp/nomad/client/driver/logcollector"
"github.com/hashicorp/nomad/client/driver/logging"
)
// createExecutor launches an executor plugin and returns an instance of the
@ -42,7 +42,7 @@ func createExecutor(config *plugin.ClientConfig, w io.Writer,
}
func createLogCollector(config *plugin.ClientConfig, w io.Writer,
clientConfig *config.Config) (logcollector.LogCollector, *plugin.Client, error) {
clientConfig *config.Config) (logging.LogCollector, *plugin.Client, error) {
config.HandshakeConfig = HandshakeConfig
config.Plugins = GetPluginMap(w)
config.MaxPort = clientConfig.ClientMaxPort
@ -61,7 +61,7 @@ func createLogCollector(config *plugin.ClientConfig, w io.Writer,
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the syslog plugin: %v", err)
}
logCollector := raw.(logcollector.LogCollector)
logCollector := raw.(logging.LogCollector)
return logCollector, syslogClient, nil
}