open-nomad/nomad/stream/event_broker_test.go

594 lines
17 KiB
Go
Raw Normal View History

package stream
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func TestEventBroker_PublishChangesAndSubscribe(t *testing.T) {
subscription := &SubscribeRequest{
Topics: map[structs.Topic][]string{
"Test": {"sub-key"},
},
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{EventBufferSize: 100})
require.NoError(t, err)
sub, err := publisher.Subscribe(subscription)
require.NoError(t, err)
eventCh := consumeSubscription(ctx, sub)
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
events := []structs.Event{{
Index: 1,
Topic: "Test",
Key: "sub-key",
Payload: "sample payload",
}}
publisher.Publish(&structs.Events{Index: 1, Events: events})
// Subscriber should see the published event
result := nextResult(t, eventCh)
require.NoError(t, result.Err)
expected := []structs.Event{{Payload: "sample payload", Key: "sub-key", Topic: "Test", Index: 1}}
require.Equal(t, expected, result.Events)
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
// Publish a second event
events = []structs.Event{{
Index: 2,
Topic: "Test",
Key: "sub-key",
Payload: "sample payload 2",
}}
publisher.Publish(&structs.Events{Index: 2, Events: events})
result = nextResult(t, eventCh)
require.NoError(t, result.Err)
expected = []structs.Event{{Payload: "sample payload 2", Key: "sub-key", Topic: "Test", Index: 2}}
require.Equal(t, expected, result.Events)
}
// TestEventBroker_ShutdownClosesSubscriptions verifies that cancelling the
// context the broker was created with closes its open subscriptions, both
// when a subscription is drained through consumeSub and when it is read with
// a direct call to Next.
func TestEventBroker_ShutdownClosesSubscriptions(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{})
	require.NoError(t, err)

	sub1, err := publisher.Subscribe(&SubscribeRequest{})
	require.NoError(t, err)
	defer sub1.Unsubscribe()

	sub2, err := publisher.Subscribe(&SubscribeRequest{})
	require.NoError(t, err)
	defer sub2.Unsubscribe()

	cancel() // Shutdown

	// Bound the reads: if shutdown fails to close the subscriptions the test
	// should fail with a deadline error rather than block forever.
	readCtx, readCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer readCancel()

	err = consumeSub(readCtx, sub1)
	require.Equal(t, ErrSubscriptionClosed, err)

	_, err = sub2.Next(readCtx)
	require.Equal(t, ErrSubscriptionClosed, err)
}
// TestEventBroker_EmptyReqToken_DistinctSubscriptions tests subscription
// handling behavior when ACLs are disabled (request Token is empty).
// Subscriptions are mapped by their request token. When that token is empty,
// the subscriptions should still be handled independently of each other when
// unsubscribing.
func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	broker, err := NewEventBroker(ctx, nil, EventBrokerCfg{})
	require.NoError(t, err)

	// Two subscriptions, both created with an empty token.
	first, err := broker.Subscribe(&SubscribeRequest{})
	require.NoError(t, err)
	defer first.Unsubscribe()

	second, err := broker.Subscribe(&SubscribeRequest{})
	require.NoError(t, err)
	require.NotNil(t, second)

	// Dropping the first subscription must leave the second one open.
	first.Unsubscribe()

	secondState := atomic.LoadUint32(&second.state)
	require.Equal(t, subscriptionStateOpen, secondState)
}
// TestEventBroker_handleACLUpdates_TokenDeleted verifies that publishing an
// ACL token deletion event whose SecretID matches the token a subscription
// was created with closes that subscription, and that reads on the closed
// subscription keep returning ErrSubscriptionClosed with empty events.
func TestEventBroker_handleACLUpdates_TokenDeleted(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{})
	require.NoError(t, err)

	sub1, err := publisher.Subscribe(&SubscribeRequest{
		Topics: map[structs.Topic][]string{
			"*": {"*"},
		},
		Token: "foo",
	})
	require.NoError(t, err)
	defer sub1.Unsubscribe()

	aclEvent := structs.Event{
		Topic:   structs.TopicACLToken,
		Type:    structs.TypeACLTokenDeleted,
		Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: "foo"}),
	}

	publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{aclEvent}})

	// Bound the wait: without a deadline this loop would never terminate if
	// the broker failed to close the subscription.
	waitCtx, waitCancel := context.WithTimeout(ctx, 5*time.Second)
	defer waitCancel()

	// Read until the subscription reports that it was closed. Anything that
	// arrives before the close is skipped.
	for {
		_, err := sub1.Next(waitCtx)
		if err == ErrSubscriptionClosed {
			break
		}
		require.NoError(t, waitCtx.Err(), "subscription was not closed after its token was deleted")
	}

	// A closed subscription must stay closed and return no events.
	out, err := sub1.Next(waitCtx)
	require.Error(t, err)
	require.Equal(t, ErrSubscriptionClosed, err)
	require.Equal(t, structs.Events{}, out)
}
// fakeACLDelegate is a test double that hands back a preconfigured
// ACLTokenProvider. It is passed to NewEventBroker by the ACL tests.
type fakeACLDelegate struct {
	// tokenProvider is the value returned by TokenProvider.
	tokenProvider ACLTokenProvider
}

