open-vault/vault/quotas/quotas.go

891 lines
22 KiB
Go

package quotas
import (
"context"
"errors"
"fmt"
"path"
"strings"
"sync"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/shared-secure-libs/metricsutil"
"github.com/hashicorp/vault/sdk/logical"
)
// Type represents the quota kind
type Type string
const (
// TypeRateLimit represents the rate limiting quota type
TypeRateLimit Type = "rate-limit"
// TypeLeaseCount represents the lease count limiting quota type
TypeLeaseCount Type = "lease-count"
)
// LeaseAction is the action taken by the expiration manager on the lease. The
// quota manager will use this information to update the lease path cache and
// updating counters for relevant quota rules.
type LeaseAction uint32
// String converts each lease action into its string equivalent value
func (la LeaseAction) String() string {
switch la {
case LeaseActionLoaded:
return "loaded"
case LeaseActionCreated:
return "created"
case LeaseActionDeleted:
return "deleted"
case LeaseActionAllow:
return "allow"
}
return "unknown"
}
const (
_ LeaseAction = iota
// LeaseActionLoaded indicates loading of lease in the expiration manager after
// unseal.
LeaseActionLoaded
// LeaseActionCreated indicates that a lease is created in the expiration manager.
LeaseActionCreated
// LeaseActionDeleted indicates that is lease is expired and deleted in the
// expiration manager.
LeaseActionDeleted
// LeaseActionAllow will be used to indicate the lease count checker that
// incCounter is called from Allow(). All the rest of the actions indicate the
// action took place on the lease in the expiration manager.
LeaseActionAllow
)
type leaseWalkFunc func(context.Context, func(request *Request) bool) error
// String converts each quota type into its string equivalent value
func (q Type) String() string {
switch q {
case TypeLeaseCount:
return "lease-count"
case TypeRateLimit:
return "rate-limit"
}
return "unknown"
}
const (
indexID = "id"
indexName = "name"
indexNamespace = "ns"
indexNamespaceMount = "ns_mount"
)
const (
// StoragePrefix is the prefix for the physical location where quota rules are
// persisted.
StoragePrefix = "quotas/"
// ConfigPath is the physical location where the quota configuration is
// persisted.
ConfigPath = StoragePrefix + "config"
)
var (
// ErrLeaseCountQuotaExceeded is returned when a request is rejected due to a lease
// count quota being exceeded.
ErrLeaseCountQuotaExceeded = errors.New("lease count quota exceeded")
// ErrRateLimitQuotaExceeded is returned when a request is rejected due to a
// rate limit quota being exceeded.
ErrRateLimitQuotaExceeded = errors.New("rate limit quota exceeded")
)
// Access provides information to reach back to the quota checker.
type Access interface {
// QuotaID is the identifier of the quota that issued this access.
QuotaID() string
}
// Ensure that access implements the Access interface.
var _ Access = (*access)(nil)
// access implements the Access interface
type access struct {
quotaID string
}
// QuotaID returns the identifier of the quota rule to which this access refers
// to.
func (a *access) QuotaID() string {
return a.quotaID
}
// Manager holds all the existing quota rules. For any given input. the manager
// checks them against any applicable quota rules.
type Manager struct {
entManager
// db holds the in memory instances of all active quota rules indexed by
// some of the quota properties.
db *memdb.MemDB
// config containing operator preferences and quota behaviors
config *Config
storage logical.Storage
ctx context.Context
logger log.Logger
metricSink *metricsutil.ClusterMetricSink
lock *sync.RWMutex
}
// Quota represents the common properties of every quota type
type Quota interface {
// allow checks the if the request is allowed by the quota type implementation.
allow(*Request) (Response, error)
// quotaID is the identifier of the quota rule
quotaID() string
// QuotaName is the name of the quota rule
QuotaName() string
// initialize sets up the fields in the quota type to begin operating
initialize(log.Logger, *metricsutil.ClusterMetricSink) error
// close defines any cleanup behavior that needs to be executed when a quota
// rule is deleted.
close() error
// handleRemount takes in the new mount path in the quota
handleRemount(string)
}
// Response holds information about the result of the Allow() call. The response
// can optionally have the Access field set, which is used to reach back into
// the quota rule that sent this response.
type Response struct {
// Allowed is set if the quota allows the request
Allowed bool
// Access is the handle to reach back into the quota rule that processed the
// quota request. This may not be set all the time.
Access Access
// Headers defines any optional headers that may be returned by the quota rule
// to clients.
Headers map[string]string
}
// Config holds operator preferences around quota behaviors
type Config struct {
// EnableRateLimitAuditLogging, if set, starts audit logging of the
// request rejections that arise due to rate limit quota violations.
EnableRateLimitAuditLogging bool `json:"enable_rate_limit_audit_logging"`
// EnableRateLimitResponseHeaders dictates if rate limit quota HTTP headers
// should be added to responses.
EnableRateLimitResponseHeaders bool `json:"enable_rate_limit_response_headers"`
}
// Request contains information required by the quota manager to query and
// apply the quota rules.
type Request struct {
// Type is the quota type
Type Type
// Path is the request path to which quota rules are being queried for
Path string
// NamespacePath is the namespace path to which the request belongs
NamespacePath string
// MountPath is the mount path to which the request is made
MountPath string
// ClientAddress is client unique addressable string (e.g. IP address). It can
// be empty if the quota type does not need it.
ClientAddress string
}
// NewManager creates and initializes a new quota manager to hold all the quota
// rules and to process incoming requests.
func NewManager(logger log.Logger, walkFunc leaseWalkFunc, ms *metricsutil.ClusterMetricSink) (*Manager, error) {
db, err := memdb.NewMemDB(dbSchema())
if err != nil {
return nil, err
}
manager := &Manager{
db: db,
logger: logger,
metricSink: ms,
config: new(Config),
lock: new(sync.RWMutex),
}
manager.init(walkFunc)
return manager, nil
}
// SetQuota adds a new quota rule to the db.
func (m *Manager) SetQuota(ctx context.Context, qType string, quota Quota, loading bool) error {
m.lock.Lock()
defer m.lock.Unlock()
return m.setQuotaLocked(ctx, qType, quota, loading)
}
// setQuotaLocked should be called with the manager's lock held
func (m *Manager) setQuotaLocked(ctx context.Context, qType string, quota Quota, loading bool) error {
if qType == TypeLeaseCount.String() {
m.setIsPerfStandby(quota)
}
txn := m.db.Txn(true)
defer txn.Abort()
raw, err := txn.First(qType, "id", quota.quotaID())
if err != nil {
return err
}
// If there already exists an entry in the db, remove that first.
if raw != nil {
err = txn.Delete(qType, raw)
if err != nil {
return err
}
}
// Initialize the quota type implementation
if err := quota.initialize(m.logger, m.metricSink); err != nil {
return err
}
// Add the initialized quota type implementation to the db
if err := txn.Insert(qType, quota); err != nil {
return err
}
if loading {
txn.Commit()
return nil
}
// For the lease count type, recompute the counters
if !loading && qType == TypeLeaseCount.String() {
if err := m.recomputeLeaseCounts(ctx, txn); err != nil {
return err
}
}
txn.Commit()
return nil
}
// QuotaNames returns the names of all the quota rules for a given type
func (m *Manager) QuotaNames(qType Type) ([]string, error) {
m.lock.RLock()
defer m.lock.RUnlock()
txn := m.db.Txn(false)
iter, err := txn.Get(qType.String(), indexID)
if err != nil {
return nil, err
}
var names []string
for raw := iter.Next(); raw != nil; raw = iter.Next() {
names = append(names, raw.(Quota).QuotaName())
}
return names, nil
}
// QuotaByID queries for a quota rule in the db for a given quota ID
func (m *Manager) QuotaByID(qType string, id string) (Quota, error) {
m.lock.RLock()
defer m.lock.RUnlock()
txn := m.db.Txn(false)
quotaRaw, err := txn.First(qType, indexID, id)
if err != nil {
return nil, err
}
if quotaRaw == nil {
return nil, nil
}
return quotaRaw.(Quota), nil
}
// QuotaByName queries for a quota rule in the db for a given quota name
func (m *Manager) QuotaByName(qType string, name string) (Quota, error) {
m.lock.RLock()
defer m.lock.RUnlock()
txn := m.db.Txn(false)
quotaRaw, err := txn.First(qType, indexName, name)
if err != nil {
return nil, err
}
if quotaRaw == nil {
return nil, nil
}
return quotaRaw.(Quota), nil
}
// QuotaByFactors returns the quota rule that matches the provided factors
func (m *Manager) QuotaByFactors(ctx context.Context, qType, nsPath, mountPath string) (Quota, error) {
m.lock.RLock()
defer m.lock.RUnlock()
// nsPath would have been made non-empty during insertion. Use non-empty value
// during query as well.
if nsPath == "" {
nsPath = "root"
}
idx := indexNamespace
args := []interface{}{nsPath, false}
if mountPath != "" {
idx = indexNamespaceMount
args = []interface{}{nsPath, mountPath}
}
txn := m.db.Txn(false)
iter, err := txn.Get(qType, idx, args...)
if err != nil {
return nil, err
}
var quotas []Quota
for raw := iter.Next(); raw != nil; raw = iter.Next() {
quotas = append(quotas, raw.(Quota))
}
if len(quotas) > 1 {
return nil, fmt.Errorf("conflicting quota definitions detected")
}
if len(quotas) == 0 {
return nil, nil
}
return quotas[0], nil
}
// QueryQuota returns the most specific applicable quota for a given request.
func (m *Manager) QueryQuota(req *Request) (Quota, error) {
m.lock.RLock()
defer m.lock.RUnlock()
return m.queryQuota(nil, req)
}
// queryQuota returns the quota rule that is applicable for the given request. It
// queries all the quota rules that are defined against request values and finds
// the quota rule that takes priority.
//
// Priority rules are as follows:
// - namespace specific quota takes precedence over global quota
// - mount specific quota takes precedence over namespace specific quota
func (m *Manager) queryQuota(txn *memdb.Txn, req *Request) (Quota, error) {
if txn == nil {
txn = m.db.Txn(false)
}
// ns would have been made non-empty during insertion. Use non-empty
// value during query as well.
if req.NamespacePath == "" {
req.NamespacePath = "root"
}
//
// Find a match from most specific applicable quota rule to less specific one.
//
quotaFetchFunc := func(idx string, args ...interface{}) (Quota, error) {
iter, err := txn.Get(req.Type.String(), idx, args...)
if err != nil {
return nil, err
}
var quotas []Quota
for raw := iter.Next(); raw != nil; raw = iter.Next() {
quota := raw.(Quota)
quotas = append(quotas, quota)
}
if len(quotas) > 1 {
return nil, fmt.Errorf("conflicting quota definitions detected")
}
if len(quotas) == 0 {
return nil, nil
}
return quotas[0], nil
}
// Fetch mount quota
quota, err := quotaFetchFunc(indexNamespaceMount, req.NamespacePath, req.MountPath)
if err != nil {
return nil, err
}
if quota != nil {
return quota, nil
}
// Fetch ns quota. If NamespacePath is root, this will return the global quota.
quota, err = quotaFetchFunc(indexNamespace, req.NamespacePath, false)
if err != nil {
return nil, err
}
if quota != nil {
return quota, nil
}
// If the request belongs to "root" namespace, then we have already looked at
// global quotas when fetching namespace specific quota rule. When the request
// belongs to a non-root namespace, and when there are no namespace specific
// quota rules present, we fallback on the global quotas.
if req.NamespacePath == "root" {
return nil, nil
}
// Fetch global quota
quota, err = quotaFetchFunc(indexNamespace, "root", false)
if err != nil {
return nil, err
}
if quota != nil {
return quota, nil
}
return nil, nil
}
// DeleteQuota removes a quota rule from the db for a given name
func (m *Manager) DeleteQuota(ctx context.Context, qType string, name string) error {
m.lock.Lock()
defer m.lock.Unlock()
txn := m.db.Txn(true)
defer txn.Abort()
raw, err := txn.First(qType, indexName, name)
if err != nil {
return err
}
if raw == nil {
return nil
}
quota := raw.(Quota)
if err := quota.close(); err != nil {
return err
}
err = txn.Delete(qType, raw)
if err != nil {
return err
}
// For the lease count type, recompute the counters
if qType == TypeLeaseCount.String() {
if err := m.recomputeLeaseCounts(ctx, txn); err != nil {
return err
}
}
txn.Commit()
return nil
}
// ApplyQuota runs the request against any quota rule that is applicable to it. If
// there are multiple quota rule that matches the request parameters, rule that
// takes precedence will be used to allow/reject the request.
func (m *Manager) ApplyQuota(req *Request) (Response, error) {
var resp Response
quota, err := m.QueryQuota(req)
if err != nil {
return resp, err
}
// If there is no quota defined, allow the request.
if quota == nil {
resp.Allowed = true
return resp, nil
}
// If the quota type is lease count, and if the path is not known to
// generate leases, allow the request.
if req.Type == TypeLeaseCount && !m.inLeasePathCache(req.Path) {
resp.Allowed = true
return resp, nil
}
return quota.allow(req)
}
// SetEnableRateLimitAuditLogging updates the operator preference regarding the
// audit logging behavior.
func (m *Manager) SetEnableRateLimitAuditLogging(val bool) {
m.config.EnableRateLimitAuditLogging = val
}
// SetEnableRateLimitResponseHeaders updates the operator preference regarding
// the rate limit quota HTTP header behavior.
func (m *Manager) SetEnableRateLimitResponseHeaders(val bool) {
m.config.EnableRateLimitResponseHeaders = val
}
// RateLimitAuditLoggingEnabled returns if the quota configuration allows audit
// logging of request rejections due to rate limiting quota rule violations.
func (m *Manager) RateLimitAuditLoggingEnabled() bool {
return m.config.EnableRateLimitAuditLogging
}
// RateLimitResponseHeadersEnabled returns if the quota configuration allows for
// rate limit quota HTTP headers to be added to responses.
func (m *Manager) RateLimitResponseHeadersEnabled() bool {
return m.config.EnableRateLimitResponseHeaders
}
// Config returns the operator preferences in the quota manager
func (m *Manager) Config() *Config {
return m.config
}
// Reset will clear all the quotas from the db and clear the lease path cache.
func (m *Manager) Reset() error {
m.lock.Lock()
defer m.lock.Unlock()
var err error
m.db, err = memdb.NewMemDB(dbSchema())
if err != nil {
return err
}
m.storage = nil
m.ctx = nil
return m.entManager.Reset()
}
// dbSchema creates a DB schema for holding all the quota rules. It creates a
// table for each supported type of quota.
func dbSchema() *memdb.DBSchema {
schema := &memdb.DBSchema{
Tables: make(map[string]*memdb.TableSchema),
}
commonSchema := func(name string) *memdb.TableSchema {
return &memdb.TableSchema{
Name: name,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "ID",
},
},
indexName: {
Name: indexName,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Name",
},
},
indexNamespace: {
Name: indexNamespace,
Indexer: &memdb.CompoundMultiIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "NamespacePath",
},
// By sending false as the query parameter, we can
// query just the namespace specific quota.
&memdb.FieldSetIndex{
Field: "MountPath",
},
},
},
},
indexNamespaceMount: {
Name: indexNamespaceMount,
AllowMissing: true,
Indexer: &memdb.CompoundMultiIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "NamespacePath",
},
&memdb.StringFieldIndex{
Field: "MountPath",
},
},
},
},
},
}
}
// Create a table per quota type. This allows names to be reused between
// different quota types and querying a bit easier.
for _, name := range quotaTypes() {
schema.Tables[name] = commonSchema(name)
}
return schema
}
// Invalidate receives notifications from the replication sub-system when a key
// is updated in the storage. This function will read the key from storage and
// updates the caches and data structures to reflect those updates.
func (m *Manager) Invalidate(key string) {
switch key {
case "config":
config, err := LoadConfig(m.ctx, m.storage)
if err != nil {
m.logger.Error("failed to invalidate quota config", "error", err)
return
}
m.SetEnableRateLimitAuditLogging(config.EnableRateLimitAuditLogging)
m.SetEnableRateLimitResponseHeaders(config.EnableRateLimitResponseHeaders)
default:
splitKeys := strings.Split(key, "/")
if len(splitKeys) != 2 {
m.logger.Error("incorrect key while invalidating quota rule")
return
}
qType := splitKeys[0]
name := splitKeys[1]
// Read quota rule from storage
quota, err := Load(m.ctx, m.storage, qType, name)
if err != nil {
m.logger.Error("failed to read invalidated quota rule", "error", err)
return
}
switch {
case quota == nil:
// Handle quota deletion
if err := m.DeleteQuota(m.ctx, qType, name); err != nil {
m.logger.Error("failed to delete invalidated quota rule", "error", err)
return
}
default:
// Handle quota update
if err := m.SetQuota(m.ctx, qType, quota, false); err != nil {
m.logger.Error("failed to update invalidated quota rule", "error", err)
return
}
}
}
}
// LoadConfig reads the quota configuration from the underlying storage
func LoadConfig(ctx context.Context, storage logical.Storage) (*Config, error) {
var config Config
entry, err := storage.Get(ctx, ConfigPath)
if err != nil {
return nil, err
}
if entry == nil {
return &config, nil
}
err = entry.DecodeJSON(&config)
if err != nil {
return nil, err
}
return &config, nil
}
// Load reads the quota rule from the underlying storage
func Load(ctx context.Context, storage logical.Storage, qType, name string) (Quota, error) {
var quota Quota
entry, err := storage.Get(ctx, QuotaStoragePath(qType, name))
if err != nil {
return nil, err
}
if entry == nil {
return nil, nil
}
switch qType {
case TypeRateLimit.String():
quota = &RateLimitQuota{}
case TypeLeaseCount.String():
quota = &LeaseCountQuota{}
default:
return nil, fmt.Errorf("unsupported type: %v", qType)
}
err = entry.DecodeJSON(quota)
if err != nil {
return nil, err
}
return quota, nil
}
// Setup loads the quota configuration and all the quota rules into the
// quota manager.
func (m *Manager) Setup(ctx context.Context, storage logical.Storage, isPerfStandby bool) error {
m.lock.Lock()
defer m.lock.Unlock()
m.storage = storage
m.ctx = ctx
m.isPerfStandby = isPerfStandby
// Load the quota configuration from storage and load it into the quota
// manager.
config, err := LoadConfig(ctx, storage)
if err != nil {
return err
}
m.SetEnableRateLimitAuditLogging(config.EnableRateLimitAuditLogging)
// Load the quota rules for all supported types from storage and load it in
// the quota manager.
for _, qType := range quotaTypes() {
names, err := logical.CollectKeys(ctx, logical.NewStorageView(storage, StoragePrefix+qType+"/"))
if err != nil {
return nil
}
for _, name := range names {
quota, err := Load(ctx, m.storage, qType, name)
if err != nil {
return err
}
if quota == nil {
continue
}
err = m.setQuotaLocked(ctx, qType, quota, true)
if err != nil {
return err
}
}
}
return nil
}
// QuotaStoragePath returns the storage path suffix for persisting the quota
// rule.
func QuotaStoragePath(quotaType, name string) string {
return path.Join(StoragePrefix+quotaType, name)
}
// HandleRemount updates the quota subsystem about the remount operation that
// took place. Quota manager will trigger the quota specific updates including
// the mount path update..
func (m *Manager) HandleRemount(ctx context.Context, nsPath, fromPath, toPath string) error {
m.lock.Lock()
defer m.lock.Unlock()
txn := m.db.Txn(true)
defer txn.Abort()
// nsPath would have been made non-empty during insertion. Use non-empty value
// during query as well.
if nsPath == "" {
nsPath = "root"
}
idx := indexNamespaceMount
leaseQuotaUpdated := false
args := []interface{}{nsPath, fromPath}
for _, quotaType := range quotaTypes() {
iter, err := txn.Get(quotaType, idx, args...)
if err != nil {
return err
}
for raw := iter.Next(); raw != nil; raw = iter.Next() {
quota := raw.(Quota)
quota.handleRemount(toPath)
entry, err := logical.StorageEntryJSON(QuotaStoragePath(quotaType, quota.QuotaName()), quota)
if err != nil {
return err
}
if err := m.storage.Put(ctx, entry); err != nil {
return err
}
if quotaType == TypeLeaseCount.String() {
leaseQuotaUpdated = true
}
}
}
if leaseQuotaUpdated {
if err := m.recomputeLeaseCounts(ctx, txn); err != nil {
return err
}
}
txn.Commit()
return nil
}
// HandleBackendDisabling updates the quota subsystem with the disabling of auth
// or secret engine disabling.
func (m *Manager) HandleBackendDisabling(ctx context.Context, nsPath, mountPath string) error {
m.lock.Lock()
defer m.lock.Unlock()
txn := m.db.Txn(true)
defer txn.Abort()
// nsPath would have been made non-empty during insertion. Use non-empty value
// during query as well.
if nsPath == "" {
nsPath = "root"
}
idx := indexNamespaceMount
leaseQuotaDeleted := false
args := []interface{}{nsPath, mountPath}
for _, quotaType := range quotaTypes() {
iter, err := txn.Get(quotaType, idx, args...)
if err != nil {
return err
}
for raw := iter.Next(); raw != nil; raw = iter.Next() {
if err := txn.Delete(quotaType, raw); err != nil {
return fmt.Errorf("failed to delete quota from db after mount disabling; namespace %q, err %v", nsPath, err)
}
quota := raw.(Quota)
if err := m.storage.Delete(ctx, QuotaStoragePath(quotaType, quota.QuotaName())); err != nil {
return fmt.Errorf("failed to delete quota from storage after mount disabling; namespace %q, err %v", nsPath, err)
}
if quotaType == TypeLeaseCount.String() {
leaseQuotaDeleted = true
}
}
}
if leaseQuotaDeleted {
if err := m.recomputeLeaseCounts(ctx, txn); err != nil {
return err
}
}
txn.Commit()
return nil
}