open-nomad/nomad/stream/event_broker_test.go
2023-04-10 15:36:59 +00:00

1044 lines
31 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package stream
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
// TestEventBroker_PublishChangesAndSubscribe checks that a subscriber stays
// blocked while nothing is published and then receives each published batch,
// one at a time, with the exact events that were published.
func TestEventBroker_PublishChangesAndSubscribe(t *testing.T) {
	ci.Parallel(t)

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	broker, err := NewEventBroker(ctx, nil, EventBrokerCfg{EventBufferSize: 100})
	require.NoError(t, err)

	req := &SubscribeRequest{
		Topics: map[structs.Topic][]string{"Test": {"sub-key"}},
	}
	sub, err := broker.Subscribe(req)
	require.NoError(t, err)

	results := consumeSubscription(ctx, sub)

	// Nothing has been published yet, so no result may arrive.
	assertNoResult(t, results)

	// First batch.
	firstBatch := &structs.Events{
		Index: 1,
		Events: []structs.Event{
			{Index: 1, Topic: "Test", Key: "sub-key", Payload: "sample payload"},
		},
	}
	broker.Publish(firstBatch)

	got := nextResult(t, results)
	require.NoError(t, got.Err)
	wantFirst := []structs.Event{{Payload: "sample payload", Key: "sub-key", Topic: "Test", Index: 1}}
	require.Equal(t, wantFirst, got.Events)

	// The batch has been consumed; the subscriber must be waiting again.
	assertNoResult(t, results)

	// Second batch.
	secondBatch := &structs.Events{
		Index: 2,
		Events: []structs.Event{
			{Index: 2, Topic: "Test", Key: "sub-key", Payload: "sample payload 2"},
		},
	}
	broker.Publish(secondBatch)

	got = nextResult(t, results)
	require.NoError(t, got.Err)
	wantSecond := []structs.Event{{Payload: "sample payload 2", Key: "sub-key", Topic: "Test", Index: 2}}
	require.Equal(t, wantSecond, got.Events)
}
// TestEventBroker_ShutdownClosesSubscriptions checks that cancelling the
// context the broker was created with closes every open subscription:
// reading from either subscription afterwards yields ErrSubscriptionClosed.
func TestEventBroker_ShutdownClosesSubscriptions(t *testing.T) {
	ci.Parallel(t)

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{})
	require.NoError(t, err)

	sub1, err := publisher.Subscribe(&SubscribeRequest{})
	require.NoError(t, err)
	defer sub1.Unsubscribe()

	sub2, err := publisher.Subscribe(&SubscribeRequest{})
	require.NoError(t, err)
	defer sub2.Unsubscribe()

	cancel() // Shutdown

	// require.Equal takes (expected, actual); keeping that order makes the
	// failure output label the two values correctly.
	err = consumeSub(context.Background(), sub1)
	require.Equal(t, ErrSubscriptionClosed, err)

	_, err = sub2.Next(context.Background())
	require.Equal(t, ErrSubscriptionClosed, err)
}
// TestEventBroker_EmptyReqToken_DistinctSubscriptions tests subscription
// handling behavior when ACLs are disabled (request Token is empty).
// Subscriptions are mapped by their request token. When that token is empty,
// the subscriptions should still be handled independently of each other when
// unsubscribing.
func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) {
	ci.Parallel(t)

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	broker, err := NewEventBroker(ctx, nil, EventBrokerCfg{})
	require.NoError(t, err)

	// Both requests below carry an empty token.
	first, err := broker.Subscribe(&SubscribeRequest{})
	require.NoError(t, err)
	defer first.Unsubscribe()

	second, err := broker.Subscribe(&SubscribeRequest{})
	require.NoError(t, err)
	require.NotNil(t, second)

	// Closing the first subscription must leave the second one open.
	first.Unsubscribe()
	secondState := atomic.LoadUint32(&second.state)
	require.Equal(t, subscriptionStateOpen, secondState)
}
// TestEventBroker_handleACLUpdates_TokenDeleted checks that publishing an ACL
// token deletion event for the token a subscription was created with closes
// that subscription, and that a closed subscription keeps returning
// ErrSubscriptionClosed together with an empty structs.Events value.
func TestEventBroker_handleACLUpdates_TokenDeleted(t *testing.T) {
	ci.Parallel(t)

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	publisher, err := NewEventBroker(ctx, nil, EventBrokerCfg{})
	require.NoError(t, err)

	sub1, err := publisher.Subscribe(&SubscribeRequest{
		Topics: map[structs.Topic][]string{
			"*": {"*"},
		},
		Token: "foo",
	})
	require.NoError(t, err)
	defer sub1.Unsubscribe()

	aclEvent := structs.Event{
		Topic:   structs.TopicACLToken,
		Type:    structs.TypeACLTokenDeleted,
		Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: "foo"}),
	}

	publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{aclEvent}})

	// Drain the subscription until it reports that it was closed. The wait is
	// bounded so that a broker which never closes the subscription fails the
	// test instead of hanging it until the global test timeout.
	waitCtx, waitCancel := context.WithTimeout(ctx, 5*time.Second)
	defer waitCancel()

	for {
		_, err := sub1.Next(waitCtx)
		if err == ErrSubscriptionClosed {
			break
		}
		// Any other error (for example the deadline expiring) means the
		// subscription was not closed as expected.
		require.NoError(t, err)
	}

	out, err := sub1.Next(ctx)
	require.Error(t, err)
	require.Equal(t, ErrSubscriptionClosed, err)
	require.Equal(t, structs.Events{}, out)
}
// fakeACLDelegate is a test double that hands out a preconfigured
// ACLTokenProvider.
type fakeACLDelegate struct {
	tokenProvider ACLTokenProvider
}