// TokenProvider returns the provider stored on the fake.
func (f *fakeACLDelegate) TokenProvider() ACLTokenProvider {
	return f.tokenProvider
}
// fakeACLTokenProvider is a test double whose lookups ignore their arguments
// and always return the canned values stored in its fields.
type fakeACLTokenProvider struct {
	// token and tokenErr are returned by ACLTokenBySecretID.
	token    *structs.ACLToken
	tokenErr error

	// policy and policyErr are returned by ACLPolicyByName.
	policy    *structs.ACLPolicy
	policyErr error
}

// ACLTokenBySecretID returns the canned token and error regardless of the
// watch set or secret ID supplied.
func (f *fakeACLTokenProvider) ACLTokenBySecretID(_ memdb.WatchSet, _ string) (*structs.ACLToken, error) {
	return f.token, f.tokenErr
}

// ACLPolicyByName returns the canned policy and error regardless of the
// watch set or policy name supplied.
func (f *fakeACLTokenProvider) ACLPolicyByName(_ memdb.WatchSet, _ string) (*structs.ACLPolicy, error) {
	return f.policy, f.policyErr
}
func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
secretID := "some-secret-id"
cases := []struct {
policyBeforeRules string
policyAfterRules string
topics map[structs.Topic][]string
desc string
event structs.Event
policyEvent structs.Event
shouldUnsubscribe bool
initialSubErr bool
}{
{
desc: "subscribed to deployments and removed access",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicDeployment,
Type: structs.TypeDeploymentUpdate,
Payload: structs.DeploymentEvent{
Deployment: &structs.Deployment{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to evals and removed access",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicEvaluation,
Type: structs.TypeEvalUpdated,
Payload: structs.EvaluationEvent{
Evaluation: &structs.Evaluation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to allocs and removed access",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicAllocation,
Type: structs.TypeAllocationUpdated,
Payload: structs.AllocationEvent{
Allocation: &structs.Allocation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to nodes and removed access",
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
policyAfterRules: mock.NodePolicy(acl.PolicyDeny),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to deployments and no access change",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicDeployment,
Type: structs.TypeDeploymentUpdate,
Payload: structs.DeploymentEvent{
Deployment: &structs.Deployment{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to evals and no access change",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicEvaluation,
Type: structs.TypeEvalUpdated,
Payload: structs.EvaluationEvent{
Evaluation: &structs.Evaluation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to allocs and no access change",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicAllocation,
Type: structs.TypeAllocationUpdated,
Payload: structs.AllocationEvent{
Allocation: &structs.Allocation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to nodes and no access change",
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
policyAfterRules: mock.NodePolicy(acl.PolicyRead),
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "initial token insufficient privileges",
initialSubErr: true,
policyBeforeRules: mock.NodePolicy(acl.PolicyDeny),
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to nodes and policy change no change",
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
policyAfterRules: mock.NodePolicy(acl.PolicyWrite),
shouldUnsubscribe: false,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLPolicy,
Type: structs.TypeACLPolicyUpserted,
Payload: &structs.ACLPolicyEvent{
ACLPolicy: &structs.ACLPolicy{
Name: "some-policy",
},
},
},
},
{
desc: "subscribed to nodes and policy change no access",
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
policyAfterRules: mock.NodePolicy(acl.PolicyDeny),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLPolicy,
Type: structs.TypeACLPolicyUpserted,
Payload: &structs.ACLPolicyEvent{
ACLPolicy: &structs.ACLPolicy{
Name: "some-policy",
},
},
},
},
{
desc: "subscribed to nodes policy deleted",
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
policyAfterRules: "",
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicNode,
Type: structs.TypeNodeRegistration,
Payload: structs.NodeStreamEvent{
Node: &structs.Node{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLPolicy,
Type: structs.TypeACLPolicyDeleted,
Payload: &structs.ACLPolicyEvent{
ACLPolicy: &structs.ACLPolicy{
Name: "some-policy",
},
},
},
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
policy := &structs.ACLPolicy{
Name: "some-policy",
Rules: tc.policyBeforeRules,
}
policy.SetHash()
tokenProvider := &fakeACLTokenProvider{
policy: policy,
token: &structs.ACLToken{
SecretID: secretID,
Policies: []string{policy.Name},
},
}
aclDelegate := &fakeACLDelegate{
tokenProvider: tokenProvider,
}
publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
require.NoError(t, err)
sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
Topics: map[structs.Topic][]string{
tc.event.Topic: {"*"},
},
Namespace: structs.DefaultNamespace,
Token: secretID,
})
if tc.initialSubErr {
require.Error(t, err)
require.Nil(t, sub)
return
} else {
require.NoError(t, err)
}
publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{tc.event}})
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
defer cancel()
_, err = sub.Next(ctx)
require.NoError(t, err)
// Update the mock provider to use the after rules
policyAfter := &structs.ACLPolicy{
Name: "some-new-policy",
Rules: tc.policyAfterRules,
ModifyIndex: 101, // The ModifyIndex is used to caclulate the acl cache key
}
policyAfter.SetHash()
tokenProvider.policy = policyAfter
// Publish ACL event triggering subscription re-evaluation
publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{tc.policyEvent}})
// Publish another event
publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{tc.event}})
// If we are expecting to unsubscribe consume the subscription
// until the expected error occurs.
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
defer cancel()
if tc.shouldUnsubscribe {
for {
_, err = sub.Next(ctx)
if err != nil {
if err == context.DeadlineExceeded {
require.Fail(t, err.Error())
}
if err == ErrSubscriptionClosed {
break
}
}
}
} else {
_, err = sub.Next(ctx)
require.NoError(t, err)
}
publisher.Publish(&structs.Events{Index: 103, Events: []structs.Event{tc.event}})
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
defer cancel()
_, err = sub.Next(ctx)
if tc.shouldUnsubscribe {
require.Equal(t, ErrSubscriptionClosed, err)
} else {
require.NoError(t, err)
}
})
}
}
// consumeSubscription starts a goroutine that repeatedly calls sub.Next with
// ctx and forwards every outcome on the returned channel. The goroutine stops
// after forwarding the first result that carries a non-nil error. The channel
// has a buffer of one and is never closed.
func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult {
	results := make(chan subNextResult, 1)

	pump := func() {
		for {
			batch, err := sub.Next(ctx)

			res := subNextResult{Err: err}
			res.Events = batch.Events
			results <- res

			if err != nil {
				break
			}
		}
	}
	go pump()

	return results
}
// subNextResult carries the outcome of one Subscription.Next call from the
// consumeSubscription goroutine to the test.
type subNextResult struct {
	// Events is the Events field of the value returned by Next.
	Events []structs.Event
	// Err is the error returned by Next, nil on success.
	Err error
}
// nextResult returns the next value received from eventCh. If nothing
// arrives within 100ms the test is failed immediately.
func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult {
	t.Helper()

	timeout := time.NewTimer(100 * time.Millisecond)
	defer timeout.Stop()

	var res subNextResult
	select {
	case res = <-eventCh:
	case <-timeout.C:
		t.Fatalf("no event after 100ms")
	}
	return res
}
// assertNoResult fails the test if anything is received from eventCh within
// 100ms. It returns normally when the channel stays quiet for that long.
func assertNoResult(t *testing.T, eventCh <-chan subNextResult) {
	t.Helper()

	quiet := time.NewTimer(100 * time.Millisecond)
	defer quiet.Stop()

	select {
	case <-quiet.C:
		// Nothing arrived, which is the expected outcome.
		return
	case unexpected := <-eventCh:
		require.NoError(t, unexpected.Err)
		require.Len(t, unexpected.Events, 1)
		t.Fatalf("received unexpected event: %#v", unexpected.Events[0].Payload)
	}
}
// consumeSub calls sub.Next with ctx in a loop, discarding the events, and
// returns the first non-nil error it gets.
func consumeSub(ctx context.Context, sub *Subscription) error {
	var err error
	for err == nil {
		_, err = sub.Next(ctx)
	}
	return err
}