fcaa889116
* Changes default Raft protocol to 3. * Changes numPeers() to report only voters. This should have been there before, but it's more obvious that this is incorrect now that we default the Raft protocol to 3, which puts new servers in a read-only state while Autopilot waits for them to become healthy. * Fixes TestLeader_RollRaftServer. * Fixes TestOperator_RaftRemovePeerByAddress. * Fixes TestServer_*. Relaxed the check for a given number of voter peers and instead do a thorough check that all servers see each other in their Raft configurations. * Fixes TestACL_*. These now just check for Raft replication to be set up, and don't care about the number of voter peers. * Fixes TestOperator_Raft_ListPeers. * Fixes TestAutopilot_CleanupDeadServerPeriodic. * Fixes TestCatalog_ListNodes_ConsistentRead_Fail. * Fixes TestLeader_ChangeServerID and adjusts the conn pool to throw away sockets when it sees io.EOF. * Changes version to 1.0.0 in the options doc. * Makes metrics test more deterministic with autopilot metrics possible.
371 lines
9.4 KiB
Go
371 lines
9.4 KiB
Go
package consul
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"reflect"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/consul/acl"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/testrpc"
|
|
"github.com/hashicorp/net-rpc-msgpackrpc"
|
|
"github.com/hashicorp/raft"
|
|
)
|
|
|
|
func TestOperator_RaftGetConfiguration(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServer(t)
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
defer codec.Close()
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
arg := structs.DCSpecificRequest{
|
|
Datacenter: "dc1",
|
|
}
|
|
var reply structs.RaftConfigurationResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
future := s1.raft.GetConfiguration()
|
|
if err := future.Error(); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if len(future.Configuration().Servers) != 1 {
|
|
t.Fatalf("bad: %v", future.Configuration().Servers)
|
|
}
|
|
me := future.Configuration().Servers[0]
|
|
expected := structs.RaftConfigurationResponse{
|
|
Servers: []*structs.RaftServer{
|
|
&structs.RaftServer{
|
|
ID: me.ID,
|
|
Node: s1.config.NodeName,
|
|
Address: me.Address,
|
|
Leader: true,
|
|
Voter: true,
|
|
},
|
|
},
|
|
Index: future.Index(),
|
|
}
|
|
if !reflect.DeepEqual(reply, expected) {
|
|
t.Fatalf("bad: %v", reply)
|
|
}
|
|
}
|
|
|
|
func TestOperator_RaftGetConfiguration_ACLDeny(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLMasterToken = "root"
|
|
c.ACLDefaultPolicy = "deny"
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
defer codec.Close()
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
// Make a request with no token to make sure it gets denied.
|
|
arg := structs.DCSpecificRequest{
|
|
Datacenter: "dc1",
|
|
}
|
|
var reply structs.RaftConfigurationResponse
|
|
err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply)
|
|
if !acl.IsErrPermissionDenied(err) {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create an ACL with operator read permissions.
|
|
var token string
|
|
{
|
|
var rules = `
|
|
operator = "read"
|
|
`
|
|
|
|
req := structs.ACLRequest{
|
|
Datacenter: "dc1",
|
|
Op: structs.ACLSet,
|
|
ACL: structs.ACL{
|
|
Name: "User token",
|
|
Type: structs.ACLTypeClient,
|
|
Rules: rules,
|
|
},
|
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
|
}
|
|
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
|
|
// Now it should go through.
|
|
arg.Token = token
|
|
if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftGetConfiguration", &arg, &reply); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
future := s1.raft.GetConfiguration()
|
|
if err := future.Error(); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if len(future.Configuration().Servers) != 1 {
|
|
t.Fatalf("bad: %v", future.Configuration().Servers)
|
|
}
|
|
me := future.Configuration().Servers[0]
|
|
expected := structs.RaftConfigurationResponse{
|
|
Servers: []*structs.RaftServer{
|
|
&structs.RaftServer{
|
|
ID: me.ID,
|
|
Node: s1.config.NodeName,
|
|
Address: me.Address,
|
|
Leader: true,
|
|
Voter: true,
|
|
},
|
|
},
|
|
Index: future.Index(),
|
|
}
|
|
if !reflect.DeepEqual(reply, expected) {
|
|
t.Fatalf("bad: %v", reply)
|
|
}
|
|
}
|
|
|
|
func TestOperator_RaftRemovePeerByAddress(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServer(t)
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
defer codec.Close()
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
// Try to remove a peer that's not there.
|
|
arg := structs.RaftRemovePeerRequest{
|
|
Datacenter: "dc1",
|
|
Address: raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", getPort())),
|
|
}
|
|
var reply struct{}
|
|
err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
|
|
if err == nil || !strings.Contains(err.Error(), "not found in the Raft configuration") {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Add it manually to Raft.
|
|
{
|
|
id := raft.ServerID("fake-node-id")
|
|
future := s1.raft.AddVoter(id, arg.Address, 0, time.Second)
|
|
if err := future.Error(); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
|
|
// Make sure it's there.
|
|
{
|
|
future := s1.raft.GetConfiguration()
|
|
if err := future.Error(); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
configuration := future.Configuration()
|
|
if len(configuration.Servers) != 2 {
|
|
t.Fatalf("bad: %v", configuration)
|
|
}
|
|
}
|
|
|
|
// Remove it, now it should go through.
|
|
if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Make sure it's not there.
|
|
{
|
|
future := s1.raft.GetConfiguration()
|
|
if err := future.Error(); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
configuration := future.Configuration()
|
|
if len(configuration.Servers) != 1 {
|
|
t.Fatalf("bad: %v", configuration)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestOperator_RaftRemovePeerByAddress_ACLDeny(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLMasterToken = "root"
|
|
c.ACLDefaultPolicy = "deny"
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
defer codec.Close()
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
// Make a request with no token to make sure it gets denied.
|
|
arg := structs.RaftRemovePeerRequest{
|
|
Datacenter: "dc1",
|
|
Address: raft.ServerAddress(s1.config.RPCAddr.String()),
|
|
}
|
|
var reply struct{}
|
|
err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
|
|
if !acl.IsErrPermissionDenied(err) {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create an ACL with operator write permissions.
|
|
var token string
|
|
{
|
|
var rules = `
|
|
operator = "write"
|
|
`
|
|
|
|
req := structs.ACLRequest{
|
|
Datacenter: "dc1",
|
|
Op: structs.ACLSet,
|
|
ACL: structs.ACL{
|
|
Name: "User token",
|
|
Type: structs.ACLTypeClient,
|
|
Rules: rules,
|
|
},
|
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
|
}
|
|
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
|
|
// Now it should kick back for being an invalid config, which means it
|
|
// tried to do the operation.
|
|
arg.Token = token
|
|
err = msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByAddress", &arg, &reply)
|
|
if err == nil || !strings.Contains(err.Error(), "at least one voter") {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestOperator_RaftRemovePeerByID(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.RaftConfig.ProtocolVersion = 3
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
defer codec.Close()
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
// Try to remove a peer that's not there.
|
|
arg := structs.RaftRemovePeerRequest{
|
|
Datacenter: "dc1",
|
|
ID: raft.ServerID("e35bde83-4e9c-434f-a6ef-453f44ee21ea"),
|
|
}
|
|
var reply struct{}
|
|
err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
|
|
if err == nil || !strings.Contains(err.Error(), "not found in the Raft configuration") {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Add it manually to Raft.
|
|
{
|
|
future := s1.raft.AddVoter(arg.ID, raft.ServerAddress(fmt.Sprintf("127.0.0.1:%d", getPort())), 0, 0)
|
|
if err := future.Error(); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
|
|
// Make sure it's there.
|
|
{
|
|
future := s1.raft.GetConfiguration()
|
|
if err := future.Error(); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
configuration := future.Configuration()
|
|
if len(configuration.Servers) != 2 {
|
|
t.Fatalf("bad: %v", configuration)
|
|
}
|
|
}
|
|
|
|
// Remove it, now it should go through.
|
|
if err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Make sure it's not there.
|
|
{
|
|
future := s1.raft.GetConfiguration()
|
|
if err := future.Error(); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
configuration := future.Configuration()
|
|
if len(configuration.Servers) != 1 {
|
|
t.Fatalf("bad: %v", configuration)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestOperator_RaftRemovePeerByID_ACLDeny(t *testing.T) {
|
|
t.Parallel()
|
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
c.ACLDatacenter = "dc1"
|
|
c.ACLMasterToken = "root"
|
|
c.ACLDefaultPolicy = "deny"
|
|
c.RaftConfig.ProtocolVersion = 3
|
|
})
|
|
defer os.RemoveAll(dir1)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
defer codec.Close()
|
|
|
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
|
|
// Make a request with no token to make sure it gets denied.
|
|
arg := structs.RaftRemovePeerRequest{
|
|
Datacenter: "dc1",
|
|
ID: raft.ServerID(s1.config.NodeID),
|
|
}
|
|
var reply struct{}
|
|
err := msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
|
|
if !acl.IsErrPermissionDenied(err) {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create an ACL with operator write permissions.
|
|
var token string
|
|
{
|
|
var rules = `
|
|
operator = "write"
|
|
`
|
|
|
|
req := structs.ACLRequest{
|
|
Datacenter: "dc1",
|
|
Op: structs.ACLSet,
|
|
ACL: structs.ACL{
|
|
Name: "User token",
|
|
Type: structs.ACLTypeClient,
|
|
Rules: rules,
|
|
},
|
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
|
}
|
|
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &token); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
|
|
// Now it should kick back for being an invalid config, which means it
|
|
// tried to do the operation.
|
|
arg.Token = token
|
|
err = msgpackrpc.CallWithCodec(codec, "Operator.RaftRemovePeerByID", &arg, &reply)
|
|
if err == nil || !strings.Contains(err.Error(), "at least one voter") {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|