logmon: remove syslog server deadcode

Remove unused syslog server related code that got replaced by the docker
logger in Nomad 0.9
This commit is contained in:
Mahmood Ali 2019-05-21 09:36:43 -04:00
parent 05a9c6aedb
commit 974bcbecc9
5 changed files with 0 additions and 573 deletions

View File

@ -1,159 +0,0 @@
// +build darwin dragonfly freebsd linux netbsd openbsd solaris windows
package logging
import (
"fmt"
"strconv"
syslog "github.com/RackSec/srslog"
hclog "github.com/hashicorp/go-hclog"
)
// Errors related to parsing priority
var (
ErrPriorityNoStart = fmt.Errorf("No start char found for priority")
ErrPriorityEmpty = fmt.Errorf("Priority field empty")
ErrPriorityNoEnd = fmt.Errorf("No end char found for priority")
ErrPriorityTooShort = fmt.Errorf("Priority field too short")
ErrPriorityTooLong = fmt.Errorf("Priority field too long")
ErrPriorityNonDigit = fmt.Errorf("Non digit found in priority")
)
// Priority header and ending characters
const (
PRI_PART_START = '<'
PRI_PART_END = '>'
)
// SyslogMessage represents a log line received
type SyslogMessage struct {
Message []byte
Severity syslog.Priority
}
// Priority holds all the priority bits in a syslog log line
type Priority struct {
Pri int
Facility syslog.Priority
Severity syslog.Priority
}
// DockerLogParser parses a line of log message that the docker daemon ships
type DockerLogParser struct {
logger hclog.Logger
}
// NewDockerLogParser creates a new DockerLogParser
func NewDockerLogParser(logger hclog.Logger) *DockerLogParser {
return &DockerLogParser{logger: logger}
}
// Parse parses a syslog log line
func (d *DockerLogParser) Parse(line []byte) *SyslogMessage {
pri, _, _ := d.parsePriority(line)
msgIdx := d.logContentIndex(line)
// Create a copy of the line so that subsequent Scans do not override the
// message
lineCopy := make([]byte, len(line[msgIdx:]))
copy(lineCopy, line[msgIdx:])
return &SyslogMessage{
Severity: pri.Severity,
Message: lineCopy,
}
}
// logContentIndex finds out the index of the start index of the content in a
// syslog line
func (d *DockerLogParser) logContentIndex(line []byte) int {
cursor := 0
numSpace := 0
numColons := 0
// first look for at least 2 colons. This matches into the date that has no more spaces in it
// DefaultFormatter log line look: '<30>2016-07-06T15:13:11Z00:00 hostname docker/9648c64f5037[16200]'
// UnixFormatter log line look: '<30>Jul 6 15:13:11 docker/9648c64f5037[16200]'
for i := 0; i < len(line); i++ {
if line[i] == ':' {
numColons += 1
if numColons == 2 {
cursor = i
break
}
}
}
// then look for the next space
for i := cursor; i < len(line); i++ {
if line[i] == ' ' {
numSpace += 1
if numSpace == 1 {
cursor = i
break
}
}
}
// then the colon is what separates it, followed by a space
for i := cursor; i < len(line); i++ {
if line[i] == ':' && i+1 < len(line) && line[i+1] == ' ' {
cursor = i + 1
break
}
}
// return the cursor to the next character
return cursor + 1
}
// parsePriority parses the priority in a syslog message
func (d *DockerLogParser) parsePriority(line []byte) (Priority, int, error) {
cursor := 0
pri := d.newPriority(0)
if len(line) <= 0 {
return pri, cursor, ErrPriorityEmpty
}
if line[cursor] != PRI_PART_START {
return pri, cursor, ErrPriorityNoStart
}
i := 1
priDigit := 0
for i < len(line) {
if i >= 5 {
return pri, cursor, ErrPriorityTooLong
}
c := line[i]
if c == PRI_PART_END {
if i == 1 {
return pri, cursor, ErrPriorityTooShort
}
cursor = i + 1
return d.newPriority(priDigit), cursor, nil
}
if d.isDigit(c) {
v, e := strconv.Atoi(string(c))
if e != nil {
return pri, cursor, e
}
priDigit = (priDigit * 10) + v
} else {
return pri, cursor, ErrPriorityNonDigit
}
i++
}
return pri, cursor, ErrPriorityNoEnd
}
// isDigit checks if a byte is a numeric char
func (d *DockerLogParser) isDigit(c byte) bool {
return c >= '0' && c <= '9'
}
// newPriority creates a new default priority
func (d *DockerLogParser) newPriority(p int) Priority {
// The Priority value is calculated by first multiplying the Facility
// number by 8 and then adding the numerical value of the Severity.
return Priority{
Pri: p,
Facility: syslog.Priority(p / 8),
Severity: syslog.Priority(p % 8),
}
}

