From 9239df6dbd6e5fa72920aa269e2815bcd9afc6e3 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Fri, 20 Nov 2020 16:23:35 +0100
Subject: [PATCH 1/3] [Streaming] Predictable order for results of
/health/service/:serviceName to mimic memdb
This ensures the result is consitent with/witout streaming
Will partially fix #9239
---
.../cache-types/streaming_health_services.go | 19 ++++++
.../streaming_health_services_test.go | 58 ++++++++++++++++++-
2 files changed, 75 insertions(+), 2 deletions(-)
diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go
index ac1e6e528..dd6b4bc80 100644
--- a/agent/cache-types/streaming_health_services.go
+++ b/agent/cache-types/streaming_health_services.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"
"reflect"
+ "sort"
+ "strings"
"time"
"github.com/hashicorp/go-bexpr"
@@ -206,6 +208,21 @@ func (noopFilterEvaluator) Evaluate(_ interface{}) (bool, error) {
return true, nil
}
+// cachedHealResultSorter sorts the results to match memdb semantics
+// Sort results by Node.Node, if 2 instances match, order by Service.ID
+// Will allow result to be stable sorted and match queries without cache
+func cachedHealResultSorter(serviceNodes *structs.IndexedCheckServiceNodes) {
+ sort.SliceStable(serviceNodes.Nodes, func(i, j int) bool {
+ left := serviceNodes.Nodes[i]
+ right := serviceNodes.Nodes[j]
+ res := strings.Compare(left.Node.Node, right.Node.Node)
+ if res != 0 {
+ return res < 0
+ }
+ return strings.Compare(left.Service.ID, right.Service.ID) < 0
+ })
+}
+
// Result returns the structs.IndexedCheckServiceNodes stored by this view.
func (s *healthView) Result(index uint64) (interface{}, error) {
result := structs.IndexedCheckServiceNodes{
@@ -217,6 +234,8 @@ func (s *healthView) Result(index uint64) (interface{}, error) {
for _, node := range s.state {
result.Nodes = append(result.Nodes, node)
}
+ cachedHealResultSorter(&result)
+
return &result, nil
}
diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/cache-types/streaming_health_services_test.go
index 768962aa8..570aa8e52 100644
--- a/agent/cache-types/streaming_health_services_test.go
+++ b/agent/cache-types/streaming_health_services_test.go
@@ -3,11 +3,12 @@ package cachetype
import (
"errors"
"fmt"
- "sort"
+ "strings"
"testing"
"time"
"github.com/hashicorp/go-hclog"
+ "github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -16,6 +17,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbsubscribe"
+ "github.com/hashicorp/consul/types"
)
func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
@@ -226,6 +228,58 @@ func getNamespace(ns string) string {
return meta.GetNamespace()
}
+func TestOrderingConsistentWithMemDb(t *testing.T) {
+ index := uint64(42)
+ buildTestNode := func(nodeName string, serviceID string) structs.CheckServiceNode {
+ newID, err := uuid.GenerateUUID()
+ require.NoError(t, err)
+ return structs.CheckServiceNode{
+ Node: &structs.Node{
+ ID: types.NodeID(strings.ToUpper(newID)),
+ Node: nodeName,
+ Address: nodeName,
+ Datacenter: "dc1",
+ RaftIndex: structs.RaftIndex{
+ CreateIndex: index,
+ ModifyIndex: index,
+ },
+ },
+ Service: &structs.NodeService{
+ ID: serviceID,
+ Service: "testService",
+ Port: 8080,
+ Weights: &structs.Weights{
+ Passing: 1,
+ Warning: 1,
+ },
+ RaftIndex: structs.RaftIndex{
+ CreateIndex: index,
+ ModifyIndex: index,
+ },
+ EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
+ },
+ Checks: []*structs.HealthCheck{},
+ }
+ }
+ zero := buildTestNode("a-zero-node", "testService:1")
+ one := buildTestNode("node1", "testService:1")
+ two := buildTestNode("node1", "testService:2")
+ three := buildTestNode("node2", "testService")
+ result := structs.IndexedCheckServiceNodes{
+ Nodes: structs.CheckServiceNodes{
+ three, two, zero, one,
+ },
+ QueryMeta: structs.QueryMeta{
+ Index: index,
+ },
+ }
+ cachedHealResultSorter(&result)
+ require.Equal(t, result.Nodes[0], zero)
+ require.Equal(t, result.Nodes[1], one)
+ require.Equal(t, result.Nodes[2], two)
+ require.Equal(t, result.Nodes[3], three)
+}
+
func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
namespace := getNamespace("ns2")
client := NewTestStreamingClient(namespace)
@@ -261,7 +315,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
for _, csn := range r.Nodes {
nodes = append(nodes, csn.Node.Node)
}
- sort.Strings(nodes)
+ // Result will be sorted alphabetically the same way as memdb
return nodes
}
From c22f249a991c96ed396b9459a624fcda7386f724 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Fri, 20 Nov 2020 17:11:11 +0100
Subject: [PATCH 2/3] Added changelog entry for 9247
---
.changelog/9247.txt | 3 +++
1 file changed, 3 insertions(+)
create mode 100644 .changelog/9247.txt
diff --git a/.changelog/9247.txt b/.changelog/9247.txt
new file mode 100644
index 000000000..1021a80b1
--- /dev/null
+++ b/.changelog/9247.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+streaming: ensure the order of results provided by /health/service/:serviceName is consistent with and without streaming enabled
+```
From 09673426e3faa58f7b0dc51cf9950d4f133fd6dd Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Wed, 25 Nov 2020 21:01:07 +0100
Subject: [PATCH 3/3] Applied suggestions from @dnephin
* Renamed `cachedHealResultSorter` into `sortCheckServiceNodes`
* Use `<` instead of `strings.Compare`
* Single line comparison in unit test
---
agent/cache-types/streaming_health_services.go | 14 ++++++--------
.../cache-types/streaming_health_services_test.go | 8 +++-----
2 files changed, 9 insertions(+), 13 deletions(-)
diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go
index dd6b4bc80..efb1dba87 100644
--- a/agent/cache-types/streaming_health_services.go
+++ b/agent/cache-types/streaming_health_services.go
@@ -5,7 +5,6 @@ import (
"fmt"
"reflect"
"sort"
- "strings"
"time"
"github.com/hashicorp/go-bexpr"
@@ -208,18 +207,17 @@ func (noopFilterEvaluator) Evaluate(_ interface{}) (bool, error) {
return true, nil
}
-// cachedHealResultSorter sorts the results to match memdb semantics
+// sortCheckServiceNodes sorts the results to match memdb semantics
// Sort results by Node.Node, if 2 instances match, order by Service.ID
// Will allow result to be stable sorted and match queries without cache
-func cachedHealResultSorter(serviceNodes *structs.IndexedCheckServiceNodes) {
+func sortCheckServiceNodes(serviceNodes *structs.IndexedCheckServiceNodes) {
sort.SliceStable(serviceNodes.Nodes, func(i, j int) bool {
left := serviceNodes.Nodes[i]
right := serviceNodes.Nodes[j]
- res := strings.Compare(left.Node.Node, right.Node.Node)
- if res != 0 {
- return res < 0
+ if left.Node.Node == right.Node.Node {
+ return left.Service.ID < right.Service.ID
}
- return strings.Compare(left.Service.ID, right.Service.ID) < 0
+ return left.Node.Node < right.Node.Node
})
}
@@ -234,7 +232,7 @@ func (s *healthView) Result(index uint64) (interface{}, error) {
for _, node := range s.state {
result.Nodes = append(result.Nodes, node)
}
- cachedHealResultSorter(&result)
+ sortCheckServiceNodes(&result)
return &result, nil
}
diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/cache-types/streaming_health_services_test.go
index 570aa8e52..b2581380f 100644
--- a/agent/cache-types/streaming_health_services_test.go
+++ b/agent/cache-types/streaming_health_services_test.go
@@ -273,11 +273,9 @@ func TestOrderingConsistentWithMemDb(t *testing.T) {
Index: index,
},
}
- cachedHealResultSorter(&result)
- require.Equal(t, result.Nodes[0], zero)
- require.Equal(t, result.Nodes[1], one)
- require.Equal(t, result.Nodes[2], two)
- require.Equal(t, result.Nodes[3], three)
+ sortCheckServiceNodes(&result)
+ expected := structs.CheckServiceNodes{zero, one, two, three}
+ require.Equal(t, expected, result.Nodes)
}
func TestStreamingHealthServices_FullSnapshot(t *testing.T) {