Address comments
This commit is contained in:
parent
840474f170
commit
139c9240ea
|
@ -38,9 +38,6 @@ const (
|
|||
"but no reason was provided. This is a default message."
|
||||
defaultServiceMaintReason = "Maintenance mode is enabled for this " +
|
||||
"service, but no reason was provided. This is a default message."
|
||||
|
||||
// An interval used to send network coordinates to servers
|
||||
syncCoordinateStaggerIntv = 15 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -201,6 +198,9 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Start sending network coordinates to servers
|
||||
go agent.sendCoordinates()
|
||||
|
||||
return agent, nil
|
||||
}
|
||||
|
||||
|
@ -560,15 +560,14 @@ func (a *Agent) ResumeSync() {
|
|||
a.state.Resume()
|
||||
}
|
||||
|
||||
// SendCoordinates starts a loop that periodically sends the local coordinate
|
||||
// sendCoordinates starts a loop that periodically sends the local coordinate
|
||||
// to a server
|
||||
func (a *Agent) SendCoordinates() {
|
||||
func (a *Agent) sendCoordinates() {
|
||||
for {
|
||||
intv := aeScale(a.config.SyncCoordinateInterval, len(a.LANMembers()))
|
||||
intv = intv + randomStagger(intv)
|
||||
timer := time.After(intv)
|
||||
select {
|
||||
case <-timer:
|
||||
case <-time.After(intv):
|
||||
var c *coordinate.Coordinate
|
||||
if a.config.Server {
|
||||
c = a.server.GetLANCoordinate()
|
||||
|
@ -585,7 +584,7 @@ func (a *Agent) SendCoordinates() {
|
|||
|
||||
var reply struct{}
|
||||
if err := a.RPC("Coordinate.Update", &req, &reply); err != nil {
|
||||
a.logger.Printf("[ERR] coordinate update error: %s", err.Error())
|
||||
a.logger.Printf("[ERR] agent: coordinate update error: %s", err.Error())
|
||||
}
|
||||
case <-a.shutdownCh:
|
||||
return
|
||||
|
|
|
@ -709,9 +709,6 @@ func (c *Command) Run(args []string) int {
|
|||
errWanCh := make(chan struct{})
|
||||
go c.retryJoinWan(config, errWanCh)
|
||||
|
||||
// Start sending network coordinates to servers
|
||||
go c.agent.SendCoordinates()
|
||||
|
||||
// Wait for exit
|
||||
return c.handleSignals(config, errCh, errWanCh)
|
||||
}
|
||||
|
|
|
@ -822,8 +822,6 @@ func TestAgentSendCoordinates(t *testing.T) {
|
|||
|
||||
testutil.WaitForLeader(t, agent1.RPC, "dc1")
|
||||
|
||||
go agent1.SendCoordinates()
|
||||
go agent2.SendCoordinates()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
var reply structs.IndexedCoordinate
|
||||
|
@ -837,4 +835,16 @@ func TestAgentSendCoordinates(t *testing.T) {
|
|||
if reply.Coord == nil {
|
||||
t.Fatalf("should get a coordinate")
|
||||
}
|
||||
|
||||
var reply2 structs.IndexedCoordinate
|
||||
req2 := structs.CoordinateGetRequest{
|
||||
Datacenter: agent2.config.Datacenter,
|
||||
Node: agent2.config.NodeName,
|
||||
}
|
||||
if err := agent1.RPC("Coordinate.Get", &req2, &reply2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if reply2.Coord == nil {
|
||||
t.Fatalf("should get a coordinate")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,11 +8,7 @@ type Coordinate struct {
|
|||
srv *Server
|
||||
}
|
||||
|
||||
// Get returns the the coordinate or a node.
|
||||
//
|
||||
// If the node is in the same datacenter, then the LAN coordinate of the node is
|
||||
// returned. If the node is in a remote DC, then the WAN coordinate of the node
|
||||
// is returned.
|
||||
// Get returns the the LAN coordinate of a node.
|
||||
func (c *Coordinate) Get(args *structs.CoordinateGetRequest, reply *structs.IndexedCoordinate) error {
|
||||
if done, err := c.srv.forward("Coordinate.Get", args, args, reply); done {
|
||||
return err
|
||||
|
@ -30,6 +26,7 @@ func (c *Coordinate) Get(args *structs.CoordinateGetRequest, reply *structs.Inde
|
|||
})
|
||||
}
|
||||
|
||||
// Update updates the the LAN coordinate of a node.
|
||||
func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error {
|
||||
if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done {
|
||||
return err
|
||||
|
|
|
@ -18,7 +18,7 @@ func getRandomCoordinate() *coordinate.Coordinate {
|
|||
n := 5
|
||||
clients := make([]*coordinate.Client, n)
|
||||
for i := 0; i < n; i++ {
|
||||
clients[i] = coordinate.NewClient(config)
|
||||
clients[i], _ = coordinate.NewClient(config)
|
||||
}
|
||||
|
||||
for i := 0; i < n*100; i++ {
|
||||
|
@ -41,7 +41,7 @@ func coordinatesEqual(a, b *coordinate.Coordinate) bool {
|
|||
return dist < 0.1
|
||||
}
|
||||
|
||||
func TestCoordinate(t *testing.T) {
|
||||
func TestCoordinateUpdate(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
@ -71,6 +71,28 @@ func TestCoordinate(t *testing.T) {
|
|||
if !coordinatesEqual(d.Coord, arg.Coord) {
|
||||
t.Fatalf("should be equal\n%v\n%v", d.Coord, arg.Coord)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCoordinateGet(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
|
||||
arg := structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Op: structs.CoordinateSet,
|
||||
Coord: getRandomCoordinate(),
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
if err := client.Call("Coordinate.Update", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Get via RPC
|
||||
var out2 *structs.IndexedCoordinate
|
||||
|
|
Loading…
Reference in New Issue