Merge pull request #9247 from pierresouchay/streaming_predictible_order_for_health
[Streaming] Predictable order for results of /health/service/:serviceName to mimic memdb
This commit is contained in:
commit
813f0d552d
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
streaming: ensure the order of results provided by /health/service/:serviceName is consistent with and without streaming enabled
|
||||||
|
```
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/go-bexpr"
|
"github.com/hashicorp/go-bexpr"
|
||||||
|
@ -206,6 +207,20 @@ func (noopFilterEvaluator) Evaluate(_ interface{}) (bool, error) {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sortCheckServiceNodes sorts the results to match memdb semantics
|
||||||
|
// Sort results by Node.Node, if 2 instances match, order by Service.ID
|
||||||
|
// Will allow result to be stable sorted and match queries without cache
|
||||||
|
func sortCheckServiceNodes(serviceNodes *structs.IndexedCheckServiceNodes) {
|
||||||
|
sort.SliceStable(serviceNodes.Nodes, func(i, j int) bool {
|
||||||
|
left := serviceNodes.Nodes[i]
|
||||||
|
right := serviceNodes.Nodes[j]
|
||||||
|
if left.Node.Node == right.Node.Node {
|
||||||
|
return left.Service.ID < right.Service.ID
|
||||||
|
}
|
||||||
|
return left.Node.Node < right.Node.Node
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Result returns the structs.IndexedCheckServiceNodes stored by this view.
|
// Result returns the structs.IndexedCheckServiceNodes stored by this view.
|
||||||
func (s *healthView) Result(index uint64) (interface{}, error) {
|
func (s *healthView) Result(index uint64) (interface{}, error) {
|
||||||
result := structs.IndexedCheckServiceNodes{
|
result := structs.IndexedCheckServiceNodes{
|
||||||
|
@ -217,6 +232,8 @@ func (s *healthView) Result(index uint64) (interface{}, error) {
|
||||||
for _, node := range s.state {
|
for _, node := range s.state {
|
||||||
result.Nodes = append(result.Nodes, node)
|
result.Nodes = append(result.Nodes, node)
|
||||||
}
|
}
|
||||||
|
sortCheckServiceNodes(&result)
|
||||||
|
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,11 +3,12 @@ package cachetype
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
|
"github.com/hashicorp/go-uuid"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
@ -16,6 +17,7 @@ import (
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/proto/pbcommon"
|
"github.com/hashicorp/consul/proto/pbcommon"
|
||||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||||
|
"github.com/hashicorp/consul/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
|
func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
|
||||||
|
@ -226,6 +228,56 @@ func getNamespace(ns string) string {
|
||||||
return meta.GetNamespace()
|
return meta.GetNamespace()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestOrderingConsistentWithMemDb(t *testing.T) {
|
||||||
|
index := uint64(42)
|
||||||
|
buildTestNode := func(nodeName string, serviceID string) structs.CheckServiceNode {
|
||||||
|
newID, err := uuid.GenerateUUID()
|
||||||
|
require.NoError(t, err)
|
||||||
|
return structs.CheckServiceNode{
|
||||||
|
Node: &structs.Node{
|
||||||
|
ID: types.NodeID(strings.ToUpper(newID)),
|
||||||
|
Node: nodeName,
|
||||||
|
Address: nodeName,
|
||||||
|
Datacenter: "dc1",
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: index,
|
||||||
|
ModifyIndex: index,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
ID: serviceID,
|
||||||
|
Service: "testService",
|
||||||
|
Port: 8080,
|
||||||
|
Weights: &structs.Weights{
|
||||||
|
Passing: 1,
|
||||||
|
Warning: 1,
|
||||||
|
},
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: index,
|
||||||
|
ModifyIndex: index,
|
||||||
|
},
|
||||||
|
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||||
|
},
|
||||||
|
Checks: []*structs.HealthCheck{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
zero := buildTestNode("a-zero-node", "testService:1")
|
||||||
|
one := buildTestNode("node1", "testService:1")
|
||||||
|
two := buildTestNode("node1", "testService:2")
|
||||||
|
three := buildTestNode("node2", "testService")
|
||||||
|
result := structs.IndexedCheckServiceNodes{
|
||||||
|
Nodes: structs.CheckServiceNodes{
|
||||||
|
three, two, zero, one,
|
||||||
|
},
|
||||||
|
QueryMeta: structs.QueryMeta{
|
||||||
|
Index: index,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
sortCheckServiceNodes(&result)
|
||||||
|
expected := structs.CheckServiceNodes{zero, one, two, three}
|
||||||
|
require.Equal(t, expected, result.Nodes)
|
||||||
|
}
|
||||||
|
|
||||||
func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
||||||
namespace := getNamespace("ns2")
|
namespace := getNamespace("ns2")
|
||||||
client := NewTestStreamingClient(namespace)
|
client := NewTestStreamingClient(namespace)
|
||||||
|
@ -261,7 +313,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
||||||
for _, csn := range r.Nodes {
|
for _, csn := range r.Nodes {
|
||||||
nodes = append(nodes, csn.Node.Node)
|
nodes = append(nodes, csn.Node.Node)
|
||||||
}
|
}
|
||||||
sort.Strings(nodes)
|
// Result will be sorted alphabetically the same way as memdb
|
||||||
return nodes
|
return nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue