Port some changes back to OSS (#8359)

This commit is contained in:
Brian Kassouf 2020-02-14 16:39:13 -08:00 committed by GitHub
parent 9fb430e8bf
commit 3bbd9dc34c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 181 additions and 33 deletions

View File

@ -1290,6 +1290,9 @@ CLUSTER_SYNTHESIS_COMPLETE:
return 1
}
// Apply any enterprise configuration onto the coreConfig.
adjustCoreConfigForEnt(config, coreConfig)
// Initialize the core
core, newCoreError := vault.NewCore(coreConfig)
if newCoreError != nil {

View File

@ -25,6 +25,8 @@ const (
// Config is the configuration for the vault server.
type Config struct {
entConfig
Listeners []*Listener `hcl:"-"`
Storage *Storage `hcl:"-"`
HAStorage *Storage `hcl:"-"`
@ -641,6 +643,11 @@ func ParseConfig(d string) (*Config, error) {
}
}
entConfig := &(result.entConfig)
if err := entConfig.parseConfig(list); err != nil {
return nil, errwrap.Wrapf("error parsing enterprise config: {{err}}", err)
}
return &result, nil
}

View File

@ -1,13 +1,18 @@
// +build !enterprise
package server
import (
"github.com/hashicorp/hcl/hcl/ast"
)
var (
parseEntropy = parseEntropyOSS
)
type entConfig struct {
}
func parseEntropyOSS(result *Config, list *ast.ObjectList, blockName string) error {
func (ec *entConfig) parseConfig(list *ast.ObjectList) error {
return nil
}
func parseEntropy(result *Config, list *ast.ObjectList, blockName string) error {
return nil
}

View File

@ -18,8 +18,12 @@ import (
var (
onEnterprise = false
createSecureRandomReaderFunc = createSecureRandomReader
adjustCoreConfigForEnt = adjustCoreConfigForEntNoop
)
func adjustCoreConfigForEntNoop(config *server.Config, coreConfig *vault.CoreConfig) {
}
func createSecureRandomReader(config *server.Config, seal *vault.Seal) (io.Reader, error) {
return rand.Reader, nil
}

View File

@ -139,6 +139,14 @@ func EnsureCoresUnsealed(t testing.T, c *vault.TestCluster) {
}
}
func EnsureCoreUnsealed(t testing.T, c *vault.TestCluster, core *vault.TestClusterCore) {
t.Helper()
err := AttemptUnsealCore(c, core)
if err != nil {
t.Fatalf("failed to unseal core: %v", err)
}
}
func AttemptUnsealCores(c *vault.TestCluster) error {
for i, core := range c.Cores {
err := AttemptUnsealCore(c, core)
@ -307,6 +315,19 @@ func WaitForActiveNode(t testing.T, cluster *vault.TestCluster) *vault.TestClust
return nil
}
func WaitForStandbyNode(t testing.T, core *vault.TestClusterCore) {
t.Helper()
for i := 0; i < 30; i++ {
if isLeader, _, clusterAddr, _ := core.Core.Leader(); isLeader != true && clusterAddr != "" {
return
}
time.Sleep(time.Second)
}
t.Fatalf("node did not become standby")
}
func RekeyCluster(t testing.T, cluster *vault.TestCluster, recovery bool) [][]byte {
t.Helper()
cluster.Logger.Info("rekeying cluster", "recovery", recovery)

View File

@ -20,8 +20,7 @@ const (
// performance replication.
PerformanceReplicationALPN = "replication_v1"
// DRReplicationALPN is the negotiated protocol used for
// dr replication.
// DRReplicationALPN is the negotiated protocol used for dr replication.
DRReplicationALPN = "replication_dr_v1"
PerfStandbyALPN = "perf_standby_v1"
@ -29,4 +28,8 @@ const (
RequestForwardingALPN = "req_fw_sb-act_v1"
RaftStorageALPN = "raft_storage_v1"
// ReplicationResolverALPN is the negotiated protocol used for
// resolving replicaiton addresses
ReplicationResolverALPN = "replication_resolver_v1"
)

View File

@ -144,6 +144,10 @@ func (r ReplicationState) GetPerformanceString() string {
}
}
func (r ReplicationState) IsPrimaryState() bool {
return r.HasState(ReplicationPerformancePrimary | ReplicationDRPrimary)
}
func (r ReplicationState) HasState(flag ReplicationState) bool { return r&flag != 0 }
func (r *ReplicationState) AddState(flag ReplicationState) { *r |= flag }
func (r *ReplicationState) ClearState(flag ReplicationState) { *r &= ^flag }

View File

@ -3,6 +3,7 @@ package cluster
import (
"crypto/tls"
"errors"
"fmt"
"net"
"sync"
"time"
@ -27,6 +28,8 @@ type InmemLayer struct {
stopped *atomic.Bool
stopCh chan struct{}
connectionCh chan *ConnectionInfo
}
// NewInmemLayer returns a new in-memory layer configured to listen on the
@ -43,6 +46,12 @@ func NewInmemLayer(addr string, logger log.Logger) *InmemLayer {
}
}
func (l *InmemLayer) SetConnectionCh(ch chan *ConnectionInfo) {
l.l.Lock()
l.connectionCh = ch
l.l.Unlock()
}
// Addrs implements NetworkLayer.
func (l *InmemLayer) Addrs() []net.Addr {
l.l.Lock()
@ -80,11 +89,37 @@ func (l *InmemLayer) Dial(addr string, timeout time.Duration, tlsConfig *tls.Con
l.l.Lock()
defer l.l.Unlock()
if addr == l.addr {
panic(fmt.Sprintf("%q attempted to dial itself", l.addr))
}
peer, ok := l.peers[addr]
if !ok {
return nil, errors.New("inmemlayer: no address found")
}
alpn := ""
if tlsConfig != nil {
alpn = tlsConfig.NextProtos[0]
}
if l.logger.IsDebug() {
l.logger.Debug("dailing connection", "node", l.addr, "remote", addr, "alpn", alpn)
}
if l.connectionCh != nil {
select {
case l.connectionCh <- &ConnectionInfo{
Node: l.addr,
Remote: addr,
IsServer: false,
ALPN: alpn,
}:
case <-time.After(2 * time.Second):
l.logger.Warn("failed to send connection info")
}
}
conn, err := peer.clientConn(l.addr)
if err != nil {
return nil, err
@ -116,6 +151,21 @@ func (l *InmemLayer) clientConn(addr string) (net.Conn, error) {
l.servConns[addr] = append(l.servConns[addr], servConn)
if l.logger.IsDebug() {
l.logger.Debug("received connection", "node", l.addr, "remote", addr)
}
if l.connectionCh != nil {
select {
case l.connectionCh <- &ConnectionInfo{
Node: l.addr,
Remote: addr,
IsServer: true,
}:
case <-time.After(2 * time.Second):
l.logger.Warn("failed to send connection info")
}
}
select {
case l.listener.pendingConns <- servConn:
case <-time.After(2 * time.Second):
@ -127,10 +177,10 @@ func (l *InmemLayer) clientConn(addr string) (net.Conn, error) {
// Connect is used to connect this transport to another transport for
// a given peer name. This allows for local routing.
func (l *InmemLayer) Connect(peer string, remote *InmemLayer) {
func (l *InmemLayer) Connect(remote *InmemLayer) {
l.l.Lock()
defer l.l.Unlock()
l.peers[peer] = remote
l.peers[remote.addr] = remote
}
// Disconnect is used to remove the ability to route to a given peer.
@ -265,24 +315,18 @@ type InmemLayerCluster struct {
// NewInmemLayerCluster returns a new in-memory layer set that builds n nodes
// and connects them all together.
func NewInmemLayerCluster(nodes int, logger log.Logger) (*InmemLayerCluster, error) {
clusterID, err := base62.Random(4)
if err != nil {
return nil, err
}
clusterName := "cluster_" + clusterID
var layers []*InmemLayer
for i := 0; i < nodes; i++ {
nodeID, err := base62.Random(4)
func NewInmemLayerCluster(clusterName string, nodes int, logger log.Logger) (*InmemLayerCluster, error) {
if clusterName == "" {
clusterID, err := base62.Random(4)
if err != nil {
return nil, err
}
clusterName = "cluster_" + clusterID
}
nodeName := clusterName + "_node_" + nodeID
layers = append(layers, NewInmemLayer(nodeName, logger))
layers := make([]*InmemLayer, nodes)
for i := 0; i < nodes; i++ {
layers[i] = NewInmemLayer(fmt.Sprintf("%s_node_%d", clusterName, i), logger)
}
// Connect all the peers together
@ -293,8 +337,8 @@ func NewInmemLayerCluster(nodes int, logger log.Logger) (*InmemLayerCluster, err
continue
}
node.Connect(peer.addr, peer)
peer.Connect(node.addr, node)
node.Connect(peer)
peer.Connect(node)
}
}
@ -306,8 +350,8 @@ func NewInmemLayerCluster(nodes int, logger log.Logger) (*InmemLayerCluster, err
func (ic *InmemLayerCluster) ConnectCluster(remote *InmemLayerCluster) {
for _, node := range ic.layers {
for _, peer := range remote.layers {
node.Connect(peer.addr, peer)
peer.Connect(node.addr, node)
node.Connect(peer)
peer.Connect(node)
}
}
}
@ -321,3 +365,16 @@ func (ic *InmemLayerCluster) Layers() []NetworkLayer {
return ret
}
func (ic *InmemLayerCluster) SetConnectionCh(ch chan *ConnectionInfo) {
for _, node := range ic.layers {
node.SetConnectionCh(ch)
}
}
type ConnectionInfo struct {
Node string
Remote string
IsServer bool
ALPN string
}

View File

@ -5,11 +5,16 @@ import (
"testing"
"time"
log "github.com/hashicorp/go-hclog"
"go.uber.org/atomic"
)
func TestInmemCluster_Connect(t *testing.T) {
cluster, err := NewInmemLayerCluster(3, nil)
cluster, err := NewInmemLayerCluster("c1", 3, log.New(&log.LoggerOptions{
Mutex: &sync.Mutex{},
Level: log.Trace,
Name: "inmem-cluster",
}))
if err != nil {
t.Fatal(err)
}
@ -70,7 +75,11 @@ func TestInmemCluster_Connect(t *testing.T) {
}
func TestInmemCluster_Disconnect(t *testing.T) {
cluster, err := NewInmemLayerCluster(3, nil)
cluster, err := NewInmemLayerCluster("c1", 3, log.New(&log.LoggerOptions{
Mutex: &sync.Mutex{},
Level: log.Trace,
Name: "inmem-cluster",
}))
if err != nil {
t.Fatal(err)
}
@ -133,7 +142,11 @@ func TestInmemCluster_Disconnect(t *testing.T) {
}
func TestInmemCluster_DisconnectAll(t *testing.T) {
cluster, err := NewInmemLayerCluster(3, nil)
cluster, err := NewInmemLayerCluster("c1", 3, log.New(&log.LoggerOptions{
Mutex: &sync.Mutex{},
Level: log.Trace,
Name: "inmem-cluster",
}))
if err != nil {
t.Fatal(err)
}
@ -162,11 +175,19 @@ func TestInmemCluster_DisconnectAll(t *testing.T) {
}
func TestInmemCluster_ConnectCluster(t *testing.T) {
cluster, err := NewInmemLayerCluster(3, nil)
cluster, err := NewInmemLayerCluster("c1", 3, log.New(&log.LoggerOptions{
Mutex: &sync.Mutex{},
Level: log.Trace,
Name: "inmem-cluster",
}))
if err != nil {
t.Fatal(err)
}
cluster2, err := NewInmemLayerCluster(3, nil)
cluster2, err := NewInmemLayerCluster("c2", 3, log.New(&log.LoggerOptions{
Mutex: &sync.Mutex{},
Level: log.Trace,
Name: "inmem-cluster",
}))
if err != nil {
t.Fatal(err)
}

View File

@ -5,6 +5,7 @@ import (
"context"
"crypto/tls"
"net/http"
"sync"
"testing"
"time"
@ -178,7 +179,11 @@ func TestCluster_ForwardRequests(t *testing.T) {
t.Run("inmemLayer", func(t *testing.T) {
// Run again with in-memory network
inmemCluster, err := cluster.NewInmemLayerCluster(3, nil)
inmemCluster, err := cluster.NewInmemLayerCluster("inmem-cluster", 3, log.New(&log.LoggerOptions{
Mutex: &sync.Mutex{},
Level: log.Trace,
Name: "inmem-cluster",
}))
if err != nil {
t.Fatal(err)
}

View File

@ -518,6 +518,8 @@ type Core struct {
// CoreConfig is used to parameterize a core
type CoreConfig struct {
entCoreConfig
DevToken string
BuiltinRegistry BuiltinRegistry
@ -635,6 +637,7 @@ func (c *CoreConfig) Clone() *CoreConfig {
AllLoggers: c.AllLoggers,
CounterSyncInterval: c.CounterSyncInterval,
ClusterNetworkLayer: c.ClusterNetworkLayer,
entCoreConfig: c.entCoreConfig.Clone(),
}
}
@ -1416,6 +1419,10 @@ func (c *Core) unsealInternal(ctx context.Context, masterKey []byte) (bool, erro
return false, err
}
if err := c.setupReplicationResolverHandler(); err != nil {
c.logger.Warn("failed to start replication resolver server", "error", err)
}
// Do post-unseal setup if HA is not enabled
if c.ha == nil {
// We still need to set up cluster info even if it's not part of a
@ -1745,6 +1752,8 @@ func (c *Core) sealInternalWithOptions(grabStateLock, keepHALock, shutdownRaft b
c.logger.Debug("runStandby done")
}
c.teardownReplicationResolverHandler()
// If the storage backend needs to be sealed
if shutdownRaft {
if raftStorage, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {

View File

@ -13,6 +13,11 @@ import (
)
type entCore struct{}
type entCoreConfig struct{}
func (e entCoreConfig) Clone() entCoreConfig {
return entCoreConfig{}
}
type LicensingConfig struct {
AdditionalPublicKeys []interface{}
@ -40,8 +45,12 @@ func coreInit(c *Core, conf *CoreConfig) error {
}
return nil
}
func (c *Core) setupReplicationResolverHandler() error {
return nil
}
func createSecondaries(*Core, *CoreConfig) {}
func (c *Core) teardownReplicationResolverHandler() {}
func createSecondaries(*Core, *CoreConfig) {}
func addExtraLogicalBackends(*Core, map[string]logical.Factory) {}