diff --git a/client/driver/docker.go b/client/driver/docker.go index 100bc980e..44bbf2aae 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -19,7 +19,7 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" - "github.com/hashicorp/nomad/client/driver/logcollector" + "github.com/hashicorp/nomad/client/driver/logging" cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/helper/discover" @@ -82,7 +82,7 @@ type dockerPID struct { type DockerHandle struct { pluginClient *plugin.Client - logCollector logcollector.LogCollector + logCollector logging.LogCollector client *docker.Client logger *log.Logger cleanupContainer bool @@ -507,7 +507,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle if err != nil { return nil, err } - logCollectorCtx := &logcollector.LogCollectorContext{ + logCollectorCtx := &logging.LogCollectorContext{ TaskName: task.Name, AllocDir: ctx.AllocDir, LogConfig: task.LogConfig, diff --git a/client/driver/exec.go b/client/driver/exec.go index 54b0b33b9..8acbf4399 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -38,7 +38,7 @@ type ExecDriverConfig struct { type execHandle struct { pluginClient *plugin.Client executor executor.Executor - isolationConfig *executor.IsolationConfig + isolationConfig *cstructs.IsolationConfig userPid int allocDir *allocdir.AllocDir killTimeout time.Duration @@ -153,7 +153,7 @@ type execId struct { UserPid int TaskDir string AllocDir *allocdir.AllocDir - IsolationConfig *executor.IsolationConfig + IsolationConfig *cstructs.IsolationConfig PluginConfig *PluginReattachConfig } diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 7c383adfd..104edd757 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -2,7 +2,6 @@ package executor import ( "fmt" - "io" "log" "os" "os/exec" @@ -18,7 +17,8 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/driver/env" - "github.com/hashicorp/nomad/client/driver/logrotator" + "github.com/hashicorp/nomad/client/driver/logging" + cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/nomad/structs" ) @@ -61,18 +61,12 @@ type ExecCommand struct { Args []string } -// IsolationConfig has information about the isolation mechanism the executor -// uses to put resource constraints and isolation on the user process -type IsolationConfig struct { - Cgroup *cgroupConfig.Cgroup -} - // ProcessState holds information about the state of a user process. type ProcessState struct { Pid int ExitCode int Signal int - IsolationConfig *IsolationConfig + IsolationConfig *cstructs.IsolationConfig Time time.Time } @@ -97,8 +91,8 @@ type UniversalExecutor struct { groups *cgroupConfig.Cgroup exitState *ProcessState processExited chan interface{} - lre *logrotator.LogRotator - lro *logrotator.LogRotator + lre *logging.FileRotator + lro *logging.FileRotator logger *log.Logger lock sync.Mutex @@ -135,28 +129,23 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext } logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) + path := filepath.Join(e.taskDir, allocdir.TaskLocal) + lro, err := logging.NewFileRotator(path, fmt.Sprintf("%v.stdout", ctx.TaskName), + ctx.LogConfig.MaxFiles, logFileSize, e.logger) - stdor, stdow := io.Pipe() - lro, err := logrotator.NewLogRotator(filepath.Join(e.taskDir, allocdir.TaskLocal), - fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles, - logFileSize, e.logger) if err != nil { return nil, fmt.Errorf("error creating log rotator for stdout of task %v", err) } - e.cmd.Stdout = stdow + e.cmd.Stdout = lro e.lro = lro - go lro.Start(stdor) - stder, stdew := io.Pipe() - lre, err := logrotator.NewLogRotator(filepath.Join(e.taskDir, allocdir.TaskLocal), - fmt.Sprintf("%v.stderr", ctx.TaskName), ctx.LogConfig.MaxFiles, - logFileSize, e.logger) + lre, err := logging.NewFileRotator(path, fmt.Sprintf("%v.stderr", ctx.TaskName), + ctx.LogConfig.MaxFiles, logFileSize, e.logger) if err != nil { return nil, fmt.Errorf("error creating log rotator for stderr of task %v", err) } - e.cmd.Stderr = stdew + e.cmd.Stderr = lre e.lre = lre - go lre.Start(stder) // setting the env, path and args for the command e.ctx.TaskEnv.Build() @@ -175,7 +164,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext return nil, fmt.Errorf("error starting command: %v", err) } go e.wait() - ic := &IsolationConfig{Cgroup: e.groups} + ic := &cstructs.IsolationConfig{Cgroup: e.groups} return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, IsolationConfig: ic, Time: time.Now()}, nil } diff --git a/client/driver/java.go b/client/driver/java.go index 657cd8e2e..092effd2c 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -45,7 +45,7 @@ type javaHandle struct { pluginClient *plugin.Client userPid int executor executor.Executor - isolationConfig *executor.IsolationConfig + isolationConfig *cstructs.IsolationConfig taskDir string allocDir *allocdir.AllocDir @@ -208,7 +208,7 @@ func (d *JavaDriver) cgroupsMounted(node *structs.Node) bool { type javaId struct { KillTimeout time.Duration PluginConfig *PluginReattachConfig - IsolationConfig *executor.IsolationConfig + IsolationConfig *cstructs.IsolationConfig TaskDir string AllocDir *allocdir.AllocDir UserPid int diff --git a/client/driver/logcollector/collector_windows.go b/client/driver/logging/collector_windows.go similarity index 98% rename from client/driver/logcollector/collector_windows.go rename to client/driver/logging/collector_windows.go index e13c04b2b..272b68cf2 100644 --- a/client/driver/logcollector/collector_windows.go +++ b/client/driver/logging/collector_windows.go @@ -1,4 +1,4 @@ -package logcollector +package logging import ( "log" diff --git a/client/driver/logging/rotator.go b/client/driver/logging/rotator.go new file mode 100644 index 000000000..4d768a1e5 --- /dev/null +++ b/client/driver/logging/rotator.go @@ -0,0 +1,245 @@ +package logging + +import ( + "bufio" + "fmt" + "io/ioutil" + "log" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "time" +) + +var ( + flushDur = 100 * time.Millisecond +) + +// FileRotator writes bytes to a rotated set of files +type FileRotator struct { + MaxFiles int // MaxFiles is the maximum number of rotated files allowed in a path + FileSize int64 // FileSize is the size a rotated file is allowed to grow + + path string // path is the path on the file system where the rotated set of files are opened + baseFileName string // baseFileName is the base file name of the rotated files + logFileIdx int // logFileIdx is the current index of the rotated files + oldestLogFileIdx int // oldestLogFileIdx is the index of the oldest log file in a path + + currentFile *os.File // currentFile is the file that is currently getting written + currentWr int64 // currentWr is the number of bytes written to the current file + bufw *bufio.Writer + + flushTicker *time.Ticker + logger *log.Logger + purgeCh chan struct{} + doneCh chan struct{} +} + +// NewFileRotator returns a new file rotator +func NewFileRotator(path string, baseFile string, maxFiles int, + fileSize int64, logger *log.Logger) (*FileRotator, error) { + rotator := &FileRotator{ + MaxFiles: maxFiles, + FileSize: fileSize, + + path: path, + baseFileName: baseFile, + + flushTicker: time.NewTicker(flushDur), + logger: logger, + purgeCh: make(chan struct{}, 1), + doneCh: make(chan struct{}, 1), + } + if err := rotator.lastFile(); err != nil { + return nil, err + } + go rotator.purgeOldFiles() + go rotator.flushPeriodically() + return rotator, nil +} + +// Write writes a byte array to a file and rotates the file if it's size becomes +// equal to the maximum size the user has defined. +func (f *FileRotator) Write(p []byte) (n int, err error) { + n = 0 + var nw int + + for n < len(p) { + // Check if we still have space in the current file, otherwise close and + // open the next file + if f.currentWr >= f.FileSize { + f.bufw.Flush() + f.currentFile.Close() + if err := f.nextFile(); err != nil { + return 0, err + } + } + // Calculate the remaining size on this file + remainingSize := f.FileSize - f.currentWr + + // Check if the number of bytes that we have to write is less than the + // remaining size of the file + if remainingSize < int64(len(p[n:])) { + // Write the number of bytes that we can write on the current file + li := int64(n) + remainingSize + nw, err = f.bufw.Write(p[n:li]) + //nw, err = f.currentFile.Write(p[n:li]) + } else { + // Write all the bytes in the current file + nw, err = f.bufw.Write(p[n:]) + //nw, err = f.currentFile.Write(p[n:]) + } + + // Increment the number of bytes written so far in this method + // invocation + n += nw + + // Increment the total number of bytes in the file + f.currentWr += int64(n) + if err != nil { + return + } + } + return +} + +// nextFile opens the next file and purges older files if the number of rotated +// files is larger than the maximum files configured by the user +func (f *FileRotator) nextFile() error { + nextFileIdx := f.logFileIdx + for { + nextFileIdx += 1 + logFileName := filepath.Join(f.path, fmt.Sprintf("%s.%d", f.baseFileName, nextFileIdx)) + if fi, err := os.Stat(logFileName); err == nil { + if fi.IsDir() || fi.Size() >= f.FileSize { + continue + } + } + f.logFileIdx = nextFileIdx + if err := f.createFile(); err != nil { + return err + } + break + } + // Purge old files if we have more files than MaxFiles + if f.logFileIdx-f.oldestLogFileIdx >= f.MaxFiles { + select { + case f.purgeCh <- struct{}{}: + default: + } + } + return nil +} + +// lastFile finds out the rotated file with the largest index in a path. +func (f *FileRotator) lastFile() error { + finfos, err := ioutil.ReadDir(f.path) + if err != nil { + return err + } + + prefix := fmt.Sprintf("%s.", f.baseFileName) + for _, fi := range finfos { + if fi.IsDir() { + continue + } + if strings.HasPrefix(fi.Name(), prefix) { + fileIdx := strings.TrimPrefix(fi.Name(), prefix) + n, err := strconv.Atoi(fileIdx) + if err != nil { + continue + } + if n > f.logFileIdx { + f.logFileIdx = n + } + } + } + if err := f.createFile(); err != nil { + return err + } + return nil +} + +// createFile opens a new or existing file for writing +func (f *FileRotator) createFile() error { + logFileName := filepath.Join(f.path, fmt.Sprintf("%s.%d", f.baseFileName, f.logFileIdx)) + cFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + return err + } + f.currentFile = cFile + fi, err := f.currentFile.Stat() + if err != nil { + return err + } + f.currentWr = fi.Size() + if f.bufw == nil { + f.bufw = bufio.NewWriter(f.currentFile) + } else { + f.bufw.Reset(f.currentFile) + } + return nil +} + +// flushPeriodically flushes the buffered writer every 100ms to the underlying +// file +func (f *FileRotator) flushPeriodically() { + for _ = range f.flushTicker.C { + if f.bufw != nil { + f.bufw.Flush() + } + } +} + +func (f *FileRotator) Close() { + // Stop the ticker and flush for one last time + f.flushTicker.Stop() + if f.bufw != nil { + f.bufw.Flush() + } + + // Stop the purge go routine + f.doneCh <- struct{}{} + close(f.purgeCh) +} + +// purgeOldFiles removes older files and keeps only the last N files rotated for +// a file +func (f *FileRotator) purgeOldFiles() { + for { + select { + case <-f.purgeCh: + var fIndexes []int + files, err := ioutil.ReadDir(f.path) + if err != nil { + return + } + // Inserting all the rotated files in a slice + for _, fi := range files { + if strings.HasPrefix(fi.Name(), f.baseFileName) { + fileIdx := strings.TrimPrefix(fi.Name(), fmt.Sprintf("%s.", f.baseFileName)) + n, err := strconv.Atoi(fileIdx) + if err != nil { + continue + } + fIndexes = append(fIndexes, n) + } + } + + // Sorting the file indexes so that we can purge the older files and keep + // only the number of files as configured by the user + sort.Sort(sort.IntSlice(fIndexes)) + var toDelete []int + toDelete = fIndexes[0 : len(fIndexes)-f.MaxFiles] + for _, fIndex := range toDelete { + fname := filepath.Join(f.path, fmt.Sprintf("%s.%d", f.baseFileName, fIndex)) + os.RemoveAll(fname) + } + f.oldestLogFileIdx = fIndexes[0] + case <-f.doneCh: + return + } + } +} diff --git a/client/driver/logging/rotator_test.go b/client/driver/logging/rotator_test.go new file mode 100644 index 000000000..48f6dbc28 --- /dev/null +++ b/client/driver/logging/rotator_test.go @@ -0,0 +1,239 @@ +package logging + +import ( + "io/ioutil" + "log" + "os" + "path/filepath" + "testing" + "time" +) + +var ( + logger = log.New(os.Stdout, "", log.LstdFlags) + pathPrefix = "logrotator" + baseFileName = "redis.stdout" +) + +func TestFileRotator_IncorrectPath(t *testing.T) { + if _, err := NewFileRotator("/foo", baseFileName, 10, 10, logger); err == nil { + t.Fatalf("expected error") + } +} + +func TestFileRotator_CreateNewFile(t *testing.T) { + var path string + var err error + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + defer os.RemoveAll(path) + + _, err = NewFileRotator(path, baseFileName, 10, 10, logger) + if err != nil { + t.Fatalf("test setup err: %v", err) + } + + if _, err := os.Stat(filepath.Join(path, "redis.stdout.0")); err != nil { + t.Fatalf("expected file") + } +} + +func TestFileRotator_OpenLastFile(t *testing.T) { + var path string + var err error + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + defer os.RemoveAll(path) + + fname1 := filepath.Join(path, "redis.stdout.0") + fname2 := filepath.Join(path, "redis.stdout.2") + if _, err := os.Create(fname1); err != nil { + t.Fatalf("test setup failure: %v", err) + } + if _, err := os.Create(fname2); err != nil { + t.Fatalf("test setup failure: %v", err) + } + + fr, err := NewFileRotator(path, baseFileName, 10, 10, logger) + if err != nil { + t.Fatalf("test setup err: %v", err) + } + + if fr.currentFile.Name() != fname2 { + t.Fatalf("expected current file: %v, got: %v", fname2, fr.currentFile.Name()) + } +} + +func TestFileRotator_WriteToCurrentFile(t *testing.T) { + var path string + var err error + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + defer os.RemoveAll(path) + + fname1 := filepath.Join(path, "redis.stdout.0") + if _, err := os.Create(fname1); err != nil { + t.Fatalf("test setup failure: %v", err) + } + + fr, err := NewFileRotator(path, baseFileName, 10, 5, logger) + if err != nil { + t.Fatalf("test setup err: %v", err) + } + + fr.Write([]byte("abcde")) + time.Sleep(200 * time.Millisecond) + fi, err := os.Stat(fname1) + if err != nil { + t.Fatalf("error getting the file info: %v", err) + } + if fi.Size() != 5 { + t.Fatalf("expected size: %v, actual: %v", 5, fi.Size()) + } +} + +func TestFileRotator_RotateFiles(t *testing.T) { + var path string + var err error + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + defer os.RemoveAll(path) + + fr, err := NewFileRotator(path, baseFileName, 10, 5, logger) + if err != nil { + t.Fatalf("test setup err: %v", err) + } + + str := "abcdefgh" + nw, err := fr.Write([]byte(str)) + time.Sleep(200 * time.Millisecond) + if err != nil { + t.Fatalf("got error while writing: %v", err) + } + if nw != len(str) { + t.Fatalf("expected %v, got %v", len(str), nw) + } + fname1 := filepath.Join(path, "redis.stdout.0") + fi, err := os.Stat(fname1) + if err != nil { + t.Fatalf("error getting the file info: %v", err) + } + if fi.Size() != 5 { + t.Fatalf("expected size: %v, actual: %v", 5, fi.Size()) + } + + fname2 := filepath.Join(path, "redis.stdout.1") + if _, err := os.Stat(fname2); err != nil { + t.Fatalf("expected file %v to exist", fname2) + } + + if fi2, err := os.Stat(fname2); err == nil { + if fi2.Size() != 3 { + t.Fatalf("expected size: %v, actual: %v", 3, fi2.Size()) + } + } else { + t.Fatalf("error getting the file info: %v", err) + } +} + +func TestFileRotator_WriteRemaining(t *testing.T) { + var path string + var err error + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + defer os.RemoveAll(path) + + fname1 := filepath.Join(path, "redis.stdout.0") + if f, err := os.Create(fname1); err == nil { + f.Write([]byte("abcd")) + } else { + t.Fatalf("test setup failure: %v", err) + } + + fr, err := NewFileRotator(path, baseFileName, 10, 5, logger) + if err != nil { + t.Fatalf("test setup err: %v", err) + } + + str := "efghijkl" + nw, err := fr.Write([]byte(str)) + if err != nil { + t.Fatalf("got error while writing: %v", err) + } + time.Sleep(200 * time.Millisecond) + if nw != len(str) { + t.Fatalf("expected %v, got %v", len(str), nw) + } + fi, err := os.Stat(fname1) + if err != nil { + t.Fatalf("error getting the file info: %v", err) + } + if fi.Size() != 5 { + t.Fatalf("expected size: %v, actual: %v", 5, fi.Size()) + } + + fname2 := filepath.Join(path, "redis.stdout.1") + if _, err := os.Stat(fname2); err != nil { + t.Fatalf("expected file %v to exist", fname2) + } + + if fi2, err := os.Stat(fname2); err == nil { + if fi2.Size() != 5 { + t.Fatalf("expected size: %v, actual: %v", 5, fi2.Size()) + } + } else { + t.Fatalf("error getting the file info: %v", err) + } + + fname3 := filepath.Join(path, "redis.stdout.2") + if _, err := os.Stat(fname3); err != nil { + t.Fatalf("expected file %v to exist", fname3) + } + + if fi3, err := os.Stat(fname3); err == nil { + if fi3.Size() != 2 { + t.Fatalf("expected size: %v, actual: %v", 2, fi3.Size()) + } + } else { + t.Fatalf("error getting the file info: %v", err) + } + +} + +func TestFileRotator_PurgeOldFiles(t *testing.T) { + var path string + var err error + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + defer os.RemoveAll(path) + + fr, err := NewFileRotator(path, baseFileName, 2, 2, logger) + if err != nil { + t.Fatalf("test setup err: %v", err) + } + + str := "abcdeghijklmn" + nw, err := fr.Write([]byte(str)) + if err != nil { + t.Fatalf("got error while writing: %v", err) + } + if nw != len(str) { + t.Fatalf("expected %v, got %v", len(str), nw) + } + + time.Sleep(1 * time.Second) + f, err := ioutil.ReadDir(path) + if err != nil { + t.Fatalf("test error: %v", err) + } + + if len(f) != 2 { + t.Fatalf("expected number of files: %v, got: %v", 2, len(f)) + } +} diff --git a/client/driver/logcollector/parser.go b/client/driver/logging/syslog_parser.go similarity index 74% rename from client/driver/logcollector/parser.go rename to client/driver/logging/syslog_parser.go index 79ec96ae2..72d73c43d 100644 --- a/client/driver/logcollector/parser.go +++ b/client/driver/logging/syslog_parser.go @@ -1,16 +1,12 @@ // +build !windows -package logcollector +package logging import ( - "bufio" "fmt" "log" "log/syslog" "strconv" - "time" - - "github.com/jeromer/syslogparser" ) // Errors related to parsing priority @@ -29,6 +25,12 @@ const ( PRI_PART_END = '>' ) +// SyslogMessage represents a log line received +type SyslogMessage struct { + Message []byte + Severity syslog.Priority +} + // Priority holds all the priority bits in a syslog log line type Priority struct { Pri int @@ -38,32 +40,21 @@ type Priority struct { // DockerLogParser parses a line of log message that the docker daemon ships type DockerLogParser struct { - line []byte - content []byte - severity Priority - - log *log.Logger + logger *log.Logger } // NewDockerLogParser creates a new DockerLogParser -func NewDockerLogParser(line []byte) *DockerLogParser { - return &DockerLogParser{line: line} +func NewDockerLogParser(logger *log.Logger) *DockerLogParser { + return &DockerLogParser{logger: logger} } // Parse parses a syslog log line -func (d *DockerLogParser) Parse() error { - severity, _, _ := d.parsePriority(d.line) - msgIdx := d.logContentIndex(d.line) - d.severity = severity - d.content = d.line[msgIdx:] - return nil -} - -// Dump creates a map of the parsed log line and severity -func (d *DockerLogParser) Dump() syslogparser.LogParts { - return map[string]interface{}{ - "content": d.content, - "severity": d.severity, +func (d *DockerLogParser) Parse(line []byte) *SyslogMessage { + pri, _, _ := d.parsePriority(line) + msgIdx := d.logContentIndex(line) + return &SyslogMessage{ + Severity: pri.Severity, + Message: line[msgIdx:], } } @@ -143,19 +134,3 @@ func (d *DockerLogParser) newPriority(p int) Priority { Severity: syslog.Priority(p % 8), } } - -func (d *DockerLogParser) Location(location *time.Location) { -} - -// CustomParser is a parser to parse docker syslog lines -type CustomParser struct { - logger *log.Logger -} - -func (c *CustomParser) GetParser(line []byte) syslogparser.LogParser { - return NewDockerLogParser(line) -} - -func (c *CustomParser) GetSplitFunc() bufio.SplitFunc { - return nil -} diff --git a/client/driver/logcollector/parser_test.go b/client/driver/logging/syslog_parser_test.go similarity index 88% rename from client/driver/logcollector/parser_test.go rename to client/driver/logging/syslog_parser_test.go index dd43379ed..4f1416a1a 100644 --- a/client/driver/logcollector/parser_test.go +++ b/client/driver/logging/syslog_parser_test.go @@ -1,15 +1,17 @@ // +build !windows -package logcollector +package logging import ( + "log" "log/syslog" + "os" "testing" ) func TestLogParser_Priority(t *testing.T) { line := []byte("<30>2016-02-10T10:16:43-08:00 d-thinkpad docker/e2a1e3ebd3a3[22950]: 1:C 10 Feb 18:16:43.391 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf") - d := NewDockerLogParser(line) + d := NewDockerLogParser(log.New(os.Stdout, "", log.LstdFlags)) p, _, err := d.parsePriority(line) if err != nil { t.Fatalf("got an err: %v", err) diff --git a/client/driver/logging/syslog_server.go b/client/driver/logging/syslog_server.go new file mode 100644 index 000000000..87f8fe2e9 --- /dev/null +++ b/client/driver/logging/syslog_server.go @@ -0,0 +1,74 @@ +package logging + +import ( + "bufio" + "log" + "net" +) + +// SyslogServer is a server which listens to syslog messages and parses them +type SyslogServer struct { + listener net.Listener + messages chan *SyslogMessage + parser *DockerLogParser + + doneCh chan interface{} + logger *log.Logger +} + +// NewSyslogServer creates a new syslog server +func NewSyslogServer(l net.Listener, messages chan *SyslogMessage, logger *log.Logger) *SyslogServer { + parser := NewDockerLogParser(logger) + return &SyslogServer{ + listener: l, + messages: messages, + parser: parser, + logger: logger, + doneCh: make(chan interface{}), + } +} + +// Start starts accepting syslog connections +func (s *SyslogServer) Start() { + for { + select { + case <-s.doneCh: + s.listener.Close() + return + default: + connection, err := s.listener.Accept() + if err != nil { + s.logger.Printf("[ERROR] logcollector.server: error in accepting connection: %v", err) + continue + } + go s.read(connection) + } + } +} + +// read reads the bytes from a connection +func (s *SyslogServer) read(connection net.Conn) { + defer connection.Close() + scanner := bufio.NewScanner(bufio.NewReader(connection)) + + for { + select { + case <-s.doneCh: + return + default: + } + if scanner.Scan() { + b := scanner.Bytes() + msg := s.parser.Parse(b) + s.messages <- msg + } else { + return + } + } +} + +// Shutdown shutsdown the syslog server +func (s *SyslogServer) Shutdown() { + close(s.doneCh) + close(s.messages) +} diff --git a/client/driver/logcollector/universal_collector.go b/client/driver/logging/universal_collector.go similarity index 65% rename from client/driver/logcollector/universal_collector.go rename to client/driver/logging/universal_collector.go index 35a5478c0..70a335832 100644 --- a/client/driver/logcollector/universal_collector.go +++ b/client/driver/logging/universal_collector.go @@ -1,20 +1,18 @@ // +build !windows -package logcollector +package logging import ( "fmt" "io" "log" - s1 "log/syslog" + "log/syslog" "net" "path/filepath" "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/client/driver/executor" - "github.com/hashicorp/nomad/client/driver/logrotator" + cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/nomad/structs" - "github.com/mcuadros/go-syslog" ) // LogCollectorContext holds context to configure the syslog server @@ -41,7 +39,7 @@ type LogCollectorContext struct { // SyslogCollectorState holds the address and islation information of a launched // syslog server type SyslogCollectorState struct { - IsolationConfig *executor.IsolationConfig + IsolationConfig *cstructs.IsolationConfig Addr string } @@ -60,85 +58,78 @@ type SyslogCollector struct { logConfig *structs.LogConfig ctx *LogCollectorContext - lro *logrotator.LogRotator - lre *logrotator.LogRotator - server *syslog.Server - taskDir string + lro *FileRotator + lre *FileRotator + server *SyslogServer + syslogChan chan *SyslogMessage + taskDir string logger *log.Logger } // NewSyslogCollector returns an implementation of the SyslogCollector func NewSyslogCollector(logger *log.Logger) *SyslogCollector { - return &SyslogCollector{logger: logger} + return &SyslogCollector{logger: logger, syslogChan: make(chan *SyslogMessage, 2048)} } // LaunchCollector launches a new syslog server and starts writing log lines to // files and rotates them func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogCollectorState, error) { - addr, err := s.getFreePort(ctx.PortLowerBound, ctx.PortUpperBound) + l, err := s.getListener(ctx.PortLowerBound, ctx.PortUpperBound) if err != nil { return nil, err } - s.logger.Printf("[DEBUG] sylog-server: launching syslog server on addr: %v", addr) + s.logger.Printf("[DEBUG] sylog-server: launching syslog server on addr: %v", l.Addr().String()) s.ctx = ctx // configuring the task dir if err := s.configureTaskDir(); err != nil { return nil, err } - channel := make(syslog.LogPartsChannel) - handler := syslog.NewChannelHandler(channel) - - s.server = syslog.NewServer() - s.server.SetFormat(&CustomParser{logger: s.logger}) - s.server.SetHandler(handler) - s.server.ListenTCP(addr.String()) - if err := s.server.Boot(); err != nil { - return nil, err - } + s.server = NewSyslogServer(l, s.syslogChan, s.logger) + go s.server.Start() logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) - ro, wo := io.Pipe() - lro, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal), - fmt.Sprintf("%v.stdout", ctx.TaskName), ctx.LogConfig.MaxFiles, - logFileSize, s.logger) + path := filepath.Join(s.taskDir, allocdir.TaskLocal) + lro, err := NewFileRotator(path, fmt.Sprintf("%v.stdout", ctx.TaskName), + ctx.LogConfig.MaxFiles, logFileSize, s.logger) + if err != nil { return nil, err } s.lro = lro - go lro.Start(ro) - re, we := io.Pipe() - lre, err := logrotator.NewLogRotator(filepath.Join(s.taskDir, allocdir.TaskLocal), - fmt.Sprintf("%v.stderr", ctx.TaskName), ctx.LogConfig.MaxFiles, - logFileSize, s.logger) + lre, err := NewFileRotator(path, fmt.Sprintf("%v.stderr", ctx.TaskName), + ctx.LogConfig.MaxFiles, logFileSize, s.logger) if err != nil { return nil, err } s.lre = lre - go lre.Start(re) - go func(channel syslog.LogPartsChannel) { - for logParts := range channel { - // If the severity of the log line is err then we write to stderr - // otherwise all messages go to stdout - s := logParts["severity"].(Priority) - if s.Severity == s1.LOG_ERR { - we.Write(logParts["content"].([]byte)) - } else { - wo.Write(logParts["content"].([]byte)) - } - wo.Write([]byte("\n")) + go s.collectLogs(lre, lro) + return &SyslogCollectorState{Addr: l.Addr().String()}, nil +} + +func (s *SyslogCollector) collectLogs(we io.Writer, wo io.Writer) { + for logParts := range s.syslogChan { + // If the severity of the log line is err then we write to stderr + // otherwise all messages go to stdout + if logParts.Severity == syslog.LOG_ERR { + s.lre.Write(logParts.Message) + s.lre.Write([]byte{'\n'}) + } else { + s.lro.Write(logParts.Message) + s.lro.Write([]byte{'\n'}) } - }(channel) - go s.server.Wait() - return &SyslogCollectorState{Addr: addr.String()}, nil + } } // Exit kills the syslog server func (s *SyslogCollector) Exit() error { - return s.server.Kill() + s.server.Shutdown() + s.lre.Close() + s.lro.Close() + return nil } // UpdateLogConfig updates the log configuration @@ -170,7 +161,7 @@ func (s *SyslogCollector) configureTaskDir() error { // getFreePort returns a free port ready to be listened on between upper and // lower bounds -func (s *SyslogCollector) getFreePort(lowerBound uint, upperBound uint) (net.Addr, error) { +func (s *SyslogCollector) getListener(lowerBound uint, upperBound uint) (net.Listener, error) { for i := lowerBound; i <= upperBound; i++ { addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%v", i)) if err != nil { @@ -180,8 +171,7 @@ func (s *SyslogCollector) getFreePort(lowerBound uint, upperBound uint) (net.Add if err != nil { continue } - defer l.Close() - return l.Addr(), nil + return l, nil } return nil, fmt.Errorf("No free port found") } diff --git a/client/driver/logrotator/logs.go b/client/driver/logrotator/logs.go deleted file mode 100644 index 026c196be..000000000 --- a/client/driver/logrotator/logs.go +++ /dev/null @@ -1,176 +0,0 @@ -package logrotator - -import ( - "fmt" - "io" - "io/ioutil" - "log" - "os" - "path/filepath" - "sort" - "strconv" - "strings" -) - -const ( - bufSize = 32 * 1024 // Max number of bytes read from a buffer -) - -// LogRotator ingests data and writes out to a rotated set of files -type LogRotator struct { - MaxFiles int // maximum number of rotated files retained by the log rotator - FileSize int64 // maximum file size of a rotated file - path string // path where the rotated files are created - fileName string // base file name of the rotated files - - logFileIdx int // index to the current file - oldestLogFileIdx int // index to the oldest log file - - logger *log.Logger - purgeCh chan struct{} -} - -// NewLogRotator configures and returns a new LogRotator -func NewLogRotator(path string, fileName string, maxFiles int, fileSize int64, logger *log.Logger) (*LogRotator, error) { - files, err := ioutil.ReadDir(path) - if err != nil { - return nil, err - } - - // Finding out the log file with the largest index - logFileIdx := 0 - prefix := fmt.Sprintf("%s.", fileName) - for _, f := range files { - if strings.HasPrefix(f.Name(), prefix) { - fileIdx := strings.TrimPrefix(f.Name(), prefix) - n, err := strconv.Atoi(fileIdx) - if err != nil { - continue - } - if n > logFileIdx { - logFileIdx = n - } - } - } - - lr := &LogRotator{ - MaxFiles: maxFiles, - FileSize: fileSize, - path: path, - fileName: fileName, - logFileIdx: logFileIdx, - logger: logger, - purgeCh: make(chan struct{}, 1), - } - go lr.PurgeOldFiles() - - return lr, nil -} - -// Start reads from a Reader and writes them to files and rotates them when the -// size of the file becomes equal to the max size configured -func (l *LogRotator) Start(r io.Reader) error { - buf := make([]byte, bufSize) - for { - logFileName := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, l.logFileIdx)) - var fileSize int64 - if f, err := os.Stat(logFileName); err == nil { - // Skipping the current file if it happens to be a directory - if f.IsDir() { - l.logFileIdx += 1 - continue - } - fileSize = f.Size() - // Calculating the remaining capacity of the log file - } - f, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) - if err != nil { - return err - } - l.logger.Printf("[DEBUG] client.logrotator: opened a new file: %s", logFileName) - - // Closing the current log file if it doesn't have any more capacity - if fileSize >= l.FileSize { - l.logFileIdx += 1 - f.Close() - continue - } - - // Reading from the reader and writing into the current log file as long - // as it has capacity or the reader closes - totalWritten := 0 - for { - if l.FileSize-(fileSize+int64(totalWritten)) < 1 { - f.Close() - break - } - var nr int - var err error - remainingSize := l.FileSize - (int64(totalWritten) + fileSize) - if remainingSize < bufSize { - nr, err = r.Read(buf[0:remainingSize]) - } else { - nr, err = r.Read(buf) - } - if err != nil { - f.Close() - return err - } - nw, err := f.Write(buf[:nr]) - if err != nil { - f.Close() - return err - } - if nr != nw { - f.Close() - return fmt.Errorf("failed to write data read from the reader into file, R: %d W: %d", nr, nw) - } - totalWritten += nr - } - l.logFileIdx = l.logFileIdx + 1 - // Purge old files if we have more files than MaxFiles - if l.logFileIdx-l.oldestLogFileIdx >= l.MaxFiles { - select { - case l.purgeCh <- struct{}{}: - default: - } - } - } -} - -// PurgeOldFiles removes older files and keeps only the last N files rotated for -// a file -func (l *LogRotator) PurgeOldFiles() { - for { - select { - case <-l.purgeCh: - var fIndexes []int - files, err := ioutil.ReadDir(l.path) - if err != nil { - return - } - // Inserting all the rotated files in a slice - for _, f := range files { - if strings.HasPrefix(f.Name(), l.fileName) { - fileIdx := strings.TrimPrefix(f.Name(), fmt.Sprintf("%s.", l.fileName)) - n, err := strconv.Atoi(fileIdx) - if err != nil { - continue - } - fIndexes = append(fIndexes, n) - } - } - - // Sorting the file indexes so that we can purge the older files and keep - // only the number of files as configured by the user - sort.Sort(sort.IntSlice(fIndexes)) - var toDelete []int - toDelete = fIndexes[0 : len(fIndexes)-l.MaxFiles] - for _, fIndex := range toDelete { - fname := filepath.Join(l.path, fmt.Sprintf("%s.%d", l.fileName, fIndex)) - os.RemoveAll(fname) - } - l.oldestLogFileIdx = fIndexes[0] - } - } -} diff --git a/client/driver/logrotator/logs_test.go b/client/driver/logrotator/logs_test.go deleted file mode 100644 index b6a62216e..000000000 --- a/client/driver/logrotator/logs_test.go +++ /dev/null @@ -1,295 +0,0 @@ -package logrotator - -import ( - "io" - "io/ioutil" - "log" - "os" - "path/filepath" - "testing" - "time" -) - -var ( - logger = log.New(os.Stdout, "", log.LstdFlags) - pathPrefix = "logrotator" -) - -func TestLogRotator_InvalidPath(t *testing.T) { - invalidPath := "/foo" - - if _, err := NewLogRotator(invalidPath, "redis.stdout", 10, 10, logger); err == nil { - t.Fatal("expected err") - } -} - -func TestLogRotator_FindCorrectIndex(t *testing.T) { - var path string - var err error - if path, err = ioutil.TempDir("", pathPrefix); err != nil { - t.Fatalf("test setup err: %v", err) - } - defer os.RemoveAll(path) - - fname := filepath.Join(path, "redis.stdout.1") - if f, err := os.Create(fname); err == nil { - f.Close() - } - - fname = filepath.Join(path, "redis.stdout.2") - if f, err := os.Create(fname); err == nil { - f.Close() - } - - r, err := NewLogRotator(path, "redis.stdout", 10, 10, logger) - if err != nil { - t.Fatalf("test setup err: %v", err) - } - if r.logFileIdx != 2 { - t.Fatalf("Expected log file idx: %v, actual: %v", 2, r.logFileIdx) - } -} - -func TestLogRotator_AppendToCurrentFile(t *testing.T) { - var path string - var err error - defer os.RemoveAll(path) - if path, err = ioutil.TempDir("", pathPrefix); err != nil { - t.Fatalf("test setup err: %v", err) - } - fname := filepath.Join(path, "redis.stdout.0") - if f, err := os.Create(fname); err == nil { - f.WriteString("abcde") - f.Close() - } - - l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger) - if err != nil && err != io.EOF { - t.Fatalf("test setup err: %v", err) - } - - r, w := io.Pipe() - go func() { - w.Write([]byte("fg")) - w.Close() - }() - err = l.Start(r) - if err != nil && err != io.EOF { - t.Fatal(err) - } - finfo, err := os.Stat(fname) - if err != nil { - t.Fatal(err) - } - if finfo.Size() != 6 { - t.Fatalf("Expected size of file: %v, actual: %v", 6, finfo.Size()) - } -} - -func TestLogRotator_RotateFiles(t *testing.T) { - var path string - var err error - defer os.RemoveAll(path) - if path, err = ioutil.TempDir("", pathPrefix); err != nil { - t.Fatalf("test setup err: %v", err) - } - fname := filepath.Join(path, "redis.stdout.0") - if f, err := os.Create(fname); err == nil { - f.WriteString("abcde") - f.Close() - } - - l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger) - if err != nil { - t.Fatalf("test setup err: %v", err) - } - - r, w := io.Pipe() - go func() { - // This should make the current log file rotate - w.Write([]byte("fg")) - w.Close() - }() - err = l.Start(r) - if err != nil && err != io.EOF { - t.Fatalf("Failure in logrotator start %v", err) - } - - if finfo, err := os.Stat(filepath.Join(path, "redis.stdout.1")); err == nil { - if finfo.Size() != 1 { - t.Fatalf("expected number of bytes: %v, actual: %v", 1, finfo.Size()) - } - } else { - t.Fatal("expected file redis.stdout.1") - } - - if finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0")); err == nil { - if finfo.Size() != 6 { - t.Fatalf("expected number of bytes: %v, actual: %v", 1, finfo.Size()) - } - } else { - t.Fatal("expected file redis.stdout.0") - } -} - -func TestLogRotator_StartFromEmptyDir(t *testing.T) { - var path string - var err error - defer os.RemoveAll(path) - if path, err = ioutil.TempDir("", pathPrefix); err != nil { - t.Fatalf("test setup err: %v", err) - } - - l, err := NewLogRotator(path, "redis.stdout", 10, 10, logger) - if err != nil { - t.Fatalf("test setup err: %v", err) - } - - r, w := io.Pipe() - go func() { - w.Write([]byte("abcdefg")) - w.Close() - }() - err = l.Start(r) - if err != nil && err != io.EOF { - t.Fatalf("Failure in logrotator start %v", err) - } - - finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0")) - if err != nil { - t.Fatal(err) - } - if finfo.Size() != 7 { - t.Fatalf("expected size of file: %v, actual: %v", 7, finfo.Size()) - } - -} - -func TestLogRotator_SetPathAsFile(t *testing.T) { - var f *os.File - var err error - var path string - defer os.RemoveAll(path) - if f, err = ioutil.TempFile("", pathPrefix); err != nil { - t.Fatalf("test setup problem: %v", err) - } - path = f.Name() - if _, err = NewLogRotator(f.Name(), "redis.stdout", 10, 10, logger); err == nil { - t.Fatal("expected error") - } -} - -func TestLogRotator_ExcludeDirs(t *testing.T) { - var path string - var err error - defer os.RemoveAll(path) - if path, err = ioutil.TempDir("", pathPrefix); err != nil { - t.Fatalf("test setup err: %v", err) - } - if err := os.Mkdir(filepath.Join(path, "redis.stdout.0"), os.ModeDir|os.ModePerm); err != nil { - t.Fatalf("test setup err: %v", err) - } - - l, err := NewLogRotator(path, "redis.stdout", 10, 6, logger) - if err != nil { - t.Fatalf("test setup err: %v", err) - } - - r, w := io.Pipe() - go func() { - w.Write([]byte("fg")) - w.Close() - }() - err = l.Start(r) - if err != nil && err != io.EOF { - t.Fatalf("Failure in logrotator start %v", err) - } - - finfo, err := os.Stat(filepath.Join(path, "redis.stdout.1")) - if err != nil { - t.Fatal("expected rotator to create redis.stdout.1") - } - if finfo.Size() != 2 { - t.Fatalf("expected size: %v, actual: %v", 2, finfo.Size()) - } -} - -func TestLogRotator_PurgeDirs(t *testing.T) { - var path string - var err error - defer os.RemoveAll(path) - if path, err = ioutil.TempDir("", pathPrefix); err != nil { - t.Fatalf("test setup err: %v", err) - } - - l, err := NewLogRotator(path, "redis.stdout", 2, 4, logger) - if err != nil { - t.Fatalf("test setup err: %v", err) - } - - r, w := io.Pipe() - go func() { - w.Write([]byte("abcdefghijklmnopqrxyz")) - time.Sleep(1 * time.Second) - l.MaxFiles = 1 - w.Write([]byte("abcdefghijklmnopqrxyz")) - w.Close() - }() - - err = l.Start(r) - if err != nil && err != io.EOF { - t.Fatalf("failure in logrotator start: %v", err) - } - - // sleeping for a second because purging is async - time.Sleep(1 * time.Second) - files, err := ioutil.ReadDir(path) - if err != nil { - t.Fatalf("err: %v", err) - } - expected := 1 - if len(files) != expected { - t.Fatalf("expected number of files: %v, actual: %v", expected, len(files)) - } -} - -func TestLogRotator_UpdateConfig(t *testing.T) { - var path string - var err error - defer os.RemoveAll(path) - if path, err = ioutil.TempDir("", pathPrefix); err != nil { - t.Fatalf("test setup err: %v", err) - } - - l, err := NewLogRotator(path, "redis.stdout", 10, 10, logger) - if err != nil { - t.Fatalf("test setup err: %v", err) - } - - r, w := io.Pipe() - go func() { - w.Write([]byte("abcdefg")) - l.FileSize = 5 - w.Write([]byte("hijklmnojkp")) - w.Close() - }() - err = l.Start(r) - if err != nil && err != io.EOF { - t.Fatalf("Failure in logrotator start %v", err) - } - - finfo, err := os.Stat(filepath.Join(path, "redis.stdout.0")) - finfo1, err1 := os.Stat(filepath.Join(path, "redis.stdout.1")) - if err != nil { - t.Fatal(err) - } - if finfo.Size() != 10 { - t.Fatalf("expected size of file: %v, actual: %v", 7, finfo.Size()) - } - if err1 != nil { - t.Fatal(err) - } - if finfo1.Size() != 5 { - t.Fatalf("expected size of file: %v, actual: %v", 5, finfo.Size()) - } -} diff --git a/client/driver/structs/structs.go b/client/driver/structs/structs.go index a60daf8df..912253457 100644 --- a/client/driver/structs/structs.go +++ b/client/driver/structs/structs.go @@ -1,6 +1,9 @@ package structs -import "fmt" +import ( + "fmt" + cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" +) // WaitResult stores the result of a Wait operation. type WaitResult struct { @@ -25,3 +28,9 @@ func (r *WaitResult) String() string { return fmt.Sprintf("Wait returned exit code %v, signal %v, and error %v", r.ExitCode, r.Signal, r.Err) } + +// IsolationConfig has information about the isolation mechanism the executor +// uses to put resource constraints and isolation on the user process +type IsolationConfig struct { + Cgroup *cgroupConfig.Cgroup +} diff --git a/client/driver/syslog_plugin.go b/client/driver/syslog_plugin.go index b4cee69fb..55237cd2d 100644 --- a/client/driver/syslog_plugin.go +++ b/client/driver/syslog_plugin.go @@ -5,7 +5,7 @@ import ( "net/rpc" "github.com/hashicorp/go-plugin" - "github.com/hashicorp/nomad/client/driver/logcollector" + "github.com/hashicorp/nomad/client/driver/logging" "github.com/hashicorp/nomad/nomad/structs" ) @@ -14,11 +14,11 @@ type SyslogCollectorRPC struct { } type LaunchCollectorArgs struct { - Ctx *logcollector.LogCollectorContext + Ctx *logging.LogCollectorContext } -func (e *SyslogCollectorRPC) LaunchCollector(ctx *logcollector.LogCollectorContext) (*logcollector.SyslogCollectorState, error) { - var ss *logcollector.SyslogCollectorState +func (e *SyslogCollectorRPC) LaunchCollector(ctx *logging.LogCollectorContext) (*logging.SyslogCollectorState, error) { + var ss *logging.SyslogCollectorState err := e.client.Call("Plugin.LaunchCollector", LaunchCollectorArgs{Ctx: ctx}, &ss) return ss, err } @@ -32,11 +32,11 @@ func (e *SyslogCollectorRPC) UpdateLogConfig(logConfig *structs.LogConfig) error } type SyslogCollectorRPCServer struct { - Impl logcollector.LogCollector + Impl logging.LogCollector } func (s *SyslogCollectorRPCServer) LaunchCollector(args LaunchCollectorArgs, - resp *logcollector.SyslogCollectorState) error { + resp *logging.SyslogCollectorState) error { ss, err := s.Impl.LaunchCollector(args.Ctx) if ss != nil { *resp = *ss @@ -59,7 +59,7 @@ type SyslogCollectorPlugin struct { func (p *SyslogCollectorPlugin) Server(*plugin.MuxBroker) (interface{}, error) { if p.Impl == nil { - p.Impl = &SyslogCollectorRPCServer{Impl: logcollector.NewSyslogCollector(p.logger)} + p.Impl = &SyslogCollectorRPCServer{Impl: logging.NewSyslogCollector(p.logger)} } return p.Impl, nil } diff --git a/client/driver/utils.go b/client/driver/utils.go index bfb923e1b..abaec9469 100644 --- a/client/driver/utils.go +++ b/client/driver/utils.go @@ -9,7 +9,7 @@ import ( "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" - "github.com/hashicorp/nomad/client/driver/logcollector" + "github.com/hashicorp/nomad/client/driver/logging" ) // createExecutor launches an executor plugin and returns an instance of the @@ -42,7 +42,7 @@ func createExecutor(config *plugin.ClientConfig, w io.Writer, } func createLogCollector(config *plugin.ClientConfig, w io.Writer, - clientConfig *config.Config) (logcollector.LogCollector, *plugin.Client, error) { + clientConfig *config.Config) (logging.LogCollector, *plugin.Client, error) { config.HandshakeConfig = HandshakeConfig config.Plugins = GetPluginMap(w) config.MaxPort = clientConfig.ClientMaxPort @@ -61,7 +61,7 @@ func createLogCollector(config *plugin.ClientConfig, w io.Writer, if err != nil { return nil, nil, fmt.Errorf("unable to dispense the syslog plugin: %v", err) } - logCollector := raw.(logcollector.LogCollector) + logCollector := raw.(logging.LogCollector) return logCollector, syslogClient, nil }