Address comments

This commit is contained in:
Derek Chiang 2015-05-08 04:31:34 -04:00 committed by James Phillips
parent 66d5a129bf
commit eb599a1745
8 changed files with 148 additions and 40 deletions

View File

@ -139,6 +139,8 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
conf.Merge = &lanMergeDelegate{dc: c.config.Datacenter}
conf.EnableCoordinates = c.config.EnableCoordinates
conf.CacheCoordinates = false
if err := ensurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}

View File

@ -202,6 +202,9 @@ type Config struct {
// UserEventHandler callback can be used to handle incoming
// user events. This function should not block.
UserEventHandler func(serf.UserEvent)
// EnableCoordinates enables features related to network coordinates.
EnableCoordinates bool
}
// CheckVersion is used to check if the ProtocolVersion is valid
@ -256,6 +259,7 @@ func DefaultConfig() *Config {
TombstoneTTL: 15 * time.Minute,
TombstoneTTLGranularity: 30 * time.Second,
SessionTTLMin: 10 * time.Second,
EnableCoordinates: true,
}
// Increase our reap interval to 3 days instead of 24h.

View File

@ -1,16 +1,34 @@
package consul
import (
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/coordinate"
)
type Coordinate struct {
srv *Server
}
var (
// We batch updates and send them together every 30 seconds, or every 1000 updates,
// whichever comes sooner
updatePeriod = time.Duration(30) * time.Second
updateBatchMaxSize = 1000
updateBuffer []*structs.CoordinateUpdateRequest
updateLastSent time.Time
)
func init() {
updateBuffer = nil
updateLastSent = time.Now()
}
// Get returns the the LAN coordinate of a node.
func (c *Coordinate) Get(args *structs.CoordinateGetRequest, reply *structs.IndexedCoordinate) error {
if done, err := c.srv.forward("Coordinate.Get", args, args, reply); done {
func (c *Coordinate) GetLAN(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinate) error {
if done, err := c.srv.forward("Coordinate.GetLAN", args, args, reply); done {
return err
}
@ -26,15 +44,41 @@ func (c *Coordinate) Get(args *structs.CoordinateGetRequest, reply *structs.Inde
})
}
// Get returns the the WAN coordinate of a datacenter.
func (c *Coordinate) GetWAN(args *structs.DCSpecificRequest, reply *coordinate.Coordinate) error {
if args.Datacenter == c.srv.config.Datacenter {
*reply = *c.srv.GetWANCoordinate()
} else {
servers := c.srv.remoteConsuls[args.Datacenter] // servers in the specified DC
for i := 0; i < len(servers); i++ {
if coord := c.srv.serfWAN.GetCachedCoordinate(servers[i].Name); coord != nil {
*reply = *coord
}
}
}
return nil
}
// Update updates the the LAN coordinate of a node.
func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error {
if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done {
return err
}
_, err := c.srv.raftApply(structs.CoordinateRequestType, args)
if err != nil {
c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err)
return err
updateBuffer = append(updateBuffer, args)
if time.Since(updateLastSent) > updatePeriod || len(updateBuffer) > updateBatchMaxSize {
_, err := c.srv.raftApply(structs.CoordinateRequestType, updateBuffer)
// We clear the buffer regardless of whether the raft transaction succeeded, just so the
// buffer doesn't keep growing without bound.
updateBuffer = nil
updateLastSent = time.Now()
if err != nil {
c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err)
return err
}
}
return nil
}

View File

