event stream: ensure token expiry is correctly checked for subs.

This change ensures that a token's expiry is checked before every
event is sent to the caller. Previously, a token could still be
used to listen for events after it had expired, as long as the
subscription was made while it was unexpired. This would last until
the token was garbage collected from state.

The check occurs within the RPC as there is currently no state
update when a token expires.
This commit is contained in:
James Rasell 2022-10-24 14:32:18 +01:00 committed by Tim Gross
parent 81ac5d93f1
commit da5069bded
5 changed files with 181 additions and 22 deletions

3
.changelog/15013.txt Normal file
View file

@ -0,0 +1,3 @@
```release-note:security
event stream: Fixed a bug where ACL token expiration was not checked when emitting events
```

View file

@ -59,9 +59,13 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
// start subscription to publisher // start subscription to publisher
var subscription *stream.Subscription var subscription *stream.Subscription
var subErr error var subErr error
// Track whether the ACL token being used has an expiry time.
var expiryTime *time.Time
// Check required ACL permissions for requested Topics // Check required ACL permissions for requested Topics
if e.srv.config.ACLEnabled { if e.srv.config.ACLEnabled {
subscription, subErr = publisher.SubscribeWithACLCheck(subReq) subscription, expiryTime, subErr = publisher.SubscribeWithACLCheck(subReq)
} else { } else {
subscription, subErr = publisher.Subscribe(subReq) subscription, subErr = publisher.Subscribe(subReq)
} }
@ -93,6 +97,16 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
return return
} }
// Ensure the token being used is not expired before we any events
// to subscribers.
if expiryTime != nil && expiryTime.Before(time.Now().UTC()) {
select {
case errCh <- structs.ErrTokenExpired:
case <-ctx.Done():
}
return
}
// Continue if there are no events // Continue if there are no events
if len(events.Events) == 0 { if len(events.Events) == 0 {
continue continue

View file

@ -15,11 +15,13 @@ import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil" "github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/mapstructure" "github.com/mitchellh/mapstructure"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -625,3 +627,117 @@ OUTER:
} }
} }
} }
// TestEventStream_ACLTokenExpiry ensure a subscription does not receive events
// and is closed once the token has expired.
func TestEventStream_ACLTokenExpiry(t *testing.T) {
ci.Parallel(t)
// Start our test server and wait until we have a leader.
testServer, _, testServerCleanup := TestACLServer(t, nil)
defer testServerCleanup()
testutil.WaitForLeader(t, testServer.RPC)
// Create and upsert and ACL token which has a short expiry set.
aclTokenWithExpiry := mock.ACLManagementToken()
aclTokenWithExpiry.ExpirationTime = pointer.Of(time.Now().Add(2 * time.Second))
must.NoError(t, testServer.fsm.State().UpsertACLTokens(
structs.MsgTypeTestSetup, 10, []*structs.ACLToken{aclTokenWithExpiry}))
req := structs.EventStreamRequest{
Topics: map[structs.Topic][]string{"Job": {"*"}},
QueryOptions: structs.QueryOptions{
Region: testServer.Region(),
Namespace: structs.DefaultNamespace,
AuthToken: aclTokenWithExpiry.SecretID,
},
}
handler, err := testServer.StreamingRpcHandler("Event.Stream")
must.NoError(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
errCh := make(chan error)
streamMsg := make(chan *structs.EventStreamWrapper)
go handler(p2)
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg structs.EventStreamWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %w", err)
}
streamMsg <- &msg
}
}()
publisher, err := testServer.State().EventBroker()
must.NoError(t, err)
jobEvent := structs.JobEvent{
Job: mock.Job(),
}
// send req
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
must.Nil(t, encoder.Encode(req))
// publish some events
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(4*time.Second))
defer cancel()
errChStream := make(chan error, 1)
go func() {
for {
select {
case <-ctx.Done():
errChStream <- ctx.Err()
return
case err := <-errCh:
errChStream <- err
return
case msg := <-streamMsg:
if msg.Error == nil {
continue
}
errChStream <- msg.Error
return
}
}
}()
// Generate a timeout for the test and for the expiry. The expiry timeout
// is used to trigger an update which will close the subscription as the
// event stream only reacts to change in state.
testTimeout := time.After(4 * time.Second)
expiryTimeout := time.After(time.Until(*aclTokenWithExpiry.ExpirationTime))
for {
select {
case <-testTimeout:
t.Fatal("timeout waiting for event stream to close")
case err := <-errCh:
t.Fatal(err)
case <-expiryTimeout:
publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
case err := <-errChStream:
// Success
must.StrContains(t, err.Error(), "ACL token expired")
return
}
}
}

