Adds fine-grained watches to coordinate endpoints.

This commit is contained in:
James Phillips 2017-01-24 08:13:48 -08:00
parent 2cbf45301e
commit e1dd427cf4
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
4 changed files with 31 additions and 16 deletions

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate"
)
@ -174,11 +175,10 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
}
state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.QueryOptions,
return c.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("Coordinates"),
func() error {
index, coords, err := state.Coordinates()
func(ws memdb.WatchSet) error {
index, coords, err := state.Coordinates(ws)
if err != nil {
return err
}

View File

@ -544,7 +544,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
}()
// Verify coordinates are restored
_, coords, err := fsm2.state.Coordinates()
_, coords, err := fsm2.state.Coordinates(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -832,7 +832,7 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
}
// Read back the two coordinates to make sure they got updated.
_, coords, err := fsm.state.Coordinates()
_, coords, err := fsm.state.Coordinates(nil)
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -58,20 +58,22 @@ func (s *StateStore) CoordinateGetRaw(node string) (*coordinate.Coordinate, erro
}
// Coordinates queries for all nodes with coordinates.
func (s *StateStore) Coordinates() (uint64, structs.Coordinates, error) {
func (s *StateStore) Coordinates(ws memdb.WatchSet) (uint64, structs.Coordinates, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("Coordinates")...)
idx := maxIndexTxn(tx, "coordinates")
// Pull all the coordinates.
coords, err := tx.Get("coordinates", "id")
iter, err := tx.Get("coordinates", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err)
}
ws.Add(iter.WatchCh())
var results structs.Coordinates
for coord := coords.Next(); coord != nil; coord = coords.Next() {
for coord := iter.Next(); coord != nil; coord = iter.Next() {
results = append(results, coord.(*structs.Coordinate))
}
return idx, results, nil

View File

@ -6,6 +6,7 @@ import (
"testing"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate"
)
@ -29,7 +30,8 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Make sure the coordinates list starts out empty, and that a query for
// a raw coordinate for a nonexistent node doesn't do anything bad.
idx, coords, err := s.Coordinates()
ws := memdb.NewWatchSet()
idx, coords, err := s.Coordinates(ws)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -62,10 +64,14 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
if err := s.CoordinateBatchUpdate(1, updates); err != nil {
t.Fatalf("err: %s", err)
}
if watchFired(ws) {
t.Fatalf("bad")
}
// Should still be empty, though applying an empty batch does bump
// the table index.
idx, coords, err = s.Coordinates()
ws = memdb.NewWatchSet()
idx, coords, err = s.Coordinates(ws)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -82,9 +88,13 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
if err := s.CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Should go through now.
idx, coords, err = s.Coordinates()
ws = memdb.NewWatchSet()
idx, coords, err = s.Coordinates(ws)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -111,9 +121,12 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
if err := s.CoordinateBatchUpdate(4, updates); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Verify it got applied.
idx, coords, err = s.Coordinates()
idx, coords, err = s.Coordinates(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -175,7 +188,7 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
}
// Make sure the index got updated.
idx, coords, err := s.Coordinates()
idx, coords, err := s.Coordinates(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -252,7 +265,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
restore.Commit()
// Read the restored coordinates back out and verify that they match.
idx, res, err := s.Coordinates()
idx, res, err := s.Coordinates(nil)
if err != nil {
t.Fatalf("err: %s", err)
}