agent/proxy: write pid file whenever the daemon process changes

This commit is contained in:
Mitchell Hashimoto 2018-05-03 13:56:42 -07:00
parent 09093a1a1a
commit 5e0f0ba178
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
4 changed files with 159 additions and 42 deletions

View File

@ -34,6 +34,7 @@ import (
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/file"
"github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/hashicorp/consul/watch" "github.com/hashicorp/consul/watch"
@ -362,7 +363,7 @@ func (a *Agent) Start() error {
a.proxyManager = proxy.NewManager() a.proxyManager = proxy.NewManager()
a.proxyManager.State = a.State a.proxyManager.State = a.State
a.proxyManager.Logger = a.logger a.proxyManager.Logger = a.logger
a.proxyManager.LogDir = filepath.Join(a.config.DataDir, "proxy", "logs") a.proxyManager.DataDir = filepath.Join(a.config.DataDir, "proxy")
go a.proxyManager.Run() go a.proxyManager.Run()
// Start watching for critical services to deregister, based on their // Start watching for critical services to deregister, based on their
@ -1557,7 +1558,7 @@ func (a *Agent) persistService(service *structs.NodeService) error {
return err return err
} }
return writeFileAtomic(svcPath, encoded) return file.WriteAtomic(svcPath, encoded)
} }
// purgeService removes a persisted service definition file from the data dir // purgeService removes a persisted service definition file from the data dir
@ -1585,7 +1586,7 @@ func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *structs.CheckT
return err return err
} }
return writeFileAtomic(checkPath, encoded) return file.WriteAtomic(checkPath, encoded)
} }
// purgeCheck removes a persisted check definition file from the data dir // purgeCheck removes a persisted check definition file from the data dir
@ -1597,43 +1598,6 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error {
return nil return nil
} }
// writeFileAtomic writes the given contents to a temporary file in the same
// directory, does an fsync and then renames the file to its real path
func writeFileAtomic(path string, contents []byte) error {
uuid, err := uuid.GenerateUUID()
if err != nil {
return err
}
tempPath := fmt.Sprintf("%s-%s.tmp", path, uuid)
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
return err
}
fh, err := os.OpenFile(tempPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil {
return err
}
if _, err := fh.Write(contents); err != nil {
fh.Close()
os.Remove(tempPath)
return err
}
if err := fh.Sync(); err != nil {
fh.Close()
os.Remove(tempPath)
return err
}
if err := fh.Close(); err != nil {
os.Remove(tempPath)
return err
}
if err := os.Rename(tempPath, path); err != nil {
os.Remove(tempPath)
return err
}
return nil
}
// AddService is used to add a service entry. // AddService is used to add a service entry.
// This entry is persistent and the agent will make a best effort to // This entry is persistent and the agent will make a best effort to
// ensure it is registered // ensure it is registered

View File

@ -6,8 +6,11 @@ import (
"os" "os"
"os/exec" "os/exec"
"reflect" "reflect"
"strconv"
"sync" "sync"
"time" "time"
"github.com/hashicorp/consul/lib/file"
) )
// Constants related to restart timers with the daemon mode proxies. At some // Constants related to restart timers with the daemon mode proxies. At some
@ -38,6 +41,12 @@ type Daemon struct {
// a file. // a file.
Logger *log.Logger Logger *log.Logger
// PidPath is the path where a pid file will be created storing the
// pid of the active process. If this is empty then a pid-file won't
// be created. Under erroneous conditions, the pid file may not be
// created but the error will be logged to the Logger.
PidPath string
// For tests, they can set this to change the default duration to wait // For tests, they can set this to change the default duration to wait
// for a graceful quit. // for a graceful quit.
gracefulWait time.Duration gracefulWait time.Duration
@ -187,8 +196,21 @@ func (p *Daemon) start() (*os.Process, error) {
// Start it // Start it
p.Logger.Printf("[DEBUG] agent/proxy: starting proxy: %q %#v", cmd.Path, cmd.Args[1:]) p.Logger.Printf("[DEBUG] agent/proxy: starting proxy: %q %#v", cmd.Path, cmd.Args[1:])
err := cmd.Start() if err := cmd.Start(); err != nil {
return cmd.Process, err return nil, err
}
// Write the pid file. This might error and that's okay.
if p.PidPath != "" {
pid := strconv.FormatInt(int64(cmd.Process.Pid), 10)
if err := file.WriteAtomic(p.PidPath, []byte(pid)); err != nil {
p.Logger.Printf(
"[DEBUG] agent/proxy: error writing pid file %q: %s",
p.PidPath, err)
}
}
return cmd.Process, nil
} }
// Stop stops the daemon. // Stop stops the daemon.

