diff --git a/agent/agent_test.go b/agent/agent_test.go index fac16fba5..afc4daa2d 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -15,8 +15,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" - "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" diff --git a/agent/local/state_test.go b/agent/local/state_test.go index 3f416fbb3..5c66a77d0 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -1691,7 +1691,7 @@ func TestStateProxyManagement(t *testing.T) { require := require.New(t) assert := assert.New(t) - _, err := state.AddProxy(&p1, "fake-token") + _, err := state.AddProxy(&p1, "fake-token", "") require.Error(err, "should fail as the target service isn't registered") // Sanity check done, lets add a couple of target services to the state @@ -1710,7 +1710,7 @@ func TestStateProxyManagement(t *testing.T) { require.NoError(err) // Should work now - pstate, err := state.AddProxy(&p1, "fake-token") + pstate, err := state.AddProxy(&p1, "fake-token", "") require.NoError(err) svc := pstate.Proxy.ProxyService @@ -1724,8 +1724,9 @@ func TestStateProxyManagement(t *testing.T) { { // Re-registering same proxy again should not pick a random port but re-use - // the assigned one. - pstateDup, err := state.AddProxy(&p1, "fake-token") + // the assigned one. It should also keep the same proxy token since we don't + // want to force restart for config change. + pstateDup, err := state.AddProxy(&p1, "fake-token", "") require.NoError(err) svcDup := pstateDup.Proxy.ProxyService @@ -1736,6 +1737,8 @@ func TestStateProxyManagement(t *testing.T) { assert.Equal("", svcDup.Address, "should have empty address by default") // Port must be same as before assert.Equal(svc.Port, svcDup.Port) + // Same ProxyToken + assert.Equal(pstate.ProxyToken, pstateDup.ProxyToken) } // Let's register a notifier now @@ -1748,7 +1751,7 @@ func TestStateProxyManagement(t *testing.T) { // Second proxy should claim other port p2 := p1 p2.TargetServiceID = "cache" - pstate2, err := state.AddProxy(&p2, "fake-token") + pstate2, err := state.AddProxy(&p2, "fake-token", "") require.NoError(err) svc2 := pstate2.Proxy.ProxyService assert.Contains([]int{20000, 20001}, svc2.Port) @@ -1764,7 +1767,7 @@ func TestStateProxyManagement(t *testing.T) { // Third proxy should fail as all ports are used p3 := p1 p3.TargetServiceID = "db" - _, err = state.AddProxy(&p3, "fake-token") + _, err = state.AddProxy(&p3, "fake-token", "") require.Error(err) // Should have a notification but we'll do nothing so that the next @@ -1775,7 +1778,7 @@ func TestStateProxyManagement(t *testing.T) { "bind_port": 1234, "bind_address": "0.0.0.0", } - pstate3, err := state.AddProxy(&p3, "fake-token") + pstate3, err := state.AddProxy(&p3, "fake-token", "") require.NoError(err) svc3 := pstate3.Proxy.ProxyService require.Equal("0.0.0.0", svc3.Address) @@ -1793,7 +1796,7 @@ func TestStateProxyManagement(t *testing.T) { require.NotNil(gotP3) var ws memdb.WatchSet ws.Add(gotP3.WatchCh) - pstate3, err = state.AddProxy(&p3updated, "fake-token") + pstate3, err = state.AddProxy(&p3updated, "fake-token", "") require.NoError(err) svc3 = pstate3.Proxy.ProxyService require.Equal("0.0.0.0", svc3.Address) @@ -1817,7 +1820,7 @@ func TestStateProxyManagement(t *testing.T) { // Should be able to create a new proxy for that service with the port (it // should have been "freed"). 
 	p4 := p2
-	pstate4, err := state.AddProxy(&p4, "fake-token")
+	pstate4, err := state.AddProxy(&p4, "fake-token", "")
 	require.NoError(err)
 	svc4 := pstate4.Proxy.ProxyService
 	assert.Contains([]int{20000, 20001}, svc2.Port)
@@ -1855,6 +1858,68 @@ func TestStateProxyManagement(t *testing.T) {
 	}
 }
 
+// Tests the logic for retaining tokens and ports through restore (i.e.
+// proxy-service already restored and token passed in externally)
+func TestStateProxyRestore(t *testing.T) {
+	t.Parallel()
+
+	state := local.NewState(local.Config{
+		// Wide random range to make it very unlikely to pass by chance
+		ProxyBindMinPort: 10000,
+		ProxyBindMaxPort: 20000,
+	}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
+
+	// Stub state syncing
+	state.TriggerSyncChanges = func() {}
+
+	webSvc := structs.NodeService{
+		Service: "web",
+	}
+
+	p1 := structs.ConnectManagedProxy{
+		ExecMode:        structs.ProxyExecModeDaemon,
+		Command:         []string{"consul", "connect", "proxy"},
+		TargetServiceID: "web",
+	}
+
+	p2 := p1
+
+	require := require.New(t)
+	assert := assert.New(t)
+
+	// Add a target service
+	require.NoError(state.AddService(&webSvc, "fake-token-web"))
+
+	// Add the proxy for the first time to get the proper service definition to
+	// register
+	pstate, err := state.AddProxy(&p1, "fake-token", "")
+	require.NoError(err)
+
+	// Now start again with a brand new state
+	state2 := local.NewState(local.Config{
+		// Wide random range to make it very unlikely to pass by chance
+		ProxyBindMinPort: 10000,
+		ProxyBindMaxPort: 20000,
+	}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
+
+	// Stub state syncing
+	state2.TriggerSyncChanges = func() {}
+
+	// Register the target service
+	require.NoError(state2.AddService(&webSvc, "fake-token-web"))
+
+	// "Restore" the proxy service
+	require.NoError(state2.AddService(p1.ProxyService, "fake-token-web"))
+
+	// Now we can AddProxy with the "restored" token
+	pstate2, err := state2.AddProxy(&p2, "fake-token", pstate.ProxyToken)
+	require.NoError(err)
+
+	// Check it still has the same port and token as before
+	assert.Equal(pstate.ProxyToken, pstate2.ProxyToken)
+	assert.Equal(p1.ProxyService.Port, p2.ProxyService.Port)
+}
+
 // drainCh drains a channel by reading messages until it would block.
 func drainCh(ch chan struct{}) {
 	for {
diff --git a/agent/proxy/daemon.go b/agent/proxy/daemon.go
index 013fbdc28..f18cc750e 100644
--- a/agent/proxy/daemon.go
+++ b/agent/proxy/daemon.go
@@ -293,16 +293,28 @@ func (p *Daemon) Stop() error {
 
 		case <-time.After(gracefulWait):
 			// Interrupt didn't work
+			p.Logger.Printf("[DEBUG] agent/proxy: graceful wait of %s passed, "+
+				"killing", gracefulWait)
 		}
+	} else if isProcessAlreadyFinishedErr(err) {
+		// This can happen due to races between signals and polling.
+		return nil
+	} else {
+		p.Logger.Printf("[DEBUG] agent/proxy: sigint failed, killing: %s", err)
 	}
 
-	// Graceful didn't work, forcibly kill
-	return process.Kill()
+	// Graceful didn't work (e.g. on Windows where SIGINT isn't implemented),
+	// forcibly kill
+	err = process.Kill()
+	if err != nil && isProcessAlreadyFinishedErr(err) {
+		return nil
+	}
+	return err
 }
 
-// stopKeepAlive is like Stop but keeps the process running. This is
-// used only for tests.
-func (p *Daemon) stopKeepAlive() error {
+// Close implements Proxy by stopping the run loop but not killing the process.
+// Once Close is called, Stop has no effect.
+func (p *Daemon) Close() error {
 	p.lock.Lock()
 	defer p.lock.Unlock()
 
diff --git a/agent/proxy/daemon_test.go b/agent/proxy/daemon_test.go
index f08716276..7202ddd65 100644
--- a/agent/proxy/daemon_test.go
+++ b/agent/proxy/daemon_test.go
@@ -398,7 +398,7 @@ func TestDaemonUnmarshalSnapshot(t *testing.T) {
 	snap := d.MarshalSnapshot()
 
 	// Stop the original daemon but keep it alive
-	require.NoError(d.stopKeepAlive())
+	require.NoError(d.Close())
 
 	// Restore the second daemon
 	d2 := &Daemon{Logger: testLogger}
diff --git a/agent/proxy/manager_test.go b/agent/proxy/manager_test.go
index 28922cbfa..19ba42139 100644
--- a/agent/proxy/manager_test.go
+++ b/agent/proxy/manager_test.go
@@ -324,7 +324,7 @@ func TestManagerRun_snapshotRestore(t *testing.T) {
 
 	// Add a second proxy so that we can determine when we're up
 	// and running.
-	path2 := filepath.Join(td, "file")
+	path2 := filepath.Join(td, "file2")
 	testStateProxy(t, state, "db", helperProcess("start-stop", path2))
 	retry.Run(t, func(r *retry.R) {
 		_, err := os.Stat(path2)
@@ -343,7 +343,7 @@ func TestManagerRun_snapshotRestore(t *testing.T) {
 		if err != nil {
 			return
 		}
-		r.Fatalf("file still exists")
+		r.Fatalf("file still exists: %s", path)
 	})
 }
 
@@ -379,7 +379,7 @@ func testStateProxy(t *testing.T, state *local.State, service string, cmd *exec.
 		ExecMode:        structs.ProxyExecModeDaemon,
 		Command:         command,
 		TargetServiceID: service,
-	}, "token")
+	}, "token", "")
 	require.NoError(t, err)
 
 	return p.Proxy.ProxyService.ID
diff --git a/agent/proxy/noop.go b/agent/proxy/noop.go
index a96425d84..62599f895 100644
--- a/agent/proxy/noop.go
+++ b/agent/proxy/noop.go
@@ -5,6 +5,7 @@ type Noop struct{}
 
 func (p *Noop) Start() error { return nil }
 func (p *Noop) Stop() error { return nil }
+func (p *Noop) Close() error { return nil }
 func (p *Noop) Equal(Proxy) bool { return true }
 func (p *Noop) MarshalSnapshot() map[string]interface{} { return nil }
 func (p *Noop) UnmarshalSnapshot(map[string]interface{}) error { return nil }
diff --git a/agent/proxy/process.go b/agent/proxy/process.go
new file mode 100644
index 000000000..d299941fd
--- /dev/null
+++ b/agent/proxy/process.go
@@ -0,0 +1,14 @@
+package proxy
+
+import (
+	"strings"
+)
+
+// isProcessAlreadyFinishedErr does a janky comparison with an error string
+// defined in os/exec_unix.go and os/exec_windows.go which we encounter due to
+// races with polling the external process. These cause tests to fail since
+// Stop sometimes returns an error, so we should notice if this string stops
+// matching the error in a future Go version.
+func isProcessAlreadyFinishedErr(err error) bool {
+	return strings.Contains(err.Error(), "os: process already finished")
+}
diff --git a/agent/proxy/proxy.go b/agent/proxy/proxy.go
index 90ae158f4..c8a1dd18a 100644
--- a/agent/proxy/proxy.go
+++ b/agent/proxy/proxy.go
@@ -40,12 +40,18 @@ type Proxy interface {
 	Start() error
 
 	// Stop stops the proxy and disallows it from ever being started again.
+	// This should also clean up any resources used by this Proxy.
 	//
 	// If the proxy is not started yet, this should not return an error, but
 	// it should disallow Start from working again. If the proxy is already
 	// stopped, this should not return an error.
 	Stop() error
 
+	// Close should clean up any resources associated with this proxy but
+	// keep it running in the background. Only one of Close or Stop can be
+	// called.
+	Close() error
+
 	// Equal returns true if the argument is equal to the proxy being called.
// This is called by the manager to determine if a change in configuration // results in a proxy that needs to be restarted or not. If Equal returns diff --git a/api/agent_test.go b/api/agent_test.go index 1066a0b42..949c517bd 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -1140,7 +1140,7 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) { Port: 8000, Connect: &AgentServiceConnect{ Proxy: &AgentServiceConnectProxy{ - Command: []string{"consul connect proxy"}, + Command: []string{"consul", "connect", "proxy"}, Config: map[string]interface{}{ "foo": "bar", }, @@ -1157,7 +1157,7 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) { ProxyServiceID: "foo-proxy", TargetServiceID: "foo", TargetServiceName: "foo", - ContentHash: "93baee1d838888ae", + ContentHash: "2a29f8237db69d0e", ExecMode: "daemon", Command: []string{"consul", "connect", "proxy"}, Config: map[string]interface{}{
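
Note on the agent/proxy/daemon.go hunk above: Stop now tolerates the race where the child exits between polling and signalling, and falls back to Kill only after the graceful window passes. The following standalone Go sketch illustrates that pattern under stated assumptions; it is not the Consul implementation. The stopProcess helper, its gracefulWait parameter, the logger, and the `sleep` child are illustrative stand-ins (the real Daemon uses its internal exit channel rather than a Wait goroutine); only the "os: process already finished" string check mirrors the new process.go helper.

package main

import (
	"log"
	"os"
	"os/exec"
	"strings"
	"time"
)

// isProcessAlreadyFinishedErr mirrors the helper added in process.go: a string
// comparison against the error text defined in os/exec_unix.go and
// os/exec_windows.go.
func isProcessAlreadyFinishedErr(err error) bool {
	return strings.Contains(err.Error(), "os: process already finished")
}

// stopProcess tries a graceful interrupt first and only kills the child if it
// is still running once gracefulWait has passed, or if the interrupt could not
// be delivered (e.g. on Windows where SIGINT isn't implemented). A child that
// has already exited counts as a successful stop.
func stopProcess(cmd *exec.Cmd, gracefulWait time.Duration, logger *log.Logger) error {
	exited := make(chan struct{})
	go func() {
		cmd.Wait() // reap the child so a graceful exit is observable
		close(exited)
	}()

	err := cmd.Process.Signal(os.Interrupt)
	if err == nil {
		select {
		case <-exited:
			// Graceful stop worked
			return nil
		case <-time.After(gracefulWait):
			logger.Printf("[DEBUG] graceful wait of %s passed, killing", gracefulWait)
		}
	} else if isProcessAlreadyFinishedErr(err) {
		// Raced with the process exiting on its own; nothing left to do.
		return nil
	} else {
		logger.Printf("[DEBUG] interrupt failed, killing: %s", err)
	}

	if err := cmd.Process.Kill(); err != nil && !isProcessAlreadyFinishedErr(err) {
		return err
	}
	return nil
}

func main() {
	logger := log.New(os.Stderr, "", log.LstdFlags)
	cmd := exec.Command("sleep", "60")
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}
	if err := stopProcess(cmd, 2*time.Second, logger); err != nil {
		log.Fatal(err)
	}
	logger.Println("child stopped")
}

Matching on the error string rather than a sentinel value is the deliberate (and self-described janky) choice in process.go: at the time the os package exported no sentinel for this condition (os.ErrProcessDone arrived in a much later Go release), so the string comparison is the portable check, and the comment flags that a future Go version changing the text should surface as test failures.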