From 48b9a43f1d2cf32b4694e37ea810e0b3a5726165 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Wed, 28 Feb 2018 10:28:07 -0800 Subject: [PATCH] agent/consul: Intention.Apply, FSM methods, very little validation --- agent/consul/fsm/commands_oss.go | 24 +++++++++++ agent/consul/intention_endpoint.go | 54 +++++++++++++++++++++++++ agent/consul/intention_endpoint_test.go | 31 ++++++++++++++ agent/structs/intention.go | 30 ++++++++++++++ agent/structs/structs.go | 1 + 5 files changed, 140 insertions(+) diff --git a/agent/consul/fsm/commands_oss.go b/agent/consul/fsm/commands_oss.go index ede04eef6..c90f185e0 100644 --- a/agent/consul/fsm/commands_oss.go +++ b/agent/consul/fsm/commands_oss.go @@ -20,6 +20,7 @@ func init() { registerCommand(structs.PreparedQueryRequestType, (*FSM).applyPreparedQueryOperation) registerCommand(structs.TxnRequestType, (*FSM).applyTxn) registerCommand(structs.AutopilotRequestType, (*FSM).applyAutopilotUpdate) + registerCommand(structs.IntentionRequestType, (*FSM).applyIntentionOperation) } func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { @@ -246,3 +247,26 @@ func (c *FSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} { } return c.state.AutopilotSetConfig(index, &req.Config) } + +// applyIntentionOperation applies the given intention operation to the state store. +func (c *FSM) applyIntentionOperation(buf []byte, index uint64) interface{} { + var req structs.IntentionRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "intention"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) + defer metrics.MeasureSinceWithLabels([]string{"fsm", "intention"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) + switch req.Op { + case structs.IntentionOpCreate, structs.IntentionOpUpdate: + return c.state.IntentionSet(index, req.Intention) + case structs.IntentionOpDelete: + panic("TODO") + //return c.state.PreparedQueryDelete(index, req.Query.ID) + default: + c.logger.Printf("[WARN] consul.fsm: Invalid Intention operation '%s'", req.Op) + return fmt.Errorf("Invalid Intention operation '%s'", req.Op) + } +} diff --git a/agent/consul/intention_endpoint.go b/agent/consul/intention_endpoint.go index 7737d06dd..8d07b4e7b 100644 --- a/agent/consul/intention_endpoint.go +++ b/agent/consul/intention_endpoint.go @@ -1,9 +1,13 @@ package consul import ( + "time" + + "github.com/armon/go-metrics" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-uuid" ) // Intention manages the Connect intentions. @@ -12,6 +16,56 @@ type Intention struct { srv *Server } +// Apply creates or updates an intention in the data store. +func (s *Intention) Apply( + args *structs.IntentionRequest, + reply *string) error { + if done, err := s.srv.forward("Intention.Apply", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"consul", "intention", "apply"}, time.Now()) + defer metrics.MeasureSince([]string{"intention", "apply"}, time.Now()) + + // If no ID is provided, generate a new ID. This must be done prior to + // appending to the Raft log, because the ID is not deterministic. Once + // the entry is in the log, the state update MUST be deterministic or + // the followers will not converge. + if args.Op == structs.IntentionOpCreate && args.Intention.ID == "" { + state := s.srv.fsm.State() + for { + var err error + args.Intention.ID, err = uuid.GenerateUUID() + if err != nil { + s.srv.logger.Printf("[ERR] consul.intention: UUID generation failed: %v", err) + return err + } + + _, ixn, err := state.IntentionGet(nil, args.Intention.ID) + if err != nil { + s.srv.logger.Printf("[ERR] consul.intention: intention lookup failed: %v", err) + return err + } + if ixn == nil { + break + } + } + } + *reply = args.Intention.ID + + // Commit + resp, err := s.srv.raftApply(structs.IntentionRequestType, args) + if err != nil { + s.srv.logger.Printf("[ERR] consul.intention: Apply failed %v", err) + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + + return nil +} + +// List returns all the intentions. func (s *Intention) List( args *structs.DCSpecificRequest, reply *structs.IndexedIntentions) error { diff --git a/agent/consul/intention_endpoint_test.go b/agent/consul/intention_endpoint_test.go index 13242374c..51fa635e3 100644 --- a/agent/consul/intention_endpoint_test.go +++ b/agent/consul/intention_endpoint_test.go @@ -9,6 +9,37 @@ import ( "github.com/hashicorp/net-rpc-msgpackrpc" ) +func TestIntentionApply_new(t *testing.T) { + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Setup a basic record to create + ixn := structs.IntentionRequest{ + Datacenter: "dc1", + Op: structs.IntentionOpCreate, + Intention: &structs.Intention{ + SourceName: "test", + }, + } + var reply string + + // Create + if err := msgpackrpc.CallWithCodec(codec, "Intention.Apply", &ixn, &reply); err != nil { + t.Fatalf("err: %v", err) + } + if reply == "" { + t.Fatal("reply should be non-empty") + } + + // TODO test read +} + func TestIntentionList(t *testing.T) { t.Parallel() dir1, s1 := testServer(t) diff --git a/agent/structs/intention.go b/agent/structs/intention.go index 7837ad431..81f07080c 100644 --- a/agent/structs/intention.go +++ b/agent/structs/intention.go @@ -69,3 +69,33 @@ type IndexedIntentions struct { Intentions Intentions QueryMeta } + +// IntentionOp is the operation for a request related to intentions. +type IntentionOp string + +const ( + IntentionOpCreate IntentionOp = "create" + IntentionOpUpdate IntentionOp = "update" + IntentionOpDelete IntentionOp = "delete" +) + +// IntentionRequest is used to create, update, and delete intentions. +type IntentionRequest struct { + // Datacenter is the target for this request. + Datacenter string + + // Op is the type of operation being requested. + Op IntentionOp + + // Intention is the intention. + Intention *Intention + + // WriteRequest is a common struct containing ACL tokens and other + // write-related common elements for requests. + WriteRequest +} + +// RequestDatacenter returns the datacenter for a given request. +func (q *IntentionRequest) RequestDatacenter() string { + return q.Datacenter +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 77075b3e3..8a1860912 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -39,6 +39,7 @@ const ( AutopilotRequestType = 9 AreaRequestType = 10 ACLBootstrapRequestType = 11 // FSM snapshots only. + IntentionRequestType = 12 ) const (