agent/proxy: implement snapshotting for daemons
This commit is contained in:
parent
9675ed626d
commit
a3a0bc7b13
|
@ -11,6 +11,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/lib/file"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
)
|
||||
|
||||
// Constants related to restart timers with the daemon mode proxies. At some
|
||||
|
@ -261,6 +262,26 @@ func (p *Daemon) Stop() error {
|
|||
return process.Kill()
|
||||
}
|
||||
|
||||
// stopKeepAlive is like Stop but keeps the process running. This is
|
||||
// used only for tests.
|
||||
func (p *Daemon) stopKeepAlive() error {
|
||||
p.lock.Lock()
|
||||
|
||||
// If we're already stopped or never started, then no problem.
|
||||
if p.stopped || p.process == nil {
|
||||
p.stopped = true
|
||||
p.lock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Note that we've stopped
|
||||
p.stopped = true
|
||||
close(p.stopCh)
|
||||
p.lock.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Equal implements Proxy to check for equality.
|
||||
func (p *Daemon) Equal(raw Proxy) bool {
|
||||
p2, ok := raw.(*Daemon)
|
||||
|
@ -275,3 +296,81 @@ func (p *Daemon) Equal(raw Proxy) bool {
|
|||
reflect.DeepEqual(p.Command.Args, p2.Command.Args) &&
|
||||
reflect.DeepEqual(p.Command.Env, p2.Command.Env)
|
||||
}
|
||||
|
||||
// MarshalSnapshot implements Proxy
|
||||
func (p *Daemon) MarshalSnapshot() map[string]interface{} {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
// If we're stopped or have no process, then nothing to snapshot.
|
||||
if p.stopped || p.process == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return map[string]interface{}{
|
||||
"Pid": p.process.Pid,
|
||||
"CommandPath": p.Command.Path,
|
||||
"CommandArgs": p.Command.Args,
|
||||
"CommandDir": p.Command.Dir,
|
||||
"CommandEnv": p.Command.Env,
|
||||
"ProxyToken": p.ProxyToken,
|
||||
}
|
||||
}
|
||||
|
||||
// UnmarshalSnapshot implements Proxy
|
||||
func (p *Daemon) UnmarshalSnapshot(m map[string]interface{}) error {
|
||||
var s daemonSnapshot
|
||||
if err := mapstructure.Decode(m, &s); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
// Set the basic fields
|
||||
p.ProxyToken = s.ProxyToken
|
||||
p.Command = &exec.Cmd{
|
||||
Path: s.CommandPath,
|
||||
Args: s.CommandArgs,
|
||||
Dir: s.CommandDir,
|
||||
Env: s.CommandEnv,
|
||||
}
|
||||
|
||||
// For the pid, we want to find the process.
|
||||
proc, err := os.FindProcess(s.Pid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO(mitchellh): we should check if proc refers to a process that
|
||||
// is currently alive. If not, we should return here and not manage the
|
||||
// process.
|
||||
|
||||
// "Start it"
|
||||
stopCh := make(chan struct{})
|
||||
exitedCh := make(chan struct{})
|
||||
p.stopCh = stopCh
|
||||
p.exitedCh = exitedCh
|
||||
p.process = proc
|
||||
go p.keepAlive(stopCh, exitedCh)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// daemonSnapshot is the structure of the marshalled data for snapshotting.
// Its field names match the keys of the map built by Daemon.MarshalSnapshot,
// which is what UnmarshalSnapshot decodes into it.
type daemonSnapshot struct {
	// Pid of the process. This is the only value actually required to
	// regain management control. The remaining values are for Equal.
	Pid int

	// Command information: the path, arguments, working directory, and
	// environment of the daemon's command.
	CommandPath string
	CommandArgs []string
	CommandDir  string
	CommandEnv  []string

	// NOTE(mitchellh): longer term there are discussions/plans to only
	// store the hash of the token but for now we need the full token in
	// case the process dies and has to be restarted.
	ProxyToken string
}
|
||||
|
|
|
@ -316,3 +316,95 @@ func TestDaemonEqual(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDaemonMarshalSnapshot(t *testing.T) {
|
||||
cases := []struct {
|
||||
Name string
|
||||
Proxy Proxy
|
||||
Expected map[string]interface{}
|
||||
}{
|
||||
{
|
||||
"stopped daemon",
|
||||
&Daemon{
|
||||
Command: &exec.Cmd{Path: "/foo"},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
|
||||
{
|
||||
"basic",
|
||||
&Daemon{
|
||||
Command: &exec.Cmd{Path: "/foo"},
|
||||
process: &os.Process{Pid: 42},
|
||||
},
|
||||
map[string]interface{}{
|
||||
"Pid": 42,
|
||||
"CommandPath": "/foo",
|
||||
"CommandArgs": []string(nil),
|
||||
"CommandDir": "",
|
||||
"CommandEnv": []string(nil),
|
||||
"ProxyToken": "",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
actual := tc.Proxy.MarshalSnapshot()
|
||||
require.Equal(t, tc.Expected, actual)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDaemonUnmarshalSnapshot(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
td, closer := testTempDir(t)
|
||||
defer closer()
|
||||
|
||||
path := filepath.Join(td, "file")
|
||||
uuid, err := uuid.GenerateUUID()
|
||||
require.NoError(err)
|
||||
|
||||
d := &Daemon{
|
||||
Command: helperProcess("start-stop", path),
|
||||
ProxyToken: uuid,
|
||||
Logger: testLogger,
|
||||
}
|
||||
require.NoError(d.Start())
|
||||
|
||||
// Wait for the file to exist
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
_, err := os.Stat(path)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
r.Fatalf("error: %s", err)
|
||||
})
|
||||
|
||||
// Snapshot
|
||||
snap := d.MarshalSnapshot()
|
||||
|
||||
// Stop the original daemon but keep it alive
|
||||
require.NoError(d.stopKeepAlive())
|
||||
|
||||
// Restore the second daemon
|
||||
d2 := &Daemon{Logger: testLogger}
|
||||
require.NoError(d2.UnmarshalSnapshot(snap))
|
||||
|
||||
// Stop the process
|
||||
require.NoError(d2.Stop())
|
||||
|
||||
// File should no longer exist.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
_, err := os.Stat(path)
|
||||
if os.IsNotExist(err) {
|
||||
return
|
||||
}
|
||||
|
||||
// err might be nil here but that's okay
|
||||
r.Fatalf("should not exist: %s", err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ type Manager struct {
|
|||
//
|
||||
// * logs/ - log files named <service id>-std{out|err}.log
|
||||
// * pids/ - pid files for daemons named <service id>.pid
|
||||
// * state.ext - the state of the manager
|
||||
// * snapshot.json - the state of the manager
|
||||
//
|
||||
DataDir string
|
||||
|
||||
|
|
|
@ -6,3 +6,5 @@ type Noop struct{}
|
|||
// Start does nothing and always returns nil.
func (*Noop) Start() error {
	return nil
}
|
||||
// Stop does nothing and always returns nil.
func (*Noop) Stop() error {
	return nil
}
|
||||
// Equal implements Proxy. It ignores its argument and always returns true.
func (*Noop) Equal(Proxy) bool {
	return true
}
|
||||
// MarshalSnapshot implements Proxy. It always returns nil: a Noop has no
// state to snapshot.
func (*Noop) MarshalSnapshot() map[string]interface{} {
	return nil
}
|
||||
// UnmarshalSnapshot implements Proxy. It ignores the given snapshot and
// always returns nil.
func (*Noop) UnmarshalSnapshot(map[string]interface{}) error {
	return nil
}
|
||||
|
|
|
@ -39,4 +39,20 @@ type Proxy interface {
|
|||
// If Equal returns true, the old proxy will remain running and the new
|
||||
// one will be ignored.
|
||||
Equal(Proxy) bool
|
||||
|
||||
// MarshalSnapshot returns the state that will be stored in a snapshot
|
||||
// so that Consul can recover the proxy process after a restart. The
|
||||
// result should only contain primitive values and containers (lists/maps).
|
||||
//
|
||||
// UnmarshalSnapshot is called to restore the receiving Proxy from its
|
||||
// marshalled state. If UnmarshalSnapshot returns an error, the snapshot
|
||||
// is ignored and the marshalled snapshot will be lost. The manager will
|
||||
// log the error.
|
||||
//
|
||||
// This should save/restore enough state to be able to regain management
|
||||
// of a proxy process as well as to perform the Equal method above. The
|
||||
// Equal method will be called when a local state sync happens to determine
|
||||
// if the recovered process should be restarted or not.
|
||||
MarshalSnapshot() map[string]interface{}
|
||||
UnmarshalSnapshot(map[string]interface{}) error
|
||||
}
|
||||
|
|
31
agent/proxy/snapshot.go
Normal file
31
agent/proxy/snapshot.go
Normal file
|
@ -0,0 +1,31 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// snapshot is the structure of the snapshot file. This is unexported because
|
||||
// we don't want this being a public API.
|
||||
//
|
||||
// The snapshot doesn't contain any configuration for the manager. We only
|
||||
// want to restore the proxies that we're managing, and we use the config
|
||||
// set at runtime to sync and reconcile what proxies we should start,
|
||||
// restart, stop, or have already running.
|
||||
type snapshot struct {
|
||||
// Version is the version of the snapshot format and can be used
|
||||
// to safely update the format in the future if necessary.
|
||||
Version int
|
||||
|
||||
// Proxies are the set of proxies that the manager has.
|
||||
Proxies []snapshotProxy
|
||||
}
|
||||
|
||||
// snapshotProxy represents a single proxy.
|
||||
type snapshotProxy struct {
|
||||
// Mode corresponds to the type of proxy running.
|
||||
Mode structs.ProxyExecMode
|
||||
|
||||
// Config is an opaque mapping of primitive values that the proxy
|
||||
// implementation uses to restore state.
|
||||
Config map[string]interface{}
|
||||
}
|
Loading…
Reference in a new issue