auto-config: move autoConfigBackend impl off of Server

Most of these methods are used exclusively for the AutoConfig RPC
endpoint. This PR uses a pattern that we've used in other places as an
incremental step to reducing the scope of Server.
This commit is contained in:
Daniel Nephin 2021-06-23 17:14:28 -04:00
parent 605275b4dc
commit 34c8585b29
8 changed files with 213 additions and 186 deletions

View File

@ -0,0 +1,96 @@
package consul
import (
"crypto/x509"
"fmt"
"net"
"time"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
)
type autoConfigBackend struct {
Server *Server
}
func (b autoConfigBackend) ForwardRPC(method string, info structs.RPCInfo, reply interface{}) (bool, error) {
return b.Server.ForwardRPC(method, info, reply)
}
func (b autoConfigBackend) SignCertificate(csr *x509.CertificateRequest, id connect.CertURI) (*structs.IssuedCert, error) {
return b.Server.caManager.SignCertificate(csr, id)
}
// GetCARoots returns the CA roots.
func (b autoConfigBackend) GetCARoots() (*structs.IndexedCARoots, error) {
return b.Server.getCARoots(nil, b.Server.fsm.State())
}
// DatacenterJoinAddresses will return all the strings suitable for usage in
// retry join operations to connect to the the LAN or LAN segment gossip pool.
func (b autoConfigBackend) DatacenterJoinAddresses(segment string) ([]string, error) {
members, err := b.Server.LANSegmentMembers(segment)
if err != nil {
return nil, fmt.Errorf("Failed to retrieve members for segment %s - %w", segment, err)
}
var joinAddrs []string
for _, m := range members {
if ok, _ := metadata.IsConsulServer(m); ok {
serfAddr := net.TCPAddr{IP: m.Addr, Port: int(m.Port)}
joinAddrs = append(joinAddrs, serfAddr.String())
}
}
return joinAddrs, nil
}
// CreateACLToken will create an ACL token from the given template
func (b autoConfigBackend) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) {
// we have to require local tokens or else it would require having these servers use a token with acl:write to make a
// token create RPC to the servers in the primary DC.
if !b.Server.LocalTokensEnabled() {
return nil, fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter: %s", b.Server.config.Datacenter)
}
newToken := *template
// generate the accessor id
if newToken.AccessorID == "" {
accessor, err := lib.GenerateUUID(b.Server.checkTokenUUID)
if err != nil {
return nil, err
}
newToken.AccessorID = accessor
}
// generate the secret id
if newToken.SecretID == "" {
secret, err := lib.GenerateUUID(b.Server.checkTokenUUID)
if err != nil {
return nil, err
}
newToken.SecretID = secret
}
newToken.CreateTime = time.Now()
req := structs.ACLTokenBatchSetRequest{
Tokens: structs.ACLTokens{&newToken},
CAS: false,
}
// perform the request to mint the new token
if _, err := b.Server.raftApplyMsgpack(structs.ACLTokenSetRequestType, &req); err != nil {
return nil, err
}
// return the full token definition from the FSM
_, token, err := b.Server.fsm.State().ACLTokenGetByAccessor(nil, newToken.AccessorID, &newToken.EnterpriseMeta)
return token, err
}

View File

@ -0,0 +1,115 @@
package consul
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
)
func TestAutoConfigBackend_DatacenterJoinAddresses(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
conf := testClusterConfig{
Datacenter: "primary",
Servers: 3,
}
nodes := newTestCluster(t, &conf)
var expected []string
for _, srv := range nodes.Servers {
expected = append(expected, fmt.Sprintf("127.0.0.1:%d", srv.config.SerfLANConfig.MemberlistConfig.BindPort))
}
backend := autoConfigBackend{Server: nodes.Servers[0]}
actual, err := backend.DatacenterJoinAddresses("")
require.NoError(t, err)
require.ElementsMatch(t, expected, actual)
}
func TestAutoConfigBackend_CreateACLToken(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, srv, codec := testACLServerWithConfig(t, nil, false)
waitForLeaderEstablishment(t, srv)
r1, err := upsertTestRole(codec, TestDefaultMasterToken, "dc1")
require.NoError(t, err)
t.Run("predefined-ids", func(t *testing.T) {
accessor := "554cd3ab-5d4e-4d6e-952e-4e8b6c77bfb3"
secret := "ef453f31-ad58-4ec8-8bf8-342e99763026"
in := &structs.ACLToken{
AccessorID: accessor,
SecretID: secret,
Description: "test",
Policies: []structs.ACLTokenPolicyLink{
{
ID: structs.ACLPolicyGlobalManagementID,
},
},
NodeIdentities: []*structs.ACLNodeIdentity{
{
NodeName: "foo",
Datacenter: "bar",
},
},
ServiceIdentities: []*structs.ACLServiceIdentity{
{
ServiceName: "web",
},
},
Roles: []structs.ACLTokenRoleLink{
{
ID: r1.ID,
},
},
}
b := autoConfigBackend{Server: srv}
out, err := b.CreateACLToken(in)
require.NoError(t, err)
require.Equal(t, accessor, out.AccessorID)
require.Equal(t, secret, out.SecretID)
require.Equal(t, "test", out.Description)
require.NotZero(t, out.CreateTime)
require.Len(t, out.Policies, 1)
require.Len(t, out.Roles, 1)
require.Len(t, out.NodeIdentities, 1)
require.Len(t, out.ServiceIdentities, 1)
require.Equal(t, structs.ACLPolicyGlobalManagementID, out.Policies[0].ID)
require.Equal(t, "foo", out.NodeIdentities[0].NodeName)
require.Equal(t, "web", out.ServiceIdentities[0].ServiceName)
require.Equal(t, r1.ID, out.Roles[0].ID)
})
t.Run("autogen-ids", func(t *testing.T) {
in := &structs.ACLToken{
Description: "test",
NodeIdentities: []*structs.ACLNodeIdentity{
{
NodeName: "foo",
Datacenter: "bar",
},
},
}
b := autoConfigBackend{Server: srv}
out, err := b.CreateACLToken(in)
require.NoError(t, err)
require.NotEmpty(t, out.AccessorID)
require.NotEmpty(t, out.SecretID)
require.Equal(t, "test", out.Description)
require.NotZero(t, out.CreateTime)
require.Len(t, out.NodeIdentities, 1)
require.Equal(t, "foo", out.NodeIdentities[0].NodeName)
})
}

