diff --git a/agent/agent.go b/agent/agent.go index ca62a03cd..40a50dcb1 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1319,11 +1319,8 @@ func (a *Agent) ShutdownAgent() error { } // Stop the proxy manager - // NOTE(mitchellh): we use Kill for now to kill the processes since - // the local state isn't snapshotting meaning the proxy tokens are - // regenerated each time forcing the processes to restart anyways. if a.proxyManager != nil { - if err := a.proxyManager.Kill(); err != nil { + if err := a.proxyManager.Close(); err != nil { a.logger.Printf("[WARN] agent: error shutting down proxy manager: %s", err) } } diff --git a/agent/proxy/daemon.go b/agent/proxy/daemon.go index 013fbdc28..cbc1747c3 100644 --- a/agent/proxy/daemon.go +++ b/agent/proxy/daemon.go @@ -1,12 +1,14 @@ package proxy import ( + "bytes" "fmt" "log" "os" "os/exec" "reflect" "strconv" + "strings" "sync" "time" @@ -29,9 +31,12 @@ const ( // // Consul will ensure that if the daemon crashes, that it is restarted. type Daemon struct { - // Command is the command to execute to start this daemon. This must - // be a Cmd that isn't yet started. - Command *exec.Cmd + // Path is the path to the executable to run + Path string + + // Args are the arguments to run with, the first element should be the same as + // Path. + Args []string // ProxyId is the ID of the proxy service. This is required for API // requests (along with the token) and is passed via env var. @@ -52,10 +57,30 @@ type Daemon struct { // created but the error will be logged to the Logger. PidPath string - // For tests, they can set this to change the default duration to wait - // for a graceful quit. + // StdoutPath, StderrPath are the paths to the files that stdout and stderr + // should be written to. + StdoutPath, StderrPath string + + // gracefulWait can be set for tests and controls how long Stop() will wait + // for process to terminate before killing. If not set defaults to 5 seconds. 
+ // If this is lowered for tests, it must remain higher than pollInterval + // (preferably a factor of 2 or more) or the tests will potentially return + // errors from Stop() where the process races to Kill() a process that was + // already stopped by SIGINT but we didn't yet detect the change since poll + // didn't occur. gracefulWait time.Duration + // pollInterval can be set for tests and controls how frequently the child + // process is sent SIG 0 to check it's still running. If not set defaults to 1 + // second. + pollInterval time.Duration + + // daemonizePath is set only in tests to control the path to the daemonize + // command. The test executable itself will not respond to the consul command + // arguments we need to make this work so we rely on the current source being + // built and installed here to run the tests. + daemonizePath string + // process is the started process lock sync.Mutex stopped bool @@ -108,7 +133,7 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) { // attempts keeps track of the number of restart attempts we've had and // is used to calculate the wait time using an exponential backoff. 
var attemptsDeadline time.Time - var attempts uint + var attempts uint32 for { if process == nil { @@ -121,7 +146,15 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) { // Calculate the exponential backoff and wait if we have to if attempts > DaemonRestartBackoffMin { - waitTime := (1 << (attempts - DaemonRestartBackoffMin)) * time.Second + delayedAttempts := attempts - DaemonRestartBackoffMin + waitTime := time.Duration(0) + if delayedAttempts > 31 { + // don't shift off the end of the uint32 if the process is in a crash + // loop forever + waitTime = DaemonRestartMaxWait + } else { + waitTime = (1 << delayedAttempts) * time.Second + } if waitTime > DaemonRestartMaxWait { waitTime = DaemonRestartMaxWait } @@ -153,8 +186,8 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) { return } - // Process isn't started currently. We're restarting. Start it - // and save the process if we have it. + // Process isn't started currently. We're restarting. Start it and save + // the process if we have it. var err error process, err = p.start() if err == nil { @@ -166,34 +199,40 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) { p.Logger.Printf("[ERR] agent/proxy: error restarting daemon: %s", err) continue } - + // NOTE: it is a postcondition of this method that we don't return while + // the process is still running. See NOTE below but it's essential that + // from here to the <-waitCh below nothing can cause this method to exit + // or loop if err is non-nil. } - // Wait for the process to exit. Note that if we restored this proxy - // then Wait will always fail because we likely aren't the parent - // process. Therefore, we do an extra sanity check after to use other - // syscalls to verify the process is truly dead. 
- ps, err := process.Wait() - if _, err := findProcess(process.Pid); err == nil { - select { - case <-time.After(1 * time.Second): - // We want a busy loop, but not too busy. 1 second between - // detecting a process death seems reasonable. - - case <-stopCh: - // If we receive a stop request we want to exit immediately. - return - } - - continue + // Wait will never work since child is detached and released, so poll the + // PID with sig 0 to check it's still alive. + interval := p.pollInterval + if interval < 1 { + interval = 1 * time.Second } + waitCh, closer := externalWait(process.Pid, interval) + defer closer() + // NOTE: we must not select on anything else here; Stop() requires the + // postcondition for this method to be that the managed process is not + // running when we return and the defer above closes p.exitedCh. If we + // select on stopCh or another signal here we introduce races where we might + // exit early and leave the actual process running (for example because + // SIGINT was ignored but Stop saw us exit and assumed all was good). That + // means that there is no way to stop the Daemon without killing the process + // but that's OK because agent Shutdown can just persist the state and exit + // without Stopping this and the right thing will happen. If we ever need to + // stop managing a process without killing it at a time when the agent + // process is not shutting down then we might have to re-think that. + <-waitCh + + // Note that we don't need to call Release explicitly. It's a no-op for Unix + // but even on Windows it's called automatically by a Finalizer during + // garbage collection to free the process handle. 
+ // (https://github.com/golang/go/blob/1174ad3a8f6f9d2318ac45fca3cd90f12915cf04/src/os/exec.go#L26) process = nil - if err != nil { - p.Logger.Printf("[INFO] agent/proxy: daemon exited with error: %s", err) - } else if status, ok := exitStatus(ps); ok { - p.Logger.Printf("[INFO] agent/proxy: daemon exited with exit code: %d", status) - } + p.Logger.Printf("[INFO] agent/proxy: daemon exited") } } @@ -201,33 +240,66 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) { // configured *exec.Command with the modifications documented on Daemon // such as setting the proxy token environmental variable. func (p *Daemon) start() (*os.Process, error) { - cmd := *p.Command - // Add the proxy token to the environment. We first copy the env because // it is a slice and therefore the "copy" above will only copy the slice // reference. We allocate an exactly sized slice. - cmd.Env = make([]string, len(p.Command.Env), len(p.Command.Env)+1) - copy(cmd.Env, p.Command.Env) - cmd.Env = append(cmd.Env, + baseEnv := os.Environ() + env := make([]string, len(baseEnv), len(baseEnv)+2) + copy(env, baseEnv) + env = append(env, fmt.Sprintf("%s=%s", EnvProxyId, p.ProxyId), fmt.Sprintf("%s=%s", EnvProxyToken, p.ProxyToken)) // Args must always contain a 0 entry which is usually the executed binary. // To be safe and a bit more robust we default this, but only to prevent // a panic below. - if len(cmd.Args) == 0 { - cmd.Args = []string{cmd.Path} + if len(p.Args) == 0 { + p.Args = []string{p.Path} } - // Start it - p.Logger.Printf("[DEBUG] agent/proxy: starting proxy: %q %#v", cmd.Path, cmd.Args[1:]) - if err := cmd.Start(); err != nil { + // Watch closely, we now swap out the exec.Cmd args for ones to run the same + // command via the connect daemonize command which takes care of correctly + // "double forking" to ensure the child is fully detached and adopted by the + // init process while the agent keeps running. 
+ var daemonCmd exec.Cmd + + dCmd, err := p.daemonizeCommand() + if err != nil { return nil, err } + daemonCmd.Path = dCmd[0] + // First arguments are the stdout and stderr paths, in that order + daemonCmd.Args = append(dCmd, p.StdoutPath) + daemonCmd.Args = append(daemonCmd.Args, p.StderrPath) + daemonCmd.Args = append(daemonCmd.Args, p.Args...) + daemonCmd.Env = env + + // setup stdout so we can read the PID + var out bytes.Buffer + daemonCmd.Stdout = &out + + // Run it to completion - it should exit immediately (this calls wait to + // ensure we don't leave the daemonize command as a zombie) + p.Logger.Printf("[DEBUG] agent/proxy: starting proxy: %q %#v", daemonCmd.Path, + daemonCmd.Args[1:]) + if err := daemonCmd.Run(); err != nil { + return nil, err + } + + // Read the PID from stdout + outStr, err := out.ReadString('\n') + if err != nil { + return nil, err + } + pid, err := strconv.Atoi(strings.TrimSpace(outStr)) + if err != nil { + return nil, fmt.Errorf("failed to parse PID from output of daemonize: %s", + err) + } // Write the pid file. This might error and that's okay. if p.PidPath != "" { - pid := strconv.FormatInt(int64(cmd.Process.Pid), 10) + pid := strconv.Itoa(pid) if err := file.WriteAtomic(p.PidPath, []byte(pid)); err != nil { p.Logger.Printf( "[DEBUG] agent/proxy: error writing pid file %q: %s", @@ -235,7 +307,22 @@ func (p *Daemon) start() (*os.Process, error) { } } - return cmd.Process, nil + // Finally, adopt the process so we can send signals + return findProcess(pid) +} + +// daemonizeCommand returns the daemonize command. +func (p *Daemon) daemonizeCommand() ([]string, error) { + // Get the path to the current executable. This is cached once by the + // library so this is effectively just a variable read. + execPath, err := os.Executable() + if err != nil { + return nil, err + } + if p.daemonizePath != "" { + execPath = p.daemonizePath + } + return []string{execPath, "connect", "daemonize"}, nil } // Stop stops the daemon. 
@@ -283,7 +370,7 @@ func (p *Daemon) Stop() error { }() } - // First, try a graceful stop + // First, try a graceful stop. err := process.Signal(os.Interrupt) if err == nil { select { @@ -293,11 +380,23 @@ func (p *Daemon) Stop() error { case <-time.After(gracefulWait): // Interrupt didn't work + p.Logger.Printf("[DEBUG] agent/proxy: gracefull wait of %s passed, "+ + "killing", gracefulWait) } + } else if isProcessAlreadyFinishedErr(err) { + // This can happen due to races between signals and polling. + return nil + } else { + p.Logger.Printf("[DEBUG] agent/proxy: sigint failed, killing: %s", err) } - // Graceful didn't work, forcibly kill - return process.Kill() + // Graceful didn't work (e.g. on windows where SIGINT isn't implemented), + // forcibly kill + err = process.Kill() + if err != nil && isProcessAlreadyFinishedErr(err) { + return nil + } + return err } // stopKeepAlive is like Stop but keeps the process running. This is @@ -329,10 +428,8 @@ func (p *Daemon) Equal(raw Proxy) bool { // We compare equality on a subset of the command configuration return p.ProxyToken == p2.ProxyToken && - p.Command.Path == p2.Command.Path && - p.Command.Dir == p2.Command.Dir && - reflect.DeepEqual(p.Command.Args, p2.Command.Args) && - reflect.DeepEqual(p.Command.Env, p2.Command.Env) + p.Path == p2.Path && + reflect.DeepEqual(p.Args, p2.Args) } // MarshalSnapshot implements Proxy @@ -346,12 +443,10 @@ func (p *Daemon) MarshalSnapshot() map[string]interface{} { } return map[string]interface{}{ - "Pid": p.process.Pid, - "CommandPath": p.Command.Path, - "CommandArgs": p.Command.Args, - "CommandDir": p.Command.Dir, - "CommandEnv": p.Command.Env, - "ProxyToken": p.ProxyToken, + "Pid": p.process.Pid, + "Path": p.Path, + "Args": p.Args, + "ProxyToken": p.ProxyToken, } } @@ -367,12 +462,8 @@ func (p *Daemon) UnmarshalSnapshot(m map[string]interface{}) error { // Set the basic fields p.ProxyToken = s.ProxyToken - p.Command = &exec.Cmd{ - Path: s.CommandPath, - Args: s.CommandArgs, - 
Dir: s.CommandDir, - Env: s.CommandEnv, - } + p.Path = s.Path + p.Args = s.Args // FindProcess on many systems returns no error even if the process // is now dead. We perform an extra check that the process is alive. @@ -398,14 +489,12 @@ func (p *Daemon) UnmarshalSnapshot(m map[string]interface{}) error { // within the manager snapshot and is restored automatically. type daemonSnapshot struct { // Pid of the process. This is the only value actually required to - // regain mangement control. The remainder values are for Equal. + // regain management control. The remainder values are for Equal. Pid int // Command information - CommandPath string - CommandArgs []string - CommandDir string - CommandEnv []string + Path string + Args []string // NOTE(mitchellh): longer term there are discussions/plans to only // store the hash of the token but for now we need the full token in diff --git a/agent/proxy/daemon_test.go b/agent/proxy/daemon_test.go index f08716276..db7ddf47e 100644 --- a/agent/proxy/daemon_test.go +++ b/agent/proxy/daemon_test.go @@ -3,8 +3,8 @@ package proxy import ( "io/ioutil" "os" - "os/exec" "path/filepath" + "strconv" "testing" "time" @@ -28,12 +28,10 @@ func TestDaemonStartStop(t *testing.T) { uuid, err := uuid.GenerateUUID() require.NoError(err) - d := &Daemon{ - Command: helperProcess("start-stop", path), - ProxyId: "tubes", - ProxyToken: uuid, - Logger: testLogger, - } + d := helperProcessDaemon("start-stop", path) + d.ProxyId = "tubes" + d.ProxyToken = uuid + d.Logger = testLogger require.NoError(d.Start()) defer d.Stop() @@ -68,6 +66,73 @@ func TestDaemonStartStop(t *testing.T) { }) } +func TestDaemonDetachesChild(t *testing.T) { + t.Parallel() + + require := require.New(t) + td, closer := testTempDir(t) + defer closer() + + path := filepath.Join(td, "file") + pidPath := filepath.Join(td, "child.pid") + + // Start the parent process wrapping a start-stop test. The parent is acting + // as our "agent". 
We need an extra indirection to be able to kill the "agent" + // and still be running the test process. + parentCmd := helperProcess("parent", pidPath, "start-stop", path) + require.NoError(parentCmd.Start()) + + // Wait for the pid file to exist so we know parent is running + retry.Run(t, func(r *retry.R) { + _, err := os.Stat(pidPath) + if err == nil { + return + } + + r.Fatalf("error: %s", err) + }) + + // And wait for the actual file to be sure the child is running (it should be + // since parent doesn't write PID until child starts but the child might not + // have completed the write to disk yet which causes flakiness below). + retry.Run(t, func(r *retry.R) { + _, err := os.Stat(path) + if err == nil { + return + } + + r.Fatalf("error: %s", err) + }) + + // Always cleanup child process after + defer func() { + _, err := os.Stat(pidPath) + if err != nil { + return + } + bs, err := ioutil.ReadFile(pidPath) + require.NoError(err) + pid, err := strconv.Atoi(string(bs)) + require.NoError(err) + proc, err := os.FindProcess(pid) + if err != nil { + return + } + proc.Kill() + }() + + // Now kill the parent and wait for it + require.NoError(parentCmd.Process.Kill()) + _, err := parentCmd.Process.Wait() + require.NoError(err) + + // The child should still be running so file should still be there + _, err = os.Stat(path) + require.NoError(err, "child should still be running") + + // Let defer clean up the child process +} + func TestDaemonRestart(t *testing.T) { t.Parallel() @@ -76,10 +141,8 @@ func TestDaemonRestart(t *testing.T) { defer closer() path := filepath.Join(td, "file") - d := &Daemon{ - Command: helperProcess("restart", path), - Logger: testLogger, - } + d := helperProcessDaemon("restart", path) + d.Logger = testLogger require.NoError(d.Start()) defer d.Stop() @@ -111,12 +174,11 @@ func TestDaemonStop_kill(t *testing.T) { path := filepath.Join(td, "file") - d := &Daemon{ - Command: helperProcess("stop-kill", path), - ProxyToken: "hello", - Logger: 
testLogger, - gracefulWait: 200 * time.Millisecond, - } + d := helperProcessDaemon("stop-kill", path) + d.ProxyToken = "hello" + d.Logger = testLogger + d.gracefulWait = 200 * time.Millisecond + d.pollInterval = 100 * time.Millisecond require.NoError(d.Start()) // Wait for the file to exist @@ -132,7 +194,7 @@ func TestDaemonStop_kill(t *testing.T) { // Stop the process require.NoError(d.Stop()) - // State the file so that we can get the mtime + // Stat the file so that we can get the mtime fi, err := os.Stat(path) require.NoError(err) mtime := fi.ModTime() @@ -149,6 +211,7 @@ func TestDaemonStart_pidFile(t *testing.T) { require := require.New(t) td, closer := testTempDir(t) + defer closer() path := filepath.Join(td, "file") @@ -156,12 +219,10 @@ func TestDaemonStart_pidFile(t *testing.T) { uuid, err := uuid.GenerateUUID() require.NoError(err) - d := &Daemon{ - Command: helperProcess("start-once", path), - ProxyToken: uuid, - Logger: testLogger, - PidPath: pidPath, - } + d := helperProcessDaemon("start-once", path) + d.ProxyToken = uuid + d.Logger = testLogger + d.PidPath = pidPath require.NoError(d.Start()) defer d.Stop() @@ -198,11 +259,9 @@ func TestDaemonRestart_pidFile(t *testing.T) { path := filepath.Join(td, "file") pidPath := filepath.Join(td, "pid") - d := &Daemon{ - Command: helperProcess("restart", path), - Logger: testLogger, - PidPath: pidPath, - } + d := helperProcessDaemon("restart", path) + d.Logger = testLogger + d.PidPath = pidPath require.NoError(d.Start()) defer d.Stop() @@ -244,51 +303,32 @@ func TestDaemonEqual(t *testing.T) { }{ { "Different type", - &Daemon{ - Command: &exec.Cmd{}, - }, + &Daemon{}, &Noop{}, false, }, { "Nil", - &Daemon{ - Command: &exec.Cmd{}, - }, + &Daemon{}, nil, false, }, { "Equal", - &Daemon{ - Command: &exec.Cmd{}, - }, - &Daemon{ - Command: &exec.Cmd{}, - }, + &Daemon{}, + &Daemon{}, true, }, { "Different path", &Daemon{ - Command: &exec.Cmd{Path: "/foo"}, + Path: "/foo", }, &Daemon{ - Command: &exec.Cmd{Path: 
"/bar"}, - }, - false, - }, - - { - "Different dir", - &Daemon{ - Command: &exec.Cmd{Dir: "/foo"}, - }, - &Daemon{ - Command: &exec.Cmd{Dir: "/bar"}, + Path: "/bar", }, false, }, @@ -296,10 +336,10 @@ func TestDaemonEqual(t *testing.T) { { "Different args", &Daemon{ - Command: &exec.Cmd{Args: []string{"foo"}}, + Args: []string{"foo"}, }, &Daemon{ - Command: &exec.Cmd{Args: []string{"bar"}}, + Args: []string{"bar"}, }, false, }, @@ -307,11 +347,9 @@ func TestDaemonEqual(t *testing.T) { { "Different token", &Daemon{ - Command: &exec.Cmd{}, ProxyToken: "one", }, &Daemon{ - Command: &exec.Cmd{}, ProxyToken: "two", }, false, @@ -335,7 +373,7 @@ func TestDaemonMarshalSnapshot(t *testing.T) { { "stopped daemon", &Daemon{ - Command: &exec.Cmd{Path: "/foo"}, + Path: "/foo", }, nil, }, @@ -343,16 +381,14 @@ func TestDaemonMarshalSnapshot(t *testing.T) { { "basic", &Daemon{ - Command: &exec.Cmd{Path: "/foo"}, + Path: "/foo", process: &os.Process{Pid: 42}, }, map[string]interface{}{ - "Pid": 42, - "CommandPath": "/foo", - "CommandArgs": []string(nil), - "CommandDir": "", - "CommandEnv": []string(nil), - "ProxyToken": "", + "Pid": 42, + "Path": "/foo", + "Args": []string(nil), + "ProxyToken": "", }, }, } @@ -376,11 +412,9 @@ func TestDaemonUnmarshalSnapshot(t *testing.T) { uuid, err := uuid.GenerateUUID() require.NoError(err) - d := &Daemon{ - Command: helperProcess("start-stop", path), - ProxyToken: uuid, - Logger: testLogger, - } + d := helperProcessDaemon("start-stop", path) + d.ProxyToken = uuid + d.Logger = testLogger defer d.Stop() require.NoError(d.Start()) @@ -430,11 +464,9 @@ func TestDaemonUnmarshalSnapshot_notRunning(t *testing.T) { uuid, err := uuid.GenerateUUID() require.NoError(err) - d := &Daemon{ - Command: helperProcess("start-stop", path), - ProxyToken: uuid, - Logger: testLogger, - } + d := helperProcessDaemon("start-stop", path) + d.ProxyToken = uuid + d.Logger = testLogger defer d.Stop() require.NoError(d.Start()) diff --git a/agent/proxy/manager.go 
b/agent/proxy/manager.go index 09eb1f601..46bde603b 100644 --- a/agent/proxy/manager.go +++ b/agent/proxy/manager.go @@ -4,7 +4,6 @@ import ( "fmt" "log" "os" - "os/exec" "path/filepath" "sync" "time" @@ -401,16 +400,9 @@ func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) { return nil, fmt.Errorf("daemon mode managed proxy requires command") } - // Build the command to execute. - var cmd exec.Cmd - cmd.Path = command[0] - cmd.Args = command // idx 0 is path but preserved since it should be - if err := m.configureLogDir(id, &cmd); err != nil { - return nil, fmt.Errorf("error configuring proxy logs: %s", err) - } - // Build the daemon structure - proxy.Command = &cmd + proxy.Path = command[0] + proxy.Args = command // idx 0 is path but preserved since it should be proxy.ProxyId = id proxy.ProxyToken = mp.ProxyToken return proxy, nil @@ -427,8 +419,10 @@ func (m *Manager) newProxyFromMode(mode structs.ProxyExecMode, id string) (Proxy switch mode { case structs.ProxyExecModeDaemon: return &Daemon{ - Logger: m.Logger, - PidPath: pidPath(filepath.Join(m.DataDir, "pids"), id), + Logger: m.Logger, + StdoutPath: logPath(filepath.Join(m.DataDir, "logs"), id, "stdout"), + StderrPath: logPath(filepath.Join(m.DataDir, "logs"), id, "stderr"), + PidPath: pidPath(filepath.Join(m.DataDir, "pids"), id), }, nil default: @@ -436,41 +430,6 @@ func (m *Manager) newProxyFromMode(mode structs.ProxyExecMode, id string) (Proxy } } -// configureLogDir sets up the file descriptors to stdout/stderr so that -// they log to the proper file path for the given service ID. -func (m *Manager) configureLogDir(id string, cmd *exec.Cmd) error { - // Create the log directory - logDir := "" - if m.DataDir != "" { - logDir = filepath.Join(m.DataDir, "logs") - if err := os.MkdirAll(logDir, 0700); err != nil { - return err - } - } - - // Configure the stdout, stderr paths - stdoutPath := logPath(logDir, id, "stdout") - stderrPath := logPath(logDir, id, "stderr") - - // Open the files. 
We want to append to each. We expect these files - // to be rotated by some external process. - stdoutF, err := os.OpenFile(stdoutPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) - if err != nil { - return fmt.Errorf("error creating stdout file: %s", err) - } - stderrF, err := os.OpenFile(stderrPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) - if err != nil { - // Don't forget to close stdoutF which successfully opened - stdoutF.Close() - - return fmt.Errorf("error creating stderr file: %s", err) - } - - cmd.Stdout = stdoutF - cmd.Stderr = stderrF - return nil -} - // logPath is a helper to return the path to the log file for the given // directory, service ID, and stream type (stdout or stderr). func logPath(dir, id, stream string) string { diff --git a/agent/proxy/manager_test.go b/agent/proxy/manager_test.go index 28922cbfa..228391a5d 100644 --- a/agent/proxy/manager_test.go +++ b/agent/proxy/manager_test.go @@ -379,7 +379,7 @@ func testStateProxy(t *testing.T, state *local.State, service string, cmd *exec. ExecMode: structs.ProxyExecModeDaemon, Command: command, TargetServiceID: service, - }, "token") + }, "token", "") require.NoError(t, err) return p.Proxy.ProxyService.ID diff --git a/agent/proxy/process.go b/agent/proxy/process.go new file mode 100644 index 000000000..22d75a3ce --- /dev/null +++ b/agent/proxy/process.go @@ -0,0 +1,43 @@ +package proxy + +import ( + "log" + "strings" + "time" +) + +// isProcessAlreadyFinishedErr does a janky comparison with an error string +// defined in os/exec_unix.go and os/exec_windows.go which we encounter due to races. +// These case tests to fail since Stop returns an error sometimes so we should +// notice if this string stops matching the error in a future go version. +func isProcessAlreadyFinishedErr(err error) bool { + return strings.Contains(err.Error(), "os: process already finished") +} + +// externalWait mimics process.Wait for an external process. The returned +// channel is closed when the process exits. 
It works by polling the process +// with signal 0 and verifying no error is returned. A closer func is also +// returned that can be invoked to terminate the waiter early. +func externalWait(pid int, pollInterval time.Duration) (<-chan struct{}, func()) { + ch := make(chan struct{}) + stopCh := make(chan struct{}) + closer := func() { + close(stopCh) + } + go func() { + for { + select { + case <-stopCh: + return + default: + } + log.Printf("checking pid %d", pid) + if _, err := findProcess(pid); err != nil { + close(ch) + return + } + time.Sleep(pollInterval) + } + }() + return ch, closer +} diff --git a/agent/proxy/process_test.go b/agent/proxy/process_test.go new file mode 100644 index 000000000..ca8cf394e --- /dev/null +++ b/agent/proxy/process_test.go @@ -0,0 +1,70 @@ +package proxy + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/hashicorp/consul/testutil/retry" + "github.com/stretchr/testify/require" +) + +func TestExternalWait(t *testing.T) { + t.Parallel() + + require := require.New(t) + td, closer := testTempDir(t) + defer closer() + path := filepath.Join(td, "file") + + cmd := helperProcess("restart", path) + require.NoError(cmd.Start()) + exitCh := make(chan struct{}) + // Launch waiter to make sure this process isn't zombified when it exits part + // way through the test. + go func() { + cmd.Process.Wait() + close(exitCh) + }() + defer cmd.Process.Kill() + + // Create waiter + pollInterval := 1 * time.Millisecond + waitCh, closer := externalWait(cmd.Process.Pid, pollInterval) + defer closer() + + // Wait for the file to exist so we don't rely on timing to not race with + // process startup. + retry.Run(t, func(r *retry.R) { + _, err := os.Stat(path) + if err == nil { + return + } + + r.Fatalf("error: %s", err) + }) + + // waitCh should not be closed until process quits. 
We'll wait a bit to verify + // we weren't just too quick to see a process exit + select { + case <-waitCh: + t.Fatal("waitCh should not be closed yet") + default: + } + + // Delete the file + require.NoError(os.Remove(path)) + + // Wait for the child to actually exit cleanly + <-exitCh + + // Now we _should_ see waitCh close (need to wait at least a whole poll + // interval) + select { + case <-waitCh: + // OK + case <-time.After(10 * pollInterval): + t.Fatal("waitCh should be closed") + } +} diff --git a/agent/proxy/process_windows.go b/agent/proxy/process_windows.go index 0a00d81ee..43226aa92 100644 --- a/agent/proxy/process_windows.go +++ b/agent/proxy/process_windows.go @@ -7,8 +7,8 @@ import ( ) func findProcess(pid int) (*os.Process, error) { - // On Windows, os.FindProcess will error if the process is not alive, - // so we don't have to do any further checking. The nature of it being - // non-nil means it seems to be healthy. + // On Windows, os.FindProcess will error if the process is not alive, so we + // don't have to do any further checking. The nature of it being non-nil means + // it seems to be healthy. return os.FindProcess(pid) } diff --git a/agent/proxy/proxy_test.go b/agent/proxy/proxy_test.go index 9b123787c..7caa9a1d5 100644 --- a/agent/proxy/proxy_test.go +++ b/agent/proxy/proxy_test.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "os/signal" + "strconv" "testing" "time" ) @@ -49,6 +50,37 @@ func helperProcess(s ...string) *exec.Cmd { return cmd } +// helperProcessDaemon returns a *Daemon that can be used to execute the +// TestHelperProcess function below. This can be used to test multi-process +// interactions. The Daemon has its Path, Args and stdio paths populated but +// other fields might need to be set depending on test requirements. +// +// NOTE: this relies on a sufficiently recent version of consul being installed +// in your path so that the daemonize command can be used. 
That's gross but hard +// to see how we can do better given that tests are separate binaries and we +// need consul's daemonize mode to work correctly. I considered hacks around +// building the local tree and getting the absolute path to the resulting binary +// but that seems gross in a different way. This is the same or weaker +// assumption our `api` test suite makes already... +func helperProcessDaemon(s ...string) *Daemon { + cs := []string{os.Args[0], "-test.run=TestHelperProcess", "--", helperProcessSentinel} + cs = append(cs, s...) + + path, err := exec.LookPath("consul") + if err != nil || path == "" { + panic("consul not found on $PATH - download and install " + + "consul or skip this test") + } + + return &Daemon{ + Path: os.Args[0], + Args: cs, + StdoutPath: "_", // dev null them for now + StderrPath: "_", + daemonizePath: path, + } +} + // This is not a real test. This is just a helper process kicked off by tests // using the helperProcess helper function. func TestHelperProcess(t *testing.T) { @@ -155,6 +187,33 @@ func TestHelperProcess(t *testing.T) { <-make(chan struct{}) + // Parent runs the given process in a Daemon and then exits. It exists to test + // that the Daemon-managed child process survives its parent exiting which we + // can't test directly without exiting the test process so we need an extra + // level of indirection. The caller must pass a file path as the first + // argument for the child process's PID to be written and then must take care + // to clean up that PID later or the child will be left running forever. + case "parent": + // We will write the PID for the child to the file in the first argument + // then pass rest of args through to command. + pidFile := args[0] + d := helperProcess(args[1:]...) 
+ if err := d.Start(); err != nil { + fmt.Fprintf(os.Stderr, "Error: %s\n", err) + os.Exit(1) + } + // Write PID + pidBs := []byte(strconv.Itoa(d.Process.Pid)) + if err := ioutil.WriteFile(pidFile, pidBs, 0644); err != nil { + fmt.Fprintf(os.Stderr, "Error: %s\n", err) + os.Exit(1) + } + // Wait "forever" (calling test chooses when we exit with signal/Wait to + // minimise coordination) + for { + time.Sleep(time.Hour) + } + default: fmt.Fprintf(os.Stderr, "Unknown command: %q\n", cmd) os.Exit(2) diff --git a/command/commands_oss.go b/command/commands_oss.go index 8e95282aa..5184f553a 100644 --- a/command/commands_oss.go +++ b/command/commands_oss.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/consul/command/connect/ca" caget "github.com/hashicorp/consul/command/connect/ca/get" caset "github.com/hashicorp/consul/command/connect/ca/set" + "github.com/hashicorp/consul/command/connect/daemonize" "github.com/hashicorp/consul/command/connect/proxy" "github.com/hashicorp/consul/command/event" "github.com/hashicorp/consul/command/exec" @@ -74,6 +75,7 @@ func init() { Register("connect ca get-config", func(ui cli.Ui) (cli.Command, error) { return caget.New(ui), nil }) Register("connect ca set-config", func(ui cli.Ui) (cli.Command, error) { return caset.New(ui), nil }) Register("connect proxy", func(ui cli.Ui) (cli.Command, error) { return proxy.New(ui, MakeShutdownCh()), nil }) + RegisterHidden("connect daemonize", func(ui cli.Ui) (cli.Command, error) { return daemonize.New(ui), nil }) Register("event", func(ui cli.Ui) (cli.Command, error) { return event.New(ui), nil }) Register("exec", func(ui cli.Ui) (cli.Command, error) { return exec.New(ui, MakeShutdownCh()), nil }) Register("force-leave", func(ui cli.Ui) (cli.Command, error) { return forceleave.New(ui), nil }) diff --git a/command/connect/daemonize/daemonize.go b/command/connect/daemonize/daemonize.go new file mode 100644 index 000000000..6c4097303 --- /dev/null +++ b/command/connect/daemonize/daemonize.go @@ -0,0 
+1,102 @@
+package daemonize
+
+import (
+	"fmt"
+	_ "net/http/pprof" // Expose pprof if configured
+	"os"
+	"os/exec"
+	"syscall"
+
+	"github.com/mitchellh/cli"
+)
+
+// New returns the hidden `consul connect daemonize` command. Errors are
+// reported through ui.
+func New(ui cli.Ui) *cmd {
+	return &cmd{UI: ui}
+}
+
+// cmd starts another executable in its own session and then returns so the
+// child is orphaned and adopted by init.
+type cmd struct {
+	UI cli.Ui
+
+	stdoutPath string
+	stderrPath string
+	cmdArgs    []string
+}
+
+// Run expects args to be stdoutPath, stderrPath, executablePath
+// [arguments...]. A path of "_" means no file is opened for that stream. It
+// starts the executable in a new session, prints the child's PID to stdout and
+// returns 0. On any failure it reports the error to the UI and returns 1.
+func (c *cmd) Run(args []string) int {
+	// Use the arguments the CLI hands us rather than fixed offsets into os.Args.
+	// Three is the minimum: both log paths plus the executable, which may take
+	// no arguments of its own.
+	if len(args) < 3 {
+		c.UI.Error("Need at least 3 arguments; stdoutPath, stderrPath, " +
+			"executablePath [arguments...]")
+		return 1
+	}
+
+	c.stdoutPath, c.stderrPath = args[0], args[1]
+	c.cmdArgs = args[2:] // includes the executable as arg 0 as expected
+
+	// Exec the command passed in a new session then exit to ensure it's adopted
+	// by the init process. Use the passed file paths for std out/err.
+	child := &exec.Cmd{
+		Path: c.cmdArgs[0],
+		Args: c.cmdArgs,
+		// Inherit Dir and Env by default.
+		SysProcAttr: &syscall.SysProcAttr{Setsid: true},
+	}
+
+	// Open log files if specified. Stdout and Stderr are only assigned when a
+	// file was opened: storing a nil *os.File in those io.Writer fields makes
+	// them non-nil interfaces, so os/exec would hand the nil file to the child
+	// instead of connecting the stream to the null device.
+	if c.stdoutPath != "_" {
+		f, err := os.OpenFile(c.stdoutPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
+		if err != nil {
+			c.UI.Error(fmt.Sprintf("error creating stdout file: %s", err))
+			return 1
+		}
+		// The child gets its own copy of the descriptor when it is started.
+		defer f.Close()
+		child.Stdout = f
+	}
+	if c.stderrPath != "_" {
+		f, err := os.OpenFile(c.stderrPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
+		if err != nil {
+			c.UI.Error(fmt.Sprintf("error creating stderr file: %s", err))
+			return 1
+		}
+		defer f.Close()
+		child.Stderr = f
+	}
+
+	// Exec the child
+	if err := child.Start(); err != nil {
+		c.UI.Error("command failed with error: " + err.Error())
+		return 1
+	}
+
+	// Print its PID to stdout
+	fmt.Fprintf(os.Stdout, "%d\n", child.Process.Pid)
+
+	// Release (no-op on unix) and exit to orphan the child and get it adopted by
+	// init.
+	child.Process.Release()
+	return 0
+}
+
+// Synopsis returns an empty string; the command is registered as hidden.
+func (c *cmd) Synopsis() string {
+	return ""
+}
+
+// Help returns an empty string; the command is registered as hidden.
+func (c *cmd) Help() string {
+	return ""
+}
diff --git a/command/registry.go b/command/registry.go
index 2b092ae72..e3d05fc83 100644
--- a/command/registry.go
+++ b/command/registry.go
@@ -24,12 +24,38 @@ func Register(name string, fn Factory) {
 	registry[name] = fn
 }
 
+// RegisterHidden adds a new CLI sub-command to the registry that won't show up
+// in help or autocomplete.
+func RegisterHidden(name string, fn Factory) {
+	if hiddenRegistry == nil {
+		hiddenRegistry = make(map[string]Factory)
+	}
+
+	if hiddenRegistry[name] != nil {
+		panic(fmt.Errorf("Command %q is already registered", name))
+	}
+	hiddenRegistry[name] = fn
+}
+
 // Map returns a realized mapping of available CLI commands in a format that
 // the CLI class can consume. This should be called after all registration is
 // complete.
 func Map(ui cli.Ui) map[string]cli.CommandFactory {
+	return makeCommands(ui, registry)
+}
+
+// MapHidden returns a realized mapping of available but hidden CLI commands
+// in a format that the CLI class can consume. This should be called after all
+// registration is complete.
+func MapHidden(ui cli.Ui) map[string]cli.CommandFactory {
+	return makeCommands(ui, hiddenRegistry)
+}
+
+// makeCommands wraps every factory in reg in a closure that invokes it with
+// ui, keyed by the same command name.
+func makeCommands(ui cli.Ui, reg map[string]Factory) map[string]cli.CommandFactory {
 	m := make(map[string]cli.CommandFactory)
-	for name, fn := range registry {
+	for name, fn := range reg {
 		thisFn := fn
 		m[name] = func() (cli.Command, error) {
 			return thisFn(ui)
@@ -42,6 +68,10 @@ func Map(ui cli.Ui) map[string]cli.CommandFactory {
 // command name. This should be populated at package init() time via Register().
 var registry map[string]Factory
 
+// hiddenRegistry behaves identically to registry but is for commands that are
+// hidden - i.e. not publicly documented in the help or autocomplete.
+var hiddenRegistry map[string]Factory + // MakeShutdownCh returns a channel that can be used for shutdown notifications // for commands. This channel will send a message for every interrupt or SIGTERM // received. diff --git a/main.go b/main.go index 855967af4..c12106c7e 100644 --- a/main.go +++ b/main.go @@ -41,6 +41,13 @@ func realMain() int { names = append(names, c) } + // Add hidden command + hidden := command.MapHidden(ui) + for name, cmd := range hidden { + // Don't add names to help since they are hidden! + cmds[name] = cmd + } + cli := &cli.CLI{ Args: args, Commands: cmds,