open-nomad/nomad/state/state_store_variables.go

500 lines
15 KiB
Go

package state
import (
"fmt"
"math"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)
// Variables queries all the variables and is used only for
// snapshot/restore and key rotation
func (s *StateStore) Variables(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
iter, err := txn.Get(TableVariables, indexID)
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
return iter, nil
}
// GetVariablesByNamespace returns an iterator that contains all
// variables belonging to the provided namespace.
func (s *StateStore) GetVariablesByNamespace(
ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
return s.getVariablesByNamespaceImpl(txn, ws, namespace)
}
func (s *StateStore) getVariablesByNamespaceImpl(
txn *txn, ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
// Walk the entire table.
iter, err := txn.Get(TableVariables, indexID+"_prefix", namespace, "")
if err != nil {
return nil, fmt.Errorf("variable lookup failed: %v", err)
}
ws.Add(iter.WatchCh())
return iter, nil
}
// GetVariablesByNamespaceAndPrefix returns an iterator that contains all
// variables belonging to the provided namespace that match the prefix.
func (s *StateStore) GetVariablesByNamespaceAndPrefix(
ws memdb.WatchSet, namespace, prefix string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
// Walk the entire table.
iter, err := txn.Get(TableVariables, indexID+"_prefix", namespace, prefix)
if err != nil {
return nil, fmt.Errorf("variable lookup failed: %v", err)
}
ws.Add(iter.WatchCh())
return iter, nil
}
// GetVariablesByPrefix returns an iterator that contains all variables that
// match the prefix in any namespace. Namespace filtering is the responsibility
// of the caller.
func (s *StateStore) GetVariablesByPrefix(
ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
// Walk the entire table.
iter, err := txn.Get(TableVariables, indexPath+"_prefix", prefix)
if err != nil {
return nil, fmt.Errorf("variable lookup failed: %v", err)
}
ws.Add(iter.WatchCh())
return iter, nil
}
// GetVariablesByKeyID returns an iterator that contains all
// variables that were encrypted with a particular key
func (s *StateStore) GetVariablesByKeyID(
ws memdb.WatchSet, keyID string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
iter, err := txn.Get(TableVariables, indexKeyID, keyID)
if err != nil {
return nil, fmt.Errorf("variable lookup failed: %v", err)
}
ws.Add(iter.WatchCh())
return iter, nil
}
// GetVariable returns a single variable at a given namespace and
// path.
func (s *StateStore) GetVariable(
ws memdb.WatchSet, namespace, path string) (*structs.VariableEncrypted, error) {
txn := s.db.ReadTxn()
// Try to fetch the variable.
watchCh, raw, err := txn.FirstWatch(TableVariables, indexID, namespace, path)
if err != nil { // error during fetch
return nil, fmt.Errorf("variable lookup failed: %v", err)
}
ws.Add(watchCh)
if raw == nil { // not found
return nil, nil
}
sv := raw.(*structs.VariableEncrypted)
return sv, nil
}
// VarSet is used to store a variable object.
func (s *StateStore) VarSet(idx uint64, sv *structs.VarApplyStateRequest) *structs.VarApplyStateResponse {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
// Perform the actual set.
resp := s.varSetTxn(tx, idx, sv)
if resp.IsError() {
return resp
}
if err := tx.Commit(); err != nil {
return sv.ErrorResponse(idx, err)
}
return resp
}
// VarSetCAS is used to do a check-and-set operation on a
// variable. The ModifyIndex in the provided entry is used to determine if
// we should write the entry to the state store or not.
func (s *StateStore) VarSetCAS(idx uint64, sv *structs.VarApplyStateRequest) *structs.VarApplyStateResponse {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
resp := s.varSetCASTxn(tx, idx, sv)
if resp.IsError() || resp.IsConflict() {
return resp
}
if err := tx.Commit(); err != nil {
return sv.ErrorResponse(idx, err)
}
return resp
}
// varSetCASTxn is the inner method used to do a CAS inside an existing
// transaction.
func (s *StateStore) varSetCASTxn(tx WriteTxn, idx uint64, req *structs.VarApplyStateRequest) *structs.VarApplyStateResponse {
sv := req.Var
raw, err := tx.First(TableVariables, indexID, sv.Namespace, sv.Path)
if err != nil {
return req.ErrorResponse(idx, fmt.Errorf("failed variable lookup: %s", err))
}
svEx, ok := raw.(*structs.VariableEncrypted)
// ModifyIndex of 0 means that we are doing a set-if-not-exists.
if sv.ModifyIndex == 0 && raw != nil {
return req.ConflictResponse(idx, svEx)
}
// If the ModifyIndex is set but the variable doesn't exist, return a
// plausible zero value as the conflict
if sv.ModifyIndex != 0 && raw == nil {
zeroVal := &structs.VariableEncrypted{
VariableMetadata: structs.VariableMetadata{
Namespace: sv.Namespace,
Path: sv.Path,
},
}
return req.ConflictResponse(idx, zeroVal)
}
// If the existing index does not match the provided CAS index arg, then we
// shouldn't update anything and can safely return early here.
if ok && sv.ModifyIndex != svEx.ModifyIndex {
return req.ConflictResponse(idx, svEx)
}
// If we made it this far, we should perform the set.
return s.varSetTxn(tx, idx, req)
}
// varSetTxn is used to insert or update a variable in the state
// store. It is the inner method used and handles only the actual storage.
func (s *StateStore) varSetTxn(tx WriteTxn, idx uint64, req *structs.VarApplyStateRequest) *structs.VarApplyStateResponse {
sv := req.Var
existingRaw, err := tx.First(TableVariables, indexID, sv.Namespace, sv.Path)
if err != nil {
return req.ErrorResponse(idx, fmt.Errorf("failed sve lookup: %s", err))
}
existing, _ := existingRaw.(*structs.VariableEncrypted)
existingQuota, err := tx.First(TableVariablesQuotas, indexID, sv.Namespace)
if err != nil {
return req.ErrorResponse(idx, fmt.Errorf("variable quota lookup failed: %v", err))
}
var quotaChange int64
// Set the CreateIndex and CreateTime
if existing != nil {
sv.CreateIndex = existing.CreateIndex
sv.CreateTime = existing.CreateTime
if existing.Equals(*sv) {
// Skip further writing in the state store if the entry is not actually
// changed. Nevertheless, the input's ModifyIndex should be reset
// since the TXN API returns a copy in the response.
sv.ModifyIndex = existing.ModifyIndex
sv.ModifyTime = existing.ModifyTime
return req.SuccessResponse(idx, nil)
}
sv.ModifyIndex = idx
quotaChange = int64(len(sv.Data) - len(existing.Data))
} else {
sv.CreateIndex = idx
sv.ModifyIndex = idx
quotaChange = int64(len(sv.Data))
}
if err := tx.Insert(TableVariables, sv); err != nil {
return req.ErrorResponse(idx, fmt.Errorf("failed inserting variable: %s", err))
}
// Track quota usage
var quotaUsed *structs.VariablesQuota
if existingQuota != nil {
quotaUsed = existingQuota.(*structs.VariablesQuota)
quotaUsed = quotaUsed.Copy()
} else {
quotaUsed = &structs.VariablesQuota{
Namespace: sv.Namespace,
CreateIndex: idx,
}
}
if quotaChange > math.MaxInt64-quotaUsed.Size {
// this limit is actually shared across all namespaces in the region's
// quota (if there is one), but we need this check here to prevent
// overflow as well
return req.ErrorResponse(idx, fmt.Errorf("variables can store a maximum of %d bytes of encrypted data per namespace", math.MaxInt))
}
if quotaChange > 0 {
quotaUsed.Size += quotaChange
} else if quotaChange < 0 {
quotaUsed.Size -= helper.Min(quotaUsed.Size, -quotaChange)
}
err = s.enforceVariablesQuota(idx, tx, sv.Namespace, quotaChange)
if err != nil {
return req.ErrorResponse(idx, err)
}
// we check enforcement above even if there's no change because another
// namespace may have used up quota to make this no longer valid, but we
// only update the table if this namespace has changed
if quotaChange != 0 {
quotaUsed.ModifyIndex = idx
if err := tx.Insert(TableVariablesQuotas, quotaUsed); err != nil {
return req.ErrorResponse(idx, fmt.Errorf("variable quota insert failed: %v", err))
}
}
if err := tx.Insert(tableIndex,
&IndexEntry{TableVariables, idx}); err != nil {
return req.ErrorResponse(idx, fmt.Errorf("failed updating variable index: %s", err))
}
return req.SuccessResponse(idx, &sv.VariableMetadata)
}
// VarGet is used to retrieve a key/value pair from the state store.
func (s *StateStore) VarGet(ws memdb.WatchSet, namespace, path string) (uint64, *structs.VariableEncrypted, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
return svGetTxn(tx, ws, namespace, path)
}
// svGetTxn is the inner method that gets a variable inside an existing
// transaction.
func svGetTxn(tx ReadTxn,
ws memdb.WatchSet, namespace, path string) (uint64, *structs.VariableEncrypted, error) {
// Get the table index.
idx := svMaxIndex(tx)
watchCh, entry, err := tx.FirstWatch(TableVariables, indexID, namespace, path)
if err != nil {
return 0, nil, fmt.Errorf("failed variable lookup: %s", err)
}
ws.Add(watchCh)
if entry != nil {
return idx, entry.(*structs.VariableEncrypted), nil
}
return idx, nil, nil
}
// VarDelete is used to delete a single variable in the
// the state store.
func (s *StateStore) VarDelete(idx uint64, req *structs.VarApplyStateRequest) *structs.VarApplyStateResponse {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
// Perform the actual delete
resp := s.svDeleteTxn(tx, idx, req)
if !resp.IsOk() {
return resp
}
err := tx.Commit()
if err != nil {
return req.ErrorResponse(idx, err)
}
return resp
}
// VarDeleteCAS is used to conditionally delete a variable if and only if it has
// a given modify index. If the CAS index (cidx) specified is not equal to the
// last observed index for the given variable, then the call is a noop,
// otherwise a normal delete is invoked.
func (s *StateStore) VarDeleteCAS(idx uint64, req *structs.VarApplyStateRequest) *structs.VarApplyStateResponse {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
resp := s.svDeleteCASTxn(tx, idx, req)
if !resp.IsOk() {
return resp
}
err := tx.Commit()
if err != nil {
return req.ErrorResponse(idx, err)
}
return resp
}
// svDeleteCASTxn is an inner method used to check the existing value
// of a variable within an existing transaction as part of a
// conditional delete.
func (s *StateStore) svDeleteCASTxn(tx WriteTxn, idx uint64, req *structs.VarApplyStateRequest) *structs.VarApplyStateResponse {
sv := req.Var
raw, err := tx.First(TableVariables, indexID, sv.Namespace, sv.Path)
if err != nil {
return req.ErrorResponse(idx, fmt.Errorf("failed variable lookup: %s", err))
}
svEx, ok := raw.(*structs.VariableEncrypted)
// ModifyIndex of 0 means that we are doing a delete-if-not-exists.
if sv.ModifyIndex == 0 && raw != nil {
return req.ConflictResponse(idx, svEx)
}
// If the ModifyIndex is set but the variable doesn't exist, return a
// plausible zero value as the conflict
if sv.ModifyIndex != 0 && raw == nil {
zeroVal := &structs.VariableEncrypted{
VariableMetadata: structs.VariableMetadata{
Namespace: sv.Namespace,
Path: sv.Path,
},
}
return req.ConflictResponse(idx, zeroVal)
}
// If the existing index does not match the provided CAS index arg, then we
// shouldn't update anything and can safely return early here.
if !ok || sv.ModifyIndex != svEx.ModifyIndex {
return req.ConflictResponse(idx, svEx)
}
// Call the actual deletion if the above passed.
return s.svDeleteTxn(tx, idx, req)
}
// svDeleteTxn is the inner method used to perform the actual deletion
// of a variable within an existing transaction.
func (s *StateStore) svDeleteTxn(tx WriteTxn, idx uint64, req *structs.VarApplyStateRequest) *structs.VarApplyStateResponse {
// Look up the entry in the state store.
existingRaw, err := tx.First(TableVariables, indexID, req.Var.Namespace, req.Var.Path)
if err != nil {
return req.ErrorResponse(idx, fmt.Errorf("failed variable lookup: %s", err))
}
if existingRaw == nil {
return req.SuccessResponse(idx, nil)
}
existingQuota, err := tx.First(TableVariablesQuotas, indexID, req.Var.Namespace)
if err != nil {
return req.ErrorResponse(idx, fmt.Errorf("variable quota lookup failed: %v", err))
}
sv := existingRaw.(*structs.VariableEncrypted)
// Track quota usage
if existingQuota != nil {
quotaUsed := existingQuota.(*structs.VariablesQuota)
quotaUsed = quotaUsed.Copy()
quotaUsed.Size -= helper.Min(quotaUsed.Size, int64(len(sv.Data)))
quotaUsed.ModifyIndex = idx
if err := tx.Insert(TableVariablesQuotas, quotaUsed); err != nil {
return req.ErrorResponse(idx, fmt.Errorf("variable quota insert failed: %v", err))
}
}
// Delete the variable and update the index table.
if err := tx.Delete(TableVariables, sv); err != nil {
return req.ErrorResponse(idx, fmt.Errorf("failed deleting variable entry: %s", err))
}
if err := tx.Insert(tableIndex, &IndexEntry{TableVariables, idx}); err != nil {
return req.ErrorResponse(idx, fmt.Errorf("failed updating variable index: %s", err))
}
return req.SuccessResponse(idx, nil)
}
// This extra indirection is to facilitate the tombstone case if it matters.
func svMaxIndex(tx ReadTxn) uint64 {
return maxIndexTxn(tx, TableVariables)
}
// WriteTxn is implemented by memdb.Txn to perform write operations.
type WriteTxn interface {
ReadTxn
Defer(func())
Delete(table string, obj interface{}) error
DeleteAll(table, index string, args ...interface{}) (int, error)
DeletePrefix(table string, index string, prefix string) (bool, error)
Insert(table string, obj interface{}) error
}
// maxIndex is a helper used to retrieve the highest known index
// amongst a set of tables in the db.
func (s *StateStore) maxIndex(tables ...string) uint64 {
tx := s.db.ReadTxn()
defer tx.Abort()
return maxIndexTxn(tx, tables...)
}
// maxIndexTxn is a helper used to retrieve the highest known index
// amongst a set of tables in the db.
func maxIndexTxn(tx ReadTxn, tables ...string) uint64 {
return maxIndexWatchTxn(tx, nil, tables...)
}
func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 {
var lindex uint64
for _, table := range tables {
ch, ti, err := tx.FirstWatch(tableIndex, "id", table)
if err != nil {
panic(fmt.Sprintf("unknown index: %s err: %s", table, err))
}
if idx, ok := ti.(*IndexEntry); ok && idx.Value > lindex {
lindex = idx.Value
}
ws.Add(ch)
}
return lindex
}
// VariablesQuotas queries all the quotas and is used only for
// snapshot/restore and key rotation
func (s *StateStore) VariablesQuotas(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
iter, err := txn.Get(TableVariablesQuotas, indexID)
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
return iter, nil
}
// VariablesQuotaByNamespace queries for quotas for a particular namespace
func (s *StateStore) VariablesQuotaByNamespace(ws memdb.WatchSet, namespace string) (*structs.VariablesQuota, error) {
txn := s.db.ReadTxn()
watchCh, raw, err := txn.FirstWatch(TableVariablesQuotas, indexID, namespace)
if err != nil {
return nil, fmt.Errorf("variable quota lookup failed: %v", err)
}
ws.Add(watchCh)
if raw == nil {
return nil, nil
}
quotaUsed := raw.(*structs.VariablesQuota)
return quotaUsed, nil
}