2016-05-11 04:41:47 +00:00
|
|
|
package consul
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/armon/go-metrics"
|
2020-11-13 02:12:12 +00:00
|
|
|
"github.com/armon/go-metrics/prometheus"
|
2021-04-08 22:58:15 +00:00
|
|
|
"github.com/hashicorp/go-hclog"
|
|
|
|
|
2016-05-13 00:38:25 +00:00
|
|
|
"github.com/hashicorp/consul/acl"
|
2017-07-06 10:34:00 +00:00
|
|
|
"github.com/hashicorp/consul/agent/structs"
|
2018-12-12 17:14:02 +00:00
|
|
|
"github.com/hashicorp/consul/api"
|
2016-05-11 04:41:47 +00:00
|
|
|
)
|
|
|
|
|
2020-11-13 02:12:12 +00:00
|
|
|
var TxnSummaries = []prometheus.SummaryDefinition{
|
|
|
|
{
|
2020-11-13 21:18:04 +00:00
|
|
|
Name: []string{"txn", "apply"},
|
2020-11-16 19:02:11 +00:00
|
|
|
Help: "Measures the time spent applying a transaction operation.",
|
2020-11-13 02:12:12 +00:00
|
|
|
},
|
|
|
|
{
|
2020-11-13 21:18:04 +00:00
|
|
|
Name: []string{"txn", "read"},
|
2020-11-16 19:02:11 +00:00
|
|
|
Help: "Measures the time spent returning a read transaction.",
|
2020-11-13 02:12:12 +00:00
|
|
|
},
|
|
|
|
}
|
|
|
|
|
2016-05-11 04:41:47 +00:00
|
|
|
// Txn endpoint is used to perform multi-object atomic transactions.
|
|
|
|
type Txn struct {
|
2020-01-28 23:50:41 +00:00
|
|
|
srv *Server
|
|
|
|
logger hclog.Logger
|
2016-05-11 04:41:47 +00:00
|
|
|
}
|
|
|
|
|
2016-05-13 00:38:25 +00:00
|
|
|
// preCheck is used to verify the incoming operations before any further
|
|
|
|
// processing takes place. This checks things like ACLs.
|
2018-10-19 16:04:07 +00:00
|
|
|
func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.TxnErrors {
|
2016-05-13 00:38:25 +00:00
|
|
|
var errors structs.TxnErrors
|
|
|
|
|
|
|
|
// Perform the pre-apply checks for any KV operations.
|
|
|
|
for i, op := range ops {
|
2018-12-03 07:11:48 +00:00
|
|
|
switch {
|
|
|
|
case op.KV != nil:
|
2020-01-28 23:50:41 +00:00
|
|
|
ok, err := kvsPreApply(t.logger, t.srv, authorizer, op.KV.Verb, &op.KV.DirEnt)
|
2016-05-13 00:38:25 +00:00
|
|
|
if err != nil {
|
2017-03-23 23:05:35 +00:00
|
|
|
errors = append(errors, &structs.TxnError{
|
|
|
|
OpIndex: i,
|
|
|
|
What: err.Error(),
|
|
|
|
})
|
2016-05-13 00:38:25 +00:00
|
|
|
} else if !ok {
|
|
|
|
err = fmt.Errorf("failed to lock key %q due to lock delay", op.KV.DirEnt.Key)
|
2017-03-23 23:05:35 +00:00
|
|
|
errors = append(errors, &structs.TxnError{
|
|
|
|
OpIndex: i,
|
|
|
|
What: err.Error(),
|
|
|
|
})
|
2016-05-13 00:38:25 +00:00
|
|
|
}
|
2018-12-12 10:29:54 +00:00
|
|
|
case op.Node != nil:
|
2018-12-12 17:14:02 +00:00
|
|
|
// Skip the pre-apply checks if this is a GET.
|
|
|
|
if op.Node.Verb == api.NodeGet {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
2018-12-12 10:29:54 +00:00
|
|
|
node := op.Node.Node
|
|
|
|
if err := nodePreApply(node.Node, string(node.ID)); err != nil {
|
|
|
|
errors = append(errors, &structs.TxnError{
|
|
|
|
OpIndex: i,
|
|
|
|
What: err.Error(),
|
|
|
|
})
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check that the token has permissions for the given operation.
|
|
|
|
if err := vetNodeTxnOp(op.Node, authorizer); err != nil {
|
|
|
|
errors = append(errors, &structs.TxnError{
|
|
|
|
OpIndex: i,
|
|
|
|
What: err.Error(),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
case op.Service != nil:
|
2018-12-12 17:14:02 +00:00
|
|
|
// Skip the pre-apply checks if this is a GET.
|
|
|
|
if op.Service.Verb == api.ServiceGet {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
2018-12-12 10:29:54 +00:00
|
|
|
service := &op.Service.Service
|
2021-08-04 22:18:51 +00:00
|
|
|
if err := servicePreApply(service, authorizer, op.Service.FillAuthzContext); err != nil {
|
2018-12-12 10:29:54 +00:00
|
|
|
errors = append(errors, &structs.TxnError{
|
|
|
|
OpIndex: i,
|
|
|
|
What: err.Error(),
|
|
|
|
})
|
|
|
|
}
|
2018-12-03 07:11:48 +00:00
|
|
|
case op.Check != nil:
|
2018-12-12 17:14:02 +00:00
|
|
|
// Skip the pre-apply checks if this is a GET.
|
|
|
|
if op.Check.Verb == api.CheckGet {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
2018-12-03 07:11:48 +00:00
|
|
|
checkPreApply(&op.Check.Check)
|
2018-12-12 10:29:54 +00:00
|
|
|
|
|
|
|
// Check that the token has permissions for the given operation.
|
|
|
|
if err := vetCheckTxnOp(op.Check, authorizer); err != nil {
|
|
|
|
errors = append(errors, &structs.TxnError{
|
|
|
|
OpIndex: i,
|
|
|
|
What: err.Error(),
|
|
|
|
})
|
|
|
|
}
|
2016-05-13 00:38:25 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return errors
|
|
|
|
}
|
|
|
|
|
2021-07-30 16:55:04 +00:00
|
|
|
// vetNodeTxnOp applies the given ACL policy to a node transaction operation.
|
2021-08-04 22:36:48 +00:00
|
|
|
func vetNodeTxnOp(op *structs.TxnNodeOp, authz acl.Authorizer) error {
|
2021-07-30 16:55:04 +00:00
|
|
|
var authzContext acl.AuthorizerContext
|
|
|
|
op.FillAuthzContext(&authzContext)
|
|
|
|
|
2021-08-04 22:36:48 +00:00
|
|
|
if authz.NodeWrite(op.Node.Node, &authzContext) != acl.Allow {
|
2021-07-30 16:55:04 +00:00
|
|
|
return acl.ErrPermissionDenied
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// vetCheckTxnOp applies the given ACL policy to a check transaction operation.
|
2021-08-04 22:36:48 +00:00
|
|
|
func vetCheckTxnOp(op *structs.TxnCheckOp, authz acl.Authorizer) error {
|
2021-07-30 16:55:04 +00:00
|
|
|
var authzContext acl.AuthorizerContext
|
|
|
|
op.FillAuthzContext(&authzContext)
|
|
|
|
|
|
|
|
if op.Check.ServiceID == "" {
|
|
|
|
// Node-level check.
|
2021-08-04 22:36:48 +00:00
|
|
|
if authz.NodeWrite(op.Check.Node, &authzContext) != acl.Allow {
|
2021-07-30 16:55:04 +00:00
|
|
|
return acl.ErrPermissionDenied
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// Service-level check.
|
2021-08-04 22:36:48 +00:00
|
|
|
if authz.ServiceWrite(op.Check.ServiceName, &authzContext) != acl.Allow {
|
2021-07-30 16:55:04 +00:00
|
|
|
return acl.ErrPermissionDenied
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-05-11 04:41:47 +00:00
|
|
|
// Apply is used to apply multiple operations in a single, atomic transaction.
|
2016-05-11 08:35:27 +00:00
|
|
|
func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error {
|
2021-04-20 18:55:24 +00:00
|
|
|
if done, err := t.srv.ForwardRPC("Txn.Apply", args, reply); done {
|
2016-05-11 04:41:47 +00:00
|
|
|
return err
|
|
|
|
}
|
2017-10-04 23:43:27 +00:00
|
|
|
defer metrics.MeasureSince([]string{"txn", "apply"}, time.Now())
|
2016-05-11 04:41:47 +00:00
|
|
|
|
2016-05-13 00:38:25 +00:00
|
|
|
// Run the pre-checks before we send the transaction into Raft.
|
2021-08-04 21:51:19 +00:00
|
|
|
authz, err := t.srv.ResolveToken(args.Token)
|
2016-05-11 04:41:47 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-08-04 21:51:19 +00:00
|
|
|
reply.Errors = t.preCheck(authz, args.Ops)
|
2016-05-11 04:41:47 +00:00
|
|
|
if len(reply.Errors) > 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Apply the update.
|
2016-05-11 08:35:27 +00:00
|
|
|
resp, err := t.srv.raftApply(structs.TxnRequestType, args)
|
2016-05-11 04:41:47 +00:00
|
|
|
if err != nil {
|
2021-04-08 22:58:15 +00:00
|
|
|
return fmt.Errorf("raft apply failed: %w", err)
|
2016-05-11 04:41:47 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Convert the return type. This should be a cheap copy since we are
|
|
|
|
// just taking the two slices.
|
2016-05-11 08:35:27 +00:00
|
|
|
if txnResp, ok := resp.(structs.TxnResponse); ok {
|
2021-08-04 21:51:19 +00:00
|
|
|
txnResp.Results = FilterTxnResults(authz, txnResp.Results)
|
2016-05-11 08:35:27 +00:00
|
|
|
*reply = txnResp
|
2016-05-11 04:41:47 +00:00
|
|
|
} else {
|
|
|
|
return fmt.Errorf("unexpected return type %T", resp)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
2016-05-13 00:38:25 +00:00
|
|
|
|
|
|
|
// Read is used to perform a read-only transaction that doesn't modify the state
|
2019-03-06 17:13:28 +00:00
|
|
|
// store. This is much more scalable since it doesn't go through Raft and
|
2016-05-13 00:38:25 +00:00
|
|
|
// supports staleness, so this should be preferred if you're just performing
|
|
|
|
// reads.
|
|
|
|
func (t *Txn) Read(args *structs.TxnReadRequest, reply *structs.TxnReadResponse) error {
|
2021-04-20 18:55:24 +00:00
|
|
|
if done, err := t.srv.ForwardRPC("Txn.Read", args, reply); done {
|
2016-05-13 00:38:25 +00:00
|
|
|
return err
|
|
|
|
}
|
2017-10-04 23:43:27 +00:00
|
|
|
defer metrics.MeasureSince([]string{"txn", "read"}, time.Now())
|
2016-05-13 00:38:25 +00:00
|
|
|
|
|
|
|
// We have to do this ourselves since we are not doing a blocking RPC.
|
|
|
|
if args.RequireConsistent {
|
|
|
|
if err := t.srv.consistentRead(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Run the pre-checks before we perform the read.
|
2021-08-04 21:51:19 +00:00
|
|
|
authz, err := t.srv.ResolveToken(args.Token)
|
2016-05-13 00:38:25 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-12-03 20:41:03 +00:00
|
|
|
|
|
|
|
// There are currently two different ways we handle permission issues.
|
|
|
|
//
|
|
|
|
// For simple reads such as KVGet and KVGetTree, the txn succeeds but the
|
|
|
|
// offending results are omitted. For more involved operations such as
|
|
|
|
// KVCheckIndex, the txn fails and permission denied errors are returned.
|
|
|
|
//
|
|
|
|
// TODO: Maybe we should unify these, or at least cover it in the docs?
|
2021-08-04 21:51:19 +00:00
|
|
|
reply.Errors = t.preCheck(authz, args.Ops)
|
2016-05-13 00:38:25 +00:00
|
|
|
if len(reply.Errors) > 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Run the read transaction.
|
|
|
|
state := t.srv.fsm.State()
|
|
|
|
reply.Results, reply.Errors = state.TxnRO(args.Ops)
|
2021-12-03 20:41:03 +00:00
|
|
|
|
|
|
|
total := len(reply.Results)
|
2021-08-04 21:51:19 +00:00
|
|
|
reply.Results = FilterTxnResults(authz, reply.Results)
|
2021-12-03 20:41:03 +00:00
|
|
|
reply.QueryMeta.ResultsFilteredByACLs = total != len(reply.Results)
|
2021-12-03 17:11:26 +00:00
|
|
|
|
|
|
|
// We have to do this ourselves since we are not doing a blocking RPC.
|
|
|
|
t.srv.setQueryMeta(&reply.QueryMeta, args.Token)
|
|
|
|
|
2016-05-13 00:38:25 +00:00
|
|
|
return nil
|
|
|
|
}
|