Merge pull request #8809 from hashicorp/streaming/materialize-view
Add StreamingHealthServices cache-type
This commit is contained in:
commit a94fe054f0
@ -0,0 +1,173 @@
package cachetype

import (
	"fmt"

	"github.com/hashicorp/consul/proto/pbcommon"
	"github.com/hashicorp/consul/proto/pbservice"
	"github.com/hashicorp/consul/proto/pbsubscribe"
	"github.com/hashicorp/consul/types"
)

func newEndOfSnapshotEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.Event {
	return &pbsubscribe.Event{
		Topic:   topic,
		Index:   index,
		Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
	}
}

func newNewSnapshotToFollowEvent(topic pbsubscribe.Topic) *pbsubscribe.Event {
	return &pbsubscribe.Event{
		Topic:   topic,
		Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
	}
}

// newEventServiceHealthRegister returns a realistically populated service
// health registration event for tests. nodeNum is a logical node number and
// is used to create the node name ("node%d"), and also to vary the node ID
// and IP address to make the event a little more realistic for cases that
// need that. nodeNum should be less than 64k to make the IP address look
// realistic. Any other changes can be made on the returned event to avoid
// adding too many options for callers.
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
	node := fmt.Sprintf("node%d", nodeNum)
	nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
	addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)

	return &pbsubscribe.Event{
		Topic: pbsubscribe.Topic_ServiceHealth,
		Key:   svc,
		Index: index,
		Payload: &pbsubscribe.Event_ServiceHealth{
			ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
				Op: pbsubscribe.CatalogOp_Register,
				CheckServiceNode: &pbservice.CheckServiceNode{
					Node: &pbservice.Node{
						ID:         nodeID,
						Node:       node,
						Address:    addr,
						Datacenter: "dc1",
						RaftIndex: pbcommon.RaftIndex{
							CreateIndex: index,
							ModifyIndex: index,
						},
					},
					Service: &pbservice.NodeService{
						ID:      svc,
						Service: svc,
						Port:    8080,
						Weights: &pbservice.Weights{
							Passing: 1,
							Warning: 1,
						},
						// Empty sadness
						Proxy: pbservice.ConnectProxyConfig{
							MeshGateway: pbservice.MeshGatewayConfig{},
							Expose:      pbservice.ExposeConfig{},
						},
						EnterpriseMeta: pbcommon.EnterpriseMeta{},
						RaftIndex: pbcommon.RaftIndex{
							CreateIndex: index,
							ModifyIndex: index,
						},
					},
					Checks: []*pbservice.HealthCheck{
						{
							Node:           node,
							CheckID:        "serf-health",
							Name:           "serf-health",
							Status:         "passing",
							EnterpriseMeta: pbcommon.EnterpriseMeta{},
							RaftIndex: pbcommon.RaftIndex{
								CreateIndex: index,
								ModifyIndex: index,
							},
						},
						{
							Node:           node,
							CheckID:        types.CheckID("service:" + svc),
							Name:           "service:" + svc,
							ServiceID:      svc,
							ServiceName:    svc,
							Type:           "ttl",
							Status:         "passing",
							EnterpriseMeta: pbcommon.EnterpriseMeta{},
							RaftIndex: pbcommon.RaftIndex{
								CreateIndex: index,
								ModifyIndex: index,
							},
						},
					},
				},
			},
		},
	}
}

// newEventServiceHealthDeregister returns a realistically populated service
// health deregistration event for tests. nodeNum is a logical node number and
// is used to create the node name ("node%d"). Any other changes can be made
// on the returned event to avoid adding too many options for callers.
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
	node := fmt.Sprintf("node%d", nodeNum)

	return &pbsubscribe.Event{
		Topic: pbsubscribe.Topic_ServiceHealth,
		Key:   svc,
		Index: index,
		Payload: &pbsubscribe.Event_ServiceHealth{
			ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
				Op: pbsubscribe.CatalogOp_Deregister,
				CheckServiceNode: &pbservice.CheckServiceNode{
					Node: &pbservice.Node{
						Node: node,
					},
					Service: &pbservice.NodeService{
						ID:      svc,
						Service: svc,
						Port:    8080,
						Weights: &pbservice.Weights{
							Passing: 1,
							Warning: 1,
						},
						// Empty sadness
						Proxy: pbservice.ConnectProxyConfig{
							MeshGateway: pbservice.MeshGatewayConfig{},
							Expose:      pbservice.ExposeConfig{},
						},
						EnterpriseMeta: pbcommon.EnterpriseMeta{},
						RaftIndex: pbcommon.RaftIndex{
							// The original insertion index, since a delete doesn't update
							// this. This magic value came from state store tests where we
							// setup at index 10 and then mutate at index 100. It can be
							// modified by the caller later, which is easier than having
							// yet another argument in the common case.
							CreateIndex: 10,
							ModifyIndex: 10,
						},
					},
				},
			},
		},
	}
}

func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
	events := make([]*pbsubscribe.Event, len(evs)+1)
	events[0] = first
	for i := range evs {
		events[i+1] = evs[i]
	}
	return &pbsubscribe.Event{
		Topic: first.Topic,
		Index: first.Index,
		Payload: &pbsubscribe.Event_EventBatch{
			EventBatch: &pbsubscribe.EventBatch{Events: events},
		},
	}
}
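The doc comments above steer callers toward mutating the returned event rather than growing the helpers' argument lists. A minimal sketch of that pattern, assuming it sits alongside the helpers in package cachetype (the test name is hypothetical):

package cachetype

import "testing"

func TestNewEventServiceHealthRegister_mutate(t *testing.T) {
	// Build a realistic registration event, then adjust it for this test case.
	ev := newEventServiceHealthRegister(5, 1, "web")

	// Flip the service-level TTL check (the second check built by the helper)
	// from passing to critical.
	csn := ev.GetServiceHealth().CheckServiceNode
	csn.Checks[1].Status = "critical"

	if csn.Checks[1].Status != "critical" {
		t.Fatalf("expected mutated status, got %q", csn.Checks[1].Status)
	}
}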
@ -0,0 +1,194 @@
package cachetype

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/go-bexpr"
	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/agent/submatview"
	"github.com/hashicorp/consul/lib/retry"
	"github.com/hashicorp/consul/proto/pbservice"
	"github.com/hashicorp/consul/proto/pbsubscribe"
)

const (
	// StreamingHealthServicesName is the recommended name for registration.
	StreamingHealthServicesName = "streaming-health-services"
)

// StreamingHealthServices supports fetching discovered service instances via the
// catalog using the streaming gRPC endpoint.
type StreamingHealthServices struct {
	RegisterOptionsBlockingRefresh
	deps MaterializerDeps
}

// NewStreamingHealthServices creates a cache-type for watching service
// health results via streaming updates.
func NewStreamingHealthServices(deps MaterializerDeps) *StreamingHealthServices {
	return &StreamingHealthServices{deps: deps}
}

type MaterializerDeps struct {
	Client submatview.StreamClient
	Logger hclog.Logger
}

// Fetch service health from the materialized view. If no materialized view
// exists, create one and start it running in a goroutine. The goroutine will
// exit when the cache entry storing the result expires and the cache calls
// Close on the result.State.
//
// Fetch implements part of the cache.Type interface, and assumes that the
// caller ensures that only a single call to Fetch is running at any time.
func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
	if opts.LastResult != nil && opts.LastResult.State != nil {
		return opts.LastResult.State.(*streamingHealthState).Fetch(opts)
	}

	srvReq := req.(*structs.ServiceSpecificRequest)
	newReqFn := func(index uint64) pbsubscribe.SubscribeRequest {
		req := pbsubscribe.SubscribeRequest{
			Topic:      pbsubscribe.Topic_ServiceHealth,
			Key:        srvReq.ServiceName,
			Token:      srvReq.Token,
			Datacenter: srvReq.Datacenter,
			Index:      index,
			// TODO(streaming): set Namespace from srvReq.EnterpriseMeta.Namespace
		}
		if srvReq.Connect {
			req.Topic = pbsubscribe.Topic_ServiceHealthConnect
		}
		return req
	}

	materializer, err := newMaterializer(c.deps, newReqFn, srvReq.Filter)
	if err != nil {
		return cache.FetchResult{}, err
	}
	ctx, cancel := context.WithCancel(context.TODO())
	go materializer.Run(ctx)

	state := &streamingHealthState{
		materializer: materializer,
		done:         ctx.Done(),
		cancel:       cancel,
	}
	return state.Fetch(opts)
}

func newMaterializer(
	deps MaterializerDeps,
	newRequestFn func(uint64) pbsubscribe.SubscribeRequest,
	filter string,
) (*submatview.Materializer, error) {
	view, err := newHealthView(filter)
	if err != nil {
		return nil, err
	}
	return submatview.NewMaterializer(submatview.Deps{
		View:   view,
		Client: deps.Client,
		Logger: deps.Logger,
		Waiter: &retry.Waiter{
			MinFailures: 1,
			MinWait:     0,
			MaxWait:     60 * time.Second,
			Jitter:      retry.NewJitter(100),
		},
		Request: newRequestFn,
	}), nil
}

// streamingHealthState wraps a Materializer to manage its lifecycle, and to
// add itself to the FetchResult.State.
type streamingHealthState struct {
	materializer *submatview.Materializer
	done         <-chan struct{}
	cancel       func()
}

func (s *streamingHealthState) Close() error {
	s.cancel()
	return nil
}

func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) {
	result, err := s.materializer.Fetch(s.done, opts)
	result.State = s
	return result, err
}

func newHealthView(filterExpr string) (*healthView, error) {
	s := &healthView{state: make(map[string]structs.CheckServiceNode)}

	// We apply filtering to the raw CheckServiceNodes before we are done mutating
	// state in Update, to avoid storing data in memory that we would only filter
	// out later. Because the state is just a map of those types, we can simply run
	// that map through the filter and it will remove any entries that don't match.
	var err error
	s.filter, err = bexpr.CreateFilter(filterExpr, nil, s.state)
	return s, err
}

// healthView implements submatview.View for storing the view state
// of a service health result. We store it as a map to make updates and
// deletions a little easier; we could instead store a result type
// (IndexedCheckServiceNodes) and update it in place for each event, but that
// would involve re-sorting on every update.
type healthView struct {
	state  map[string]structs.CheckServiceNode
	filter *bexpr.Filter
}

// Update implements View
func (s *healthView) Update(events []*pbsubscribe.Event) error {
	for _, event := range events {
		serviceHealth := event.GetServiceHealth()
		if serviceHealth == nil {
			return fmt.Errorf("unexpected event type for service health view: %T",
				event.GetPayload())
		}
		node := serviceHealth.CheckServiceNode
		id := fmt.Sprintf("%s/%s", node.Node.Node, node.Service.ID)

		switch serviceHealth.Op {
		case pbsubscribe.CatalogOp_Register:
			checkServiceNode := pbservice.CheckServiceNodeToStructs(serviceHealth.CheckServiceNode)
			s.state[id] = *checkServiceNode
		case pbsubscribe.CatalogOp_Deregister:
			delete(s.state, id)
		}
	}
	if s.filter != nil {
		filtered, err := s.filter.Execute(s.state)
		if err != nil {
			return err
		}
		s.state = filtered.(map[string]structs.CheckServiceNode)
	}
	return nil
}

// Result returns the structs.IndexedCheckServiceNodes stored by this view.
func (s *healthView) Result(index uint64) (interface{}, error) {
	result := structs.IndexedCheckServiceNodes{
		Nodes: make(structs.CheckServiceNodes, 0, len(s.state)),
		QueryMeta: structs.QueryMeta{
			Index: index,
		},
	}
	for _, node := range s.state {
		result.Nodes = append(result.Nodes, node)
	}
	return &result, nil
}

func (s *healthView) Reset() {
	s.state = make(map[string]structs.CheckServiceNode)
}
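For orientation, a rough sketch of how this cache-type is intended to be wired up, following the RegisterType/Get pattern used by the cache tests in this PR; the wiring location, import alias, and the client/logger arguments are assumptions, not part of this diff:

package agentsetup // hypothetical wiring location

import (
	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/agent/cache"
	cachetype "github.com/hashicorp/consul/agent/cache-types"
	"github.com/hashicorp/consul/agent/submatview"
)

// buildHealthCache registers the streaming cache-type under its recommended
// name. The streaming client and logger are assumed to be constructed
// elsewhere by the agent.
func buildHealthCache(client submatview.StreamClient, logger hclog.Logger) *cache.Cache {
	c := cache.New(cache.Options{})
	c.RegisterType(cachetype.StreamingHealthServicesName,
		cachetype.NewStreamingHealthServices(cachetype.MaterializerDeps{
			Client: client,
			Logger: logger,
		}))
	return c
}

Callers would then issue ordinary cache Gets with a *structs.ServiceSpecificRequest, which is exactly how the tests below drive Fetch directly.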
@ -0,0 +1,493 @@
package cachetype

import (
	"errors"
	"fmt"
	"sort"
	"testing"
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/proto/pbsubscribe"
)

func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
	client := NewTestStreamingClient()
	typ := StreamingHealthServices{deps: MaterializerDeps{
		Client: client,
		Logger: hclog.Default(),
	}}

	// Initially there are no services registered. Server should send an
	// EndOfSnapshot message immediately with index of 1.
	client.QueueEvents(newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 1))

	opts := cache.FetchOptions{
		MinIndex: 0,
		Timeout:  time.Second,
	}
	req := &structs.ServiceSpecificRequest{
		Datacenter:  "dc1",
		ServiceName: "web",
	}
	empty := &structs.IndexedCheckServiceNodes{
		Nodes: structs.CheckServiceNodes{},
		QueryMeta: structs.QueryMeta{
			Index: 1,
		},
	}

	runStep(t, "empty snapshot returned", func(t *testing.T) {
		// Fetch should return an empty result of the right type with a non-zero
		// index, and in the background begin streaming updates.
		result, err := typ.Fetch(opts, req)
		require.NoError(t, err)

		require.Equal(t, uint64(1), result.Index)
		require.Equal(t, empty, result.Value)

		opts.MinIndex = result.Index
		opts.LastResult = &result
	})

	runStep(t, "blocks for timeout", func(t *testing.T) {
		// Subsequent fetch should block for the timeout
		start := time.Now()
		opts.Timeout = 200 * time.Millisecond
		result, err := typ.Fetch(opts, req)
		require.NoError(t, err)
		elapsed := time.Since(start)
		require.True(t, elapsed >= 200*time.Millisecond,
			"Fetch should have blocked until timeout")

		require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
		require.Equal(t, empty, result.Value, "result value should not have changed")

		opts.MinIndex = result.Index
		opts.LastResult = &result
	})

	runStep(t, "blocks until update", func(t *testing.T) {
		// Make another blocking query with a longer timeout and trigger an update
		// event part way through.
		start := time.Now()
		go func() {
			time.Sleep(200 * time.Millisecond)
			client.QueueEvents(newEventServiceHealthRegister(4, 1, "web"))
		}()

		opts.Timeout = time.Second
		result, err := typ.Fetch(opts, req)
		require.NoError(t, err)
		elapsed := time.Since(start)
		require.True(t, elapsed >= 200*time.Millisecond,
			"Fetch should have blocked until the event was delivered")
		require.True(t, elapsed < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, uint64(4), result.Index, "result index should match the event index")
		require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 1,
			"result value should contain the new registration")

		opts.MinIndex = result.Index
		opts.LastResult = &result
	})

	runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) {
		client.QueueErr(tempError("broken pipe"))

		// Next fetch will continue to block until timeout and receive the same
		// result.
		start := time.Now()
		opts.Timeout = 200 * time.Millisecond
		result, err := typ.Fetch(opts, req)
		require.NoError(t, err)
		elapsed := time.Since(start)
		require.True(t, elapsed >= 200*time.Millisecond,
			"Fetch should have blocked until timeout")

		require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
		require.Equal(t, opts.LastResult.Value, result.Value, "result value should not have changed")

		opts.MinIndex = result.Index
		opts.LastResult = &result

		// But an update should still be noticed due to reconnection
		client.QueueEvents(newEventServiceHealthRegister(10, 2, "web"))

		start = time.Now()
		opts.Timeout = time.Second
		result, err = typ.Fetch(opts, req)
		require.NoError(t, err)
		elapsed = time.Since(start)
		require.True(t, elapsed < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, uint64(10), result.Index, "result index should match the event index")
		require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 2,
			"result value should contain the new registration")

		opts.MinIndex = result.Index
		opts.LastResult = &result
	})

	runStep(t, "returns non-temporary error to watchers", func(t *testing.T) {
		// Wait and send the error while fetcher is waiting
		go func() {
			time.Sleep(200 * time.Millisecond)
			client.QueueErr(errors.New("invalid request"))
		}()

		// Next fetch should return the error
		start := time.Now()
		opts.Timeout = time.Second
		result, err := typ.Fetch(opts, req)
		require.Error(t, err)
		elapsed := time.Since(start)
		require.True(t, elapsed >= 200*time.Millisecond,
			"Fetch should have blocked until error was sent")
		require.True(t, elapsed < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
		// We don't require instances to be returned in the same order, so compare
		// with requireResultsSame which matches elements regardless of order.
		requireResultsSame(t,
			opts.LastResult.Value.(*structs.IndexedCheckServiceNodes),
			result.Value.(*structs.IndexedCheckServiceNodes),
		)

		opts.MinIndex = result.Index
		opts.LastResult = &result

		// But an update should still be noticed due to reconnection
		client.QueueEvents(newEventServiceHealthRegister(opts.MinIndex+5, 3, "web"))

		opts.Timeout = time.Second
		result, err = typ.Fetch(opts, req)
		require.NoError(t, err)
		elapsed = time.Since(start)
		require.True(t, elapsed < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, opts.MinIndex+5, result.Index, "result index should match the event index")
		require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3,
			"result value should contain the new registration")

		opts.MinIndex = result.Index
		opts.LastResult = &result
	})
}

type tempError string

func (e tempError) Error() string {
	return string(e)
}

func (e tempError) Temporary() bool {
	return true
}

// requireResultsSame compares two IndexedCheckServiceNodes without requiring
// the same order of results (which varies due to map usage internally).
func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNodes) {
	require.Equal(t, want.Index, got.Index)

	svcIDs := func(csns structs.CheckServiceNodes) []string {
		res := make([]string, 0, len(csns))
		for _, csn := range csns {
			res = append(res, fmt.Sprintf("%s/%s", csn.Node.Node, csn.Service.ID))
		}
		return res
	}

	gotIDs := svcIDs(got.Nodes)
	wantIDs := svcIDs(want.Nodes)

	require.ElementsMatch(t, wantIDs, gotIDs)
}

func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
	client := NewTestStreamingClient()
	typ := StreamingHealthServices{deps: MaterializerDeps{
		Client: client,
		Logger: hclog.Default(),
	}}

	// Create an initial snapshot of 3 instances on different nodes
	registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event {
		return newEventServiceHealthRegister(index, nodeNum, "web")
	}
	client.QueueEvents(
		registerServiceWeb(5, 1),
		registerServiceWeb(5, 2),
		registerServiceWeb(5, 3),
		newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))

	// This contains the view state, so it is important to share it between calls.
	opts := cache.FetchOptions{
		MinIndex: 0,
		Timeout:  1 * time.Second,
	}
	req := &structs.ServiceSpecificRequest{
		Datacenter:  "dc1",
		ServiceName: "web",
	}

	gatherNodes := func(res interface{}) []string {
		nodes := make([]string, 0, 3)
		r := res.(*structs.IndexedCheckServiceNodes)
		for _, csn := range r.Nodes {
			nodes = append(nodes, csn.Node.Node)
		}
		sort.Strings(nodes)
		return nodes
	}

	runStep(t, "full snapshot returned", func(t *testing.T) {
		result, err := typ.Fetch(opts, req)
		require.NoError(t, err)

		require.Equal(t, uint64(5), result.Index)
		require.ElementsMatch(t, []string{"node1", "node2", "node3"},
			gatherNodes(result.Value))

		opts.MinIndex = result.Index
		opts.LastResult = &result
	})

	runStep(t, "blocks until deregistration", func(t *testing.T) {
		// Make another blocking query with a longer timeout and trigger an update
		// event part way through.
		start := time.Now()
		go func() {
			time.Sleep(200 * time.Millisecond)

			// Deregister instance on node1
			client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web"))
		}()

		opts.Timeout = time.Second
		result, err := typ.Fetch(opts, req)
		require.NoError(t, err)
		elapsed := time.Since(start)
		require.True(t, elapsed >= 200*time.Millisecond,
			"Fetch should have blocked until the event was delivered")
		require.True(t, elapsed < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, uint64(20), result.Index)
		require.Equal(t, []string{"node2", "node3"}, gatherNodes(result.Value))

		opts.MinIndex = result.Index
		opts.LastResult = &result
	})

	runStep(t, "server reload is respected", func(t *testing.T) {
		// Simulates the server noticing the request's ACL token privs changing. To
		// detect this we'll queue up the new snapshot as a different set of nodes
		// to the first.
		client.QueueErr(status.Error(codes.Aborted, "reset by server"))

		client.QueueEvents(
			registerServiceWeb(50, 3), // overlap existing node
			registerServiceWeb(50, 4),
			registerServiceWeb(50, 5),
			newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50))

		// Make another blocking query with THE SAME index. It should immediately
		// return the new snapshot.
		start := time.Now()
		opts.Timeout = time.Second
		result, err := typ.Fetch(opts, req)
		require.NoError(t, err)
		elapsed := time.Since(start)
		require.True(t, elapsed < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, uint64(50), result.Index)
		require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value))

		opts.MinIndex = result.Index
		opts.LastResult = &result
	})

	runStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) {
		client.QueueErr(tempError("temporary connection error"))

		client.QueueEvents(
			newNewSnapshotToFollowEvent(pbsubscribe.Topic_ServiceHealth),
			registerServiceWeb(50, 3), // overlap existing node
			registerServiceWeb(50, 4),
			registerServiceWeb(50, 5),
			newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50))

		start := time.Now()
		opts.MinIndex = 49
		opts.Timeout = time.Second
		result, err := typ.Fetch(opts, req)
		require.NoError(t, err)
		elapsed := time.Since(start)
		require.True(t, elapsed < time.Second,
			"Fetch should have returned before the timeout")

		require.Equal(t, uint64(50), result.Index)
		require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value))
	})
}

func TestStreamingHealthServices_EventBatches(t *testing.T) {
	client := NewTestStreamingClient()
	typ := StreamingHealthServices{deps: MaterializerDeps{
		Client: client,
		Logger: hclog.Default(),
	}}

	// Create an initial snapshot of 3 instances but in a single event batch
	batchEv := newEventBatchWithEvents(
		newEventServiceHealthRegister(5, 1, "web"),
		newEventServiceHealthRegister(5, 2, "web"),
		newEventServiceHealthRegister(5, 3, "web"))
	client.QueueEvents(
		batchEv,
		newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))

	// This contains the view state, so it is important to share it between calls.
	opts := cache.FetchOptions{
		MinIndex: 0,
		Timeout:  1 * time.Second,
	}
	req := &structs.ServiceSpecificRequest{
		Datacenter:  "dc1",
		ServiceName: "web",
	}

	gatherNodes := func(res interface{}) []string {
		nodes := make([]string, 0, 3)
		r := res.(*structs.IndexedCheckServiceNodes)
		for _, csn := range r.Nodes {
			nodes = append(nodes, csn.Node.Node)
		}
		return nodes
	}

	runStep(t, "full snapshot returned", func(t *testing.T) {
		result, err := typ.Fetch(opts, req)
		require.NoError(t, err)

		require.Equal(t, uint64(5), result.Index)
		require.ElementsMatch(t, []string{"node1", "node2", "node3"},
			gatherNodes(result.Value))

		opts.MinIndex = result.Index
		opts.LastResult = &result
	})

	runStep(t, "batched updates work too", func(t *testing.T) {
		// Simulate multiple registrations happening in one Txn (so all have the
		// same index)
		batchEv := newEventBatchWithEvents(
			// Deregister an existing node
			newEventServiceHealthDeregister(20, 1, "web"),
			// Register another
			newEventServiceHealthRegister(20, 4, "web"),
		)
		client.QueueEvents(batchEv)
		opts.Timeout = time.Second
		result, err := typ.Fetch(opts, req)
		require.NoError(t, err)

		require.Equal(t, uint64(20), result.Index)
		require.ElementsMatch(t, []string{"node2", "node3", "node4"},
			gatherNodes(result.Value))

		opts.MinIndex = result.Index
		opts.LastResult = &result
	})
}

func TestStreamingHealthServices_Filtering(t *testing.T) {
	client := NewTestStreamingClient()
	typ := StreamingHealthServices{deps: MaterializerDeps{
		Client: client,
		Logger: hclog.Default(),
	}}

	// Create an initial snapshot of 3 instances but in a single event batch
	batchEv := newEventBatchWithEvents(
		newEventServiceHealthRegister(5, 1, "web"),
		newEventServiceHealthRegister(5, 2, "web"),
		newEventServiceHealthRegister(5, 3, "web"))
	client.QueueEvents(
		batchEv,
		newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5))

	// This contains the view state, so it is important to share it between calls.
	opts := cache.FetchOptions{
		MinIndex: 0,
		Timeout:  1 * time.Second,
	}
	req := &structs.ServiceSpecificRequest{
		Datacenter:  "dc1",
		ServiceName: "web",
		QueryOptions: structs.QueryOptions{
			Filter: `Node.Node == "node2"`,
		},
	}

	gatherNodes := func(res interface{}) []string {
		nodes := make([]string, 0, 3)
		r := res.(*structs.IndexedCheckServiceNodes)
		for _, csn := range r.Nodes {
			nodes = append(nodes, csn.Node.Node)
		}
		return nodes
	}

	runStep(t, "filtered snapshot returned", func(t *testing.T) {
		result, err := typ.Fetch(opts, req)
		require.NoError(t, err)

		require.Equal(t, uint64(5), result.Index)
		require.Equal(t, []string{"node2"}, gatherNodes(result.Value))

		opts.MinIndex = result.Index
		opts.LastResult = &result
	})

	runStep(t, "filtered updates work too", func(t *testing.T) {
		// Simulate multiple registrations happening in one Txn (so all have the
		// same index)
		batchEv := newEventBatchWithEvents(
			// Deregister an existing node
			newEventServiceHealthDeregister(20, 1, "web"),
			// Register another
			newEventServiceHealthRegister(20, 4, "web"),
		)
		client.QueueEvents(batchEv)
		opts.Timeout = time.Second
		result, err := typ.Fetch(opts, req)
		require.NoError(t, err)

		require.Equal(t, uint64(20), result.Index)
		require.Equal(t, []string{"node2"}, gatherNodes(result.Value))

		opts.MinIndex = result.Index
		opts.LastResult = &result
	})
}

func runStep(t *testing.T, name string, fn func(t *testing.T)) {
	t.Helper()
	if !t.Run(name, fn) {
		t.FailNow()
	}
}
@ -0,0 +1,59 @@
package cachetype

import (
	"context"

	"google.golang.org/grpc"

	"github.com/hashicorp/consul/proto/pbsubscribe"
)

// TestStreamingClient is a mock StreamClient for testing that allows
// for queueing up custom events to a subscriber.
type TestStreamingClient struct {
	pbsubscribe.StateChangeSubscription_SubscribeClient
	events chan eventOrErr
	ctx    context.Context
}

type eventOrErr struct {
	Err   error
	Event *pbsubscribe.Event
}

func NewTestStreamingClient() *TestStreamingClient {
	return &TestStreamingClient{
		events: make(chan eventOrErr, 32),
	}
}

func (t *TestStreamingClient) Subscribe(
	ctx context.Context,
	_ *pbsubscribe.SubscribeRequest,
	_ ...grpc.CallOption,
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
	t.ctx = ctx
	return t, nil
}

func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
	for _, e := range events {
		t.events <- eventOrErr{Event: e}
	}
}

func (t *TestStreamingClient) QueueErr(err error) {
	t.events <- eventOrErr{Err: err}
}

func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) {
	select {
	case eoe := <-t.events:
		if eoe.Err != nil {
			return nil, eoe.Err
		}
		return eoe.Event, nil
	case <-t.ctx.Done():
		return nil, t.ctx.Err()
	}
}
@ -18,14 +18,16 @@ import (
 	"container/heap"
 	"context"
 	"fmt"
+	"io"
 	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/armon/go-metrics"
-	"github.com/hashicorp/consul/lib"
 	"golang.org/x/time/rate"
+
+	"github.com/hashicorp/consul/lib"
 )
 
 //go:generate mockery -all -inpkg
@ -773,6 +775,12 @@ func (c *Cache) runExpiryLoop() {
 		case <-expiryCh:
 			c.entriesLock.Lock()
 
+			// Perform cleanup operations on the entry's state, if applicable.
+			state := c.entries[entry.Key].State
+			if closer, ok := state.(io.Closer); ok {
+				closer.Close()
+			}
+
 			// Entry expired! Remove it.
 			delete(c.entries, entry.Key)
 			heap.Remove(c.entriesExpiryHeap, entry.HeapIndex)
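With this change, any FetchResult.State that implements io.Closer is closed when its cache entry expires; the streaming cache-type above relies on this to cancel its materializer goroutine. A minimal sketch of the contract, using hypothetical names:

package cachetype // illustrative placement only

import "context"

// exampleState is a hypothetical cache-type state that owns a background task.
type exampleState struct {
	cancel context.CancelFunc
}

// Close satisfies io.Closer; the cache expiry loop above calls it when the
// entry storing this state is evicted, stopping the background goroutine.
func (s *exampleState) Close() error {
	s.cancel()
	return nil
}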
@ -10,11 +10,12 @@ import (
 	"testing"
 	"time"
 
-	"github.com/hashicorp/consul/sdk/testutil"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/time/rate"
+
+	"github.com/hashicorp/consul/sdk/testutil"
 )
 
 // Test a basic Get with no indexes (and therefore no blocking queries).
@ -841,6 +842,56 @@ func TestCacheGet_expireResetGet(t *testing.T) {
 	typ.AssertExpectations(t)
 }
 
+// Test that entries with state that satisfies io.Closer get cleaned up
+func TestCacheGet_expireClose(t *testing.T) {
+	t.Parallel()
+
+	require := require.New(t)
+
+	typ := &MockType{}
+	defer typ.AssertExpectations(t)
+	c := New(Options{})
+	defer c.Close()
+	typ.On("RegisterOptions").Return(RegisterOptions{
+		SupportsBlocking: true,
+		LastGetTTL:       100 * time.Millisecond,
+	})
+
+	// Register the type with a timeout
+	c.RegisterType("t", typ)
+
+	// Configure the type
+	state := &testCloser{}
+	typ.Static(FetchResult{Value: 42, State: state}, nil).Times(1)
+
+	ctx := context.Background()
+	req := TestRequest(t, RequestInfo{Key: "hello"})
+	result, meta, err := c.Get(ctx, "t", req)
+	require.NoError(err)
+	require.Equal(42, result)
+	require.False(meta.Hit)
+	require.False(state.isClosed())
+
+	// Sleep for the expiry
+	time.Sleep(200 * time.Millisecond)
+
+	// state.Close() should have been called
+	require.True(state.isClosed())
+}
+
+type testCloser struct {
+	closed uint32
+}
+
+func (t *testCloser) Close() error {
+	atomic.SwapUint32(&t.closed, 1)
+	return nil
+}
+
+func (t *testCloser) isClosed() bool {
+	return atomic.LoadUint32(&t.closed) == 1
+}
+
 // Test a Get with a request that returns the same cache key across
 // two different "types" returns two separate results.
 func TestCacheGet_duplicateKeyDifferentType(t *testing.T) {
@ -61,6 +61,7 @@ type SnapshotHandlers map[Topic]SnapshotFunc
 
 // SnapshotFunc builds a snapshot for the subscription request, and appends the
 // events to the Snapshot using SnapshotAppender.
+// If err is not nil the SnapshotFunc must return a non-zero index.
 type SnapshotFunc func(SubscribeRequest, SnapshotAppender) (index uint64, err error)
 
 // SnapshotAppender appends groups of events to create a Snapshot of state.
@ -374,6 +374,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
 }
 
 func runStep(t *testing.T, name string, fn func(t *testing.T)) {
+	t.Helper()
 	if !t.Run(name, fn) {
 		t.FailNow()
 	}
@ -0,0 +1,70 @@
package submatview

import (
	"github.com/hashicorp/consul/proto/pbsubscribe"
)

// eventHandler is a function which performs some operation on the received
// events, then returns the eventHandler that should be used for the next set
// of events.
// If an eventHandler fails to handle the events it may return an error. If an
// error is returned the next eventHandler will be ignored.
// eventHandler is used to implement a very simple finite-state machine.
type eventHandler func(state viewState, events *pbsubscribe.Event) (next eventHandler, err error)

type viewState interface {
	updateView(events []*pbsubscribe.Event, index uint64) error
	reset()
}

func initialHandler(index uint64) eventHandler {
	if index == 0 {
		return newSnapshotHandler()
	}
	return resumeStreamHandler
}

// snapshotHandler accumulates events. When it receives an EndOfSnapshot event
// it updates the view, and then returns eventStreamHandler to handle new events.
type snapshotHandler struct {
	events []*pbsubscribe.Event
}

func newSnapshotHandler() eventHandler {
	return (&snapshotHandler{}).handle
}

func (h *snapshotHandler) handle(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
	if event.GetEndOfSnapshot() {
		err := state.updateView(h.events, event.Index)
		return eventStreamHandler, err
	}

	h.events = append(h.events, eventsFromEvent(event)...)
	return h.handle, nil
}

// eventStreamHandler handles events by updating the view. It always returns
// itself as the next handler.
func eventStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
	err := state.updateView(eventsFromEvent(event), event.Index)
	return eventStreamHandler, err
}

func eventsFromEvent(event *pbsubscribe.Event) []*pbsubscribe.Event {
	if batch := event.GetEventBatch(); batch != nil {
		return batch.Events
	}
	return []*pbsubscribe.Event{event}
}

// resumeStreamHandler checks if the event is a NewSnapshotToFollow event. If it
// is, it resets the view and returns a snapshotHandler to handle the next event.
// Otherwise it uses eventStreamHandler to handle events.
func resumeStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
	if event.GetNewSnapshotToFollow() {
		state.reset()
		return newSnapshotHandler(), nil
	}
	return eventStreamHandler(state, event)
}
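To make the state machine concrete, here is a small sketch of driving the handlers for a fresh subscription: snapshot events accumulate, EndOfSnapshot flushes them to the view in a single update, and later events stream through one at a time. It would have to live in package submatview because viewState is unexported; fakeView and driveHandlers are invented for illustration:

package submatview

import "github.com/hashicorp/consul/proto/pbsubscribe"

// fakeView is a throwaway viewState used only to observe handler transitions.
type fakeView struct {
	updates [][]*pbsubscribe.Event
}

func (f *fakeView) updateView(events []*pbsubscribe.Event, index uint64) error {
	f.updates = append(f.updates, events)
	return nil
}

func (f *fakeView) reset() { f.updates = nil }

// driveHandlers feeds a sequence of events through the handler chain, starting
// from the state used when no snapshot has been seen yet (index 0).
func driveHandlers(events []*pbsubscribe.Event) (*fakeView, error) {
	view := &fakeView{}
	handler := initialHandler(0)
	var err error
	for _, ev := range events {
		// Each call returns the handler to use for the next event.
		if handler, err = handler(view, ev); err != nil {
			return view, err
		}
	}
	return view, nil
}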
@ -0,0 +1,285 @@
package submatview

import (
	"context"
	"sync"
	"time"

	"github.com/hashicorp/go-hclog"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/lib/retry"
	"github.com/hashicorp/consul/proto/pbsubscribe"
)

// View receives events from, and returns results to, Materializer. A view is
// responsible for converting the pbsubscribe.Event.Payload into the local
// type, and storing it so that it can be returned by Result().
type View interface {
	// Update is called when one or more events are received. The first call will
	// include _all_ events in the initial snapshot which may be an empty set.
	// Subsequent calls will contain one or more update events in the order they
	// are received.
	Update(events []*pbsubscribe.Event) error

	// Result returns the type-specific cache result based on the state. When no
	// events have been delivered yet the result should be an empty value type
	// suitable to return to clients in case there is an empty result on the
	// servers. The index the materialized view represents is maintained
	// separately and passed in, in case the return type needs an Index field
	// populated. This allows implementations to not worry about maintaining
	// indexes seen during Update.
	Result(index uint64) (interface{}, error)

	// Reset the view to the zero state, done in preparation for receiving a new
	// snapshot.
	Reset()
}

// Materializer consumes the event stream, handling any framing events, and
// sends the events to View as they are received.
//
// Materializer is used as the cache.Result.State for a streaming
// cache type and manages the actual streaming RPC call to the servers behind
// the scenes until the cache result is discarded when its TTL expires.
type Materializer struct {
	deps        Deps
	retryWaiter *retry.Waiter
	handler     eventHandler

	// lock protects the mutable state - all fields below it must only be accessed
	// while holding lock.
	lock     sync.Mutex
	index    uint64
	view     View
	updateCh chan struct{}
	err      error
}

type Deps struct {
	View    View
	Client  StreamClient
	Logger  hclog.Logger
	Waiter  *retry.Waiter
	Request func(index uint64) pbsubscribe.SubscribeRequest
}

// StreamClient provides a subscription to state change events.
type StreamClient interface {
	Subscribe(ctx context.Context, in *pbsubscribe.SubscribeRequest, opts ...grpc.CallOption) (pbsubscribe.StateChangeSubscription_SubscribeClient, error)
}

// NewMaterializer returns a new Materializer. Run must be called to start it.
func NewMaterializer(deps Deps) *Materializer {
	v := &Materializer{
		deps:        deps,
		view:        deps.View,
		retryWaiter: deps.Waiter,
		updateCh:    make(chan struct{}),
	}
	return v
}

// Run receives events from the StreamClient and sends them to the View. It runs
// until ctx is cancelled, so it is expected to be run in a goroutine.
func (m *Materializer) Run(ctx context.Context) {
	for {
		req := m.deps.Request(m.index)
		err := m.runSubscription(ctx, req)
		if ctx.Err() != nil {
			return
		}

		failures := m.retryWaiter.Failures()
		if isNonTemporaryOrConsecutiveFailure(err, failures) {
			m.lock.Lock()
			m.notifyUpdateLocked(err)
			m.lock.Unlock()
		}

		m.deps.Logger.Error("subscribe call failed",
			"err", err,
			"topic", req.Topic,
			"key", req.Key,
			"failure_count", failures+1)

		if err := m.retryWaiter.Wait(ctx); err != nil {
			return
		}
	}
}

// isNonTemporaryOrConsecutiveFailure returns true if the error is not a
// temporary error or if failures > 0.
func isNonTemporaryOrConsecutiveFailure(err error, failures int) bool {
	// temporary is an interface used by net and other std lib packages to
	// show error types represent temporary/recoverable errors.
	temp, ok := err.(interface {
		Temporary() bool
	})
	return !ok || !temp.Temporary() || failures > 0
}

// runSubscription opens a new subscribe streaming call to the servers and runs
// for its lifetime or until the view is closed.
func (m *Materializer) runSubscription(ctx context.Context, req pbsubscribe.SubscribeRequest) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	m.handler = initialHandler(req.Index)

	s, err := m.deps.Client.Subscribe(ctx, &req)
	if err != nil {
		return err
	}

	for {
		event, err := s.Recv()
		switch {
		case isGrpcStatus(err, codes.Aborted):
			m.reset()
			return resetErr("stream reset requested")
		case err != nil:
			return err
		}

		m.handler, err = m.handler(m, event)
		if err != nil {
			m.reset()
			return err
		}
	}
}

func isGrpcStatus(err error, code codes.Code) bool {
	s, ok := status.FromError(err)
	return ok && s.Code() == code
}

// resetErr represents a server request to reset the subscription. It is typed so
// we can mark it as temporary and attempt to retry the first time without
// notifying clients.
type resetErr string

// Temporary implements the internal Temporary interface.
func (e resetErr) Temporary() bool {
	return true
}

// Error implements error.
func (e resetErr) Error() string {
	return string(e)
}

// reset clears the state ready to start a new stream from scratch.
func (m *Materializer) reset() {
	m.lock.Lock()
	defer m.lock.Unlock()

	m.view.Reset()
	m.index = 0
	m.retryWaiter.Reset()
}

func (m *Materializer) updateView(events []*pbsubscribe.Event, index uint64) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	if err := m.view.Update(events); err != nil {
		return err
	}
	m.index = index
	m.notifyUpdateLocked(nil)
	m.retryWaiter.Reset()
	return nil
}

// notifyUpdateLocked closes the current update channel and recreates a new
// one. It must be called while holding m.lock.
func (m *Materializer) notifyUpdateLocked(err error) {
	m.err = err
	close(m.updateCh)
	m.updateCh = make(chan struct{})
}

// Fetch implements the logic a StreamingCacheType will need during its Fetch
// call. Cache types that use streaming should just be able to proxy to this
// once they have a subscription object and return its results directly.
func (m *Materializer) Fetch(done <-chan struct{}, opts cache.FetchOptions) (cache.FetchResult, error) {
	var result cache.FetchResult

	// Get current view Result and index
	m.lock.Lock()
	index := m.index
	val, err := m.view.Result(m.index)
	updateCh := m.updateCh
	m.lock.Unlock()

	if err != nil {
		return result, err
	}

	result.Index = index
	result.Value = val

	// If our index is > opts.MinIndex, return right away. If index is zero then
	// we haven't loaded a snapshot at all yet, which means we should wait for
	// one on the update chan. Note it's opts.MinIndex the cache is using here;
	// the request's own min index might be different since it comes from the
	// initial user request.
	if index > 0 && index > opts.MinIndex {
		return result, nil
	}

	// Watch for timeout of the Fetch. Note it's opts.Timeout not req.Timeout,
	// since that is the timeout the client requested from the cache Get, while
	// the options one is the internal "background refresh" timeout which is what
	// the Fetch call should be using.
	timeoutCh := time.After(opts.Timeout)
	for {
		select {
		case <-updateCh:
			// View updated, return the new result
			m.lock.Lock()
			result.Index = m.index
			// Grab the new updateCh in case we need to keep waiting for the next
			// update.
			updateCh = m.updateCh
			fetchErr := m.err
			if fetchErr == nil {
				// Only generate a new result if there was no error, to avoid pointless
				// work potentially shuffling the same data around.
				result.Value, err = m.view.Result(m.index)
			}
			m.lock.Unlock()

			// If there was a non-transient error return it
			if fetchErr != nil {
				return result, fetchErr
			}
			if err != nil {
				return result, err
			}

			// Sanity check the update is actually later than the one the user
			// requested.
			if result.Index <= opts.MinIndex {
				// The result is still older/same as the requested index, continue to
				// wait for further updates.
				continue
			}

			// Return the updated result
			return result, nil

		case <-timeoutCh:
			// Just return whatever we got originally; it might still be empty
			return result, nil

		case <-done:
			return result, context.Canceled
		}
	}
}
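notifyUpdateLocked uses the close-and-replace channel idiom: closing updateCh wakes every blocked Fetch at once, and swapping in a new channel gives future waiters something fresh to block on. A stripped-down sketch of just that idiom, independent of Materializer (names invented):

package example

import "sync"

// broadcaster wakes all current waiters at once by closing the channel they
// hold, then replaces it so future waiters get a fresh one.
type broadcaster struct {
	mu sync.Mutex
	ch chan struct{}
}

func newBroadcaster() *broadcaster {
	return &broadcaster{ch: make(chan struct{})}
}

// wait returns the channel a caller should block on for the next notification.
func (b *broadcaster) wait() <-chan struct{} {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.ch
}

// notify is the equivalent of notifyUpdateLocked: every goroutine blocked on
// the old channel is released, because receiving on a closed channel never blocks.
func (b *broadcaster) notify() {
	b.mu.Lock()
	defer b.mu.Unlock()
	close(b.ch)
	b.ch = make(chan struct{})
}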