Adding tests and stuff
This commit is contained in:
parent
e54c8f2ea0
commit
b5bbe2bcfa
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/hashicorp/consul/consul"
|
||||
"github.com/hashicorp/consul/consul/state"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
|
@ -556,6 +557,23 @@ func (a *Agent) ResumeSync() {
|
|||
a.state.Resume()
|
||||
}
|
||||
|
||||
// StartSendingCoordinate starts a goroutine that periodically sends the local coordinate
|
||||
// to a server
|
||||
func (a *Agent) StartSendingCoordinate() {
|
||||
go func() {
|
||||
var c coordinate.Coordinate
|
||||
if a.config.Server {
|
||||
c = a.server
|
||||
}
|
||||
req := structs.CoordinateUpdateRequest{
|
||||
Datacenter: a.config.Datacenter,
|
||||
Node: a.config.NodeName,
|
||||
|
||||
QueryOptions: structs.QueryOptions{Token: a.config.ACLToken},
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// persistService saves a service definition to a JSON file in the data dir
|
||||
func (a *Agent) persistService(service *structs.NodeService) error {
|
||||
svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))
|
||||
|
|
|
@ -1,5 +1,46 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
)
|
||||
|
||||
type Coordinate struct {
|
||||
srv *Server
|
||||
}
|
||||
|
||||
// Get returns the the coordinate or a node.
|
||||
//
|
||||
// If the node is in the same datacenter, then the LAN coordinate of the node is
|
||||
// returned. If the node is in a remote DC, then the WAN coordinate of the node
|
||||
// is returned.
|
||||
func (c *Coordinate) Get(args *structs.CoordinateGetRequest, reply *structs.Coordinate) error {
|
||||
if done, err := c.srv.forward("Coordinate.Get", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
if args.OriginDC == c.srv.config.Datacenter {
|
||||
state := c.srv.fsm.State()
|
||||
_, coord, err := state.CoordinateGet(args.Node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *coord
|
||||
} else {
|
||||
reply.Node = args.Node
|
||||
reply.Coord = c.srv.serfWAN.GetCoordinate()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error {
|
||||
if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
_, err := c.srv.raftApply(structs.CoordinateRequestType, args)
|
||||
if err != nil {
|
||||
c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
)
|
||||
|
||||
func getRandomCoordinate() *coordinate.Coordinate {
|
||||
config := coordinate.DefaultConfig()
|
||||
coord := coordinate.NewCoordinate(config)
|
||||
for i := 0; i < len(coord.Vec); i++ {
|
||||
coord.Vec[i] = rand.Float64()
|
||||
}
|
||||
return coord
|
||||
}
|
||||
|
||||
func coordinatesEqual(a, b *coordinate.Coordinate) bool {
|
||||
config := coordinate.DefaultConfig()
|
||||
client := coordinate.NewClient(config)
|
||||
dist, err := client.DistanceBetween(a, b)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return dist < 0.00001
|
||||
}
|
||||
|
||||
func TestCoordinate_Update(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
|
||||
arg := structs.CoordinateUpdateRequest{
|
||||
NodeSpecificRequest: structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
},
|
||||
Op: structs.CoordinateSet,
|
||||
Coord: getRandomCoordinate(),
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
if err := client.Call("Coordinate.Update", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify
|
||||
state := s1.fsm.State()
|
||||
_, d, err := state.CoordinateGet("node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if coordinatesEqual(d.Coord, arg.Coord) {
|
||||
t.Fatalf("should be equal\n%v\n%v", d.Coord, arg.Coord)
|
||||
}
|
||||
}
|
|
@ -89,6 +89,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
|
|||
return c.applyACLOperation(buf[1:], log.Index)
|
||||
case structs.TombstoneRequestType:
|
||||
return c.applyTombstoneOperation(buf[1:], log.Index)
|
||||
case structs.CoordinateRequestType:
|
||||
return c.applyCoordinateOperation(buf[1:], log.Index)
|
||||
default:
|
||||
if ignoreUnknown {
|
||||
c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
|
||||
|
@ -246,6 +248,22 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{
|
|||
}
|
||||
}
|
||||
|
||||
func (c *consulFSM) applyCoordinateOperation(buf []byte, index uint64) interface{} {
|
||||
var req structs.CoordinateUpdateRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", string(req.Op)}, time.Now())
|
||||
switch req.Op {
|
||||
case structs.CoordinateSet:
|
||||
coord := &structs.Coordinate{req.Node, req.Coord}
|
||||
return c.state.CoordinateUpdate(index, coord)
|
||||
default:
|
||||
c.logger.Printf("[WARN] consul.fsm: Invalid Coordinate operation '%s'", req.Op)
|
||||
return fmt.Errorf("Invalid Coordinate operation '%s'", req.Op)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||
defer func(start time.Time) {
|
||||
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
|
||||
|
|
|
@ -407,6 +407,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
|
|||
s.rpcServer.Register(s.endpoints.Session)
|
||||
s.rpcServer.Register(s.endpoints.Internal)
|
||||
s.rpcServer.Register(s.endpoints.ACL)
|
||||
s.rpcServer.Register(s.endpoints.Coordinate)
|
||||
|
||||
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
|
||||
if err != nil {
|
||||
|
|
|
@ -32,6 +32,7 @@ const (
|
|||
SessionRequestType
|
||||
ACLRequestType
|
||||
TombstoneRequestType
|
||||
CoordinateRequestType
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -625,13 +626,24 @@ type Coordinate struct {
|
|||
Coord *coordinate.Coordinate
|
||||
}
|
||||
|
||||
// CoordinateGetRequest is used to request the network coordinate of a given node
|
||||
type CoordinateGetRequest struct {
|
||||
Nodes []string
|
||||
NodeSpecificRequest
|
||||
OriginDC string
|
||||
}
|
||||
|
||||
type CoordinateOp string
|
||||
|
||||
const (
|
||||
CoordinateSet CoordinateOp = "set"
|
||||
)
|
||||
|
||||
// CoordinateUpdateRequest is used to update the network coordinate of a given node
|
||||
type CoordinateUpdateRequest struct {
|
||||
Node string
|
||||
NodeSpecificRequest
|
||||
Op CoordinateOp
|
||||
Coord *coordinate.Coordinate
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
// EventFireRequest is used to ask a server to fire
|
||||
|
|
Loading…
Reference in New Issue