Abandon daemonize for simpler solution (preserving history):

Reverts:
  - bdb274852ae469c89092d6050697c0ff97178465
  - 2c689179c4f61c11f0016214c0fc127a0b813bfe
  - d62e25c4a7ab753914b6baccd66f88ffd10949a3
  - c727ffbcc98e3e0bf41e1a7bdd40169bd2d22191
  - 31b4d18933fd0acbe157e28d03ad59c2abf9a1fb
  - 85c3f8df3eabc00f490cd392213c3b928a85aa44
Paul Banks, 2018-06-05 12:13:28 +01:00; committed by Jack Pearkes
parent 9ef748157a
commit 3bac52480e
21 changed files with 239 additions and 798 deletions
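The shape of the "simpler solution" is visible throughout the diff below: instead of re-executing the agent binary through a hidden `connect daemonize` command that double-forks a fully detached child, each managed proxy now runs as a plain child process built from an `exec.Cmd`, started with `Start()` and supervised with `Wait()` plus a `findProcess` liveness fallback. A minimal, self-contained sketch of that supervision shape (illustrative names only, not Consul's actual API; Unix-only because of the signal-0 probe):

```go
package main

import (
	"log"
	"os/exec"
	"syscall"
	"time"
)

// superviseOnce starts a command as a direct (non-detached) child and
// blocks until it exits, mirroring the Wait-based approach this commit
// adopts in place of daemonize.
func superviseOnce(path string, args ...string) error {
	cmd := exec.Command(path, args...)
	if err := cmd.Start(); err != nil {
		return err
	}
	pid := cmd.Process.Pid
	log.Printf("started child pid=%d", pid)

	// Wait reaps our own child. If the process had been adopted from a
	// previous agent run, Wait would fail, so a signal-0 poll is the
	// fallback liveness check (same idea as the diff's findProcess).
	if err := cmd.Wait(); err != nil {
		for syscall.Kill(pid, syscall.Signal(0)) == nil {
			time.Sleep(1 * time.Second)
		}
	}
	log.Printf("child pid=%d exited", pid)
	return nil
}

func main() {
	if err := superviseOnce("sleep", "1"); err != nil {
		log.Fatal(err)
	}
}
```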

View File

@@ -366,7 +366,6 @@ func (a *Agent) Start() error {
 	a.proxyManager = proxy.NewManager()
 	a.proxyManager.State = a.State
 	a.proxyManager.Logger = a.logger
-	a.proxyManager.DisableDetach = a.config.ConnectDisableDetachedDaemons
 	if a.config.DataDir != "" {
 		// DataDir is required for all non-dev mode agents, but we want
 		// to allow setting the data dir for demos and so on for the agent,
@@ -1320,8 +1319,11 @@ func (a *Agent) ShutdownAgent() error {
 	}

 	// Stop the proxy manager
+	// NOTE(mitchellh): we use Kill for now to kill the processes since
+	// the local state isn't snapshotting meaning the proxy tokens are
+	// regenerated each time forcing the processes to restart anyways.
 	if a.proxyManager != nil {
-		if err := a.proxyManager.Close(); err != nil {
+		if err := a.proxyManager.Kill(); err != nil {
 			a.logger.Printf("[WARN] agent: error shutting down proxy manager: %s", err)
 		}
 	}

View File

@@ -15,6 +15,8 @@ import (
 	"testing"
 	"time"

+	"github.com/stretchr/testify/assert"
+
 	"github.com/hashicorp/consul/agent/checks"
 	"github.com/hashicorp/consul/agent/connect"
 	"github.com/hashicorp/consul/agent/structs"

View File

@@ -621,16 +621,6 @@ type RuntimeConfig struct {
 	// that.
 	ConnectEnabled bool

-	// ConnectDisableDetachedDaemons is not exposed publically and is meant for
-	// testing where having processes outlive the test is inconvenient. It also
-	// allows tests outside of the `agent/proxy` package to ignore the unpleasant
-	// details of self-executing the test binary in order to correctly detach a
-	// process. It's set to true by default in TestAgent and setting it to false
-	// in any test requires several hoops to be jumped through to allow the test
-	// binary to behave as a daemonizer and for the agent to be configured to use
-	// the right invocation of the binary for it.
-	ConnectDisableDetachedDaemons bool
-
 	// ConnectProxyBindMinPort is the inclusive start of the range of ports
 	// allocated to the agent for starting proxy listeners on where no explicit
 	// port is specified.

View File

@@ -4199,7 +4199,6 @@ func TestSanitize(t *testing.T) {
 	"ClientAddrs": [],
 	"ConnectCAConfig": {},
 	"ConnectCAProvider": "",
-	"ConnectDisableDetachedDaemons": false,
 	"ConnectEnabled": false,
 	"ConnectProxyBindMaxPort": 0,
 	"ConnectProxyBindMinPort": 0,

View File

@@ -1691,7 +1691,7 @@ func TestStateProxyManagement(t *testing.T) {
 	require := require.New(t)
 	assert := assert.New(t)

-	_, err := state.AddProxy(&p1, "fake-token", "")
+	_, err := state.AddProxy(&p1, "fake-token")
 	require.Error(err, "should fail as the target service isn't registered")

 	// Sanity check done, lets add a couple of target services to the state
@@ -1710,7 +1710,7 @@ func TestStateProxyManagement(t *testing.T) {
 	require.NoError(err)

 	// Should work now
-	pstate, err := state.AddProxy(&p1, "fake-token", "")
+	pstate, err := state.AddProxy(&p1, "fake-token")
 	require.NoError(err)

 	svc := pstate.Proxy.ProxyService
@@ -1724,9 +1724,8 @@ func TestStateProxyManagement(t *testing.T) {
 	{
 		// Re-registering same proxy again should not pick a random port but re-use
-		// the assigned one. It should also keep the same proxy token since we don't
-		// want to force restart for config change.
-		pstateDup, err := state.AddProxy(&p1, "fake-token", "")
+		// the assigned one.
+		pstateDup, err := state.AddProxy(&p1, "fake-token")
 		require.NoError(err)
 		svcDup := pstateDup.Proxy.ProxyService
@@ -1737,8 +1736,6 @@ func TestStateProxyManagement(t *testing.T) {
 		assert.Equal("", svcDup.Address, "should have empty address by default")
 		// Port must be same as before
 		assert.Equal(svc.Port, svcDup.Port)
-		// Same ProxyToken
-		assert.Equal(pstate.ProxyToken, pstateDup.ProxyToken)
 	}

 	// Let's register a notifier now
@@ -1751,7 +1748,7 @@ func TestStateProxyManagement(t *testing.T) {
 	// Second proxy should claim other port
 	p2 := p1
 	p2.TargetServiceID = "cache"
-	pstate2, err := state.AddProxy(&p2, "fake-token", "")
+	pstate2, err := state.AddProxy(&p2, "fake-token")
 	require.NoError(err)
 	svc2 := pstate2.Proxy.ProxyService
 	assert.Contains([]int{20000, 20001}, svc2.Port)
@@ -1767,7 +1764,7 @@ func TestStateProxyManagement(t *testing.T) {
 	// Third proxy should fail as all ports are used
 	p3 := p1
 	p3.TargetServiceID = "db"
-	_, err = state.AddProxy(&p3, "fake-token", "")
+	_, err = state.AddProxy(&p3, "fake-token")
 	require.Error(err)

 	// Should have a notification but we'll do nothing so that the next
@@ -1778,7 +1775,7 @@ func TestStateProxyManagement(t *testing.T) {
 		"bind_port":    1234,
 		"bind_address": "0.0.0.0",
 	}
-	pstate3, err := state.AddProxy(&p3, "fake-token", "")
+	pstate3, err := state.AddProxy(&p3, "fake-token")
 	require.NoError(err)
 	svc3 := pstate3.Proxy.ProxyService
 	require.Equal("0.0.0.0", svc3.Address)
@@ -1796,7 +1793,7 @@ func TestStateProxyManagement(t *testing.T) {
 	require.NotNil(gotP3)
 	var ws memdb.WatchSet
 	ws.Add(gotP3.WatchCh)
-	pstate3, err = state.AddProxy(&p3updated, "fake-token", "")
+	pstate3, err = state.AddProxy(&p3updated, "fake-token")
 	require.NoError(err)
 	svc3 = pstate3.Proxy.ProxyService
 	require.Equal("0.0.0.0", svc3.Address)
@@ -1820,7 +1817,7 @@ func TestStateProxyManagement(t *testing.T) {
 	// Should be able to create a new proxy for that service with the port (it
 	// should have been "freed").
 	p4 := p2
-	pstate4, err := state.AddProxy(&p4, "fake-token", "")
+	pstate4, err := state.AddProxy(&p4, "fake-token")
 	require.NoError(err)
 	svc4 := pstate4.Proxy.ProxyService
 	assert.Contains([]int{20000, 20001}, svc2.Port)
@@ -1868,65 +1865,3 @@ func drainCh(ch chan struct{}) {
 		}
 	}
 }
-
-// Tests the logic for retaining tokens and ports through restore (i.e.
-// proxy-service already restored and token passed in externally)
-func TestStateProxyRestore(t *testing.T) {
-	t.Parallel()
-
-	state := local.NewState(local.Config{
-		// Wide random range to make it very unlikely to pass by chance
-		ProxyBindMinPort: 10000,
-		ProxyBindMaxPort: 20000,
-	}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
-
-	// Stub state syncing
-	state.TriggerSyncChanges = func() {}
-
-	webSvc := structs.NodeService{
-		Service: "web",
-	}
-
-	p1 := structs.ConnectManagedProxy{
-		ExecMode:        structs.ProxyExecModeDaemon,
-		Command:         []string{"consul", "connect", "proxy"},
-		TargetServiceID: "web",
-	}
-
-	p2 := p1
-
-	require := require.New(t)
-	assert := assert.New(t)
-
-	// Add a target service
-	require.NoError(state.AddService(&webSvc, "fake-token-web"))
-
-	// Add the proxy for first time to get the proper service definition to
-	// register
-	pstate, err := state.AddProxy(&p1, "fake-token", "")
-	require.NoError(err)
-
-	// Now start again with a brand new state
-	state2 := local.NewState(local.Config{
-		// Wide random range to make it very unlikely to pass by chance
-		ProxyBindMinPort: 10000,
-		ProxyBindMaxPort: 20000,
-	}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
-
-	// Stub state syncing
-	state2.TriggerSyncChanges = func() {}
-
-	// Register the target service
-	require.NoError(state2.AddService(&webSvc, "fake-token-web"))
-
-	// "Restore" the proxy service
-	require.NoError(state.AddService(p1.ProxyService, "fake-token-web"))
-
-	// Now we can AddProxy with the "restored" token
-	pstate2, err := state.AddProxy(&p2, "fake-token", pstate.ProxyToken)
-	require.NoError(err)
-
-	// Check it still has the same port and token as before
-	assert.Equal(pstate.ProxyToken, pstate2.ProxyToken)
-	assert.Equal(p1.ProxyService.Port, p2.ProxyService.Port)
-}

View File

@@ -1,15 +1,12 @@
 package proxy

 import (
-	"bytes"
 	"fmt"
 	"log"
 	"os"
 	"os/exec"
-	"path"
 	"reflect"
 	"strconv"
-	"strings"
 	"sync"
 	"time"
@@ -32,12 +29,9 @@ const (
 //
 // Consul will ensure that if the daemon crashes, that it is restarted.
 type Daemon struct {
-	// Path is the path to the executable to run
-	Path string
-
-	// Args are the arguments to run with, the first element should be the same as
-	// Path.
-	Args []string
+	// Command is the command to execute to start this daemon. This must
+	// be a Cmd that isn't yet started.
+	Command *exec.Cmd

 	// ProxyId is the ID of the proxy service. This is required for API
 	// requests (along with the token) and is passed via env var.
@@ -58,36 +52,10 @@ type Daemon struct {
 	// created but the error will be logged to the Logger.
 	PidPath string

-	// StdoutPath, StderrPath are the paths to the files that stdout and stderr
-	// should be written to.
-	StdoutPath, StderrPath string
-
-	// DisableDetach is used by tests that don't actually care about detached
-	// child behaviour (i.e. outside proxy package) to bypass detaching and
-	// daemonizing Daemons. This makes tests much simpler as they don't need to
-	// implement a test-binary mode to enable self-exec daemonizing etc. and there
-	// are fewer risks of detached processes being spawned and then not killed in
-	// face of missed teardown/panic/interrupt of test runs etc.
-	DisableDetach bool
-
-	// gracefulWait can be set for tests and controls how long Stop() will wait
-	// for process to terminate before killing. If not set defaults to 5 seconds.
-	// If this is lowered for tests, it must remain higher than pollInterval
-	// (preferably a factor of 2 or more) or the tests will potentially return
-	// errors from Stop() where the process races to Kill() a process that was
-	// already stopped by SIGINT but we didn't yet detect the change since poll
-	// didn't occur.
+	// For tests, they can set this to change the default duration to wait
+	// for a graceful quit.
 	gracefulWait time.Duration

-	// pollInterval can be set for tests and controls how frequently the child
-	// process is sent SIG 0 to check it's still running. If not set defaults to 1
-	// second.
-	pollInterval time.Duration
-
-	// daemonizeCmd is set only in tests to control the path and args to the
-	// daemonize command.
-	daemonizeCmd []string
-
 	// process is the started process
 	lock    sync.Mutex
 	stopped bool
@@ -119,18 +87,6 @@ func (p *Daemon) Start() error {
 	p.stopCh = stopCh
 	p.exitedCh = exitedCh

-	// Ensure log dirs exist
-	if p.StdoutPath != "" {
-		if err := os.MkdirAll(path.Dir(p.StdoutPath), 0755); err != nil {
-			return err
-		}
-	}
-	if p.StderrPath != "" {
-		if err := os.MkdirAll(path.Dir(p.StderrPath), 0755); err != nil {
-			return err
-		}
-	}
-
 	// Start the loop.
 	go p.keepAlive(stopCh, exitedCh)
@@ -152,7 +108,7 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) {
 	// attempts keeps track of the number of restart attempts we've had and
 	// is used to calculate the wait time using an exponential backoff.
 	var attemptsDeadline time.Time
-	var attempts uint32
+	var attempts uint

 	for {
 		if process == nil {
@@ -165,15 +121,7 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) {
 			// Calculate the exponential backoff and wait if we have to
 			if attempts > DaemonRestartBackoffMin {
-				delayedAttempts := attempts - DaemonRestartBackoffMin
-				waitTime := time.Duration(0)
-				if delayedAttempts > 31 {
-					// don't shift off the end of the uint32 if the process is in a crash
-					// loop forever
-					waitTime = DaemonRestartMaxWait
-				} else {
-					waitTime = (1 << delayedAttempts) * time.Second
-				}
+				waitTime := (1 << (attempts - DaemonRestartBackoffMin)) * time.Second
 				if waitTime > DaemonRestartMaxWait {
 					waitTime = DaemonRestartMaxWait
 				}
@@ -205,8 +153,8 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) {
 			return
 		}

-		// Process isn't started currently. We're restarting. Start it and save
-		// the process if we have it.
+		// Process isn't started currently. We're restarting. Start it
+		// and save the process if we have it.
 		var err error
 		process, err = p.start()
 		if err == nil {
@@ -218,40 +166,34 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) {
 			p.Logger.Printf("[ERR] agent/proxy: error restarting daemon: %s", err)
 			continue
 		}
-
-			// NOTE: it is a postcondition of this method that we don't return while
-			// the process is still running. See NOTE below but it's essential that
-			// from here to the <-waitCh below nothing can cause this method to exit
-			// or loop if err is non-nil.
 		}

-		// Wait will never work since child is detached and released, so poll the
-		// PID with sig 0 to check it's still alive.
-		interval := p.pollInterval
-		if interval < 1 {
-			interval = 1 * time.Second
-		}
-		waitCh, closer := externalWait(process.Pid, interval)
-		defer closer()
-
-		// NOTE: we must not select on anything else here; Stop() requires the
-		// postcondition for this method to be that the managed process is not
-		// running when we return and the defer above closes p.exitedCh. If we
-		// select on stopCh or another signal here we introduce races where we might
-		// exit early and leave the actual process running (for example because
-		// SIGINT was ignored but Stop saw us exit and assumed all was good). That
-		// means that there is no way to stop the Daemon without killing the process
-		// but that's OK because agent Shutdown can just persist the state and exit
-		// without Stopping this and the right thing will happen. If we ever need to
-		// stop managing a process without killing it at a time when the agent
-		// process is not shutting down then we might have to re-think that.
-		<-waitCh
-
-		// Note that we don't need to call Release explicitly. It's a no-op for Unix
-		// but even on Windows it's called automatically by a Finalizer during
-		// garbage collection to free the process handle.
-		// (https://github.com/golang/go/blob/1174ad3a8f6f9d2318ac45fca3cd90f12915cf04/src/os/exec.go#L26)
+		// Wait for the process to exit. Note that if we restored this proxy
+		// then Wait will always fail because we likely aren't the parent
+		// process. Therefore, we do an extra sanity check after to use other
+		// syscalls to verify the process is truly dead.
+		ps, err := process.Wait()
+		if _, err := findProcess(process.Pid); err == nil {
+			select {
+			case <-time.After(1 * time.Second):
+				// We want a busy loop, but not too busy. 1 second between
+				// detecting a process death seems reasonable.
+			case <-stopCh:
+				// If we receive a stop request we want to exit immediately.
+				return
+			}
+			continue
+		}

 		process = nil
-		p.Logger.Printf("[INFO] agent/proxy: daemon exited")
+		if err != nil {
+			p.Logger.Printf("[INFO] agent/proxy: daemon exited with error: %s", err)
+		} else if status, ok := exitStatus(ps); ok {
+			p.Logger.Printf("[INFO] agent/proxy: daemon exited with exit code: %d", status)
+		}
 	}
 }
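A note on the backoff hunk in keepAlive above: the restored code computes `waitTime := (1 << (attempts - DaemonRestartBackoffMin)) * time.Second` and caps only the result, while the removed variant also bounded the shift count itself. Go defines over-wide shifts to shift all the bits out, so after enough crash-loop attempts the unguarded expression evaluates to a zero wait. A standalone sketch of the guarded calculation (the constant values here are placeholders, not Consul's):

```go
package main

import (
	"fmt"
	"time"
)

// Placeholder values; Consul defines its own DaemonRestartBackoffMin and
// DaemonRestartMaxWait constants.
const (
	restartBackoffMin = 3
	restartMaxWait    = 1 * time.Minute
)

// backoff returns how long to wait before restart attempt n. Bounding the
// shift count matters: shifting past the operand's width yields 0 in Go,
// which would silently turn a long crash loop into a hot restart loop.
func backoff(attempts uint) time.Duration {
	if attempts <= restartBackoffMin {
		return 0
	}
	delayed := attempts - restartBackoffMin
	if delayed > 31 {
		return restartMaxWait
	}
	wait := (1 << delayed) * time.Second
	if wait > restartMaxWait {
		wait = restartMaxWait
	}
	return wait
}

func main() {
	for n := uint(0); n <= 40; n += 8 {
		fmt.Printf("attempt %2d -> wait %s\n", n, backoff(n))
	}
}
```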
@@ -259,78 +201,33 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) {
 // configured *exec.Command with the modifications documented on Daemon
 // such as setting the proxy token environmental variable.
 func (p *Daemon) start() (*os.Process, error) {
+	cmd := *p.Command
+
 	// Add the proxy token to the environment. We first copy the env because
 	// it is a slice and therefore the "copy" above will only copy the slice
 	// reference. We allocate an exactly sized slice.
-	baseEnv := os.Environ()
-	env := make([]string, len(baseEnv), len(baseEnv)+2)
-	copy(env, baseEnv)
-	env = append(env,
+	cmd.Env = make([]string, len(p.Command.Env), len(p.Command.Env)+1)
+	copy(cmd.Env, p.Command.Env)
+	cmd.Env = append(cmd.Env,
 		fmt.Sprintf("%s=%s", EnvProxyId, p.ProxyId),
 		fmt.Sprintf("%s=%s", EnvProxyToken, p.ProxyToken))

 	// Args must always contain a 0 entry which is usually the executed binary.
 	// To be safe and a bit more robust we default this, but only to prevent
 	// a panic below.
-	if len(p.Args) == 0 {
-		p.Args = []string{p.Path}
+	if len(cmd.Args) == 0 {
+		cmd.Args = []string{cmd.Path}
 	}

-	// If we are running in a test mode that disabled detaching daemon processes
-	// for simplicity, just exec the thing directly. This should never be the case
-	// in real life since this config is not publically exposed but makes testing
-	// way cleaner outside of this package.
-	if p.DisableDetach {
-		cmd := exec.Command(p.Path, p.Args[1:]...)
-		err := cmd.Start()
-		return cmd.Process, err
-	}
-
-	// Watch closely, we now swap out the exec.Cmd args for ones to run the same
-	// command via the connect daemonize command which takes care of correctly
-	// "double forking" to ensure the child is fully detached and adopted by the
-	// init process while the agent keeps running.
-	var daemonCmd exec.Cmd
-	dCmd, err := p.daemonizeCommand()
-	if err != nil {
+	// Start it
+	p.Logger.Printf("[DEBUG] agent/proxy: starting proxy: %q %#v", cmd.Path, cmd.Args[1:])
+	if err := cmd.Start(); err != nil {
 		return nil, err
 	}
-	daemonCmd.Path = dCmd[0]
-	// First arguments are for the stdout, stderr
-	daemonCmd.Args = append(dCmd, p.StdoutPath)
-	daemonCmd.Args = append(daemonCmd.Args, p.StderrPath)
-	daemonCmd.Args = append(daemonCmd.Args, p.Args...)
-	daemonCmd.Env = env
-
-	// setup stdout so we can read the PID
-	var out bytes.Buffer
-	daemonCmd.Stdout = &out
-	daemonCmd.Stderr = &out
-
-	// Run it to completion - it should exit immediately (this calls wait to
-	// ensure we don't leave the daemonize command as a zombie)
-	p.Logger.Printf("[DEBUG] agent/proxy: starting proxy: %q %#v", daemonCmd.Path,
-		daemonCmd.Args[1:])
-	if err := daemonCmd.Run(); err != nil {
-		p.Logger.Printf("[DEBUG] agent/proxy: daemonize output: %s", out.String())
-		return nil, err
-	}
-
-	// Read the PID from stdout
-	outStr, err := out.ReadString('\n')
-	if err != nil {
-		return nil, err
-	}
-	pid, err := strconv.Atoi(strings.TrimSpace(outStr))
-	if err != nil {
-		return nil, fmt.Errorf("failed to parse PID from output of daemonize: %s",
-			err)
-	}

 	// Write the pid file. This might error and that's okay.
 	if p.PidPath != "" {
-		pid := strconv.Itoa(pid)
+		pid := strconv.FormatInt(int64(cmd.Process.Pid), 10)
 		if err := file.WriteAtomic(p.PidPath, []byte(pid)); err != nil {
 			p.Logger.Printf(
 				"[DEBUG] agent/proxy: error writing pid file %q: %s",
@@ -338,38 +235,7 @@ func (p *Daemon) start() (*os.Process, error) {
 		}
 	}

-	// Finally, adopt the process so we can send signals
-	return findProcess(pid)
-}
-
-// daemonizeCommand returns the daemonize command.
-func (p *Daemon) daemonizeCommand() ([]string, error) {
-	// Test override
-	if p.daemonizeCmd != nil {
-		return p.daemonizeCmd, nil
-	}
-
-	// Get the path to the current executable. This is cached once by the library
-	// so this is effectively just a variable read.
-	execPath, err := os.Executable()
-	if err != nil {
-		return nil, err
-	}
-
-	// Sanity check to prevent runaway test invocations because test didn't setup
-	// daemonizeCmd correctly. This is kinda jank but go doesn't have a way to
-	// detect and alter behaviour in test binaries by design. In this case though
-	// we really need to never allow tests to self-execute which can cause
-	// recursive explosion of test runs. This check seems safe for current go
-	// tooling based on https://github.com/golang/go/issues/12120. If you hit
-	// this, you need to find a way to configure your test
-	// agent/proxyManager/Daemon to use agent/proxy/TestHelperProcess to run
-	// daemonize in a safe way. TestAgent should do this automatically by default.
-	if strings.HasSuffix(execPath, ".test") ||
-		strings.HasSuffix(execPath, ".test.exe") {
-		panic("test did not setup daemonizeCmd override and will dangerously" +
-			" self-execute the test binary.")
-	}
-
-	return []string{execPath, "connect", "daemonize"}, nil
+	return cmd.Process, nil
 }

 // Stop stops the daemon.
@@ -417,7 +283,7 @@
 		}()
 	}

-	// First, try a graceful stop.
+	// First, try a graceful stop
 	err := process.Signal(os.Interrupt)
 	if err == nil {
 		select {
@@ -427,28 +293,16 @@
 		case <-time.After(gracefulWait):
 			// Interrupt didn't work
-			p.Logger.Printf("[DEBUG] agent/proxy: graceful wait of %s passed, "+
-				"killing", gracefulWait)
 		}
-	} else if isProcessAlreadyFinishedErr(err) {
-		// This can happen due to races between signals and polling.
-		return nil
-	} else {
-		p.Logger.Printf("[DEBUG] agent/proxy: sigint failed, killing: %s", err)
 	}

-	// Graceful didn't work (e.g. on windows where SIGINT isn't implemented),
-	// forcibly kill
-	err = process.Kill()
-	if err != nil && isProcessAlreadyFinishedErr(err) {
-		return nil
-	}
-	return err
+	// Graceful didn't work, forcibly kill
+	return process.Kill()
 }

-// Close implements Proxy by stopping the run loop but not killing the process.
-// One Close is called, Stop has no effect.
-func (p *Daemon) Close() error {
+// stopKeepAlive is like Stop but keeps the process running. This is
+// used only for tests.
+func (p *Daemon) stopKeepAlive() error {
 	p.lock.Lock()
 	defer p.lock.Unlock()
@@ -475,8 +329,10 @@
 	// We compare equality on a subset of the command configuration
 	return p.ProxyToken == p2.ProxyToken &&
-		p.Path == p2.Path &&
-		reflect.DeepEqual(p.Args, p2.Args)
+		p.Command.Path == p2.Command.Path &&
+		p.Command.Dir == p2.Command.Dir &&
+		reflect.DeepEqual(p.Command.Args, p2.Command.Args) &&
+		reflect.DeepEqual(p.Command.Env, p2.Command.Env)
 }

 // MarshalSnapshot implements Proxy
@@ -490,10 +346,12 @@
 	}

 	return map[string]interface{}{
 		"Pid": p.process.Pid,
-		"Path": p.Path,
-		"Args": p.Args,
-		"ProxyToken": p.ProxyToken,
+		"CommandPath": p.Command.Path,
+		"CommandArgs": p.Command.Args,
+		"CommandDir": p.Command.Dir,
+		"CommandEnv": p.Command.Env,
+		"ProxyToken": p.ProxyToken,
 	}
 }
@@ -509,8 +367,12 @@
 	// Set the basic fields
 	p.ProxyToken = s.ProxyToken
-	p.Path = s.Path
-	p.Args = s.Args
+	p.Command = &exec.Cmd{
+		Path: s.CommandPath,
+		Args: s.CommandArgs,
+		Dir:  s.CommandDir,
+		Env:  s.CommandEnv,
+	}

 	// FindProcess on many systems returns no error even if the process
 	// is now dead. We perform an extra check that the process is alive.
@@ -536,12 +398,14 @@
 // within the manager snapshot and is restored automatically.
 type daemonSnapshot struct {
 	// Pid of the process. This is the only value actually required to
-	// regain management control. The remainder values are for Equal.
+	// regain mangement control. The remainder values are for Equal.
 	Pid int

 	// Command information
-	Path string
-	Args []string
+	CommandPath string
+	CommandArgs []string
+	CommandDir  string
+	CommandEnv  []string

 	// NOTE(mitchellh): longer term there are discussions/plans to only
 	// store the hash of the token but for now we need the full token in
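One pattern in this file worth pulling out: Stop() attempts a graceful `os.Interrupt` and escalates to `Kill()` after `gracefulWait`, and the removed branches show why "os: process already finished" must be treated as success, since the exit can race the signal. A condensed, self-contained sketch of graceful-then-forceful shutdown; note that modern Go exposes `os.ErrProcessDone` for the check the deleted `isProcessAlreadyFinishedErr` helper performed by string matching (the timeout and command here are illustrative):

```go
package main

import (
	"errors"
	"log"
	"os"
	"os/exec"
	"time"
)

// stopGracefully sends os.Interrupt and escalates to Kill if the process
// hasn't exited within wait, treating "already finished" as success.
func stopGracefully(cmd *exec.Cmd, wait time.Duration) error {
	done := make(chan error, 1)
	go func() { done <- cmd.Wait() }() // reap the child whenever it exits

	if err := cmd.Process.Signal(os.Interrupt); err == nil {
		select {
		case <-done:
			return nil // exited on its own after the interrupt
		case <-time.After(wait):
			// graceful wait elapsed; fall through to Kill
		}
	}
	if err := cmd.Process.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) {
		return err
	}
	<-done // make sure the child is reaped before returning
	return nil
}

func main() {
	cmd := exec.Command("sleep", "30")
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}
	if err := stopGracefully(cmd, 200*time.Millisecond); err != nil {
		log.Fatal(err)
	}
	log.Println("stopped")
}
```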

View File

@@ -3,8 +3,8 @@ package proxy
 import (
 	"io/ioutil"
 	"os"
+	"os/exec"
 	"path/filepath"
-	"strconv"
 	"testing"
 	"time"
@@ -28,10 +28,12 @@ func TestDaemonStartStop(t *testing.T) {
 	uuid, err := uuid.GenerateUUID()
 	require.NoError(err)

-	d := helperProcessDaemon("start-stop", path)
-	d.ProxyId = "tubes"
-	d.ProxyToken = uuid
-	d.Logger = testLogger
+	d := &Daemon{
+		Command:    helperProcess("start-stop", path),
+		ProxyId:    "tubes",
+		ProxyToken: uuid,
+		Logger:     testLogger,
+	}
 	require.NoError(d.Start())
 	defer d.Stop()
@@ -66,78 +68,6 @@ func TestDaemonStartStop(t *testing.T) {
 	})
 }

-func TestDaemonDetachesChild(t *testing.T) {
-	t.Parallel()
-
-	require := require.New(t)
-	td, closer := testTempDir(t)
-	defer closer()
-	path := filepath.Join(td, "file")
-	pidPath := filepath.Join(td, "child.pid")
-
-	// Start the parent process wrapping a start-stop test. The parent is acting
-	// as our "agent". We need an extra indirection to be able to kill the "agent"
-	// and still be running the test process.
-	parentCmd := helperProcess("parent", pidPath, "start-stop", path)
-	require.NoError(parentCmd.Start())
-
-	// Wait for the pid file to exist so we know parent is running
-	retry.Run(t, func(r *retry.R) {
-		_, err := os.Stat(pidPath)
-		if err == nil {
-			return
-		}
-		r.Fatalf("error: %s", err)
-	})
-
-	// And wait for the actual file to be sure the child is running (it should be
-	// since parent doesn't write PID until child starts but the child might not
-	// have completed the write to disk yet which causes flakiness below).
-	retry.Run(t, func(r *retry.R) {
-		_, err := os.Stat(path)
-		if err == nil {
-			return
-		}
-		r.Fatalf("error: %s", err)
-	})
-
-	// Always cleanup child process after
-	defer func() {
-		_, err := os.Stat(pidPath)
-		if err != nil {
-			return
-		}
-		bs, err := ioutil.ReadFile(pidPath)
-		require.NoError(err)
-		pid, err := strconv.Atoi(string(bs))
-		require.NoError(err)
-		proc, err := os.FindProcess(pid)
-		if err != nil {
-			return
-		}
-		proc.Kill()
-	}()
-
-	time.Sleep(20 * time.Second)
-
-	// Now kill the parent and wait for it
-	require.NoError(parentCmd.Process.Kill())
-	_, err := parentCmd.Process.Wait()
-	require.NoError(err)
-
-	time.Sleep(15 * time.Second)
-
-	// The child should still be running so file should still be there AND child processid should still be there
-	_, err = os.Stat(path)
-	require.NoError(err, "child should still be running")
-
-	// Let defer clean up the child process
-}
-
 func TestDaemonRestart(t *testing.T) {
 	t.Parallel()
@@ -146,8 +76,10 @@ func TestDaemonRestart(t *testing.T) {
 	defer closer()
 	path := filepath.Join(td, "file")

-	d := helperProcessDaemon("restart", path)
-	d.Logger = testLogger
+	d := &Daemon{
+		Command: helperProcess("restart", path),
+		Logger:  testLogger,
+	}
 	require.NoError(d.Start())
 	defer d.Stop()
@@ -179,11 +111,12 @@ func TestDaemonStop_kill(t *testing.T) {
 	path := filepath.Join(td, "file")

-	d := helperProcessDaemon("stop-kill", path)
-	d.ProxyToken = "hello"
-	d.Logger = testLogger
-	d.gracefulWait = 200 * time.Millisecond
-	d.pollInterval = 100 * time.Millisecond
+	d := &Daemon{
+		Command:      helperProcess("stop-kill", path),
+		ProxyToken:   "hello",
+		Logger:       testLogger,
+		gracefulWait: 200 * time.Millisecond,
+	}
 	require.NoError(d.Start())

 	// Wait for the file to exist
@@ -199,7 +132,7 @@ func TestDaemonStop_kill(t *testing.T) {
 	// Stop the process
 	require.NoError(d.Stop())

-	// Stat the file so that we can get the mtime
+	// State the file so that we can get the mtime
 	fi, err := os.Stat(path)
 	require.NoError(err)
 	mtime := fi.ModTime()
@@ -216,7 +149,6 @@ func TestDaemonStart_pidFile(t *testing.T) {
 	require := require.New(t)
 	td, closer := testTempDir(t)
 	defer closer()
-
 	path := filepath.Join(td, "file")
@@ -224,10 +156,12 @@ func TestDaemonStart_pidFile(t *testing.T) {
 	uuid, err := uuid.GenerateUUID()
 	require.NoError(err)

-	d := helperProcessDaemon("start-once", path)
-	d.ProxyToken = uuid
-	d.Logger = testLogger
-	d.PidPath = pidPath
+	d := &Daemon{
+		Command:    helperProcess("start-once", path),
+		ProxyToken: uuid,
+		Logger:     testLogger,
+		PidPath:    pidPath,
+	}
 	require.NoError(d.Start())
 	defer d.Stop()
@@ -264,9 +198,11 @@ func TestDaemonRestart_pidFile(t *testing.T) {
 	path := filepath.Join(td, "file")
 	pidPath := filepath.Join(td, "pid")

-	d := helperProcessDaemon("restart", path)
-	d.Logger = testLogger
-	d.PidPath = pidPath
+	d := &Daemon{
+		Command: helperProcess("restart", path),
+		Logger:  testLogger,
+		PidPath: pidPath,
+	}
 	require.NoError(d.Start())
 	defer d.Stop()
@@ -308,32 +244,51 @@ func TestDaemonEqual(t *testing.T) {
 	}{
 		{
 			"Different type",
-			&Daemon{},
+			&Daemon{
+				Command: &exec.Cmd{},
+			},
 			&Noop{},
 			false,
 		},

 		{
 			"Nil",
-			&Daemon{},
+			&Daemon{
+				Command: &exec.Cmd{},
+			},
 			nil,
 			false,
 		},

 		{
 			"Equal",
-			&Daemon{},
-			&Daemon{},
+			&Daemon{
+				Command: &exec.Cmd{},
+			},
+			&Daemon{
+				Command: &exec.Cmd{},
+			},
 			true,
 		},

 		{
 			"Different path",
 			&Daemon{
-				Path: "/foo",
+				Command: &exec.Cmd{Path: "/foo"},
 			},
 			&Daemon{
-				Path: "/bar",
+				Command: &exec.Cmd{Path: "/bar"},
+			},
+			false,
+		},
+
+		{
+			"Different dir",
+			&Daemon{
+				Command: &exec.Cmd{Dir: "/foo"},
+			},
+			&Daemon{
+				Command: &exec.Cmd{Dir: "/bar"},
 			},
 			false,
 		},
@@ -341,10 +296,10 @@ func TestDaemonEqual(t *testing.T) {
 		{
 			"Different args",
 			&Daemon{
-				Args: []string{"foo"},
+				Command: &exec.Cmd{Args: []string{"foo"}},
 			},
 			&Daemon{
-				Args: []string{"bar"},
+				Command: &exec.Cmd{Args: []string{"bar"}},
 			},
 			false,
 		},
@@ -352,9 +307,11 @@ func TestDaemonEqual(t *testing.T) {
 		{
 			"Different token",
 			&Daemon{
+				Command:    &exec.Cmd{},
 				ProxyToken: "one",
 			},
 			&Daemon{
+				Command:    &exec.Cmd{},
 				ProxyToken: "two",
 			},
 			false,
@@ -378,7 +335,7 @@ func TestDaemonMarshalSnapshot(t *testing.T) {
 		{
 			"stopped daemon",
 			&Daemon{
-				Path: "/foo",
+				Command: &exec.Cmd{Path: "/foo"},
 			},
 			nil,
 		},
@@ -386,14 +343,16 @@ func TestDaemonMarshalSnapshot(t *testing.T) {
 		{
 			"basic",
 			&Daemon{
-				Path:    "/foo",
+				Command: &exec.Cmd{Path: "/foo"},
 				process: &os.Process{Pid: 42},
 			},
 			map[string]interface{}{
 				"Pid": 42,
-				"Path": "/foo",
-				"Args": []string(nil),
-				"ProxyToken": "",
+				"CommandPath": "/foo",
+				"CommandArgs": []string(nil),
+				"CommandDir": "",
+				"CommandEnv": []string(nil),
+				"ProxyToken": "",
 			},
 		},
 	}
@@ -417,9 +376,11 @@ func TestDaemonUnmarshalSnapshot(t *testing.T) {
 	uuid, err := uuid.GenerateUUID()
 	require.NoError(err)

-	d := helperProcessDaemon("start-stop", path)
-	d.ProxyToken = uuid
-	d.Logger = testLogger
+	d := &Daemon{
+		Command:    helperProcess("start-stop", path),
+		ProxyToken: uuid,
+		Logger:     testLogger,
+	}
 	defer d.Stop()
 	require.NoError(d.Start())
@@ -437,7 +398,7 @@ func TestDaemonUnmarshalSnapshot(t *testing.T) {
 	snap := d.MarshalSnapshot()

 	// Stop the original daemon but keep it alive
-	require.NoError(d.Close())
+	require.NoError(d.stopKeepAlive())

 	// Restore the second daemon
 	d2 := &Daemon{Logger: testLogger}
@@ -469,9 +430,11 @@ func TestDaemonUnmarshalSnapshot_notRunning(t *testing.T) {
 	uuid, err := uuid.GenerateUUID()
 	require.NoError(err)

-	d := helperProcessDaemon("start-stop", path)
-	d.ProxyToken = uuid
-	d.Logger = testLogger
+	d := &Daemon{
+		Command:    helperProcess("start-stop", path),
+		ProxyToken: uuid,
+		Logger:     testLogger,
+	}
 	defer d.Stop()
 	require.NoError(d.Start())

View File

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"log"
 	"os"
+	"os/exec"
 	"path/filepath"
 	"sync"
 	"time"
@@ -84,16 +85,6 @@ type Manager struct {
 	CoalescePeriod  time.Duration
 	QuiescentPeriod time.Duration

-	// DisableDetach is used by tests that don't actually care about detached
-	// child behaviour (i.e. outside proxy package) to bypass detaching and
-	// daemonizing Daemons. This makes tests much simpler as they don't need to
-	// implement a test-binary mode to enable self-exec daemonizing etc. and there
-	// are fewer risks of detached processes being spawned and then not killed in
-	// face of missed teardown/panic/interrupt of test runs etc. It's public since
-	// it needs to be configurable from the agent package when setting up the
-	// proxyManager instance.
-	DisableDetach bool
-
 	// lock is held while reading/writing any internal state of the manager.
 	// cond is a condition variable on lock that is broadcasted for runState
 	// changes.
@@ -112,10 +103,6 @@ type Manager struct {
 	// proxies (unlikely scenario).
 	lastSnapshot *snapshot

-	// daemonizeCmd is set only in tests to control the path and args to the
-	// daemonize command.
-	daemonizeCmd []string
-
 	proxies map[string]Proxy
 }
@@ -161,35 +148,7 @@ func (m *Manager) Close() error {
 	m.lock.Lock()
 	defer m.lock.Unlock()

-	return m.stop(func(p Proxy) error {
-		return p.Close()
-	})
-}
-
-// Kill will Close the manager and Kill all proxies that were being managed.
-// Only ONE of Kill or Close must be called. If Close has been called already
-// then this will have no effect.
-func (m *Manager) Kill() error {
-	m.lock.Lock()
-	defer m.lock.Unlock()
-
-	return m.stop(func(p Proxy) error {
-		return p.Stop()
-	})
-}
-
-// stop stops the run loop and cleans up all the proxies by calling
-// the given cleaner. If the cleaner returns an error the proxy won't be
-// removed from the map.
-//
-// The lock must be held while this is called.
-func (m *Manager) stop(cleaner func(Proxy) error) error {
 	for {
-		// Special case state that exits the for loop
-		if m.runState == managerStateStopped {
-			break
-		}
-
 		switch m.runState {
 		case managerStateIdle:
 			// Idle so just set it to stopped and return. We notify
@@ -208,13 +167,29 @@
 		case managerStateStopping:
 			// Still stopping, wait...
 			m.cond.Wait()
+
+		case managerStateStopped:
+			// Stopped, target state reached
+			return nil
 		}
 	}
+}
+
+// Kill will Close the manager and Kill all proxies that were being managed.
+//
+// This is safe to call with Close already called since Close is idempotent.
+func (m *Manager) Kill() error {
+	// Close first so that we aren't getting changes in proxies
+	if err := m.Close(); err != nil {
+		return err
+	}
+
+	m.lock.Lock()
+	defer m.lock.Unlock()

 	var err error
 	for id, proxy := range m.proxies {
-		if err := cleaner(proxy); err != nil {
+		if err := proxy.Stop(); err != nil {
 			err = multierror.Append(
 				err, fmt.Errorf("failed to stop proxy %q: %s", id, err))
 			continue
@@ -426,13 +401,18 @@ func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
 		return nil, fmt.Errorf("daemon mode managed proxy requires command")
 	}

+	// Build the command to execute.
+	var cmd exec.Cmd
+	cmd.Path = command[0]
+	cmd.Args = command // idx 0 is path but preserved since it should be
+	if err := m.configureLogDir(id, &cmd); err != nil {
+		return nil, fmt.Errorf("error configuring proxy logs: %s", err)
+	}
+
 	// Build the daemon structure
-	proxy.Path = command[0]
-	proxy.Args = command // idx 0 is path but preserved since it should be
+	proxy.Command = &cmd
 	proxy.ProxyId = id
 	proxy.ProxyToken = mp.ProxyToken
-	proxy.daemonizeCmd = m.daemonizeCmd
-	proxy.DisableDetach = m.DisableDetach
 	return proxy, nil

 	default:
@@ -447,10 +427,8 @@ func (m *Manager) newProxyFromMode(mode structs.ProxyExecMode, id string) (Proxy
 	switch mode {
 	case structs.ProxyExecModeDaemon:
 		return &Daemon{
-			Logger:     m.Logger,
-			StdoutPath: logPath(filepath.Join(m.DataDir, "logs"), id, "stdout"),
-			StderrPath: logPath(filepath.Join(m.DataDir, "logs"), id, "stderr"),
-			PidPath:    pidPath(filepath.Join(m.DataDir, "pids"), id),
+			Logger:  m.Logger,
+			PidPath: pidPath(filepath.Join(m.DataDir, "pids"), id),
 		}, nil

 	default:
@@ -458,6 +436,41 @@
 	}
 }

+// configureLogDir sets up the file descriptors to stdout/stderr so that
+// they log to the proper file path for the given service ID.
+func (m *Manager) configureLogDir(id string, cmd *exec.Cmd) error {
+	// Create the log directory
+	logDir := ""
+	if m.DataDir != "" {
+		logDir = filepath.Join(m.DataDir, "logs")
+		if err := os.MkdirAll(logDir, 0700); err != nil {
+			return err
+		}
+	}
+
+	// Configure the stdout, stderr paths
+	stdoutPath := logPath(logDir, id, "stdout")
+	stderrPath := logPath(logDir, id, "stderr")
+
+	// Open the files. We want to append to each. We expect these files
+	// to be rotated by some external process.
+	stdoutF, err := os.OpenFile(stdoutPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
+	if err != nil {
+		return fmt.Errorf("error creating stdout file: %s", err)
+	}
+	stderrF, err := os.OpenFile(stderrPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
+	if err != nil {
+		// Don't forget to close stdoutF which successfully opened
+		stdoutF.Close()
+		return fmt.Errorf("error creating stderr file: %s", err)
+	}
+
+	cmd.Stdout = stdoutF
+	cmd.Stderr = stderrF
+	return nil
+}
+
 // logPath is a helper to return the path to the log file for the given
 // directory, service ID, and stream type (stdout or stderr).
 func logPath(dir, id, stream string) string {

View File

@@ -324,7 +324,7 @@ func TestManagerRun_snapshotRestore(t *testing.T) {
 	// Add a second proxy so that we can determine when we're up
 	// and running.
-	path2 := filepath.Join(td, "file2")
+	path2 := filepath.Join(td, "file")
 	testStateProxy(t, state, "db", helperProcess("start-stop", path2))
 	retry.Run(t, func(r *retry.R) {
 		_, err := os.Stat(path2)
@@ -343,7 +343,7 @@
 		if err != nil {
 			return
 		}
-		r.Fatalf("file still exists: %s", path)
+		r.Fatalf("file still exists")
 	})
 }
@@ -361,10 +361,6 @@ func testManager(t *testing.T) (*Manager, func()) {
 	td, closer := testTempDir(t)
 	m.DataDir = td

-	// Override daemonize command to use the built-in test binary. Note that Args
-	// includes the binary path as first arg.
-	m.daemonizeCmd = helperProcess("daemonize").Args
-
 	return m, func() { closer() }
 }
@@ -372,9 +368,8 @@
 // (expected to be from the helperProcess function call). It returns the
 // ID for deregistration.
 func testStateProxy(t *testing.T, state *local.State, service string, cmd *exec.Cmd) string {
-	// Note that exec.Command already ensures the command name is the first
-	// argument in the list so no need to append again
-	command := cmd.Args
+	command := []string{cmd.Path}
+	command = append(command, cmd.Args...)

 	require.NoError(t, state.AddService(&structs.NodeService{
 		Service: service,
@@ -384,7 +379,7 @@ func testStateProxy(t *testing.T, state *local.State, service string, cmd *exec.
 		ExecMode:        structs.ProxyExecModeDaemon,
 		Command:         command,
 		TargetServiceID: service,
-	}, "token", "")
+	}, "token")
 	require.NoError(t, err)

 	return p.Proxy.ProxyService.ID

View File

@@ -5,7 +5,6 @@ type Noop struct{}

 func (p *Noop) Start() error { return nil }
 func (p *Noop) Stop() error { return nil }
-func (p *Noop) Close() error { return nil }
 func (p *Noop) Equal(Proxy) bool { return true }
 func (p *Noop) MarshalSnapshot() map[string]interface{} { return nil }
 func (p *Noop) UnmarshalSnapshot(map[string]interface{}) error { return nil }

View File

@@ -1,42 +0,0 @@
-package proxy
-
-import (
-	"strings"
-	"time"
-)
-
-// isProcessAlreadyFinishedErr does a janky comparison with an error string
-// defined in os/exec_unix.go and os/exec_windows.go which we encounter due to
-// races with polling the external process. These case tests to fail since Stop
-// returns an error sometimes so we should notice if this string stops matching
-// the error in a future go version.
-func isProcessAlreadyFinishedErr(err error) bool {
-	return strings.Contains(err.Error(), "os: process already finished")
-}
-
-// externalWait mimics process.Wait for an external process. The returned
-// channel is closed when the process exits. It works by polling the process
-// with signal 0 and verifying no error is returned. A closer func is also
-// returned that can be invoked to terminate the waiter early.
-func externalWait(pid int, pollInterval time.Duration) (<-chan struct{}, func()) {
-	ch := make(chan struct{})
-	stopCh := make(chan struct{})
-	closer := func() {
-		close(stopCh)
-	}
-	go func() {
-		for {
-			select {
-			case <-stopCh:
-				return
-			default:
-			}
-			if _, err := findProcess(pid); err != nil {
-				close(ch)
-				return
-			}
-			time.Sleep(pollInterval)
-		}
-	}()
-	return ch, closer
-}

View File

@@ -1,70 +0,0 @@
-package proxy
-
-import (
-	"os"
-	"path/filepath"
-	"testing"
-	"time"
-
-	"github.com/hashicorp/consul/testutil/retry"
-	"github.com/stretchr/testify/require"
-)
-
-func TestExternalWait(t *testing.T) {
-	t.Parallel()
-
-	require := require.New(t)
-	td, closer := testTempDir(t)
-	defer closer()
-	path := filepath.Join(td, "file")
-
-	cmd := helperProcess("restart", path)
-	require.NoError(cmd.Start())
-	exitCh := make(chan struct{})
-
-	// Launch waiter to make sure this process isn't zombified when it exits part
-	// way through the test.
-	go func() {
-		cmd.Process.Wait()
-		close(exitCh)
-	}()
-	defer cmd.Process.Kill()
-
-	// Create waiter
-	pollInterval := 1 * time.Millisecond
-	waitCh, closer := externalWait(cmd.Process.Pid, pollInterval)
-	defer closer()
-
-	// Wait for the file to exist so we don't rely on timing to not race with
-	// process startup.
-	retry.Run(t, func(r *retry.R) {
-		_, err := os.Stat(path)
-		if err == nil {
-			return
-		}
-		r.Fatalf("error: %s", err)
-	})
-
-	// waitCh should not be closed until process quits. We'll wait a bit to verify
-	// we weren't just too quick to see a process exit
-	select {
-	case <-waitCh:
-		t.Fatal("waitCh should not be closed yet")
-	default:
-	}
-
-	// Delete the file
-	require.NoError(os.Remove(path))
-
-	// Wait for the child to actually exit cleanly
-	<-exitCh
-
-	// Now we _should_ see waitCh close (need to wait at least a whole poll
-	// interval)
-	select {
-	case <-waitCh:
-		// OK
-	case <-time.After(10 * pollInterval):
-		t.Fatal("waitCh should be closed")
-	}
-}

View File

@@ -7,8 +7,8 @@
 )

 func findProcess(pid int) (*os.Process, error) {
-	// On Windows, os.FindProcess will error if the process is not alive, so we
-	// don't have to do any further checking. The nature of it being non-nil means
-	// it seems to be healthy.
+	// On Windows, os.FindProcess will error if the process is not alive,
+	// so we don't have to do any further checking. The nature of it being
+	// non-nil means it seems to be healthy.
 	return os.FindProcess(pid)
 }
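Only the Windows variant of `findProcess` appears in this diff. On Unix, `os.FindProcess` always succeeds regardless of whether the pid is alive, so liveness has to be probed explicitly; the conventional probe is signal 0, which performs the existence and permission checks without delivering anything. A sketch of what the Unix counterpart plausibly looks like (an assumption for illustration, not a file shown in this commit):

```go
//go:build !windows

package proxy

import (
	"fmt"
	"os"
	"syscall"
)

func findProcess(pid int) (*os.Process, error) {
	// On Unix, FindProcess never fails, so send signal 0: it delivers no
	// signal but reports whether the process exists and can be signaled.
	p, err := os.FindProcess(pid)
	if err != nil {
		return nil, err
	}
	if err := p.Signal(syscall.Signal(0)); err != nil {
		return nil, fmt.Errorf("process %d is dead or unreachable: %s", pid, err)
	}
	return p, nil
}
```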

View File

@@ -40,18 +40,12 @@
 	Start() error

 	// Stop stops the proxy and disallows it from ever being started again.
+	// This should also clean up any resources used by this Proxy.
 	//
 	// If the proxy is not started yet, this should not return an error, but
 	// it should disallow Start from working again. If the proxy is already
 	// stopped, this should not return an error.
 	Stop() error

-	// Close should clean up any resources associated with this proxy but
-	// keep it running in the background. Only one of Close or Stop can be
-	// called.
-	Close() error
-
 	// Equal returns true if the argument is equal to the proxy being called.
 	// This is called by the manager to determine if a change in configuration
 	// results in a proxy that needs to be restarted or not. If Equal returns

View File

@@ -7,12 +7,8 @@
 	"os"
 	"os/exec"
 	"os/signal"
-	"strconv"
 	"testing"
 	"time"
-
-	"github.com/hashicorp/consul/command/connect/daemonize"
-	"github.com/mitchellh/cli"
 )

 // testLogger is a logger that can be used by tests that require a
@@ -53,25 +49,6 @@ func helperProcess(s ...string) *exec.Cmd {
 	return cmd
 }

-// helperProcessDaemon returns a *Daemon that can be used to execute the
-// TestHelperProcess function below. This can be used to test multi-process
-// interactions. The Daemon has it's Path, Args and stdio paths populated but
-// other fields might need to be set depending on test requirements.
-//
-// This relies on the TestMainInterceptDaemonize hack being active.
-func helperProcessDaemon(s ...string) *Daemon {
-	cs := []string{os.Args[0], "-test.run=TestHelperProcess", "--", helperProcessSentinel}
-	cs = append(cs, s...)
-
-	return &Daemon{
-		Path:         os.Args[0],
-		Args:         cs,
-		StdoutPath:   "_", // dev null them for now
-		StderrPath:   "_",
-		daemonizeCmd: helperProcess("daemonize").Args,
-	}
-}
-
 // This is not a real test. This is just a helper process kicked off by tests
 // using the helperProcess helper function.
 func TestHelperProcess(t *testing.T) {
@@ -178,53 +155,6 @@ func TestHelperProcess(t *testing.T) {
 		<-make(chan struct{})

-	// Parent runs the given process in a Daemon and then sleeps until the test
-	// code kills it. It exists to test that the Daemon-managed child process
-	// survives it's parent exiting which we can't test directly without exiting
-	// the test process so we need an extra level of indirection. The test code
-	// using this must pass a file path as the first argument for the child
-	// processes PID to be written and then must take care to clean up that PID
-	// later or the child will be left running forever.
-	case "parent":
-		// We will write the PID for the child to the file in the first argument
-		// then pass rest of args through to command.
-		pidFile := args[0]
-		d := helperProcess(args[1:]...)
-		if err := d.Start(); err != nil {
-			fmt.Fprintf(os.Stderr, "Error: %s\n", err)
-			os.Exit(1)
-		}
-		// Write PID
-		pidBs := []byte(strconv.Itoa(d.Process.Pid))
-		if err := ioutil.WriteFile(pidFile, pidBs, 0644); err != nil {
-			fmt.Fprintf(os.Stderr, "Error: %s\n", err)
-			// TODO: Also kill the detached process (once it is detached)
-			os.Exit(1)
-		}
-		// Wait "forever" (calling test chooses when we exit with signal/Wait to
-		// minimise coordination).
-		for {
-			time.Sleep(time.Hour)
-		}
-
-	case "daemonize":
-		// Run daemonize!
-		ui := &cli.BasicUi{Writer: os.Stdout, ErrorWriter: os.Stderr}
-		cli := &cli.CLI{
-			Args: append([]string{"daemonize"}, args...),
-			Commands: map[string]cli.CommandFactory{
-				"daemonize": func() (cli.Command, error) {
-					return daemonize.New(ui), nil
-				},
-			},
-		}
-		exitCode, err := cli.Run()
-		if err != nil {
-			log.Printf("[ERR] running hijacked daemonize command: %s", err)
-		}
-		os.Exit(exitCode)
-
 	default:
 		fmt.Fprintf(os.Stderr, "Unknown command: %q\n", cmd)
 		os.Exit(2)

View File

@@ -376,14 +376,6 @@ func TestConfig(sources ...config.Source) *config.RuntimeConfig {
 		fmt.Println("WARNING:", w)
 	}

-	// Set internal flag to simplify connect daemon execution. We test the full
-	// daemonization behaviour explicitly in `proxy` package, everywhere else it's
-	// just super painful to setup self-executable daemonize commands etc. For now
-	// this is not overridable because it's simpler not to expose this config
-	// publically at all but we could revisit that later if there is a legitimate
-	// reason to want to test full detached daemon behaviour with a TestAgent.
-	cfg.ConnectDisableDetachedDaemons = true
-
 	return &cfg
 }

View File

@@ -1140,7 +1140,7 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) {
 		Port: 8000,
 		Connect: &AgentServiceConnect{
 			Proxy: &AgentServiceConnectProxy{
-				Command: []string{"consul", "connect", "proxy"},
+				Command: []string{"consul connect proxy"},
 				Config: map[string]interface{}{
 					"foo": "bar",
 				},
@@ -1157,7 +1157,7 @@
 		ProxyServiceID:    "foo-proxy",
 		TargetServiceID:   "foo",
 		TargetServiceName: "foo",
-		ContentHash:       "2a29f8237db69d0e",
+		ContentHash:       "93baee1d838888ae",
 		ExecMode:          "daemon",
 		Command:           []string{"consul", "connect", "proxy"},
 		Config: map[string]interface{}{

View File

@@ -10,7 +10,6 @@ import (
 	"github.com/hashicorp/consul/command/connect/ca"
 	caget "github.com/hashicorp/consul/command/connect/ca/get"
 	caset "github.com/hashicorp/consul/command/connect/ca/set"
-	"github.com/hashicorp/consul/command/connect/daemonize"
 	"github.com/hashicorp/consul/command/connect/proxy"
 	"github.com/hashicorp/consul/command/event"
 	"github.com/hashicorp/consul/command/exec"
@@ -75,7 +74,6 @@ func init() {
 	Register("connect ca get-config", func(ui cli.Ui) (cli.Command, error) { return caget.New(ui), nil })
 	Register("connect ca set-config", func(ui cli.Ui) (cli.Command, error) { return caset.New(ui), nil })
 	Register("connect proxy", func(ui cli.Ui) (cli.Command, error) { return proxy.New(ui, MakeShutdownCh()), nil })
-	RegisterHidden("connect daemonize", func(ui cli.Ui) (cli.Command, error) { return daemonize.New(ui), nil })
 	Register("event", func(ui cli.Ui) (cli.Command, error) { return event.New(ui), nil })
 	Register("exec", func(ui cli.Ui) (cli.Command, error) { return exec.New(ui, MakeShutdownCh()), nil })
 	Register("force-leave", func(ui cli.Ui) (cli.Command, error) { return forceleave.New(ui), nil })

View File

@@ -1,88 +0,0 @@
-package daemonize
-
-import (
-	"fmt"
-	_ "net/http/pprof" // Expose pprof if configured
-	"os"
-	"os/exec"
-	"syscall"
-
-	"github.com/mitchellh/cli"
-)
-
-func New(ui cli.Ui) *cmd {
-	return &cmd{UI: ui}
-}
-
-type cmd struct {
-	UI cli.Ui
-
-	stdoutPath string
-	stderrPath string
-	cmdArgs    []string
-}
-
-func (c *cmd) Run(args []string) int {
-	numArgs := len(args)
-	if numArgs < 3 {
-		c.UI.Error("Need at least 3 arguments; stdoutPath, stdinPath, " +
-			"executablePath [arguments...]")
-		os.Exit(1)
-	}
-
-	c.stdoutPath, c.stderrPath = args[0], args[1]
-	c.cmdArgs = args[2:] // includes the executable as arg 0 as expected
-
-	// Open log files if specified
-	var stdoutF, stderrF *os.File
-	var err error
-	if c.stdoutPath != "_" {
-		stdoutF, err = os.OpenFile(c.stdoutPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
-		if err != nil {
-			c.UI.Error(fmt.Sprintf("error creating stdout file: %s", err))
-			return 1
-		}
-	}
-	if c.stderrPath != "_" {
-		stderrF, err = os.OpenFile(c.stderrPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
-		if err != nil {
-			c.UI.Error(fmt.Sprintf("error creating stderr file: %s", err))
-			return 1
-		}
-	}
-
-	// Exec the command passed in a new session then exit to ensure it's adopted
-	// by the init process. Use the passed file paths for std out/err.
-	cmd := &exec.Cmd{
-		Path: c.cmdArgs[0],
-		Args: c.cmdArgs,
-		// Inherit Dir and Env by default.
-		SysProcAttr: &syscall.SysProcAttr{Setsid: true},
-	}
-	cmd.Stdin = nil
-	cmd.Stdout = stdoutF
-	cmd.Stderr = stderrF
-
-	// Exec the child
-	err = cmd.Start()
-	if err != nil {
-		c.UI.Error("command failed with error: " + err.Error())
-		os.Exit(1)
-	}
-
-	// Print it's PID to stdout
-	fmt.Fprintf(os.Stdout, "%d\n", cmd.Process.Pid)
-
-	// Release (no-op on unix) and exit to orphan the child and get it adopted by
-	// init.
-	cmd.Process.Release()
-
-	return 0
-}
-
-func (c *cmd) Synopsis() string {
-	return ""
-}
-
-func (c *cmd) Help() string {
-	return ""
-}

View File

@@ -24,36 +24,12 @@
 	registry[name] = fn
 }

-// RegisterHidden adds a new CLI sub-command to the registry that won't show up
-// in help or autocomplete.
-func RegisterHidden(name string, fn Factory) {
-	if hiddenRegistry == nil {
-		hiddenRegistry = make(map[string]Factory)
-	}
-	if hiddenRegistry[name] != nil {
-		panic(fmt.Errorf("Command %q is already registered", name))
-	}
-	hiddenRegistry[name] = fn
-}
-
 // Map returns a realized mapping of available CLI commands in a format that
 // the CLI class can consume. This should be called after all registration is
 // complete.
 func Map(ui cli.Ui) map[string]cli.CommandFactory {
-	return makeCommands(ui, registry)
-}
-
-// Map returns a realized mapping of available but hidden CLI commands in a
-// format that the CLI class can consume. This should be called after all
-// registration is complete.
-func MapHidden(ui cli.Ui) map[string]cli.CommandFactory {
-	return makeCommands(ui, hiddenRegistry)
-}
-
-func makeCommands(ui cli.Ui, reg map[string]Factory) map[string]cli.CommandFactory {
 	m := make(map[string]cli.CommandFactory)
-	for name, fn := range reg {
+	for name, fn := range registry {
 		thisFn := fn
 		m[name] = func() (cli.Command, error) {
 			return thisFn(ui)
@@ -66,10 +42,6 @@ func makeCommands(ui cli.Ui, reg map[string]Factory) map[string]cli.CommandFacto
 // command name. This should be populated at package init() time via Register().
 var registry map[string]Factory

-// hiddenRegistry behaves identically to registry but is for commands that are
-// hidden - i.e. not publically documented in the help or autocomplete.
-var hiddenRegistry map[string]Factory
-
 // MakeShutdownCh returns a channel that can be used for shutdown notifications
 // for commands. This channel will send a message for every interrupt or SIGTERM
 // received.

View File

@@ -41,13 +41,6 @@ func realMain() int {
 		names = append(names, c)
 	}

-	// Add hidden command
-	hidden := command.MapHidden(ui)
-	for name, cmd := range hidden {
-		// Don't add names to help since they are hidden!
-		cmds[name] = cmd
-	}
-
 	cli := &cli.CLI{
 		Args:     args,
 		Commands: cmds,