open-nomad/client/driver/logging/universal_collector.go

177 lines
4.8 KiB
Go
Raw Normal View History

2016-02-13 00:20:04 +00:00
// +build !windows
2016-02-19 21:08:25 +00:00
package logging
import (
"fmt"
"io"
"log"
2016-02-17 22:48:25 +00:00
"log/syslog"
"net"
2016-02-10 15:52:15 +00:00
"path/filepath"
"github.com/hashicorp/nomad/client/allocdir"
2016-02-19 22:01:07 +00:00
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
2016-02-10 16:13:08 +00:00
// LogCollectorContext holds context to configure the syslog server
type LogCollectorContext struct {
2016-02-10 16:13:08 +00:00
// TaskName is the name of the Task
TaskName string
// AllocDir is the handle to do operations on the alloc dir of
// the task
AllocDir *allocdir.AllocDir
// LogConfig provides configuration related to log rotation
LogConfig *structs.LogConfig
// PortUpperBound is the upper bound of the ports that we can use to start
// the syslog server
2016-02-10 15:52:15 +00:00
PortUpperBound uint
2016-02-10 16:13:08 +00:00
// PortLowerBound is the lower bound of the ports that we can use to start
// the syslog server
2016-02-10 15:52:15 +00:00
PortLowerBound uint
}
2016-02-10 16:13:08 +00:00
// SyslogCollectorState holds the address and islation information of a launched
// syslog server
type SyslogCollectorState struct {
2016-02-19 22:01:07 +00:00
IsolationConfig *cstructs.IsolationConfig
2016-02-10 15:52:15 +00:00
Addr string
}
2016-02-10 16:13:08 +00:00
// LogCollector is an interface which allows a driver to launch a log server
// and update log configuration
type LogCollector interface {
2016-02-10 15:52:15 +00:00
LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error)
Exit() error
UpdateLogConfig(logConfig *structs.LogConfig) error
}
2016-02-10 16:13:08 +00:00
// SyslogCollector is a LogCollector which starts a syslog server and does
// rotation to incoming stream
type SyslogCollector struct {
// addr is not assigned anywhere in the visible part of this file.
addr net.Addr
// logConfig is not assigned anywhere in the visible part of this file; the
// active configuration is kept on ctx.LogConfig instead.
logConfig *structs.LogConfig
// ctx is the context handed to LaunchCollector.
ctx *LogCollectorContext
// lro is the rotator receiving the task's "<task>.stdout" log lines.
lro *FileRotator
// lre is the rotator receiving the task's "<task>.stderr" log lines.
lre *FileRotator
// server is the syslog server created by LaunchCollector and shut down by
// Exit.
server *SyslogServer
// syslogChan carries messages from the syslog server to collectLogs. It is
// created by NewSyslogCollector and closed by Exit.
syslogChan chan *SyslogMessage
// taskDir is the task's directory, resolved by configureTaskDir.
taskDir string
// logger is the logger supplied to NewSyslogCollector.
logger *log.Logger
}
2016-02-10 16:13:08 +00:00
// NewSyslogCollector builds a SyslogCollector that reports through logger and
// owns a freshly allocated syslog message channel buffered to 2048 entries.
func NewSyslogCollector(logger *log.Logger) *SyslogCollector {
	collector := new(SyslogCollector)
	collector.logger = logger
	collector.syslogChan = make(chan *SyslogMessage, 2048)
	return collector
}
2016-02-10 16:13:08 +00:00
// LaunchCollector launches a new syslog server and starts writing log lines to
// files and rotates them
2016-02-10 15:52:15 +00:00
func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) {
2016-02-17 22:48:25 +00:00
l, err := s.getListener(ctx.PortLowerBound, ctx.PortUpperBound)
2016-02-10 15:52:15 +00:00
if err != nil {
return nil, err
}
2016-02-17 22:48:25 +00:00
s.logger.Printf("[DEBUG] sylog-server: launching syslog server on addr: %v", l.Addr().String())
s.ctx = ctx
// configuring the task dir
if err := s.configureTaskDir(); err != nil {
return nil, err
}
s.server = NewSyslogServer(l, s.syslogChan, s.logger)
go s.server.Start()
2016-02-10 15:52:15 +00:00
logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024)
2016-02-10 20:09:07 +00:00
2016-02-19 22:01:07 +00:00
path := filepath.Join(s.taskDir, allocdir.TaskLocal)
lro, err := NewFileRotator(path, fmt.Sprintf("%v.stdout", ctx.TaskName),
ctx.LogConfig.MaxFiles, logFileSize, s.logger)
2016-02-10 15:52:15 +00:00
if err != nil {
return nil, err
}
2016-02-11 22:44:35 +00:00
s.lro = lro
2016-02-10 20:09:07 +00:00
2016-02-19 22:01:07 +00:00
lre, err := NewFileRotator(path, fmt.Sprintf("%v.stderr", ctx.TaskName),
ctx.LogConfig.MaxFiles, logFileSize, s.logger)
2016-02-10 20:09:07 +00:00
if err != nil {
return nil, err
}
2016-02-11 22:44:35 +00:00
s.lre = lre
go s.collectLogs(lre, lro)
2016-02-17 22:48:25 +00:00
return &SyslogCollectorState{Addr: l.Addr().String()}, nil
}
// collectLogs consumes messages from s.syslogChan until the channel is closed,
// writing each message followed by a newline to one of the two writers: we
// receives messages whose severity is syslog.LOG_ERR, wo receives everything
// else.
//
// Write errors are dropped: the function has no return value and is run as a
// goroutine, so there is no caller to report them to.
func (s *SyslogCollector) collectLogs(we io.Writer, wo io.Writer) {
	newline := []byte{'\n'}
	for logParts := range s.syslogChan {
		// If the severity of the log line is err then we write to stderr
		// otherwise all messages go to stdout
		w := wo
		if logParts.Severity == syslog.LOG_ERR {
			w = we
		}
		w.Write(logParts.Message)
		w.Write(newline)
	}
}
2016-02-10 16:13:08 +00:00
// Exit kills the syslog server
func (s *SyslogCollector) Exit() error {
2016-02-17 22:48:25 +00:00
s.server.Shutdown()
close(s.syslogChan)
2016-02-17 22:48:25 +00:00
return nil
}
2016-02-10 16:13:08 +00:00
// UpdateLogConfig updates the log configuration
func (s *SyslogCollector) UpdateLogConfig(logConfig *structs.LogConfig) error {
2016-02-10 16:13:08 +00:00
s.ctx.LogConfig = logConfig
if s.lro == nil {
return fmt.Errorf("log rotator for stdout doesn't exist")
}
s.lro.MaxFiles = logConfig.MaxFiles
s.lro.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
if s.lre == nil {
return fmt.Errorf("log rotator for stderr doesn't exist")
}
s.lre.MaxFiles = logConfig.MaxFiles
s.lre.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
return nil
}
// configureTaskDir looks up the directory registered for the context's task
// in the alloc dir and records it on the collector. It returns an error when
// the alloc dir has no entry for that task.
func (s *SyslogCollector) configureTaskDir() error {
	name := s.ctx.TaskName
	if dir, found := s.ctx.AllocDir.TaskDirs[name]; found {
		s.taskDir = dir
		return nil
	}
	return fmt.Errorf("couldn't find task directory for task %v", name)
}
2016-02-10 15:52:15 +00:00
// getFreePort returns a free port ready to be listened on between upper and
// lower bounds
2016-02-17 22:48:25 +00:00
func (s *SyslogCollector) getListener(lowerBound uint, upperBound uint) (net.Listener, error) {
2016-02-10 15:52:15 +00:00
for i := lowerBound; i <= upperBound; i++ {
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i))
if err != nil {
return nil, err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
continue
}
2016-02-17 22:48:25 +00:00
return l, nil
2016-02-10 15:52:15 +00:00
}
return nil, fmt.Errorf("No free port found")
}