@ -12,6 +12,11 @@ import (
"github.com/hashicorp/serf/coordinate"
)
func init() {
// Shorten updatePeriod so we don't have to wait as long
updatePeriod = time.Duration(100) * time.Millisecond
}
// getRandomCoordinate generates a random coordinate.
func getRandomCoordinate() *coordinate.Coordinate {
config := coordinate.DefaultConfig()
@ -46,15 +51,24 @@ func TestCoordinateUpdate(t *testing.T) {
testutil.WaitForLeader(t, client.Call, "dc1")
arg := structs.CoordinateUpdateRequest{
arg1 := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "node1",
Op: structs.CoordinateSet,
Coord: getRandomCoordinate(),
}
arg2 := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "node2",
Op: structs.CoordinateSet,
Coord: getRandomCoordinate(),
}
updateLastSent = time.Now()
var out struct{}
if err := client.Call("Coordinate.Update", &arg, &out); err != nil {
if err := client.Call("Coordinate.Update", &arg1, &out); err != nil {
t.Fatalf("err: %v", err)
}
@ -64,12 +78,42 @@ func TestCoordinateUpdate(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
if !coordinatesEqual(d.Coord, arg.Coord) {
t.Fatalf("should be equal\n%v\n%v", d.Coord, arg.Coord)
if d != nil {
t.Fatalf("should be nil because the update should be batched")
}
// Wait a while and send another update; this time the updates should be sent
time.Sleep(time.Duration(2) * updatePeriod)
if err := client.Call("Coordinate.Update", &arg2, &out); err != nil {
t.Fatalf("err: %v", err)
}
_, d, err = state.CoordinateGet("node1")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("should return a coordinate but it's nil")
}
if !coordinatesEqual(d.Coord, arg1.Coord) {
t.Fatalf("should be equal\n%v\n%v", d.Coord, arg1.Coord)
}
_, d, err = state.CoordinateGet("node2")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("should return a coordinate but it's nil")
}
if !coordinatesEqual(d.Coord, arg2.Coord) {
t.Fatalf("should be equal\n%v\n%v", d.Coord, arg2.Coord)
}
}
func TestCoordinateGet(t *testing.T) {
func TestCoordinateGetLAN(t *testing.T) {
updatePeriod = time.Duration(0) // to make updates instant
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@ -92,11 +136,11 @@ func TestCoordinateGet(t *testing.T) {
// Get via RPC
var out2 *structs.IndexedCoordinate
arg2 := structs.CoordinateGetRequest{
arg2 := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: "node1",
}
if err := client.Call("Coordinate.Get", &arg2, &out2); err != nil {
if err := client.Call("Coordinate.GetLAN", &arg2, &out2); err != nil {
t.Fatalf("err: %v", err)
}
if !coordinatesEqual(out2.Coord, arg.Coord) {
@ -109,7 +153,7 @@ func TestCoordinateGet(t *testing.T) {
if err := client.Call("Coordinate.Update", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if err := client.Call("Coordinate.Get", &arg2, &out2); err != nil {
if err := client.Call("Coordinate.GetLAN", &arg2, &out2); err != nil {
t.Fatalf("err: %v", err)
}
if !coordinatesEqual(out2.Coord, arg.Coord) {

View File

@ -249,19 +249,25 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{
}
func (c *consulFSM) applyCoordinateOperation(buf []byte, index uint64) interface{} {
var req structs.CoordinateUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
var reqs []*structs.CoordinateUpdateRequest
if err := structs.Decode(buf, &reqs); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", string(req.Op)}, time.Now())
switch req.Op {
case structs.CoordinateSet:
coord := &structs.Coordinate{Node: req.Node, Coord: req.Coord}
return c.state.CoordinateUpdate(index, coord)
default:
c.logger.Printf("[WARN] consul.fsm: Invalid Coordinate operation '%s'", req.Op)
return fmt.Errorf("Invalid Coordinate operation '%s'", req.Op)
for i := 0; i < len(reqs); i++ {
req := reqs[i]
defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", string(req.Op)}, time.Now())
switch req.Op {
case structs.CoordinateSet:
coord := &structs.Coordinate{Node: req.Node, Coord: req.Coord}
if err := c.state.CoordinateUpdate(index, coord); err != nil {
return err
}
default:
c.logger.Printf("[WARN] consul.fsm: Invalid Coordinate operation '%s'", req.Op)
return fmt.Errorf("Invalid Coordinate operation '%s'", req.Op)
}
}
return nil
}
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {

View File

@ -741,13 +741,14 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
defer fsm.Close()
nodeName := "Node1"
req := structs.CoordinateUpdateRequest{
reqs := make([]*structs.CoordinateUpdateRequest, 1)
reqs[0] = &structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: nodeName,
Op: structs.CoordinateSet,
Coord: getRandomCoordinate(),
}
buf, err := structs.Encode(structs.CoordinateRequestType, req)
buf, err := structs.Encode(structs.CoordinateRequestType, reqs)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -764,7 +765,7 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
if d == nil {
t.Fatalf("missing")
}
if !coordinatesEqual(req.Coord, d.Coord) {
if !coordinatesEqual(reqs[0].Coord, d.Coord) {
t.Fatalf("wrong coordinate")
}
}

View File

@ -308,6 +308,14 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
if err := ensurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
conf.EnableCoordinates = s.config.EnableCoordinates
if conf.EnableCoordinates && wan {
// Cache coordinates only if it's the wan network where the number of nodes is
// reasonably low.
conf.CacheCoordinates = true
}
return serf.Create(conf)
}
@ -398,7 +406,9 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.endpoints.Session = &Session{s}
s.endpoints.Internal = &Internal{s}
s.endpoints.ACL = &ACL{s}
s.endpoints.Coordinate = &Coordinate{s}
if s.config.EnableCoordinates {
s.endpoints.Coordinate = &Coordinate{s}
}
// Register the handlers
s.rpcServer.Register(s.endpoints.Status)
@ -408,7 +418,9 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.rpcServer.Register(s.endpoints.Session)
s.rpcServer.Register(s.endpoints.Internal)
s.rpcServer.Register(s.endpoints.ACL)
s.rpcServer.Register(s.endpoints.Coordinate)
if s.config.EnableCoordinates {
s.rpcServer.Register(s.endpoints.Coordinate)
}
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil {
@ -695,7 +707,12 @@ func (s *Server) Stats() map[string]map[string]string {
return stats
}
// GetLANCoordinate returns the network coordinate of the receiver
// GetLANCoordinate returns the LAN coordinate of the server
func (s *Server) GetLANCoordinate() *coordinate.Coordinate {
return s.serfLAN.GetCoordinate()
}
// GetWANCoordinate returns the WAN coordinate of the server
func (s *Server) GetWANCoordinate() *coordinate.Coordinate {
return s.serfWAN.GetCoordinate()
}

View File

@ -637,16 +637,6 @@ const (
CoordinateSet CoordinateOp = "set"
)
type CoordinateGetRequest struct {
Datacenter string
Node string
QueryOptions
}
func (c *CoordinateGetRequest) RequestDatacenter() string {
return c.Datacenter
}
// CoordinateUpdateRequest is used to update the network coordinate of a given node
type CoordinateUpdateRequest struct {
Datacenter string