open-consul/agent/testagent.go
R.B. Boyer 9b41199585
agent: fix several data races and bugs related to node-local alias checks (#5876)
The observed bug was that a full restart of a consul datacenter (servers
and clients) in conjunction with a restart of a connect-flavored
application with bring-your-own-service-registration logic would very
frequently cause the envoy sidecar service check to never reflect the
aliased service.

Over the course of investigation several bugs and unfortunate
interactions were corrected:

(1)

local.CheckState objects were only shallow copied, but the key piece of
data that gets read and updated is one of the things not copied (the
underlying Check with a Status field). When the stock code was run with
the race detector enabled this highly-relevant-to-the-test-scenario field
was found to be racy.

Changes:

 a) update the existing Clone method to include the Check field
 b) copy-on-write when those fields need to change rather than
    incrementally updating them in place.

This made the observed behavior occur slightly less often.

(2)

If anything about how the runLocal method for node-local alias check
logic was ever flawed, there was no fallback option. Those checks are
purely edge-triggered and failure to properly notice a single edge
transition would leave the alias check incorrect until the next flap of
the aliased check.

The change was to introduce a fallback timer to act as a control loop to
double check the alias check matches the aliased check every minute
(borrowing the duration from the non-local alias check logic body).

This made the observed behavior eventually go away when it did occur.

(3)

Originally I thought there were two main actions involved in the data race:

A. The act of adding the original check (from disk recovery) and its
   first health evaluation.

B. The act of the HTTP API requests coming in and resetting the local
   state when re-registering the same services and checks.

It took awhile for me to realize that there's a third action at work:

C. The goroutines associated with the original check and the later
   checks.

The actual sequence of actions that was causing the bad behavior was
that the API actions result in the original check to be removed and
re-added _without waiting for the original goroutine to terminate_. This
means for brief windows of time during check definition edits there are
two goroutines that can be sending updates for the alias check status.

In extremely unlikely scenarios the original goroutine sees the aliased
check start up in `critical` before being removed but does not get the
notification about the nearly immediate update of that check to
`passing`.

This is interlaced wit the new goroutine coming up, initializing its
base case to `passing` from the current state and then listening for new
notifications of edge triggers.

If the original goroutine "finishes" its update, it then commits one
more write into the local state of `critical` and exits leaving the
alias check no longer reflecting the underlying check.

The correction here is to enforce that the old goroutines must terminate
before spawning the new one for alias checks.
2019-05-24 13:36:56 -05:00

417 lines
11 KiB
Go

