Merge pull request #10112 from hashicorp/dnephin/remove-streaming-from-cache

streaming: replace agent/cache with submatview.Store
Daniel Nephin 2021-04-28 17:31:42 -04:00 committed by GitHub
commit b4362552fb
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1796 additions and 732 deletions

.changelog/10112.txt

@ -0,0 +1,3 @@
```release-note:bug
streaming: fixes a bug that would cause context cancellation errors when a cache entry expired while requests were active.
```
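
The note above describes the failure mode this PR removes: the old cache-backed streaming type tied the materialized view's goroutine to a cache entry, so entry expiry could cancel work that active requests still depended on. Below is a minimal, self-contained sketch of the replacement shape, not Consul's actual submatview code: the store owns the long-lived worker through its own context, and a request's context only bounds that request's wait.

```go
// Illustrative sketch only; it does not use Consul's real submatview types.
// The long-lived worker is owned by the store's context, while each request
// context bounds only that request's wait, so expiring an entry no longer
// cancels callers that are still active.
package main

import (
	"context"
	"fmt"
	"time"
)

type entry struct {
	updated chan struct{} // closed by the background worker when it stops
	stop    context.CancelFunc
}

type store struct {
	entries map[string]*entry
}

// run starts a worker whose lifetime is bound to the store's ctx, not a request's.
func (s *store) run(ctx context.Context, key string) *entry {
	if e, ok := s.entries[key]; ok {
		return e
	}
	workerCtx, cancel := context.WithCancel(ctx)
	e := &entry{updated: make(chan struct{}), stop: cancel}
	go func() {
		defer close(e.updated)
		<-workerCtx.Done() // placeholder for the event-consuming loop
	}()
	s.entries[key] = e
	return e
}

// get waits for an update or the caller's deadline; cancelling reqCtx never
// stops the shared worker, and stopping the worker never cancels other callers'
// contexts.
func (s *store) get(storeCtx, reqCtx context.Context, key string) error {
	e := s.run(storeCtx, key)
	select {
	case <-e.updated:
		return nil
	case <-reqCtx.Done():
		return reqCtx.Err()
	}
}

func main() {
	storeCtx, stop := context.WithCancel(context.Background())
	defer stop()
	s := &store{entries: map[string]*entry{}}

	reqCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	// The request times out, but the shared worker keeps running.
	fmt.Println(s.get(storeCtx, reqCtx, "web"))
}
```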


@ -373,15 +373,21 @@ func New(bd BaseDeps) (*Agent, error) {
 		cache: bd.Cache,
 	}
-	cacheName := cachetype.HealthServicesName
-	if bd.RuntimeConfig.UseStreamingBackend {
-		cacheName = cachetype.StreamingHealthServicesName
+	// TODO: create rpcClientHealth in BaseDeps once NetRPC is available without Agent
+	conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter)
+	if err != nil {
+		return nil, err
 	}
+
 	a.rpcClientHealth = &health.Client{
 		Cache:  bd.Cache,
 		NetRPC: &a,
-		CacheName:        cacheName,
-		CacheNameIngress: cachetype.HealthServicesName,
+		CacheName: cachetype.HealthServicesName,
+		ViewStore: bd.ViewStore,
+		MaterializerDeps: health.MaterializerDeps{
+			Conn:   conn,
+			Logger: bd.Logger.Named("rpcclient.health"),
+		},
 	}

 	a.serviceManager = NewServiceManager(&a)
@ -533,6 +539,8 @@ func (a *Agent) Start(ctx context.Context) error {
 		return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLDefaultPolicy)
 	}

+	go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})
+
 	// Start the proxy config manager.
 	a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
 		Cache:  a.cache,
@ -540,7 +548,6 @@ func (a *Agent) Start(ctx context.Context) error {
 		Logger: a.logger.Named(logging.ProxyConfig),
 		State:  a.State,
 		Source: &structs.QuerySource{
-			Node:       a.config.NodeName,
 			Datacenter: a.config.Datacenter,
 			Segment:    a.config.SegmentName,
 		},
@ -1385,6 +1392,8 @@ func (a *Agent) ShutdownAgent() error {
 		a.cache.Close()
 	}

+	a.rpcClientHealth.Close()
+
 	var err error
 	if a.delegate != nil {
 		err = a.delegate.Shutdown()
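
The Start hunk above launches the view store with a context adapted from the agent's shutdown channel. Here is a small sketch of that adapter pattern, assuming lib.StopChannelContext is roughly a stop-channel-to-context.Context wrapper; the real type lives in Consul's lib package and may differ in detail.

```go
// Sketch of the stop-channel-as-context pattern, under the assumption that
// lib.StopChannelContext behaves like this adapter; not the real implementation.
package main

import (
	"context"
	"fmt"
	"time"
)

// stopChannelContext adapts a classic shutdown channel to context.Context so
// that context-driven APIs (like ViewStore.Run above) can be stopped by it.
type stopChannelContext struct {
	stopCh <-chan struct{}
}

func (c stopChannelContext) Deadline() (time.Time, bool)    { return time.Time{}, false }
func (c stopChannelContext) Done() <-chan struct{}          { return c.stopCh }
func (c stopChannelContext) Value(interface{}) interface{}  { return nil }

func (c stopChannelContext) Err() error {
	select {
	case <-c.stopCh:
		return context.Canceled
	default:
		return nil
	}
}

// run stands in for a long-running store loop that exits when the context ends.
func run(ctx context.Context) {
	<-ctx.Done()
	fmt.Println("store stopped:", ctx.Err())
}

func main() {
	shutdownCh := make(chan struct{})
	go run(stopChannelContext{stopCh: shutdownCh})
	close(shutdownCh) // simulate agent shutdown
	time.Sleep(10 * time.Millisecond)
}
```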


@ -31,6 +31,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
 	"golang.org/x/time/rate"
+	"google.golang.org/grpc"
 	"gopkg.in/square/go-jose.v2/jwt"

 	"github.com/hashicorp/consul/agent/cache"
@ -307,6 +308,7 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) {
 				Logger:          hclog.NewInterceptLogger(nil),
 				Tokens:          new(token.Store),
 				TLSConfigurator: tlsConf,
+				GRPCConnPool:    &fakeGRPCConnPool{},
 			},
 			RuntimeConfig: &config.RuntimeConfig{
 				HTTPAddrs: []net.Addr{
@ -355,6 +357,12 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) {
 	}
 }

+type fakeGRPCConnPool struct{}
+
+func (f fakeGRPCConnPool) ClientConn(_ string) (*grpc.ClientConn, error) {
+	return nil, nil
+}
+
 func TestAgent_ReconnectConfigWanDisabled(t *testing.T) {
 	if testing.Short() {
 		t.Skip("too slow for testing.Short")
@ -5173,6 +5181,7 @@ func TestAgent_ListenHTTP_MultipleAddresses(t *testing.T) {
 				Logger:          hclog.NewInterceptLogger(nil),
 				Tokens:          new(token.Store),
 				TLSConfigurator: tlsConf,
+				GRPCConnPool:    &fakeGRPCConnPool{},
 			},
 			RuntimeConfig: &config.RuntimeConfig{
 				HTTPAddrs: []net.Addr{


@ -1,66 +0,0 @@
package cachetype
import (
"context"
"fmt"
"google.golang.org/grpc"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// TestStreamingClient is a mock StreamingClient for testing that allows
// for queueing up custom events to a subscriber.
type TestStreamingClient struct {
pbsubscribe.StateChangeSubscription_SubscribeClient
events chan eventOrErr
ctx context.Context
expectedNamespace string
}
type eventOrErr struct {
Err error
Event *pbsubscribe.Event
}
func NewTestStreamingClient(ns string) *TestStreamingClient {
return &TestStreamingClient{
events: make(chan eventOrErr, 32),
expectedNamespace: ns,
}
}
func (t *TestStreamingClient) Subscribe(
ctx context.Context,
req *pbsubscribe.SubscribeRequest,
_ ...grpc.CallOption,
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
if req.Namespace != t.expectedNamespace {
return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v",
req.Namespace, t.expectedNamespace)
}
t.ctx = ctx
return t, nil
}
func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
for _, e := range events {
t.events <- eventOrErr{Event: e}
}
}
func (t *TestStreamingClient) QueueErr(err error) {
t.events <- eventOrErr{Err: err}
}
func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) {
select {
case eoe := <-t.events:
if eoe.Err != nil {
return nil, eoe.Err
}
return eoe.Event, nil
case <-t.ctx.Done():
return nil, t.ctx.Err()
}
}


@ -1,12 +1,13 @@
 package consul

 import (
-	"github.com/hashicorp/consul/agent/grpc"
+	"github.com/hashicorp/go-hclog"
+	"google.golang.org/grpc"
+
 	"github.com/hashicorp/consul/agent/pool"
 	"github.com/hashicorp/consul/agent/router"
 	"github.com/hashicorp/consul/agent/token"
 	"github.com/hashicorp/consul/tlsutil"
-	"github.com/hashicorp/go-hclog"
 )

 type Deps struct {
@ -15,5 +16,9 @@ type Deps struct {
 	Tokens       *token.Store
 	Router       *router.Router
 	ConnPool     *pool.ConnPool
-	GRPCConnPool *grpc.ClientConnPool
+	GRPCConnPool GRPCClientConner
+}
+
+type GRPCClientConner interface {
+	ClientConn(datacenter string) (*grpc.ClientConn, error)
 }
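
GRPCClientConner narrows what consul.Deps needs from the gRPC layer to a single method, which is what lets the agent tests above substitute fakeGRPCConnPool for the real agent/grpc pool. A hypothetical implementation of the interface, not taken from this PR, to show how any connection provider can satisfy it:

```go
// Hypothetical consumer-side sketch: staticConner is an invented type used only
// to illustrate satisfying GRPCClientConner with pre-dialed connections.
package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc"
)

type GRPCClientConner interface {
	ClientConn(datacenter string) (*grpc.ClientConn, error)
}

// staticConner serves connections from a fixed map keyed by datacenter.
type staticConner struct {
	conns map[string]*grpc.ClientConn
}

func (s staticConner) ClientConn(datacenter string) (*grpc.ClientConn, error) {
	conn, ok := s.conns[datacenter]
	if !ok {
		return nil, errors.New("no connection for datacenter " + datacenter)
	}
	return conn, nil
}

func main() {
	var pool GRPCClientConner = staticConner{conns: map[string]*grpc.ClientConn{}}
	if _, err := pool.ClientConn("dc1"); err != nil {
		fmt.Println(err)
	}
}
```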


@ -5,14 +5,13 @@ import (
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/agent/consul/stream"
-	agentgrpc "github.com/hashicorp/consul/agent/grpc"
 	"github.com/hashicorp/consul/agent/rpc/subscribe"
 	"github.com/hashicorp/consul/agent/structs"
 )

 type subscribeBackend struct {
 	srv      *Server
-	connPool *agentgrpc.ClientConnPool
+	connPool GRPCClientConner
 }

 // TODO: refactor Resolve methods to an ACLBackend that can be used by all


@ -219,13 +219,6 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
 		return nil, nil
 	}

-	useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 && !args.Ingress
-	args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming)
-
-	if args.QueryOptions.UseCache && useStreaming && args.Source.Node != "" {
-		return nil, BadRequestError{Reason: "'near' query param can not be used with streaming"}
-	}
-
 	out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args)
 	if err != nil {
 		return nil, err


@ -352,10 +352,7 @@ func testManager_BasicLifecycle(
 	require := require.New(t)
 	logger := testutil.Logger(t)
 	state := local.NewState(local.Config{}, logger, &token.Store{})
-	source := &structs.QuerySource{
-		Node:       "node1",
-		Datacenter: "dc1",
-	}
+	source := &structs.QuerySource{Datacenter: "dc1"}

 	// Stub state syncing
 	state.TriggerSyncChanges = func() {}


@ -5,16 +5,18 @@ import (
 	"github.com/hashicorp/consul/agent/cache"
 	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/agent/submatview"
+	"github.com/hashicorp/consul/proto/pbsubscribe"
 )

+// Client provides access to service health data.
 type Client struct {
 	NetRPC NetRPC
 	Cache  CacheGetter
-	// CacheName to use for service health.
-	CacheName string
-	// CacheNameIngress is the name of the cache type to use for ingress
-	// service health.
-	CacheNameIngress string
+	ViewStore           MaterializedViewStore
+	MaterializerDeps    MaterializerDeps
+	CacheName           string
+	UseStreamingBackend bool
 }

 type NetRPC interface {
@ -26,10 +28,23 @@ type CacheGetter interface {
 	Notify(ctx context.Context, t string, r cache.Request, cID string, ch chan<- cache.UpdateEvent) error
 }

+type MaterializedViewStore interface {
+	Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
+	Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error
+}
+
 func (c *Client) ServiceNodes(
 	ctx context.Context,
 	req structs.ServiceSpecificRequest,
 ) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
+	if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) {
+		result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req))
+		if err != nil {
+			return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err
+		}
+		return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err
+	}
+
 	out, md, err := c.getServiceNodes(ctx, req)
 	if err != nil {
 		return out, md, err
@ -50,18 +65,12 @@ func (c *Client) getServiceNodes(
 	req structs.ServiceSpecificRequest,
 ) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
 	var out structs.IndexedCheckServiceNodes

 	if !req.QueryOptions.UseCache {
 		err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
 		return out, cache.ResultMeta{}, err
 	}

-	cacheName := c.CacheName
-	if req.Ingress {
-		cacheName = c.CacheNameIngress
-	}
-
-	raw, md, err := c.Cache.Get(ctx, cacheName, &req)
+	raw, md, err := c.Cache.Get(ctx, c.CacheName, &req)
 	if err != nil {
 		return out, md, err
 	}
@ -80,9 +89,55 @@ func (c *Client) Notify(
 	correlationID string,
 	ch chan<- cache.UpdateEvent,
 ) error {
-	cacheName := c.CacheName
-	if req.Ingress {
-		cacheName = c.CacheNameIngress
+	if c.useStreaming(req) {
+		sr := c.newServiceRequest(req)
+		return c.ViewStore.Notify(ctx, sr, correlationID, ch)
 	}
-	return c.Cache.Notify(ctx, cacheName, &req, correlationID, ch)
+	return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch)
+}
+
+func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool {
+	return c.UseStreamingBackend && !req.Ingress && req.Source.Node == ""
+}
+
+func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRequest {
+	return serviceRequest{
+		ServiceSpecificRequest: req,
+		deps:                   c.MaterializerDeps,
+	}
+}
+
+// Close any underlying connections used by the client.
+func (c *Client) Close() error {
+	if c == nil {
+		return nil
+	}
+	return c.MaterializerDeps.Conn.Close()
+}
+
+type serviceRequest struct {
+	structs.ServiceSpecificRequest
+	deps MaterializerDeps
+}
+
+func (r serviceRequest) CacheInfo() cache.RequestInfo {
+	return r.ServiceSpecificRequest.CacheInfo()
+}
+
+func (r serviceRequest) Type() string {
+	return "service-health"
+}
+
+func (r serviceRequest) NewMaterializer() (*submatview.Materializer, error) {
+	view, err := newHealthView(r.ServiceSpecificRequest)
+	if err != nil {
+		return nil, err
+	}
+	return submatview.NewMaterializer(submatview.Deps{
+		View:    view,
+		Client:  pbsubscribe.NewStateChangeSubscriptionClient(r.deps.Conn),
+		Logger:  r.deps.Logger,
+		Request: newMaterializerRequest(r.ServiceSpecificRequest),
+	}), nil
 }
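
The backend routing that used to live in the HTTP handler and the cache-type names is now concentrated in useStreaming and the guard at the top of ServiceNodes. The sketch below paraphrases that rule as a standalone pure function; the type and constant names are illustrative and not part of the PR.

```go
// Paraphrase of the routing rule shown above, reduced to the few request
// fields it inspects; the real Client consults the same conditions before
// choosing the view store, the agent cache, or a plain RPC.
package main

import "fmt"

type request struct {
	ingress       bool
	nearNode      string // Source.Node, set by the ?near= query parameter
	useCache      bool
	minQueryIndex uint64
}

type backend string

const (
	backendRPC       backend = "rpc"
	backendCache     backend = "cache"
	backendStreaming backend = "streaming"
)

// route mirrors useStreaming plus the UseCache/MinQueryIndex guard in ServiceNodes.
func route(streamingEnabled bool, r request) backend {
	streaming := streamingEnabled && !r.ingress && r.nearNode == ""
	switch {
	case streaming && (r.useCache || r.minQueryIndex > 0):
		return backendStreaming
	case r.useCache:
		return backendCache
	default:
		return backendRPC
	}
}

func main() {
	fmt.Println(route(true, request{}))                                  // rpc
	fmt.Println(route(true, request{useCache: true}))                    // streaming
	fmt.Println(route(true, request{minQueryIndex: 22}))                 // streaming
	fmt.Println(route(true, request{useCache: true, ingress: true}))     // cache
	fmt.Println(route(true, request{useCache: true, nearNode: "node1"})) // cache
}
```

These outcomes match the BackendRouting test cases in the new health_test.go below: ingress and near requests keep using the cache, everything else that asks for caching or a blocking index goes to the streaming view store.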


@ -0,0 +1,235 @@
package health
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
)
func TestClient_ServiceNodes_BackendRouting(t *testing.T) {
type testCase struct {
name string
req structs.ServiceSpecificRequest
expected func(t *testing.T, c *Client)
}
run := func(t *testing.T, tc testCase) {
c := &Client{
NetRPC: &fakeNetRPC{},
Cache: &fakeCache{},
ViewStore: &fakeViewStore{},
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
}
_, _, err := c.ServiceNodes(context.Background(), tc.req)
require.NoError(t, err)
tc.expected(t, c)
}
var testCases = []testCase{
{
name: "rpc by default",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
},
expected: useRPC,
},
{
name: "use streaming instead of cache",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
QueryOptions: structs.QueryOptions{UseCache: true},
},
expected: useStreaming,
},
{
name: "use streaming for MinQueryIndex",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
QueryOptions: structs.QueryOptions{MinQueryIndex: 22},
},
expected: useStreaming,
},
{
name: "use cache for ingress request",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
QueryOptions: structs.QueryOptions{UseCache: true},
Ingress: true,
},
expected: useCache,
},
{
name: "use cache for near request",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
QueryOptions: structs.QueryOptions{UseCache: true},
Source: structs.QuerySource{Node: "node1"},
},
expected: useCache,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
run(t, tc)
})
}
}
func useRPC(t *testing.T, c *Client) {
t.Helper()
rpc, ok := c.NetRPC.(*fakeNetRPC)
require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC)
cache, ok := c.Cache.(*fakeCache)
require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache)
store, ok := c.ViewStore.(*fakeViewStore)
require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore)
require.Len(t, cache.calls, 0)
require.Len(t, store.calls, 0)
require.Equal(t, []string{"Health.ServiceNodes"}, rpc.calls)
}
func useStreaming(t *testing.T, c *Client) {
t.Helper()
rpc, ok := c.NetRPC.(*fakeNetRPC)
require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC)
cache, ok := c.Cache.(*fakeCache)
require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache)
store, ok := c.ViewStore.(*fakeViewStore)
require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore)
require.Len(t, cache.calls, 0)
require.Len(t, rpc.calls, 0)
require.Len(t, store.calls, 1)
}
func useCache(t *testing.T, c *Client) {
t.Helper()
rpc, ok := c.NetRPC.(*fakeNetRPC)
require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC)
cache, ok := c.Cache.(*fakeCache)
require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache)
store, ok := c.ViewStore.(*fakeViewStore)
require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore)
require.Len(t, rpc.calls, 0)
require.Len(t, store.calls, 0)
require.Equal(t, []string{"cache-no-streaming"}, cache.calls)
}
type fakeCache struct {
calls []string
}
func (f *fakeCache) Get(_ context.Context, t string, _ cache.Request) (interface{}, cache.ResultMeta, error) {
f.calls = append(f.calls, t)
result := &structs.IndexedCheckServiceNodes{}
return result, cache.ResultMeta{}, nil
}
func (f *fakeCache) Notify(_ context.Context, t string, _ cache.Request, _ string, _ chan<- cache.UpdateEvent) error {
f.calls = append(f.calls, t)
return nil
}
type fakeNetRPC struct {
calls []string
}
func (f *fakeNetRPC) RPC(method string, _ interface{}, _ interface{}) error {
f.calls = append(f.calls, method)
return nil
}
type fakeViewStore struct {
calls []submatview.Request
}
func (f *fakeViewStore) Get(_ context.Context, req submatview.Request) (submatview.Result, error) {
f.calls = append(f.calls, req)
return submatview.Result{Value: &structs.IndexedCheckServiceNodes{}}, nil
}
func (f *fakeViewStore) Notify(_ context.Context, req submatview.Request, _ string, _ chan<- cache.UpdateEvent) error {
f.calls = append(f.calls, req)
return nil
}
func TestClient_Notify_BackendRouting(t *testing.T) {
type testCase struct {
name string
req structs.ServiceSpecificRequest
expected func(t *testing.T, c *Client)
}
run := func(t *testing.T, tc testCase) {
c := &Client{
NetRPC: &fakeNetRPC{},
Cache: &fakeCache{},
ViewStore: &fakeViewStore{},
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
}
err := c.Notify(context.Background(), tc.req, "cid", nil)
require.NoError(t, err)
tc.expected(t, c)
}
var testCases = []testCase{
{
name: "streaming by default",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
},
expected: useStreaming,
},
{
name: "use cache for ingress request",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
Ingress: true,
},
expected: useCache,
},
{
name: "use cache for near request",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
Source: structs.QuerySource{Node: "node1"},
},
expected: useCache,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
run(t, tc)
})
}
}


@ -0,0 +1,69 @@
package health
import (
"context"
"google.golang.org/grpc"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// streamClient is a mock StreamingClient for testing that allows
// for queueing up custom events to a subscriber.
type streamClient struct {
pbsubscribe.StateChangeSubscription_SubscribeClient
subFn func(*pbsubscribe.SubscribeRequest) error
events chan eventOrErr
ctx context.Context
}
type eventOrErr struct {
Err error
Event *pbsubscribe.Event
}
func newStreamClient(sub func(req *pbsubscribe.SubscribeRequest) error) *streamClient {
if sub == nil {
sub = func(*pbsubscribe.SubscribeRequest) error {
return nil
}
}
return &streamClient{
events: make(chan eventOrErr, 32),
subFn: sub,
}
}
func (t *streamClient) Subscribe(
ctx context.Context,
req *pbsubscribe.SubscribeRequest,
_ ...grpc.CallOption,
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
if err := t.subFn(req); err != nil {
return nil, err
}
t.ctx = ctx
return t, nil
}
func (t *streamClient) QueueEvents(events ...*pbsubscribe.Event) {
for _, e := range events {
t.events <- eventOrErr{Event: e}
}
}
func (t *streamClient) QueueErr(err error) {
t.events <- eventOrErr{Err: err}
}
func (t *streamClient) Recv() (*pbsubscribe.Event, error) {
select {
case eoe := <-t.events:
if eoe.Err != nil {
return nil, eoe.Err
}
return eoe.Event, nil
case <-t.ctx.Done():
return nil, t.ctx.Err()
}
}
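
A hypothetical usage sketch for the mock above, written as if it were another test in this same health package: queue an event, open the fake subscription, and read it back through Recv the way a materializer would.

```go
// Assumed to live in the health package alongside streamClient; the test name
// is invented for illustration.
package health

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/hashicorp/consul/proto/pbsubscribe"
)

func TestStreamClient_QueueAndRecv_Sketch(t *testing.T) {
	client := newStreamClient(nil) // nil: accept every SubscribeRequest

	// Queue an end-of-snapshot event, as the server would send once the
	// initial state has been streamed.
	client.QueueEvents(&pbsubscribe.Event{
		Index:   1,
		Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
	})

	sub, err := client.Subscribe(context.Background(), &pbsubscribe.SubscribeRequest{})
	require.NoError(t, err)

	event, err := sub.Recv()
	require.NoError(t, err)
	require.Equal(t, uint64(1), event.Index)
}
```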


@ -1,74 +1,27 @@
-package cachetype
+package health

 import (
-	"context"
 	"fmt"
 	"reflect"
 	"sort"
 	"strings"
-	"time"

 	"github.com/hashicorp/go-bexpr"
 	"github.com/hashicorp/go-hclog"
+	"google.golang.org/grpc"

-	"github.com/hashicorp/consul/agent/cache"
 	"github.com/hashicorp/consul/agent/structs"
-	"github.com/hashicorp/consul/agent/submatview"
-	"github.com/hashicorp/consul/lib/retry"
 	"github.com/hashicorp/consul/proto/pbservice"
 	"github.com/hashicorp/consul/proto/pbsubscribe"
 )

-const (
-	// Recommended name for registration.
-	StreamingHealthServicesName = "streaming-health-services"
-)
-
-// StreamingHealthServices supports fetching discovering service instances via the
-// catalog using the streaming gRPC endpoint.
-type StreamingHealthServices struct {
-	RegisterOptionsBlockingRefresh
-	deps MaterializerDeps
-}
-
-// RegisterOptions returns options with a much shorter LastGetTTL than the default.
-// Unlike other cache-types, StreamingHealthServices runs a materialized view in
-// the background which will receive streamed events from a server. If the cache
-// is not being used, that stream uses memory on the server and network transfer
-// between the client and the server.
-// The materialize view and the stream are stopped when the cache entry expires,
-// so using a shorter TTL ensures the cache entry expires sooner.
-func (c *StreamingHealthServices) RegisterOptions() cache.RegisterOptions {
-	opts := c.RegisterOptionsBlockingRefresh.RegisterOptions()
-	opts.LastGetTTL = 20 * time.Minute
-	return opts
-}
-
-// NewStreamingHealthServices creates a cache-type for watching for service
-// health results via streaming updates.
-func NewStreamingHealthServices(deps MaterializerDeps) *StreamingHealthServices {
-	return &StreamingHealthServices{deps: deps}
-}
-
 type MaterializerDeps struct {
-	Client submatview.StreamClient
+	Conn   *grpc.ClientConn
 	Logger hclog.Logger
 }

-// Fetch service health from the materialized view. If no materialized view
-// exists, create one and start it running in a goroutine. The goroutine will
-// exit when the cache entry storing the result is expired, the cache will call
-// Close on the result.State.
-//
-// Fetch implements part of the cache.Type interface, and assumes that the
-// caller ensures that only a single call to Fetch is running at any time.
-func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
-	if opts.LastResult != nil && opts.LastResult.State != nil {
-		return opts.LastResult.State.(*streamingHealthState).Fetch(opts)
-	}
-
-	srvReq := req.(*structs.ServiceSpecificRequest)
-	newReqFn := func(index uint64) pbsubscribe.SubscribeRequest {
+func newMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index uint64) pbsubscribe.SubscribeRequest {
+	return func(index uint64) pbsubscribe.SubscribeRequest {
 		req := pbsubscribe.SubscribeRequest{
 			Topic: pbsubscribe.Topic_ServiceHealth,
 			Key:   srvReq.ServiceName,
@ -82,69 +35,9 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
 		}
 		return req
 	}
-
-	materializer, err := newMaterializer(c.deps, newReqFn, srvReq)
-	if err != nil {
-		return cache.FetchResult{}, err
-	}
-	ctx, cancel := context.WithCancel(context.TODO())
-	go materializer.Run(ctx)
-
-	state := &streamingHealthState{
-		materializer: materializer,
-		done:         ctx.Done(),
-		cancel:       cancel,
-	}
-	return state.Fetch(opts)
 }

-func newMaterializer(
-	deps MaterializerDeps,
-	newRequestFn func(uint64) pbsubscribe.SubscribeRequest,
-	req *structs.ServiceSpecificRequest,
-) (*submatview.Materializer, error) {
-	view, err := newHealthView(req)
-	if err != nil {
-		return nil, err
-	}
-	return submatview.NewMaterializer(submatview.Deps{
-		View:   view,
-		Client: deps.Client,
-		Logger: deps.Logger,
-		Waiter: &retry.Waiter{
-			MinFailures: 1,
-			// Start backing off with small increments (200-400ms) which will double
-			// each attempt. (200-400, 400-800, 800-1600, 1600-3200, 3200-6000, 6000
-			// after that). (retry.Wait applies Max limit after jitter right now).
-			Factor:  200 * time.Millisecond,
-			MinWait: 0,
-			MaxWait: 60 * time.Second,
-			Jitter:  retry.NewJitter(100),
-		},
-		Request: newRequestFn,
-	}), nil
-}
-
-// streamingHealthState wraps a Materializer to manage its lifecycle, and to
-// add itself to the FetchResult.State.
-type streamingHealthState struct {
-	materializer *submatview.Materializer
-	done         <-chan struct{}
-	cancel       func()
-}
-
-func (s *streamingHealthState) Close() error {
-	s.cancel()
-	return nil
-}
-
-func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) {
-	result, err := s.materializer.Fetch(s.done, opts)
-	result.State = s
-	return result, err
-}
-
-func newHealthView(req *structs.ServiceSpecificRequest) (*healthView, error) {
+func newHealthView(req structs.ServiceSpecificRequest) (*healthView, error) {
 	fe, err := newFilterEvaluator(req)
 	if err != nil {
 		return nil, err
@ -197,7 +90,7 @@ type filterEvaluator interface {
 	Evaluate(datum interface{}) (bool, error)
 }

-func newFilterEvaluator(req *structs.ServiceSpecificRequest) (filterEvaluator, error) {
+func newFilterEvaluator(req structs.ServiceSpecificRequest) (filterEvaluator, error) {
 	var evaluators []filterEvaluator

 	typ := reflect.TypeOf(structs.CheckServiceNode{})
@ -274,7 +167,7 @@ func sortCheckServiceNodes(serviceNodes *structs.IndexedCheckServiceNodes) {
 }

 // Result returns the structs.IndexedCheckServiceNodes stored by this view.
-func (s *healthView) Result(index uint64) (interface{}, error) {
+func (s *healthView) Result(index uint64) interface{} {
 	result := structs.IndexedCheckServiceNodes{
 		Nodes: make(structs.CheckServiceNodes, 0, len(s.state)),
 		QueryMeta: structs.QueryMeta{
@ -286,7 +179,7 @@ func (s *healthView) Result(index uint64) (interface{}, error) {
 	}
 	sortCheckServiceNodes(&result)

-	return &result, nil
+	return &result
 }

 func (s *healthView) Reset() {
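
newMaterializerRequest above returns a closure from index to subscribe request, so the materializer can re-subscribe from the last index it has applied after an error or reconnect. Below is a standalone sketch of that request-factory pattern, with a stand-in request type rather than the real pbsubscribe.SubscribeRequest.

```go
// Sketch of the request-factory pattern; subscribeRequest is an illustrative
// stand-in type, not Consul's protobuf message.
package main

import "fmt"

type subscribeRequest struct {
	topic string
	key   string
	index uint64
}

// newRequestFn captures the static parts of the subscription once and leaves
// only the resume index to be filled in per (re)connection attempt.
func newRequestFn(service string) func(index uint64) subscribeRequest {
	return func(index uint64) subscribeRequest {
		return subscribeRequest{topic: "service-health", key: service, index: index}
	}
}

func main() {
	newReq := newRequestFn("web")
	fmt.Println(newReq(0))  // initial subscription: full snapshot from the server
	fmt.Println(newReq(42)) // after a reconnect: resume from the last seen index
}
```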


@ -1,238 +1,29 @@
-package cachetype
+package health

 import (
+	"context"
 	"errors"
 	"fmt"
 	"strings"
 	"testing"
 	"time"

+	"github.com/google/go-cmp/cmp"
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-uuid"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"

-	"github.com/hashicorp/consul/agent/cache"
 	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/agent/submatview"
 	"github.com/hashicorp/consul/proto/pbcommon"
+	"github.com/hashicorp/consul/proto/pbservice"
 	"github.com/hashicorp/consul/proto/pbsubscribe"
 	"github.com/hashicorp/consul/types"
 )

-func TestStreamingHealthServices_EmptySnapshot(t *testing.T) {
+func TestSortCheckServiceNodes_OrderIsConsistentWithRPCResponse(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
namespace := pbcommon.DefaultEnterpriseMeta.Namespace
client := NewTestStreamingClient(namespace)
typ := StreamingHealthServices{deps: MaterializerDeps{
Client: client,
Logger: hclog.Default(),
}}
// Initially there are no services registered. Server should send an
// EndOfSnapshot message immediately with index of 1.
client.QueueEvents(newEndOfSnapshotEvent(1))
opts := cache.FetchOptions{
MinIndex: 0,
Timeout: time.Second,
}
req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
}
empty := &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{},
QueryMeta: structs.QueryMeta{
Index: 1,
},
}
runStep(t, "empty snapshot returned", func(t *testing.T) {
// Fetch should return an empty
// result of the right type with a non-zero index, and in the background begin
// streaming updates.
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
require.Equal(t, uint64(1), result.Index)
require.Equal(t, empty, result.Value)
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "blocks for timeout", func(t *testing.T) {
// Subsequent fetch should block for the timeout
start := time.Now()
opts.Timeout = 200 * time.Millisecond
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until timeout")
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
require.Equal(t, empty, result.Value, "result value should not have changed")
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "blocks until update", func(t *testing.T) {
// Make another blocking query with a longer timeout and trigger an update
// event part way through.
start := time.Now()
go func() {
time.Sleep(200 * time.Millisecond)
client.QueueEvents(newEventServiceHealthRegister(4, 1, "web"))
}()
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until the event was delivered")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(4), result.Index, "result index should not have changed")
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 1,
"result value should contain the new registration")
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) {
client.QueueErr(tempError("broken pipe"))
// Next fetch will continue to block until timeout and receive the same
// result.
start := time.Now()
opts.Timeout = 200 * time.Millisecond
result, err := typ.Fetch(opts, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until timeout")
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
require.Equal(t, opts.LastResult.Value, result.Value, "result value should not have changed")
opts.MinIndex = result.Index
opts.LastResult = &result
// But an update should still be noticed due to reconnection
client.QueueEvents(newEventServiceHealthRegister(10, 2, "web"))
start = time.Now()
opts.Timeout = time.Second
result, err = typ.Fetch(opts, req)
require.NoError(t, err)
elapsed = time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(10), result.Index, "result index should not have changed")
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 2,
"result value should contain the new registration")
opts.MinIndex = result.Index
opts.LastResult = &result
})
runStep(t, "returns non-temporary error to watchers", func(t *testing.T) {
// Wait and send the error while fetcher is waiting
go func() {
time.Sleep(200 * time.Millisecond)
client.QueueErr(errors.New("invalid request"))
}()
// Next fetch should return the error
start := time.Now()
opts.Timeout = time.Second
result, err := typ.Fetch(opts, req)
require.Error(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until error was sent")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, opts.MinIndex, result.Index, "result index should not have changed")
// We don't require instances to be returned in same order so we use
// elementsMatch which is recursive.
requireResultsSame(t,
opts.LastResult.Value.(*structs.IndexedCheckServiceNodes),
result.Value.(*structs.IndexedCheckServiceNodes),
)
opts.MinIndex = result.Index
opts.LastResult = &result
// But an update should still be noticed due to reconnection
client.QueueEvents(newEventServiceHealthRegister(opts.MinIndex+5, 3, "web"))
opts.Timeout = time.Second
result, err = typ.Fetch(opts, req)
require.NoError(t, err)
elapsed = time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, opts.MinIndex+5, result.Index, "result index should not have changed")
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3,
"result value should contain the new registration")
opts.MinIndex = result.Index
opts.LastResult = &result
})
}
type tempError string
func (e tempError) Error() string {
return string(e)
}
func (e tempError) Temporary() bool {
return true
}
// requireResultsSame compares two IndexedCheckServiceNodes without requiring
// the same order of results (which vary due to map usage internally).
func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNodes) {
require.Equal(t, want.Index, got.Index)
svcIDs := func(csns structs.CheckServiceNodes) []string {
res := make([]string, 0, len(csns))
for _, csn := range csns {
res = append(res, fmt.Sprintf("%s/%s", csn.Node.Node, csn.Service.ID))
}
return res
}
gotIDs := svcIDs(got.Nodes)
wantIDs := svcIDs(want.Nodes)
require.ElementsMatch(t, wantIDs, gotIDs)
}
// getNamespace returns a namespace if namespace support exists, otherwise
// returns the empty string. It allows the same tests to work in both oss and ent
// without duplicating the tests.
func getNamespace(ns string) string {
meta := structs.NewEnterpriseMeta(ns)
return meta.NamespaceOrEmpty()
}
func TestOrderingConsistentWithMemDb(t *testing.T) {
index := uint64(42) index := uint64(42)
buildTestNode := func(nodeName string, serviceID string) structs.CheckServiceNode { buildTestNode := func(nodeName string, serviceID string) structs.CheckServiceNode {
newID, err := uuid.GenerateUUID() newID, err := uuid.GenerateUUID()
@ -270,29 +61,205 @@ func TestOrderingConsistentWithMemDb(t *testing.T) {
two := buildTestNode("node1", "testService:2") two := buildTestNode("node1", "testService:2")
three := buildTestNode("node2", "testService") three := buildTestNode("node2", "testService")
result := structs.IndexedCheckServiceNodes{ result := structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{ Nodes: structs.CheckServiceNodes{three, two, zero, one},
three, two, zero, one, QueryMeta: structs.QueryMeta{Index: index},
},
QueryMeta: structs.QueryMeta{
Index: index,
},
} }
sortCheckServiceNodes(&result) sortCheckServiceNodes(&result)
expected := structs.CheckServiceNodes{zero, one, two, three} expected := structs.CheckServiceNodes{zero, one, two, three}
require.Equal(t, expected, result.Nodes) require.Equal(t, expected, result.Nodes)
} }
func TestStreamingHealthServices_FullSnapshot(t *testing.T) { func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
namespace := getNamespace(pbcommon.DefaultEnterpriseMeta.Namespace)
streamClient := newStreamClient(validateNamespace(namespace))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := submatview.NewStore(hclog.New(nil))
go store.Run(ctx)
// Initially there are no services registered. Server should send an
// EndOfSnapshot message immediately with index of 1.
streamClient.QueueEvents(newEndOfSnapshotEvent(1))
req := serviceRequestStub{
serviceRequest: serviceRequest{
ServiceSpecificRequest: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second},
},
},
streamClient: streamClient,
}
empty := &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{},
QueryMeta: structs.QueryMeta{
Index: 1,
},
}
runStep(t, "empty snapshot returned", func(t *testing.T) {
result, err := store.Get(ctx, req)
require.NoError(t, err)
require.Equal(t, uint64(1), result.Index)
require.Equal(t, empty, result.Value)
req.QueryOptions.MinQueryIndex = result.Index
})
runStep(t, "blocks for timeout", func(t *testing.T) {
// Subsequent fetch should block for the timeout
start := time.Now()
req.QueryOptions.MaxQueryTime = 200 * time.Millisecond
result, err := store.Get(ctx, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until timeout")
require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed")
require.Equal(t, empty, result.Value, "result value should not have changed")
req.QueryOptions.MinQueryIndex = result.Index
})
var lastResultValue structs.CheckServiceNodes
runStep(t, "blocks until update", func(t *testing.T) {
// Make another blocking query with a longer timeout and trigger an update
// event part way through.
start := time.Now()
go func() {
time.Sleep(200 * time.Millisecond)
streamClient.QueueEvents(newEventServiceHealthRegister(4, 1, "web"))
}()
req.QueryOptions.MaxQueryTime = time.Second
result, err := store.Get(ctx, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until the event was delivered")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(4), result.Index, "result index should not have changed")
lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
require.Len(t, lastResultValue, 1,
"result value should contain the new registration")
req.QueryOptions.MinQueryIndex = result.Index
})
runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) {
streamClient.QueueErr(tempError("broken pipe"))
// Next fetch will continue to block until timeout and receive the same
// result.
start := time.Now()
req.QueryOptions.MaxQueryTime = 200 * time.Millisecond
result, err := store.Get(ctx, req)
require.NoError(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until timeout")
require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index,
"result index should not have changed")
require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes,
"result value should not have changed")
req.QueryOptions.MinQueryIndex = result.Index
// But an update should still be noticed due to reconnection
streamClient.QueueEvents(newEventServiceHealthRegister(10, 2, "web"))
start = time.Now()
req.QueryOptions.MaxQueryTime = time.Second
result, err = store.Get(ctx, req)
require.NoError(t, err)
elapsed = time.Since(start)
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, uint64(10), result.Index, "result index should not have changed")
lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
require.Len(t, lastResultValue, 2,
"result value should contain the new registration")
req.QueryOptions.MinQueryIndex = result.Index
})
runStep(t, "returns non-temporary error to watchers", func(t *testing.T) {
// Wait and send the error while fetcher is waiting
go func() {
time.Sleep(200 * time.Millisecond)
streamClient.QueueErr(errors.New("invalid request"))
}()
// Next fetch should return the error
start := time.Now()
req.QueryOptions.MaxQueryTime = time.Second
result, err := store.Get(ctx, req)
require.Error(t, err)
elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond,
"Fetch should have blocked until error was sent")
require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout")
require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed")
require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes)
req.QueryOptions.MinQueryIndex = result.Index
// But an update should still be noticed due to reconnection
streamClient.QueueEvents(newEventServiceHealthRegister(req.QueryOptions.MinQueryIndex+5, 3, "web"))
req.QueryOptions.MaxQueryTime = time.Second
result, err = store.Get(ctx, req)
require.NoError(t, err)
elapsed = time.Since(start)
require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout")
require.Equal(t, req.QueryOptions.MinQueryIndex+5, result.Index, "result index should not have changed")
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3,
"result value should contain the new registration")
req.QueryOptions.MinQueryIndex = result.Index
})
}
type tempError string
func (e tempError) Error() string {
return string(e)
}
func (e tempError) Temporary() bool {
return true
}
func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")
} }
namespace := getNamespace("ns2") namespace := getNamespace("ns2")
client := NewTestStreamingClient(namespace) client := newStreamClient(validateNamespace(namespace))
typ := StreamingHealthServices{deps: MaterializerDeps{
Client: client, ctx, cancel := context.WithCancel(context.Background())
Logger: hclog.Default(), defer cancel()
}}
store := submatview.NewStore(hclog.New(nil))
// Create an initial snapshot of 3 instances on different nodes // Create an initial snapshot of 3 instances on different nodes
registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event { registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event {
@ -304,37 +271,28 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
registerServiceWeb(5, 3), registerServiceWeb(5, 3),
newEndOfSnapshotEvent(5)) newEndOfSnapshotEvent(5))
// This contains the view state so important we share it between calls. req := serviceRequestStub{
opts := cache.FetchOptions{ serviceRequest: serviceRequest{
MinIndex: 0, ServiceSpecificRequest: structs.ServiceSpecificRequest{
Timeout: 1 * time.Second, Datacenter: "dc1",
} ServiceName: "web",
req := &structs.ServiceSpecificRequest{ EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
Datacenter: "dc1", QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second},
ServiceName: "web", },
EnterpriseMeta: structs.NewEnterpriseMeta(namespace), },
} streamClient: client,
gatherNodes := func(res interface{}) []string {
nodes := make([]string, 0, 3)
r := res.(*structs.IndexedCheckServiceNodes)
for _, csn := range r.Nodes {
nodes = append(nodes, csn.Node.Node)
}
// Result will be sorted alphabetically the same way as memdb
return nodes
} }
runStep(t, "full snapshot returned", func(t *testing.T) { runStep(t, "full snapshot returned", func(t *testing.T) {
result, err := typ.Fetch(opts, req) result, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(5), result.Index) require.Equal(t, uint64(5), result.Index)
require.ElementsMatch(t, []string{"node1", "node2", "node3"}, expected := newExpectedNodes("node1", "node2", "node3")
gatherNodes(result.Value)) expected.Index = 5
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
opts.MinIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
opts.LastResult = &result
}) })
runStep(t, "blocks until deregistration", func(t *testing.T) { runStep(t, "blocks until deregistration", func(t *testing.T) {
@ -348,8 +306,8 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web")) client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web"))
}() }()
opts.Timeout = time.Second req.QueryOptions.MaxQueryTime = time.Second
result, err := typ.Fetch(opts, req) result, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
elapsed := time.Since(start) elapsed := time.Since(start)
require.True(t, elapsed >= 200*time.Millisecond, require.True(t, elapsed >= 200*time.Millisecond,
@ -358,10 +316,11 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
"Fetch should have returned before the timeout") "Fetch should have returned before the timeout")
require.Equal(t, uint64(20), result.Index) require.Equal(t, uint64(20), result.Index)
require.Equal(t, []string{"node2", "node3"}, gatherNodes(result.Value)) expected := newExpectedNodes("node2", "node3")
expected.Index = 20
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
opts.MinIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
opts.LastResult = &result
}) })
runStep(t, "server reload is respected", func(t *testing.T) { runStep(t, "server reload is respected", func(t *testing.T) {
@ -379,18 +338,19 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
// Make another blocking query with THE SAME index. It should immediately // Make another blocking query with THE SAME index. It should immediately
// return the new snapshot. // return the new snapshot.
start := time.Now() start := time.Now()
opts.Timeout = time.Second req.QueryOptions.MaxQueryTime = time.Second
result, err := typ.Fetch(opts, req) result, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
elapsed := time.Since(start) elapsed := time.Since(start)
require.True(t, elapsed < time.Second, require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout") "Fetch should have returned before the timeout")
require.Equal(t, uint64(50), result.Index) require.Equal(t, uint64(50), result.Index)
require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value)) expected := newExpectedNodes("node3", "node4", "node5")
expected.Index = 50
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
opts.MinIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
opts.LastResult = &result
}) })
runStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) { runStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) {
@ -404,26 +364,54 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
newEndOfSnapshotEvent(50)) newEndOfSnapshotEvent(50))
start := time.Now() start := time.Now()
opts.MinIndex = 49 req.QueryOptions.MinQueryIndex = 49
opts.Timeout = time.Second req.QueryOptions.MaxQueryTime = time.Second
result, err := typ.Fetch(opts, req) result, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
elapsed := time.Since(start) elapsed := time.Since(start)
require.True(t, elapsed < time.Second, require.True(t, elapsed < time.Second,
"Fetch should have returned before the timeout") "Fetch should have returned before the timeout")
require.Equal(t, uint64(50), result.Index) require.Equal(t, uint64(50), result.Index)
require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value)) expected := newExpectedNodes("node3", "node4", "node5")
expected.Index = 50
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
}) })
} }
func TestStreamingHealthServices_EventBatches(t *testing.T) { func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes {
result := &structs.IndexedCheckServiceNodes{}
for _, node := range nodes {
result.Nodes = append(result.Nodes, structs.CheckServiceNode{
Node: &structs.Node{Node: node},
})
}
return result
}
// cmpCheckServiceNodeNames does a shallow comparison of structs.CheckServiceNode
// by Node name.
var cmpCheckServiceNodeNames = cmp.Options{
cmp.Comparer(func(x, y structs.CheckServiceNode) bool {
return x.Node.Node == y.Node.Node
}),
}
func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
t.Helper()
if diff := cmp.Diff(x, y, opts...); diff != "" {
t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)
}
}
func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
namespace := getNamespace("ns3") namespace := getNamespace("ns3")
client := NewTestStreamingClient(namespace) client := newStreamClient(validateNamespace(namespace))
typ := StreamingHealthServices{deps: MaterializerDeps{
Client: client, ctx, cancel := context.WithCancel(context.Background())
Logger: hclog.Default(), defer cancel()
}}
store := submatview.NewStore(hclog.New(nil))
// Create an initial snapshot of 3 instances but in a single event batch // Create an initial snapshot of 3 instances but in a single event batch
batchEv := newEventBatchWithEvents( batchEv := newEventBatchWithEvents(
@ -434,36 +422,28 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
batchEv, batchEv,
newEndOfSnapshotEvent(5)) newEndOfSnapshotEvent(5))
// This contains the view state so important we share it between calls. req := serviceRequestStub{
opts := cache.FetchOptions{ serviceRequest: serviceRequest{
MinIndex: 0, ServiceSpecificRequest: structs.ServiceSpecificRequest{
Timeout: 1 * time.Second, Datacenter: "dc1",
} ServiceName: "web",
req := &structs.ServiceSpecificRequest{ EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
Datacenter: "dc1", QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second},
ServiceName: "web", },
EnterpriseMeta: structs.NewEnterpriseMeta(namespace), },
} streamClient: client,
gatherNodes := func(res interface{}) []string {
nodes := make([]string, 0, 3)
r := res.(*structs.IndexedCheckServiceNodes)
for _, csn := range r.Nodes {
nodes = append(nodes, csn.Node.Node)
}
return nodes
} }
runStep(t, "full snapshot returned", func(t *testing.T) { runStep(t, "full snapshot returned", func(t *testing.T) {
result, err := typ.Fetch(opts, req) result, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(5), result.Index) require.Equal(t, uint64(5), result.Index)
require.ElementsMatch(t, []string{"node1", "node2", "node3"},
gatherNodes(result.Value))
opts.MinIndex = result.Index expected := newExpectedNodes("node1", "node2", "node3")
opts.LastResult = &result expected.Index = 5
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
req.QueryOptions.MinQueryIndex = result.Index
}) })
runStep(t, "batched updates work too", func(t *testing.T) { runStep(t, "batched updates work too", func(t *testing.T) {
@ -476,92 +456,220 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) {
newEventServiceHealthRegister(20, 4, "web"), newEventServiceHealthRegister(20, 4, "web"),
) )
client.QueueEvents(batchEv) client.QueueEvents(batchEv)
opts.Timeout = time.Second req.QueryOptions.MaxQueryTime = time.Second
result, err := typ.Fetch(opts, req) result, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(20), result.Index) require.Equal(t, uint64(20), result.Index)
require.ElementsMatch(t, []string{"node2", "node3", "node4"}, expected := newExpectedNodes("node2", "node3", "node4")
gatherNodes(result.Value)) expected.Index = 20
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
opts.MinIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
opts.LastResult = &result
}) })
} }
func TestStreamingHealthServices_Filtering(t *testing.T) { func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) {
namespace := getNamespace("ns3") namespace := getNamespace("ns3")
client := NewTestStreamingClient(namespace) streamClient := newStreamClient(validateNamespace(namespace))
typ := StreamingHealthServices{deps: MaterializerDeps{
Client: client, ctx, cancel := context.WithCancel(context.Background())
Logger: hclog.Default(), defer cancel()
}}
store := submatview.NewStore(hclog.New(nil))
go store.Run(ctx)
req := serviceRequestStub{
serviceRequest: serviceRequest{
ServiceSpecificRequest: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
QueryOptions: structs.QueryOptions{
Filter: `Node.Node == "node2"`,
MaxQueryTime: time.Second,
},
},
},
streamClient: streamClient,
}
// Create an initial snapshot of 3 instances but in a single event batch // Create an initial snapshot of 3 instances but in a single event batch
batchEv := newEventBatchWithEvents( batchEv := newEventBatchWithEvents(
newEventServiceHealthRegister(5, 1, "web"), newEventServiceHealthRegister(5, 1, "web"),
newEventServiceHealthRegister(5, 2, "web"), newEventServiceHealthRegister(5, 2, "web"),
newEventServiceHealthRegister(5, 3, "web")) newEventServiceHealthRegister(5, 3, "web"))
client.QueueEvents( streamClient.QueueEvents(
batchEv, batchEv,
newEndOfSnapshotEvent(5)) newEndOfSnapshotEvent(5))
// This contains the view state so important we share it between calls.
opts := cache.FetchOptions{
MinIndex: 0,
Timeout: 1 * time.Second,
}
req := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMeta(namespace),
QueryOptions: structs.QueryOptions{
Filter: `Node.Node == "node2"`,
},
}
gatherNodes := func(res interface{}) []string {
nodes := make([]string, 0, 3)
r := res.(*structs.IndexedCheckServiceNodes)
for _, csn := range r.Nodes {
nodes = append(nodes, csn.Node.Node)
}
return nodes
}
runStep(t, "filtered snapshot returned", func(t *testing.T) { runStep(t, "filtered snapshot returned", func(t *testing.T) {
result, err := typ.Fetch(opts, req) result, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(5), result.Index) require.Equal(t, uint64(5), result.Index)
require.Equal(t, []string{"node2"}, gatherNodes(result.Value)) expected := newExpectedNodes("node2")
expected.Index = 5
assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
opts.MinIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
opts.LastResult = &result
}) })
runStep(t, "filtered updates work too", func(t *testing.T) { runStep(t, "filtered updates work too", func(t *testing.T) {
// Simulate multiple registrations happening in one Txn (so all have same // Simulate multiple registrations happening in one Txn (all have same index)
// index)
batchEv := newEventBatchWithEvents( batchEv := newEventBatchWithEvents(
// Deregister an existing node // Deregister an existing node
newEventServiceHealthDeregister(20, 1, "web"), newEventServiceHealthDeregister(20, 1, "web"),
// Register another // Register another
newEventServiceHealthRegister(20, 4, "web"), newEventServiceHealthRegister(20, 4, "web"),
) )
client.QueueEvents(batchEv) streamClient.QueueEvents(batchEv)
opts.Timeout = time.Second result, err := store.Get(ctx, req)
result, err := typ.Fetch(opts, req)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(20), result.Index) require.Equal(t, uint64(20), result.Index)
require.Equal(t, []string{"node2"}, gatherNodes(result.Value)) expected := newExpectedNodes("node2")
expected.Index = 20
opts.MinIndex = result.Index assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
opts.LastResult = &result
}) })
} }
// serviceRequestStub overrides NewMaterializer so that test can use a fake
// StreamClient.
type serviceRequestStub struct {
serviceRequest
streamClient submatview.StreamClient
}
func (r serviceRequestStub) NewMaterializer() (*submatview.Materializer, error) {
view, err := newHealthView(r.ServiceSpecificRequest)
if err != nil {
return nil, err
}
return submatview.NewMaterializer(submatview.Deps{
View: view,
Client: r.streamClient,
Logger: hclog.New(nil),
Request: newMaterializerRequest(r.ServiceSpecificRequest),
}), nil
}
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Register,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
ID: nodeID,
Node: node,
Address: addr,
Datacenter: "dc1",
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
Port: 8080,
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
},
},
}
}
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Deregister,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
Node: node,
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
Port: 8080,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
RaftIndex: pbcommon.RaftIndex{
// The original insertion index since a delete doesn't update
// this. This magic value came from state store tests where we
// set up at index 10 and then mutate at index 100. It can be
// modified by the caller later and makes it easier than having
// yet another argument in the common case.
CreateIndex: 10,
ModifyIndex: 10,
},
},
},
},
},
}
}
func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
events := make([]*pbsubscribe.Event, len(evs)+1)
events[0] = first
for i := range evs {
events[i+1] = evs[i]
}
return &pbsubscribe.Event{
Index: first.Index,
Payload: &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{Events: events},
},
}
}
func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
}
}
func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
return &pbsubscribe.Event{
Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
}
}
// getNamespace returns a namespace if namespace support exists, otherwise
// returns the empty string. It allows the same tests to work in both oss and ent
// without duplicating the tests.
func getNamespace(ns string) string {
meta := structs.NewEnterpriseMeta(ns)
return meta.NamespaceOrEmpty()
}
func validateNamespace(ns string) func(request *pbsubscribe.SubscribeRequest) error {
return func(request *pbsubscribe.SubscribeRequest) error {
if request.Namespace != ns {
return fmt.Errorf("expected request.Namespace %v, got %v", ns, request.Namespace)
}
return nil
}
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) { func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper() t.Helper()
if !t.Run(name, fn) { if !t.Run(name, fn) {
@ -578,7 +686,7 @@ func TestNewFilterEvaluator(t *testing.T) {
} }
fn := func(t *testing.T, tc testCase) { fn := func(t *testing.T, tc testCase) {
e, err := newFilterEvaluator(&tc.req) e, err := newFilterEvaluator(tc.req)
require.NoError(t, err) require.NoError(t, err)
actual, err := e.Evaluate(tc.data) actual, err := e.Evaluate(tc.data)
require.NoError(t, err) require.NoError(t, err)

View File

@ -8,31 +8,27 @@ import (
"sync" "sync"
"time" "time"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
grpcresolver "google.golang.org/grpc/resolver" grpcresolver "google.golang.org/grpc/resolver"
autoconf "github.com/hashicorp/consul/agent/auto-config" autoconf "github.com/hashicorp/consul/agent/auto-config"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/grpc" "github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/consul/agent/grpc/resolver" "github.com/hashicorp/consul/agent/grpc/resolver"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
) )
@ -46,6 +42,7 @@ type BaseDeps struct {
MetricsHandler MetricsHandler MetricsHandler MetricsHandler
AutoConfig *autoconf.AutoConfig // TODO: use an interface AutoConfig *autoconf.AutoConfig // TODO: use an interface
Cache *cache.Cache Cache *cache.Cache
ViewStore *submatview.Store
} }
// MetricsHandler provides an http.Handler for displaying metrics. // MetricsHandler provides an http.Handler for displaying metrics.
@ -69,7 +66,10 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
if err != nil { if err != nil {
return d, err return d, err
} }
grpclog.SetLoggerV2(logging.NewGRPCLogger(cfg.Logging.LogLevel, d.Logger))
grpcLogInitOnce.Do(func() {
grpclog.SetLoggerV2(logging.NewGRPCLogger(cfg.Logging.LogLevel, d.Logger))
})
for _, w := range result.Warnings { for _, w := range result.Warnings {
d.Logger.Warn(w) d.Logger.Warn(w)
@ -100,6 +100,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
cfg.Cache.Logger = d.Logger.Named("cache") cfg.Cache.Logger = d.Logger.Named("cache")
// cache-types are not registered yet, but they won't be used until the components are started. // cache-types are not registered yet, but they won't be used until the components are started.
d.Cache = cache.New(cfg.Cache) d.Cache = cache.New(cfg.Cache)
d.ViewStore = submatview.NewStore(d.Logger.Named("viewstore"))
d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator) d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator)
builder := resolver.NewServerResolverBuilder(resolver.Config{}) builder := resolver.NewServerResolverBuilder(resolver.Config{})
@ -122,32 +123,12 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
return d, err return d, err
} }
if err := registerCacheTypes(d); err != nil {
return d, err
}
return d, nil return d, nil
} }
// registerCacheTypes on bd.Cache. // grpcLogInitOnce is used because the test suite will call NewBaseDeps in many tests, and
// // re-initializing the gRPC logger causes data races.
// Note: most cache types are still registered in Agent.registerCache. This var grpcLogInitOnce sync.Once
// function is for registering newer cache-types which no longer have a dependency
// on Agent.
func registerCacheTypes(bd BaseDeps) error {
if bd.RuntimeConfig.UseStreamingBackend {
conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter)
if err != nil {
return err
}
matDeps := cachetype.MaterializerDeps{
Client: pbsubscribe.NewStateChangeSubscriptionClient(conn),
Logger: bd.Logger,
}
bd.Cache.RegisterType(cachetype.StreamingHealthServicesName, cachetype.NewStreamingHealthServices(matDeps))
}
return nil
}
func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil.Configurator) *pool.ConnPool { func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil.Configurator) *pool.ConnPool {
var rpcSrcAddr *net.TCPAddr var rpcSrcAddr *net.TCPAddr

View File

@ -10,7 +10,6 @@ import (
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib/retry" "github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/proto/pbsubscribe"
) )
@ -32,7 +31,7 @@ type View interface {
// separately and passed in in case the return type needs an Index field // separately and passed in in case the return type needs an Index field
// populating. This allows implementations to not worry about maintaining // populating. This allows implementations to not worry about maintaining
// indexes seen during Update. // indexes seen during Update.
Result(index uint64) (interface{}, error) Result(index uint64) interface{}
// Reset the view to the zero state, done in preparation for receiving a new // Reset the view to the zero state, done in preparation for receiving a new
// snapshot. // snapshot.
@ -80,6 +79,18 @@ func NewMaterializer(deps Deps) *Materializer {
retryWaiter: deps.Waiter, retryWaiter: deps.Waiter,
updateCh: make(chan struct{}), updateCh: make(chan struct{}),
} }
if v.retryWaiter == nil {
v.retryWaiter = &retry.Waiter{
MinFailures: 1,
// Start backing off with small increments (200-400ms) which will double
// each attempt. (200-400, 400-800, 800-1600, 1600-3200, 3200-6000, 6000
// after that). (retry.Wait applies Max limit after jitter right now).
Factor: 200 * time.Millisecond,
MinWait: 0,
MaxWait: 60 * time.Second,
Jitter: retry.NewJitter(100),
}
}
return v return v
} }
@ -204,81 +215,61 @@ func (m *Materializer) notifyUpdateLocked(err error) {
m.updateCh = make(chan struct{}) m.updateCh = make(chan struct{})
} }
// Fetch implements the logic a StreamingCacheType will need during it's Fetch type Result struct {
// call. Cache types that use streaming should just be able to proxy to this Index uint64
// once they have a subscription object and return it's results directly. Value interface{}
func (m *Materializer) Fetch(done <-chan struct{}, opts cache.FetchOptions) (cache.FetchResult, error) { }
var result cache.FetchResult
// Get current view Result and index // getFromView blocks until the index of the View is greater than minIndex,
// or the context is cancelled.
func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result, error) {
m.lock.Lock() m.lock.Lock()
index := m.index
val, err := m.view.Result(m.index) result := Result{
Index: m.index,
Value: m.view.Result(m.index),
}
updateCh := m.updateCh updateCh := m.updateCh
m.lock.Unlock() m.lock.Unlock()
if err != nil {
return result, err
}
result.Index = index
result.Value = val
// If our index is > req.Index return right away. If index is zero then we // If our index is > minIndex return right away. If index is zero then we
// haven't loaded a snapshot at all yet which means we should wait for one on // haven't loaded a snapshot at all yet which means we should wait for one on
// the update chan. Note it's opts.MinIndex that the cache is using here the // the update chan.
// request min index might be different and from initial user request. if result.Index > 0 && result.Index > minIndex {
if index > 0 && index > opts.MinIndex {
return result, nil return result, nil
} }
// Watch for timeout of the Fetch. Note it's opts.Timeout not req.Timeout
// since that is the timeout the client requested from the cache Get while the
// options one is the internal "background refresh" timeout which is what the
// Fetch call should be using.
timeoutCh := time.After(opts.Timeout)
for { for {
select { select {
case <-updateCh: case <-updateCh:
// View updated, return the new result // View updated, return the new result
m.lock.Lock() m.lock.Lock()
result.Index = m.index result.Index = m.index
// Grab the new updateCh in case we need to keep waiting for the next
// update.
updateCh = m.updateCh
fetchErr := m.err
if fetchErr == nil {
// Only generate a new result if there was no error to avoid pointless
// work potentially shuffling the same data around.
result.Value, err = m.view.Result(m.index)
}
m.lock.Unlock()
// If there was a non-transient error return it switch {
if fetchErr != nil { case m.err != nil:
return result, fetchErr err := m.err
} m.lock.Unlock()
if err != nil {
return result, err return result, err
} case result.Index <= minIndex:
// get a reference to the new updateCh, the previous one was closed
// Sanity check the update is actually later than the one the user updateCh = m.updateCh
// requested. m.lock.Unlock()
if result.Index <= opts.MinIndex {
// The result is still older/same as the requested index, continue to
// wait for further updates.
continue continue
} }
// Return the updated result result.Value = m.view.Result(m.index)
m.lock.Unlock()
return result, nil return result, nil
case <-timeoutCh: case <-ctx.Done():
// Just return whatever we got originally, might still be empty // Update the result value to the latest because callers may still
return result, nil // use the value when the error is context.DeadlineExceeded
m.lock.Lock()
case <-done: result.Value = m.view.Result(m.index)
return result, context.Canceled m.lock.Unlock()
return result, ctx.Err()
} }
} }
} }

240
agent/submatview/store.go Normal file
View File

@ -0,0 +1,240 @@
package submatview
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib/ttlcache"
)
// Store of Materializers. Store implements an interface similar to
// agent/cache.Cache, and allows a single Materializer to fulfil multiple requests
// as long as the requests are identical.
// Store is used in place of agent/cache.Cache because with the streaming
// backend there is no longer any need to run a background goroutine to refresh
// stored values.
type Store struct {
logger hclog.Logger
lock sync.RWMutex
byKey map[string]entry
// expiryHeap tracks entries with 0 remaining requests. Entries are ordered
// with the soonest expiry first.
expiryHeap *ttlcache.ExpiryHeap
// idleTTL is the duration of time an entry should remain in the Store after the
// last request for that entry has been terminated. It is a field on the struct
// so that it can be patched in tests without needing a lock.
idleTTL time.Duration
}
type entry struct {
materializer *Materializer
expiry *ttlcache.Entry
stop func()
// requests is the count of active requests using this entry. This entry will
// remain in the store as long as this count remains > 0.
requests int
}
// NewStore creates and returns a Store that is ready for use. The caller must
// call Store.Run (likely in a separate goroutine) to start the expiration loop.
func NewStore(logger hclog.Logger) *Store {
return &Store{
logger: logger,
byKey: make(map[string]entry),
expiryHeap: ttlcache.NewExpiryHeap(),
idleTTL: 20 * time.Minute,
}
}
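
For reviewers tracing how this new store is wired in: NewBaseDeps constructs it and Agent.Start launches the expiration loop. A minimal sketch of that lifecycle, assuming only the exported API above (the helper name is illustrative, not code from this PR):

```go
package example

import (
	"context"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/agent/submatview"
)

// newViewStore constructs a Store and starts its expiration loop, mirroring
// how BaseDeps and Agent.Start use it in this change.
func newViewStore(ctx context.Context, logger hclog.Logger) *submatview.Store {
	store := submatview.NewStore(logger.Named("viewstore"))
	// Run blocks until ctx is cancelled, so it gets its own goroutine for the
	// lifetime of the agent.
	go store.Run(ctx)
	return store
}
```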
// Run the expiration loop until the context is cancelled.
func (s *Store) Run(ctx context.Context) {
for {
s.lock.RLock()
timer := s.expiryHeap.Next()
s.lock.RUnlock()
select {
case <-ctx.Done():
timer.Stop()
return
// the first item in the heap has changed, restart the timer with the
// new TTL.
case <-s.expiryHeap.NotifyCh:
timer.Stop()
continue
// the TTL for the first item has been reached, attempt an expiration.
case <-timer.Wait():
s.lock.Lock()
he := timer.Entry
s.expiryHeap.Remove(he.Index())
e := s.byKey[he.Key()]
// Only stop the materializer if there are no active requests.
if e.requests == 0 {
e.stop()
delete(s.byKey, he.Key())
}
s.lock.Unlock()
}
}
}
// Request is used to request data from the Store.
// Note that cache.Request is required, but some of the cache.RequestInfo
// fields are ignored (ex: MaxAge and MustRevalidate).
type Request interface {
cache.Request
// NewMaterializer will be called if there is no active materializer to fulfil
// the request. It should return a Materializer appropriate for streaming
// data to fulfil this request.
NewMaterializer() (*Materializer, error)
// Type should return a string which uniquely identifies this type of request.
// The returned value is used as the prefix of the key used to index
// entries in the Store.
Type() string
}
// Get a value from the store, blocking if the store has not yet seen the
// req.Index value.
// See agent/cache.Cache.Get for complete documentation.
func (s *Store) Get(ctx context.Context, req Request) (Result, error) {
info := req.CacheInfo()
key, materializer, err := s.readEntry(req)
if err != nil {
return Result{}, err
}
defer s.releaseEntry(key)
ctx, cancel := context.WithTimeout(ctx, info.Timeout)
defer cancel()
result, err := materializer.getFromView(ctx, info.MinIndex)
// context.DeadlineExceeded is translated to nil to match the behaviour of
// agent/cache.Cache.Get.
if err == nil || errors.Is(err, context.DeadlineExceeded) {
return result, nil
}
return result, err
}
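
To illustrate the call pattern Get replaces from agent/cache: callers issue blocking reads and advance the minimum index between calls (the rpcclient/health change in this PR does that with QueryOptions.MinQueryIndex). The sketch below assumes a concrete Request implementation exists; the setMinIndex callback is a hypothetical stand-in for however that implementation exposes its minimum index:

```go
package example

import (
	"context"

	"github.com/hashicorp/consul/agent/submatview"
)

// watch issues blocking Gets in a loop. Each Get returns immediately if the
// view is already past the request's MinIndex, otherwise it blocks until a
// newer index arrives or the request's Timeout elapses.
func watch(ctx context.Context, store *submatview.Store, req submatview.Request, setMinIndex func(uint64)) error {
	for ctx.Err() == nil {
		result, err := store.Get(ctx, req)
		if err != nil {
			return err
		}
		// Consume result.Value here, then block for the next change by
		// advancing the minimum index past what we have already seen.
		setMinIndex(result.Index)
	}
	return ctx.Err()
}
```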
// Notify the updateCh when there are updates to the entry identified by req.
// See agent/cache.Cache.Notify for complete documentation.
//
// Request.CacheInfo().Timeout is ignored because it is not really relevant in
// this case. Instead set a deadline on the context.
func (s *Store) Notify(
ctx context.Context,
req Request,
correlationID string,
updateCh chan<- cache.UpdateEvent,
) error {
info := req.CacheInfo()
key, materializer, err := s.readEntry(req)
if err != nil {
return err
}
go func() {
defer s.releaseEntry(key)
index := info.MinIndex
for {
result, err := materializer.getFromView(ctx, index)
switch {
case ctx.Err() != nil:
return
case err != nil:
s.logger.Warn("handling error in Store.Notify",
"error", err,
"request-type", req.Type(),
"index", index)
continue
}
index = result.Index
u := cache.UpdateEvent{
CorrelationID: correlationID,
Result: result.Value,
Meta: cache.ResultMeta{Index: result.Index},
}
select {
case updateCh <- u:
case <-ctx.Done():
return
}
}
}()
return nil
}
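
And the push-style counterpart for proxycfg-like consumers: Notify shares the same Materializer and delivers updates over a channel. A hedged sketch, with the correlation ID chosen arbitrarily:

```go
package example

import (
	"context"

	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/submatview"
)

// watchNotify subscribes to updates for req and drains the channel until the
// context is cancelled. Each UpdateEvent carries the latest view result and
// its index in Meta.Index.
func watchNotify(ctx context.Context, store *submatview.Store, req submatview.Request) error {
	ch := make(chan cache.UpdateEvent, 1)
	if err := store.Notify(ctx, req, "example-watch", ch); err != nil {
		return err
	}
	for {
		select {
		case update := <-ch:
			_ = update.Result // consume the new view result here
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```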
// readEntry from the store, and increment the requests counter. releaseEntry
// must be called when the request is finished to decrement the counter.
func (s *Store) readEntry(req Request) (string, *Materializer, error) {
info := req.CacheInfo()
key := makeEntryKey(req.Type(), info)
s.lock.Lock()
defer s.lock.Unlock()
e, ok := s.byKey[key]
if ok {
e.requests++
s.byKey[key] = e
return key, e.materializer, nil
}
mat, err := req.NewMaterializer()
if err != nil {
return "", nil, err
}
ctx, cancel := context.WithCancel(context.Background())
go mat.Run(ctx)
e = entry{
materializer: mat,
stop: cancel,
requests: 1,
}
s.byKey[key] = e
return key, e.materializer, nil
}
// releaseEntry decrements the request count and starts an expiry timer if the
// count has reached 0. Must be called once for every call to readEntry.
func (s *Store) releaseEntry(key string) {
s.lock.Lock()
defer s.lock.Unlock()
e := s.byKey[key]
e.requests--
s.byKey[key] = e
if e.requests > 0 {
return
}
if e.expiry.Index() == ttlcache.NotIndexed {
e.expiry = s.expiryHeap.Add(key, s.idleTTL)
s.byKey[key] = e
return
}
s.expiryHeap.Update(e.expiry.Index(), s.idleTTL)
}
// makeEntryKey matches agent/cache.makeEntryKey, but may change in the future.
func makeEntryKey(typ string, r cache.RequestInfo) string {
return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key)
}
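
To make the sharing behaviour concrete: two requests hit the same Materializer only when their type, datacenter, token, and key all match. A hypothetical key, reproduced outside the package purely for illustration (the type string below is made up):

```go
package example

import (
	"fmt"

	"github.com/hashicorp/consul/agent/cache"
)

// entryKey mirrors the makeEntryKey format for illustration only.
func entryKey(typ string, info cache.RequestInfo) string {
	return fmt.Sprintf("%s/%s/%s/%s", typ, info.Datacenter, info.Token, info.Key)
}

func exampleKey() string {
	info := cache.RequestInfo{Datacenter: "dc1", Token: "abcd", Key: "service:web"}
	// yields "example.serviceRequest/dc1/abcd/service:web"
	return entryKey("example.serviceRequest", info)
}
```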

View File

@ -0,0 +1,458 @@
package submatview
import (
"context"
"fmt"
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib/ttlcache"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/sdk/testutil/retry"
)
func TestStore_Get(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := NewStore(hclog.New(nil))
go store.Run(ctx)
req := &fakeRequest{
client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace),
}
req.client.QueueEvents(
newEndOfSnapshotEvent(2),
newEventServiceHealthRegister(10, 1, "srv1"),
newEventServiceHealthRegister(22, 2, "srv1"))
runStep(t, "from empty store, starts materializer", func(t *testing.T) {
var result Result
retry.Run(t, func(r *retry.R) {
var err error
result, err = store.Get(ctx, req)
require.NoError(r, err)
require.Equal(r, uint64(22), result.Index)
})
r, ok := result.Value.(fakeResult)
require.True(t, ok)
require.Len(t, r.srvs, 2)
require.Equal(t, uint64(22), r.index)
store.lock.Lock()
defer store.lock.Unlock()
require.Len(t, store.byKey, 1)
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(t, 0, e.expiry.Index())
require.Equal(t, 0, e.requests)
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
})
runStep(t, "with an index that already exists in the view", func(t *testing.T) {
req.index = 21
result, err := store.Get(ctx, req)
require.NoError(t, err)
require.Equal(t, uint64(22), result.Index)
r, ok := result.Value.(fakeResult)
require.True(t, ok)
require.Len(t, r.srvs, 2)
require.Equal(t, uint64(22), r.index)
store.lock.Lock()
defer store.lock.Unlock()
require.Len(t, store.byKey, 1)
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(t, 0, e.expiry.Index())
require.Equal(t, 0, e.requests)
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
})
chResult := make(chan resultOrError, 1)
req.index = 40
go func() {
result, err := store.Get(ctx, req)
chResult <- resultOrError{Result: result, Err: err}
}()
runStep(t, "blocks with an index that is not yet in the view", func(t *testing.T) {
select {
case <-chResult:
t.Fatalf("expected Get to block")
case <-time.After(50 * time.Millisecond):
}
store.lock.Lock()
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
store.lock.Unlock()
require.Equal(t, 1, e.requests)
})
runStep(t, "blocks when an event is received but the index is still below minIndex", func(t *testing.T) {
req.client.QueueEvents(newEventServiceHealthRegister(24, 1, "srv1"))
select {
case <-chResult:
t.Fatalf("expected Get to block")
case <-time.After(50 * time.Millisecond):
}
store.lock.Lock()
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
store.lock.Unlock()
require.Equal(t, 1, e.requests)
})
runStep(t, "unblocks when an event with index past minIndex", func(t *testing.T) {
req.client.QueueEvents(newEventServiceHealthRegister(41, 1, "srv1"))
var getResult resultOrError
select {
case getResult = <-chResult:
case <-time.After(100 * time.Millisecond):
t.Fatalf("expected Get to unblock when new events are received")
}
require.NoError(t, getResult.Err)
require.Equal(t, uint64(41), getResult.Result.Index)
r, ok := getResult.Result.Value.(fakeResult)
require.True(t, ok)
require.Len(t, r.srvs, 2)
require.Equal(t, uint64(41), r.index)
store.lock.Lock()
defer store.lock.Unlock()
require.Len(t, store.byKey, 1)
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(t, 0, e.expiry.Index())
require.Equal(t, 0, e.requests)
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
})
}
type resultOrError struct {
Result Result
Err error
}
type fakeRequest struct {
index uint64
key string
client *TestStreamingClient
}
func (r *fakeRequest) CacheInfo() cache.RequestInfo {
key := r.key
if key == "" {
key = "key"
}
return cache.RequestInfo{
Key: key,
Token: "abcd",
Datacenter: "dc1",
Timeout: 4 * time.Second,
MinIndex: r.index,
}
}
func (r *fakeRequest) NewMaterializer() (*Materializer, error) {
return NewMaterializer(Deps{
View: &fakeView{srvs: make(map[string]*pbservice.CheckServiceNode)},
Client: r.client,
Logger: hclog.New(nil),
Request: func(index uint64) pbsubscribe.SubscribeRequest {
req := pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "key",
Token: "abcd",
Datacenter: "dc1",
Index: index,
Namespace: pbcommon.DefaultEnterpriseMeta.Namespace,
}
return req
},
}), nil
}
func (r *fakeRequest) Type() string {
return fmt.Sprintf("%T", r)
}
type fakeView struct {
srvs map[string]*pbservice.CheckServiceNode
}
func (f *fakeView) Update(events []*pbsubscribe.Event) error {
for _, event := range events {
serviceHealth := event.GetServiceHealth()
if serviceHealth == nil {
return fmt.Errorf("unexpected event type for service health view: %T",
event.GetPayload())
}
id := serviceHealth.CheckServiceNode.UniqueID()
switch serviceHealth.Op {
case pbsubscribe.CatalogOp_Register:
f.srvs[id] = serviceHealth.CheckServiceNode
case pbsubscribe.CatalogOp_Deregister:
delete(f.srvs, id)
}
}
return nil
}
func (f *fakeView) Result(index uint64) interface{} {
srvs := make([]*pbservice.CheckServiceNode, 0, len(f.srvs))
for _, srv := range f.srvs {
srvs = append(srvs, srv)
}
return fakeResult{srvs: srvs, index: index}
}
type fakeResult struct {
srvs []*pbservice.CheckServiceNode
index uint64
}
func (f *fakeView) Reset() {
f.srvs = make(map[string]*pbservice.CheckServiceNode)
}
func TestStore_Notify(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := NewStore(hclog.New(nil))
go store.Run(ctx)
req := &fakeRequest{
client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace),
}
req.client.QueueEvents(
newEndOfSnapshotEvent(2),
newEventServiceHealthRegister(22, 2, "srv1"))
cID := "correlate"
ch := make(chan cache.UpdateEvent)
err := store.Notify(ctx, req, cID, ch)
require.NoError(t, err)
runStep(t, "from empty store, starts materializer", func(t *testing.T) {
store.lock.Lock()
defer store.lock.Unlock()
require.Len(t, store.byKey, 1)
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(t, ttlcache.NotIndexed, e.expiry.Index())
require.Equal(t, 1, e.requests)
})
runStep(t, "updates are received", func(t *testing.T) {
retry.Run(t, func(r *retry.R) {
select {
case update := <-ch:
require.NoError(r, update.Err)
require.Equal(r, cID, update.CorrelationID)
require.Equal(r, uint64(22), update.Meta.Index)
require.Equal(r, uint64(22), update.Result.(fakeResult).index)
case <-time.After(100 * time.Millisecond):
r.Stop(fmt.Errorf("expected Get to unblock when new events are received"))
}
})
req.client.QueueEvents(newEventServiceHealthRegister(24, 2, "srv1"))
select {
case update := <-ch:
require.NoError(t, update.Err)
require.Equal(t, cID, update.CorrelationID)
require.Equal(t, uint64(24), update.Meta.Index)
require.Equal(t, uint64(24), update.Result.(fakeResult).index)
case <-time.After(100 * time.Millisecond):
t.Fatalf("expected Get to unblock when new events are received")
}
})
runStep(t, "closing the notify starts the expiry counter", func(t *testing.T) {
cancel()
retry.Run(t, func(r *retry.R) {
store.lock.Lock()
defer store.lock.Unlock()
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
require.Equal(r, 0, e.expiry.Index())
require.Equal(r, 0, e.requests)
require.Equal(r, store.expiryHeap.Next().Entry, e.expiry)
})
})
}
func TestStore_Notify_ManyRequests(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := NewStore(hclog.New(nil))
go store.Run(ctx)
req := &fakeRequest{
client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace),
}
req.client.QueueEvents(newEndOfSnapshotEvent(2))
cID := "correlate"
ch1 := make(chan cache.UpdateEvent)
ch2 := make(chan cache.UpdateEvent)
require.NoError(t, store.Notify(ctx, req, cID, ch1))
assertRequestCount(t, store, req, 1)
require.NoError(t, store.Notify(ctx, req, cID, ch2))
assertRequestCount(t, store, req, 2)
req.index = 15
go func() {
_, _ = store.Get(ctx, req)
}()
retry.Run(t, func(r *retry.R) {
assertRequestCount(r, store, req, 3)
})
go func() {
_, _ = store.Get(ctx, req)
}()
retry.Run(t, func(r *retry.R) {
assertRequestCount(r, store, req, 4)
})
var req2 *fakeRequest
runStep(t, "Get and Notify with a different key", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req2 = &fakeRequest{client: req.client, key: "key2", index: 22}
require.NoError(t, store.Notify(ctx, req2, cID, ch1))
go func() {
_, _ = store.Get(ctx, req2)
}()
// the original entry should still be at count 4
assertRequestCount(t, store, req, 4)
// the new entry should be at count 2
retry.Run(t, func(r *retry.R) {
assertRequestCount(r, store, req2, 2)
})
})
runStep(t, "end all the requests", func(t *testing.T) {
req.client.QueueEvents(
newEventServiceHealthRegister(10, 1, "srv1"),
newEventServiceHealthRegister(12, 2, "srv1"),
newEventServiceHealthRegister(13, 1, "srv2"),
newEventServiceHealthRegister(16, 3, "srv2"))
// The two Get requests should exit now that the index has been updated
retry.Run(t, func(r *retry.R) {
assertRequestCount(r, store, req, 2)
})
// Cancel the context so all requests terminate
cancel()
retry.Run(t, func(r *retry.R) {
assertRequestCount(r, store, req, 0)
})
})
runStep(t, "the expiry heap should contain two entries", func(t *testing.T) {
store.lock.Lock()
defer store.lock.Unlock()
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
e2 := store.byKey[makeEntryKey(req2.Type(), req2.CacheInfo())]
require.Equal(t, 0, e2.expiry.Index())
require.Equal(t, 1, e.expiry.Index())
require.Equal(t, store.expiryHeap.Next().Entry, e2.expiry)
})
}
type testingT interface {
Helper()
Fatalf(string, ...interface{})
}
func assertRequestCount(t testingT, s *Store, req Request, expected int) {
t.Helper()
key := makeEntryKey(req.Type(), req.CacheInfo())
s.lock.Lock()
defer s.lock.Unlock()
actual := s.byKey[key].requests
if actual != expected {
t.Fatalf("expected request count to be %d, got %d", expected, actual)
}
}
func TestStore_Run_ExpiresEntries(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := NewStore(hclog.New(nil))
ttl := 10 * time.Millisecond
store.idleTTL = ttl
go store.Run(ctx)
req := &fakeRequest{
client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace),
}
req.client.QueueEvents(newEndOfSnapshotEvent(2))
cID := "correlate"
ch1 := make(chan cache.UpdateEvent)
reqCtx, reqCancel := context.WithCancel(context.Background())
defer reqCancel()
require.NoError(t, store.Notify(reqCtx, req, cID, ch1))
assertRequestCount(t, store, req, 1)
// Get a copy of the entry so that we can check it was expired later
store.lock.Lock()
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
store.lock.Unlock()
reqCancel()
retry.Run(t, func(r *retry.R) {
assertRequestCount(r, store, req, 0)
})
// wait for the entry to expire, with lots of buffer
time.Sleep(3 * ttl)
store.lock.Lock()
defer store.lock.Unlock()
require.Len(t, store.byKey, 0)
require.Equal(t, ttlcache.NotIndexed, e.expiry.Index())
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}
}

View File

@ -1,7 +1,11 @@
package cachetype package submatview
import ( import (
"context"
"fmt" "fmt"
"sync"
"google.golang.org/grpc"
"github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbservice"
@ -9,6 +13,84 @@ import (
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
) )
// TestStreamingClient is a mock StreamingClient for testing that allows
// for queueing up custom events to a subscriber.
type TestStreamingClient struct {
expectedNamespace string
subClients []*subscribeClient
lock sync.RWMutex
events []eventOrErr
}
type eventOrErr struct {
Err error
Event *pbsubscribe.Event
}
func NewTestStreamingClient(ns string) *TestStreamingClient {
return &TestStreamingClient{expectedNamespace: ns}
}
func (s *TestStreamingClient) Subscribe(
ctx context.Context,
req *pbsubscribe.SubscribeRequest,
_ ...grpc.CallOption,
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
if req.Namespace != s.expectedNamespace {
return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v",
req.Namespace, s.expectedNamespace)
}
c := &subscribeClient{
events: make(chan eventOrErr, 32),
ctx: ctx,
}
s.lock.Lock()
s.subClients = append(s.subClients, c)
for _, event := range s.events {
c.events <- event
}
s.lock.Unlock()
return c, nil
}
type subscribeClient struct {
grpc.ClientStream
events chan eventOrErr
ctx context.Context
}
func (s *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
s.lock.Lock()
for _, e := range events {
s.events = append(s.events, eventOrErr{Event: e})
for _, c := range s.subClients {
c.events <- eventOrErr{Event: e}
}
}
s.lock.Unlock()
}
func (s *TestStreamingClient) QueueErr(err error) {
s.lock.Lock()
s.events = append(s.events, eventOrErr{Err: err})
for _, c := range s.subClients {
c.events <- eventOrErr{Err: err}
}
s.lock.Unlock()
}
func (c *subscribeClient) Recv() (*pbsubscribe.Event, error) {
select {
case eoe := <-c.events:
if eoe.Err != nil {
return nil, eoe.Err
}
return eoe.Event, nil
case <-c.ctx.Done():
return nil, c.ctx.Err()
}
}
func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event { func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
return &pbsubscribe.Event{ return &pbsubscribe.Event{
Index: index, Index: index,
@ -22,13 +104,6 @@ func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
} }
} }
// newEventServiceHealthRegister returns a realistically populated service
// health registration event for tests. The nodeNum is a
// logical node and is used to create the node name ("node%d") but also change
// the node ID and IP address to make it a little more realistic for cases that
// need that. nodeNum should be less than 64k to make the IP address look
// realistic. Any other changes can be made on the returned event to avoid
// adding too many options to callers.
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event { func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum) node := fmt.Sprintf("node%d", nodeNum)
nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum)) nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
@ -54,61 +129,17 @@ func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsub
ID: svc, ID: svc,
Service: svc, Service: svc,
Port: 8080, Port: 8080,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
// Empty sadness
Proxy: pbservice.ConnectProxyConfig{
MeshGateway: pbservice.MeshGatewayConfig{},
Expose: pbservice.ExposeConfig{},
},
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{ RaftIndex: pbcommon.RaftIndex{
CreateIndex: index, CreateIndex: index,
ModifyIndex: index, ModifyIndex: index,
}, },
}, },
Checks: []*pbservice.HealthCheck{
{
Node: node,
CheckID: "serf-health",
Name: "serf-health",
Status: "passing",
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
{
Node: node,
CheckID: types.CheckID("service:" + svc),
Name: "service:" + svc,
ServiceID: svc,
ServiceName: svc,
Type: "ttl",
Status: "passing",
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
}, },
}, },
}, },
} }
} }
// TestEventServiceHealthDeregister returns a realistically populated service
// health deregistration event for tests. The nodeNum is a
// logical node and is used to create the node name ("node%d") but also change
// the node ID and IP address to make it a little more realistic for cases that
// need that. nodeNum should be less than 64k to make the IP address look
// realistic. Any other changes can be made on the returned event to avoid
// adding too many options to callers.
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event { func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum) node := fmt.Sprintf("node%d", nodeNum)
@ -129,12 +160,6 @@ func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbs
Passing: 1, Passing: 1,
Warning: 1, Warning: 1,
}, },
// Empty sadness
Proxy: pbservice.ConnectProxyConfig{
MeshGateway: pbservice.MeshGatewayConfig{},
Expose: pbservice.ExposeConfig{},
},
EnterpriseMeta: pbcommon.EnterpriseMeta{},
RaftIndex: pbcommon.RaftIndex{ RaftIndex: pbcommon.RaftIndex{
// The original insertion index since a delete doesn't update // The original insertion index since a delete doesn't update
// this. This magic value came from state store tests where we // this. This magic value came from state store tests where we

View File

@ -34,6 +34,7 @@ type Failer interface {
// R provides context for the retryer. // R provides context for the retryer.
type R struct { type R struct {
fail bool fail bool
done bool
output []string output []string
} }
@ -77,6 +78,12 @@ func (r *R) log(s string) {
r.output = append(r.output, decorate(s)) r.output = append(r.output, decorate(s))
} }
// Stop retrying, and fail the test with the specified error.
func (r *R) Stop(err error) {
r.log(err.Error())
r.done = true
}
func decorate(s string) string { func decorate(s string) string {
_, file, line, ok := runtime.Caller(3) _, file, line, ok := runtime.Caller(3)
if ok { if ok {
@ -120,6 +127,7 @@ func dedup(a []string) string {
func run(r Retryer, t Failer, f func(r *R)) { func run(r Retryer, t Failer, f func(r *R)) {
t.Helper() t.Helper()
rr := &R{} rr := &R{}
fail := func() { fail := func() {
t.Helper() t.Helper()
out := dedup(rr.output) out := dedup(rr.output)
@ -128,7 +136,8 @@ func run(r Retryer, t Failer, f func(r *R)) {
} }
t.FailNow() t.FailNow()
} }
for r.NextOr(t, fail) {
for r.Continue() {
func() { func() {
defer func() { defer func() {
if p := recover(); p != nil && p != runFailed { if p := recover(); p != nil && p != runFailed {
@ -137,11 +146,17 @@ func run(r Retryer, t Failer, f func(r *R)) {
}() }()
f(rr) f(rr)
}() }()
if !rr.fail {
switch {
case rr.done:
fail()
return
case !rr.fail:
return return
} }
rr.fail = false rr.fail = false
} }
fail()
} }
// DefaultFailer provides default retry.Run() behavior for unit tests. // DefaultFailer provides default retry.Run() behavior for unit tests.
@ -162,9 +177,9 @@ func ThreeTimes() *Counter {
// Retryer provides an interface for repeating operations // Retryer provides an interface for repeating operations
// until they succeed or an exit condition is met. // until they succeed or an exit condition is met.
type Retryer interface { type Retryer interface {
// NextOr returns true if the operation should be repeated. // Continue returns true if the operation should be repeated, otherwise it
// Otherwise, it calls fail and returns false. // returns false to indicate retrying should stop.
NextOr(t Failer, fail func()) bool Continue() bool
} }
// Counter repeats an operation a given number of // Counter repeats an operation a given number of
@ -176,10 +191,8 @@ type Counter struct {
count int count int
} }
func (r *Counter) NextOr(t Failer, fail func()) bool { func (r *Counter) Continue() bool {
t.Helper()
if r.count == r.Count { if r.count == r.Count {
fail()
return false return false
} }
if r.count > 0 { if r.count > 0 {
@ -200,14 +213,12 @@ type Timer struct {
stop time.Time stop time.Time
} }
func (r *Timer) NextOr(t Failer, fail func()) bool { func (r *Timer) Continue() bool {
t.Helper()
if r.stop.IsZero() { if r.stop.IsZero() {
r.stop = time.Now().Add(r.Timeout) r.stop = time.Now().Add(r.Timeout)
return true return true
} }
if time.Now().After(r.stop) { if time.Now().After(r.stop) {
fail()
return false return false
} }
time.Sleep(r.Wait) time.Sleep(r.Wait)
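
For context on the NextOr to Continue change: retry.Run and retry.RunWith keep the same caller-facing shape; only Retryer implementations change, and r.Stop is the new way to abort a retry loop early. A hedged sketch of typical usage (the check function is a stand-in for a real assertion target):

```go
package example

import (
	"context"
	"errors"
	"testing"
	"time"

	"github.com/hashicorp/consul/sdk/testutil/retry"
)

func TestRetryUsageSketch(t *testing.T) {
	check := func() (int, error) { return 42, nil } // stand-in for a real check

	retry.RunWith(&retry.Counter{Count: 5, Wait: time.Millisecond}, t, func(r *retry.R) {
		n, err := check()
		if errors.Is(err, context.Canceled) {
			// Unrecoverable: stop retrying immediately and fail the test
			// instead of burning the remaining attempts.
			r.Stop(err)
			return
		}
		if n != 42 {
			r.Fatalf("still waiting, got %d", n)
		}
	})
}
```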

View File

@ -1,8 +1,11 @@
package retry package retry
import ( import (
"fmt"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
) )
// delta defines the time band a test run should complete in. // delta defines the time band a test run should complete in.
@ -19,19 +22,15 @@ func TestRetryer(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) { t.Run(tt.desc, func(t *testing.T) {
var iters, fails int var iters int
fail := func() { fails++ }
start := time.Now() start := time.Now()
for tt.r.NextOr(t, fail) { for tt.r.Continue() {
iters++ iters++
} }
dur := time.Since(start) dur := time.Since(start)
if got, want := iters, 3; got != want { if got, want := iters, 3; got != want {
t.Fatalf("got %d retries want %d", got, want) t.Fatalf("got %d retries want %d", got, want)
} }
if got, want := fails, 1; got != want {
t.Fatalf("got %d FailNow calls want %d", got, want)
}
// since the first iteration happens immediately // since the first iteration happens immediately
// the retryer waits only twice for three iterations. // the retryer waits only twice for three iterations.
// order of events: (true, (wait) true, (wait) true, false) // order of events: (true, (wait) true, (wait) true, false)
@ -41,3 +40,52 @@ func TestRetryer(t *testing.T) {
}) })
} }
} }
func TestRunWith(t *testing.T) {
t.Run("calls FailNow after exceeding retries", func(t *testing.T) {
ft := &fakeT{}
iter := 0
RunWith(&Counter{Count: 3, Wait: time.Millisecond}, ft, func(r *R) {
iter++
r.FailNow()
})
require.Equal(t, 3, iter)
require.Equal(t, 1, ft.fails)
})
t.Run("Stop ends the retrying", func(t *testing.T) {
ft := &fakeT{}
iter := 0
RunWith(&Counter{Count: 5, Wait: time.Millisecond}, ft, func(r *R) {
iter++
if iter == 2 {
r.Stop(fmt.Errorf("do not proceed"))
}
r.Fatalf("not yet")
})
require.Equal(t, 2, iter)
require.Equal(t, 1, ft.fails)
require.Len(t, ft.out, 1)
require.Contains(t, ft.out[0], "not yet\n")
require.Contains(t, ft.out[0], "do not proceed\n")
})
}
type fakeT struct {
fails int
out []string
}
func (f *fakeT) Helper() {}
func (f *fakeT) Log(args ...interface{}) {
f.out = append(f.out, fmt.Sprint(args...))
}
func (f *fakeT) FailNow() {
f.fails++
}
var _ Failer = &fakeT{}

View File

@ -229,9 +229,10 @@ The table below shows this endpoint's support for
- `near` `(string: "")` - Specifies a node name to sort the node list in - `near` `(string: "")` - Specifies a node name to sort the node list in
ascending order based on the estimated round trip time from that node. Passing ascending order based on the estimated round trip time from that node. Passing
`?near=_agent` will use the agent's node for the sort. This is specified as `?near=_agent` will use the agent's node for the sort. This is specified as
part of the URL as a query parameter. **Note** that `near` can not be used if part of the URL as a query parameter. **Note** that using `near` will ignore
[`use_streaming_backend`](/docs/agent/options#use_streaming_backend) [`use_streaming_backend`](/docs/agent/options#use_streaming_backend) and always
is enabled, because the data is not available to sort the results. use blocking queries, because the data required to sort the results is not available
to the streaming backend.
- `tag` `(string: "")` **Deprecated** - Use `filter` with the `Service.Tags` selector instead. - `tag` `(string: "")` **Deprecated** - Use `filter` with the `Service.Tags` selector instead.
This parameter will be removed in a future version of Consul. This parameter will be removed in a future version of Consul.