Parsing the severity of the log lines
This commit is contained in:
parent
776e57deb0
commit
19d0e74608
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
s1 "log/syslog"
|
||||
"net"
|
||||
"path/filepath"
|
||||
|
||||
|
@ -97,21 +98,35 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl
|
|||
if err := s.server.Boot(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r, w := io.Pipe()
|
||||
logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024)
|
||||
|
||||
ro, wo := io.Pipe()
|
||||
lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal),
|
||||
fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles,
|
||||
logFileSize, s.logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go lro.Start(r)
|
||||
go lro.Start(ro)
|
||||
|
||||
re, we := io.Pipe()
|
||||
lre, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal),
|
||||
fmt.Sprintf("%v.stderr", ctx.TaskName), ctx.LogConfig.MaxFiles,
|
||||
logFileSize, s.logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go lre.Start(re)
|
||||
|
||||
// map[string]interface{}
|
||||
go func(channel syslog.LogPartsChannel) {
|
||||
for logParts := range channel {
|
||||
w.Write(logParts["content"].([]byte))
|
||||
w.Write([]byte("\n"))
|
||||
s := logParts["severity"].(s1.Priority)
|
||||
if s == s1.LOG_ERR {
|
||||
we.Write(logParts["content"].([]byte))
|
||||
} else {
|
||||
wo.Write(logParts["content"].([]byte))
|
||||
}
|
||||
wo.Write([]byte("\n"))
|
||||
}
|
||||
}(channel)
|
||||
go s.server.Wait()
|
||||
|
|
|
@ -2,12 +2,35 @@ package syslog
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"log"
|
||||
"log/syslog"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/jeromer/syslogparser"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrPriorityNoStart = fmt.Errorf("No start char found for priority")
|
||||
ErrPriorityEmpty = fmt.Errorf("Priority field empty")
|
||||
ErrPriorityNoEnd = fmt.Errorf("No end char found for priority")
|
||||
ErrPriorityTooShort = fmt.Errorf("Priority field too short")
|
||||
ErrPriorityTooLong = fmt.Errorf("Priority field too long")
|
||||
ErrPriorityNonDigit = fmt.Errorf("Non digit found in priority")
|
||||
)
|
||||
|
||||
const (
|
||||
PRI_PART_START = '<'
|
||||
PRI_PART_END = '>'
|
||||
)
|
||||
|
||||
type Priority struct {
|
||||
P syslog.Priority
|
||||
F syslog.Priority
|
||||
S syslog.Priority
|
||||
}
|
||||
|
||||
type DockerLogParser struct {
|
||||
line []byte
|
||||
|
||||
|
@ -23,8 +46,61 @@ func (d *DockerLogParser) Parse() error {
|
|||
}
|
||||
|
||||
func (d *DockerLogParser) Dump() syslogparser.LogParts {
|
||||
severity, idx, _ := d.parsePriority(d.line)
|
||||
return map[string]interface{}{
|
||||
"content": d.line,
|
||||
"content": d.line[idx:],
|
||||
"severity": severity.S,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DockerLogParser) parsePriority(line []byte) (Priority, int, error) {
|
||||
cursor := 0
|
||||
pri := d.newPriority(0)
|
||||
if len(line) <= 0 {
|
||||
return pri, cursor, ErrPriorityEmpty
|
||||
}
|
||||
if line[cursor] != PRI_PART_START {
|
||||
return pri, cursor, ErrPriorityNoStart
|
||||
}
|
||||
i := 1
|
||||
priDigit := 0
|
||||
for i < len(line) {
|
||||
if i >= 5 {
|
||||
return pri, cursor, ErrPriorityTooLong
|
||||
}
|
||||
c := line[i]
|
||||
if c == PRI_PART_END {
|
||||
if i == 1 {
|
||||
return pri, cursor, ErrPriorityTooShort
|
||||
}
|
||||
cursor = i + 1
|
||||
return d.newPriority(priDigit), cursor, nil
|
||||
}
|
||||
if d.isDigit(c) {
|
||||
v, e := strconv.Atoi(string(c))
|
||||
if e != nil {
|
||||
return pri, cursor, e
|
||||
}
|
||||
priDigit = (priDigit * 10) + v
|
||||
} else {
|
||||
return pri, cursor, ErrPriorityNonDigit
|
||||
}
|
||||
i++
|
||||
}
|
||||
return pri, cursor, ErrPriorityNoEnd
|
||||
}
|
||||
|
||||
func (d *DockerLogParser) isDigit(c byte) bool {
|
||||
return c >= '0' && c <= '9'
|
||||
}
|
||||
|
||||
func (d *DockerLogParser) newPriority(p int) Priority {
|
||||
// The Priority value is calculated by first multiplying the Facility
|
||||
// number by 8 and then adding the numerical value of the Severity.
|
||||
return Priority{
|
||||
P: syslog.Priority(p),
|
||||
F: syslog.Priority(p / 8),
|
||||
S: syslog.Priority(p % 8),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
package syslog
|
||||
|
||||
import (
|
||||
"log/syslog"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestLogParser_Priority(t *testing.T) {
|
||||
line := []byte("<30>2016-02-10T10:16:43-08:00 d-thinkpad docker/e2a1e3ebd3a3[22950]: 1:C 10 Feb 18:16:43.391 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf")
|
||||
d := NewDockerLogParser(line)
|
||||
p, err := d.parsePriority(line)
|
||||
if err != nil {
|
||||
t.Fatalf("got an err: %v", err)
|
||||
}
|
||||
if p.S != syslog.LOG_INFO {
|
||||
t.Fatalf("expected serverity: %v, got: %v", syslog.LOG_INFO, p.S)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue