open-consul/consul/coordinate_endpoint_test.go

package consul

import (
	"fmt"
	"math/rand"
	"os"
	"reflect"
	"strings"
	"testing"
	"time"

	"github.com/hashicorp/consul/consul/structs"
	"github.com/hashicorp/consul/testutil"
	"github.com/hashicorp/serf/coordinate"
)

// generateRandomCoordinate creates a random coordinate. This mucks with the
// underlying structure directly, so it's not really useful for any particular
// position in the network, but it's a good payload to send through to make
// sure things come out the other side or get stored correctly.
func generateRandomCoordinate() *coordinate.Coordinate {
	config := coordinate.DefaultConfig()
	coord := coordinate.NewCoordinate(config)
	for i := range coord.Vec {
		coord.Vec[i] = rand.NormFloat64()
	}
	coord.Error = rand.NormFloat64()
	coord.Adjustment = rand.NormFloat64()
	return coord
}

// verifyCoordinatesEqual will compare a and b and fail if they are not exactly
// equal (no floating point fuzz is considered since we are trying to make sure
// we are getting exactly the coordinates we expect, without math on them).
func verifyCoordinatesEqual(t *testing.T, a, b *coordinate.Coordinate) {
	if !reflect.DeepEqual(a, b) {
		t.Fatalf("coordinates are not equal: %v != %v", a, b)
	}
}

func TestCoordinate_Update(t *testing.T) {
	name := fmt.Sprintf("Node %d", getPort())
	dir1, config1 := testServerConfig(t, name)
	defer os.RemoveAll(dir1)
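	// Use a short update period and small batch limits so the test can
	// exercise the batching and batch-overflow behavior quickly.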
	config1.CoordinateUpdatePeriod = 500 * time.Millisecond
	config1.CoordinateUpdateBatchSize = 5
	config1.CoordinateUpdateMaxBatches = 2
	s1, err := NewServer(config1)
	if err != nil {
		t.Fatal(err)
	}
	defer s1.Shutdown()

	client := rpcClient(t, s1)
	defer client.Close()

	testutil.WaitForLeader(t, client.Call, "dc1")

	// Send an update for the first node.
	arg1 := structs.CoordinateUpdateRequest{
		Datacenter: "dc1",
		Node:       "node1",
		Coord:      generateRandomCoordinate(),
	}
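	// The reply from Coordinate.Update isn't inspected in this test, so an
	// empty struct serves as the out parameter for all of the calls below.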
	var out struct{}
	if err := client.Call("Coordinate.Update", &arg1, &out); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Send an update for the second node.
	arg2 := structs.CoordinateUpdateRequest{
		Datacenter: "dc1",
		Node:       "node2",
		Coord:      generateRandomCoordinate(),
	}
	if err := client.Call("Coordinate.Update", &arg2, &out); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Make sure the updates did not yet apply because the update period
	// hasn't expired.
	state := s1.fsm.State()
	_, c, err := state.CoordinateGet("node1")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if c != nil {
		t.Fatalf("should be nil because the update should be batched")
	}
	_, c, err = state.CoordinateGet("node2")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if c != nil {
		t.Fatalf("should be nil because the update should be batched")
	}

	// Wait a while and the updates should get picked up.
	time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
	_, c, err = state.CoordinateGet("node1")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if c == nil {
		t.Fatalf("should return a coordinate but it's nil")
	}
	verifyCoordinatesEqual(t, c, arg1.Coord)
	_, c, err = state.CoordinateGet("node2")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if c == nil {
		t.Fatalf("should return a coordinate but it's nil")
	}
	verifyCoordinatesEqual(t, c, arg2.Coord)

	// Now spam some coordinate updates and make sure it starts throwing
	// them away if they exceed the batch allowance. Note that we have to
	// use unique node names since pending updates are held in a map keyed
	// by node name.
	spamLen := s1.config.CoordinateUpdateBatchSize*s1.config.CoordinateUpdateMaxBatches + 1
	for i := 0; i < spamLen; i++ {
		arg1.Node = fmt.Sprintf("bogusnode%d", i)
		arg1.Coord = generateRandomCoordinate()
		if err := client.Call("Coordinate.Update", &arg1, &out); err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	// Wait a little while for the batch routine to run, then make sure
	// exactly one of the updates got dropped (we won't know which one).
	time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
	numDropped := 0
	for i := 0; i < spamLen; i++ {
		_, c, err = state.CoordinateGet(fmt.Sprintf("bogusnode%d", i))
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		if c == nil {
			numDropped++
		}
	}
	if numDropped != 1 {
		t.Fatalf("wrong number of coordinates dropped, %d != 1", numDropped)
	}

	// Finally, send a coordinate with the wrong dimensionality to make sure
	// there are no panics, and that it gets rejected.
	arg2.Coord.Vec = make([]float64, 2*len(arg2.Coord.Vec))
	err = client.Call("Coordinate.Update", &arg2, &out)
	if err == nil || !strings.Contains(err.Error(), "rejected bad coordinate") {
		t.Fatalf("should have failed with an error, got %v", err)
	}
}

func TestCoordinate_Get(t *testing.T) {
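	// Bring up a test server with the default config and wait for
	// leadership before issuing any coordinate RPCs.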
	dir1, s1 := testServer(t)
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()

	client := rpcClient(t, s1)
	defer client.Close()

	testutil.WaitForLeader(t, client.Call, "dc1")
	arg := structs.CoordinateUpdateRequest{
		Datacenter: "dc1",
		Node:       "node1",
		Coord:      generateRandomCoordinate(),
	}

	// Send an initial update, waiting a little while for the batch update
	// to run.
	var out struct{}
	if err := client.Call("Coordinate.Update", &arg, &out); err != nil {
		t.Fatalf("err: %v", err)
	}
	time.Sleep(100 * time.Millisecond)

	// Query the coordinate via RPC.
	arg2 := structs.NodeSpecificRequest{
		Datacenter: "dc1",
		Node:       "node1",
	}
	coord := structs.IndexedCoordinate{}
	if err := client.Call("Coordinate.Get", &arg2, &coord); err != nil {
		t.Fatalf("err: %v", err)
	}
	verifyCoordinatesEqual(t, coord.Coord, arg.Coord)

	// Send another coordinate update, then wait for the flush.
	arg.Coord = generateRandomCoordinate()
	if err := client.Call("Coordinate.Update", &arg, &out); err != nil {
		t.Fatalf("err: %v", err)
	}
	time.Sleep(100 * time.Millisecond)

	// Now re-query and make sure the results are fresh.
	if err := client.Call("Coordinate.Get", &arg2, &coord); err != nil {
		t.Fatalf("err: %v", err)
	}
	verifyCoordinatesEqual(t, coord.Coord, arg.Coord)
}