View File

@ -1,49 +0,0 @@
// +build darwin dragonfly freebsd linux netbsd openbsd solaris
package logging
import (
"bytes"
"testing"
syslog "github.com/RackSec/srslog"
"github.com/hashicorp/nomad/helper/testlog"
)
func TestLogParser_Priority(t *testing.T) {
t.Parallel()
line := []byte("<30>2016-02-10T10:16:43-08:00 d-thinkpad docker/e2a1e3ebd3a3[22950]: 1:C 10 Feb 18:16:43.391 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf")
d := NewDockerLogParser(testlog.HCLogger(t))
p, _, err := d.parsePriority(line)
if err != nil {
t.Fatalf("got an err: %v", err)
}
if p.Severity != syslog.LOG_INFO {
t.Fatalf("expected severity: %v, got: %v", syslog.LOG_INFO, p.Severity)
}
idx := d.logContentIndex(line)
expected := bytes.Index(line, []byte("1:C 10 Feb 18:16:43.391"))
if idx != expected {
t.Fatalf("expected idx: %v, got: %v", expected, idx)
}
}
func TestLogParser_Priority_UnixFormatter(t *testing.T) {
t.Parallel()
line := []byte("<30>Feb 6, 10:16:43 docker/e2a1e3ebd3a3[22950]: 1:C 10 Feb 18:16:43.391 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf")
d := NewDockerLogParser(testlog.HCLogger(t))
p, _, err := d.parsePriority(line)
if err != nil {
t.Fatalf("got an err: %v", err)
}
if p.Severity != syslog.LOG_INFO {
t.Fatalf("expected severity: %v, got: %v", syslog.LOG_INFO, p.Severity)
}
idx := d.logContentIndex(line)
expected := bytes.Index(line, []byte("1:C 10 Feb 18:16:43.391"))
if idx != expected {
t.Fatalf("expected idx: %v, got: %v", expected, idx)
}
}

View File

@ -1,93 +0,0 @@
package logging
import (
"bufio"
"net"
"sync"
hclog "github.com/hashicorp/go-hclog"
)
// SyslogServer is a server which listens to syslog messages and parses them
type SyslogServer struct {
listener net.Listener
messages chan *SyslogMessage
parser *DockerLogParser
doneCh chan interface{}
done bool
doneLock sync.Mutex
logger hclog.Logger
}
// NewSyslogServer creates a new syslog server
func NewSyslogServer(l net.Listener, messages chan *SyslogMessage, logger hclog.Logger) *SyslogServer {
logger = logger.Named("logcollector.server")
parser := NewDockerLogParser(logger)
return &SyslogServer{
listener: l,
messages: messages,
parser: parser,
logger: logger,
doneCh: make(chan interface{}),
}
}
// Start starts accepting syslog connections
func (s *SyslogServer) Start() {
for {
select {
case <-s.doneCh:
return
default:
connection, err := s.listener.Accept()
if err != nil {
s.doneLock.Lock()
done := s.done
s.doneLock.Unlock()
if done {
return
}
s.logger.Error("error in accepting connection", "err", err)
continue
}
go s.read(connection)
}
}
}
// read reads the bytes from a connection
func (s *SyslogServer) read(connection net.Conn) {
defer connection.Close()
scanner := bufio.NewScanner(bufio.NewReader(connection))
for {
select {
case <-s.doneCh:
return
default:
}
if scanner.Scan() {
b := scanner.Bytes()
msg := s.parser.Parse(b)
s.messages <- msg
} else {
return
}
}
}
// Shutdown the syslog server
func (s *SyslogServer) Shutdown() {
s.doneLock.Lock()
defer s.doneLock.Unlock()
if !s.done {
close(s.doneCh)
close(s.messages)
s.done = true
s.listener.Close()
}
}

