Make tests pass and clean up proxy persistence. No detached child changes yet.

This is a good state for the persistence work, and a clean point from which to restart the detached child work that got mixed up last time.
Paul Banks 2018-06-05 13:22:32 +01:00 committed by Jack Pearkes
parent 3bac52480e
commit 1e5a2561b6
9 changed files with 118 additions and 22 deletions
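
The functional change threaded through the diffs below is that local.State.AddProxy now takes a third argument, a previously issued proxy token: callers pass "" to have a fresh token generated, or pass a persisted token back in so the proxy keeps the same token (and its already assigned bind port) across a restore. A minimal sketch of that call shape, assuming only the signature implied by the tests; the helper name is illustrative and not part of this commit:

package example

import (
	"github.com/hashicorp/consul/agent/local"
	"github.com/hashicorp/consul/agent/structs"
)

// reRegisterProxy is a sketch, not part of this commit: it shows the new
// three-argument AddProxy call. An empty restoredToken asks the state to mint
// a fresh proxy token; a previously persisted ProxyToken keeps the same token
// and the already assigned bind port across an agent restart.
func reRegisterProxy(s *local.State, p *structs.ConnectManagedProxy, svcToken, restoredToken string) (string, error) {
	ps, err := s.AddProxy(p, svcToken, restoredToken)
	if err != nil {
		return "", err
	}
	return ps.ProxyToken, nil
}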


@@ -15,8 +15,6 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"


@@ -1691,7 +1691,7 @@ func TestStateProxyManagement(t *testing.T) {
require := require.New(t)
assert := assert.New(t)
_, err := state.AddProxy(&p1, "fake-token")
_, err := state.AddProxy(&p1, "fake-token", "")
require.Error(err, "should fail as the target service isn't registered")
// Sanity check done, lets add a couple of target services to the state
@@ -1710,7 +1710,7 @@ func TestStateProxyManagement(t *testing.T) {
require.NoError(err)
// Should work now
pstate, err := state.AddProxy(&p1, "fake-token")
pstate, err := state.AddProxy(&p1, "fake-token", "")
require.NoError(err)
svc := pstate.Proxy.ProxyService
@@ -1724,8 +1724,9 @@ func TestStateProxyManagement(t *testing.T) {
{
// Re-registering same proxy again should not pick a random port but re-use
// the assigned one.
pstateDup, err := state.AddProxy(&p1, "fake-token")
// the assigned one. It should also keep the same proxy token since we don't
// want to force a restart for a config change.
pstateDup, err := state.AddProxy(&p1, "fake-token", "")
require.NoError(err)
svcDup := pstateDup.Proxy.ProxyService
@@ -1736,6 +1737,8 @@ func TestStateProxyManagement(t *testing.T) {
assert.Equal("", svcDup.Address, "should have empty address by default")
// Port must be same as before
assert.Equal(svc.Port, svcDup.Port)
// Same ProxyToken
assert.Equal(pstate.ProxyToken, pstateDup.ProxyToken)
}
// Let's register a notifier now
@@ -1748,7 +1751,7 @@ func TestStateProxyManagement(t *testing.T) {
// Second proxy should claim other port
p2 := p1
p2.TargetServiceID = "cache"
pstate2, err := state.AddProxy(&p2, "fake-token")
pstate2, err := state.AddProxy(&p2, "fake-token", "")
require.NoError(err)
svc2 := pstate2.Proxy.ProxyService
assert.Contains([]int{20000, 20001}, svc2.Port)
@@ -1764,7 +1767,7 @@ func TestStateProxyManagement(t *testing.T) {
// Third proxy should fail as all ports are used
p3 := p1
p3.TargetServiceID = "db"
_, err = state.AddProxy(&p3, "fake-token")
_, err = state.AddProxy(&p3, "fake-token", "")
require.Error(err)
// Should have a notification but we'll do nothing so that the next
@@ -1775,7 +1778,7 @@ func TestStateProxyManagement(t *testing.T) {
"bind_port": 1234,
"bind_address": "0.0.0.0",
}
pstate3, err := state.AddProxy(&p3, "fake-token")
pstate3, err := state.AddProxy(&p3, "fake-token", "")
require.NoError(err)
svc3 := pstate3.Proxy.ProxyService
require.Equal("0.0.0.0", svc3.Address)
@@ -1793,7 +1796,7 @@ func TestStateProxyManagement(t *testing.T) {
require.NotNil(gotP3)
var ws memdb.WatchSet
ws.Add(gotP3.WatchCh)
pstate3, err = state.AddProxy(&p3updated, "fake-token")
pstate3, err = state.AddProxy(&p3updated, "fake-token", "")
require.NoError(err)
svc3 = pstate3.Proxy.ProxyService
require.Equal("0.0.0.0", svc3.Address)
@@ -1817,7 +1820,7 @@ func TestStateProxyManagement(t *testing.T) {
// Should be able to create a new proxy for that service with the port (it
// should have been "freed").
p4 := p2
pstate4, err := state.AddProxy(&p4, "fake-token")
pstate4, err := state.AddProxy(&p4, "fake-token", "")
require.NoError(err)
svc4 := pstate4.Proxy.ProxyService
assert.Contains([]int{20000, 20001}, svc2.Port)
@@ -1855,6 +1858,68 @@ func TestStateProxyManagement(t *testing.T) {
}
}
// Tests the logic for retaining tokens and ports through restore (i.e.
// proxy-service already restored and token passed in externally)
func TestStateProxyRestore(t *testing.T) {
t.Parallel()
state := local.NewState(local.Config{
// Wide random range to make it very unlikely to pass by chance
ProxyBindMinPort: 10000,
ProxyBindMaxPort: 20000,
}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
// Stub state syncing
state.TriggerSyncChanges = func() {}
webSvc := structs.NodeService{
Service: "web",
}
p1 := structs.ConnectManagedProxy{
ExecMode: structs.ProxyExecModeDaemon,
Command: []string{"consul", "connect", "proxy"},
TargetServiceID: "web",
}
p2 := p1
require := require.New(t)
assert := assert.New(t)
// Add a target service
require.NoError(state.AddService(&webSvc, "fake-token-web"))
// Add the proxy for first time to get the proper service definition to
// register
pstate, err := state.AddProxy(&p1, "fake-token", "")
require.NoError(err)
// Now start again with a brand new state
state2 := local.NewState(local.Config{
// Wide random range to make it very unlikely to pass by chance
ProxyBindMinPort: 10000,
ProxyBindMaxPort: 20000,
}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
// Stub state syncing
state2.TriggerSyncChanges = func() {}
// Register the target service
require.NoError(state2.AddService(&webSvc, "fake-token-web"))
// "Restore" the proxy service
require.NoError(state2.AddService(p1.ProxyService, "fake-token-web"))
// Now we can AddProxy with the "restored" token
pstate2, err := state2.AddProxy(&p2, "fake-token", pstate.ProxyToken)
require.NoError(err)
// Check it still has the same port and token as before
assert.Equal(pstate.ProxyToken, pstate2.ProxyToken)
assert.Equal(p1.ProxyService.Port, p2.ProxyService.Port)
}
// drainCh drains a channel by reading messages until it would block.
func drainCh(ch chan struct{}) {
for {


@@ -293,16 +293,28 @@ func (p *Daemon) Stop() error {
case <-time.After(gracefulWait):
// Interrupt didn't work
p.Logger.Printf("[DEBUG] agent/proxy: graceful wait of %s passed, "+
"killing", gracefulWait)
}
} else if isProcessAlreadyFinishedErr(err) {
// This can happen due to races between signals and polling.
return nil
} else {
p.Logger.Printf("[DEBUG] agent/proxy: sigint failed, killing: %s", err)
}
// Graceful didn't work, forcibly kill
return process.Kill()
// Graceful didn't work (e.g. on Windows, where SIGINT isn't implemented),
// so forcibly kill
err = process.Kill()
if err != nil && isProcessAlreadyFinishedErr(err) {
return nil
}
return err
}
// stopKeepAlive is like Stop but keeps the process running. This is
// used only for tests.
func (p *Daemon) stopKeepAlive() error {
// Close implements Proxy by stopping the run loop but not killing the process.
// Once Close is called, Stop has no effect.
func (p *Daemon) Close() error {
p.lock.Lock()
defer p.lock.Unlock()


@@ -398,7 +398,7 @@ func TestDaemonUnmarshalSnapshot(t *testing.T) {
snap := d.MarshalSnapshot()
// Stop the original daemon but keep it alive
require.NoError(d.stopKeepAlive())
require.NoError(d.Close())
// Restore the second daemon
d2 := &Daemon{Logger: testLogger}


@@ -324,7 +324,7 @@ func TestManagerRun_snapshotRestore(t *testing.T) {
// Add a second proxy so that we can determine when we're up
// and running.
path2 := filepath.Join(td, "file")
path2 := filepath.Join(td, "file2")
testStateProxy(t, state, "db", helperProcess("start-stop", path2))
retry.Run(t, func(r *retry.R) {
_, err := os.Stat(path2)
@@ -343,7 +343,7 @@ func TestManagerRun_snapshotRestore(t *testing.T) {
if err != nil {
return
}
r.Fatalf("file still exists")
r.Fatalf("file still exists: %s", path)
})
}
@@ -379,7 +379,7 @@ func testStateProxy(t *testing.T, state *local.State, service string, cmd *exec.
ExecMode: structs.ProxyExecModeDaemon,
Command: command,
TargetServiceID: service,
}, "token")
}, "token", "")
require.NoError(t, err)
return p.Proxy.ProxyService.ID


@@ -5,6 +5,7 @@ type Noop struct{}
func (p *Noop) Start() error { return nil }
func (p *Noop) Stop() error { return nil }
func (p *Noop) Close() error { return nil }
func (p *Noop) Equal(Proxy) bool { return true }
func (p *Noop) MarshalSnapshot() map[string]interface{} { return nil }
func (p *Noop) UnmarshalSnapshot(map[string]interface{}) error { return nil }

agent/proxy/process.go (new file, 14 lines)

@@ -0,0 +1,14 @@
package proxy
import (
"strings"
)
// isProcessAlreadyFinishedErr does a janky comparison with an error string
// defined in os/exec_unix.go and os/exec_windows.go which we encounter due to
// races with polling the external process. These races cause tests to fail
// since Stop sometimes returns an error, so we should notice if this string
// stops matching the error in a future Go version.
func isProcessAlreadyFinishedErr(err error) bool {
return strings.Contains(err.Error(), "os: process already finished")
}
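
For context, a small Unix-only illustration (not part of this commit) of where that error string comes from: once Wait has reaped the child, any further Signal or Kill on the os.Process fails with the exact "os: process already finished" message this helper matches, which is the race the Stop changes above guard against.

package main

import (
	"fmt"
	"os"
	"os/exec"
)

func main() {
	// Start a short-lived process and reap it.
	cmd := exec.Command("true")
	_ = cmd.Start()
	_ = cmd.Wait() // the child has exited and been reaped

	// Signalling the finished process now returns the stdlib error that
	// isProcessAlreadyFinishedErr matches.
	err := cmd.Process.Signal(os.Interrupt)
	fmt.Println(err) // os: process already finished
}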


@@ -40,12 +40,18 @@ type Proxy interface {
Start() error
// Stop stops the proxy and disallows it from ever being started again.
// This should also clean up any resources used by this Proxy.
//
// If the proxy is not started yet, this should not return an error, but
// it should disallow Start from working again. If the proxy is already
// stopped, this should not return an error.
Stop() error
// Close should clean up any resources associated with this proxy but
// keep it running in the background. Only one of Close or Stop can be
// called.
Close() error
// Equal returns true if the argument is equal to the proxy being called.
// This is called by the manager to determine if a change in configuration
// results in a proxy that needs to be restarted or not. If Equal returns
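
A caller-side sketch of the Stop/Close contract documented above (not from this commit; the shutdownProxy helper is illustrative, and only one of the two methods may be called for a given proxy):

package example

import "github.com/hashicorp/consul/agent/proxy"

// shutdownProxy illustrates the two shutdown paths on the Proxy interface.
func shutdownProxy(p proxy.Proxy, keepRunning bool) error {
	if keepRunning {
		// The agent is exiting or reloading but the external process should
		// stay up: release our handle on it so a later agent can re-adopt it
		// from a snapshot (as TestDaemonUnmarshalSnapshot does above).
		return p.Close()
	}
	// The proxy is being removed for good: stop it and kill the process too.
	return p.Stop()
}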


@@ -1140,7 +1140,7 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) {
Port: 8000,
Connect: &AgentServiceConnect{
Proxy: &AgentServiceConnectProxy{
Command: []string{"consul connect proxy"},
Command: []string{"consul", "connect", "proxy"},
Config: map[string]interface{}{
"foo": "bar",
},
@@ -1157,7 +1157,7 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) {
ProxyServiceID: "foo-proxy",
TargetServiceID: "foo",
TargetServiceName: "foo",
ContentHash: "93baee1d838888ae",
ContentHash: "2a29f8237db69d0e",
ExecMode: "daemon",
Command: []string{"consul", "connect", "proxy"},
Config: map[string]interface{}{