agent/consul: CAS operations for setting the CA root
This commit is contained in:
parent
712888258b
commit
80a058a573
|
@ -16,6 +16,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/checks"
|
||||
"github.com/hashicorp/consul/agent/consul"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
|
@ -25,6 +26,14 @@ import (
|
|||
"github.com/pascaldekloe/goe/verify"
|
||||
)
|
||||
|
||||
// TestMain is the main entrypoint for `go test`.
|
||||
func TestMain(m *testing.M) {
|
||||
// Enable the test RPC endpoints
|
||||
consul.TestEndpoint()
|
||||
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
|
||||
func externalIP() (string, error) {
|
||||
addrs, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
@ -25,3 +26,23 @@ func TestConnectCARoots_empty(t *testing.T) {
|
|||
assert.Equal(value.ActiveRootID, "")
|
||||
assert.Len(value.Roots, 0)
|
||||
}
|
||||
|
||||
func TestConnectCARoots_list(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
assert := assert.New(t)
|
||||
a := NewTestAgent(t.Name(), "")
|
||||
defer a.Shutdown()
|
||||
|
||||
state := consul.TestServerState(a.Agent.delegate.(*consul.Server))
|
||||
t.Log(state.CARoots(nil))
|
||||
|
||||
req, _ := http.NewRequest("GET", "/v1/connect/ca/roots", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.ConnectCARoots(resp, req)
|
||||
assert.Nil(err)
|
||||
|
||||
value := obj.(structs.IndexedCARoots)
|
||||
assert.Equal(value.ActiveRootID, "")
|
||||
assert.Len(value.Roots, 0)
|
||||
}
|
||||
|
|
|
@ -99,6 +99,9 @@ func (s *ConnectCA) Sign(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if root == nil {
|
||||
return fmt.Errorf("no active CA found")
|
||||
}
|
||||
|
||||
// Determine the signing certificate. It is the set signing cert
|
||||
// unless that is empty, in which case it is identically to the public
|
||||
|
|
|
@ -30,8 +30,9 @@ func TestConnectCARoots(t *testing.T) {
|
|||
ca1 := connect.TestCA(t, nil)
|
||||
ca2 := connect.TestCA(t, nil)
|
||||
ca2.Active = false
|
||||
assert.Nil(state.CARootSet(1, ca1))
|
||||
assert.Nil(state.CARootSet(2, ca2))
|
||||
ok, err := state.CARootSetCAS(1, 0, []*structs.CARoot{ca1, ca2})
|
||||
assert.True(ok)
|
||||
assert.Nil(err)
|
||||
|
||||
// Request
|
||||
args := &structs.DCSpecificRequest{
|
||||
|
@ -70,7 +71,9 @@ func TestConnectCASign(t *testing.T) {
|
|||
// Insert a CA
|
||||
state := s1.fsm.State()
|
||||
ca := connect.TestCA(t, nil)
|
||||
assert.Nil(state.CARootSet(1, ca))
|
||||
ok, err := state.CARootSetCAS(1, 0, []*structs.CARoot{ca})
|
||||
assert.True(ok)
|
||||
assert.Nil(err)
|
||||
|
||||
// Generate a CSR and request signing
|
||||
args := &structs.CASignRequest{
|
||||
|
|
|
@ -21,6 +21,7 @@ func init() {
|
|||
registerCommand(structs.TxnRequestType, (*FSM).applyTxn)
|
||||
registerCommand(structs.AutopilotRequestType, (*FSM).applyAutopilotUpdate)
|
||||
registerCommand(structs.IntentionRequestType, (*FSM).applyIntentionOperation)
|
||||
registerCommand(structs.ConnectCARequestType, (*FSM).applyConnectCAOperation)
|
||||
}
|
||||
|
||||
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
|
||||
|
@ -269,3 +270,28 @@ func (c *FSM) applyIntentionOperation(buf []byte, index uint64) interface{} {
|
|||
return fmt.Errorf("Invalid Intention operation '%s'", req.Op)
|
||||
}
|
||||
}
|
||||
|
||||
// applyConnectCAOperation applies the given CA operation to the state store.
|
||||
func (c *FSM) applyConnectCAOperation(buf []byte, index uint64) interface{} {
|
||||
var req structs.CARequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
|
||||
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "ca"}, time.Now(),
|
||||
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
||||
defer metrics.MeasureSinceWithLabels([]string{"fsm", "ca"}, time.Now(),
|
||||
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
|
||||
switch req.Op {
|
||||
case structs.CAOpSet:
|
||||
act, err := c.state.CARootSetCAS(index, req.Index, req.Roots)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return act
|
||||
default:
|
||||
c.logger.Printf("[WARN] consul.fsm: Invalid CA operation '%s'", req.Op)
|
||||
return fmt.Errorf("Invalid CA operation '%s'", req.Op)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,52 +73,58 @@ func (s *Store) CARootActive(ws memdb.WatchSet) (uint64, *structs.CARoot, error)
|
|||
return idx, result, err
|
||||
}
|
||||
|
||||
// CARootSet creates or updates a CA root.
|
||||
// CARootSetCAS sets the current CA root state using a check-and-set operation.
|
||||
// On success, this will replace the previous set of CARoots completely with
|
||||
// the given set of roots.
|
||||
//
|
||||
// NOTE(mitchellh): I have a feeling we'll want a CARootMultiSetCAS to
|
||||
// perform a check-and-set on the entire set of CARoots versus an individual
|
||||
// set, since we'll want to modify them atomically during events such as
|
||||
// rotation.
|
||||
func (s *Store) CARootSet(idx uint64, v *structs.CARoot) error {
|
||||
// The first boolean result returns whether the transaction succeeded or not.
|
||||
func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, error) {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
if err := s.caRootSetTxn(tx, idx, v); err != nil {
|
||||
return err
|
||||
// Get the current max index
|
||||
if midx := maxIndexTxn(tx, caRootTableName); midx != cidx {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Go through and find any existing matching CAs so we can preserve and
|
||||
// update their Create/ModifyIndex values.
|
||||
for _, r := range rs {
|
||||
if r.ID == "" {
|
||||
return false, ErrMissingCARootID
|
||||
}
|
||||
|
||||
existing, err := tx.First(caRootTableName, "id", r.ID)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed CA root lookup: %s", err)
|
||||
}
|
||||
|
||||
if existing != nil {
|
||||
r.CreateIndex = existing.(*structs.CARoot).CreateIndex
|
||||
} else {
|
||||
r.CreateIndex = idx
|
||||
}
|
||||
r.ModifyIndex = idx
|
||||
}
|
||||
|
||||
// Delete all
|
||||
_, err := tx.DeleteAll(caRootTableName, "id")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Insert all
|
||||
for _, r := range rs {
|
||||
if err := tx.Insert(caRootTableName, r); err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
// Update the index
|
||||
if err := tx.Insert("index", &IndexEntry{caRootTableName, idx}); err != nil {
|
||||
return false, fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// caRootSetTxn is the inner method used to insert or update a CA root with
|
||||
// the proper indexes into the state store.
|
||||
func (s *Store) caRootSetTxn(tx *memdb.Txn, idx uint64, v *structs.CARoot) error {
|
||||
// ID is required
|
||||
if v.ID == "" {
|
||||
return ErrMissingCARootID
|
||||
}
|
||||
|
||||
// Check for an existing value
|
||||
existing, err := tx.First(caRootTableName, "id", v.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed CA root lookup: %s", err)
|
||||
}
|
||||
if existing != nil {
|
||||
old := existing.(*structs.CARoot)
|
||||
v.CreateIndex = old.CreateIndex
|
||||
} else {
|
||||
v.CreateIndex = idx
|
||||
}
|
||||
v.ModifyIndex = idx
|
||||
|
||||
// Insert
|
||||
if err := tx.Insert(caRootTableName, v); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := tx.Insert("index", &IndexEntry{caRootTableName, idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
return true, nil
|
||||
}
|
||||
|
|
26
agent/consul/testing.go
Normal file
26
agent/consul/testing.go
Normal file
|
@ -0,0 +1,26 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// testEndpointsOnce ensures that endpoints for testing are registered once.
|
||||
var testEndpointsOnce sync.Once
|
||||
|
||||
// TestEndpoints registers RPC endpoints specifically for testing. These
|
||||
// endpoints enable some internal data access that we normally disallow, but
|
||||
// are useful for modifying server state.
|
||||
//
|
||||
// To use this, modify TestMain to call this function prior to running tests.
|
||||
//
|
||||
// These should NEVER be registered outside of tests.
|
||||
//
|
||||
// NOTE(mitchellh): This was created so that the downstream agent tests can
|
||||
// modify internal Connect CA state. When the CA plugin work comes in with
|
||||
// a more complete CA API, this may no longer be necessary and we can remove it.
|
||||
// That would be ideal.
|
||||
func TestEndpoint() {
|
||||
testEndpointsOnce.Do(func() {
|
||||
registerEndpoint(func(s *Server) interface{} { return &Test{s} })
|
||||
})
|
||||
}
|
43
agent/consul/testing_endpoint.go
Normal file
43
agent/consul/testing_endpoint.go
Normal file
|
@ -0,0 +1,43 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// Test is an RPC endpoint that is only available during `go test` when
|
||||
// `TestEndpoint` is called. This is not and must not ever be available
|
||||
// during a real running Consul agent, since it this endpoint bypasses
|
||||
// critical ACL checks.
|
||||
type Test struct {
|
||||
// srv is a pointer back to the server.
|
||||
srv *Server
|
||||
}
|
||||
|
||||
// ConnectCASetRoots sets the current CA roots state.
|
||||
func (s *Test) ConnectCASetRoots(
|
||||
args []*structs.CARoot,
|
||||
reply *interface{}) error {
|
||||
|
||||
// Get the highest index
|
||||
state := s.srv.fsm.State()
|
||||
idx, _, err := state.CARoots(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Commit
|
||||
resp, err := s.srv.raftApply(structs.ConnectCARequestType, &structs.CARequest{
|
||||
Op: structs.CAOpSet,
|
||||
Index: idx,
|
||||
Roots: args,
|
||||
})
|
||||
if err != nil {
|
||||
s.srv.logger.Printf("[ERR] consul.test: Apply failed %v", err)
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
42
agent/consul/testing_endpoint_test.go
Normal file
42
agent/consul/testing_endpoint_test.go
Normal file
|
@ -0,0 +1,42 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// Test setting the CAs
|
||||
func TestTestConnectCASetRoots(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
assert := assert.New(t)
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Prepare
|
||||
ca1 := connect.TestCA(t, nil)
|
||||
ca2 := connect.TestCA(t, nil)
|
||||
ca2.Active = false
|
||||
|
||||
// Request
|
||||
args := []*structs.CARoot{ca1, ca2}
|
||||
var reply interface{}
|
||||
assert.Nil(msgpackrpc.CallWithCodec(codec, "Test.ConnectCASetRoots", args, &reply))
|
||||
|
||||
// Verify they're there
|
||||
state := s1.fsm.State()
|
||||
_, actual, err := state.CARoots(nil)
|
||||
assert.Nil(err)
|
||||
assert.Len(actual, 2)
|
||||
}
|
13
agent/consul/testing_test.go
Normal file
13
agent/consul/testing_test.go
Normal file
|
@ -0,0 +1,13 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
// Register the test RPC endpoint
|
||||
TestEndpoint()
|
||||
|
||||
os.Exit(m.Run())
|
||||
}
|
|
@ -76,3 +76,25 @@ type IssuedCert struct {
|
|||
// state store, but is present in the sign API response.
|
||||
Cert string `json:",omitempty"`
|
||||
}
|
||||
|
||||
// CAOp is the operation for a request related to intentions.
|
||||
type CAOp string
|
||||
|
||||
const (
|
||||
CAOpSet CAOp = "set"
|
||||
)
|
||||
|
||||
// CARequest is used to modify connect CA data. This is used by the
|
||||
// FSM (agent/consul/fsm) to apply changes.
|
||||
type CARequest struct {
|
||||
// Op is the type of operation being requested. This determines what
|
||||
// other fields are required.
|
||||
Op CAOp
|
||||
|
||||
// Index is used by CAOpSet for a CAS operation.
|
||||
Index uint64
|
||||
|
||||
// Roots is a list of roots. This is used for CAOpSet. One root must
|
||||
// always be active.
|
||||
Roots []*CARoot
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ const (
|
|||
AreaRequestType = 10
|
||||
ACLBootstrapRequestType = 11 // FSM snapshots only.
|
||||
IntentionRequestType = 12
|
||||
ConnectCARequestType = 13
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
Loading…
Reference in a new issue