diff --git a/command/agent/agent.go b/command/agent/agent.go index e4756e429..cf018e96b 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -17,6 +17,7 @@ import ( "github.com/hashicorp/consul/consul" "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" ) @@ -556,6 +557,23 @@ func (a *Agent) ResumeSync() { a.state.Resume() } +// StartSendingCoordinate starts a goroutine that periodically sends the local coordinate +// to a server +func (a *Agent) StartSendingCoordinate() { + go func() { + var c coordinate.Coordinate + if a.config.Server { + c = a.server + } + req := structs.CoordinateUpdateRequest{ + Datacenter: a.config.Datacenter, + Node: a.config.NodeName, + + QueryOptions: structs.QueryOptions{Token: a.config.ACLToken}, + } + }() +} + // persistService saves a service definition to a JSON file in the data dir func (a *Agent) persistService(service *structs.NodeService) error { svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID)) diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index 41425519c..d3fd1fdd3 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -1,5 +1,46 @@ package consul +import ( + "github.com/hashicorp/consul/consul/structs" +) + type Coordinate struct { srv *Server } + +// Get returns the the coordinate or a node. +// +// If the node is in the same datacenter, then the LAN coordinate of the node is +// returned. If the node is in a remote DC, then the WAN coordinate of the node +// is returned. +func (c *Coordinate) Get(args *structs.CoordinateGetRequest, reply *structs.Coordinate) error { + if done, err := c.srv.forward("Coordinate.Get", args, args, reply); done { + return err + } + + if args.OriginDC == c.srv.config.Datacenter { + state := c.srv.fsm.State() + _, coord, err := state.CoordinateGet(args.Node) + if err != nil { + return err + } + *reply = *coord + } else { + reply.Node = args.Node + reply.Coord = c.srv.serfWAN.GetCoordinate() + } + + return nil +} + +func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error { + if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done { + return err + } + _, err := c.srv.raftApply(structs.CoordinateRequestType, args) + if err != nil { + c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err) + return err + } + return nil +} diff --git a/consul/coordinate_endpoint_test.go b/consul/coordinate_endpoint_test.go new file mode 100644 index 000000000..c32392b8d --- /dev/null +++ b/consul/coordinate_endpoint_test.go @@ -0,0 +1,64 @@ +package consul + +import ( + "math/rand" + "os" + "testing" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/serf/coordinate" +) + +func getRandomCoordinate() *coordinate.Coordinate { + config := coordinate.DefaultConfig() + coord := coordinate.NewCoordinate(config) + for i := 0; i < len(coord.Vec); i++ { + coord.Vec[i] = rand.Float64() + } + return coord +} + +func coordinatesEqual(a, b *coordinate.Coordinate) bool { + config := coordinate.DefaultConfig() + client := coordinate.NewClient(config) + dist, err := client.DistanceBetween(a, b) + if err != nil { + panic(err) + } + return dist < 0.00001 +} + +func TestCoordinate_Update(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + testutil.WaitForLeader(t, client.Call, "dc1") + + arg := structs.CoordinateUpdateRequest{ + NodeSpecificRequest: structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: "node1", + }, + Op: structs.CoordinateSet, + Coord: getRandomCoordinate(), + } + + var out struct{} + if err := client.Call("Coordinate.Update", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify + state := s1.fsm.State() + _, d, err := state.CoordinateGet("node1") + if err != nil { + t.Fatalf("err: %v", err) + } + if coordinatesEqual(d.Coord, arg.Coord) { + t.Fatalf("should be equal\n%v\n%v", d.Coord, arg.Coord) + } +} diff --git a/consul/fsm.go b/consul/fsm.go index 89cfd5725..21191a3e4 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -89,6 +89,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { return c.applyACLOperation(buf[1:], log.Index) case structs.TombstoneRequestType: return c.applyTombstoneOperation(buf[1:], log.Index) + case structs.CoordinateRequestType: + return c.applyCoordinateOperation(buf[1:], log.Index) default: if ignoreUnknown { c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType) @@ -246,6 +248,22 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{ } } +func (c *consulFSM) applyCoordinateOperation(buf []byte, index uint64) interface{} { + var req structs.CoordinateUpdateRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", string(req.Op)}, time.Now()) + switch req.Op { + case structs.CoordinateSet: + coord := &structs.Coordinate{req.Node, req.Coord} + return c.state.CoordinateUpdate(index, coord) + default: + c.logger.Printf("[WARN] consul.fsm: Invalid Coordinate operation '%s'", req.Op) + return fmt.Errorf("Invalid Coordinate operation '%s'", req.Op) + } +} + func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { defer func(start time.Time) { c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start)) diff --git a/consul/server.go b/consul/server.go index 8cdbe1758..d992e0cfc 100644 --- a/consul/server.go +++ b/consul/server.go @@ -407,6 +407,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { s.rpcServer.Register(s.endpoints.Session) s.rpcServer.Register(s.endpoints.Internal) s.rpcServer.Register(s.endpoints.ACL) + s.rpcServer.Register(s.endpoints.Coordinate) list, err := net.ListenTCP("tcp", s.config.RPCAddr) if err != nil { diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 15597e2a3..28fd5c19e 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -32,6 +32,7 @@ const ( SessionRequestType ACLRequestType TombstoneRequestType + CoordinateRequestType ) const ( @@ -625,13 +626,24 @@ type Coordinate struct { Coord *coordinate.Coordinate } +// CoordinateGetRequest is used to request the network coordinate of a given node type CoordinateGetRequest struct { - Nodes []string + NodeSpecificRequest + OriginDC string } +type CoordinateOp string + +const ( + CoordinateSet CoordinateOp = "set" +) + +// CoordinateUpdateRequest is used to update the network coordinate of a given node type CoordinateUpdateRequest struct { - Node string + NodeSpecificRequest + Op CoordinateOp Coord *coordinate.Coordinate + WriteRequest } // EventFireRequest is used to ask a server to fire