// TokenProvider returns the provider stored on the delegate.
func (f *fakeACLDelegate) TokenProvider() ACLTokenProvider { return f.tokenProvider }
// fakeACLTokenProvider is a test double whose lookup methods ignore their
// arguments and return the fixed values stored on the struct. Tests mutate the
// fields directly to simulate state changes.
type fakeACLTokenProvider struct {
	// Returned by ACLTokenBySecretID.
	token    *structs.ACLToken
	tokenErr error

	// Returned by ACLPolicyByName.
	policy    *structs.ACLPolicy
	policyErr error

	// Returned by GetACLRoleByID.
	role    *structs.ACLRole
	roleErr error
}

// ACLTokenBySecretID returns the stored token and token error, regardless of
// the watch set and secret ID passed in.
func (f *fakeACLTokenProvider) ACLTokenBySecretID(memdb.WatchSet, string) (*structs.ACLToken, error) {
	return f.token, f.tokenErr
}

// ACLPolicyByName returns the stored policy and policy error, regardless of
// the watch set and policy name passed in.
func (f *fakeACLTokenProvider) ACLPolicyByName(memdb.WatchSet, string) (*structs.ACLPolicy, error) {
	return f.policy, f.policyErr
}

// GetACLRoleByID returns the stored role and role error, regardless of the
// watch set and role ID passed in.
func (f *fakeACLTokenProvider) GetACLRoleByID(memdb.WatchSet, string) (*structs.ACLRole, error) {
	return f.role, f.roleErr
}
// TestEventBroker_handleACLUpdates_policyUpdated checks how a subscription
// created through SubscribeWithACLCheck reacts when the policy behind its
// token changes and an ACL token or ACL policy event is published.
//
// Each case subscribes with policyBeforeRules in effect, swaps the fake
// provider's policy to policyAfterRules, publishes policyEvent, and then
// expects the subscription to be closed (shouldUnsubscribe) or to keep
// delivering events. Cases with initialSubErr expect the subscribe call itself
// to fail.
func TestEventBroker_handleACLUpdates_policyUpdated(t *testing.T) {
	ci.Parallel(t)

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	secretID := "some-secret-id"

	// waitTimeout bounds every individual read from the subscription.
	const waitTimeout = 100 * time.Millisecond

	// tokenUpserted builds a fresh ACL token upsert event for the test token.
	tokenUpserted := func() structs.Event {
		return structs.Event{
			Topic:   structs.TopicACLToken,
			Type:    structs.TypeACLTokenUpserted,
			Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
		}
	}

	cases := []struct {
		desc              string
		policyBeforeRules string
		policyAfterRules  string
		event             structs.Event
		policyEvent       structs.Event
		shouldUnsubscribe bool
		initialSubErr     bool
	}{
		{
			desc:              "subscribed to deployments and removed access",
			policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
			policyAfterRules:  mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}),
			shouldUnsubscribe: true,
			event: structs.Event{
				Topic: structs.TopicDeployment,
				Type:  structs.TypeDeploymentUpdate,
				Payload: structs.DeploymentEvent{
					Deployment: &structs.Deployment{
						ID: "some-id",
					},
				},
			},
			policyEvent: tokenUpserted(),
		},
		{
			desc:              "subscribed to evals and removed access",
			policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
			policyAfterRules:  mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}),
			shouldUnsubscribe: true,
			event: structs.Event{
				Topic: structs.TopicEvaluation,
				Type:  structs.TypeEvalUpdated,
				Payload: structs.EvaluationEvent{
					Evaluation: &structs.Evaluation{
						ID: "some-id",
					},
				},
			},
			policyEvent: tokenUpserted(),
		},
		{
			desc:              "subscribed to allocs and removed access",
			policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
			policyAfterRules:  mock.NamespacePolicy(structs.DefaultNamespace, "", []string{}),
			shouldUnsubscribe: true,
			event: structs.Event{
				Topic: structs.TopicAllocation,
				Type:  structs.TypeAllocationUpdated,
				Payload: structs.AllocationEvent{
					Allocation: &structs.Allocation{
						ID: "some-id",
					},
				},
			},
			policyEvent: tokenUpserted(),
		},
		{
			desc:              "subscribed to nodes and removed access",
			policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
			policyAfterRules:  mock.NodePolicy(acl.PolicyDeny),
			shouldUnsubscribe: true,
			event: structs.Event{
				Topic: structs.TopicNode,
				Type:  structs.TypeNodeRegistration,
				Payload: structs.NodeStreamEvent{
					Node: &structs.Node{
						ID: "some-id",
					},
				},
			},
			policyEvent: tokenUpserted(),
		},
		{
			desc:              "subscribed to evals in all namespaces and removed access",
			policyBeforeRules: mock.NamespacePolicy("*", "", []string{acl.NamespaceCapabilityReadJob}),
			policyAfterRules:  mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
			shouldUnsubscribe: true,
			event: structs.Event{
				Topic:     structs.TopicEvaluation,
				Type:      structs.TypeEvalUpdated,
				Namespace: "foo",
				Payload: structs.EvaluationEvent{
					Evaluation: &structs.Evaluation{
						ID: "some-id",
					},
				},
			},
			policyEvent: tokenUpserted(),
		},
		{
			desc:              "subscribed to deployments and no access change",
			policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
			policyAfterRules:  mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
			shouldUnsubscribe: false,
			event: structs.Event{
				Topic: structs.TopicDeployment,
				Type:  structs.TypeDeploymentUpdate,
				Payload: structs.DeploymentEvent{
					Deployment: &structs.Deployment{
						ID: "some-id",
					},
				},
			},
			policyEvent: tokenUpserted(),
		},
		{
			desc:              "subscribed to evals and no access change",
			policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
			policyAfterRules:  mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
			shouldUnsubscribe: false,
			event: structs.Event{
				Topic: structs.TopicEvaluation,
				Type:  structs.TypeEvalUpdated,
				Payload: structs.EvaluationEvent{
					Evaluation: &structs.Evaluation{
						ID: "some-id",
					},
				},
			},
			policyEvent: tokenUpserted(),
		},
		{
			desc:              "subscribed to allocs and no access change",
			policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
			policyAfterRules:  mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
			shouldUnsubscribe: false,
			event: structs.Event{
				Topic: structs.TopicAllocation,
				Type:  structs.TypeAllocationUpdated,
				Payload: structs.AllocationEvent{
					Allocation: &structs.Allocation{
						ID: "some-id",
					},
				},
			},
			policyEvent: tokenUpserted(),
		},
		{
			desc:              "subscribed to nodes and no access change",
			policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
			policyAfterRules:  mock.NodePolicy(acl.PolicyRead),
			shouldUnsubscribe: false,
			event: structs.Event{
				Topic: structs.TopicNode,
				Type:  structs.TypeNodeRegistration,
				Payload: structs.NodeStreamEvent{
					Node: &structs.Node{
						ID: "some-id",
					},
				},
			},
			policyEvent: tokenUpserted(),
		},
		{
			desc:              "initial token insufficient privileges",
			initialSubErr:     true,
			policyBeforeRules: mock.NodePolicy(acl.PolicyDeny),
			event: structs.Event{
				Topic: structs.TopicNode,
				Type:  structs.TypeNodeRegistration,
				Payload: structs.NodeStreamEvent{
					Node: &structs.Node{
						ID: "some-id",
					},
				},
			},
			policyEvent: tokenUpserted(),
		},
		{
			desc:              "subscribed to nodes and policy change no change",
			policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
			policyAfterRules:  mock.NodePolicy(acl.PolicyWrite),
			shouldUnsubscribe: false,
			event: structs.Event{
				Topic: structs.TopicNode,
				Type:  structs.TypeNodeRegistration,
				Payload: structs.NodeStreamEvent{
					Node: &structs.Node{
						ID: "some-id",
					},
				},
			},
			policyEvent: structs.Event{
				Topic: structs.TopicACLPolicy,
				Type:  structs.TypeACLPolicyUpserted,
				Payload: &structs.ACLPolicyEvent{
					ACLPolicy: &structs.ACLPolicy{
						Name: "some-policy",
					},
				},
			},
		},
		{
			desc:              "subscribed to nodes and policy change no access",
			policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
			policyAfterRules:  mock.NodePolicy(acl.PolicyDeny),
			shouldUnsubscribe: true,
			event: structs.Event{
				Topic: structs.TopicNode,
				Type:  structs.TypeNodeRegistration,
				Payload: structs.NodeStreamEvent{
					Node: &structs.Node{
						ID: "some-id",
					},
				},
			},
			policyEvent: structs.Event{
				Topic: structs.TopicACLPolicy,
				Type:  structs.TypeACLPolicyUpserted,
				Payload: &structs.ACLPolicyEvent{
					ACLPolicy: &structs.ACLPolicy{
						Name: "some-policy",
					},
				},
			},
		},
		{
			desc:              "subscribed to nodes policy deleted",
			policyBeforeRules: mock.NodePolicy(acl.PolicyRead),
			policyAfterRules:  "",
			shouldUnsubscribe: true,
			event: structs.Event{
				Topic: structs.TopicNode,
				Type:  structs.TypeNodeRegistration,
				Payload: structs.NodeStreamEvent{
					Node: &structs.Node{
						ID: "some-id",
					},
				},
			},
			policyEvent: structs.Event{
				Topic: structs.TopicACLPolicy,
				Type:  structs.TypeACLPolicyDeleted,
				Payload: &structs.ACLPolicyEvent{
					ACLPolicy: &structs.ACLPolicy{
						Name: "some-policy",
					},
				},
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.desc, func(t *testing.T) {
			policy := &structs.ACLPolicy{
				Name:  "some-policy",
				Rules: tc.policyBeforeRules,
			}
			policy.SetHash()

			tokenProvider := &fakeACLTokenProvider{
				policy: policy,
				token: &structs.ACLToken{
					SecretID: secretID,
					Policies: []string{policy.Name},
				},
			}

			aclDelegate := &fakeACLDelegate{
				tokenProvider: tokenProvider,
			}

			publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
			require.NoError(t, err)

			// Subscribe in the event's namespace, falling back to the default.
			ns := structs.DefaultNamespace
			if tc.event.Namespace != "" {
				ns = tc.event.Namespace
			}

			sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
				Topics: map[structs.Topic][]string{
					tc.event.Topic: {"*"},
				},
				Namespace: ns,
				Token:     secretID,
			})
			require.Nil(t, expiryTime)

			if tc.initialSubErr {
				require.Error(t, err)
				require.Nil(t, sub)
				return
			}
			require.NoError(t, err)

			publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{tc.event}})

			// Every wait below gets its own deadline derived from the
			// test-wide ctx. Deriving one deadline context from another can
			// never extend the deadline, so chaining them would make all the
			// waits share the first 100ms window.
			firstCtx, firstCancel := context.WithTimeout(ctx, waitTimeout)
			defer firstCancel()
			_, err = sub.Next(firstCtx)
			require.NoError(t, err)

			// Update the mock provider to use the after rules
			policyAfter := &structs.ACLPolicy{
				Name:        "some-new-policy",
				Rules:       tc.policyAfterRules,
				ModifyIndex: 101, // The ModifyIndex is used to calculate the acl cache key
			}
			policyAfter.SetHash()

			tokenProvider.policy = policyAfter

			// Publish ACL event triggering subscription re-evaluation
			publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{tc.policyEvent}})
			// Publish another event
			publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{tc.event}})

			reevalCtx, reevalCancel := context.WithTimeout(ctx, waitTimeout)
			defer reevalCancel()

			if tc.shouldUnsubscribe {
				// Consume the subscription until it reports it was closed.
				// Any other error, including the deadline expiring, fails the
				// test rather than being retried forever.
				for {
					_, err = sub.Next(reevalCtx)
					if err == ErrSubscriptionClosed {
						break
					}
					require.NoError(t, err)
				}
			} else {
				_, err = sub.Next(reevalCtx)
				require.NoError(t, err)
			}

			publisher.Publish(&structs.Events{Index: 103, Events: []structs.Event{tc.event}})

			finalCtx, finalCancel := context.WithTimeout(ctx, waitTimeout)
			defer finalCancel()

			_, err = sub.Next(finalCtx)
			if tc.shouldUnsubscribe {
				require.Equal(t, ErrSubscriptionClosed, err)
			} else {
				require.NoError(t, err)
			}
		})
	}
}
// TestEventBroker_handleACLUpdates_roleUpdated checks how a subscription
// created through SubscribeWithACLCheck reacts when the policy links of the
// ACL role behind its token change and an ACL token event is published.
//
// Each case subscribes with roleBeforePolicyLinks on the fake role, replaces
// them with roleAfterPolicyLinks, publishes policyEvent, and then expects the
// subscription to be closed (shouldUnsubscribe) or to keep delivering events.
func TestEventBroker_handleACLUpdates_roleUpdated(t *testing.T) {
	ci.Parallel(t)

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	// Generate a UUID to use in all tests for the token secret ID and the role
	// ID.
	tokenSecretID := uuid.Generate()
	roleID := uuid.Generate()

	// policyName is the single policy every case links its role to.
	const policyName = "test-event-broker-acl-policy"

	// waitTimeout bounds every individual read from the subscription.
	const waitTimeout = 100 * time.Millisecond

	// tokenUpserted builds a fresh ACL token upsert event for the test token.
	tokenUpserted := func() structs.Event {
		return structs.Event{
			Topic:   structs.TopicACLToken,
			Type:    structs.TypeACLTokenUpserted,
			Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}),
		}
	}

	cases := []struct {
		name                  string
		aclPolicy             *structs.ACLPolicy
		roleBeforePolicyLinks []*structs.ACLRolePolicyLink
		roleAfterPolicyLinks  []*structs.ACLRolePolicyLink
		event                 structs.Event
		policyEvent           structs.Event
		shouldUnsubscribe     bool
		initialSubErr         bool
	}{
		{
			name: "deployments access policy link removed",
			aclPolicy: &structs.ACLPolicy{
				Name: policyName,
				Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
					acl.NamespaceCapabilityReadJob},
				),
			},
			roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: policyName}},
			roleAfterPolicyLinks:  []*structs.ACLRolePolicyLink{},
			shouldUnsubscribe:     true,
			event: structs.Event{
				Topic:   structs.TopicDeployment,
				Type:    structs.TypeDeploymentUpdate,
				Payload: structs.DeploymentEvent{Deployment: &structs.Deployment{}},
			},
			policyEvent: tokenUpserted(),
		},
		{
			name: "evaluations access policy link removed",
			aclPolicy: &structs.ACLPolicy{
				Name: policyName,
				Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
					acl.NamespaceCapabilityReadJob},
				),
			},
			roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: policyName}},
			roleAfterPolicyLinks:  []*structs.ACLRolePolicyLink{},
			shouldUnsubscribe:     true,
			event: structs.Event{
				Topic:   structs.TopicEvaluation,
				Type:    structs.TypeEvalUpdated,
				Payload: structs.EvaluationEvent{Evaluation: &structs.Evaluation{}},
			},
			policyEvent: tokenUpserted(),
		},
		{
			name: "allocations access policy link removed",
			aclPolicy: &structs.ACLPolicy{
				Name: policyName,
				Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
					acl.NamespaceCapabilityReadJob},
				),
			},
			roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: policyName}},
			roleAfterPolicyLinks:  []*structs.ACLRolePolicyLink{},
			shouldUnsubscribe:     true,
			event: structs.Event{
				Topic:   structs.TopicAllocation,
				Type:    structs.TypeAllocationUpdated,
				Payload: structs.AllocationEvent{Allocation: &structs.Allocation{}},
			},
			policyEvent: tokenUpserted(),
		},
		{
			name: "nodes access policy link removed",
			aclPolicy: &structs.ACLPolicy{
				Name:  policyName,
				Rules: mock.NodePolicy(acl.PolicyRead),
			},
			roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: policyName}},
			roleAfterPolicyLinks:  []*structs.ACLRolePolicyLink{},
			shouldUnsubscribe:     true,
			event: structs.Event{
				Topic:   structs.TopicNode,
				Type:    structs.TypeNodeRegistration,
				Payload: structs.NodeStreamEvent{Node: &structs.Node{}},
			},
			policyEvent: tokenUpserted(),
		},
		{
			name: "deployment access no change",
			aclPolicy: &structs.ACLPolicy{
				Name: policyName,
				Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
					acl.NamespaceCapabilityReadJob},
				),
			},
			roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: policyName}},
			roleAfterPolicyLinks:  []*structs.ACLRolePolicyLink{{Name: policyName}},
			shouldUnsubscribe:     false,
			event: structs.Event{
				Topic:   structs.TopicDeployment,
				Type:    structs.TypeDeploymentUpdate,
				Payload: structs.DeploymentEvent{Deployment: &structs.Deployment{}},
			},
			policyEvent: tokenUpserted(),
		},
		{
			name: "evaluations access no change",
			aclPolicy: &structs.ACLPolicy{
				Name: policyName,
				Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
					acl.NamespaceCapabilityReadJob},
				),
			},
			roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: policyName}},
			roleAfterPolicyLinks:  []*structs.ACLRolePolicyLink{{Name: policyName}},
			shouldUnsubscribe:     false,
			event: structs.Event{
				Topic:   structs.TopicEvaluation,
				Type:    structs.TypeEvalUpdated,
				Payload: structs.EvaluationEvent{Evaluation: &structs.Evaluation{}},
			},
			policyEvent: tokenUpserted(),
		},
		{
			name: "allocations access no change",
			aclPolicy: &structs.ACLPolicy{
				Name: policyName,
				Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{
					acl.NamespaceCapabilityReadJob},
				),
			},
			roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: policyName}},
			roleAfterPolicyLinks:  []*structs.ACLRolePolicyLink{{Name: policyName}},
			shouldUnsubscribe:     false,
			event: structs.Event{
				Topic:   structs.TopicAllocation,
				Type:    structs.TypeAllocationUpdated,
				Payload: structs.AllocationEvent{Allocation: &structs.Allocation{}},
			},
			policyEvent: tokenUpserted(),
		},
		{
			name: "nodes access no change",
			aclPolicy: &structs.ACLPolicy{
				Name:  policyName,
				Rules: mock.NodePolicy(acl.PolicyRead),
			},
			roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: policyName}},
			roleAfterPolicyLinks:  []*structs.ACLRolePolicyLink{{Name: policyName}},
			shouldUnsubscribe:     false,
			event: structs.Event{
				Topic:   structs.TopicNode,
				Type:    structs.TypeNodeRegistration,
				Payload: structs.NodeStreamEvent{Node: &structs.Node{}},
			},
			policyEvent: tokenUpserted(),
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {

			// Build our fake token provider containing the relevant state
			// objects and add this to our new delegate. Keeping the token
			// provider setup separate means we can easily update its state.
			//
			// The role starts with the case's "before" links and carries the
			// same ID the token links to, so the fixture is self-consistent
			// even though the fake provider ignores lookup keys.
			tokenProvider := &fakeACLTokenProvider{
				policy: tc.aclPolicy,
				token: &structs.ACLToken{
					SecretID: tokenSecretID,
					Roles:    []*structs.ACLTokenRoleLink{{ID: roleID}},
				},
				role: &structs.ACLRole{
					ID:       roleID,
					Policies: tc.roleBeforePolicyLinks,
				},
			}
			aclDelegate := &fakeACLDelegate{tokenProvider: tokenProvider}

			publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
			require.NoError(t, err)

			ns := structs.DefaultNamespace
			if tc.event.Namespace != "" {
				ns = tc.event.Namespace
			}

			sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
				Topics:    map[structs.Topic][]string{tc.event.Topic: {"*"}},
				Namespace: ns,
				Token:     tokenSecretID,
			})
			require.Nil(t, expiryTime)

			if tc.initialSubErr {
				require.Error(t, err)
				require.Nil(t, sub)
				return
			}

			require.NoError(t, err)
			publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{tc.event}})

			// Every wait below gets its own deadline derived from the
			// test-wide ctx. Deriving one deadline context from another can
			// never extend the deadline, so chaining them would make all the
			// waits share the first 100ms window.
			firstCtx, firstCancel := context.WithTimeout(ctx, waitTimeout)
			defer firstCancel()
			_, err = sub.Next(firstCtx)
			require.NoError(t, err)

			// Overwrite the ACL role policy links with the updated version
			// which is expected to cause a change in the subscription.
			tokenProvider.role.Policies = tc.roleAfterPolicyLinks

			// Publish ACL event triggering subscription re-evaluation
			publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{tc.policyEvent}})
			publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{tc.event}})

			reevalCtx, reevalCancel := context.WithTimeout(ctx, waitTimeout)
			defer reevalCancel()

			if tc.shouldUnsubscribe {
				// Consume the subscription until it reports it was closed.
				// Any other error, including the deadline expiring, fails the
				// test rather than being retried forever.
				for {
					_, err = sub.Next(reevalCtx)
					if err == ErrSubscriptionClosed {
						break
					}
					require.NoError(t, err)
				}
			} else {
				_, err = sub.Next(reevalCtx)
				require.NoError(t, err)
			}

			publisher.Publish(&structs.Events{Index: 103, Events: []structs.Event{tc.event}})

			finalCtx, finalCancel := context.WithTimeout(ctx, waitTimeout)
			defer finalCancel()

			_, err = sub.Next(finalCtx)
			if tc.shouldUnsubscribe {
				require.Equal(t, ErrSubscriptionClosed, err)
			} else {
				require.NoError(t, err)
			}
		})
	}
}
// TestEventBroker_handleACLUpdates_tokenExpiry checks how a subscription
// created through SubscribeWithACLCheck reacts to its token's expiration time.
//
// Both cases subscribe with a token that expires far in the future. The
// "does expire" case then moves the fake provider's token expiration into the
// past before publishing an ACL token event, and expects the subscription to
// be closed; the other case expects it to keep delivering events.
func TestEventBroker_handleACLUpdates_tokenExpiry(t *testing.T) {
	ci.Parallel(t)

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	// waitTimeout bounds every individual read from the subscription.
	const waitTimeout = 100 * time.Millisecond

	cases := []struct {
		name         string
		inputToken   *structs.ACLToken
		shouldExpire bool
	}{
		{
			name: "token does not expire",
			inputToken: &structs.ACLToken{
				AccessorID:     uuid.Generate(),
				SecretID:       uuid.Generate(),
				ExpirationTime: pointer.Of(time.Now().Add(100000 * time.Hour).UTC()),
				Type:           structs.ACLManagementToken,
			},
			shouldExpire: false,
		},
		{
			name: "token does expire",
			inputToken: &structs.ACLToken{
				AccessorID:     uuid.Generate(),
				SecretID:       uuid.Generate(),
				ExpirationTime: pointer.Of(time.Now().Add(100000 * time.Hour).UTC()),
				Type:           structs.ACLManagementToken,
			},
			shouldExpire: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {

			// Build our fake token provider containing the relevant state
			// objects and add this to our new delegate. Keeping the token
			// provider setup separate means we can easily update its state.
			tokenProvider := &fakeACLTokenProvider{token: tc.inputToken}
			aclDelegate := &fakeACLDelegate{tokenProvider: tokenProvider}

			publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
			require.NoError(t, err)

			fakeNodeEvent := structs.Event{
				Topic:   structs.TopicNode,
				Type:    structs.TypeNodeRegistration,
				Payload: structs.NodeStreamEvent{Node: &structs.Node{}},
			}

			fakeTokenEvent := structs.Event{
				Topic:   structs.TopicACLToken,
				Type:    structs.TypeACLTokenUpserted,
				Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tc.inputToken.SecretID}),
			}

			sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
				Topics: map[structs.Topic][]string{structs.TopicAll: {"*"}},
				Token:  tc.inputToken.SecretID,
			})
			require.NoError(t, err)
			require.NotNil(t, sub)
			require.NotNil(t, expiryTime)

			// Publish an event and check that there is a new item in the
			// subscription queue.
			publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{fakeNodeEvent}})

			// Each wait gets its own deadline derived from the test-wide ctx.
			// Deriving one deadline context from another can never extend the
			// deadline, so chaining them would make both waits share the
			// first 100ms window.
			firstCtx, firstCancel := context.WithTimeout(ctx, waitTimeout)
			defer firstCancel()
			_, err = sub.Next(firstCtx)
			require.NoError(t, err)

			// If the test states the token should expire, set the expiration
			// time to a previous time.
			if tc.shouldExpire {
				tokenProvider.token.ExpirationTime = pointer.Of(
					time.Date(1987, time.April, 13, 8, 3, 0, 0, time.UTC),
				)
			}

			// Publish some events to trigger re-evaluation of the subscription.
			publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{fakeTokenEvent}})
			publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{fakeNodeEvent}})

			reevalCtx, reevalCancel := context.WithTimeout(ctx, waitTimeout)
			defer reevalCancel()

			if tc.shouldExpire {
				// Consume the subscription until it reports it was closed.
				// Any other error, including the deadline expiring, fails the
				// test rather than being retried forever.
				for {
					_, err = sub.Next(reevalCtx)
					if err == ErrSubscriptionClosed {
						break
					}
					require.NoError(t, err)
				}
			} else {
				_, err = sub.Next(reevalCtx)
				require.NoError(t, err)
			}
		})
	}
}
// consumeSubscription starts a goroutine that repeatedly calls sub.Next with
// ctx and forwards every result, including the first error, on the returned
// channel. The channel has a buffer of one and is never closed.
//
// The goroutine stops after forwarding a result whose Err is non-nil. It also
// stops if ctx is done while the channel is full and nobody is receiving, so
// an abandoned channel cannot leave the goroutine blocked forever.
func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult {
	eventCh := make(chan subNextResult, 1)

	go func() {
		for {
			es, err := sub.Next(ctx)
			res := subNextResult{
				Events: es.Events,
				Err:    err,
			}

			select {
			case eventCh <- res:
			case <-ctx.Done():
				// ctx being done does not mean the channel is full: both
				// select cases may have been ready. Still deliver the result
				// when there is room, and only give up when the send would
				// block.
				select {
				case eventCh <- res:
				default:
					return
				}
			}

			if err != nil {
				return
			}
		}
	}()

	return eventCh
}
// subNextResult carries the outcome of a single Subscription.Next call across
// a channel: the events of the returned batch and the error, if any.
type subNextResult struct {
// Events holds the Events field of the value returned by Next.
Events []structs.Event
// Err holds the error returned by Next; it is nil on success.
Err error
}
// nextResult waits up to 100ms for a result on eventCh and returns it. If
// nothing arrives in time the test is failed immediately.
func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult {
	t.Helper()

	var got subNextResult
	select {
	case got = <-eventCh:
	case <-time.After(100 * time.Millisecond):
		t.Fatalf("no event after 100ms")
	}
	return got
}
// assertNoResult fails the test if any result arrives on eventCh within
// 100ms. Silence for the whole window counts as success.
func assertNoResult(t *testing.T, eventCh <-chan subNextResult) {
	t.Helper()

	quiet := time.After(100 * time.Millisecond)
	select {
	case <-quiet:
		return
	case unexpected := <-eventCh:
		// Validate the shape first so the final message can safely index
		// into the events.
		require.NoError(t, unexpected.Err)
		require.Len(t, unexpected.Events, 1)
		t.Fatalf("received unexpected event: %#v", unexpected.Events[0].Payload)
	}
}
// consumeSub keeps calling sub.Next with ctx, discarding the returned events,
// until a call fails, and returns that first error.
func consumeSub(ctx context.Context, sub *Subscription) error {
	var err error
	for err == nil {
		_, err = sub.Next(ctx)
	}
	return err
}