Adding newlines to loglines

This commit is contained in:
Diptanu Choudhury 2016-02-10 10:18:14 -08:00
parent 3a12204ce5
commit 776e57deb0
3 changed files with 26 additions and 16 deletions

View file

@ -74,9 +74,10 @@ func (c *DockerDriverConfig) Validate() error {
}
type dockerPID struct {
ImageID string
ContainerID string
KillTimeout time.Duration
ImageID string
ContainerID string
KillTimeout time.Duration
PluginConfig *PluginReattachConfig
}
type DockerHandle struct {
@ -493,7 +494,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
}
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name))
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-syslog-collector.out", task.Name))
pluginConfig := &plugin.ClientConfig{
Cmd: exec.Command(bin, "syslog", pluginLogFile),
}
@ -518,6 +519,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
config, err := d.createContainer(ctx, task, &driverConfig, ss.Addr)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", image, err)
}
// Create a container
@ -536,12 +538,14 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
})
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to query list of containers matching name:%s", config.Name)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to query list of containers: %s", err)
}
// Couldn't find any matching containers
if len(containers) == 0 {
d.logger.Printf("[ERR] driver.docker: failed to get id for container %s: %#v", config.Name, containers)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to get id for container %s", config.Name)
}
@ -553,6 +557,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
})
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to purge container %s", container.ID)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to purge container %s: %s", container.ID, err)
}
d.logger.Printf("[INFO] driver.docker: purged container %s", container.ID)
@ -561,11 +566,13 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
container, err = client.CreateContainer(config)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to re-create container %s; aborting", config.Name)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to re-create container %s; aborting", config.Name)
}
} else {
// We failed to create the container for some other reason.
d.logger.Printf("[ERR] driver.docker: failed to create container from image %s: %s", image, err)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to create container from image %s: %s", image, err)
}
}
@ -575,6 +582,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
err = client.StartContainer(container.ID, container.HostConfig)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed to start container %s: %s", container.ID, err)
pluginClient.Kill()
return nil, fmt.Errorf("Failed to start container %s: %s", container.ID, err)
}
d.logger.Printf("[INFO] driver.docker: started container %s", container.ID)
@ -608,8 +616,11 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err)
}
d.logger.Printf("[INFO] driver.docker: re-attaching to docker process: %s", handleID)
pluginConfig := &plugin.ClientConfig{
Reattach: pid.PluginConfig.PluginConfig(),
}
// Initialize docker API client
logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config)
client, err := d.dockerClient()
if err != nil {
return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err)
@ -638,6 +649,8 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
// Return a driver handle
h := &DockerHandle{
client: client,
logCollector: logCollector,
pluginClient: pluginClient,
cleanupContainer: cleanupContainer,
cleanupImage: cleanupImage,
logger: d.logger,
@ -654,9 +667,10 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
func (h *DockerHandle) ID() string {
// Return a handle to the PID
pid := dockerPID{
ImageID: h.imageID,
ContainerID: h.containerID,
KillTimeout: h.killTimeout,
ImageID: h.imageID,
ContainerID: h.containerID,
KillTimeout: h.killTimeout,
PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()),
}
data, err := json.Marshal(pid)
if err != nil {

View file

@ -1,8 +1,6 @@
package syslog
import (
"bytes"
"encoding/gob"
"fmt"
"io"
"log"
@ -109,13 +107,11 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl
}
go lro.Start(r)
// map[string]interface{}
go func(channel syslog.LogPartsChannel) {
for logParts := range channel {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(logParts["content"]); err == nil {
w.Write(buf.Bytes())
}
w.Write(logParts["content"].([]byte))
w.Write([]byte("\n"))
}
}(channel)
go s.server.Wait()

View file

@ -24,7 +24,7 @@ func (d *DockerLogParser) Parse() error {
func (d *DockerLogParser) Dump() syslogparser.LogParts {
return map[string]interface{}{
"content": string(d.line),
"content": d.line,
}
}