Merge pull request #2847 from hashicorp/remove-peer-by-id

Add CLI/API endpoints for removing peer by ID
This commit is contained in:
Kyle Havlovitz 2017-03-30 10:13:56 -07:00 committed by GitHub
commit 074b8576a5
11 changed files with 312 additions and 28 deletions

View file

@ -227,8 +227,6 @@ func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) err
r := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
r.setWriteOptions(q)
// TODO (slackpad) Currently we made address a query parameter. Once
// IDs are in place this will be DELETE /v1/operator/raft/peer/<id>.
r.params.Set("address", string(address))
_, resp, err := requireOK(op.c.doRequest(r))
@ -240,6 +238,23 @@ func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) err
return nil
}
// RaftRemovePeerByID is used to kick a stale peer (one that it in the Raft
// quorum but no longer known to Serf or the catalog) by ID.
func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error {
r := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
r.setWriteOptions(q)
r.params.Set("id", string(id))
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}
// KeyringInstall is used to install a new gossip encryption key into the cluster
func (op *Operator) KeyringInstall(key string, q *WriteOptions) error {
r := op.c.newRequest("POST", "/v1/operator/keyring")

View file

@ -42,23 +42,40 @@ func (s *HTTPServer) OperatorRaftPeer(resp http.ResponseWriter, req *http.Reques
return nil, nil
}
var args structs.RaftPeerByAddressRequest
var args structs.RaftRemovePeerRequest
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
params := req.URL.Query()
if _, ok := params["address"]; ok {
_, hasID := params["id"]
if hasID {
args.ID = raft.ServerID(params.Get("id"))
}
_, hasAddress := params["address"]
if hasAddress {
args.Address = raft.ServerAddress(params.Get("address"))
} else {
}
if !hasID && !hasAddress {
resp.WriteHeader(http.StatusBadRequest)
resp.Write([]byte("Must specify ?address with IP:port of peer to remove"))
resp.Write([]byte("Must specify either ?id with the server's ID or ?address with IP:port of peer to remove"))
return nil, nil
}
if hasID && hasAddress {
resp.WriteHeader(http.StatusBadRequest)
resp.Write([]byte("Must specify only one of ?id or ?address"))
return nil, nil
}
var reply struct{}
if err := s.agent.RPC("Operator.RaftRemovePeerByAddress", &args, &reply); err != nil {
method := "Operator.RaftRemovePeerByID"
if hasAddress {
method = "Operator.RaftRemovePeerByAddress"
}
if err := s.agent.RPC(method, &args, &reply); err != nil {
return nil, err
}
return nil, nil
}

View file

@ -59,6 +59,23 @@ func TestOperator_RaftPeer(t *testing.T) {
t.Fatalf("err: %v", err)
}
})
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("DELETE", "/v1/operator/raft/peer?id=nope", body)
if err != nil {
t.Fatalf("err: %v", err)
}
// If we get this error, it proves we sent the ID all the
// way through.
resp := httptest.NewRecorder()
_, err = srv.OperatorRaftPeer(resp, req)
if err == nil || !strings.Contains(err.Error(),
"id \"nope\" was not found in the Raft configuration") {
t.Fatalf("err: %v", err)
}
})
}
func TestOperator_KeyringInstall(t *testing.T) {

View file

@ -87,7 +87,7 @@ func (c *OperatorRaftCommand) raft(args []string) error {
}
c.Ui.Output(result)
} else if removePeer {
if err := raftRemovePeers(address, operator); err != nil {
if err := raftRemovePeers(address, "", operator); err != nil {
return fmt.Errorf("Error removing peer: %v", err)
}
c.Ui.Output(fmt.Sprintf("Removed peer with address %q", address))

View file

@ -38,9 +38,11 @@ func (c *OperatorRaftRemoveCommand) Synopsis() string {
func (c *OperatorRaftRemoveCommand) Run(args []string) int {
f := c.Command.NewFlagSet(c)
var address string
var address, id string
f.StringVar(&address, "address", "",
"The address to remove from the Raft configuration.")
f.StringVar(&id, "id", "",
"The ID to remove from the Raft configuration.")
if err := c.Command.Parse(args); err != nil {
if err == flag.ErrHelp {
@ -58,26 +60,37 @@ func (c *OperatorRaftRemoveCommand) Run(args []string) int {
}
// Fetch the current configuration.
if err := raftRemovePeers(address, client.Operator()); err != nil {
if err := raftRemovePeers(address, id, client.Operator()); err != nil {
c.Ui.Error(fmt.Sprintf("Error removing peer: %v", err))
return 1
}
if address != "" {
c.Ui.Output(fmt.Sprintf("Removed peer with address %q", address))
} else {
c.Ui.Output(fmt.Sprintf("Removed peer with id %q", id))
}
return 0
}
func raftRemovePeers(address string, operator *api.Operator) error {
// TODO (slackpad) Once we expose IDs, add support for removing
// by ID, add support for that.
if len(address) == 0 {
return fmt.Errorf("an address is required for the peer to remove")
func raftRemovePeers(address, id string, operator *api.Operator) error {
if len(address) == 0 && len(id) == 0 {
return fmt.Errorf("an address or id is required for the peer to remove")
}
if len(address) > 0 && len(id) > 0 {
return fmt.Errorf("cannot give both an address and id")
}
// Try to kick the peer.
if len(address) > 0 {
if err := operator.RaftRemovePeerByAddress(address, nil); err != nil {
return err
}
} else {
if err := operator.RaftRemovePeerByID(id, nil); err != nil {
return err
}
}
return nil
}

View file

@ -56,4 +56,27 @@ func TestOperator_Raft_RemovePeer(t *testing.T) {
t.Fatalf("bad: %s", output)
}
}
// Test the remove-peer subcommand with -id
{
ui := new(cli.MockUi)
c := OperatorRaftRemoveCommand{
Command: base.Command{
Ui: ui,
Flags: base.FlagSetHTTP,
},
}
args := []string{"-http-addr=" + a1.httpAddr, "-id=nope"}
code := c.Run(args)
if code != 1 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
// If we get this error, it proves we sent the address all they through.
output := strings.TrimSpace(ui.ErrorWriter.String())
if !strings.Contains(output, "id \"nope\" was not found in the Raft configuration") {
t.Fatalf("bad: %s", output)
}
}
}

View file

@ -74,7 +74,7 @@ func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply
// quorum but no longer known to Serf or the catalog) by address in the form of
// "IP:port". The reply argument is not used, but it required to fulfill the RPC
// interface.
func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftPeerByAddressRequest, reply *struct{}) error {
func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done {
return err
}
@ -99,6 +99,7 @@ func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftPeerByAddressReque
}
for _, s := range future.Configuration().Servers {
if s.Address == args.Address {
args.ID = s.ID
goto REMOVE
}
}
@ -115,7 +116,17 @@ REMOVE:
// doing if you are calling this. If you remove a peer that's known to
// Serf, for example, it will come back when the leader does a reconcile
// pass.
future := op.srv.raft.RemovePeer(args.Address)
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
if err != nil {
return err
}
var future raft.Future
if minRaftProtocol >= 2 {
future = op.srv.raft.RemoveServer(args.ID, 0, 0)
} else {
future = op.srv.raft.RemovePeer(args.Address)
}
if err := future.Error(); err != nil {
op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer %q: %v",
args.Address, err)
@ -126,6 +137,73 @@ REMOVE:
return nil
}
// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft
// quorum but no longer known to Serf or the catalog) by address in the form of
// "IP:port". The reply argument is not used, but is required to fulfill the RPC
// interface.
func (op *Operator) RaftRemovePeerByID(args *structs.RaftRemovePeerRequest, reply *struct{}) error {
if done, err := op.srv.forward("Operator.RaftRemovePeerByID", args, args, reply); done {
return err
}
// This is a super dangerous operation that requires operator write
// access.
acl, err := op.srv.resolveToken(args.Token)
if err != nil {
return err
}
if acl != nil && !acl.OperatorWrite() {
return permissionDeniedErr
}
// Since this is an operation designed for humans to use, we will return
// an error if the supplied id isn't among the peers since it's
// likely they screwed up.
{
future := op.srv.raft.GetConfiguration()
if err := future.Error(); err != nil {
return err
}
for _, s := range future.Configuration().Servers {
if s.ID == args.ID {
args.Address = s.Address
goto REMOVE
}
}
return fmt.Errorf("id %q was not found in the Raft configuration",
args.ID)
}
REMOVE:
// The Raft library itself will prevent various forms of foot-shooting,
// like making a configuration with no voters. Some consideration was
// given here to adding more checks, but it was decided to make this as
// low-level and direct as possible. We've got ACL coverage to lock this
// down, and if you are an operator, it's assumed you know what you are
// doing if you are calling this. If you remove a peer that's known to
// Serf, for example, it will come back when the leader does a reconcile
// pass.
minRaftProtocol, err := ServerMinRaftProtocol(op.srv.serfLAN.Members())
if err != nil {
return err
}
var future raft.Future
if minRaftProtocol >= 2 {
future = op.srv.raft.RemoveServer(args.ID, 0, 0)
} else {
future = op.srv.raft.RemovePeer(args.Address)
}
if err := future.Error(); err != nil {
op.srv.logger.Printf("[WARN] consul.operator: Failed to remove Raft peer with id %q: %v",
args.ID, err)
return err
}
op.srv.logger.Printf("[WARN] consul.operator: Removed Raft peer with id %q", args.ID)
return nil
}
// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error {
if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {

View file

@ -143,7 +143,7 @@ func TestOperator_RaftRemovePeerByAddress(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Try to remove a peer that's not there.
arg := structs.RaftPeerByAddressRequest{
arg := structs.RaftRemovePeerRequest{
Datacenter: "dc1",
Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", getPort())),
}
@ -205,7 +205,7 @@ func TestOperator_RaftRemovePeerByAddress_ACLDeny(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Make a request with no token to make sure it gets denied.
arg := structs.RaftPeerByAddressRequest{
arg := structs.RaftRemovePeerRequest{
Datacenter: "dc1",
Address: raft.ServerAddress(s1.config.RPCAddr.String()),
}
@ -246,6 +246,122 @@ func TestOperator_RaftRemovePeerByAddress_ACLDeny(t *testing.T) {
}
}
func TestOperator_RaftRemovePeerByID(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.RaftConfig.ProtocolVersion = 3
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Try to remove a peer that's not there.
arg := structs.RaftRemovePeerRequest{
Datacenter: "dc1",
ID: raft.ServerID("e35bde83-4e9c-434f-a6ef-453f44ee21ea"),
}
var reply struct{}
err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
if err == nil || !strings.Contains(err.Error(), "not found in the Raft configuration") {
t.Fatalf("err: %v", err)
}
// Add it manually to Raft.
{
future := s1.raft.AddVoter(arg.ID, raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", getPort())), 0, 0)
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
}
// Make sure it's there.
{
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
configuration := future.Configuration()
if len(configuration.Servers) != 2 {
t.Fatalf("bad: %v", configuration)
}
}
// Remove it, now it should go through.
if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply); err != nil {
t.Fatalf("err: %v", err)
}
// Make sure it's not there.
{
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
configuration := future.Configuration()
if len(configuration.Servers) != 1 {
t.Fatalf("bad: %v", configuration)
}
}
}
func TestOperator_RaftRemovePeerByID_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
c.RaftConfig.ProtocolVersion = 3
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Make a request with no token to make sure it gets denied.
arg := structs.RaftRemovePeerRequest{
Datacenter: "dc1",
ID: raft.ServerID(s1.config.NodeID),
}
var reply struct{}
err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
t.Fatalf("err: %v", err)
}
// Create an ACL with operator write permissions.
var token string
{
var rules = `
operator = "write"
`
req := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: rules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
t.Fatalf("err: %v", err)
}
}
// Now it should kick back for being an invalid config, which means it
// tried to do the operation.
arg.Token = token
err = msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
if err == nil || !strings.Contains(err.Error(), "at least one voter") {
t.Fatalf("err: %v", err)
}
}
func TestOperator_Autopilot_GetConfiguration(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.AutopilotConfig.CleanupDeadServers = false

View file

@ -73,21 +73,24 @@ type RaftConfigurationResponse struct {
Index uint64
}
// RaftPeerByAddressRequest is used by the Operator endpoint to apply a Raft
// RaftRemovePeerRequest is used by the Operator endpoint to apply a Raft
// operation on a specific Raft peer by address in the form of "IP:port".
type RaftPeerByAddressRequest struct {
type RaftRemovePeerRequest struct {
// Datacenter is the target this request is intended for.
Datacenter string
// Address is the peer to remove, in the form "IP:port".
Address raft.ServerAddress
// ID is the peer ID to remove.
ID raft.ServerID
// WriteRequest holds the ACL token to go along with this request.
WriteRequest
}
// RequestDatacenter returns the datacenter for a given request.
func (op *RaftPeerByAddressRequest) RequestDatacenter() string {
func (op *RaftRemovePeerRequest) RequestDatacenter() string {
return op.Datacenter
}

View file

@ -688,9 +688,9 @@ even though the server is no longer present and known to the cluster. This
endpoint can be used to remove the failed server so that it is no longer
affects the Raft quorum.
An `?address=` query parameter is required and should be set to the
`IP:port` for the server to remove. The port number is usually 8300, unless
configured otherwise. Nothing is required in the body of the request.
Either an `?id=` or `?address=` query parameter is required and should be set to the
peer ID or `IP:port` respectively for the server to remove. The port number is usually
8300, unless configured otherwise. Nothing is required in the body of the request.
By default, the datacenter of the agent is targeted; however, the `dc` can be
provided using the `?dc=` query parameter.

View file

@ -78,4 +78,6 @@ Usage: `consul operator raft remove-peer -address="IP:port"`
* `-address` - "IP:port" for the server to remove. The port number is usually
8300, unless configured otherwise.
* `-id` - ID of the server to remove.
The return code will indicate success or failure.