open-consul/agent/consul/intention_endpoint.go

211 lines
5.2 KiB
Go
Raw Normal View History

package consul
import (
2018-02-28 23:54:48 +00:00
"errors"
"fmt"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
)
2018-02-28 23:54:48 +00:00
var (
// ErrIntentionNotFound is returned if the intention lookup failed.
ErrIntentionNotFound = errors.New("Intention not found")
)
// Intention manages the Connect intentions.
type Intention struct {
// srv is a pointer back to the server.
srv *Server
}
// Apply creates or updates an intention in the data store.
func (s *Intention) Apply(
args *structs.IntentionRequest,
reply *string) error {
if done, err := s.srv.forward("Intention.Apply", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "intention", "apply"}, time.Now())
defer metrics.MeasureSince([]string{"intention", "apply"}, time.Now())
// Always set a non-nil intention to avoid nil-access below
if args.Intention == nil {
args.Intention = &structs.Intention{}
}
// If no ID is provided, generate a new ID. This must be done prior to
// appending to the Raft log, because the ID is not deterministic. Once
// the entry is in the log, the state update MUST be deterministic or
// the followers will not converge.
if args.Op == structs.IntentionOpCreate {
if args.Intention.ID != "" {
return fmt.Errorf("ID must be empty when creating a new intention")
}
state := s.srv.fsm.State()
for {
var err error
args.Intention.ID, err = uuid.GenerateUUID()
if err != nil {
s.srv.logger.Printf("[ERR] consul.intention: UUID generation failed: %v", err)
return err
}
_, ixn, err := state.IntentionGet(nil, args.Intention.ID)
if err != nil {
s.srv.logger.Printf("[ERR] consul.intention: intention lookup failed: %v", err)
return err
}
if ixn == nil {
break
}
}
// Set the created at
args.Intention.CreatedAt = time.Now().UTC()
}
*reply = args.Intention.ID
// If this is not a create, then we have to verify the ID.
if args.Op != structs.IntentionOpCreate {
state := s.srv.fsm.State()
_, ixn, err := state.IntentionGet(nil, args.Intention.ID)
if err != nil {
return fmt.Errorf("Intention lookup failed: %v", err)
}
if ixn == nil {
return fmt.Errorf("Cannot modify non-existent intention: '%s'", args.Intention.ID)
}
}
// We always update the updatedat field. This has no effect for deletion.
args.Intention.UpdatedAt = time.Now().UTC()
// Default source type
if args.Intention.SourceType == "" {
args.Intention.SourceType = structs.IntentionSourceConsul
}
// Until we support namespaces, we force all namespaces to be default
if args.Intention.SourceNS == "" {
args.Intention.SourceNS = structs.IntentionDefaultNamespace
}
if args.Intention.DestinationNS == "" {
args.Intention.DestinationNS = structs.IntentionDefaultNamespace
}
// Validate. We do not validate on delete since it is valid to only
// send an ID in that case.
if args.Op != structs.IntentionOpDelete {
if err := args.Intention.Validate(); err != nil {
return err
}
2018-03-03 17:43:37 +00:00
}
// Commit
resp, err := s.srv.raftApply(structs.IntentionRequestType, args)
if err != nil {
s.srv.logger.Printf("[ERR] consul.intention: Apply failed %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
}
2018-02-28 18:44:49 +00:00
// Get returns a single intention by ID.
func (s *Intention) Get(
args *structs.IntentionQueryRequest,
reply *structs.IndexedIntentions) error {
// Forward if necessary
if done, err := s.srv.forward("Intention.Get", args, args, reply); done {
return err
}
return s.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, ixn, err := state.IntentionGet(ws, args.IntentionID)
if err != nil {
return err
}
if ixn == nil {
2018-02-28 23:54:48 +00:00
return ErrIntentionNotFound
2018-02-28 18:44:49 +00:00
}
reply.Index = index
reply.Intentions = structs.Intentions{ixn}
// TODO: acl filtering
return nil
},
)
}
// List returns all the intentions.
func (s *Intention) List(
args *structs.DCSpecificRequest,
reply *structs.IndexedIntentions) error {
// Forward if necessary
if done, err := s.srv.forward("Intention.List", args, args, reply); done {
return err
}
return s.srv.blockingQuery(
&args.QueryOptions, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, ixns, err := state.Intentions(ws)
if err != nil {
return err
}
reply.Index, reply.Intentions = index, ixns
if reply.Intentions == nil {
reply.Intentions = make(structs.Intentions, 0)
}
// filterACL
return nil
},
)
}
// Match returns the set of intentions that match the given source/destination.
func (s *Intention) Match(
args *structs.IntentionQueryRequest,
reply *structs.IndexedIntentionMatches) error {
// Forward if necessary
if done, err := s.srv.forward("Intention.Match", args, args, reply); done {
return err
}
// TODO(mitchellh): validate
return s.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, matches, err := state.IntentionMatch(ws, args.Match)
if err != nil {
return err
}
reply.Index = index
reply.Matches = matches
// TODO(mitchellh): acl filtering
return nil
},
)
}