da5069bded
This change ensures that a token's expiry is checked before every event is sent to the caller. Previously, a token could still be used to listen for events after it had expired, as long as the subscription was made while it was unexpired. This would last until the token was garbage collected from state. The check occurs within the RPC as there is currently no state update when a token expires.
468 lines
13 KiB
Go
468 lines
13 KiB
Go
package stream
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
"github.com/hashicorp/go-memdb"
|
|
lru "github.com/hashicorp/golang-lru"
|
|
"github.com/hashicorp/nomad/acl"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
)
|
|
|
|
const (
|
|
ACLCheckNodeRead = "node-read"
|
|
ACLCheckManagement = "management"
|
|
aclCacheSize = 32
|
|
)
|
|
|
|
type EventBrokerCfg struct {
|
|
EventBufferSize int64
|
|
Logger hclog.Logger
|
|
}
|
|
|
|
type EventBroker struct {
|
|
// mu protects subscriptions
|
|
mu sync.Mutex
|
|
subscriptions *subscriptions
|
|
|
|
// eventBuf stores a configurable amount of events in memory
|
|
eventBuf *eventBuffer
|
|
|
|
// publishCh is used to send messages from an active txn to a goroutine which
|
|
// publishes events, so that publishing can happen asynchronously from
|
|
// the Commit call in the FSM hot path.
|
|
publishCh chan *structs.Events
|
|
|
|
aclDelegate ACLDelegate
|
|
aclCache *lru.TwoQueueCache
|
|
|
|
aclCh chan structs.Event
|
|
|
|
logger hclog.Logger
|
|
}
|
|
|
|
// NewEventBroker returns an EventBroker for publishing change events.
|
|
// A goroutine is run in the background to publish events to an event buffer.
|
|
// Cancelling the context will shutdown the goroutine to free resources, and stop
|
|
// all publishing.
|
|
func NewEventBroker(ctx context.Context, aclDelegate ACLDelegate, cfg EventBrokerCfg) (*EventBroker, error) {
|
|
if cfg.Logger == nil {
|
|
cfg.Logger = hclog.NewNullLogger()
|
|
}
|
|
|
|
// Set the event buffer size to a minimum
|
|
if cfg.EventBufferSize == 0 {
|
|
cfg.EventBufferSize = 100
|
|
}
|
|
|
|
aclCache, err := lru.New2Q(aclCacheSize)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
buffer := newEventBuffer(cfg.EventBufferSize)
|
|
e := &EventBroker{
|
|
logger: cfg.Logger.Named("event_broker"),
|
|
eventBuf: buffer,
|
|
publishCh: make(chan *structs.Events, 64),
|
|
aclCh: make(chan structs.Event, 10),
|
|
aclDelegate: aclDelegate,
|
|
aclCache: aclCache,
|
|
subscriptions: &subscriptions{
|
|
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
|
|
},
|
|
}
|
|
|
|
go e.handleUpdates(ctx)
|
|
go e.handleACLUpdates(ctx)
|
|
|
|
return e, nil
|
|
}
|
|
|
|
// Len returns the current length of the event buffer.
|
|
func (e *EventBroker) Len() int {
|
|
return e.eventBuf.Len()
|
|
}
|
|
|
|
// Publish events to all subscribers of the event Topic.
|
|
func (e *EventBroker) Publish(events *structs.Events) {
|
|
if len(events.Events) == 0 {
|
|
return
|
|
}
|
|
|
|
// Notify the broker to check running subscriptions against potentially
|
|
// updated ACL Token or Policy
|
|
for _, event := range events.Events {
|
|
if event.Topic == structs.TopicACLToken || event.Topic == structs.TopicACLPolicy {
|
|
e.aclCh <- event
|
|
}
|
|
}
|
|
|
|
e.publishCh <- events
|
|
}
|
|
|
|
// SubscribeWithACLCheck validates the SubscribeRequest's token and requested
|
|
// topics to ensure that the tokens privileges are sufficient. It will also
|
|
// return the token expiry time, if any. It is the callers responsibility to
|
|
// check this before publishing events to the caller.
|
|
func (e *EventBroker) SubscribeWithACLCheck(req *SubscribeRequest) (*Subscription, *time.Time, error) {
|
|
aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, req.Token)
|
|
if err != nil {
|
|
return nil, nil, structs.ErrPermissionDenied
|
|
}
|
|
|
|
if allowed := aclAllowsSubscription(aclObj, req); !allowed {
|
|
return nil, nil, structs.ErrPermissionDenied
|
|
}
|
|
|
|
sub, err := e.Subscribe(req)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return sub, expiryTime, nil
|
|
}
|
|
|
|
// Subscribe returns a new Subscription for a given request. A Subscription
|
|
// will receive an initial empty currentItem value which points to the first item
|
|
// in the buffer. This allows the new subscription to call Next() without first checking
|
|
// for the current Item.
|
|
//
|
|
// A Subscription will start at the requested index, or as close as possible to
|
|
// the requested index if it is no longer in the buffer. If StartExactlyAtIndex is
|
|
// set and the index is no longer in the buffer or not yet in the buffer an error
|
|
// will be returned.
|
|
//
|
|
// When a caller is finished with the subscription it must call Subscription.Unsubscribe
|
|
// to free ACL tracking resources.
|
|
func (e *EventBroker) Subscribe(req *SubscribeRequest) (*Subscription, error) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
var head *bufferItem
|
|
var offset int
|
|
if req.Index != 0 {
|
|
head, offset = e.eventBuf.StartAtClosest(req.Index)
|
|
} else {
|
|
head = e.eventBuf.Head()
|
|
}
|
|
if offset > 0 && req.StartExactlyAtIndex {
|
|
return nil, fmt.Errorf("requested index not in buffer")
|
|
} else if offset > 0 {
|
|
metrics.SetGauge([]string{"nomad", "event_broker", "subscription", "request_offset"}, float32(offset))
|
|
e.logger.Debug("requested index no longer in buffer", "requsted", int(req.Index), "closest", int(head.Events.Index))
|
|
}
|
|
|
|
// Empty head so that calling Next on sub
|
|
start := newBufferItem(&structs.Events{Index: req.Index})
|
|
start.link.next.Store(head)
|
|
close(start.link.nextCh)
|
|
|
|
sub := newSubscription(req, start, e.subscriptions.unsubscribeFn(req))
|
|
|
|
e.subscriptions.add(req, sub)
|
|
return sub, nil
|
|
}
|
|
|
|
// CloseAll closes all subscriptions
|
|
func (e *EventBroker) CloseAll() {
|
|
e.subscriptions.closeAll()
|
|
}
|
|
|
|
func (e *EventBroker) handleUpdates(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
e.subscriptions.closeAll()
|
|
return
|
|
case update := <-e.publishCh:
|
|
e.eventBuf.Append(update)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *EventBroker) handleACLUpdates(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case update := <-e.aclCh:
|
|
switch payload := update.Payload.(type) {
|
|
case *structs.ACLTokenEvent:
|
|
tokenSecretID := payload.SecretID()
|
|
|
|
// Token was deleted
|
|
if update.Type == structs.TypeACLTokenDeleted {
|
|
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
|
|
continue
|
|
}
|
|
|
|
// If broker cannot fetch state there is nothing more to do
|
|
if e.aclDelegate == nil {
|
|
continue
|
|
}
|
|
|
|
aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, tokenSecretID)
|
|
if err != nil || aclObj == nil {
|
|
e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err)
|
|
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
|
|
continue
|
|
}
|
|
|
|
if expiryTime != nil && expiryTime.Before(time.Now().UTC()) {
|
|
e.logger.Info("ACL token is expired, closing subscriptions")
|
|
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
|
|
continue
|
|
}
|
|
|
|
e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool {
|
|
return !aclAllowsSubscription(aclObj, sub.req)
|
|
})
|
|
|
|
case *structs.ACLPolicyEvent, *structs.ACLRoleStreamEvent:
|
|
// Re-evaluate each subscription permission since a policy or
|
|
// role change may alter the permissions of the token being
|
|
// used for the subscription.
|
|
e.checkSubscriptionsAgainstACLChange()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// checkSubscriptionsAgainstACLChange iterates over the brokers subscriptions
|
|
// and evaluates whether the token used for the subscription is still valid. A
|
|
// token may become invalid is the assigned policies or roles have been updated
|
|
// which removed the required permission. If the token is no long valid, the
|
|
// subscription is closed.
|
|
func (e *EventBroker) checkSubscriptionsAgainstACLChange() {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
// If broker cannot fetch state there is nothing more to do
|
|
if e.aclDelegate == nil {
|
|
return
|
|
}
|
|
|
|
aclSnapshot := e.aclDelegate.TokenProvider()
|
|
for tokenSecretID := range e.subscriptions.byToken {
|
|
// if tokenSecretID is empty ACLs were disabled at time of subscribing
|
|
if tokenSecretID == "" {
|
|
continue
|
|
}
|
|
|
|
aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID)
|
|
if err != nil || aclObj == nil {
|
|
e.logger.Debug("failed resolving ACL for secretID, closing subscriptions", "error", err)
|
|
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
|
|
continue
|
|
}
|
|
|
|
if expiryTime != nil && expiryTime.Before(time.Now().UTC()) {
|
|
e.logger.Info("ACL token is expired, closing subscriptions")
|
|
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
|
|
continue
|
|
}
|
|
|
|
e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool {
|
|
return !aclAllowsSubscription(aclObj, sub.req)
|
|
})
|
|
}
|
|
}
|
|
|
|
func aclObjFromSnapshotForTokenSecretID(
|
|
aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (
|
|
*acl.ACL, *time.Time, error) {
|
|
|
|
aclToken, err := aclSnapshot.ACLTokenBySecretID(nil, tokenSecretID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
if aclToken == nil {
|
|
return nil, nil, structs.ErrTokenNotFound
|
|
}
|
|
if aclToken.IsExpired(time.Now().UTC()) {
|
|
return nil, nil, structs.ErrTokenExpired
|
|
}
|
|
|
|
// Check if this is a management token
|
|
if aclToken.Type == structs.ACLManagementToken {
|
|
return acl.ManagementACL, aclToken.ExpirationTime, nil
|
|
}
|
|
|
|
aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies)+len(aclToken.Roles))
|
|
|
|
for _, policyName := range aclToken.Policies {
|
|
policy, err := aclSnapshot.ACLPolicyByName(nil, policyName)
|
|
if err != nil || policy == nil {
|
|
return nil, nil, errors.New("error finding acl policy")
|
|
}
|
|
aclPolicies = append(aclPolicies, policy)
|
|
}
|
|
|
|
// Iterate all the token role links, so we can unpack these and identify
|
|
// the ACL policies.
|
|
for _, roleLink := range aclToken.Roles {
|
|
|
|
role, err := aclSnapshot.GetACLRoleByID(nil, roleLink.ID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if role == nil {
|
|
continue
|
|
}
|
|
|
|
for _, policyLink := range role.Policies {
|
|
policy, err := aclSnapshot.ACLPolicyByName(nil, policyLink.Name)
|
|
if err != nil || policy == nil {
|
|
return nil, nil, errors.New("error finding acl policy")
|
|
}
|
|
aclPolicies = append(aclPolicies, policy)
|
|
}
|
|
}
|
|
|
|
aclObj, err := structs.CompileACLObject(aclCache, aclPolicies)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return aclObj, aclToken.ExpirationTime, nil
|
|
}
|
|
|
|
type ACLTokenProvider interface {
|
|
ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error)
|
|
ACLPolicyByName(ws memdb.WatchSet, policyName string) (*structs.ACLPolicy, error)
|
|
GetACLRoleByID(ws memdb.WatchSet, roleID string) (*structs.ACLRole, error)
|
|
}
|
|
|
|
type ACLDelegate interface {
|
|
TokenProvider() ACLTokenProvider
|
|
}
|
|
|
|
func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool {
|
|
for topic := range subReq.Topics {
|
|
switch topic {
|
|
case structs.TopicDeployment,
|
|
structs.TopicEvaluation,
|
|
structs.TopicAllocation,
|
|
structs.TopicJob,
|
|
structs.TopicService:
|
|
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok {
|
|
return false
|
|
}
|
|
case structs.TopicNode:
|
|
if ok := aclObj.AllowNodeRead(); !ok {
|
|
return false
|
|
}
|
|
default:
|
|
if ok := aclObj.IsManagement(); !ok {
|
|
return false
|
|
}
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (s *Subscription) forceClose() {
|
|
if atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed) {
|
|
close(s.forceClosed)
|
|
}
|
|
}
|
|
|
|
type subscriptions struct {
|
|
// mu for byToken. If both subscription.mu and EventBroker.mu need
|
|
// to be held, EventBroker mutex MUST always be acquired first.
|
|
mu sync.RWMutex
|
|
|
|
// byToken is an mapping of active Subscriptions indexed by a token and
|
|
// a pointer to the request.
|
|
// When the token is modified all subscriptions under that token will be
|
|
// reloaded.
|
|
// A subscription may be unsubscribed by using the pointer to the request.
|
|
byToken map[string]map[*SubscribeRequest]*Subscription
|
|
}
|
|
|
|
func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
subsByToken, ok := s.byToken[req.Token]
|
|
if !ok {
|
|
subsByToken = make(map[*SubscribeRequest]*Subscription)
|
|
s.byToken[req.Token] = subsByToken
|
|
}
|
|
subsByToken[req] = sub
|
|
}
|
|
|
|
func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
for _, secretID := range tokenSecretIDs {
|
|
if subs, ok := s.byToken[secretID]; ok {
|
|
for _, sub := range subs {
|
|
sub.forceClose()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *subscriptions) closeSubscriptionFunc(tokenSecretID string, fn func(*Subscription) bool) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
for _, sub := range s.byToken[tokenSecretID] {
|
|
if fn(sub) {
|
|
sub.forceClose()
|
|
}
|
|
}
|
|
}
|
|
|
|
// unsubscribeFn returns a function that the subscription will call to remove
|
|
// itself from the subsByToken.
|
|
// This function is returned as a closure so that the caller doesn't need to keep
|
|
// track of the SubscriptionRequest, and can not accidentally call unsubscribeFn with the
|
|
// wrong pointer.
|
|
func (s *subscriptions) unsubscribeFn(req *SubscribeRequest) func() {
|
|
return func() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
subsByToken, ok := s.byToken[req.Token]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
sub := subsByToken[req]
|
|
if sub == nil {
|
|
return
|
|
}
|
|
|
|
// close the subscription
|
|
sub.forceClose()
|
|
|
|
delete(subsByToken, req)
|
|
if len(subsByToken) == 0 {
|
|
delete(s.byToken, req.Token)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *subscriptions) closeAll() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
for _, byRequest := range s.byToken {
|
|
for _, sub := range byRequest {
|
|
sub.forceClose()
|
|
}
|
|
}
|
|
}
|