From 776e57deb0977cf6c6fc3b6bf049523c34515965 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 10 Feb 2016 10:18:14 -0800 Subject: [PATCH] Adding newlines to loglines --- client/driver/docker.go | 30 ++++++++++++++++++++++-------- client/driver/syslog/collector.go | 10 +++------- client/driver/syslog/parser.go | 2 +- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/client/driver/docker.go b/client/driver/docker.go index ac2f1149d..8bd348b39 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -74,9 +74,10 @@ func (c *DockerDriverConfig) Validate() error { } type dockerPID struct { - ImageID string - ContainerID string - KillTimeout time.Duration + ImageID string + ContainerID string + KillTimeout time.Duration + PluginConfig *PluginReattachConfig } type DockerHandle struct { @@ -493,7 +494,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle if err != nil { return nil, fmt.Errorf("unable to find the nomad binary: %v", err) } - pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name)) + pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-syslog-collector.out", task.Name)) pluginConfig := &plugin.ClientConfig{ Cmd: exec.Command(bin, "syslog", pluginLogFile), } @@ -518,6 +519,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle config, err := d.createContainer(ctx, task, &driverConfig, ss.Addr) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to create container configuration for image %s: %s", image, err) + pluginClient.Kill() return nil, fmt.Errorf("Failed to create container configuration for image %s: %s", image, err) } // Create a container @@ -536,12 +538,14 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle }) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to query list of containers matching name:%s", config.Name) + pluginClient.Kill() return nil, fmt.Errorf("Failed to query list of containers: %s", err) } // Couldn't find any matching containers if len(containers) == 0 { d.logger.Printf("[ERR] driver.docker: failed to get id for container %s: %#v", config.Name, containers) + pluginClient.Kill() return nil, fmt.Errorf("Failed to get id for container %s", config.Name) } @@ -553,6 +557,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle }) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to purge container %s", container.ID) + pluginClient.Kill() return nil, fmt.Errorf("Failed to purge container %s: %s", container.ID, err) } d.logger.Printf("[INFO] driver.docker: purged container %s", container.ID) @@ -561,11 +566,13 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle container, err = client.CreateContainer(config) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to re-create container %s; aborting", config.Name) + pluginClient.Kill() return nil, fmt.Errorf("Failed to re-create container %s; aborting", config.Name) } } else { // We failed to create the container for some other reason. d.logger.Printf("[ERR] driver.docker: failed to create container from image %s: %s", image, err) + pluginClient.Kill() return nil, fmt.Errorf("Failed to create container from image %s: %s", image, err) } } @@ -575,6 +582,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle err = client.StartContainer(container.ID, container.HostConfig) if err != nil { d.logger.Printf("[ERR] driver.docker: failed to start container %s: %s", container.ID, err) + pluginClient.Kill() return nil, fmt.Errorf("Failed to start container %s: %s", container.ID, err) } d.logger.Printf("[INFO] driver.docker: started container %s", container.ID) @@ -608,8 +616,11 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err) } d.logger.Printf("[INFO] driver.docker: re-attaching to docker process: %s", handleID) + pluginConfig := &plugin.ClientConfig{ + Reattach: pid.PluginConfig.PluginConfig(), + } - // Initialize docker API client + logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config) client, err := d.dockerClient() if err != nil { return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err) @@ -638,6 +649,8 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er // Return a driver handle h := &DockerHandle{ client: client, + logCollector: logCollector, + pluginClient: pluginClient, cleanupContainer: cleanupContainer, cleanupImage: cleanupImage, logger: d.logger, @@ -654,9 +667,10 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er func (h *DockerHandle) ID() string { // Return a handle to the PID pid := dockerPID{ - ImageID: h.imageID, - ContainerID: h.containerID, - KillTimeout: h.killTimeout, + ImageID: h.imageID, + ContainerID: h.containerID, + KillTimeout: h.killTimeout, + PluginConfig: NewPluginReattachConfig(h.pluginClient.ReattachConfig()), } data, err := json.Marshal(pid) if err != nil { diff --git a/client/driver/syslog/collector.go b/client/driver/syslog/collector.go index 0e86b3c5f..91b1f27c3 100644 --- a/client/driver/syslog/collector.go +++ b/client/driver/syslog/collector.go @@ -1,8 +1,6 @@ package syslog import ( - "bytes" - "encoding/gob" "fmt" "io" "log" @@ -109,13 +107,11 @@ func (s *SyslogCollector) LaunchCollector(ctx *LogCollectorContext) (*SyslogColl } go lro.Start(r) + // map[string]interface{} go func(channel syslog.LogPartsChannel) { for logParts := range channel { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - if err := enc.Encode(logParts["content"]); err == nil { - w.Write(buf.Bytes()) - } + w.Write(logParts["content"].([]byte)) + w.Write([]byte("\n")) } }(channel) go s.server.Wait() diff --git a/client/driver/syslog/parser.go b/client/driver/syslog/parser.go index 6c43fb986..4c4371a75 100644 --- a/client/driver/syslog/parser.go +++ b/client/driver/syslog/parser.go @@ -24,7 +24,7 @@ func (d *DockerLogParser) Parse() error { func (d *DockerLogParser) Dump() syslogparser.LogParts { return map[string]interface{}{ - "content": string(d.line), + "content": d.line, } }