open-consul/agent/consul/state/txn.go

426 lines
12 KiB
Go
Raw Normal View History

package state
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)
// txnKVS handles all KV-related operations.
func (s *Store) txnKVS(tx WriteTxn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) {
var entry *structs.DirEntry
var err error
switch op.Verb {
case api.KVSet:
entry = &op.DirEnt
err = kvsSetTxn(tx, idx, entry, false)
case api.KVDelete:
err = s.kvsDeleteTxn(tx, idx, op.DirEnt.Key, &op.DirEnt.EnterpriseMeta)
case api.KVDeleteCAS:
var ok bool
ok, err = s.kvsDeleteCASTxn(tx, idx, op.DirEnt.ModifyIndex, op.DirEnt.Key, &op.DirEnt.EnterpriseMeta)
if !ok && err == nil {
err = fmt.Errorf("failed to delete key %q, index is stale", op.DirEnt.Key)
}
case api.KVDeleteTree:
err = s.kvsDeleteTreeTxn(tx, idx, op.DirEnt.Key, &op.DirEnt.EnterpriseMeta)
case api.KVCAS:
var ok bool
entry = &op.DirEnt
ok, err = kvsSetCASTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to set key %q, index is stale", op.DirEnt.Key)
}
case api.KVLock:
var ok bool
entry = &op.DirEnt
ok, err = kvsLockTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to lock key %q, lock is already held", op.DirEnt.Key)
}
case api.KVUnlock:
var ok bool
entry = &op.DirEnt
ok, err = kvsUnlockTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to unlock key %q, lock isn't held, or is held by another session", op.DirEnt.Key)
}
case api.KVGet:
_, entry, err = kvsGetTxn(tx, nil, op.DirEnt.Key, op.DirEnt.EnterpriseMeta)
if entry == nil && err == nil {
err = fmt.Errorf("key %q doesn't exist", op.DirEnt.Key)
}
case api.KVGetTree:
var entries structs.DirEntries
_, entries, err = s.kvsListTxn(tx, nil, op.DirEnt.Key, op.DirEnt.EnterpriseMeta)
if err == nil {
results := make(structs.TxnResults, 0, len(entries))
for _, e := range entries {
result := structs.TxnResult{KV: e}
results = append(results, &result)
}
return results, nil
}
case api.KVCheckSession:
entry, err = kvsCheckSessionTxn(tx, op.DirEnt.Key, op.DirEnt.Session, &op.DirEnt.EnterpriseMeta)
case api.KVCheckIndex:
entry, err = kvsCheckIndexTxn(tx, op.DirEnt.Key, op.DirEnt.ModifyIndex, op.DirEnt.EnterpriseMeta)
case api.KVCheckNotExists:
_, entry, err = kvsGetTxn(tx, nil, op.DirEnt.Key, op.DirEnt.EnterpriseMeta)
if entry != nil && err == nil {
err = fmt.Errorf("key %q exists", op.DirEnt.Key)
}
default:
err = fmt.Errorf("unknown KV verb %q", op.Verb)
}
if err != nil {
return nil, err
}
// For a GET we keep the value, otherwise we clone and blank out the
// value (we have to clone so we don't modify the entry being used by
// the state store).
if entry != nil {
if op.Verb == api.KVGet {
result := structs.TxnResult{KV: entry}
return structs.TxnResults{&result}, nil
}
clone := entry.Clone()
clone.Value = nil
result := structs.TxnResult{KV: clone}
return structs.TxnResults{&result}, nil
}
return nil, nil
}
// txnSession handles all Session-related operations.
func txnSession(tx WriteTxn, idx uint64, op *structs.TxnSessionOp) error {
var err error
switch op.Verb {
case api.SessionDelete:
err = sessionDeleteWithSession(tx, &op.Session, idx)
default:
err = fmt.Errorf("unknown Session verb %q", op.Verb)
}
if err != nil {
return fmt.Errorf("failed to delete session: %v", err)
}
return nil
}
connect: intentions are now managed as a new config entry kind "service-intentions" (#8834) - Upgrade the ConfigEntry.ListAll RPC to be kind-aware so that older copies of consul will not see new config entries it doesn't understand replicate down. - Add shim conversion code so that the old API/CLI method of interacting with intentions will continue to work so long as none of these are edited via config entry endpoints. Almost all of the read-only APIs will continue to function indefinitely. - Add new APIs that operate on individual intentions without IDs so that the UI doesn't need to implement CAS operations. - Add a new serf feature flag indicating support for intentions-as-config-entries. - The old line-item intentions way of interacting with the state store will transparently flip between the legacy memdb table and the config entry representations so that readers will never see a hiccup during migration where the results are incomplete. It uses a piece of system metadata to control the flip. - The primary datacenter will begin migrating intentions into config entries on startup once all servers in the datacenter are on a version of Consul with the intentions-as-config-entries feature flag. When it is complete the old state store representations will be cleared. We also record a piece of system metadata indicating this has occurred. We use this metadata to skip ALL of this code the next time the leader starts up. - The secondary datacenters continue to run the old intentions replicator until all servers in the secondary DC and primary DC support intentions-as-config-entries (via serf flag). Once this condition it met the old intentions replicator ceases. - The secondary datacenters replicate the new config entries as they are migrated in the primary. When they detect that the primary has zeroed it's old state store table it waits until all config entries up to that point are replicated and then zeroes its own copy of the old state store table. We also record a piece of system metadata indicating this has occurred. We use this metadata to skip ALL of this code the next time the leader starts up.
2020-10-06 18:24:05 +00:00
// txnLegacyIntention handles all Intention-related operations.
//
// Deprecated: see TxnOp.Intention description
func txnLegacyIntention(tx WriteTxn, idx uint64, op *structs.TxnIntentionOp) error {
switch op.Op {
case structs.IntentionOpCreate, structs.IntentionOpUpdate:
connect: intentions are now managed as a new config entry kind "service-intentions" (#8834) - Upgrade the ConfigEntry.ListAll RPC to be kind-aware so that older copies of consul will not see new config entries it doesn't understand replicate down. - Add shim conversion code so that the old API/CLI method of interacting with intentions will continue to work so long as none of these are edited via config entry endpoints. Almost all of the read-only APIs will continue to function indefinitely. - Add new APIs that operate on individual intentions without IDs so that the UI doesn't need to implement CAS operations. - Add a new serf feature flag indicating support for intentions-as-config-entries. - The old line-item intentions way of interacting with the state store will transparently flip between the legacy memdb table and the config entry representations so that readers will never see a hiccup during migration where the results are incomplete. It uses a piece of system metadata to control the flip. - The primary datacenter will begin migrating intentions into config entries on startup once all servers in the datacenter are on a version of Consul with the intentions-as-config-entries feature flag. When it is complete the old state store representations will be cleared. We also record a piece of system metadata indicating this has occurred. We use this metadata to skip ALL of this code the next time the leader starts up. - The secondary datacenters continue to run the old intentions replicator until all servers in the secondary DC and primary DC support intentions-as-config-entries (via serf flag). Once this condition it met the old intentions replicator ceases. - The secondary datacenters replicate the new config entries as they are migrated in the primary. When they detect that the primary has zeroed it's old state store table it waits until all config entries up to that point are replicated and then zeroes its own copy of the old state store table. We also record a piece of system metadata indicating this has occurred. We use this metadata to skip ALL of this code the next time the leader starts up.
2020-10-06 18:24:05 +00:00
return legacyIntentionSetTxn(tx, idx, op.Intention)
case structs.IntentionOpDelete:
connect: intentions are now managed as a new config entry kind "service-intentions" (#8834) - Upgrade the ConfigEntry.ListAll RPC to be kind-aware so that older copies of consul will not see new config entries it doesn't understand replicate down. - Add shim conversion code so that the old API/CLI method of interacting with intentions will continue to work so long as none of these are edited via config entry endpoints. Almost all of the read-only APIs will continue to function indefinitely. - Add new APIs that operate on individual intentions without IDs so that the UI doesn't need to implement CAS operations. - Add a new serf feature flag indicating support for intentions-as-config-entries. - The old line-item intentions way of interacting with the state store will transparently flip between the legacy memdb table and the config entry representations so that readers will never see a hiccup during migration where the results are incomplete. It uses a piece of system metadata to control the flip. - The primary datacenter will begin migrating intentions into config entries on startup once all servers in the datacenter are on a version of Consul with the intentions-as-config-entries feature flag. When it is complete the old state store representations will be cleared. We also record a piece of system metadata indicating this has occurred. We use this metadata to skip ALL of this code the next time the leader starts up. - The secondary datacenters continue to run the old intentions replicator until all servers in the secondary DC and primary DC support intentions-as-config-entries (via serf flag). Once this condition it met the old intentions replicator ceases. - The secondary datacenters replicate the new config entries as they are migrated in the primary. When they detect that the primary has zeroed it's old state store table it waits until all config entries up to that point are replicated and then zeroes its own copy of the old state store table. We also record a piece of system metadata indicating this has occurred. We use this metadata to skip ALL of this code the next time the leader starts up.
2020-10-06 18:24:05 +00:00
return legacyIntentionDeleteTxn(tx, idx, op.Intention.ID)
case structs.IntentionOpDeleteAll:
fallthrough // deliberately not available via this api
case structs.IntentionOpUpsert:
fallthrough // deliberately not available via this api
default:
return fmt.Errorf("unknown Intention op %q", op.Op)
}
}
2018-12-03 07:51:18 +00:00
// txnNode handles all Node-related operations.
func (s *Store) txnNode(tx WriteTxn, idx uint64, op *structs.TxnNodeOp) (structs.TxnResults, error) {
2018-12-03 07:51:18 +00:00
var entry *structs.Node
var err error
// TODO(partitions): change these errors to include node partitions when printing
getNode := func() (*structs.Node, error) {
if op.Node.ID != "" {
return getNodeIDTxn(tx, op.Node.ID, op.Node.GetEnterpriseMeta(), op.Node.PeerName)
} else {
return getNodeTxn(tx, op.Node.Node, op.Node.GetEnterpriseMeta(), op.Node.PeerName)
}
}
switch op.Verb {
case api.NodeGet:
entry, err = getNode()
if entry == nil && err == nil {
err = fmt.Errorf("node %q doesn't exist", op.Node.Node)
}
2018-12-03 07:51:18 +00:00
case api.NodeSet:
err = s.ensureNodeTxn(tx, idx, false, &op.Node)
2018-12-12 13:22:25 +00:00
if err == nil {
entry, err = getNode()
2018-12-12 13:22:25 +00:00
}
2018-12-03 07:51:18 +00:00
case api.NodeCAS:
var ok bool
ok, err = s.ensureNodeCASTxn(tx, idx, &op.Node)
if !ok && err == nil {
err = fmt.Errorf("failed to set node %q, index is stale", op.Node.Node)
2018-12-12 13:22:25 +00:00
break
2018-12-03 07:51:18 +00:00
}
entry, err = getNode()
2018-12-03 07:51:18 +00:00
case api.NodeDelete:
err = s.deleteNodeTxn(tx, idx, op.Node.Node, op.Node.GetEnterpriseMeta(), op.Node.PeerName)
2018-12-03 07:51:18 +00:00
case api.NodeDeleteCAS:
var ok bool
ok, err = s.deleteNodeCASTxn(tx, idx, op.Node.ModifyIndex, op.Node.Node, op.Node.GetEnterpriseMeta(), op.Node.PeerName)
2018-12-03 07:51:18 +00:00
if !ok && err == nil {
err = fmt.Errorf("failed to delete node %q, index is stale", op.Node.Node)
}
default:
err = fmt.Errorf("unknown Node verb %q", op.Verb)
}
if err != nil {
return nil, err
}
// For a GET we keep the value, otherwise we clone and blank out the
// value (we have to clone so we don't modify the entry being used by
// the state store).
if entry != nil {
if op.Verb == api.NodeGet {
result := structs.TxnResult{Node: entry}
return structs.TxnResults{&result}, nil
}
clone := *entry
result := structs.TxnResult{Node: &clone}
return structs.TxnResults{&result}, nil
}
return nil, nil
}
2018-12-03 08:41:24 +00:00
// txnService handles all Service-related operations.
func (s *Store) txnService(tx WriteTxn, idx uint64, op *structs.TxnServiceOp) (structs.TxnResults, error) {
2018-12-03 08:41:24 +00:00
switch op.Verb {
case api.ServiceGet:
entry, err := getNodeServiceTxn(tx, op.Node, op.Service.ID, &op.Service.EnterpriseMeta, op.Service.PeerName)
switch {
case err != nil:
return nil, err
case entry == nil:
return nil, fmt.Errorf("service %q on node %q doesn't exist", op.Service.ID, op.Node)
default:
return structs.TxnResults{&structs.TxnResult{Service: entry}}, nil
}
2018-12-03 08:41:24 +00:00
case api.ServiceSet:
if err := ensureServiceTxn(tx, idx, op.Node, false, &op.Service); err != nil {
return nil, err
}
entry, err := getNodeServiceTxn(tx, op.Node, op.Service.ID, &op.Service.EnterpriseMeta, op.Service.PeerName)
return newTxnResultFromNodeServiceEntry(entry), err
2018-12-03 08:41:24 +00:00
case api.ServiceCAS:
err := ensureServiceCASTxn(tx, idx, op.Node, &op.Service)
switch {
case err == errCASCompareFailed:
err := fmt.Errorf("failed to set service %q on node %q, index is stale", op.Service.ID, op.Node)
return nil, err
case err != nil:
return nil, err
2018-12-03 08:41:24 +00:00
}
entry, err := getNodeServiceTxn(tx, op.Node, op.Service.ID, &op.Service.EnterpriseMeta, op.Service.PeerName)
return newTxnResultFromNodeServiceEntry(entry), err
2018-12-03 08:41:24 +00:00
case api.ServiceDelete:
err := s.deleteServiceTxn(tx, idx, op.Node, op.Service.ID, &op.Service.EnterpriseMeta, op.Service.PeerName)
return nil, err
2018-12-03 08:41:24 +00:00
case api.ServiceDeleteCAS:
ok, err := s.deleteServiceCASTxn(tx, idx, op.Service.ModifyIndex, op.Node, op.Service.ID, &op.Service.EnterpriseMeta, op.Service.PeerName)
2018-12-03 08:41:24 +00:00
if !ok && err == nil {
return nil, fmt.Errorf("failed to delete service %q on node %q, index is stale", op.Service.ID, op.Node)
2018-12-03 08:41:24 +00:00
}
return nil, err
2018-12-03 08:41:24 +00:00
default:
return nil, fmt.Errorf("unknown Service verb %q", op.Verb)
2018-12-03 08:41:24 +00:00
}
}
2018-12-03 08:41:24 +00:00
// newTxnResultFromNodeServiceEntry returns a TxnResults with a single result,
// a copy of entry. The entry is copied to prevent modification of the state
// store.
func newTxnResultFromNodeServiceEntry(entry *structs.NodeService) structs.TxnResults {
if entry == nil {
return nil
2018-12-03 08:41:24 +00:00
}
clone := *entry
result := structs.TxnResult{Service: &clone}
return structs.TxnResults{&result}
2018-12-03 08:41:24 +00:00
}
// txnCheck handles all Check-related operations.
func (s *Store) txnCheck(tx WriteTxn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) {
var entry *structs.HealthCheck
var err error
switch op.Verb {
case api.CheckGet:
_, entry, err = getNodeCheckTxn(tx, op.Check.Node, op.Check.CheckID, &op.Check.EnterpriseMeta, op.Check.PeerName)
if entry == nil && err == nil {
err = fmt.Errorf("check %q on node %q doesn't exist", op.Check.CheckID, op.Check.Node)
}
case api.CheckSet:
err = s.ensureCheckTxn(tx, idx, false, &op.Check)
2018-12-12 13:22:25 +00:00
if err == nil {
_, entry, err = getNodeCheckTxn(tx, op.Check.Node, op.Check.CheckID, &op.Check.EnterpriseMeta, op.Check.PeerName)
2018-12-12 13:22:25 +00:00
}
case api.CheckCAS:
var ok bool
entry = &op.Check
ok, err = s.ensureCheckCASTxn(tx, idx, entry)
if !ok && err == nil {
err = fmt.Errorf("failed to set check %q on node %q, index is stale", entry.CheckID, entry.Node)
2018-12-12 13:22:25 +00:00
break
}
_, entry, err = getNodeCheckTxn(tx, op.Check.Node, op.Check.CheckID, &op.Check.EnterpriseMeta, op.Check.PeerName)
case api.CheckDelete:
err = s.deleteCheckTxn(tx, idx, op.Check.Node, op.Check.CheckID, &op.Check.EnterpriseMeta, op.Check.PeerName)
case api.CheckDeleteCAS:
var ok bool
ok, err = s.deleteCheckCASTxn(tx, idx, op.Check.ModifyIndex, op.Check.Node, op.Check.CheckID, &op.Check.EnterpriseMeta, op.Check.PeerName)
if !ok && err == nil {
err = fmt.Errorf("failed to delete check %q on node %q, index is stale", op.Check.CheckID, op.Check.Node)
}
default:
err = fmt.Errorf("unknown Check verb %q", op.Verb)
}
if err != nil {
return nil, err
}
// For a GET we keep the value, otherwise we clone and blank out the
// value (we have to clone so we don't modify the entry being used by
// the state store).
if entry != nil {
if op.Verb == api.CheckGet {
result := structs.TxnResult{Check: entry}
return structs.TxnResults{&result}, nil
}
clone := entry.Clone()
result := structs.TxnResult{Check: clone}
return structs.TxnResults{&result}, nil
}
return nil, nil
}
// txnDispatch runs the given operations inside the state store transaction.
func (s *Store) txnDispatch(tx WriteTxn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
results := make(structs.TxnResults, 0, len(ops))
errors := make(structs.TxnErrors, 0, len(ops))
for i, op := range ops {
var ret structs.TxnResults
var err error
// Dispatch based on the type of operation.
switch {
case op.KV != nil:
ret, err = s.txnKVS(tx, idx, op.KV)
2018-12-03 08:41:24 +00:00
case op.Node != nil:
ret, err = s.txnNode(tx, idx, op.Node)
case op.Service != nil:
ret, err = s.txnService(tx, idx, op.Service)
case op.Check != nil:
ret, err = s.txnCheck(tx, idx, op.Check)
case op.Session != nil:
err = txnSession(tx, idx, op.Session)
connect: intentions are now managed as a new config entry kind "service-intentions" (#8834) - Upgrade the ConfigEntry.ListAll RPC to be kind-aware so that older copies of consul will not see new config entries it doesn't understand replicate down. - Add shim conversion code so that the old API/CLI method of interacting with intentions will continue to work so long as none of these are edited via config entry endpoints. Almost all of the read-only APIs will continue to function indefinitely. - Add new APIs that operate on individual intentions without IDs so that the UI doesn't need to implement CAS operations. - Add a new serf feature flag indicating support for intentions-as-config-entries. - The old line-item intentions way of interacting with the state store will transparently flip between the legacy memdb table and the config entry representations so that readers will never see a hiccup during migration where the results are incomplete. It uses a piece of system metadata to control the flip. - The primary datacenter will begin migrating intentions into config entries on startup once all servers in the datacenter are on a version of Consul with the intentions-as-config-entries feature flag. When it is complete the old state store representations will be cleared. We also record a piece of system metadata indicating this has occurred. We use this metadata to skip ALL of this code the next time the leader starts up. - The secondary datacenters continue to run the old intentions replicator until all servers in the secondary DC and primary DC support intentions-as-config-entries (via serf flag). Once this condition it met the old intentions replicator ceases. - The secondary datacenters replicate the new config entries as they are migrated in the primary. When they detect that the primary has zeroed it's old state store table it waits until all config entries up to that point are replicated and then zeroes its own copy of the old state store table. We also record a piece of system metadata indicating this has occurred. We use this metadata to skip ALL of this code the next time the leader starts up.
2020-10-06 18:24:05 +00:00
case op.Intention != nil:
// NOTE: this branch is deprecated and exists for backwards
// compatibility with pre-1.9.0 raft logs and during upgrades.
err = txnLegacyIntention(tx, idx, op.Intention)
default:
err = fmt.Errorf("no operation specified")
}
// Accumulate the results.
results = append(results, ret...)
// Capture any error along with the index of the operation that
// failed.
if err != nil {
2017-03-23 23:05:35 +00:00
errors = append(errors, &structs.TxnError{
OpIndex: i,
What: err.Error(),
})
}
}
if len(errors) > 0 {
return nil, errors
}
return results, nil
}
// TxnRW tries to run the given operations all inside a single transaction. If
// any of the operations fail, the entire transaction will be rolled back. This
// is done in a full write transaction on the state store, so reads and writes
// are possible
2017-04-21 00:46:29 +00:00
func (s *Store) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
results, errors := s.txnDispatch(tx, idx, ops)
if len(errors) > 0 {
return nil, errors
}
2020-06-02 20:34:56 +00:00
err := tx.Commit()
if err != nil {
return nil, structs.TxnErrors{
{What: err.Error(), OpIndex: 0},
}
}
return results, nil
}
// TxnRO runs the given operations inside a single read transaction in the state
// store. You must verify outside this function that no write operations are
// present, otherwise you'll get an error from the state store.
2017-04-21 00:46:29 +00:00
func (s *Store) TxnRO(ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
tx := s.db.Txn(false)
defer tx.Abort()
results, errors := s.txnDispatch(tx, 0, ops)
if len(errors) > 0 {
return nil, errors
}
return results, nil
}