open-consul/agent/proxy/daemon.go
Paul Banks c97db00903 Run daemon processes as a detached child.
This turns out to have a lot more subtlety than we accounted for. The test suite is especially prone to races now that we can only poll the child, and many extra levels of indirection are needed to correctly run a daemon process without it becoming a zombie.

I ran this test suite in a loop with parallel enabled to check for races (-race doesn't find any, as they are logical inter-process races rather than actual data races). I made it through ~50 runs before hitting a timing-related error, which is much better than before. I want to go back and see if we can do better, though. Just getting this up.
2018-06-25 12:24:08 -07:00

package proxy

import (
	"bytes"
	"fmt"
	"log"
	"os"
	"os/exec"
	"reflect"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/hashicorp/consul/lib/file"
	"github.com/mitchellh/mapstructure"
)

// Constants related to restart timers with the daemon mode proxies. At some
// point we will probably want to expose these knobs to an end user, but
// reasonable defaults are chosen.
const (
	DaemonRestartHealthy    = 10 * time.Second // time before considering healthy
	DaemonRestartBackoffMin = 3                // 3 attempts before backing off
	DaemonRestartMaxWait    = 1 * time.Minute  // maximum backoff wait time
)
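
// With the defaults above, a daemon that keeps crashing is restarted
// immediately for its first DaemonRestartBackoffMin attempts, then with an
// exponentially growing wait (2s, 4s, 8s, ...) capped at DaemonRestartMaxWait.
// The attempt counter is reset if the process stays up longer than
// DaemonRestartHealthy before its next restart. See keepAlive for the exact
// calculation.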

// Daemon is a long-running proxy process. It is expected to keep running
// and to use blocking queries to detect changes in configuration, certs,
// and more.
//
// Consul will ensure that if the daemon crashes, it is restarted.
type Daemon struct {
	// Path is the path to the executable to run
	Path string

	// Args are the arguments to run with; the first element should be the
	// same as Path.
	Args []string

	// ProxyId is the ID of the proxy service. This is required for API
	// requests (along with the token) and is passed via env var.
	ProxyId string

	// ProxyToken is the special local-only ACL token that allows a proxy
	// to communicate to the Connect-specific endpoints.
	ProxyToken string

	// Logger is where logs will be sent around the management of this
	// daemon. The actual logs for the daemon itself will be sent to
	// a file.
	Logger *log.Logger

	// PidPath is the path where a pid file will be created storing the
	// pid of the active process. If this is empty then a pid file won't
	// be created. Under erroneous conditions, the pid file may not be
	// created but the error will be logged to the Logger.
	PidPath string

	// StdoutPath, StderrPath are the paths to the files that stdout and stderr
	// should be written to.
	StdoutPath, StderrPath string

	// gracefulWait can be set for tests and controls how long Stop() will wait
	// for the process to terminate before killing it. If not set, it defaults
	// to 5 seconds. If this is lowered for tests, it must remain higher than
	// pollInterval (preferably by a factor of 2 or more) or the tests will
	// potentially return errors from Stop() where we race to Kill() a process
	// that was already stopped by SIGINT but whose exit we haven't detected
	// yet because no poll occurred.
	gracefulWait time.Duration

	// pollInterval can be set for tests and controls how frequently the child
	// process is sent SIG 0 to check it's still running. If not set, it
	// defaults to 1 second.
	pollInterval time.Duration

	// daemonizePath is set only in tests to control the path to the daemonize
	// command. The test executable itself will not respond to the consul
	// command arguments we need to make this work so we rely on the current
	// source being built and installed here to run the tests.
	daemonizePath string

	// process is the started process.
	//
	// lock guards process along with the stopped flag and the stopCh/exitedCh
	// channels below.
	lock     sync.Mutex
	stopped  bool
	stopCh   chan struct{}
	exitedCh chan struct{}
	process  *os.Process
}

// Start starts the daemon and keeps it running.
//
// This function returns after the process is successfully started.
func (p *Daemon) Start() error {
	p.lock.Lock()
	defer p.lock.Unlock()

	// A stopped proxy cannot be restarted
	if p.stopped {
		return fmt.Errorf("stopped")
	}

	// If we're already running, that is okay
	if p.process != nil {
		return nil
	}

	// Setup our stop channel
	stopCh := make(chan struct{})
	exitedCh := make(chan struct{})
	p.stopCh = stopCh
	p.exitedCh = exitedCh

	// Start the loop.
	go p.keepAlive(stopCh, exitedCh)

	return nil
}

// keepAlive starts and keeps the configured process alive until it
// is stopped via Stop.
func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) {
	defer close(exitedCh)

	p.lock.Lock()
	process := p.process
	p.lock.Unlock()

	// attemptsDeadline is the time at which we consider the daemon to have
	// been alive long enough that we can reset the attempt counter.
	//
	// attempts keeps track of the number of restart attempts we've had and
	// is used to calculate the wait time using an exponential backoff.
	var attemptsDeadline time.Time
	var attempts uint32

	for {
		if process == nil {
			// If we're past the attempt deadline then reset the attempts
			if !attemptsDeadline.IsZero() && time.Now().After(attemptsDeadline) {
				attempts = 0
			}
			attemptsDeadline = time.Now().Add(DaemonRestartHealthy)
			attempts++

			// Calculate the exponential backoff and wait if we have to
			if attempts > DaemonRestartBackoffMin {
				delayedAttempts := attempts - DaemonRestartBackoffMin
				waitTime := time.Duration(0)
				if delayedAttempts > 31 {
					// don't shift off the end of the uint32 if the process is
					// in a crash loop forever
					waitTime = DaemonRestartMaxWait
				} else {
					waitTime = (1 << delayedAttempts) * time.Second
				}
				if waitTime > DaemonRestartMaxWait {
					waitTime = DaemonRestartMaxWait
				}
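				// e.g. attempts=5 gives delayedAttempts=2 and waitTime=4s,
				// well under the 1 minute cap applied above.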
				if waitTime > 0 {
					p.Logger.Printf(
						"[WARN] agent/proxy: waiting %s before restarting daemon",
						waitTime)

					timer := time.NewTimer(waitTime)
					select {
					case <-timer.C:
						// Timer is up, good!

					case <-stopCh:
						// During our backoff wait, we've been signalled to
						// quit, so just quit.
						timer.Stop()
						return
					}
				}
			}

			p.lock.Lock()
			// If we gracefully stopped then don't restart.
			if p.stopped {
				p.lock.Unlock()
				return
			}

			// Process isn't started currently. We're restarting. Start it and
			// save the process if we have it.
			var err error
			process, err = p.start()
			if err == nil {
				p.process = process
			}
			p.lock.Unlock()

			if err != nil {
				p.Logger.Printf("[ERR] agent/proxy: error restarting daemon: %s", err)
				continue
			}
			// NOTE: it is a postcondition of this method that we don't return
			// while the process is still running. See NOTE below but it's
			// essential that from here to the <-waitCh below nothing can cause
			// this method to exit or loop if err is non-nil.
		}

		// Wait will never work since child is detached and released, so poll
		// the PID with sig 0 to check it's still alive.
		interval := p.pollInterval
		if interval < 1 {
			interval = 1 * time.Second
		}
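		// ("sig 0" is the kill(pid, 0) liveness check: no signal is actually
		// delivered, the call only reports whether the PID still exists.)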
		waitCh, closer := externalWait(process.Pid, interval)
		defer closer()

		// NOTE: we must not select on anything else here; Stop() requires the
		// postcondition for this method to be that the managed process is not
		// running when we return and the defer above closes p.exitedCh. If we
		// select on stopCh or another signal here we introduce races where we
		// might exit early and leave the actual process running (for example
		// because SIGINT was ignored but Stop saw us exit and assumed all was
		// good). That means that there is no way to stop the Daemon without
		// killing the process but that's OK because agent Shutdown can just
		// persist the state and exit without Stopping this and the right thing
		// will happen. If we ever need to stop managing a process without
		// killing it at a time when the agent process is not shutting down
		// then we might have to re-think that.
		<-waitCh

		// Note that we don't need to call Release explicitly. It's a no-op for
		// Unix but even on Windows it's called automatically by a Finalizer
		// during garbage collection to free the process handle.
		// (https://github.com/golang/go/blob/1174ad3a8f6f9d2318ac45fca3cd90f12915cf04/src/os/exec.go#L26)
		process = nil
		p.Logger.Printf("[INFO] agent/proxy: daemon exited")
	}
}

// start starts and returns the process. This builds the command to run with
// the modifications documented on Daemon such as setting the proxy token
// environment variable.
func (p *Daemon) start() (*os.Process, error) {
	// Add the proxy token to the environment. os.Environ returns a slice, so
	// we copy it into an exactly sized slice with room for the two extra
	// variables rather than appending to it directly.
	baseEnv := os.Environ()
	env := make([]string, len(baseEnv), len(baseEnv)+2)
	copy(env, baseEnv)
	env = append(env,
		fmt.Sprintf("%s=%s", EnvProxyId, p.ProxyId),
		fmt.Sprintf("%s=%s", EnvProxyToken, p.ProxyToken))

	// Args must always contain a 0 entry which is usually the executed binary.
	// To be safe and a bit more robust we default this, but only to prevent
	// a panic below.
	if len(p.Args) == 0 {
		p.Args = []string{p.Path}
	}

	// Watch closely, we now swap out the exec.Cmd args for ones to run the same
	// command via the connect daemonize command which takes care of correctly
	// "double forking" to ensure the child is fully detached and adopted by the
	// init process while the agent keeps running.
	var daemonCmd exec.Cmd
	dCmd, err := p.daemonizeCommand()
	if err != nil {
		return nil, err
	}
	daemonCmd.Path = dCmd[0]
	// First arguments are the paths for stdout and stderr
	daemonCmd.Args = append(dCmd, p.StdoutPath)
	daemonCmd.Args = append(daemonCmd.Args, p.StderrPath)
	daemonCmd.Args = append(daemonCmd.Args, p.Args...)
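	// The resulting invocation looks roughly like:
	//   consul connect daemonize <stdout-path> <stderr-path> <proxy-path> <proxy-args...>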
	daemonCmd.Env = env

	// setup stdout so we can read the PID
	var out bytes.Buffer
	daemonCmd.Stdout = &out

	// Run it to completion - it should exit immediately (this calls wait to
	// ensure we don't leave the daemonize command as a zombie)
	p.Logger.Printf("[DEBUG] agent/proxy: starting proxy: %q %#v", daemonCmd.Path,
		daemonCmd.Args[1:])
	if err := daemonCmd.Run(); err != nil {
		return nil, err
	}
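
	// connect daemonize prints the detached child's PID as a single decimal
	// line on its stdout, e.g. "12345\n".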
	// Read the PID from stdout
	outStr, err := out.ReadString('\n')
	if err != nil {
		return nil, err
	}
	pid, err := strconv.Atoi(strings.TrimSpace(outStr))
	if err != nil {
		return nil, fmt.Errorf("failed to parse PID from output of daemonize: %s",
			err)
	}

	// Write the pid file. This might error and that's okay.
	if p.PidPath != "" {
		pid := strconv.Itoa(pid)
		if err := file.WriteAtomic(p.PidPath, []byte(pid)); err != nil {
			p.Logger.Printf(
				"[DEBUG] agent/proxy: error writing pid file %q: %s",
				p.PidPath, err)
		}
	}

	// Finally, adopt the process so we can send signals
	return findProcess(pid)
}

// daemonizeCommand returns the daemonize command.
func (p *Daemon) daemonizeCommand() ([]string, error) {
	// Get the path to the current executable. This is cached once by the
	// library so this is effectively just a variable read.
	execPath, err := os.Executable()
	if err != nil {
		return nil, err
	}
	if p.daemonizePath != "" {
		execPath = p.daemonizePath
	}
	return []string{execPath, "connect", "daemonize"}, nil
}

// Stop stops the daemon.
//
// This will attempt a graceful stop (SIGINT) before force killing the
// process (SIGKILL). In either case, the process won't be automatically
// restarted unless Start is called again.
//
// This is safe to call multiple times. If the daemon is already stopped,
// then this returns no error.
func (p *Daemon) Stop() error {
	p.lock.Lock()

	// If we're already stopped or never started, then no problem.
	if p.stopped || p.process == nil {
		// In the case we never even started, calling Stop makes it so
		// that we can't ever start in the future, either, so mark this.
		p.stopped = true
		p.lock.Unlock()
		return nil
	}

	// Note that we've stopped
	p.stopped = true
	close(p.stopCh)
	process := p.process
	p.lock.Unlock()

	gracefulWait := p.gracefulWait
	if gracefulWait == 0 {
		gracefulWait = 5 * time.Second
	}

	// Defer removing the pid file. Even under error conditions we
	// delete the pid file since Stop means that the manager is no
	// longer managing this proxy and therefore nothing else will ever
	// clean it up.
	if p.PidPath != "" {
		defer func() {
			if err := os.Remove(p.PidPath); err != nil && !os.IsNotExist(err) {
				p.Logger.Printf(
					"[DEBUG] agent/proxy: error removing pid file %q: %s",
					p.PidPath, err)
			}
		}()
	}

	// First, try a graceful stop.
	err := process.Signal(os.Interrupt)
	if err == nil {
		select {
		case <-p.exitedCh:
			// Success!
			return nil

		case <-time.After(gracefulWait):
			// Interrupt didn't work
			p.Logger.Printf("[DEBUG] agent/proxy: graceful wait of %s passed, "+
				"killing", gracefulWait)
		}
	} else if isProcessAlreadyFinishedErr(err) {
		// This can happen due to races between signals and polling.
		return nil
	} else {
		p.Logger.Printf("[DEBUG] agent/proxy: sigint failed, killing: %s", err)
	}

	// Graceful didn't work (e.g. on Windows where SIGINT isn't implemented),
	// forcibly kill
	err = process.Kill()
	if err != nil && isProcessAlreadyFinishedErr(err) {
		return nil
	}
	return err
}

// stopKeepAlive is like Stop but keeps the process running. This is
// used only for tests.
func (p *Daemon) stopKeepAlive() error {
	p.lock.Lock()
	defer p.lock.Unlock()

	// If we're already stopped or never started, then no problem.
	if p.stopped || p.process == nil {
		p.stopped = true
		return nil
	}

	// Note that we've stopped
	p.stopped = true
	close(p.stopCh)
	return nil
}

// Equal implements Proxy to check for equality.
func (p *Daemon) Equal(raw Proxy) bool {
	p2, ok := raw.(*Daemon)
	if !ok {
		return false
	}

	// We compare equality on a subset of the command configuration
	return p.ProxyToken == p2.ProxyToken &&
		p.Path == p2.Path &&
		reflect.DeepEqual(p.Args, p2.Args)
}

// MarshalSnapshot implements Proxy
func (p *Daemon) MarshalSnapshot() map[string]interface{} {
	p.lock.Lock()
	defer p.lock.Unlock()

	// If we're stopped or have no process, then nothing to snapshot.
	if p.stopped || p.process == nil {
		return nil
	}

	return map[string]interface{}{
		"Pid":        p.process.Pid,
		"Path":       p.Path,
		"Args":       p.Args,
		"ProxyToken": p.ProxyToken,
	}
}

// UnmarshalSnapshot implements Proxy
func (p *Daemon) UnmarshalSnapshot(m map[string]interface{}) error {
	var s daemonSnapshot
	if err := mapstructure.Decode(m, &s); err != nil {
		return err
	}

	p.lock.Lock()
	defer p.lock.Unlock()

	// Set the basic fields
	p.ProxyToken = s.ProxyToken
	p.Path = s.Path
	p.Args = s.Args

	// FindProcess on many systems returns no error even if the process
	// is now dead. We perform an extra check that the process is alive.
	proc, err := findProcess(s.Pid)
	if err != nil {
		return err
	}

	// "Start it"
	stopCh := make(chan struct{})
	exitedCh := make(chan struct{})
	p.stopCh = stopCh
	p.exitedCh = exitedCh
	p.process = proc
	go p.keepAlive(stopCh, exitedCh)

	return nil
}

// daemonSnapshot is the structure of the marshalled data for snapshotting.
//
// Note we don't have to store the ProxyId because this is stored directly
// within the manager snapshot and is restored automatically.
type daemonSnapshot struct {
	// Pid of the process. This is the only value actually required to
	// regain management control. The remainder values are for Equal.
	Pid int

	// Command information
	Path string
	Args []string

	// NOTE(mitchellh): longer term there are discussions/plans to only
	// store the hash of the token but for now we need the full token in
	// case the process dies and has to be restarted.
	ProxyToken string
}