160 lines
3.8 KiB
Go
160 lines
3.8 KiB
Go
|
package submatview
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
mock "github.com/stretchr/testify/mock"
|
||
|
"github.com/stretchr/testify/require"
|
||
|
|
||
|
"github.com/hashicorp/consul/acl"
|
||
|
"github.com/hashicorp/consul/acl/resolver"
|
||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||
|
"github.com/hashicorp/consul/agent/structs"
|
||
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||
|
)
|
||
|
|
||
|
func TestLocalMaterializer(t *testing.T) {
|
||
|
const (
|
||
|
index = 123
|
||
|
topic = pbsubscribe.Topic_ServiceResolver
|
||
|
key = "web"
|
||
|
token = "some-acl-token"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
snapshotEvent = stream.Event{
|
||
|
Topic: topic,
|
||
|
Index: index,
|
||
|
Payload: state.EventPayloadConfigEntry{
|
||
|
Value: &structs.ServiceResolverConfigEntry{
|
||
|
Name: key,
|
||
|
Meta: map[string]string{"snapshot": "true"},
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
|
||
|
publishedEvent1 = stream.Event{
|
||
|
Topic: topic,
|
||
|
Index: index + 1,
|
||
|
Payload: state.EventPayloadConfigEntry{
|
||
|
Value: &structs.ServiceResolverConfigEntry{
|
||
|
Name: key,
|
||
|
Meta: map[string]string{"published": "true"},
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
|
||
|
publishedEvent2 = stream.Event{
|
||
|
Topic: topic,
|
||
|
Index: index + 2,
|
||
|
Payload: state.EventPayloadConfigEntry{
|
||
|
Value: &structs.ServiceResolverConfigEntry{
|
||
|
Name: key,
|
||
|
Meta: map[string]string{"published": "true"},
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
)
|
||
|
|
||
|
ctx, cancel := context.WithCancel(context.Background())
|
||
|
t.Cleanup(cancel)
|
||
|
|
||
|
publisher := stream.NewEventPublisher(10 * time.Second)
|
||
|
publisher.RegisterHandler(topic, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
||
|
buf.Append([]stream.Event{snapshotEvent})
|
||
|
return index, nil
|
||
|
}, false)
|
||
|
go publisher.Run(ctx)
|
||
|
|
||
|
// This allows us to swap the authorizer out at runtime.
|
||
|
authz := &struct{ acl.Authorizer }{acl.AllowAll()}
|
||
|
|
||
|
aclResolver := NewMockACLResolver(t)
|
||
|
aclResolver.On("ResolveTokenAndDefaultMeta", token, mock.Anything, mock.Anything).
|
||
|
Return(resolver.Result{Authorizer: authz}, nil)
|
||
|
|
||
|
view := newTestView()
|
||
|
|
||
|
m := NewLocalMaterializer(LocalMaterializerDeps{
|
||
|
Backend: publisher,
|
||
|
ACLResolver: aclResolver,
|
||
|
Deps: Deps{
|
||
|
View: view,
|
||
|
Request: func(index uint64) *pbsubscribe.SubscribeRequest {
|
||
|
return &pbsubscribe.SubscribeRequest{
|
||
|
Topic: topic,
|
||
|
Index: index,
|
||
|
Subject: &pbsubscribe.SubscribeRequest_NamedSubject{
|
||
|
NamedSubject: &pbsubscribe.NamedSubject{
|
||
|
Key: key,
|
||
|
},
|
||
|
},
|
||
|
Token: token,
|
||
|
}
|
||
|
},
|
||
|
},
|
||
|
})
|
||
|
go m.Run(ctx)
|
||
|
|
||
|
// Check that the view received the snapshot events.
|
||
|
events := view.getEvents(t)
|
||
|
require.Len(t, events, 1)
|
||
|
require.Equal(t, snapshotEvent.Payload.ToSubscriptionEvent(index), events[0])
|
||
|
|
||
|
publisher.Publish([]stream.Event{publishedEvent1})
|
||
|
|
||
|
// Check that the view received the published events.
|
||
|
events = view.getEvents(t)
|
||
|
require.Len(t, events, 1)
|
||
|
require.Equal(t, publishedEvent1.Payload.ToSubscriptionEvent(index+1), events[0])
|
||
|
|
||
|
// Replace the authorizer and check that we don't receive newly published events.
|
||
|
authz.Authorizer = acl.DenyAll()
|
||
|
publisher.Publish([]stream.Event{publishedEvent2})
|
||
|
view.expectNoEvents(t)
|
||
|
}
|
||
|
|
||
|
func newTestView() *testView {
|
||
|
return &testView{
|
||
|
eventsCh: make(chan []*pbsubscribe.Event),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type testView struct {
|
||
|
eventsCh chan []*pbsubscribe.Event
|
||
|
}
|
||
|
|
||
|
// Reset is a no-op: testView keeps no state that needs clearing.
func (testView) Reset() {
}
|
||
|
|
||
|
// Result ignores the supplied index and always returns nil.
func (testView) Result(_ uint64) any {
	return nil
}
|
||
|
|
||
|
func (v *testView) Update(events []*pbsubscribe.Event) error {
|
||
|
v.eventsCh <- events
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (v *testView) getEvents(t *testing.T) []*pbsubscribe.Event {
|
||
|
t.Helper()
|
||
|
|
||
|
select {
|
||
|
case events := <-v.eventsCh:
|
||
|
return events
|
||
|
case <-time.After(100 * time.Millisecond):
|
||
|
t.Fatal("timeout waiting for view to receive events")
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (v *testView) expectNoEvents(t *testing.T) {
|
||
|
t.Helper()
|
||
|
|
||
|
select {
|
||
|
case <-v.eventsCh:
|
||
|
t.Fatal("expected no events to be received")
|
||
|
case <-time.After(100 * time.Millisecond):
|
||
|
}
|
||
|
}
|