View File

@ -113,14 +113,6 @@ type AutoConfigBackend interface {
SignCertificate(csr *x509.CertificateRequest, id connect.CertURI) (*structs.IssuedCert, error) SignCertificate(csr *x509.CertificateRequest, id connect.CertURI) (*structs.IssuedCert, error)
} }
type autoConfigBackend struct {
*Server
}
func (b autoConfigBackend) SignCertificate(csr *x509.CertificateRequest, id connect.CertURI) (*structs.IssuedCert, error) {
return b.Server.caManager.SignCertificate(csr, id)
}
// AutoConfig endpoint is used for cluster auto configuration operations // AutoConfig endpoint is used for cluster auto configuration operations
type AutoConfig struct { type AutoConfig struct {
// currently AutoConfig does not support pushing down any configuration that would be reloadable on the servers // currently AutoConfig does not support pushing down any configuration that would be reloadable on the servers

View File

@ -195,7 +195,7 @@ func TestAutoConfigInitialConfiguration(t *testing.T) {
waitForLeaderEstablishment(t, s) waitForLeaderEstablishment(t, s)
roots, err := s.GetCARoots() roots, err := s.getCARoots(nil, s.fsm.State())
require.NoError(t, err) require.NoError(t, err)
pbroots, err := translateCARootsToProtobuf(roots) pbroots, err := translateCARootsToProtobuf(roots)

View File

@ -1025,7 +1025,7 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) (reterr error)
// getIntermediateCAPrimary regenerates the intermediate cert in the primary datacenter. // getIntermediateCAPrimary regenerates the intermediate cert in the primary datacenter.
// This is only run for CAs that require an intermediary in the primary DC, such as Vault. // This is only run for CAs that require an intermediary in the primary DC, such as Vault.
// It should only be called while the state lock is held by setting the state to non-ready. // It should only be called while the state lock is held by setting the state to non-ready.
func (c *CAManager) getIntermediateCAPrimary(provider ca.Provider, newActiveRoot *structs.CARoot) error { func (c *CAManager) getIntermediateCAPrimary(provider ca.Provider, newActiveRoot *structs.CARoot) error {
// Generate and sign an intermediate cert using the root CA. // Generate and sign an intermediate cert using the root CA.
intermediatePEM, err := provider.GenerateIntermediate() intermediatePEM, err := provider.GenerateIntermediate()

View File

@ -1455,74 +1455,6 @@ func (s *Server) isReadyForConsistentReads() bool {
return atomic.LoadInt32(&s.readyForConsistentReads) == 1 return atomic.LoadInt32(&s.readyForConsistentReads) == 1
} }
// CreateACLToken will create an ACL token from the given template
// TODO: move to autoConfigBackend
func (s *Server) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) {
// we have to require local tokens or else it would require having these servers use a token with acl:write to make a
// token create RPC to the servers in the primary DC.
if !s.LocalTokensEnabled() {
return nil, fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter: %s", s.config.Datacenter)
}
newToken := *template
// generate the accessor id
if newToken.AccessorID == "" {
accessor, err := lib.GenerateUUID(s.checkTokenUUID)
if err != nil {
return nil, err
}
newToken.AccessorID = accessor
}
// generate the secret id
if newToken.SecretID == "" {
secret, err := lib.GenerateUUID(s.checkTokenUUID)
if err != nil {
return nil, err
}
newToken.SecretID = secret
}
newToken.CreateTime = time.Now()
req := structs.ACLTokenBatchSetRequest{
Tokens: structs.ACLTokens{&newToken},
CAS: false,
}
// perform the request to mint the new token
if _, err := s.raftApplyMsgpack(structs.ACLTokenSetRequestType, &req); err != nil {
return nil, err
}
// return the full token definition from the FSM
_, token, err := s.fsm.State().ACLTokenGetByAccessor(nil, newToken.AccessorID, &newToken.EnterpriseMeta)
return token, err
}
// DatacenterJoinAddresses will return all the strings suitable for usage in
// retry join operations to connect to the the LAN or LAN segment gossip pool.
// TODO: move to autoConfigBackend
func (s *Server) DatacenterJoinAddresses(segment string) ([]string, error) {
members, err := s.LANSegmentMembers(segment)
if err != nil {
return nil, fmt.Errorf("Failed to retrieve members for segment %s - %w", segment, err)
}
var joinAddrs []string
for _, m := range members {
if ok, _ := metadata.IsConsulServer(m); ok {
serfAddr := net.TCPAddr{IP: m.Addr, Port: int(m.Port)}
joinAddrs = append(joinAddrs, serfAddr.String())
}
}
return joinAddrs, nil
}
// peersInfoContent is used to help operators understand what happened to the // peersInfoContent is used to help operators understand what happened to the
// peers.json file. This is written to a file called peers.info in the same // peers.json file. This is written to a file called peers.info in the same
// location. // location.

View File

@ -11,12 +11,6 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
// GetCARoots will retrieve CARoots
// TODO: move to autoConfigBackend
func (s *Server) GetCARoots() (*structs.IndexedCARoots, error) {
return s.getCARoots(nil, s.fsm.State())
}
func (s *Server) getCARoots(ws memdb.WatchSet, state *state.Store) (*structs.IndexedCARoots, error) { func (s *Server) getCARoots(ws memdb.WatchSet, state *state.Store) (*structs.IndexedCARoots, error) {
index, roots, config, err := state.CARootsAndConfig(ws) index, roots, config, err := state.CARootsAndConfig(ws)
if err != nil { if err != nil {

View File

@ -1654,105 +1654,3 @@ func TestServer_CALogging(t *testing.T) {
require.Contains(t, buf.String(), "consul CA provider configured") require.Contains(t, buf.String(), "consul CA provider configured")
} }
func TestServer_DatacenterJoinAddresses(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
conf := testClusterConfig{
Datacenter: "primary",
Servers: 3,
}
nodes := newTestCluster(t, &conf)
var expected []string
for _, srv := range nodes.Servers {
expected = append(expected, fmt.Sprintf("127.0.0.1:%d", srv.config.SerfLANConfig.MemberlistConfig.BindPort))
}
actual, err := nodes.Servers[0].DatacenterJoinAddresses("")
require.NoError(t, err)
require.ElementsMatch(t, expected, actual)
}
func TestServer_CreateACLToken(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, srv, codec := testACLServerWithConfig(t, nil, false)
waitForLeaderEstablishment(t, srv)
r1, err := upsertTestRole(codec, TestDefaultMasterToken, "dc1")
require.NoError(t, err)
t.Run("predefined-ids", func(t *testing.T) {
accessor := "554cd3ab-5d4e-4d6e-952e-4e8b6c77bfb3"
secret := "ef453f31-ad58-4ec8-8bf8-342e99763026"
in := &structs.ACLToken{
AccessorID: accessor,
SecretID: secret,
Description: "test",
Policies: []structs.ACLTokenPolicyLink{
{
ID: structs.ACLPolicyGlobalManagementID,
},
},
NodeIdentities: []*structs.ACLNodeIdentity{
{
NodeName: "foo",
Datacenter: "bar",
},
},
ServiceIdentities: []*structs.ACLServiceIdentity{
{
ServiceName: "web",
},
},
Roles: []structs.ACLTokenRoleLink{
{
ID: r1.ID,
},
},
}
out, err := srv.CreateACLToken(in)
require.NoError(t, err)
require.Equal(t, accessor, out.AccessorID)
require.Equal(t, secret, out.SecretID)
require.Equal(t, "test", out.Description)
require.NotZero(t, out.CreateTime)
require.Len(t, out.Policies, 1)
require.Len(t, out.Roles, 1)
require.Len(t, out.NodeIdentities, 1)
require.Len(t, out.ServiceIdentities, 1)
require.Equal(t, structs.ACLPolicyGlobalManagementID, out.Policies[0].ID)
require.Equal(t, "foo", out.NodeIdentities[0].NodeName)
require.Equal(t, "web", out.ServiceIdentities[0].ServiceName)
require.Equal(t, r1.ID, out.Roles[0].ID)
})
t.Run("autogen-ids", func(t *testing.T) {
in := &structs.ACLToken{
Description: "test",
NodeIdentities: []*structs.ACLNodeIdentity{
{
NodeName: "foo",
Datacenter: "bar",
},
},
}
out, err := srv.CreateACLToken(in)
require.NoError(t, err)
require.NotEmpty(t, out.AccessorID)
require.NotEmpty(t, out.SecretID)
require.Equal(t, "test", out.Description)
require.NotZero(t, out.CreateTime)
require.Len(t, out.NodeIdentities, 1)
require.Equal(t, "foo", out.NodeIdentities[0].NodeName)
})
}