View file

@ -109,19 +109,25 @@ func (e *EventBroker) Publish(events *structs.Events) {
e.publishCh <- events e.publishCh <- events
} }
// SubscribeWithACLCheck validates the SubscribeRequest's token and requested Topics // SubscribeWithACLCheck validates the SubscribeRequest's token and requested
// to ensure that the tokens privileges are sufficient enough. // topics to ensure that the tokens privileges are sufficient. It will also
func (e *EventBroker) SubscribeWithACLCheck(req *SubscribeRequest) (*Subscription, error) { // return the token expiry time, if any. It is the callers responsibility to
aclObj, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, req.Token) // check this before publishing events to the caller.
func (e *EventBroker) SubscribeWithACLCheck(req *SubscribeRequest) (*Subscription, *time.Time, error) {
aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, req.Token)
if err != nil { if err != nil {
return nil, structs.ErrPermissionDenied return nil, nil, structs.ErrPermissionDenied
} }
if allowed := aclAllowsSubscription(aclObj, req); !allowed { if allowed := aclAllowsSubscription(aclObj, req); !allowed {
return nil, structs.ErrPermissionDenied return nil, nil, structs.ErrPermissionDenied
} }
return e.Subscribe(req) sub, err := e.Subscribe(req)
if err != nil {
return nil, nil, err
}
return sub, expiryTime, nil
} }
// Subscribe returns a new Subscription for a given request. A Subscription // Subscribe returns a new Subscription for a given request. A Subscription
@ -203,13 +209,19 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) {
continue continue
} }
aclObj, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, tokenSecretID) aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, tokenSecretID)
if err != nil || aclObj == nil { if err != nil || aclObj == nil {
e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err) e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err)
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue continue
} }
if expiryTime != nil && expiryTime.Before(time.Now().UTC()) {
e.logger.Info("ACL token is expired, closing subscriptions")
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue
}
e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool { e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool {
return !aclAllowsSubscription(aclObj, sub.req) return !aclAllowsSubscription(aclObj, sub.req)
}) })
@ -245,13 +257,19 @@ func (e *EventBroker) checkSubscriptionsAgainstACLChange() {
continue continue
} }
aclObj, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID) aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID)
if err != nil || aclObj == nil { if err != nil || aclObj == nil {
e.logger.Debug("failed resolving ACL for secretID, closing subscriptions", "error", err) e.logger.Debug("failed resolving ACL for secretID, closing subscriptions", "error", err)
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue continue
} }
if expiryTime != nil && expiryTime.Before(time.Now().UTC()) {
e.logger.Info("ACL token is expired, closing subscriptions")
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue
}
e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool { e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool {
return !aclAllowsSubscription(aclObj, sub.req) return !aclAllowsSubscription(aclObj, sub.req)
}) })
@ -259,23 +277,24 @@ func (e *EventBroker) checkSubscriptionsAgainstACLChange() {
} }
func aclObjFromSnapshotForTokenSecretID( func aclObjFromSnapshotForTokenSecretID(
aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (*acl.ACL, error) { aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (
*acl.ACL, *time.Time, error) {
aclToken, err := aclSnapshot.ACLTokenBySecretID(nil, tokenSecretID) aclToken, err := aclSnapshot.ACLTokenBySecretID(nil, tokenSecretID)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
if aclToken == nil { if aclToken == nil {
return nil, structs.ErrTokenNotFound return nil, nil, structs.ErrTokenNotFound
} }
if aclToken.IsExpired(time.Now().UTC()) { if aclToken.IsExpired(time.Now().UTC()) {
return nil, structs.ErrTokenExpired return nil, nil, structs.ErrTokenExpired
} }
// Check if this is a management token // Check if this is a management token
if aclToken.Type == structs.ACLManagementToken { if aclToken.Type == structs.ACLManagementToken {
return acl.ManagementACL, nil return acl.ManagementACL, aclToken.ExpirationTime, nil
} }
aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies)+len(aclToken.Roles)) aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies)+len(aclToken.Roles))
@ -283,7 +302,7 @@ func aclObjFromSnapshotForTokenSecretID(
for _, policyName := range aclToken.Policies { for _, policyName := range aclToken.Policies {
policy, err := aclSnapshot.ACLPolicyByName(nil, policyName) policy, err := aclSnapshot.ACLPolicyByName(nil, policyName)
if err != nil || policy == nil { if err != nil || policy == nil {
return nil, errors.New("error finding acl policy") return nil, nil, errors.New("error finding acl policy")
} }
aclPolicies = append(aclPolicies, policy) aclPolicies = append(aclPolicies, policy)
} }
@ -294,7 +313,7 @@ func aclObjFromSnapshotForTokenSecretID(
role, err := aclSnapshot.GetACLRoleByID(nil, roleLink.ID) role, err := aclSnapshot.GetACLRoleByID(nil, roleLink.ID)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
if role == nil { if role == nil {
continue continue
@ -303,13 +322,17 @@ func aclObjFromSnapshotForTokenSecretID(
for _, policyLink := range role.Policies { for _, policyLink := range role.Policies {
policy, err := aclSnapshot.ACLPolicyByName(nil, policyLink.Name) policy, err := aclSnapshot.ACLPolicyByName(nil, policyLink.Name)
if err != nil || policy == nil { if err != nil || policy == nil {
return nil, errors.New("error finding acl policy") return nil, nil, errors.New("error finding acl policy")
} }
aclPolicies = append(aclPolicies, policy) aclPolicies = append(aclPolicies, policy)
} }
} }
return structs.CompileACLObject(aclCache, aclPolicies) aclObj, err := structs.CompileACLObject(aclCache, aclPolicies)
if err != nil {
return nil, nil, err
}
return aclObj, aclToken.ExpirationTime, nil
} }
type ACLTokenProvider interface { type ACLTokenProvider interface {

View file

@ -514,13 +514,14 @@ func TestEventBroker_handleACLUpdates_policyUpdated(t *testing.T) {
ns = structs.DefaultNamespace ns = structs.DefaultNamespace
} }
sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
Topics: map[structs.Topic][]string{ Topics: map[structs.Topic][]string{
tc.event.Topic: {"*"}, tc.event.Topic: {"*"},
}, },
Namespace: ns, Namespace: ns,
Token: secretID, Token: secretID,
}) })
require.Nil(t, expiryTime)
if tc.initialSubErr { if tc.initialSubErr {
require.Error(t, err) require.Error(t, err)
@ -811,11 +812,12 @@ func TestEventBroker_handleACLUpdates_roleUpdated(t *testing.T) {
ns = tc.event.Namespace ns = tc.event.Namespace
} }
sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
Topics: map[structs.Topic][]string{tc.event.Topic: {"*"}}, Topics: map[structs.Topic][]string{tc.event.Topic: {"*"}},
Namespace: ns, Namespace: ns,
Token: tokenSecretID, Token: tokenSecretID,
}) })
require.Nil(t, expiryTime)
if tc.initialSubErr { if tc.initialSubErr {
require.Error(t, err) require.Error(t, err)
@ -931,12 +933,13 @@ func TestEventBroker_handleACLUpdates_tokenExpiry(t *testing.T) {
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tc.inputToken.SecretID}), Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tc.inputToken.SecretID}),
} }
sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
Topics: map[structs.Topic][]string{structs.TopicAll: {"*"}}, Topics: map[structs.Topic][]string{structs.TopicAll: {"*"}},
Token: tc.inputToken.SecretID, Token: tc.inputToken.SecretID,
}) })
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, sub) require.NotNil(t, sub)
require.NotNil(t, expiryTime)
// Publish an event and check that there is a new item in the // Publish an event and check that there is a new item in the
// subscription queue. // subscription queue.