From da5069bded0268edfe221b7a18db3b28d6871d7f Mon Sep 17 00:00:00 2001 From: James Rasell Date: Mon, 24 Oct 2022 14:32:18 +0100 Subject: [PATCH] event stream: ensure token expiry is correctly checked for subs. This change ensures that a token's expiry is checked before every event is sent to the caller. Previously, a token could still be used to listen for events after it had expired, as long as the subscription was made while it was unexpired. This would last until the token was garbage collected from state. The check occurs within the RPC as there is currently no state update when a token expires. --- .changelog/15013.txt | 3 + nomad/event_endpoint.go | 16 ++++- nomad/event_endpoint_test.go | 116 ++++++++++++++++++++++++++++++ nomad/stream/event_broker.go | 59 ++++++++++----- nomad/stream/event_broker_test.go | 9 ++- 5 files changed, 181 insertions(+), 22 deletions(-) create mode 100644 .changelog/15013.txt diff --git a/.changelog/15013.txt b/.changelog/15013.txt new file mode 100644 index 000000000..2007a55a1 --- /dev/null +++ b/.changelog/15013.txt @@ -0,0 +1,3 @@ +```release-note:security +event stream: Fixed a bug where ACL token expiration was not checked when emitting events +``` diff --git a/nomad/event_endpoint.go b/nomad/event_endpoint.go index 2628b11a1..dcfaf49a2 100644 --- a/nomad/event_endpoint.go +++ b/nomad/event_endpoint.go @@ -59,9 +59,13 @@ func (e *Event) stream(conn io.ReadWriteCloser) { // start subscription to publisher var subscription *stream.Subscription var subErr error + + // Track whether the ACL token being used has an expiry time. 
+ var expiryTime *time.Time + // Check required ACL permissions for requested Topics if e.srv.config.ACLEnabled { - subscription, subErr = publisher.SubscribeWithACLCheck(subReq) + subscription, expiryTime, subErr = publisher.SubscribeWithACLCheck(subReq) } else { subscription, subErr = publisher.Subscribe(subReq) } @@ -93,6 +97,16 @@ func (e *Event) stream(conn io.ReadWriteCloser) { return } + // Ensure the token being used is not expired before we send any events + // to subscribers. + if expiryTime != nil && expiryTime.Before(time.Now().UTC()) { + select { + case errCh <- structs.ErrTokenExpired: + case <-ctx.Done(): + } + return + } + // Continue if there are no events if len(events.Events) == 0 { continue diff --git a/nomad/event_endpoint_test.go b/nomad/event_endpoint_test.go index 31dddfa7e..b0231a485 100644 --- a/nomad/event_endpoint_test.go +++ b/nomad/event_endpoint_test.go @@ -15,11 +15,13 @@ import ( msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/mitchellh/mapstructure" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -625,3 +627,117 @@ OUTER: } } } + +// TestEventStream_ACLTokenExpiry ensures a subscription does not receive events +// and is closed once the token has expired. +func TestEventStream_ACLTokenExpiry(t *testing.T) { + ci.Parallel(t) + + // Start our test server and wait until we have a leader. + testServer, _, testServerCleanup := TestACLServer(t, nil) + defer testServerCleanup() + testutil.WaitForLeader(t, testServer.RPC) + + // Create and upsert an ACL token which has a short expiry set. 
+ aclTokenWithExpiry := mock.ACLManagementToken() + aclTokenWithExpiry.ExpirationTime = pointer.Of(time.Now().Add(2 * time.Second)) + + must.NoError(t, testServer.fsm.State().UpsertACLTokens( + structs.MsgTypeTestSetup, 10, []*structs.ACLToken{aclTokenWithExpiry})) + + req := structs.EventStreamRequest{ + Topics: map[structs.Topic][]string{"Job": {"*"}}, + QueryOptions: structs.QueryOptions{ + Region: testServer.Region(), + Namespace: structs.DefaultNamespace, + AuthToken: aclTokenWithExpiry.SecretID, + }, + } + + handler, err := testServer.StreamingRpcHandler("Event.Stream") + must.NoError(t, err) + + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *structs.EventStreamWrapper) + + go handler(p2) + + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg structs.EventStreamWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %w", err) + } + + streamMsg <- &msg + } + }() + + publisher, err := testServer.State().EventBroker() + must.NoError(t, err) + + jobEvent := structs.JobEvent{ + Job: mock.Job(), + } + + // send req + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + must.Nil(t, encoder.Encode(req)) + + // publish some events + publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}}) + publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}}) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(4*time.Second)) + defer cancel() + + errChStream := make(chan error, 1) + go func() { + for { + select { + case <-ctx.Done(): + errChStream <- ctx.Err() + return + case err := <-errCh: + errChStream <- err + return + case msg := <-streamMsg: + if msg.Error == nil { + continue + } + + errChStream <- 
msg.Error + return + } + } + }() + + // Generate a timeout for the test and for the expiry. The expiry timeout + // is used to trigger an update which will close the subscription as the + // event stream only reacts to change in state. + testTimeout := time.After(4 * time.Second) + expiryTimeout := time.After(time.Until(*aclTokenWithExpiry.ExpirationTime)) + + for { + select { + case <-testTimeout: + t.Fatal("timeout waiting for event stream to close") + case err := <-errCh: + t.Fatal(err) + case <-expiryTimeout: + publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}}) + case err := <-errChStream: + // Success + must.StrContains(t, err.Error(), "ACL token expired") + return + } + } +} diff --git a/nomad/stream/event_broker.go b/nomad/stream/event_broker.go index 179ffbce6..8ecf33ebd 100644 --- a/nomad/stream/event_broker.go +++ b/nomad/stream/event_broker.go @@ -109,19 +109,25 @@ func (e *EventBroker) Publish(events *structs.Events) { e.publishCh <- events } -// SubscribeWithACLCheck validates the SubscribeRequest's token and requested Topics -// to ensure that the tokens privileges are sufficient enough. -func (e *EventBroker) SubscribeWithACLCheck(req *SubscribeRequest) (*Subscription, error) { - aclObj, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, req.Token) +// SubscribeWithACLCheck validates the SubscribeRequest's token and requested +// topics to ensure that the token's privileges are sufficient. It will also +// return the token expiry time, if any. It is the caller's responsibility to +// check this before publishing events to the caller. 
+func (e *EventBroker) SubscribeWithACLCheck(req *SubscribeRequest) (*Subscription, *time.Time, error) { + aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, req.Token) if err != nil { - return nil, structs.ErrPermissionDenied + return nil, nil, structs.ErrPermissionDenied } if allowed := aclAllowsSubscription(aclObj, req); !allowed { - return nil, structs.ErrPermissionDenied + return nil, nil, structs.ErrPermissionDenied } - return e.Subscribe(req) + sub, err := e.Subscribe(req) + if err != nil { + return nil, nil, err + } + return sub, expiryTime, nil } // Subscribe returns a new Subscription for a given request. A Subscription @@ -203,13 +209,19 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) { continue } - aclObj, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, tokenSecretID) + aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, tokenSecretID) if err != nil || aclObj == nil { e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err) e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) continue } + if expiryTime != nil && expiryTime.Before(time.Now().UTC()) { + e.logger.Info("ACL token is expired, closing subscriptions") + e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) + continue + } + e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool { return !aclAllowsSubscription(aclObj, sub.req) }) @@ -245,13 +257,19 @@ func (e *EventBroker) checkSubscriptionsAgainstACLChange() { continue } - aclObj, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID) + aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID) if err != nil || aclObj == nil { e.logger.Debug("failed resolving ACL for secretID, closing subscriptions", "error", err) 
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) continue } + if expiryTime != nil && expiryTime.Before(time.Now().UTC()) { + e.logger.Info("ACL token is expired, closing subscriptions") + e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) + continue + } + e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool { return !aclAllowsSubscription(aclObj, sub.req) }) @@ -259,23 +277,24 @@ func (e *EventBroker) checkSubscriptionsAgainstACLChange() { } func aclObjFromSnapshotForTokenSecretID( - aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (*acl.ACL, error) { + aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) ( + *acl.ACL, *time.Time, error) { aclToken, err := aclSnapshot.ACLTokenBySecretID(nil, tokenSecretID) if err != nil { - return nil, err + return nil, nil, err } if aclToken == nil { - return nil, structs.ErrTokenNotFound + return nil, nil, structs.ErrTokenNotFound } if aclToken.IsExpired(time.Now().UTC()) { - return nil, structs.ErrTokenExpired + return nil, nil, structs.ErrTokenExpired } // Check if this is a management token if aclToken.Type == structs.ACLManagementToken { - return acl.ManagementACL, nil + return acl.ManagementACL, aclToken.ExpirationTime, nil } aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies)+len(aclToken.Roles)) @@ -283,7 +302,7 @@ func aclObjFromSnapshotForTokenSecretID( for _, policyName := range aclToken.Policies { policy, err := aclSnapshot.ACLPolicyByName(nil, policyName) if err != nil || policy == nil { - return nil, errors.New("error finding acl policy") + return nil, nil, errors.New("error finding acl policy") } aclPolicies = append(aclPolicies, policy) } @@ -294,7 +313,7 @@ func aclObjFromSnapshotForTokenSecretID( role, err := aclSnapshot.GetACLRoleByID(nil, roleLink.ID) if err != nil { - return nil, err + return nil, nil, err } if role == nil { continue @@ -303,13 +322,17 @@ func 
aclObjFromSnapshotForTokenSecretID( for _, policyLink := range role.Policies { policy, err := aclSnapshot.ACLPolicyByName(nil, policyLink.Name) if err != nil || policy == nil { - return nil, errors.New("error finding acl policy") + return nil, nil, errors.New("error finding acl policy") } aclPolicies = append(aclPolicies, policy) } } - return structs.CompileACLObject(aclCache, aclPolicies) + aclObj, err := structs.CompileACLObject(aclCache, aclPolicies) + if err != nil { + return nil, nil, err + } + return aclObj, aclToken.ExpirationTime, nil } type ACLTokenProvider interface { diff --git a/nomad/stream/event_broker_test.go b/nomad/stream/event_broker_test.go index 5ccf24cea..dfeeb6177 100644 --- a/nomad/stream/event_broker_test.go +++ b/nomad/stream/event_broker_test.go @@ -514,13 +514,14 @@ func TestEventBroker_handleACLUpdates_policyUpdated(t *testing.T) { ns = structs.DefaultNamespace } - sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ + sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ Topics: map[structs.Topic][]string{ tc.event.Topic: {"*"}, }, Namespace: ns, Token: secretID, }) + require.Nil(t, expiryTime) if tc.initialSubErr { require.Error(t, err) @@ -811,11 +812,12 @@ func TestEventBroker_handleACLUpdates_roleUpdated(t *testing.T) { ns = tc.event.Namespace } - sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ + sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ Topics: map[structs.Topic][]string{tc.event.Topic: {"*"}}, Namespace: ns, Token: tokenSecretID, }) + require.Nil(t, expiryTime) if tc.initialSubErr { require.Error(t, err) @@ -931,12 +933,13 @@ func TestEventBroker_handleACLUpdates_tokenExpiry(t *testing.T) { Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tc.inputToken.SecretID}), } - sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ + sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ Topics: 
map[structs.Topic][]string{structs.TopicAll: {"*"}}, Token: tc.inputToken.SecretID, }) require.NoError(t, err) require.NotNil(t, sub) + require.NotNil(t, expiryTime) // Publish an event and check that there is a new item in the // subscription queue.