open-consul/agent/testagent.go
Frank Schroeder f8e52c897e agent: fix 'consul leave' shutdown race (#2880)
When the agent is triggered to shut down by an external 'consul leave'
command delivered via the HTTP API, the client still expects to receive a
response before the agent goes down. This creates a race over when to shut
down the agent itself (the RPC server, the checks and the state) versus the
external endpoints (DNS and HTTP).

This patch splits the shutdown process into two parts:

 * shutdown the agent
 * shutdown the endpoints (http and dns)

They can be executed multiple times, concurrently and in any order, but to
provide consistent behavior across all use cases the agent should be shut
down first and the endpoints second. Both calls have to be made for a
proper shutdown.

This could be partially hidden behind a single function, but that would
introduce behind-the-scenes magic that callers would have to know about
without it being obvious.

Fixes #2880
2017-06-21 05:52:51 +02:00
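
The call order described above is what TestAgent.Shutdown below relies on. A
minimal sketch of a combined shutdown, assuming only the two methods named in
this patch (the wrapper function itself is hypothetical and not part of the
file):

	// shutdownBoth is a hypothetical wrapper illustrating the two-phase
	// shutdown: first the agent (RPC server, checks, state), then the
	// external endpoints (HTTP and DNS). Both calls are safe to repeat.
	func shutdownBoth(a *Agent) error {
		err := a.ShutdownAgent() // phase 1: core agent
		a.ShutdownEndpoints()    // phase 2: HTTP and DNS endpoints
		return err
	}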


package agent

import (
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"math/rand"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"
	"time"

	"github.com/hashicorp/consul/agent/consul"
	"github.com/hashicorp/consul/agent/consul/structs"
	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/logger"
	"github.com/hashicorp/consul/testutil/retry"
	"github.com/hashicorp/consul/types"
	"github.com/hashicorp/consul/version"
	uuid "github.com/hashicorp/go-uuid"
)

func init() {
	rand.Seed(time.Now().UnixNano()) // seed random number generator
}

// TempDir defines the base dir for temporary directories.
var TempDir = os.TempDir()

// TestAgent encapsulates an Agent with a default configuration and
// startup procedure suitable for testing. It panics if there are errors
// during creation or startup instead of returning errors. It manages a
// temporary data directory which is removed after shutdown.
type TestAgent struct {
	// Name is an optional name of the agent.
	Name string

	// Config is the agent configuration. If Config is nil then
	// TestConfig() is used. If Config.DataDir is set then it is
	// the caller's responsibility to clean up the data directory.
	// Otherwise, a temporary data directory is created and removed
	// when Shutdown() is called.
	Config *Config

	// LogOutput is the sink for the logs. If nil, logs are written
	// to os.Stderr.
	LogOutput io.Writer

	// LogWriter is used for streaming logs.
	LogWriter *logger.LogWriter

	// DataDir is the data directory which is used when Config.DataDir
	// is not set. It is created automatically and removed when
	// Shutdown() is called.
	DataDir string

	// Key is the optional encryption key for the LAN and WAN keyring.
	Key string

	// NoInitialSync determines whether an anti-entropy run
	// will be scheduled after the agent started.
	NoInitialSync bool

	// dns is a reference to the first started DNS endpoint.
	// It is valid after Start().
	dns *DNSServer

	// srv is a reference to the first started HTTP endpoint.
	// It is valid after Start().
	srv *HTTPServer

	// Agent is the embedded consul agent.
	// It is valid after Start().
	*Agent
}

// NewTestAgent returns a started agent with the given name and
// configuration. It panics if the agent could not be started. The
// caller should call Shutdown() to stop the agent and remove temporary
// directories.
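//
// Typical usage in a test (a minimal sketch; the name argument is arbitrary):
//
//	a := NewTestAgent("my-test", nil)
//	defer a.Shutdown()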
func NewTestAgent(name string, c *Config) *TestAgent {
	a := &TestAgent{Name: name, Config: c}
	a.Start()
	return a
}

type panicFailer struct{}

func (f *panicFailer) Log(args ...interface{}) { fmt.Println(args...) }
func (f *panicFailer) FailNow()                { panic("failed") }

// Start starts a test agent. It panics if the agent could not be started.
func (a *TestAgent) Start() *TestAgent {
	if a.Agent != nil {
		panic("TestAgent already started")
	}
	if a.Config == nil {
		a.Config = TestConfig()
	}
	if a.Config.DNSRecursor != "" {
		a.Config.DNSRecursors = append(a.Config.DNSRecursors, a.Config.DNSRecursor)
	}
	if a.Config.DataDir == "" {
		name := "agent"
		if a.Name != "" {
			name = a.Name + "-agent"
		}
		name = strings.Replace(name, "/", "_", -1)
		d, err := ioutil.TempDir(TempDir, name)
		if err != nil {
			panic(fmt.Sprintf("Error creating data dir %s: %s", filepath.Join(TempDir, name), err))
		}
		a.DataDir = d
		a.Config.DataDir = d
	}
	id := UniqueID()

	for i := 10; i >= 0; i-- {
		pickRandomPorts(a.Config)

		// write the keyring
		if a.Key != "" {
			writeKey := func(key, filename string) {
				path := filepath.Join(a.Config.DataDir, filename)
				if err := initKeyring(path, key); err != nil {
					panic(fmt.Sprintf("Error creating keyring %s: %s", path, err))
				}
			}
			writeKey(a.Key, SerfLANKeyring)
			writeKey(a.Key, SerfWANKeyring)
		}

		agent, err := New(a.Config)
		if err != nil {
			panic(fmt.Sprintf("Error creating agent: %s", err))
		}

		logOutput := a.LogOutput
		if logOutput == nil {
			logOutput = os.Stderr
		}
		agent.LogOutput = logOutput
		agent.LogWriter = a.LogWriter
		agent.logger = log.New(logOutput, a.Name+" - ", log.LstdFlags)

		// we need the err var in the next exit condition
		if err := agent.Start(); err == nil {
			a.Agent = agent
			break
		} else if i == 0 {
			fmt.Println(id, a.Name, "Error starting agent:", err)
			runtime.Goexit()
		} else {
			agent.ShutdownAgent()
			agent.ShutdownEndpoints()
			wait := time.Duration(rand.Int31n(2000)) * time.Millisecond
			fmt.Println(id, a.Name, "retrying in", wait)
			time.Sleep(wait)
		}

		// Clean out the data dir if we are responsible for it before we
		// try again, since the old ports may have gotten written to
		// the data dir, such as in the Raft configuration.
		if a.DataDir != "" {
			if err := os.RemoveAll(a.DataDir); err != nil {
				fmt.Println(id, a.Name, "Error resetting data dir:", err)
				runtime.Goexit()
			}
		}
	}

	if !a.NoInitialSync {
		a.Agent.StartSync()
	}

	var out structs.IndexedNodes
	retry.Run(&panicFailer{}, func(r *retry.R) {
		if len(a.httpServers) == 0 {
			r.Fatal(a.Name, "waiting for server")
		}
		if a.Config.Bootstrap && a.Config.Server {
			// Ensure we have a leader and a node registration.
			args := &structs.DCSpecificRequest{
				Datacenter: a.Config.Datacenter,
				QueryOptions: structs.QueryOptions{
					MinQueryIndex: out.Index,
					MaxQueryTime:  25 * time.Millisecond,
				},
			}
			if err := a.RPC("Catalog.ListNodes", args, &out); err != nil {
				r.Fatal(a.Name, "Catalog.ListNodes failed:", err)
			}
			if !out.QueryMeta.KnownLeader {
				r.Fatal(a.Name, "No leader")
			}
			if out.Index == 0 {
				r.Fatal(a.Name, "Consul index is 0")
			}
		} else {
			// Otherwise just check that the agent HTTP endpoint answers.
			req, _ := http.NewRequest("GET", "/v1/agent/self", nil)
			resp := httptest.NewRecorder()
			_, err := a.httpServers[0].AgentSelf(resp, req)
			if err != nil || resp.Code != 200 {
				r.Fatal(a.Name, "failed OK response", err)
			}
		}
	})
	a.dns = a.dnsServers[0]
	a.srv = a.httpServers[0]
	return a
}

// Shutdown stops the agent and removes the data directory if it is
// managed by the test agent.
func (a *TestAgent) Shutdown() error {
	defer func() {
		if a.DataDir != "" {
			os.RemoveAll(a.DataDir)
		}
	}()

	// shutdown agent before endpoints
	defer a.Agent.ShutdownEndpoints()
	return a.Agent.ShutdownAgent()
}

// HTTPAddr returns the address of the first started HTTP endpoint.
// It returns an empty string if no HTTP server has been started.
func (a *TestAgent) HTTPAddr() string {
	if a.srv == nil {
		return ""
	}
	return a.srv.Addr
}

// Client returns an API client pointed at the agent's HTTP endpoint.
// It panics if the client cannot be created.
func (a *TestAgent) Client() *api.Client {
	conf := api.DefaultConfig()
	conf.Address = a.HTTPAddr()
	c, err := api.NewClient(conf)
	if err != nil {
		panic(fmt.Sprintf("Error creating consul API client: %s", err))
	}
	return c
}

// consulConfig returns the consul server configuration of the embedded
// agent. It panics if the configuration cannot be built.
func (a *TestAgent) consulConfig() *consul.Config {
	c, err := a.Agent.consulConfig()
	if err != nil {
		panic(err)
	}
	return c
}

// UniqueID returns a random identifier, right-padded with spaces to a
// fixed width of 16 characters.
func UniqueID() string {
	id := strconv.FormatUint(rand.Uint64(), 36)
	for len(id) < 16 {
		id += " "
	}
	return id
}

// TenPorts returns the first port number of a block of
// ten random ports.
func TenPorts() int {
	return 1030 + int(rand.Int31n(6440))*10
}

// pickRandomPorts selects random ports from fixed size random blocks of
// ports. This does not eliminate the chance for port conflict but
// reduces it significantly with little overhead. Furthermore, asking
// the kernel for a random port by binding to port 0 prolongs the test
// execution (in our case +20sec) while also not fully eliminating the
// chance of port conflicts for concurrently executed test binaries.
// Instead of relying on one set of ports to be sufficient we retry
// starting the agent with different ports on port conflict.
func pickRandomPorts(c *Config) {
	port := TenPorts()
	c.Ports.DNS = port + 1
	c.Ports.HTTP = port + 2
	// when we enable HTTPS then we need to fix finding the
	// "first" HTTP server since that might be the HTTPS server
	// c.Ports.HTTPS = port + 3
	c.Ports.SerfLan = port + 4
	c.Ports.SerfWan = port + 5
	c.Ports.Server = port + 6
}

// TestConfig returns a unique default configuration for testing an
// agent.
func TestConfig() *Config {
	nodeID, err := uuid.GenerateUUID()
	if err != nil {
		panic(err)
	}

	cfg := DefaultConfig()

	cfg.Version = version.Version
	cfg.VersionPrerelease = "c.d"

	cfg.NodeID = types.NodeID(nodeID)
	cfg.NodeName = "Node " + nodeID
	cfg.BindAddr = "127.0.0.1"
	cfg.AdvertiseAddr = "127.0.0.1"
	cfg.Datacenter = "dc1"
	cfg.Bootstrap = true
	cfg.Server = true

	ccfg := consul.DefaultConfig()
	cfg.ConsulConfig = ccfg

	ccfg.SerfLANConfig.MemberlistConfig.SuspicionMult = 3
	ccfg.SerfLANConfig.MemberlistConfig.ProbeTimeout = 100 * time.Millisecond
	ccfg.SerfLANConfig.MemberlistConfig.ProbeInterval = 100 * time.Millisecond
	ccfg.SerfLANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond

	ccfg.SerfWANConfig.MemberlistConfig.SuspicionMult = 3
	ccfg.SerfWANConfig.MemberlistConfig.ProbeTimeout = 100 * time.Millisecond
	ccfg.SerfWANConfig.MemberlistConfig.ProbeInterval = 100 * time.Millisecond
	ccfg.SerfWANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond

	ccfg.RaftConfig.LeaderLeaseTimeout = 20 * time.Millisecond
	ccfg.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond
	ccfg.RaftConfig.ElectionTimeout = 40 * time.Millisecond

	ccfg.CoordinateUpdatePeriod = 100 * time.Millisecond
	ccfg.ServerHealthInterval = 10 * time.Millisecond

	return cfg
}

// TestACLConfig returns a default configuration for testing an agent
// with ACLs.
func TestACLConfig() *Config {
	cfg := TestConfig()
	cfg.ACLDatacenter = cfg.Datacenter
	cfg.ACLDefaultPolicy = "deny"
	cfg.ACLMasterToken = "root"
	cfg.ACLAgentToken = "root"
	cfg.ACLAgentMasterToken = "towel"
	cfg.ACLEnforceVersion8 = Bool(true)
	return cfg
}