da5069bded
This change ensures that a token's expiry is checked before every event is sent to the caller. Previously, a token could still be used to listen for events after it had expired, as long as the subscription was made while it was unexpired. This would last until the token was garbage collected from state. The check occurs within the RPC as there is currently no state update when a token expires.
1041 lines
31 KiB
Go
1041 lines
31 KiB
Go
package stream
|
|
|
|
import (
|
|
"context"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/nomad/acl"
|
|
"github.com/hashicorp/nomad/ci"
|
|
"github.com/hashicorp/nomad/helper/pointer"
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/nomad/mock"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestEventBroker_PublishChangesAndSubscribe(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
subscription := &SubscribeRequest{
|
|
Topics: map[structs.Topic][]string{
|
|
"Test": {"sub-key"},
|
|
},
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
|
|
publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{EventBufferSize: 100})
|
|
require.NoError(t, err)
|
|
|
|
sub, err := publisher.Subscribe(subscription)
|
|
require.NoError(t, err)
|
|
eventCh := consumeSubscription(ctx, sub)
|
|
|
|
// Now subscriber should block waiting for updates
|
|
assertNoResult(t, eventCh)
|
|
|
|
events := []structs.Event{{
|
|
Index: 1,
|
|
Topic: "Test",
|
|
Key: "sub-key",
|
|
Payload: "sample payload",
|
|
}}
|
|
publisher.Publish(&structs.Events{Index: 1, Events: events})
|
|
|
|
// Subscriber should see the published event
|
|
result := nextResult(t, eventCh)
|
|
require.NoError(t, result.Err)
|
|
expected := []structs.Event{{Payload: "sample payload", Key: "sub-key", Topic: "Test", Index: 1}}
|
|
require.Equal(t, expected, result.Events)
|
|
|
|
// Now subscriber should block waiting for updates
|
|
assertNoResult(t, eventCh)
|
|
|
|
// Publish a second event
|
|
events = []structs.Event{{
|
|
Index: 2,
|
|
Topic: "Test",
|
|
Key: "sub-key",
|
|
Payload: "sample payload 2",
|
|
}}
|
|
publisher.Publish(&structs.Events{Index: 2, Events: events})
|
|
|
|
result = nextResult(t, eventCh)
|
|
require.NoError(t, result.Err)
|
|
expected = []structs.Event{{Payload: "sample payload 2", Key: "sub-key", Topic: "Test", Index: 2}}
|
|
require.Equal(t, expected, result.Events)
|
|
}
|
|
|
|
func TestEventBroker_ShutdownClosesSubscriptions(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.Cleanup(cancel)
|
|
|
|
publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{})
|
|
require.NoError(t, err)
|
|
|
|
sub1, err := publisher.Subscribe(&SubscribeRequest{})
|
|
require.NoError(t, err)
|
|
defer sub1.Unsubscribe()
|
|
|
|
sub2, err := publisher.Subscribe(&SubscribeRequest{})
|
|
require.NoError(t, err)
|
|
defer sub2.Unsubscribe()
|
|
|
|
cancel() // Shutdown
|
|
|
|
err = consumeSub(context.Background(), sub1)
|
|
require.Equal(t, err, ErrSubscriptionClosed)
|
|
|
|
_, err = sub2.Next(context.Background())
|
|
require.Equal(t, err, ErrSubscriptionClosed)
|
|
}
|
|
|
|
// TestEventBroker_EmptyReqToken_DistinctSubscriptions tests subscription
|
|
// hanlding behavior when ACLs are disabled (request Token is empty).
|
|
// Subscriptions are mapped by their request token. when that token is empty,
|
|
// the subscriptions should still be handled indeppendtly of each other when
|
|
// unssubscribing.
|
|
func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.Cleanup(cancel)
|
|
|
|
publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{})
|
|
require.NoError(t, err)
|
|
|
|
// first subscription, empty token
|
|
sub1, err := publisher.Subscribe(&SubscribeRequest{})
|
|
require.NoError(t, err)
|
|
defer sub1.Unsubscribe()
|
|
|
|
// second subscription, empty token
|
|
sub2, err := publisher.Subscribe(&SubscribeRequest{})
|
|
require.NoError(t, err)
|
|
require.NotNil(t, sub2)
|
|
|
|
sub1.Unsubscribe()
|
|
|
|
require.Equal(t, subscriptionStateOpen, atomic.LoadUint32(&sub2.state))
|
|
}
|
|
|
|
func TestEventBroker_handleACLUpdates_TokenDeleted(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.Cleanup(cancel)
|
|
|
|
publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{})
|
|
require.NoError(t, err)
|
|
|
|
sub1, err := publisher.Subscribe(&SubscribeRequest{
|
|
Topics: map[structs.Topic][]string{
|
|
"*": {"*"},
|
|
},
|
|
Token: "foo",
|
|
})
|
|
require.NoError(t, err)
|
|
defer sub1.Unsubscribe()
|
|
|
|
aclEvent := structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenDeleted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: "foo"}),
|
|
}
|
|
|
|
publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{aclEvent}})
|
|
for {
|
|
_, err := sub1.Next(ctx)
|
|
if err == ErrSubscriptionClosed {
|
|
break
|
|
}
|
|
}
|
|
|
|
out, err := sub1.Next(ctx)
|
|
require.Error(t, err)
|
|
require.Equal(t, ErrSubscriptionClosed, err)
|
|
require.Equal(t, structs.Events{}, out)
|
|
}
|
|
|
|
type fakeACLDelegate struct {
|
|
tokenProvider ACLTokenProvider
|
|
}
|
|
|
|
func (d *fakeACLDelegate) TokenProvider() ACLTokenProvider {
|
|
return d.tokenProvider
|
|
}
|
|
|
|
type fakeACLTokenProvider struct {
|
|
policy *structs.ACLPolicy
|
|
policyErr error
|
|
token *structs.ACLToken
|
|
tokenErr error
|
|
role *structs.ACLRole
|
|
roleErr error
|
|
}
|
|
|
|
func (p *fakeACLTokenProvider) ACLTokenBySecretID(_ memdb.WatchSet, _ string) (*structs.ACLToken, error) {
|
|
return p.token, p.tokenErr
|
|
}
|
|
|
|
func (p *fakeACLTokenProvider) ACLPolicyByName(_ memdb.WatchSet, _ string) (*structs.ACLPolicy, error) {
|
|
return p.policy, p.policyErr
|
|
}
|
|
|
|
func (p *fakeACLTokenProvider) GetACLRoleByID(_ memdb.WatchSet, _ string) (*structs.ACLRole, error) {
|
|
return p.role, p.roleErr
|
|
}
|
|
|
|
func TestEventBroker_handleACLUpdates_policyUpdated(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.Cleanup(cancel)
|
|
|
|
secretID := "some-secret-id"
|
|
cases := []struct {
|
|
policyBeforeRules string
|
|
policyAfterRules string
|
|
topics map[structs.Topic][]string
|
|
desc string
|
|
event structs.Event
|
|
policyEvent structs.Event
|
|
shouldUnsubscribe bool
|
|
initialSubErr bool
|
|
}{
|
|
{
|
|
desc: "subscribed to deployments and removed access",
|
|
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
|
|
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}),
|
|
shouldUnsubscribe: true,
|
|
event: structs.Event{
|
|
Topic: structs.TopicDeployment,
|
|
Type: structs.TypeDeploymentUpdate,
|
|
Payload: structs.DeploymentEvent{
|
|
Deployment: &structs.Deployment{
|
|
ID: "some-id",
|
|
},
|
|
},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
|
|
},
|
|
},
|
|
{
|
|
desc: "subscribed to evals and removed access",
|
|
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
|
|
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}),
|
|
shouldUnsubscribe: true,
|
|
event: structs.Event{
|
|
Topic: structs.TopicEvaluation,
|
|
Type: structs.TypeEvalUpdated,
|
|
Payload: structs.EvaluationEvent{
|
|
Evaluation: &structs.Evaluation{
|
|
ID: "some-id",
|
|
},
|
|
},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
|
|
},
|
|
},
|
|
{
|
|
desc: "subscribed to allocs and removed access",
|
|
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
|
|
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}),
|
|
shouldUnsubscribe: true,
|
|
event: structs.Event{
|
|
Topic: structs.TopicAllocation,
|
|
Type: structs.TypeAllocationUpdated,
|
|
Payload: structs.AllocationEvent{
|
|
Allocation: &structs.Allocation{
|
|
ID: "some-id",
|
|
},
|
|
},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
|
|
},
|
|
},
|
|
{
|
|
desc: "subscribed to nodes and removed access",
|
|
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
|
|
policyAfterRules: mock.NodePolicy(acl.PolicyDeny),
|
|
shouldUnsubscribe: true,
|
|
event: structs.Event{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeRegistration,
|
|
Payload: structs.NodeStreamEvent{
|
|
Node: &structs.Node{
|
|
ID: "some-id",
|
|
},
|
|
},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
|
|
},
|
|
},
|
|
{
|
|
desc: "subscribed to evals in all namespaces and removed access",
|
|
policyBeforeRules: mock.NamespacePolicy("*", "", []string{acl.NamespaceCapabilityReadJob}),
|
|
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
|
|
shouldUnsubscribe: true,
|
|
event: structs.Event{
|
|
Topic: structs.TopicEvaluation,
|
|
Type: structs.TypeEvalUpdated,
|
|
Namespace: "foo",
|
|
Payload: structs.EvaluationEvent{
|
|
Evaluation: &structs.Evaluation{
|
|
ID: "some-id",
|
|
},
|
|
},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
|
|
},
|
|
},
|
|
{
|
|
desc: "subscribed to deployments and no access change",
|
|
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
|
|
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
|
|
shouldUnsubscribe: false,
|
|
event: structs.Event{
|
|
Topic: structs.TopicDeployment,
|
|
Type: structs.TypeDeploymentUpdate,
|
|
Payload: structs.DeploymentEvent{
|
|
Deployment: &structs.Deployment{
|
|
ID: "some-id",
|
|
},
|
|
},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
|
|
},
|
|
},
|
|
{
|
|
desc: "subscribed to evals and no access change",
|
|
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
|
|
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
|
|
shouldUnsubscribe: false,
|
|
event: structs.Event{
|
|
Topic: structs.TopicEvaluation,
|
|
Type: structs.TypeEvalUpdated,
|
|
Payload: structs.EvaluationEvent{
|
|
Evaluation: &structs.Evaluation{
|
|
ID: "some-id",
|
|
},
|
|
},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
|
|
},
|
|
},
|
|
{
|
|
desc: "subscribed to allocs and no access change",
|
|
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
|
|
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
|
|
shouldUnsubscribe: false,
|
|
event: structs.Event{
|
|
Topic: structs.TopicAllocation,
|
|
Type: structs.TypeAllocationUpdated,
|
|
Payload: structs.AllocationEvent{
|
|
Allocation: &structs.Allocation{
|
|
ID: "some-id",
|
|
},
|
|
},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
|
|
},
|
|
},
|
|
{
|
|
desc: "subscribed to nodes and no access change",
|
|
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
|
|
policyAfterRules: mock.NodePolicy(acl.PolicyRead),
|
|
shouldUnsubscribe: false,
|
|
event: structs.Event{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeRegistration,
|
|
Payload: structs.NodeStreamEvent{
|
|
Node: &structs.Node{
|
|
ID: "some-id",
|
|
},
|
|
},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
|
|
},
|
|
},
|
|
{
|
|
desc: "initial token insufficient privileges",
|
|
initialSubErr: true,
|
|
policyBeforeRules: mock.NodePolicy(acl.PolicyDeny),
|
|
event: structs.Event{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeRegistration,
|
|
Payload: structs.NodeStreamEvent{
|
|
Node: &structs.Node{
|
|
ID: "some-id",
|
|
},
|
|
},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
|
|
},
|
|
},
|
|
{
|
|
desc: "subscribed to nodes and policy change no change",
|
|
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
|
|
policyAfterRules: mock.NodePolicy(acl.PolicyWrite),
|
|
shouldUnsubscribe: false,
|
|
event: structs.Event{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeRegistration,
|
|
Payload: structs.NodeStreamEvent{
|
|
Node: &structs.Node{
|
|
ID: "some-id",
|
|
},
|
|
},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLPolicy,
|
|
Type: structs.TypeACLPolicyUpserted,
|
|
Payload: &structs.ACLPolicyEvent{
|
|
ACLPolicy: &structs.ACLPolicy{
|
|
Name: "some-policy",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
desc: "subscribed to nodes and policy change no access",
|
|
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
|
|
policyAfterRules: mock.NodePolicy(acl.PolicyDeny),
|
|
shouldUnsubscribe: true,
|
|
event: structs.Event{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeRegistration,
|
|
Payload: structs.NodeStreamEvent{
|
|
Node: &structs.Node{
|
|
ID: "some-id",
|
|
},
|
|
},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLPolicy,
|
|
Type: structs.TypeACLPolicyUpserted,
|
|
Payload: &structs.ACLPolicyEvent{
|
|
ACLPolicy: &structs.ACLPolicy{
|
|
Name: "some-policy",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
desc: "subscribed to nodes policy deleted",
|
|
policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
|
|
policyAfterRules: "",
|
|
shouldUnsubscribe: true,
|
|
event: structs.Event{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeRegistration,
|
|
Payload: structs.NodeStreamEvent{
|
|
Node: &structs.Node{
|
|
ID: "some-id",
|
|
},
|
|
},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLPolicy,
|
|
Type: structs.TypeACLPolicyDeleted,
|
|
Payload: &structs.ACLPolicyEvent{
|
|
ACLPolicy: &structs.ACLPolicy{
|
|
Name: "some-policy",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tc := range cases {
|
|
t.Run(tc.desc, func(t *testing.T) {
|
|
|
|
policy := &structs.ACLPolicy{
|
|
Name: "some-policy",
|
|
Rules: tc.policyBeforeRules,
|
|
}
|
|
policy.SetHash()
|
|
|
|
tokenProvider := &fakeACLTokenProvider{
|
|
policy: policy,
|
|
token: &structs.ACLToken{
|
|
SecretID: secretID,
|
|
Policies: []string{policy.Name},
|
|
},
|
|
}
|
|
|
|
aclDelegate := &fakeACLDelegate{
|
|
tokenProvider: tokenProvider,
|
|
}
|
|
|
|
publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
|
|
require.NoError(t, err)
|
|
|
|
var ns string
|
|
if tc.event.Namespace != "" {
|
|
ns = tc.event.Namespace
|
|
} else {
|
|
ns = structs.DefaultNamespace
|
|
}
|
|
|
|
sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
|
|
Topics: map[structs.Topic][]string{
|
|
tc.event.Topic: {"*"},
|
|
},
|
|
Namespace: ns,
|
|
Token: secretID,
|
|
})
|
|
require.Nil(t, expiryTime)
|
|
|
|
if tc.initialSubErr {
|
|
require.Error(t, err)
|
|
require.Nil(t, sub)
|
|
return
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{tc.event}})
|
|
|
|
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
|
|
defer cancel()
|
|
_, err = sub.Next(ctx)
|
|
require.NoError(t, err)
|
|
|
|
// Update the mock provider to use the after rules
|
|
policyAfter := &structs.ACLPolicy{
|
|
Name: "some-new-policy",
|
|
Rules: tc.policyAfterRules,
|
|
ModifyIndex: 101, // The ModifyIndex is used to caclulate the acl cache key
|
|
}
|
|
policyAfter.SetHash()
|
|
|
|
tokenProvider.policy = policyAfter
|
|
|
|
// Publish ACL event triggering subscription re-evaluation
|
|
publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{tc.policyEvent}})
|
|
// Publish another event
|
|
publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{tc.event}})
|
|
|
|
// If we are expecting to unsubscribe consume the subscription
|
|
// until the expected error occurs.
|
|
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
|
|
defer cancel()
|
|
if tc.shouldUnsubscribe {
|
|
for {
|
|
_, err = sub.Next(ctx)
|
|
if err != nil {
|
|
if err == context.DeadlineExceeded {
|
|
require.Fail(t, err.Error())
|
|
}
|
|
if err == ErrSubscriptionClosed {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
_, err = sub.Next(ctx)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
publisher.Publish(&structs.Events{Index: 103, Events: []structs.Event{tc.event}})
|
|
|
|
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
|
|
defer cancel()
|
|
_, err = sub.Next(ctx)
|
|
if tc.shouldUnsubscribe {
|
|
require.Equal(t, ErrSubscriptionClosed, err)
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestEventBroker_handleACLUpdates_roleUpdated(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.Cleanup(cancel)
|
|
|
|
// Generate a UUID to use in all tests for the token secret ID and the role
|
|
// ID.
|
|
tokenSecretID := uuid.Generate()
|
|
roleID := uuid.Generate()
|
|
|
|
cases := []struct {
|
|
name string
|
|
aclPolicy *structs.ACLPolicy
|
|
roleBeforePolicyLinks []*structs.ACLRolePolicyLink
|
|
roleAfterPolicyLinks []*structs.ACLRolePolicyLink
|
|
topics map[structs.Topic][]string
|
|
event structs.Event
|
|
policyEvent structs.Event
|
|
shouldUnsubscribe bool
|
|
initialSubErr bool
|
|
}{
|
|
{
|
|
name: "deployments access policy link removed",
|
|
aclPolicy: &structs.ACLPolicy{
|
|
Name: "test-event-broker-acl-policy",
|
|
Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
|
|
acl.NamespaceCapabilityReadJob},
|
|
),
|
|
},
|
|
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
|
|
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{},
|
|
shouldUnsubscribe: true,
|
|
event: structs.Event{
|
|
Topic: structs.TopicDeployment,
|
|
Type: structs.TypeDeploymentUpdate,
|
|
Payload: structs.DeploymentEvent{Deployment: &structs.Deployment{}},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
|
|
},
|
|
},
|
|
{
|
|
name: "evaluations access policy link removed",
|
|
aclPolicy: &structs.ACLPolicy{
|
|
Name: "test-event-broker-acl-policy",
|
|
Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
|
|
acl.NamespaceCapabilityReadJob},
|
|
),
|
|
},
|
|
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
|
|
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{},
|
|
shouldUnsubscribe: true,
|
|
event: structs.Event{
|
|
Topic: structs.TopicEvaluation,
|
|
Type: structs.TypeEvalUpdated,
|
|
Payload: structs.EvaluationEvent{Evaluation: &structs.Evaluation{}},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
|
|
},
|
|
},
|
|
{
|
|
name: "allocations access policy link removed",
|
|
aclPolicy: &structs.ACLPolicy{
|
|
Name: "test-event-broker-acl-policy",
|
|
Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
|
|
acl.NamespaceCapabilityReadJob},
|
|
),
|
|
},
|
|
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
|
|
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{},
|
|
shouldUnsubscribe: true,
|
|
event: structs.Event{
|
|
Topic: structs.TopicAllocation,
|
|
Type: structs.TypeAllocationUpdated,
|
|
Payload: structs.AllocationEvent{Allocation: &structs.Allocation{}},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
|
|
},
|
|
},
|
|
{
|
|
name: "nodes access policy link removed",
|
|
aclPolicy: &structs.ACLPolicy{
|
|
Name: "test-event-broker-acl-policy",
|
|
Rules: mock.NodePolicy(acl.PolicyRead),
|
|
},
|
|
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
|
|
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{},
|
|
shouldUnsubscribe: true,
|
|
event: structs.Event{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeRegistration,
|
|
Payload: structs.NodeStreamEvent{Node: &structs.Node{}},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
|
|
},
|
|
},
|
|
{
|
|
name: "deployment access no change",
|
|
aclPolicy: &structs.ACLPolicy{
|
|
Name: "test-event-broker-acl-policy",
|
|
Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
|
|
acl.NamespaceCapabilityReadJob},
|
|
),
|
|
},
|
|
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
|
|
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
|
|
shouldUnsubscribe: false,
|
|
event: structs.Event{
|
|
Topic: structs.TopicDeployment,
|
|
Type: structs.TypeDeploymentUpdate,
|
|
Payload: structs.DeploymentEvent{Deployment: &structs.Deployment{}},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
|
|
},
|
|
},
|
|
{
|
|
name: "evaluations access no change",
|
|
aclPolicy: &structs.ACLPolicy{
|
|
Name: "test-event-broker-acl-policy",
|
|
Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
|
|
acl.NamespaceCapabilityReadJob},
|
|
),
|
|
},
|
|
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
|
|
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
|
|
shouldUnsubscribe: false,
|
|
event: structs.Event{
|
|
Topic: structs.TopicEvaluation,
|
|
Type: structs.TypeEvalUpdated,
|
|
Payload: structs.EvaluationEvent{Evaluation: &structs.Evaluation{}},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
|
|
},
|
|
},
|
|
{
|
|
name: "allocations access no change",
|
|
aclPolicy: &structs.ACLPolicy{
|
|
Name: "test-event-broker-acl-policy",
|
|
Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
|
|
acl.NamespaceCapabilityReadJob},
|
|
),
|
|
},
|
|
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
|
|
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
|
|
shouldUnsubscribe: false,
|
|
event: structs.Event{
|
|
Topic: structs.TopicAllocation,
|
|
Type: structs.TypeAllocationUpdated,
|
|
Payload: structs.AllocationEvent{Allocation: &structs.Allocation{}},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
|
|
},
|
|
},
|
|
{
|
|
name: "nodes access no change",
|
|
aclPolicy: &structs.ACLPolicy{
|
|
Name: "test-event-broker-acl-policy",
|
|
Rules: mock.NodePolicy(acl.PolicyRead),
|
|
},
|
|
roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
|
|
roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}},
|
|
shouldUnsubscribe: false,
|
|
event: structs.Event{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeRegistration,
|
|
Payload: structs.NodeStreamEvent{Node: &structs.Node{}},
|
|
},
|
|
policyEvent: structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tc := range cases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
|
|
// Build our fake token provider containing the relevant state
|
|
// objects and add this to our new delegate. Keeping the token
|
|
// provider setup separate means we can easily update its state.
|
|
tokenProvider := &fakeACLTokenProvider{
|
|
policy: tc.aclPolicy,
|
|
token: &structs.ACLToken{
|
|
SecretID: tokenSecretID,
|
|
Roles: []*structs.ACLTokenRoleLink{{ID: roleID}},
|
|
},
|
|
role: &structs.ACLRole{
|
|
ID: uuid.Short(),
|
|
Policies: []*structs.ACLRolePolicyLink{
|
|
{Name: tc.aclPolicy.Name},
|
|
},
|
|
},
|
|
}
|
|
aclDelegate := &fakeACLDelegate{tokenProvider: tokenProvider}
|
|
|
|
publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
|
|
require.NoError(t, err)
|
|
|
|
ns := structs.DefaultNamespace
|
|
if tc.event.Namespace != "" {
|
|
ns = tc.event.Namespace
|
|
}
|
|
|
|
sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
|
|
Topics: map[structs.Topic][]string{tc.event.Topic: {"*"}},
|
|
Namespace: ns,
|
|
Token: tokenSecretID,
|
|
})
|
|
require.Nil(t, expiryTime)
|
|
|
|
if tc.initialSubErr {
|
|
require.Error(t, err)
|
|
require.Nil(t, sub)
|
|
return
|
|
}
|
|
|
|
require.NoError(t, err)
|
|
publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{tc.event}})
|
|
|
|
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
|
|
defer cancel()
|
|
_, err = sub.Next(ctx)
|
|
require.NoError(t, err)
|
|
|
|
// Overwrite the ACL role policy links with the updated version
|
|
// which is expected to cause a change in the subscription.
|
|
tokenProvider.role.Policies = tc.roleAfterPolicyLinks
|
|
|
|
// Publish ACL event triggering subscription re-evaluation
|
|
publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{tc.policyEvent}})
|
|
publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{tc.event}})
|
|
|
|
// If we are expecting to unsubscribe consume the subscription
|
|
// until the expected error occurs.
|
|
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
|
|
defer cancel()
|
|
if tc.shouldUnsubscribe {
|
|
for {
|
|
_, err = sub.Next(ctx)
|
|
if err != nil {
|
|
if err == context.DeadlineExceeded {
|
|
require.Fail(t, err.Error())
|
|
}
|
|
if err == ErrSubscriptionClosed {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
_, err = sub.Next(ctx)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
publisher.Publish(&structs.Events{Index: 103, Events: []structs.Event{tc.event}})
|
|
|
|
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
|
|
defer cancel()
|
|
_, err = sub.Next(ctx)
|
|
if tc.shouldUnsubscribe {
|
|
require.Equal(t, ErrSubscriptionClosed, err)
|
|
} else {
|
|
require.NoError(t, err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestEventBroker_handleACLUpdates_tokenExpiry(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.Cleanup(cancel)
|
|
|
|
cases := []struct {
|
|
name string
|
|
inputToken *structs.ACLToken
|
|
shouldExpire bool
|
|
}{
|
|
{
|
|
name: "token does not expire",
|
|
inputToken: &structs.ACLToken{
|
|
AccessorID: uuid.Generate(),
|
|
SecretID: uuid.Generate(),
|
|
ExpirationTime: pointer.Of(time.Now().Add(100000 * time.Hour).UTC()),
|
|
Type: structs.ACLManagementToken,
|
|
},
|
|
shouldExpire: false,
|
|
},
|
|
{
|
|
name: "token does expire",
|
|
inputToken: &structs.ACLToken{
|
|
AccessorID: uuid.Generate(),
|
|
SecretID: uuid.Generate(),
|
|
ExpirationTime: pointer.Of(time.Now().Add(100000 * time.Hour).UTC()),
|
|
Type: structs.ACLManagementToken,
|
|
},
|
|
shouldExpire: true,
|
|
},
|
|
}
|
|
|
|
for _, tc := range cases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
|
|
// Build our fake token provider containing the relevant state
|
|
// objects and add this to our new delegate. Keeping the token
|
|
// provider setup separate means we can easily update its state.
|
|
tokenProvider := &fakeACLTokenProvider{token: tc.inputToken}
|
|
aclDelegate := &fakeACLDelegate{tokenProvider: tokenProvider}
|
|
|
|
publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
|
|
require.NoError(t, err)
|
|
|
|
fakeNodeEvent := structs.Event{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeRegistration,
|
|
Payload: structs.NodeStreamEvent{Node: &structs.Node{}},
|
|
}
|
|
|
|
fakeTokenEvent := structs.Event{
|
|
Topic: structs.TopicACLToken,
|
|
Type: structs.TypeACLTokenUpserted,
|
|
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tc.inputToken.SecretID}),
|
|
}
|
|
|
|
sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
|
|
Topics: map[structs.Topic][]string{structs.TopicAll: {"*"}},
|
|
Token: tc.inputToken.SecretID,
|
|
})
|
|
require.NoError(t, err)
|
|
require.NotNil(t, sub)
|
|
require.NotNil(t, expiryTime)
|
|
|
|
// Publish an event and check that there is a new item in the
|
|
// subscription queue.
|
|
publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{fakeNodeEvent}})
|
|
|
|
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
|
|
defer cancel()
|
|
_, err = sub.Next(ctx)
|
|
require.NoError(t, err)
|
|
|
|
// If the test states the token should expire, set the expiration
|
|
// time to a previous time.
|
|
if tc.shouldExpire {
|
|
tokenProvider.token.ExpirationTime = pointer.Of(
|
|
time.Date(1987, time.April, 13, 8, 3, 0, 0, time.UTC),
|
|
)
|
|
}
|
|
|
|
// Publish some events to trigger re-evaluation of the subscription.
|
|
publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{fakeTokenEvent}})
|
|
publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{fakeNodeEvent}})
|
|
|
|
// If we are expecting to unsubscribe consume the subscription
|
|
// until the expected error occurs.
|
|
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond))
|
|
defer cancel()
|
|
|
|
if tc.shouldExpire {
|
|
for {
|
|
if _, err = sub.Next(ctx); err != nil {
|
|
if err == context.DeadlineExceeded {
|
|
require.Fail(t, err.Error())
|
|
}
|
|
if err == ErrSubscriptionClosed {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
_, err = sub.Next(ctx)
|
|
require.NoError(t, err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult {
|
|
eventCh := make(chan subNextResult, 1)
|
|
go func() {
|
|
for {
|
|
es, err := sub.Next(ctx)
|
|
eventCh <- subNextResult{
|
|
Events: es.Events,
|
|
Err: err,
|
|
}
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return eventCh
|
|
}
|
|
|
|
type subNextResult struct {
|
|
Events []structs.Event
|
|
Err error
|
|
}
|
|
|
|
func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult {
|
|
t.Helper()
|
|
select {
|
|
case next := <-eventCh:
|
|
return next
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Fatalf("no event after 100ms")
|
|
}
|
|
return subNextResult{}
|
|
}
|
|
|
|
func assertNoResult(t *testing.T, eventCh <-chan subNextResult) {
|
|
t.Helper()
|
|
select {
|
|
case next := <-eventCh:
|
|
require.NoError(t, next.Err)
|
|
require.Len(t, next.Events, 1)
|
|
t.Fatalf("received unexpected event: %#v", next.Events[0].Payload)
|
|
case <-time.After(100 * time.Millisecond):
|
|
}
|
|
}
|
|
|
|
func consumeSub(ctx context.Context, sub *Subscription) error {
|
|
for {
|
|
_, err := sub.Next(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|