From 3211e67961efddaf4885c5260494c8dcfffa3fc1 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 22 Feb 2016 16:10:23 -0800 Subject: [PATCH] Made the syslog server use a buffered chan --- client/driver/logging/universal_collector.go | 47 +++++++++++--------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/client/driver/logging/universal_collector.go b/client/driver/logging/universal_collector.go index a06190b2f..f555ecaf9 100644 --- a/client/driver/logging/universal_collector.go +++ b/client/driver/logging/universal_collector.go @@ -4,6 +4,7 @@ package logging import ( "fmt" + "io" "log" "log/syslog" "net" @@ -57,17 +58,18 @@ type SyslogCollector struct { logConfig *structs.LogConfig ctx *LogCollectorContext - lro *FileRotator - lre *FileRotator - server *SyslogServer - taskDir string + lro *FileRotator + lre *FileRotator + server *SyslogServer + syslogChan chan *SyslogMessage + taskDir string logger *log.Logger } // NewSyslogCollector returns an implementation of the SyslogCollector func NewSyslogCollector(logger *log.Logger) *SyslogCollector { - return &SyslogCollector{logger: logger} + return &SyslogCollector{logger: logger, syslogChan: make(chan *SyslogMessage, 2048)} } // LaunchCollector launches a new syslog server and starts writing log lines to @@ -84,10 +86,8 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl return nil, err } - channel := make(chan *SyslogMessage) - syslogServer := NewSyslogServer(l, channel, s.logger) - s.server = syslogServer - go syslogServer.Start() + s.server = NewSyslogServer(l, s.syslogChan, s.logger) + go s.server.Start() logFileSize := int64(ctx.LogConfig.MaxFileSizeMB * 1024 * 1024) path := filepath.Join(s.taskDir, allocdir.TaskLocal) @@ -106,25 +106,28 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl } s.lre = lre - go func(channel chan *SyslogMessage) { - for logParts := range channel { - // If the severity of the log line is err then we write to stderr - // otherwise all messages go to stdout - if logParts.Severity == syslog.LOG_ERR { - s.lre.Write(logParts.Message) - s.lre.Write([]byte{'\n'}) - } else { - s.lro.Write(logParts.Message) - s.lro.Write([]byte{'\n'}) - } - } - }(channel) + go s.collectLogs(lre, lro) return &SyslogCollectorState{Addr: l.Addr().String()}, nil } +func (s *SyslogCollector) collectLogs(we io.Writer, wo io.Writer) { + for logParts := range s.syslogChan { + // If the severity of the log line is err then we write to stderr + // otherwise all messages go to stdout + if logParts.Severity == syslog.LOG_ERR { + s.lre.Write(logParts.Message) + s.lre.Write([]byte{'\n'}) + } else { + s.lro.Write(logParts.Message) + s.lro.Write([]byte{'\n'}) + } + } +} + // Exit kills the syslog server func (s *SyslogCollector) Exit() error { s.server.Shutdown() + close(s.syslogChan) return nil }