agent/consul: Intention.Apply, FSM methods, very little validation
This commit is contained in:
parent
b19a289596
commit
48b9a43f1d
|
@ -20,6 +20,7 @@ func init() {
|
|||
registerCommand(structs.PreparedQueryRequestType, (*FSM).applyPreparedQueryOperation)
|
||||
registerCommand(structs.TxnRequestType, (*FSM).applyTxn)
|
||||
registerCommand(structs.AutopilotRequestType, (*FSM).applyAutopilotUpdate)
|
||||
registerCommand(structs.IntentionRequestType, (*FSM).applyIntentionOperation)
|
||||
}
|
||||
|
||||
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
|
||||
|
@ -246,3 +247,26 @@ func (c *FSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
|
|||
}
|
||||
return c.state.AutopilotSetConfig(index, &req.Config)
|
||||
}
|
||||
|
||||
// applyIntentionOperation applies the given intention operation to the state store.
|
||||
func (c *FSM) applyIntentionOperation(buf []byte, index uint64) interface{} {
|
||||
var req structs.IntentionRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
|
||||
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "intention"}, time.Now(),
|
||||
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
||||
defer metrics.MeasureSinceWithLabels([]string{"fsm", "intention"}, time.Now(),
|
||||
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
||||
switch req.Op {
|
||||
case structs.IntentionOpCreate, structs.IntentionOpUpdate:
|
||||
return c.state.IntentionSet(index, req.Intention)
|
||||
case structs.IntentionOpDelete:
|
||||
panic("TODO")
|
||||
//return c.state.PreparedQueryDelete(index, req.Query.ID)
|
||||
default:
|
||||
c.logger.Printf("[WARN] consul.fsm: Invalid Intention operation '%s'", req.Op)
|
||||
return fmt.Errorf("Invalid Intention operation '%s'", req.Op)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,13 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
||||
// Intention manages the Connect intentions.
|
||||
|
@ -12,6 +16,56 @@ type Intention struct {
|
|||
srv *Server
|
||||
}
|
||||
|
||||
// Apply creates or updates an intention in the data store.
|
||||
func (s *Intention) Apply(
|
||||
args *structs.IntentionRequest,
|
||||
reply *string) error {
|
||||
if done, err := s.srv.forward("Intention.Apply", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"consul", "intention", "apply"}, time.Now())
|
||||
defer metrics.MeasureSince([]string{"intention", "apply"}, time.Now())
|
||||
|
||||
// If no ID is provided, generate a new ID. This must be done prior to
|
||||
// appending to the Raft log, because the ID is not deterministic. Once
|
||||
// the entry is in the log, the state update MUST be deterministic or
|
||||
// the followers will not converge.
|
||||
if args.Op == structs.IntentionOpCreate && args.Intention.ID == "" {
|
||||
state := s.srv.fsm.State()
|
||||
for {
|
||||
var err error
|
||||
args.Intention.ID, err = uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
s.srv.logger.Printf("[ERR] consul.intention: UUID generation failed: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
_, ixn, err := state.IntentionGet(nil, args.Intention.ID)
|
||||
if err != nil {
|
||||
s.srv.logger.Printf("[ERR] consul.intention: intention lookup failed: %v", err)
|
||||
return err
|
||||
}
|
||||
if ixn == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
*reply = args.Intention.ID
|
||||
|
||||
// Commit
|
||||
resp, err := s.srv.raftApply(structs.IntentionRequestType, args)
|
||||
if err != nil {
|
||||
s.srv.logger.Printf("[ERR] consul.intention: Apply failed %v", err)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// List returns all the intentions.
|
||||
func (s *Intention) List(
|
||||
args *structs.DCSpecificRequest,
|
||||
reply *structs.IndexedIntentions) error {
|
||||
|
|
|
@ -9,6 +9,37 @@ import (
|
|||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
|
||||
func TestIntentionApply_new(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Setup a basic record to create
|
||||
ixn := structs.IntentionRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.IntentionOpCreate,
|
||||
Intention: &structs.Intention{
|
||||
SourceName: "test",
|
||||
},
|
||||
}
|
||||
var reply string
|
||||
|
||||
// Create
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Intention.Apply", &ixn, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if reply == "" {
|
||||
t.Fatal("reply should be non-empty")
|
||||
}
|
||||
|
||||
// TODO test read
|
||||
}
|
||||
|
||||
func TestIntentionList(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServer(t)
|
||||
|
|
|
@ -69,3 +69,33 @@ type IndexedIntentions struct {
|
|||
Intentions Intentions
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
// IntentionOp is the operation for a request related to intentions.
|
||||
type IntentionOp string
|
||||
|
||||
const (
|
||||
IntentionOpCreate IntentionOp = "create"
|
||||
IntentionOpUpdate IntentionOp = "update"
|
||||
IntentionOpDelete IntentionOp = "delete"
|
||||
)
|
||||
|
||||
// IntentionRequest is used to create, update, and delete intentions.
|
||||
type IntentionRequest struct {
|
||||
// Datacenter is the target for this request.
|
||||
Datacenter string
|
||||
|
||||
// Op is the type of operation being requested.
|
||||
Op IntentionOp
|
||||
|
||||
// Intention is the intention.
|
||||
Intention *Intention
|
||||
|
||||
// WriteRequest is a common struct containing ACL tokens and other
|
||||
// write-related common elements for requests.
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
// RequestDatacenter returns the datacenter for a given request.
|
||||
func (q *IntentionRequest) RequestDatacenter() string {
|
||||
return q.Datacenter
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ const (
|
|||
AutopilotRequestType = 9
|
||||
AreaRequestType = 10
|
||||
ACLBootstrapRequestType = 11 // FSM snapshots only.
|
||||
IntentionRequestType = 12
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
Loading…
Reference in New Issue