176 lines
4.8 KiB
Go
176 lines
4.8 KiB
Go
// +build !windows
|
|
|
|
package logging
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"log/syslog"
|
|
"net"
|
|
|
|
"github.com/hashicorp/nomad/client/allocdir"
|
|
cstructs "github.com/hashicorp/nomad/client/driver/structs"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
// LogCollectorContext holds context to configure the syslog server
|
|
type LogCollectorContext struct {
|
|
// TaskName is the name of the Task
|
|
TaskName string
|
|
|
|
// AllocDir is the handle to do operations on the alloc dir of
|
|
// the task
|
|
AllocDir *allocdir.AllocDir
|
|
|
|
// LogConfig provides configuration related to log rotation
|
|
LogConfig *structs.LogConfig
|
|
|
|
// PortUpperBound is the upper bound of the ports that we can use to start
|
|
// the syslog server
|
|
PortUpperBound uint
|
|
|
|
// PortLowerBound is the lower bound of the ports that we can use to start
|
|
// the syslog server
|
|
PortLowerBound uint
|
|
}
|
|
|
|
// SyslogCollectorState holds the address and islation information of a launched
|
|
// syslog server
|
|
type SyslogCollectorState struct {
|
|
IsolationConfig *cstructs.IsolationConfig
|
|
Addr string
|
|
}
|
|
|
|
// LogCollector is an interface which allows a driver to launch a log server
|
|
// and update log configuration
|
|
type LogCollector interface {
|
|
LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error)
|
|
Exit() error
|
|
UpdateLogConfig(logConfig *structs.LogConfig) error
|
|
}
|
|
|
|
// SyslogCollector is a LogCollector which starts a syslog server and does
|
|
// rotation to incoming stream
|
|
type SyslogCollector struct {
|
|
addr net.Addr
|
|
logConfig *structs.LogConfig
|
|
ctx *LogCollectorContext
|
|
|
|
lro *FileRotator
|
|
lre *FileRotator
|
|
server *SyslogServer
|
|
syslogChan chan *SyslogMessage
|
|
taskDir string
|
|
|
|
logger *log.Logger
|
|
}
|
|
|
|
// NewSyslogCollector returns an implementation of the SyslogCollector
|
|
func NewSyslogCollector(logger *log.Logger) *SyslogCollector {
|
|
return &SyslogCollector{logger: logger, syslogChan: make(chan *SyslogMessage, 2048)}
|
|
}
|
|
|
|
// LaunchCollector launches a new syslog server and starts writing log lines to
|
|
// files and rotates them
|
|
func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) {
|
|
l, err := s.getListener(ctx.PortLowerBound, ctx.PortUpperBound)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.logger.Printf("[DEBUG] sylog-server: launching syslog server on addr: %v", l.Addr().String())
|
|
s.ctx = ctx
|
|
// configuring the task dir
|
|
if err := s.configureTaskDir(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s.server = NewSyslogServer(l, s.syslogChan, s.logger)
|
|
go s.server.Start()
|
|
logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024)
|
|
|
|
lro, err := NewFileRotator(ctx.AllocDir.LogDir(), fmt.Sprintf("%v.stdout", ctx.TaskName),
|
|
ctx.LogConfig.MaxFiles, logFileSize, s.logger)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.lro = lro
|
|
|
|
lre, err := NewFileRotator(ctx.AllocDir.LogDir(), fmt.Sprintf("%v.stderr", ctx.TaskName),
|
|
ctx.LogConfig.MaxFiles, logFileSize, s.logger)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.lre = lre
|
|
|
|
go s.collectLogs(lre, lro)
|
|
return &SyslogCollectorState{Addr: l.Addr().String()}, nil
|
|
}
|
|
|
|
func (s *SyslogCollector) collectLogs(we io.Writer, wo io.Writer) {
|
|
for logParts := range s.syslogChan {
|
|
// If the severity of the log line is err then we write to stderr
|
|
// otherwise all messages go to stdout
|
|
if logParts.Severity == syslog.LOG_ERR {
|
|
s.lre.Write(logParts.Message)
|
|
s.lre.Write([]byte{'\n'})
|
|
} else {
|
|
s.lro.Write(logParts.Message)
|
|
s.lro.Write([]byte{'\n'})
|
|
}
|
|
}
|
|
}
|
|
|
|
// Exit kills the syslog server
|
|
func (s *SyslogCollector) Exit() error {
|
|
s.server.Shutdown()
|
|
s.lre.Close()
|
|
s.lro.Close()
|
|
return nil
|
|
}
|
|
|
|
// UpdateLogConfig updates the log configuration
|
|
func (s *SyslogCollector) UpdateLogConfig(logConfig *structs.LogConfig) error {
|
|
s.ctx.LogConfig = logConfig
|
|
if s.lro == nil {
|
|
return fmt.Errorf("log rotator for stdout doesn't exist")
|
|
}
|
|
s.lro.MaxFiles = logConfig.MaxFiles
|
|
s.lro.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
|
|
|
|
if s.lre == nil {
|
|
return fmt.Errorf("log rotator for stderr doesn't exist")
|
|
}
|
|
s.lre.MaxFiles = logConfig.MaxFiles
|
|
s.lre.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
|
|
return nil
|
|
}
|
|
|
|
// configureTaskDir sets the task dir in the SyslogCollector
|
|
func (s *SyslogCollector) configureTaskDir() error {
|
|
taskDir, ok := s.ctx.AllocDir.TaskDirs[s.ctx.TaskName]
|
|
if !ok {
|
|
return fmt.Errorf("couldn't find task directory for task %v", s.ctx.TaskName)
|
|
}
|
|
s.taskDir = taskDir
|
|
return nil
|
|
}
|
|
|
|
// getFreePort returns a free port ready to be listened on between upper and
|
|
// lower bounds
|
|
func (s *SyslogCollector) getListener(lowerBound uint, upperBound uint) (net.Listener, error) {
|
|
for i := lowerBound; i <= upperBound; i++ {
|
|
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
l, err := net.ListenTCP("tcp", addr)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
return l, nil
|
|
}
|
|
return nil, fmt.Errorf("No free port found")
|
|
}
|