agent: add variation of force-leave that exclusively works on the WAN (#11722)

Fixes #6548
This commit is contained in:
R.B. Boyer 2021-12-02 17:15:10 -06:00 committed by GitHub
parent d2f53d20ac
commit 6ec84cfbe2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 153 additions and 25 deletions

3
.changelog/11722.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
agent: add variation of force-leave that exclusively works on the WAN
```

View File

@ -1553,9 +1553,10 @@ func (a *Agent) RefreshPrimaryGatewayFallbackAddresses(addrs []string) error {
}
// ForceLeave is used to remove a failed node from the cluster
func (a *Agent) ForceLeave(node string, prune bool, entMeta *structs.EnterpriseMeta) (err error) {
func (a *Agent) ForceLeave(node string, prune bool, entMeta *structs.EnterpriseMeta) error {
a.logger.Info("Force leaving node", "node", node)
err = a.delegate.RemoveFailedNode(node, prune, entMeta)
err := a.delegate.RemoveFailedNode(node, prune, entMeta)
if err != nil {
a.logger.Warn("Failed to remove node",
"node", node,
@ -1565,6 +1566,25 @@ func (a *Agent) ForceLeave(node string, prune bool, entMeta *structs.EnterpriseM
return err
}
// ForceLeaveWAN is used to remove a failed node from the WAN cluster
func (a *Agent) ForceLeaveWAN(node string, prune bool, entMeta *structs.EnterpriseMeta) error {
a.logger.Info("(WAN) Force leaving node", "node", node)
srv, ok := a.delegate.(*consul.Server)
if !ok {
return fmt.Errorf("Must be a server to force-leave a node from the WAN cluster")
}
err := srv.RemoveFailedNodeWAN(node, prune, entMeta)
if err != nil {
a.logger.Warn("(WAN) Failed to remove node",
"node", node,
"error", err,
)
}
return err
}
// AgentLocalMember is used to retrieve the LAN member for the local node.
func (a *Agent) AgentLocalMember() serf.Member {
return a.delegate.AgentLocalMember()

View File

@ -640,8 +640,15 @@ func (s *HTTPHandlers) AgentForceLeave(resp http.ResponseWriter, req *http.Reque
// Check the value of the prune query
_, prune := req.URL.Query()["prune"]
// Check if the WAN is being queried
_, wan := req.URL.Query()["wan"]
addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/force-leave/")
if wan {
return nil, s.agent.ForceLeaveWAN(addr, prune, entMeta)
} else {
return nil, s.agent.ForceLeave(addr, prune, entMeta)
}
}
// syncChanges is a helper function which wraps a blocking call to sync

View File

@ -2265,6 +2265,74 @@ func TestAgent_ForceLeavePrune(t *testing.T) {
})
}
func TestAgent_ForceLeavePrune_WAN(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a1 := StartTestAgent(t, TestAgent{Name: "dc1", HCL: `
datacenter = "dc1"
primary_datacenter = "dc1"
gossip_wan {
probe_interval = "50ms"
suspicion_mult = 2
}
`})
defer a1.Shutdown()
a2 := StartTestAgent(t, TestAgent{Name: "dc2", HCL: `
datacenter = "dc2"
primary_datacenter = "dc1"
`})
defer a2.Shutdown()
testrpc.WaitForLeader(t, a1.RPC, "dc1")
testrpc.WaitForLeader(t, a2.RPC, "dc2")
// Wait for the WAN join.
addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN)
_, err := a2.JoinWAN([]string{addr})
require.NoError(t, err)
testrpc.WaitForLeader(t, a1.RPC, "dc2")
testrpc.WaitForLeader(t, a2.RPC, "dc1")
retry.Run(t, func(r *retry.R) {
require.Len(r, a1.WANMembers(), 2)
require.Len(r, a2.WANMembers(), 2)
})
wanNodeName_a2 := a2.Config.NodeName + ".dc2"
// Shutdown and wait for agent being marked as failed, so we wait for full
// shutdown of Agent.
require.NoError(t, a2.Shutdown())
retry.Run(t, func(r *retry.R) {
m := a1.WANMembers()
for _, member := range m {
if member.Name == wanNodeName_a2 {
if member.Status != serf.StatusFailed {
r.Fatalf("got status %q want %q", member.Status, serf.StatusFailed)
}
}
}
})
// Force leave now
req, err := http.NewRequest("PUT", fmt.Sprintf("/v1/agent/force-leave/%s?prune=1&wan=1", wanNodeName_a2), nil)
require.NoError(t, err)
resp := httptest.NewRecorder()
a1.srv.h.ServeHTTP(resp, req)
require.Equal(t, http.StatusOK, resp.Code, resp.Body.String())
retry.Run(t, func(r *retry.R) {
require.Len(r, a1.WANMembers(), 1)
})
}
func TestAgent_RegisterCheck(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -1193,6 +1193,18 @@ func (s *Server) RemoveFailedNode(node string, prune bool, entMeta *structs.Ente
return s.removeFailedNode(removeFn, node, wanNode, entMeta)
}
// RemoveFailedNodeWAN is used to remove a failed node from the WAN cluster.
func (s *Server) RemoveFailedNodeWAN(wanNode string, prune bool, entMeta *structs.EnterpriseMeta) error {
var removeFn func(*serf.Serf, string) error
if prune {
removeFn = (*serf.Serf).RemoveFailedNodePrune
} else {
removeFn = (*serf.Serf).RemoveFailedNode
}
return s.removeFailedNode(removeFn, "", wanNode, entMeta)
}
// IsLeader checks if this server is the cluster leader
func (s *Server) IsLeader() bool {
return s.raft.State() == raft.Leader

View File

@ -26,6 +26,8 @@ func (s *Server) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int,
}
// removeFailedNode is used to remove a failed node from the cluster
//
// if node is empty, just remove wanNode from the WAN
func (s *Server) removeFailedNode(
removeFn func(*serf.Serf, string) error,
node, wanNode string,
@ -42,11 +44,13 @@ func (s *Server) removeFailedNode(
var merr error
if node != "" {
if found, err := maybeRemove(s.serfLAN, node); err != nil {
merr = multierror.Append(merr, fmt.Errorf("could not remove failed node from LAN: %w", err))
} else if found {
foundAny = true
}
}
if s.serfWAN != nil {
if found, err := maybeRemove(s.serfWAN, wanNode); err != nil {

View File

@ -1021,25 +1021,36 @@ func (a *Agent) Leave() error {
return nil
}
type ForceLeaveOpts struct {
// Prune indicates if we should remove a failed agent from the list of
// members in addition to ejecting it.
Prune bool
// WAN indicates that the request should exclusively target the WAN pool.
WAN bool
}
// ForceLeave is used to have the agent eject a failed node
func (a *Agent) ForceLeave(node string) error {
r := a.c.newRequest("PUT", "/v1/agent/force-leave/"+node)
_, resp, err := a.c.doRequest(r)
if err != nil {
return err
}
defer closeResponseBody(resp)
if err := requireOK(resp); err != nil {
return err
}
return nil
return a.ForceLeaveOpts(node, ForceLeaveOpts{})
}
// ForceLeavePrune is used to have an a failed agent removed
// from the list of members
func (a *Agent) ForceLeavePrune(node string) error {
return a.ForceLeaveOpts(node, ForceLeaveOpts{Prune: true})
}
// ForceLeaveOpts is used to have the agent eject a failed node or remove it
// completely from the list of members.
func (a *Agent) ForceLeaveOpts(node string, opts ForceLeaveOpts) error {
r := a.c.newRequest("PUT", "/v1/agent/force-leave/"+node)
if opts.Prune {
r.params.Set("prune", "1")
}
if opts.WAN {
r.params.Set("wan", "1")
}
_, resp, err := a.c.doRequest(r)
if err != nil {
return err

View File

@ -6,6 +6,7 @@ import (
"github.com/mitchellh/cli"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/flags"
)
@ -21,14 +22,17 @@ type cmd struct {
http *flags.HTTPFlags
help string
//flags
// flags
prune bool
wan bool
}
func (c *cmd) init() {
c.flags = flag.NewFlagSet("", flag.ContinueOnError)
c.flags.BoolVar(&c.prune, "prune", false,
"Remove agent completely from list of members")
c.flags.BoolVar(&c.wan, "wan", false,
"Exclusively leave the agent from the WAN serf pool.")
c.http = &flags.HTTPFlags{}
flags.Merge(c.flags, c.http.ClientFlags())
flags.Merge(c.flags, c.http.PartitionFlag())
@ -54,12 +58,10 @@ func (c *cmd) Run(args []string) int {
return 1
}
if c.prune {
err = client.Agent().ForceLeavePrune(nodes[0])
} else {
err = client.Agent().ForceLeave(nodes[0])
}
err = client.Agent().ForceLeaveOpts(nodes[0], api.ForceLeaveOpts{
Prune: c.prune,
WAN: c.wan,
})
if err != nil {
c.UI.Error(fmt.Sprintf("Error force leaving: %s", err))
return 1
@ -88,4 +90,5 @@ Usage: consul force-leave [options] name
time before eventually reaping them.
-prune Remove agent completely from list of members
-wan Exclusively leave the agent from the WAN serf pool.
`