Merge pull request #1257 from hashicorp/fix-docker-wait

Using a different client for collecting stats and waiting on containers
This commit is contained in:
Diptanu Choudhury 2016-06-11 12:51:38 -07:00 committed by GitHub
commit 7943ab4b33

View file

@ -30,9 +30,17 @@ import (
)
var (
// We store the client globally to cache the connection to the docker daemon.
createClient sync.Once
client *docker.Client
// We store the clients globally to cache the connection to the docker daemon.
createClients sync.Once
// client is a docker client with a timeout of 1 minute. This is for doing
// all operations with the docker daemon besides which are not long running
// such as creating, killing containers, etc.
client *docker.Client
// waitClient is a docker client with no timeouts. This is used for long
// running operations such as waiting on containers and collect stats
waitClient *docker.Client
// The statistics the Docker driver exposes
DockerMeasuredMemStats = []string{"RSS", "Cache", "Swap", "Max Usage"}
@ -121,6 +129,7 @@ type DockerHandle struct {
pluginClient *plugin.Client
executor executor.Executor
client *docker.Client
waitClient *docker.Client
logger *log.Logger
cleanupImage bool
imageID string
@ -212,16 +221,18 @@ func (d *DockerDriver) Validate(config map[string]interface{}) error {
return nil
}
// dockerClient creates *docker.Client. In test / dev mode we can use ENV vars
// to connect to the docker daemon. In production mode we will read
// docker.endpoint from the config file.
func (d *DockerDriver) dockerClient() (*docker.Client, error) {
if client != nil {
return client, nil
// dockerClients creates two *docker.Client, one for long running operations and
// the other for shorter operations. In test / dev mode we can use ENV vars to
// connect to the docker daemon. In production mode we will read docker.endpoint
// from the config file.
func (d *DockerDriver) dockerClients() (*docker.Client, *docker.Client, error) {
if client != nil && waitClient != nil {
return client, waitClient, nil
}
var err error
createClient.Do(func() {
var merr multierror.Error
createClients.Do(func() {
// Default to using whatever is configured in docker.endpoint. If this is
// not specified we'll fall back on NewClientFromEnv which reads config from
// the DOCKER_* environment variables DOCKER_HOST, DOCKER_TLS_VERIFY, and
@ -246,9 +257,17 @@ func (d *DockerDriver) dockerClient() (*docker.Client, error) {
d.logger.Println("[DEBUG] driver.docker: using client connection initialized from environment")
client, err = docker.NewClientFromEnv()
if err != nil {
merr.Errors = append(merr.Errors, err)
}
client.HTTPClient.Timeout = dockerTimeout
waitClient, err = docker.NewClientFromEnv()
if err != nil {
merr.Errors = append(merr.Errors, err)
}
})
return client, err
return client, waitClient, merr.ErrorOrNil()
}
func (d *DockerDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, error) {
@ -256,8 +275,8 @@ func (d *DockerDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool
// state changes
_, currentlyEnabled := node.Attributes[dockerDriverAttr]
// Initialize docker API client
client, err := d.dockerClient()
// Initialize docker API clients
client, _, err := d.dockerClients()
if err != nil {
delete(node.Attributes, dockerDriverAttr)
if currentlyEnabled {
@ -646,8 +665,8 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
// Initialize docker API client
client, err := d.dockerClient()
// Initialize docker API clients
client, waitClient, err := d.dockerClients()
if err != nil {
return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err)
}
@ -767,6 +786,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
maxKill := d.DriverContext.config.MaxKillTimeout
h := &DockerHandle{
client: client,
waitClient: waitClient,
executor: exec,
pluginClient: pluginClient,
cleanupImage: cleanupImage,
@ -802,7 +822,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
Reattach: pid.PluginConfig.PluginConfig(),
}
client, err := d.dockerClient()
client, waitClient, err := d.dockerClients()
if err != nil {
return nil, fmt.Errorf("Failed to connect to docker daemon: %s", err)
}
@ -841,6 +861,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
// Return a driver handle
h := &DockerHandle{
client: client,
waitClient: waitClient,
executor: exec,
pluginClient: pluginClient,
cleanupImage: cleanupImage,
@ -930,7 +951,7 @@ func (h *DockerHandle) Stats() (*cstructs.TaskResourceUsage, error) {
func (h *DockerHandle) run() {
// Wait for it...
exitCode, err := h.client.WaitContainer(h.containerID)
exitCode, err := h.waitClient.WaitContainer(h.containerID)
if err != nil {
h.logger.Printf("[ERR] driver.docker: failed to wait for %s; container already terminated", h.containerID)
}
@ -983,7 +1004,7 @@ func (h *DockerHandle) collectStats() {
statsOpts := docker.StatsOptions{ID: h.containerID, Done: h.doneCh, Stats: statsCh, Stream: true}
go func() {
//TODO handle Stats error
if err := h.client.Stats(statsOpts); err != nil {
if err := h.waitClient.Stats(statsOpts); err != nil {
h.logger.Printf("[DEBUG] driver.docker: error collecting stats from container %s: %v", h.containerID, err)
}
}()