package agent
import (
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
metrics "github.com/armon/go-metrics"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/require"
)
func init() {
rand.Seed(time.Now().UnixNano()) // seed random number generator
}
// TempDir defines the base dir for temporary directories.
var TempDir = os.TempDir()
// TestAgent encapsulates an Agent with a default configuration and
// startup procedure suitable for testing. It panics if there are errors
// during creation or startup instead of returning errors. It manages a
// temporary data directory which is removed after shutdown.
type TestAgent struct {
// Name is an optional name of the agent.
Name string
HCL string
// ExpectConfigError can be set to prevent the agent retrying Start on errors
// and eventually blowing up with runtime.Goexit. This enables tests to assert
// that some specific bit of config actually does prevent startup entirely in
// a reasonable way without reproducing a lot of the boilerplate here.
ExpectConfigError bool
// Config is the agent configuration. If Config is nil then
// TestConfig() is used. If Config.DataDir is set then it is
// the callers responsibility to clean up the data directory.
// Otherwise, a temporary data directory is created and removed
// when Shutdown() is called.
Config *config.RuntimeConfig
// LogOutput is the sink for the logs. If nil, logs are written
// to os.Stderr.
LogOutput io.Writer
// LogWriter is used for streaming logs.
LogWriter *logger.LogWriter
// DataDir is the data directory which is used when Config.DataDir
// is not set. It is created automatically and removed when
// Shutdown() is called.
DataDir string
// Key is the optional encryption key for the LAN and WAN keyring.
Key string
// UseTLS, if true, will disable the HTTP port and enable the HTTPS
// one.
UseTLS bool
// dns is a reference to the first started DNS endpoint.
// It is valid after Start().
dns *DNSServer
// srv is a reference to the first started HTTP endpoint.
// It is valid after Start().
srv *HTTPServer
// Agent is the embedded consul agent.
// It is valid after Start().
*Agent
}
// NewTestAgent returns a started agent with the given name and
// configuration. It fails the test if the Agent could not be started. The
// caller should call Shutdown() to stop the agent and remove temporary
// directories.
func NewTestAgent(t *testing.T, name string, hcl string) *TestAgent {
a := &TestAgent{Name: name, HCL: hcl}
a.Start(t)
return a
}
func NewUnstartedAgent(t *testing.T, name string, hcl string) (*Agent, error) {
c := TestConfig(config.Source{Name: name, Format: "hcl", Data: hcl})
a, err := New(c)
if err != nil {
return nil, err
}
return a, nil
}
// Start starts a test agent. It fails the test if the agent could not be started.
func (a *TestAgent) Start(t *testing.T) *TestAgent {
require := require.New(t)
require.Nil(a.Agent, "TestAgent already started")
var hclDataDir string
if a.DataDir == "" {
name := "agent"
if a.Name != "" {
name = a.Name + "-agent"
}
name = strings.Replace(name, "/", "_", -1)
d, err := ioutil.TempDir(TempDir, name)
require.NoError(err, fmt.Sprintf("Error creating data dir %s: %s", filepath.Join(TempDir, name), err))
hclDataDir = `data_dir = "` + d + `"`
}
var id string
for i := 10; i >= 0; i-- {
a.Config = TestConfig(
randomPortsSource(a.UseTLS),
config.Source{Name: a.Name, Format: "hcl", Data: a.HCL},
config.Source{Name: a.Name + ".data_dir", Format: "hcl", Data: hclDataDir},
)
id = string(a.Config.NodeID)
// write the keyring
if a.Key != "" {
writeKey := func(key, filename string) {
path := filepath.Join(a.Config.DataDir, filename)
err := initKeyring(path, key)
require.NoError(err, fmt.Sprintf("Error creating keyring %s: %s", path, err))
}
writeKey(a.Key, SerfLANKeyring)
writeKey(a.Key, SerfWANKeyring)
}
agent, err := New(a.Config)
require.NoError(err, fmt.Sprintf("Error creating agent: %s", err))
logOutput := a.LogOutput
if logOutput == nil {
logOutput = os.Stderr
}
agent.LogOutput = logOutput
agent.LogWriter = a.LogWriter
agent.logger = log.New(logOutput, a.Name+" - ", log.LstdFlags|log.Lmicroseconds)
agent.MemSink = metrics.NewInmemSink(1*time.Second, time.Minute)
// we need the err var in the next exit condition
if err := agent.Start(); err == nil {
a.Agent = agent
break
} else if i == 0 {
require.Fail("%s %s Error starting agent: %s", id, a.Name, err)
} else if a.ExpectConfigError {
// Panic the error since this can be caught if needed. Pretty gross way to
// detect errors but enough for now and this is a tiny edge case that I'd
// otherwise not have a way to test at all...
panic(err)
} else {
agent.ShutdownAgent()
agent.ShutdownEndpoints()
wait := time.Duration(rand.Int31n(2000)) * time.Millisecond
fmt.Println(id, a.Name, "retrying in", wait)
time.Sleep(wait)
}
// Clean out the data dir if we are responsible for it before we
// try again, since the old ports may have gotten written to
// the data dir, such as in the Raft configuration.
if a.DataDir != "" {
if err := os.RemoveAll(a.DataDir); err != nil {
require.Fail("%s %s Error resetting data dir: %s", id, a.Name, err)
}
}
}
// Start the anti-entropy syncer
a.Agent.StartSync()
var out structs.IndexedNodes
retry.Run(t, func(r *retry.R) {
if len(a.httpServers) == 0 {
r.Fatal(a.Name, "waiting for server")
}
if a.Config.Bootstrap && a.Config.ServerMode {
// Ensure we have a leader and a node registration.
args := &structs.DCSpecificRequest{
Datacenter: a.Config.Datacenter,
QueryOptions: structs.QueryOptions{
MinQueryIndex: out.Index,
MaxQueryTime: 25 * time.Millisecond,
},
}
if err := a.RPC("Catalog.ListNodes", args, &out); err != nil {
r.Fatal(a.Name, "Catalog.ListNodes failed:", err)
}
if !out.QueryMeta.KnownLeader {
r.Fatal(a.Name, "No leader")
}
if out.Index == 0 {
r.Fatal(a.Name, ": Consul index is 0")
}
} else {
req, _ := http.NewRequest("GET", "/v1/agent/self", nil)
resp := httptest.NewRecorder()
_, err := a.httpServers[0].AgentSelf(resp, req)
if err != nil || resp.Code != 200 {
r.Fatal(a.Name, "failed OK response", err)
}
}
})
a.dns = a.dnsServers[0]
a.srv = a.httpServers[0]
return a
}
// Shutdown stops the agent and removes the data directory if it is
// managed by the test agent.
func (a *TestAgent) Shutdown() error {
/* Removed this because it was breaking persistence tests where we would
persist a service and load it through a new agent with the same data-dir.
Not sure if we still need this for other things, everywhere we manually make
a data dir we already do 'defer os.RemoveAll()'
defer func() {
if a.DataDir != "" {
os.RemoveAll(a.DataDir)
}
}()*/
// shutdown agent before endpoints
defer a.Agent.ShutdownEndpoints()
return a.Agent.ShutdownAgent()
}
func (a *TestAgent) DNSAddr() string {
if a.dns == nil {
return ""
}
return a.dns.Addr
}
func (a *TestAgent) HTTPAddr() string {
if a.srv == nil {
return ""
}
return a.srv.Addr
}
func (a *TestAgent) SegmentAddr(name string) string {
if server, ok := a.Agent.delegate.(*consul.Server); ok {
return server.LANSegmentAddr(name)
}
return ""
}
func (a *TestAgent) Client() *api.Client {
conf := api.DefaultConfig()
conf.Address = a.HTTPAddr()
c, err := api.NewClient(conf)
if err != nil {
panic(fmt.Sprintf("Error creating consul API client: %s", err))
}
return c
}
// DNSDisableCompression disables compression for all started DNS servers.
func (a *TestAgent) DNSDisableCompression(b bool) {
for _, srv := range a.dnsServers {
cfg := srv.config.Load().(*dnsConfig)
cfg.DisableCompression = b
}
}
func (a *TestAgent) consulConfig() *consul.Config {
c, err := a.Agent.consulConfig()
if err != nil {
panic(err)
}
return c
}
// pickRandomPorts selects random ports from fixed size random blocks of
// ports. This does not eliminate the chance for port conflict but
// reduces it significantly with little overhead. Furthermore, asking
// the kernel for a random port by binding to port 0 prolongs the test
// execution (in our case +20sec) while also not fully eliminating the
// chance of port conflicts for concurrently executed test binaries.
// Instead of relying on one set of ports to be sufficient we retry
// starting the agent with different ports on port conflict.
func randomPortsSource(tls bool) config.Source {
ports := freeport.Get(6)
if tls {
ports[1] = -1
} else {
ports[2] = -1
}
return config.Source{
Name: "ports",
Format: "hcl",
Data: `
ports = {
dns = ` + strconv.Itoa(ports[0]) + `
http = ` + strconv.Itoa(ports[1]) + `
https = ` + strconv.Itoa(ports[2]) + `
serf_lan = ` + strconv.Itoa(ports[3]) + `
serf_wan = ` + strconv.Itoa(ports[4]) + `
server = ` + strconv.Itoa(ports[5]) + `
}
`,
}
}
func NodeID() string {
id, err := uuid.GenerateUUID()
if err != nil {
panic(err)
}
return id
}
// TestConfig returns a unique default configuration for testing an
// agent.
func TestConfig(sources ...config.Source) *config.RuntimeConfig {
nodeID := NodeID()
testsrc := config.Source{
Name: "test",
Format: "hcl",
Data: `
bind_addr = "127.0.0.1"
advertise_addr = "127.0.0.1"
datacenter = "dc1"
bootstrap = true
server = true
node_id = "` + nodeID + `"
node_name = "Node ` + nodeID + `"
connect {
enabled = true
ca_config {
cluster_id = "` + connect.TestClusterID + `"
}
}
performance {
raft_multiplier = 1
}
`,
}
b, err := config.NewBuilder(config.Flags{})
if err != nil {
panic("NewBuilder failed: " + err.Error())
}
b.Head = append(b.Head, testsrc)
b.Tail = append(b.Tail, config.DefaultConsulSource(), config.DevConsulSource())
b.Tail = append(b.Tail, sources...)
cfg, err := b.BuildAndValidate()
if err != nil {
panic("Error building config: " + err.Error())
}
for _, w := range b.Warnings {
fmt.Println("WARNING:", w)
}
// Disable connect proxy execution since it causes all kinds of problems with
// self-executing tests etc.
cfg.ConnectTestDisableManagedProxies = true
// Effectively disables the delay after root rotation before requesting CSRs
// to make test deterministic. 0 results in default jitter being applied but a
// tiny delay is effectively thre same.
cfg.ConnectTestCALeafRootChangeSpread = 1 * time.Nanosecond
return &cfg
}
// TestACLConfig returns a default configuration for testing an agent
// with ACLs.
func TestACLConfig() string {
return `
acl_datacenter = "dc1"
acl_default_policy = "deny"
acl_master_token = "root"
acl_agent_token = "root"
acl_agent_master_token = "towel"
acl_enforce_version_8 = true
`
}
func TestACLConfigNew() string {
return `
primary_datacenter = "dc1"
acl {
enabled = true
default_policy = "deny"
tokens {
master = "root"
agent = "root"
agent_master = "towel"
}
}
`
}