Prevents bad coordinates and cleans them up in the database.
This commit is contained in:
parent
0a8a78833e
commit
db5be4b76b
|
@ -110,6 +110,13 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Older clients can send coordinates with invalid numeric values like
|
||||||
|
// NaN and Inf. We guard against these coming in, though newer clients
|
||||||
|
// should never send these.
|
||||||
|
if !args.Coord.IsValid() {
|
||||||
|
return fmt.Errorf("invalid coordinate")
|
||||||
|
}
|
||||||
|
|
||||||
// Since this is a coordinate coming from some place else we harden this
|
// Since this is a coordinate coming from some place else we harden this
|
||||||
// and look for dimensionality problems proactively.
|
// and look for dimensionality problems proactively.
|
||||||
coord, err := c.srv.serfLAN.GetCoordinate()
|
coord, err := c.srv.serfLAN.GetCoordinate()
|
||||||
|
@ -117,7 +124,7 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !coord.IsCompatibleWith(args.Coord) {
|
if !coord.IsCompatibleWith(args.Coord) {
|
||||||
return fmt.Errorf("rejected bad coordinate: %v", args.Coord)
|
return fmt.Errorf("incompatible coordinate")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch the ACL token, if any, and enforce the node policy if enabled.
|
// Fetch the ACL token, if any, and enforce the node policy if enabled.
|
||||||
|
|
|
@ -2,6 +2,7 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
@ -179,11 +180,19 @@ func TestCoordinate_Update(t *testing.T) {
|
||||||
t.Fatalf("wrong number of coordinates dropped, %d != 1", numDropped)
|
t.Fatalf("wrong number of coordinates dropped, %d != 1", numDropped)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send a coordinate with a NaN to make sure that we don't absorb that
|
||||||
|
// into the database.
|
||||||
|
arg2.Coord.Vec[0] = math.NaN()
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "invalid coordinate") {
|
||||||
|
t.Fatalf("should have failed with an error, got %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Finally, send a coordinate with the wrong dimensionality to make sure
|
// Finally, send a coordinate with the wrong dimensionality to make sure
|
||||||
// there are no panics, and that it gets rejected.
|
// there are no panics, and that it gets rejected.
|
||||||
arg2.Coord.Vec = make([]float64, 2*len(arg2.Coord.Vec))
|
arg2.Coord.Vec = make([]float64, 2*len(arg2.Coord.Vec))
|
||||||
err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out)
|
err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out)
|
||||||
if err == nil || !strings.Contains(err.Error(), "rejected bad coordinate") {
|
if err == nil || !strings.Contains(err.Error(), "incompatible coordinate") {
|
||||||
t.Fatalf("should have failed with an error, got %v", err)
|
t.Fatalf("should have failed with an error, got %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,12 @@ func (s *Snapshot) Coordinates() (memdb.ResultIterator, error) {
|
||||||
// already got checked on the way in during a batch update.
|
// already got checked on the way in during a batch update.
|
||||||
func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
|
func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
|
||||||
for _, update := range updates {
|
for _, update := range updates {
|
||||||
|
// Skip any bad data that may have gotten into the database from
|
||||||
|
// a bad client in the past.
|
||||||
|
if !update.Coord.IsValid() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if err := s.tx.Insert("coordinates", update); err != nil {
|
if err := s.tx.Insert("coordinates", update); err != nil {
|
||||||
return fmt.Errorf("failed restoring coordinate: %s", err)
|
return fmt.Errorf("failed restoring coordinate: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -86,6 +92,12 @@ func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) e
|
||||||
|
|
||||||
// Upsert the coordinates.
|
// Upsert the coordinates.
|
||||||
for _, update := range updates {
|
for _, update := range updates {
|
||||||
|
// Skip any bad data that may have gotten into the database from
|
||||||
|
// a bad client in the past.
|
||||||
|
if !update.Coord.IsValid() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// Since the cleanup of coordinates is tied to deletion of
|
// Since the cleanup of coordinates is tied to deletion of
|
||||||
// nodes, we silently drop any updates for nodes that we don't
|
// nodes, we silently drop any updates for nodes that we don't
|
||||||
// know about. This might be possible during normal operation
|
// know about. This might be possible during normal operation
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package state
|
package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -147,6 +148,30 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
|
||||||
t.Fatalf("bad: %#v", coord)
|
t.Fatalf("bad: %#v", coord)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply an invalid update and make sure it gets ignored.
|
||||||
|
badUpdates := structs.Coordinates{
|
||||||
|
&structs.Coordinate{
|
||||||
|
Node: "node1",
|
||||||
|
Coord: &coordinate.Coordinate{Height: math.NaN()},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := s.CoordinateBatchUpdate(5, badUpdates); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify we are at the previous state, though the empty batch does bump
|
||||||
|
// the table index.
|
||||||
|
idx, coords, err = s.Coordinates(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 5 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(coords, updates) {
|
||||||
|
t.Fatalf("bad: %#v", coords)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStateStore_Coordinate_Cleanup(t *testing.T) {
|
func TestStateStore_Coordinate_Cleanup(t *testing.T) {
|
||||||
|
@ -220,6 +245,18 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Manually put a bad coordinate in for node3.
|
||||||
|
testRegisterNode(t, s, 4, "node3")
|
||||||
|
badUpdate := &structs.Coordinate{
|
||||||
|
Node: "node3",
|
||||||
|
Coord: &coordinate.Coordinate{Height: math.NaN()},
|
||||||
|
}
|
||||||
|
tx := s.db.Txn(true)
|
||||||
|
if err := tx.Insert("coordinates", badUpdate); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
tx.Commit()
|
||||||
|
|
||||||
// Snapshot the coordinates.
|
// Snapshot the coordinates.
|
||||||
snap := s.Snapshot()
|
snap := s.Snapshot()
|
||||||
defer snap.Close()
|
defer snap.Close()
|
||||||
|
@ -235,12 +272,12 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
|
||||||
Coord: generateRandomCoordinate(),
|
Coord: generateRandomCoordinate(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := s.CoordinateBatchUpdate(4, trash); err != nil {
|
if err := s.CoordinateBatchUpdate(5, trash); err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify the snapshot.
|
// Verify the snapshot.
|
||||||
if idx := snap.LastIndex(); idx != 3 {
|
if idx := snap.LastIndex(); idx != 4 {
|
||||||
t.Fatalf("bad index: %d", idx)
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
iter, err := snap.Coordinates()
|
iter, err := snap.Coordinates()
|
||||||
|
@ -251,7 +288,10 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
|
||||||
for coord := iter.Next(); coord != nil; coord = iter.Next() {
|
for coord := iter.Next(); coord != nil; coord = iter.Next() {
|
||||||
dump = append(dump, coord.(*structs.Coordinate))
|
dump = append(dump, coord.(*structs.Coordinate))
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(dump, updates) {
|
|
||||||
|
// The snapshot will have the bad update in it, since we don't filter on
|
||||||
|
// the read side.
|
||||||
|
if !reflect.DeepEqual(dump, append(updates, badUpdate)) {
|
||||||
t.Fatalf("bad: %#v", dump)
|
t.Fatalf("bad: %#v", dump)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -259,7 +299,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
|
||||||
func() {
|
func() {
|
||||||
s := testStateStore(t)
|
s := testStateStore(t)
|
||||||
restore := s.Restore()
|
restore := s.Restore()
|
||||||
if err := restore.Coordinates(5, dump); err != nil {
|
if err := restore.Coordinates(6, dump); err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
restore.Commit()
|
restore.Commit()
|
||||||
|
@ -269,7 +309,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
if idx != 5 {
|
if idx != 6 {
|
||||||
t.Fatalf("bad index: %d", idx)
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(res, updates) {
|
if !reflect.DeepEqual(res, updates) {
|
||||||
|
@ -278,7 +318,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
|
||||||
|
|
||||||
// Check that the index was updated (note that it got passed
|
// Check that the index was updated (note that it got passed
|
||||||
// in during the restore).
|
// in during the restore).
|
||||||
if idx := s.maxIndex("coordinates"); idx != 5 {
|
if idx := s.maxIndex("coordinates"); idx != 6 {
|
||||||
t.Fatalf("bad index: %d", idx)
|
t.Fatalf("bad index: %d", idx)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
Loading…
Reference in New Issue