View File

@ -142,6 +142,91 @@ func TestDaemonStop_kill(t *testing.T) {
require.Equal(mtime, fi.ModTime()) require.Equal(mtime, fi.ModTime())
} }
func TestDaemonStart_pidFile(t *testing.T) {
t.Parallel()
require := require.New(t)
td, closer := testTempDir(t)
defer closer()
path := filepath.Join(td, "file")
pidPath := filepath.Join(td, "pid")
uuid, err := uuid.GenerateUUID()
require.NoError(err)
d := &Daemon{
Command: helperProcess("start-once", path),
ProxyToken: uuid,
Logger: testLogger,
PidPath: pidPath,
}
require.NoError(d.Start())
defer d.Stop()
// Wait for the file to exist
retry.Run(t, func(r *retry.R) {
_, err := os.Stat(pidPath)
if err == nil {
return
}
r.Fatalf("error: %s", err)
})
// Check the pid file
pidRaw, err := ioutil.ReadFile(pidPath)
require.NoError(err)
require.NotEmpty(pidRaw)
}
// Verify the pid file changes on restart
func TestDaemonRestart_pidFile(t *testing.T) {
t.Parallel()
require := require.New(t)
td, closer := testTempDir(t)
defer closer()
path := filepath.Join(td, "file")
pidPath := filepath.Join(td, "pid")
d := &Daemon{
Command: helperProcess("restart", path),
Logger: testLogger,
PidPath: pidPath,
}
require.NoError(d.Start())
defer d.Stop()
// Wait for the file to exist. We save the func so we can reuse the test.
waitFile := func() {
retry.Run(t, func(r *retry.R) {
_, err := os.Stat(path)
if err == nil {
return
}
r.Fatalf("error waiting for path: %s", err)
})
}
waitFile()
// Check the pid file
pidRaw, err := ioutil.ReadFile(pidPath)
require.NoError(err)
require.NotEmpty(pidRaw)
// Delete the file
require.NoError(os.Remove(path))
// File should re-appear because the process is restart
waitFile()
// Check the pid file and it should not equal
pidRaw2, err := ioutil.ReadFile(pidPath)
require.NoError(err)
require.NotEmpty(pidRaw2)
require.NotEqual(pidRaw, pidRaw2)
}
func TestDaemonEqual(t *testing.T) { func TestDaemonEqual(t *testing.T) {
cases := []struct { cases := []struct {
Name string Name string

46
lib/file/atomic.go Normal file
View File

@ -0,0 +1,46 @@
package file
import (
"fmt"
"os"
"path/filepath"
"github.com/hashicorp/go-uuid"
)
// WriteAtomic writes the given contents to a temporary file in the same
// directory, does an fsync and then renames the file to its real path
func WriteAtomic(path string, contents []byte) error {
uuid, err := uuid.GenerateUUID()
if err != nil {
return err
}
tempPath := fmt.Sprintf("%s-%s.tmp", path, uuid)
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
return err
}
fh, err := os.OpenFile(tempPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil {
return err
}
if _, err := fh.Write(contents); err != nil {
fh.Close()
os.Remove(tempPath)
return err
}
if err := fh.Sync(); err != nil {
fh.Close()
os.Remove(tempPath)
return err
}
if err := fh.Close(); err != nil {
os.Remove(tempPath)
return err
}
if err := os.Rename(tempPath, path); err != nil {
os.Remove(tempPath)
return err
}
return nil
}