View File

@ -1,66 +0,0 @@
package logging
import (
"io/ioutil"
"net"
"os"
"path"
"testing"
"time"
"github.com/hashicorp/nomad/helper/testlog"
)
func TestSyslogServer_Start_Shutdown(t *testing.T) {
t.Parallel()
dir, err := ioutil.TempDir("", "sock")
if err != nil {
t.Fatalf("Failed to create temporary directory: %v", err)
}
sock := path.Join(dir, "socket")
defer os.Remove(sock)
l, err := net.Listen("unix", sock)
if err != nil {
t.Fatalf("Failed to listen unix socket: %v", err)
}
s := NewSyslogServer(l, make(chan *SyslogMessage, 2048), testlog.HCLogger(t))
go s.Start()
if s.done {
t.Fatalf("expected running SyslogServer, but not running")
}
received := false
go func() {
for range s.messages {
received = true
}
}()
conn, err := net.Dial("unix", sock)
if err != nil {
t.Fatalf("expected access to SyslogServer, but %v", err)
}
_, err = conn.Write([]byte("syslog server test\n"))
if err != nil {
t.Fatalf("expected send data to SyslogServer but: %v", err)
}
// Need to wait until SyslogServer received the data certainly
time.Sleep(1000 * time.Millisecond)
if !received {
t.Fatalf("expected SyslogServer received data, but not received")
}
defer conn.Close()
s.Shutdown()
if !s.done {
t.Fatalf("expected SyslogServer done, but running")
}
}

View File

