Implemented a custom syslog server

This commit is contained in:
Diptanu Choudhury 2016-02-17 14:48:25 -08:00
parent 9301581bb4
commit ef7cfb1f0b
3 changed files with 104 additions and 67 deletions

View File

@ -3,14 +3,10 @@
package logcollector
import (
"bufio"
"fmt"
"log"
"log/syslog"
"strconv"
"time"
"github.com/jeromer/syslogparser"
)
// Errors related to parsing priority
@ -29,6 +25,12 @@ const (
PRI_PART_END = '>'
)
// SyslogMessage represents a log line received
type SyslogMessage struct {
Message []byte
Severity syslog.Priority
}
// Priority holds all the priority bits in a syslog log line
type Priority struct {
Pri int
@ -38,32 +40,21 @@ type Priority struct {
// DockerLogParser parses a line of log message that the docker daemon ships
type DockerLogParser struct {
line []byte
content []byte
severity Priority
log *log.Logger
logger *log.Logger
}
// NewDockerLogParser creates a new DockerLogParser
func NewDockerLogParser(line []byte) *DockerLogParser {
return &DockerLogParser{line: line}
func NewDockerLogParser(logger *log.Logger) *DockerLogParser {
return &DockerLogParser{logger: logger}
}
// Parse parses a syslog log line
func (d *DockerLogParser) Parse() error {
severity, _, _ := d.parsePriority(d.line)
msgIdx := d.logContentIndex(d.line)
d.severity = severity
d.content = d.line[msgIdx:]
return nil
}
// Dump creates a map of the parsed log line and severity
func (d *DockerLogParser) Dump() syslogparser.LogParts {
return map[string]interface{}{
"content": d.content,
"severity": d.severity,
func (d *DockerLogParser) Parse(line []byte) *SyslogMessage {
pri, _, _ := d.parsePriority(line)
msgIdx := d.logContentIndex(line)
return &SyslogMessage{
Severity: pri.Severity,
Message: line[msgIdx:],
}
}
@ -143,19 +134,3 @@ func (d *DockerLogParser) newPriority(p int) Priority {
Severity: syslog.Priority(p % 8),
}
}
func (d *DockerLogParser) Location(location *time.Location) {
}
// CustomParser is a parser to parse docker syslog lines
type CustomParser struct {
logger *log.Logger
}
func (c *CustomParser) GetParser(line []byte) syslogparser.LogParser {
return NewDockerLogParser(line)
}
func (c *CustomParser) GetSplitFunc() bufio.SplitFunc {
return nil
}

View File

@ -0,0 +1,70 @@
package logcollector
import (
"bufio"
"log"
"net"
)
type SyslogServer struct {
listener net.Listener
messages chan *SyslogMessage
parser *DockerLogParser
doneCh chan interface{}
logger *log.Logger
}
func NewSyslogServer(l net.Listener, messages chan *SyslogMessage, logger *log.Logger) *SyslogServer {
parser := NewDockerLogParser(logger)
return &SyslogServer{
listener: l,
messages: messages,
parser: parser,
logger: logger,
doneCh: make(chan interface{}),
}
}
func (s *SyslogServer) Start() {
for {
select {
case <-s.doneCh:
return
default:
connection, err := s.listener.Accept()
s.logger.Printf("DIPTANU ACCEPTED CON")
if err != nil {
s.logger.Printf("[ERROR] logcollector.server: error in accepting connection: %v", err)
continue
}
go s.Read(connection)
}
}
}
func (s *SyslogServer) Read(connection net.Conn) {
defer connection.Close()
scanner := bufio.NewScanner(bufio.NewReader(connection))
LOOP:
for {
select {
case <-s.doneCh:
break LOOP
default:
}
if scanner.Scan() {
b := scanner.Bytes()
s.logger.Printf("DIPTANU READ BYTES %v", b)
msg := s.parser.Parse(b)
s.messages <- msg
} else {
break LOOP
}
}
}
func (s *SyslogServer) Shutdown() {
close(s.doneCh)
}

View File

@ -6,7 +6,7 @@ import (
"fmt"
"io"
"log"
s1 "log/syslog"
"log/syslog"
"net"
"path/filepath"
@ -14,7 +14,6 @@ import (
"github.com/hashicorp/nomad/client/driver/executor"
"github.com/hashicorp/nomad/client/driver/logrotator"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mcuadros/go-syslog"
)
// LogCollectorContext holds context to configure the syslog server
@ -62,7 +61,7 @@ type SyslogCollector struct {
lro *logrotator.LogRotator
lre *logrotator.LogRotator
server *syslog.Server
server *SyslogServer
taskDir string
logger *log.Logger
@ -76,27 +75,21 @@ func NewSyslogCollector(logger *log.Logger) *SyslogCollector {
// LaunchCollector launches a new syslog server and starts writing log lines to
// files and rotates them
func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) {
addr, err := s.getFreePort(ctx.PortLowerBound, ctx.PortUpperBound)
l, err := s.getListener(ctx.PortLowerBound, ctx.PortUpperBound)
if err != nil {
return nil, err
}
s.logger.Printf("[DEBUG] sylog-server: launching syslog server on addr: %v", addr)
s.logger.Printf("[DEBUG] sylog-server: launching syslog server on addr: %v", l.Addr().String())
s.ctx = ctx
// configuring the task dir
if err := s.configureTaskDir(); err != nil {
return nil, err
}
channel := make(syslog.LogPartsChannel)
handler := syslog.NewChannelHandler(channel)
s.server = syslog.NewServer()
s.server.SetFormat(&CustomParser{logger: s.logger})
s.server.SetHandler(handler)
s.server.ListenTCP(addr.String())
if err := s.server.Boot(); err != nil {
return nil, err
}
channel := make(chan *SyslogMessage)
syslogServer := NewSyslogServer(l, channel, s.logger)
s.server = syslogServer
go syslogServer.Start()
logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024)
ro, wo := io.Pipe()
@ -119,26 +112,26 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl
s.lre = lre
go lre.Start(re)
go func(channel syslog.LogPartsChannel) {
go func(channel chan *SyslogMessage) {
for logParts := range channel {
// If the severity of the log line is err then we write to stderr
// otherwise all messages go to stdout
s := logParts["severity"].(Priority)
if s.Severity == s1.LOG_ERR {
we.Write(logParts["content"].([]byte))
if logParts.Severity == syslog.LOG_ERR {
we.Write(logParts.Message)
we.Write([]byte("\n"))
} else {
wo.Write(logParts["content"].([]byte))
wo.Write(logParts.Message)
wo.Write([]byte("\n"))
}
wo.Write([]byte("\n"))
}
}(channel)
go s.server.Wait()
return &SyslogCollectorState{Addr: addr.String()}, nil
return &SyslogCollectorState{Addr: l.Addr().String()}, nil
}
// Exit kills the syslog server
func (s *SyslogCollector) Exit() error {
return s.server.Kill()
s.server.Shutdown()
return nil
}
// UpdateLogConfig updates the log configuration
@ -170,7 +163,7 @@ func (s *SyslogCollector) configureTaskDir() error {
// getFreePort returns a free port ready to be listened on between upper and
// lower bounds
func (s *SyslogCollector) getFreePort(lowerBound uint, upperBound uint) (net.Addr, error) {
func (s *SyslogCollector) getListener(lowerBound uint, upperBound uint) (net.Listener, error) {
for i := lowerBound; i <= upperBound; i++ {
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i))
if err != nil {
@ -180,8 +173,7 @@ func (s *SyslogCollector) getFreePort(lowerBound uint, upperBound uint) (net.Add
if err != nil {
continue
}
defer l.Close()
return l.Addr(), nil
return l, nil
}
return nil, fmt.Errorf("No free port found")
}