open-consul/agent/consul/autopilot_test.go

package consul

import (
	"context"
	"fmt"
	"os"
	"testing"
	"time"

	"github.com/hashicorp/raft"
	"github.com/hashicorp/serf/serf"
	"github.com/stretchr/testify/require"

	"github.com/hashicorp/consul/agent/consul/autopilotevents"
	"github.com/hashicorp/consul/agent/consul/stream"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/sdk/testutil/retry"
	"github.com/hashicorp/consul/testrpc"
)
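
// TestAutopilot_IdempotentShutdown checks that autopilot's Start and Stop
// can each be called several times in a row without blocking or panicking.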
func TestAutopilot_IdempotentShutdown(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	dir1, s1 := testServerWithConfig(t)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	retry.Run(t, func(r *retry.R) { r.Check(waitForLeader(s1)) })

	s1.autopilot.Start(context.Background())
	s1.autopilot.Start(context.Background())
	s1.autopilot.Start(context.Background())
	<-s1.autopilot.Stop()
	<-s1.autopilot.Stop()
	<-s1.autopilot.Stop()
}
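
// TestAutopilot_CleanupDeadServer ensures that when servers in a five-node
// cluster die, autopilot removes them from the Raft peer set automatically.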
func TestAutopilot_CleanupDeadServer(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	dc := "dc1"
	conf := func(c *Config) {
		c.Datacenter = dc
		c.Bootstrap = false
		c.BootstrapExpect = 5
	}

	dir1, s1 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	dir2, s2 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()

	dir3, s3 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir3)
	defer s3.Shutdown()

	dir4, s4 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir4)
	defer s4.Shutdown()

	dir5, s5 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir5)
	defer s5.Shutdown()

	servers := []*Server{s1, s2, s3, s4, s5}

	// Try to join
	joinLAN(t, s2, s1)
	joinLAN(t, s3, s1)
	joinLAN(t, s4, s1)
	joinLAN(t, s5, s1)

	for _, s := range servers {
		testrpc.WaitForLeader(t, s.RPC, dc)
		retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 5)) })
	}

	testrpc.WaitForLeader(t, s1.RPC, "dc1")

	leaderIndex := -1
	for i, s := range servers {
		if s.IsLeader() {
			leaderIndex = i
			break
		}
	}
	require.NotEqual(t, leaderIndex, -1)

	// Shut down two of the non-leader servers
	killed := make(map[string]struct{})
	for i, s := range servers {
		if i != leaderIndex {
			s.Shutdown()
			killed[string(s.config.NodeID)] = struct{}{}
		}
		if len(killed) == 2 {
			break
		}
	}

	retry.Run(t, func(r *retry.R) {
		alive := 0
		for _, m := range servers[leaderIndex].LANMembersInAgentPartition() {
			if m.Status == serf.StatusAlive {
				alive++
			}
		}
		if alive != 3 {
			r.Fatalf("Expected three alive servers instead of %d", alive)
		}
	})

	// Make sure the dead servers are removed and we're back to 3 total peers
	for _, s := range servers {
		_, killed := killed[string(s.config.NodeID)]
		if !killed {
			retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
		}
	}
}
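
// TestAutopilot_CleanupDeadNonvoter ensures that a server which dies while
// still a non-voter is reaped from the Raft configuration by autopilot.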
func TestAutopilot_CleanupDeadNonvoter(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.AutopilotConfig = &structs.AutopilotConfig{
			CleanupDeadServers:      true,
			ServerStabilizationTime: 100 * time.Millisecond,
		}
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	// Autopilot has to have been running long enough for the server
	// stabilization time to kick in before this test can work.
	time.Sleep(100 * time.Millisecond)

	dir2, s2 := testServerDCBootstrap(t, "dc1", false)
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()

	testrpc.WaitForLeader(t, s1.RPC, "dc1")

	// Have s2 join and then shut it down immediately before it gets a chance to
	// be promoted to a voter.
	joinLAN(t, s2, s1)
	retry.Run(t, func(r *retry.R) {
		r.Check(wantRaft([]*Server{s1, s2}))
	})
	s2.Shutdown()

	retry.Run(t, func(r *retry.R) {
		r.Check(wantRaft([]*Server{s1}))
	})
}
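
// TestAutopilot_CleanupDeadServerPeriodic ensures that autopilot's periodic
// reconciliation removes a dead server without any extra triggering event.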
func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}
	t.Parallel()

	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc1"
		c.Bootstrap = true
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	conf := func(c *Config) {
		c.Datacenter = "dc1"
		c.Bootstrap = false
	}

	dir2, s2 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()

	dir3, s3 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir3)
	defer s3.Shutdown()

	dir4, s4 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir4)
	defer s4.Shutdown()

	dir5, s5 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir5)
	defer s5.Shutdown()

	servers := []*Server{s1, s2, s3, s4, s5}

	// Join the servers to s1, and wait until they are all promoted to
	// voters.
	for _, s := range servers[1:] {
		joinLAN(t, s, s1)
	}
	retry.Run(t, func(r *retry.R) {
		r.Check(wantRaft(servers))
		for _, s := range servers {
			r.Check(wantPeers(s, 5))
		}
	})

	// Kill a non-leader server
	s4.Shutdown()

	// Should be removed from the peers automatically
	servers = []*Server{s1, s2, s3, s5}
	retry.Run(t, func(r *retry.R) {
		r.Check(wantRaft(servers))
		for _, s := range servers {
			r.Check(wantPeers(s, 4))
		}
	})
}
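
// TestAutopilot_RollingUpdate simulates a rolling update: a new server is
// added and an old one is killed, and autopilot should promote the newcomer
// to a voter while removing the dead node.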
func TestAutopilot_RollingUpdate(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}
	t.Parallel()

	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc1"
		c.Bootstrap = true
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	conf := func(c *Config) {
		c.Datacenter = "dc1"
		c.Bootstrap = false
	}

	dir2, s2 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()

	dir3, s3 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir3)
	defer s3.Shutdown()

	// Join the servers to s1, and wait until they are all promoted to
	// voters.
	servers := []*Server{s1, s2, s3}
	for _, s := range servers[1:] {
		joinLAN(t, s, s1)
	}
	retry.Run(t, func(r *retry.R) {
		r.Check(wantRaft(servers))
		for _, s := range servers {
			r.Check(wantPeers(s, 3))
		}
	})

	// Add one more server like we are doing a rolling update.
	dir4, s4 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir4)
	defer s4.Shutdown()
	joinLAN(t, s1, s4)
	servers = append(servers, s4)
	retry.Run(t, func(r *retry.R) {
		r.Check(wantRaft(servers))
		for _, s := range servers {
			r.Check(wantPeers(s, 3))
		}
	})

	// Now kill one of the "old" nodes like we are doing a rolling update.
	s3.Shutdown()

	isVoter := func() bool {
		future := s1.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			t.Fatalf("err: %v", err)
		}
		for _, s := range future.Configuration().Servers {
			if string(s.ID) == string(s4.config.NodeID) {
				return s.Suffrage == raft.Voter
			}
		}
		t.Fatalf("didn't find s4")
		return false
	}

	// Wait for s4 to stabilize, get promoted to a voter, and for s3 to be
	// removed.
	servers = []*Server{s1, s2, s4}
	retry.Run(t, func(r *retry.R) {
		r.Check(wantRaft(servers))
		for _, s := range servers {
			r.Check(wantPeers(s, 3))
		}
		if !isVoter() {
			r.Fatalf("should be a voter")
		}
	})
}
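
// TestAutopilot_CleanupStaleRaftServer ensures that a Raft peer added
// directly (without joining via Serf) is treated as stale and removed.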
func TestAutopilot_CleanupStaleRaftServer(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}
	t.Parallel()

	dir1, s1 := testServerDCBootstrap(t, "dc1", true)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	dir2, s2 := testServerDCBootstrap(t, "dc1", false)
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()

	dir3, s3 := testServerDCBootstrap(t, "dc1", false)
	defer os.RemoveAll(dir3)
	defer s3.Shutdown()

	dir4, s4 := testServerDCBootstrap(t, "dc1", false)
	defer os.RemoveAll(dir4)
	defer s4.Shutdown()

	servers := []*Server{s1, s2, s3}

	// Join the servers to s1
	for _, s := range servers[1:] {
		joinLAN(t, s, s1)
	}
	for _, s := range servers {
		retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
	}

	testrpc.WaitForLeader(t, s1.RPC, "dc1")

	// Add s4 to peers directly
	addVoterFuture := s1.raft.AddVoter(raft.ServerID(s4.config.NodeID), raft.ServerAddress(joinAddrLAN(s4)), 0, 0)
	if err := addVoterFuture.Error(); err != nil {
		t.Fatal(err)
	}

	// Verify we have 4 peers
	peers, err := s1.autopilot.NumVoters()
	if err != nil {
		t.Fatal(err)
	}
	if peers != 4 {
		t.Fatalf("bad: %v", peers)
	}

	// Wait for s4 to be removed
	for _, s := range []*Server{s1, s2, s3} {
		retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
	}
}
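
// TestAutopilot_PromoteNonVoter checks that a newly joined server starts out
// as a non-voter and is promoted to a voter once it has been healthy for the
// configured server stabilization time.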
func TestAutopilot_PromoteNonVoter(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}
	t.Parallel()

	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc1"
		c.Bootstrap = true
		c.AutopilotConfig.ServerStabilizationTime = 200 * time.Millisecond
		c.ServerHealthInterval = 100 * time.Millisecond
		c.AutopilotInterval = 100 * time.Millisecond
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	defer codec.Close()

	testrpc.WaitForLeader(t, s1.RPC, "dc1")

	// This sleep may seem arbitrary, but we need to get past the server
	// stabilization time so that we start factoring in that time for newly
	// connected nodes.
	time.Sleep(100 * time.Millisecond)

	dir2, s2 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc1"
		c.Bootstrap = false
		c.RaftConfig.ProtocolVersion = 3
	})
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()
	joinLAN(t, s2, s1)

	// Make sure we see it as a nonvoter initially. We wait until half
	// the stabilization period has passed.
	retry.Run(t, func(r *retry.R) {
		future := s1.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			r.Fatal(err)
		}

		servers := future.Configuration().Servers
		if len(servers) != 2 {
			r.Fatalf("bad: %v", servers)
		}
		if servers[1].Suffrage != raft.Nonvoter {
			r.Fatalf("bad: %v", servers)
		}
		health := s1.autopilot.GetServerHealth(servers[1].ID)
		if health == nil {
			r.Fatal("nil health")
		}
		if !health.Healthy {
			r.Fatalf("bad: %v", health)
		}
		if time.Since(health.StableSince) < s1.config.AutopilotConfig.ServerStabilizationTime/2 {
			r.Fatal("stable period not elapsed")
		}
	})

	// Make sure it ends up as a voter.
	retry.Run(t, func(r *retry.R) {
		future := s1.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			r.Fatal(err)
		}

		servers := future.Configuration().Servers
		if len(servers) != 2 {
			r.Fatalf("bad: %v", servers)
		}
		if servers[1].Suffrage != raft.Voter {
			r.Fatalf("bad: %v", servers)
		}
	})
}
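
// TestAutopilot_MinQuorum ensures that autopilot reaps a dead server only
// while doing so keeps the cluster at or above the configured MinQuorum.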
func TestAutopilot_MinQuorum(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	dc := "dc1"
	conf := func(c *Config) {
		c.Datacenter = dc
		c.Bootstrap = false
		c.BootstrapExpect = 4
		c.AutopilotConfig.MinQuorum = 3
		c.AutopilotInterval = 100 * time.Millisecond
	}

	dir1, s1 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	dir2, s2 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()

	dir3, s3 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir3)
	defer s3.Shutdown()

	dir4, s4 := testServerWithConfig(t, conf)
	defer os.RemoveAll(dir4)
	defer s4.Shutdown()

	servers := map[string]*Server{
		s1.config.NodeName: s1,
		s2.config.NodeName: s2,
		s3.config.NodeName: s3,
		s4.config.NodeName: s4,
	}

	// Try to join
	joinLAN(t, s2, s1)
	joinLAN(t, s3, s1)
	joinLAN(t, s4, s1)

	// Find either the leader or a follower, depending on the argument.
	findStatus := func(leader bool) *Server {
		for _, mem := range servers {
			if mem.IsLeader() == leader {
				return mem
			}
		}
		return nil
	}
	testrpc.WaitForLeader(t, s1.RPC, dc)

	// Shut down one follower; autopilot should reap it into the "left"
	// state since quorum is still preserved.
	dead := findStatus(false)
	if dead == nil {
		t.Fatalf("no members set")
	}
	require.NoError(t, dead.Shutdown())
	retry.Run(t, func(r *retry.R) {
		leader := findStatus(true)
		if leader == nil {
			r.Fatalf("no members set")
		}
		for _, m := range leader.LANMembersInAgentPartition() {
			if m.Name == dead.config.NodeName && m.Status != serf.StatusLeft {
				r.Fatalf("%v should be left, got %v", m.Name, m.Status.String())
			}
		}
	})
	delete(servers, dead.config.NodeName)

	// Shut down another follower; autopilot should NOT reap this one, since
	// removing it would drop the cluster below MinQuorum.
	dead = findStatus(false)
	require.NoError(t, dead.Shutdown())
	retry.Run(t, func(r *retry.R) {
		leader := findStatus(true)
		if leader == nil {
			r.Fatalf("no members set")
		}
		for _, m := range leader.LANMembersInAgentPartition() {
			if m.Name == dead.config.NodeName && m.Status != serf.StatusFailed {
				r.Fatalf("%v should be failed, got %v", m.Name, m.Status.String())
			}
		}
	})
}
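
// TestAutopilot_EventPublishing verifies that autopilot state changes result
// in ready-server events on the event publisher's stream.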
func TestAutopilot_EventPublishing(t *testing.T) {
	// This is really an integration-level test. The general flow this test will follow is:
	//
	// 1. Start a 3 server cluster
	// 2. Subscribe to the ready server events
	// 3. Observe the first event, which should arrive almost immediately
	//    as it is the snapshot event.
	// 4. Wait for multiple iterations of the autopilot state updater and ensure no
	//    other events are seen. The state update interval is 50ms for tests unless
	//    overridden.
	// 5. Add a fourth server.
	// 6. Wait for an event to be emitted containing 4 ready servers.

	// 1. create the test cluster
	cluster := newTestCluster(t, &testClusterConfig{
		Servers:    3,
		ServerConf: testServerACLConfig,
		// We want to wait until each server has registered itself in the Catalog. Otherwise
		// the first snapshot event we see might have no servers in it while things are being
		// initialized. Doing this wait ensures that things are in the right state to start
		// the subscription.
	})

	// 2. subscribe to ready server events
	req := stream.SubscribeRequest{
		Topic:   autopilotevents.EventTopicReadyServers,
		Subject: stream.SubjectNone,
		Token:   TestDefaultInitialManagementToken,
	}
	sub, err := cluster.Servers[0].publisher.Subscribe(&req)
	require.NoError(t, err)
	t.Cleanup(sub.Unsubscribe)

	// 3. Observe that an event was generated which should be the snapshot event.
	// As we have just bootstrapped the cluster with 3 servers we expect to
	// see those 3 here.
	validatePayload(t, 3, mustGetEventWithTimeout(t, sub, 50*time.Millisecond))

	// TODO - it's kind of annoying that the EventPublisher doesn't have a mode where
	// it knows each event is a full state of the world. The ramifications are that
	// we have to expect/ignore the framing events for EndOfSnapshot.
	event := mustGetEventWithTimeout(t, sub, 10*time.Millisecond)
	require.True(t, event.IsFramingEvent())

	// 4. Wait for 3 iterations of the ServerHealthInterval to ensure no events
	// are being published when the autopilot state is not changing.
	eventNotEmitted(t, sub, 150*time.Millisecond)

	// 5. Add a fourth server
	_, srv := testServerWithConfig(t, testServerACLConfig, func(c *Config) {
		c.Bootstrap = false
		c.BootstrapExpect = 0
	})
	joinLAN(t, srv, cluster.Servers[0])

	// 6. Now wait for the event for the fourth server being added. This may take a little
	// while as the joinLAN operation above doesn't wait for the server to actually get
	// added to Raft.
	validatePayload(t, 4, mustGetEventWithTimeout(t, sub, time.Second))
}

// mustGetEventWithTimeout is a helper function for validating that a Subscription.Next call will return
// an event within the given time. It also validates that no error is returned.
func mustGetEventWithTimeout(t *testing.T, subscription *stream.Subscription, timeout time.Duration) stream.Event {
	t.Helper()
	event, err := getEventWithTimeout(t, subscription, timeout)
	require.NoError(t, err)
	return event
}

// getEventWithTimeout is a helper function for retrieving an Event from a Subscription within the specified timeout.
func getEventWithTimeout(t *testing.T, subscription *stream.Subscription, timeout time.Duration) (stream.Event, error) {
	t.Helper()
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	event, err := subscription.Next(ctx)
	return event, err
}

// eventNotEmitted is a helper to validate that no Event is emitted for the given Subscription
// within the timeout; Subscription.Next should fail with context.DeadlineExceeded.
func eventNotEmitted(t *testing.T, subscription *stream.Subscription, timeout time.Duration) {
	t.Helper()
	event, err := getEventWithTimeout(t, subscription, timeout)
	require.Equal(t, context.DeadlineExceeded, err, fmt.Sprintf("event:%v", event))
}

// validatePayload ensures the event is on the ready servers topic and that
// its payload contains the expected number of ready servers.
func validatePayload(t *testing.T, expectedNumServers int, event stream.Event) {
	t.Helper()
	require.Equal(t, autopilotevents.EventTopicReadyServers, event.Topic)
	readyServers, ok := event.Payload.(autopilotevents.EventPayloadReadyServers)
	require.True(t, ok)
	require.Len(t, readyServers, expectedNumServers)
}