@ -1,206 +0,0 @@
// +build darwin dragonfly freebsd linux netbsd openbsd solaris windows
package logging
import (
"fmt"
"io"
"io/ioutil"
"net"
"os"
"runtime"
syslog "github.com/RackSec/srslog"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/nomad/structs"
)
// LogCollectorContext holds context to configure the syslog server
type LogCollectorContext struct {
// TaskName is the name of the Task
TaskName string
// AllocDir is the handle to do operations on the alloc dir of
// the task
AllocDir *allocdir.AllocDir
// LogConfig provides configuration related to log rotation
LogConfig *structs.LogConfig
// PortUpperBound is the upper bound of the ports that we can use to start
// the syslog server
PortUpperBound uint
// PortLowerBound is the lower bound of the ports that we can use to start
// the syslog server
PortLowerBound uint
}
// SyslogCollectorState holds the address and isolation information of a launched
// syslog server
type SyslogCollectorState struct {
Addr string
}
// LogCollector is an interface which allows a driver to launch a log server
// and update log configuration
type LogCollector interface {
LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error)
Exit() error
UpdateLogConfig(logConfig *structs.LogConfig) error
}
// SyslogCollector is a LogCollector which starts a syslog server and does
// rotation to incoming stream
type SyslogCollector struct {
ctx *LogCollectorContext
lro *FileRotator
lre *FileRotator
server *SyslogServer
syslogChan chan *SyslogMessage
taskDir string
logger hclog.Logger
}
// NewSyslogCollector returns an implementation of the SyslogCollector
func NewSyslogCollector(logger hclog.Logger) *SyslogCollector {
return &SyslogCollector{logger: logger.Named("syslog-server"),
syslogChan: make(chan *SyslogMessage, 2048)}
}
// LaunchCollector launches a new syslog server and starts writing log lines to
// files and rotates them
func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) {
l, err := s.getListener(ctx.PortLowerBound, ctx.PortUpperBound)
if err != nil {
return nil, err
}
s.logger.Debug("launching syslog server on addr", "addr", l.Addr().String())
s.ctx = ctx
// configuring the task dir
if err := s.configureTaskDir(); err != nil {
return nil, err
}
s.server = NewSyslogServer(l, s.syslogChan, s.logger)
go s.server.Start()
logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024)
//FIXME There's an easier way to get this
logdir := ctx.AllocDir.TaskDirs[ctx.TaskName].LogDir
lro, err := NewFileRotator(logdir, fmt.Sprintf("%v.stdout", ctx.TaskName),
ctx.LogConfig.MaxFiles, logFileSize, s.logger)
if err != nil {
return nil, err
}
s.lro = lro
lre, err := NewFileRotator(logdir, fmt.Sprintf("%v.stderr", ctx.TaskName),
ctx.LogConfig.MaxFiles, logFileSize, s.logger)
if err != nil {
return nil, err
}
s.lre = lre
go s.collectLogs(lre, lro)
syslogAddr := fmt.Sprintf("%s://%s", l.Addr().Network(), l.Addr().String())
return &SyslogCollectorState{Addr: syslogAddr}, nil
}
func (s *SyslogCollector) collectLogs(we io.Writer, wo io.Writer) {
for logParts := range s.syslogChan {
// If the severity of the log line is err then we write to stderr
// otherwise all messages go to stdout
if logParts.Severity == syslog.LOG_ERR {
s.lre.Write(logParts.Message)
s.lre.Write([]byte{'\n'})
} else {
s.lro.Write(logParts.Message)
s.lro.Write([]byte{'\n'})
}
}
}
// Exit kills the syslog server
func (s *SyslogCollector) Exit() error {
s.server.Shutdown()
s.lre.Close()
s.lro.Close()
return nil
}
// UpdateLogConfig updates the log configuration
func (s *SyslogCollector) UpdateLogConfig(logConfig *structs.LogConfig) error {
s.ctx.LogConfig = logConfig
if s.lro == nil {
return fmt.Errorf("log rotator for stdout doesn't exist")
}
s.lro.MaxFiles = logConfig.MaxFiles
s.lro.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
if s.lre == nil {
return fmt.Errorf("log rotator for stderr doesn't exist")
}
s.lre.MaxFiles = logConfig.MaxFiles
s.lre.FileSize = int64(logConfig.MaxFileSizeMB * 1024 * 1024)
return nil
}
// configureTaskDir sets the task dir in the SyslogCollector
func (s *SyslogCollector) configureTaskDir() error {
taskDir, ok := s.ctx.AllocDir.TaskDirs[s.ctx.TaskName]
if !ok {
return fmt.Errorf("couldn't find task directory for task %v", s.ctx.TaskName)
}
s.taskDir = taskDir.Dir
return nil
}
// getFreePort returns a free port ready to be listened on between upper and
// lower bounds
func (s *SyslogCollector) getListener(lowerBound uint, upperBound uint) (net.Listener, error) {
if runtime.GOOS == "windows" {
return s.listenerTCP(lowerBound, upperBound)
}
return s.listenerUnix()
}
// listenerTCP creates a TCP listener using an unused port between an upper and
// lower bound
func (s *SyslogCollector) listenerTCP(lowerBound uint, upperBound uint) (net.Listener, error) {
for i := lowerBound; i <= upperBound; i++ {
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i))
if err != nil {
return nil, err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
continue
}
return l, nil
}
return nil, fmt.Errorf("No free port found")
}
// listenerUnix creates a Unix domain socket
func (s *SyslogCollector) listenerUnix() (net.Listener, error) {
f, err := ioutil.TempFile("", "plugin")
if err != nil {
return nil, err
}
path := f.Name()
if err := f.Close(); err != nil {
return nil, err
}
if err := os.Remove(path); err != nil {
return nil, err
}
return net.Listen("unix", path)
}