Moves batching down into the state store and changes it to fail-fast.
* A batch of updates is done all in a single transaction. * We no longer need to get an update to kick things, there's a periodic flush. * If incoming updates overwhelm the configured flush rate they will be dumped with an error.
This commit is contained in:
parent
b6c31bdf2f
commit
acb0dce829
|
@ -580,7 +580,6 @@ func (a *Agent) sendCoordinate() {
|
|||
req := structs.CoordinateUpdateRequest{
|
||||
Datacenter: a.config.Datacenter,
|
||||
Node: a.config.NodeName,
|
||||
Op: structs.CoordinateUpdate,
|
||||
Coord: c,
|
||||
WriteRequest: structs.WriteRequest{Token: a.config.ACLToken},
|
||||
}
|
||||
|
|
|
@ -376,9 +376,10 @@ type Config struct {
|
|||
DisableCoordinates bool `mapstructure:"disable_coordinates" json:"-"`
|
||||
|
||||
// SyncCoordinateInterval controls the interval for sending network
|
||||
// coordinates to the server. Defaults to every 15s, but scales up as
|
||||
// coordinates to the server. Defaults to every 20s, but scales up as
|
||||
// the number of nodes increases in the network, to prevent servers from
|
||||
// being overwhelmed.
|
||||
// being overwhelmed. If you update this, you may need to adjust the
|
||||
// tuning of CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
|
||||
SyncCoordinateInterval time.Duration `mapstructure:"-" json:"-"`
|
||||
|
||||
// Checks holds the provided check definitions
|
||||
|
@ -476,7 +477,7 @@ func DefaultConfig() *Config {
|
|||
CheckUpdateInterval: 5 * time.Minute,
|
||||
AEInterval: time.Minute,
|
||||
DisableCoordinates: false,
|
||||
SyncCoordinateInterval: 15 * time.Second,
|
||||
SyncCoordinateInterval: 20 * time.Second,
|
||||
ACLTTL: 30 * time.Second,
|
||||
ACLDownPolicy: "extend-cache",
|
||||
ACLDefaultPolicy: "allow",
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
)
|
||||
|
||||
func TestAgentAntiEntropy_Services(t *testing.T) {
|
||||
|
@ -807,7 +808,8 @@ func TestAgent_nestedPauseResume(t *testing.T) {
|
|||
func TestAgent_sendCoordinate(t *testing.T) {
|
||||
conf := nextConfig()
|
||||
conf.SyncCoordinateInterval = 10 * time.Millisecond
|
||||
conf.ConsulConfig.CoordinateUpdatePeriod = 0 * time.Millisecond
|
||||
conf.ConsulConfig.CoordinateUpdatePeriod = 100 * time.Millisecond
|
||||
conf.ConsulConfig.CoordinateUpdateMaxBatchSize = 20
|
||||
dir, agent := makeAgent(t, conf)
|
||||
defer os.RemoveAll(dir)
|
||||
defer agent.Shutdown()
|
||||
|
@ -815,7 +817,7 @@ func TestAgent_sendCoordinate(t *testing.T) {
|
|||
testutil.WaitForLeader(t, agent.RPC, "dc1")
|
||||
|
||||
// Wait a little while for an update.
|
||||
time.Sleep(3 * conf.SyncCoordinateInterval)
|
||||
time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod)
|
||||
|
||||
// Make sure the coordinate is present.
|
||||
req := structs.NodeSpecificRequest{
|
||||
|
@ -829,4 +831,45 @@ func TestAgent_sendCoordinate(t *testing.T) {
|
|||
if reply.Coord == nil {
|
||||
t.Fatalf("should get a coordinate")
|
||||
}
|
||||
|
||||
// Start spamming for a little while to get rate limit errors back from
|
||||
// the server.
|
||||
conf.SyncCoordinateInterval = 1 * time.Millisecond
|
||||
time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod)
|
||||
|
||||
// Slow down and let the server catch up.
|
||||
conf.SyncCoordinateInterval = 10 * time.Millisecond
|
||||
time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod)
|
||||
|
||||
// Inject a random coordinate so we can confirm that the periodic process
|
||||
// is still able to update it.
|
||||
zeroCoord := &coordinate.Coordinate{}
|
||||
func() {
|
||||
req := structs.CoordinateUpdateRequest{
|
||||
Datacenter: agent.config.Datacenter,
|
||||
Node: agent.config.NodeName,
|
||||
Coord: zeroCoord,
|
||||
WriteRequest: structs.WriteRequest{Token: agent.config.ACLToken},
|
||||
}
|
||||
var reply struct{}
|
||||
if err := agent.RPC("Coordinate.Update", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait a little while for the injected update, as well as periodic ones
|
||||
// to fire.
|
||||
time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod)
|
||||
|
||||
// Make sure the injected coordinate is not the one that's present.
|
||||
req = structs.NodeSpecificRequest{
|
||||
Datacenter: agent.config.Datacenter,
|
||||
Node: agent.config.NodeName,
|
||||
}
|
||||
if err := agent.RPC("Coordinate.Get", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if reflect.DeepEqual(zeroCoord, reply.Coord) {
|
||||
t.Fatalf("should not have gotten the zero coordinate")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,14 +18,17 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
// This scale factor means we will add a minute after we
|
||||
// cross 128 nodes, another at 256, another at 512, etc.
|
||||
// By 8192 nodes, we will scale up by a factor of 8
|
||||
// This scale factor means we will add a minute after we cross 128 nodes,
|
||||
// another at 256, another at 512, etc. By 8192 nodes, we will scale up
|
||||
// by a factor of 8.
|
||||
//
|
||||
// If you update this, you may need to adjust the tuning of
|
||||
// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
|
||||
aeScaleThreshold = 128
|
||||
)
|
||||
|
||||
// aeScale is used to scale the time interval at which anti-entropy
|
||||
// take place. It is used to prevent saturation as the cluster size grows
|
||||
// aeScale is used to scale the time interval at which anti-entropy and coordinate
|
||||
// updates take place. It is used to prevent saturation as the cluster size grows.
|
||||
func aeScale(interval time.Duration, n int) time.Duration {
|
||||
// Don't scale until we cross the threshold
|
||||
if n <= aeScaleThreshold {
|
||||
|
|
|
@ -255,23 +255,29 @@ func DefaultConfig() *Config {
|
|||
}
|
||||
|
||||
conf := &Config{
|
||||
Datacenter: DefaultDC,
|
||||
NodeName: hostname,
|
||||
RPCAddr: DefaultRPCAddr,
|
||||
RaftConfig: raft.DefaultConfig(),
|
||||
SerfLANConfig: serf.DefaultConfig(),
|
||||
SerfWANConfig: serf.DefaultConfig(),
|
||||
ReconcileInterval: 60 * time.Second,
|
||||
ProtocolVersion: ProtocolVersionMax,
|
||||
ACLTTL: 30 * time.Second,
|
||||
ACLDefaultPolicy: "allow",
|
||||
ACLDownPolicy: "extend-cache",
|
||||
TombstoneTTL: 15 * time.Minute,
|
||||
TombstoneTTLGranularity: 30 * time.Second,
|
||||
SessionTTLMin: 10 * time.Second,
|
||||
DisableCoordinates: false,
|
||||
CoordinateUpdatePeriod: 30 * time.Second,
|
||||
CoordinateUpdateMaxBatchSize: 1000,
|
||||
Datacenter: DefaultDC,
|
||||
NodeName: hostname,
|
||||
RPCAddr: DefaultRPCAddr,
|
||||
RaftConfig: raft.DefaultConfig(),
|
||||
SerfLANConfig: serf.DefaultConfig(),
|
||||
SerfWANConfig: serf.DefaultConfig(),
|
||||
ReconcileInterval: 60 * time.Second,
|
||||
ProtocolVersion: ProtocolVersionMax,
|
||||
ACLTTL: 30 * time.Second,
|
||||
ACLDefaultPolicy: "allow",
|
||||
ACLDownPolicy: "extend-cache",
|
||||
TombstoneTTL: 15 * time.Minute,
|
||||
TombstoneTTLGranularity: 30 * time.Second,
|
||||
SessionTTLMin: 10 * time.Second,
|
||||
DisableCoordinates: false,
|
||||
|
||||
// SyncCoordinateInterval defaults to 20 seconds, and scales up
|
||||
// as the number of nodes in the cluster goes up. For 100k nodes,
|
||||
// it will move up to 201 seconds, which gives an update rate of
|
||||
// just under 500 updates per second. We will split this into 2
|
||||
// batches.
|
||||
CoordinateUpdatePeriod: 500 * time.Millisecond,
|
||||
CoordinateUpdateMaxBatchSize: 250,
|
||||
}
|
||||
|
||||
// Increase our reap interval to 3 days instead of 24h.
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
|
@ -12,56 +12,69 @@ type Coordinate struct {
|
|||
// srv is a pointer back to the server.
|
||||
srv *Server
|
||||
|
||||
// updateLastSent is the last time we flushed pending coordinate updates
|
||||
// to the Raft log. CoordinateUpdatePeriod is used to control how long we
|
||||
// wait before doing an update (that time, or hitting more than the
|
||||
// configured CoordinateUpdateMaxBatchSize, whichever comes first).
|
||||
updateLastSent time.Time
|
||||
|
||||
// updateBuffer holds the pending coordinate updates, waiting to be
|
||||
// flushed to the Raft log.
|
||||
updateBuffer []*structs.CoordinateUpdateRequest
|
||||
|
||||
// updateBufferLock manages concurrent access to updateBuffer.
|
||||
updateBufferLock sync.Mutex
|
||||
// updateCh receives coordinate updates and applies them to the raft log
|
||||
// in batches so that we don't create tons of tiny transactions.
|
||||
updateCh chan *structs.Coordinate
|
||||
}
|
||||
|
||||
// NewCoordinate returns a new Coordinate endpoint.
|
||||
func NewCoordinate(srv *Server) *Coordinate {
|
||||
return &Coordinate{
|
||||
srv: srv,
|
||||
updateLastSent: time.Now(),
|
||||
len := srv.config.CoordinateUpdateMaxBatchSize
|
||||
c := &Coordinate{
|
||||
srv: srv,
|
||||
updateCh: make(chan *structs.Coordinate, len),
|
||||
}
|
||||
|
||||
// This will flush all pending updates at a fixed period.
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-time.After(srv.config.CoordinateUpdatePeriod):
|
||||
c.batchApplyUpdates()
|
||||
case <-srv.shutdownCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// batchApplyUpdates is a non-blocking routine that applies all pending updates
|
||||
// to the Raft log.
|
||||
func (c *Coordinate) batchApplyUpdates() {
|
||||
var updates []*structs.Coordinate
|
||||
for done := false; !done; {
|
||||
select {
|
||||
case update := <-c.updateCh:
|
||||
updates = append(updates, update)
|
||||
default:
|
||||
done = true
|
||||
}
|
||||
}
|
||||
|
||||
if len(updates) > 0 {
|
||||
_, err := c.srv.raftApply(structs.CoordinateBatchUpdateType, updates)
|
||||
if err != nil {
|
||||
c.srv.logger.Printf("[ERR] consul.coordinate: Batch update failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update handles requests to update the LAN coordinate of a node.
|
||||
// Update inserts or updates the LAN coordinate of a node.
|
||||
func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error {
|
||||
if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
c.updateBufferLock.Lock()
|
||||
defer c.updateBufferLock.Unlock()
|
||||
c.updateBuffer = append(c.updateBuffer, args)
|
||||
|
||||
// Process updates in batches to avoid tons of small transactions against
|
||||
// the Raft log.
|
||||
shouldFlush := time.Since(c.updateLastSent) > c.srv.config.CoordinateUpdatePeriod ||
|
||||
len(c.updateBuffer) > c.srv.config.CoordinateUpdateMaxBatchSize
|
||||
if shouldFlush {
|
||||
// This transaction could take a while so we don't block here.
|
||||
buf := c.updateBuffer
|
||||
go func() {
|
||||
_, err := c.srv.raftApply(structs.CoordinateRequestType, buf)
|
||||
if err != nil {
|
||||
c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// We clear the buffer regardless of whether the raft transaction
|
||||
// succeeded, just so the buffer doesn't keep growing without bound.
|
||||
c.updateLastSent = time.Now()
|
||||
c.updateBuffer = nil
|
||||
// Perform a non-blocking write to the channel. We'd rather spill updates
|
||||
// than gum things up blocking here.
|
||||
update := &structs.Coordinate{Node: args.Node, Coord: args.Coord}
|
||||
select {
|
||||
case c.updateCh <- update:
|
||||
// This is a noop - we are done if the write went through.
|
||||
default:
|
||||
return fmt.Errorf("Coordinate update rate limit exceeded, increase SyncCoordinateInterval")
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"math/rand"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -29,7 +30,8 @@ func generateRandomCoordinate() *coordinate.Coordinate {
|
|||
}
|
||||
|
||||
// verifyCoordinatesEqual will compare a and b and fail if they are not exactly
|
||||
// equal (no floating point fuzz is considered).
|
||||
// equal (no floating point fuzz is considered since we are trying to make sure
|
||||
// we are getting exactly the coordinates we expect, without math on them).
|
||||
func verifyCoordinatesEqual(t *testing.T, a, b *coordinate.Coordinate) {
|
||||
if !reflect.DeepEqual(a, b) {
|
||||
t.Fatalf("coordinates are not equal: %v != %v", a, b)
|
||||
|
@ -41,7 +43,7 @@ func TestCoordinate_Update(t *testing.T) {
|
|||
dir1, config1 := testServerConfig(t, name)
|
||||
defer os.RemoveAll(dir1)
|
||||
|
||||
config1.CoordinateUpdatePeriod = 1 * time.Second
|
||||
config1.CoordinateUpdatePeriod = 500 * time.Millisecond
|
||||
config1.CoordinateUpdateMaxBatchSize = 5
|
||||
s1, err := NewServer(config1)
|
||||
if err != nil {
|
||||
|
@ -53,28 +55,29 @@ func TestCoordinate_Update(t *testing.T) {
|
|||
defer client.Close()
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
|
||||
// Send an update for the first node.
|
||||
arg1 := structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Op: structs.CoordinateUpdate,
|
||||
Coord: generateRandomCoordinate(),
|
||||
}
|
||||
|
||||
arg2 := structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node2",
|
||||
Op: structs.CoordinateUpdate,
|
||||
Coord: generateRandomCoordinate(),
|
||||
}
|
||||
|
||||
// Send an update for the first node.
|
||||
var out struct{}
|
||||
if err := client.Call("Coordinate.Update", &arg1, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Make sure the update did not yet apply because the batching thresholds
|
||||
// haven't yet been met.
|
||||
// Send an update for the second node.
|
||||
arg2 := structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node2",
|
||||
Coord: generateRandomCoordinate(),
|
||||
}
|
||||
if err := client.Call("Coordinate.Update", &arg2, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Make sure the updates did not yet apply because the update period
|
||||
// hasn't expired.
|
||||
state := s1.fsm.State()
|
||||
_, d, err := state.CoordinateGet("node1")
|
||||
if err != nil {
|
||||
|
@ -83,18 +86,16 @@ func TestCoordinate_Update(t *testing.T) {
|
|||
if d != nil {
|
||||
t.Fatalf("should be nil because the update should be batched")
|
||||
}
|
||||
|
||||
// Wait a while and send another update. This time both updates should
|
||||
// be applied.
|
||||
time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
|
||||
if err := client.Call("Coordinate.Update", &arg2, &out); err != nil {
|
||||
_, d, err = state.CoordinateGet("node2")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if d != nil {
|
||||
t.Fatalf("should be nil because the update should be batched")
|
||||
}
|
||||
|
||||
// Wait a little while so the flush goroutine can run, then make sure
|
||||
// both coordinates made it in.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Wait a while and the updates should get picked up.
|
||||
time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
|
||||
_, d, err = state.CoordinateGet("node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -103,7 +104,6 @@ func TestCoordinate_Update(t *testing.T) {
|
|||
t.Fatalf("should return a coordinate but it's nil")
|
||||
}
|
||||
verifyCoordinatesEqual(t, d.Coord, arg1.Coord)
|
||||
|
||||
_, d, err = state.CoordinateGet("node2")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -113,19 +113,25 @@ func TestCoordinate_Update(t *testing.T) {
|
|||
}
|
||||
verifyCoordinatesEqual(t, d.Coord, arg2.Coord)
|
||||
|
||||
// Now try spamming coordinates and make sure it flushes when the batch
|
||||
// size is hit.
|
||||
for i := 0; i < (s1.config.CoordinateUpdateMaxBatchSize + 1); i++ {
|
||||
// Now try spamming coordinates and make sure it starts dropping when
|
||||
// the pipe is full.
|
||||
for i := 0; i < s1.config.CoordinateUpdateMaxBatchSize; i++ {
|
||||
arg1.Coord = generateRandomCoordinate()
|
||||
if err := client.Call("Coordinate.Update", &arg1, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait a little while so the flush goroutine can run, then make sure
|
||||
// the last coordinate update made it in.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// This one should get dropped.
|
||||
arg2.Coord = generateRandomCoordinate()
|
||||
err = client.Call("Coordinate.Update", &arg2, &out)
|
||||
if err == nil || !strings.Contains(err.Error(), "rate limit") {
|
||||
t.Fatalf("should have failed with a rate limit error, got %v", err)
|
||||
}
|
||||
|
||||
// Wait a little while for the batch routine to run, then make sure
|
||||
// all but the last coordinate update made it in.
|
||||
time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
|
||||
_, d, err = state.CoordinateGet("node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -148,11 +154,10 @@ func TestCoordinate_Get(t *testing.T) {
|
|||
arg := structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Op: structs.CoordinateUpdate,
|
||||
Coord: generateRandomCoordinate(),
|
||||
}
|
||||
|
||||
// Send an initial update, waiting a little while for the flush goroutine
|
||||
// Send an initial update, waiting a little while for the batch update
|
||||
// to run.
|
||||
var out struct{}
|
||||
if err := client.Call("Coordinate.Update", &arg, &out); err != nil {
|
||||
|
|
|
@ -89,8 +89,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
|
|||
return c.applyACLOperation(buf[1:], log.Index)
|
||||
case structs.TombstoneRequestType:
|
||||
return c.applyTombstoneOperation(buf[1:], log.Index)
|
||||
case structs.CoordinateRequestType:
|
||||
return c.applyCoordinateOperation(buf[1:], log.Index)
|
||||
case structs.CoordinateBatchUpdateType:
|
||||
return c.applyCoordinateBatchUpdate(buf[1:], log.Index)
|
||||
default:
|
||||
if ignoreUnknown {
|
||||
c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
|
||||
|
@ -248,23 +248,18 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{
|
|||
}
|
||||
}
|
||||
|
||||
func (c *consulFSM) applyCoordinateOperation(buf []byte, index uint64) interface{} {
|
||||
var reqs []*structs.CoordinateUpdateRequest
|
||||
if err := structs.Decode(buf, &reqs); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
// applyCoordinateBatchUpdate processes a batch of coordinate updates and applies
|
||||
// them in a single underlying transaction. This interface isn't 1:1 with the outer
|
||||
// update interface that the coordinate endpoint exposes, so we made it single
|
||||
// purpose and avoided the opcode convention.
|
||||
func (c *consulFSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} {
|
||||
var updates []*structs.Coordinate
|
||||
if err := structs.Decode(buf, &updates); err != nil {
|
||||
panic(fmt.Errorf("failed to decode batch updates: %v", err))
|
||||
}
|
||||
for _, req := range reqs {
|
||||
defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", string(req.Op)}, time.Now())
|
||||
switch req.Op {
|
||||
case structs.CoordinateUpdate:
|
||||
coord := &structs.Coordinate{Node: req.Node, Coord: req.Coord}
|
||||
if err := c.state.CoordinateUpdate(index, coord); err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
c.logger.Printf("[WARN] consul.fsm: Invalid Coordinate operation '%s'", req.Op)
|
||||
return fmt.Errorf("Invalid Coordinate operation '%s'", req.Op)
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", "batch-update"}, time.Now())
|
||||
if err := c.state.CoordinateBatchUpdate(index, updates); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -741,20 +741,17 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
|
|||
defer fsm.Close()
|
||||
|
||||
// Write a batch of two coordinates.
|
||||
reqs := make([]*structs.CoordinateUpdateRequest, 2)
|
||||
reqs[0] = &structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Op: structs.CoordinateUpdate,
|
||||
Coord: generateRandomCoordinate(),
|
||||
updates := []*structs.Coordinate{
|
||||
&structs.Coordinate{
|
||||
Node: "node1",
|
||||
Coord: generateRandomCoordinate(),
|
||||
},
|
||||
&structs.Coordinate{
|
||||
Node: "node2",
|
||||
Coord: generateRandomCoordinate(),
|
||||
},
|
||||
}
|
||||
reqs[1] = &structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node2",
|
||||
Op: structs.CoordinateUpdate,
|
||||
Coord: generateRandomCoordinate(),
|
||||
}
|
||||
buf, err := structs.Encode(structs.CoordinateRequestType, reqs)
|
||||
buf, err := structs.Encode(structs.CoordinateBatchUpdateType, updates)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -764,23 +761,23 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
|
|||
}
|
||||
|
||||
// Read back the two coordinates to make sure they got updated.
|
||||
_, d, err := fsm.state.CoordinateGet(reqs[0].Node)
|
||||
_, d, err := fsm.state.CoordinateGet(updates[0].Node)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if d == nil {
|
||||
t.Fatalf("missing")
|
||||
}
|
||||
verifyCoordinatesEqual(t, reqs[0].Coord, d.Coord)
|
||||
verifyCoordinatesEqual(t, updates[0].Coord, d.Coord)
|
||||
|
||||
_, d, err = fsm.state.CoordinateGet(reqs[1].Node)
|
||||
_, d, err = fsm.state.CoordinateGet(updates[1].Node)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if d == nil {
|
||||
t.Fatalf("missing")
|
||||
}
|
||||
verifyCoordinatesEqual(t, reqs[1].Coord, d.Coord)
|
||||
verifyCoordinatesEqual(t, updates[1].Coord, d.Coord)
|
||||
}
|
||||
|
||||
func TestFSM_SessionCreate_Destroy(t *testing.T) {
|
||||
|
|
|
@ -32,7 +32,7 @@ const (
|
|||
SessionRequestType
|
||||
ACLRequestType
|
||||
TombstoneRequestType
|
||||
CoordinateRequestType
|
||||
CoordinateBatchUpdateType
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -633,20 +633,11 @@ type IndexedCoordinate struct {
|
|||
QueryMeta
|
||||
}
|
||||
|
||||
// CoordinateOp is used for encoding coordinate-related RPC requests.
|
||||
type CoordinateOp string
|
||||
|
||||
const (
|
||||
// CoordinateUpdate is used to update a node's coordinates in the catalog.
|
||||
CoordinateUpdate CoordinateOp = "update"
|
||||
)
|
||||
|
||||
// CoordinateUpdateRequest is used to update the network coordinate of a given
|
||||
// node.
|
||||
type CoordinateUpdateRequest struct {
|
||||
Datacenter string
|
||||
Node string
|
||||
Op CoordinateOp
|
||||
Coord *coordinate.Coordinate
|
||||
WriteRequest
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue