Introduce optional service_registration stanza (#7887)

* move ServiceDiscovery into methods

* add ServiceDiscoveryFactory

* add serviceDiscovery field to vault.Core

* refactor ConsulServiceDiscovery into separate struct

* cleanup

* revert accidental change to go.mod

* cleanup

* get rid of un-needed struct tags in vault.CoreConfig

* add service_discovery parser

* add ServiceDiscovery to config

* cleanup

* cleanup

* add test for ConfigServiceDiscovery to Core

* unit testing for config service_discovery stanza

* cleanup

* get rid of un-needed redirect_addr stuff in service_discovery stanza

* improve test suite

* cleanup

* clean up test a bit

* create docs for service_discovery

* check if service_discovery is configured, but storage does not support HA

* tinker with test

* tinker with test

* tweak docs

* move ServiceDiscovery into its own package

* tweak a variable name

* fix comment

* rename service_discovery to service_registration

* tweak service_registration config

* Revert "tweak service_registration config"

This reverts commit 5509920a8ab4c5a216468f262fc07c98121dce35.

* simplify naming

* refactor into ./serviceregistration/consul
This commit is contained in:
Mike Jarmy 2019-12-06 09:46:39 -05:00 committed by GitHub
parent 854d00c609
commit e42bc0ffc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1991 additions and 968 deletions

View File

@ -62,6 +62,9 @@ import (
physZooKeeper "github.com/hashicorp/vault/physical/zookeeper" physZooKeeper "github.com/hashicorp/vault/physical/zookeeper"
physFile "github.com/hashicorp/vault/sdk/physical/file" physFile "github.com/hashicorp/vault/sdk/physical/file"
physInmem "github.com/hashicorp/vault/sdk/physical/inmem" physInmem "github.com/hashicorp/vault/sdk/physical/inmem"
sr "github.com/hashicorp/vault/serviceregistration"
csr "github.com/hashicorp/vault/serviceregistration/consul"
) )
const ( const (
@ -155,6 +158,10 @@ var (
"raft": physRaft.NewRaftBackend, "raft": physRaft.NewRaftBackend,
"zookeeper": physZooKeeper.NewZooKeeperBackend, "zookeeper": physZooKeeper.NewZooKeeperBackend,
} }
serviceRegistrations = map[string]sr.Factory{
"consul": csr.NewConsulServiceRegistration,
}
) )
// Commands is the mapping of all the available commands. // Commands is the mapping of all the available commands.
@ -517,9 +524,12 @@ func initCommands(ui, serverCmdUi cli.Ui, runOpts *RunOptions) {
CredentialBackends: credentialBackends, CredentialBackends: credentialBackends,
LogicalBackends: logicalBackends, LogicalBackends: logicalBackends,
PhysicalBackends: physicalBackends, PhysicalBackends: physicalBackends,
ShutdownCh: MakeShutdownCh(),
SighupCh: MakeSighupCh(), ServiceRegistrations: serviceRegistrations,
SigUSR2Ch: MakeSigUSR2Ch(),
ShutdownCh: MakeShutdownCh(),
SighupCh: MakeSighupCh(),
SigUSR2Ch: MakeSigUSR2Ch(),
}, nil }, nil
}, },
"ssh": func() (cli.Command, error) { "ssh": func() (cli.Command, error) {

View File

@ -47,6 +47,7 @@ import (
"github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/sdk/physical" "github.com/hashicorp/vault/sdk/physical"
"github.com/hashicorp/vault/sdk/version" "github.com/hashicorp/vault/sdk/version"
sr "github.com/hashicorp/vault/serviceregistration"
"github.com/hashicorp/vault/vault" "github.com/hashicorp/vault/vault"
vaultseal "github.com/hashicorp/vault/vault/seal" vaultseal "github.com/hashicorp/vault/vault/seal"
shamirseal "github.com/hashicorp/vault/vault/seal/shamir" shamirseal "github.com/hashicorp/vault/vault/seal/shamir"
@ -79,6 +80,8 @@ type ServerCommand struct {
LogicalBackends map[string]logical.Factory LogicalBackends map[string]logical.Factory
PhysicalBackends map[string]physical.Factory PhysicalBackends map[string]physical.Factory
ServiceRegistrations map[string]sr.Factory
ShutdownCh chan struct{} ShutdownCh chan struct{}
SighupCh chan struct{} SighupCh chan struct{}
SigUSR2Ch chan struct{} SigUSR2Ch chan struct{}
@ -935,6 +938,24 @@ func (c *ServerCommand) Run(args []string) int {
return 1 return 1
} }
// Initialize the Service Discovery, if there is one
var configSR sr.ServiceRegistration
if config.ServiceRegistration != nil {
sdFactory, ok := c.ServiceRegistrations[config.ServiceRegistration.Type]
if !ok {
c.UI.Error(fmt.Sprintf("Unknown service_registration type %s", config.ServiceRegistration.Type))
return 1
}
namedSDLogger := c.logger.Named("service_registration." + config.ServiceRegistration.Type)
allLoggers = append(allLoggers, namedSDLogger)
configSR, err = sdFactory(config.ServiceRegistration.Config, namedSDLogger)
if err != nil {
c.UI.Error(fmt.Sprintf("Error initializing service_registration of type %s: %s", config.ServiceRegistration.Type, err))
return 1
}
}
infoKeys := make([]string, 0, 10) infoKeys := make([]string, 0, 10)
info := make(map[string]string) info := make(map[string]string)
info["log level"] = logLevelString info["log level"] = logLevelString
@ -1019,6 +1040,7 @@ func (c *ServerCommand) Run(args []string) int {
RedirectAddr: config.Storage.RedirectAddr, RedirectAddr: config.Storage.RedirectAddr,
StorageType: config.Storage.Type, StorageType: config.Storage.Type,
HAPhysical: nil, HAPhysical: nil,
ServiceRegistration: configSR,
Seal: barrierSeal, Seal: barrierSeal,
AuditBackends: c.AuditBackends, AuditBackends: c.AuditBackends,
CredentialBackends: c.CredentialBackends, CredentialBackends: c.CredentialBackends,
@ -1218,6 +1240,13 @@ CLUSTER_SYNTHESIS_COMPLETE:
} }
} }
// If ServiceRegistration is configured, then the backend must support HA
isBackendHA := coreConfig.HAPhysical != nil && coreConfig.HAPhysical.HAEnabled()
if (coreConfig.ServiceRegistration != nil) && !isBackendHA {
c.UI.Output("service_registration is configured, but storage does not support HA")
return 1
}
// Initialize the core // Initialize the core
core, newCoreError := vault.NewCore(coreConfig) core, newCoreError := vault.NewCore(coreConfig)
if newCoreError != nil { if newCoreError != nil {
@ -1473,21 +1502,17 @@ CLUSTER_SYNTHESIS_COMPLETE:
// Instantiate the wait group // Instantiate the wait group
c.WaitGroup = &sync.WaitGroup{} c.WaitGroup = &sync.WaitGroup{}
// If the backend supports service discovery, run service discovery // If service discovery is available, run service discovery
if coreConfig.HAPhysical != nil && coreConfig.HAPhysical.HAEnabled() { if disc := coreConfig.GetServiceRegistration(); disc != nil {
sd, ok := coreConfig.HAPhysical.(physical.ServiceDiscovery) activeFunc := func() bool {
if ok { if isLeader, _, _, err := core.Leader(); err == nil {
activeFunc := func() bool { return isLeader
if isLeader, _, _, err := core.Leader(); err == nil {
return isLeader
}
return false
}
if err := sd.RunServiceDiscovery(c.WaitGroup, c.ShutdownCh, coreConfig.RedirectAddr, activeFunc, core.Sealed, core.PerfStandby); err != nil {
c.UI.Error(fmt.Sprintf("Error initializing service discovery: %v", err))
return 1
} }
return false
}
if err := disc.RunServiceRegistration(c.WaitGroup, c.ShutdownCh, coreConfig.RedirectAddr, activeFunc, core.Sealed, core.PerfStandby); err != nil {
c.UI.Error(fmt.Sprintf("Error initializing service discovery: %v", err))
return 1
} }
} }

View File

@ -28,6 +28,8 @@ type Config struct {
Storage *Storage `hcl:"-"` Storage *Storage `hcl:"-"`
HAStorage *Storage `hcl:"-"` HAStorage *Storage `hcl:"-"`
ServiceRegistration *ServiceRegistration `hcl:"-"`
Seals []*Seal `hcl:"-"` Seals []*Seal `hcl:"-"`
Entropy *Entropy `hcl:"-"` Entropy *Entropy `hcl:"-"`
@ -150,6 +152,16 @@ func (b *Storage) GoString() string {
return fmt.Sprintf("*%#v", *b) return fmt.Sprintf("*%#v", *b)
} }
// ServiceRegistration is the optional service discovery for the server.
type ServiceRegistration struct {
Type string
Config map[string]string
}
func (b *ServiceRegistration) GoString() string {
return fmt.Sprintf("*%#v", *b)
}
// Seal contains Seal configuration for the server // Seal contains Seal configuration for the server
type Seal struct { type Seal struct {
Type string Type string
@ -292,6 +304,11 @@ func (c *Config) Merge(c2 *Config) *Config {
result.HAStorage = c2.HAStorage result.HAStorage = c2.HAStorage
} }
result.ServiceRegistration = c.ServiceRegistration
if c2.ServiceRegistration != nil {
result.ServiceRegistration = c2.ServiceRegistration
}
result.Entropy = c.Entropy result.Entropy = c.Entropy
if c2.Entropy != nil { if c2.Entropy != nil {
result.Entropy = c2.Entropy result.Entropy = c2.Entropy
@ -585,6 +602,13 @@ func ParseConfig(d string) (*Config, error) {
} }
} }
// Parse service discovery
if o := list.Filter("service_registration"); len(o.Items) > 0 {
if err := parseServiceRegistration(&result, o, "service_registration"); err != nil {
return nil, errwrap.Wrapf("error parsing 'service_registration': {{err}}", err)
}
}
if o := list.Filter("hsm"); len(o.Items) > 0 { if o := list.Filter("hsm"); len(o.Items) > 0 {
if err := parseSeals(&result, o, "hsm"); err != nil { if err := parseSeals(&result, o, "hsm"); err != nil {
return nil, errwrap.Wrapf("error parsing 'hsm': {{err}}", err) return nil, errwrap.Wrapf("error parsing 'hsm': {{err}}", err)
@ -829,6 +853,30 @@ func parseHAStorage(result *Config, list *ast.ObjectList, name string) error {
return nil return nil
} }
func parseServiceRegistration(result *Config, list *ast.ObjectList, name string) error {
if len(list.Items) > 1 {
return fmt.Errorf("only one %q block is permitted", name)
}
// Get our item
item := list.Items[0]
key := name
if len(item.Keys) > 0 {
key = item.Keys[0].Token.Value().(string)
}
var m map[string]string
if err := hcl.DecodeObject(&m, item.Val); err != nil {
return multierror.Prefix(err, fmt.Sprintf("%s.%s:", name, key))
}
result.ServiceRegistration = &ServiceRegistration{
Type: strings.ToLower(key),
Config: m,
}
return nil
}
func parseSeals(result *Config, list *ast.ObjectList, blockName string) error { func parseSeals(result *Config, list *ast.ObjectList, blockName string) error {
if len(list.Items) > 2 { if len(list.Items) > 2 {
return fmt.Errorf("only two or less %q blocks are permitted", blockName) return fmt.Errorf("only two or less %q blocks are permitted", blockName)
@ -1009,6 +1057,14 @@ func (c *Config) Sanitized() map[string]interface{} {
result["ha_storage"] = sanitizedHAStorage result["ha_storage"] = sanitizedHAStorage
} }
// Sanitize service_registration stanza
if c.ServiceRegistration != nil {
sanitizedServiceRegistration := map[string]interface{}{
"type": c.ServiceRegistration.Type,
}
result["service_registration"] = sanitizedServiceRegistration
}
// Sanitize seals stanza // Sanitize seals stanza
if len(c.Seals) != 0 { if len(c.Seals) != 0 {
var sanitizedSeals []interface{} var sanitizedSeals []interface{}

View File

@ -47,6 +47,13 @@ func testLoadConfigFile_topLevel(t *testing.T, entropy *Entropy) {
DisableClustering: true, DisableClustering: true,
}, },
ServiceRegistration: &ServiceRegistration{
Type: "consul",
Config: map[string]string{
"foo": "bar",
},
},
Telemetry: &Telemetry{ Telemetry: &Telemetry{
StatsdAddr: "bar", StatsdAddr: "bar",
StatsiteAddr: "foo", StatsiteAddr: "foo",
@ -126,6 +133,13 @@ func testLoadConfigFile_json2(t *testing.T, entropy *Entropy) {
DisableClustering: true, DisableClustering: true,
}, },
ServiceRegistration: &ServiceRegistration{
Type: "consul",
Config: map[string]string{
"foo": "bar",
},
},
CacheSize: 45678, CacheSize: 45678,
EnableUI: true, EnableUI: true,
@ -261,6 +275,13 @@ func testLoadConfigFile(t *testing.T) {
DisableClustering: true, DisableClustering: true,
}, },
ServiceRegistration: &ServiceRegistration{
Type: "consul",
Config: map[string]string{
"foo": "bar",
},
},
Telemetry: &Telemetry{ Telemetry: &Telemetry{
StatsdAddr: "bar", StatsdAddr: "bar",
StatsiteAddr: "foo", StatsiteAddr: "foo",
@ -324,6 +345,13 @@ func testLoadConfigFile_json(t *testing.T) {
DisableClustering: true, DisableClustering: true,
}, },
ServiceRegistration: &ServiceRegistration{
Type: "consul",
Config: map[string]string{
"foo": "bar",
},
},
ClusterCipherSuites: "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA", ClusterCipherSuites: "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
Telemetry: &Telemetry{ Telemetry: &Telemetry{
@ -476,6 +504,9 @@ func testConfig_Sanitized(t *testing.T) {
"redirect_addr": "top_level_api_addr", "redirect_addr": "top_level_api_addr",
"type": "consul", "type": "consul",
}, },
"service_registration": map[string]interface{}{
"type": "consul",
},
"telemetry": map[string]interface{}{ "telemetry": map[string]interface{}{
"circonus_api_app": "", "circonus_api_app": "",
"circonus_api_token": "", "circonus_api_token": "",

View File

@ -18,6 +18,10 @@ ha_backend "consul" {
disable_clustering = "true" disable_clustering = "true"
} }
service_registration "consul" {
foo = "bar"
}
telemetry { telemetry {
statsd_address = "bar" statsd_address = "bar"
statsite_address = "foo" statsite_address = "foo"

View File

@ -11,6 +11,11 @@
"disable_clustering": "true" "disable_clustering": "true"
} }
}, },
"service_registration": {
"consul": {
"foo": "bar",
}
},
"telemetry": { "telemetry": {
"statsite_address": "baz" "statsite_address": "baz"
}, },

View File

@ -21,6 +21,10 @@ ha_storage "consul" {
disable_clustering = "true" disable_clustering = "true"
} }
service_registration "consul" {
foo = "bar"
}
telemetry { telemetry {
statsd_address = "bar" statsd_address = "bar"
statsite_address = "foo" statsite_address = "foo"

View File

@ -25,6 +25,11 @@
"disable_clustering": "true" "disable_clustering": "true"
} }
}, },
"service_registration":{
"consul":{
"foo":"bar"
}
},
"cache_size": 45678, "cache_size": 45678,
"telemetry":{ "telemetry":{
"statsd_address":"bar", "statsd_address":"bar",

View File

@ -22,6 +22,10 @@ ha_backend "consul" {
token = "foo" token = "foo"
} }
service_registration "consul" {
token = "foo"
}
telemetry { telemetry {
statsd_address = "bar" statsd_address = "bar"
circonus_api_token = "baz" circonus_api_token = "baz"
@ -38,4 +42,4 @@ default_lease_ttl = "10h"
cluster_name = "testcluster" cluster_name = "testcluster"
pid_file = "./pidfile" pid_file = "./pidfile"
raw_storage_endpoint = true raw_storage_endpoint = true
disable_sealwrap = true disable_sealwrap = true

View File

@ -4,58 +4,23 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"regexp"
"strconv" "strconv"
"strings" "strings"
"sync"
"sync/atomic"
"time" "time"
"golang.org/x/net/http2"
log "github.com/hashicorp/go-hclog" log "github.com/hashicorp/go-hclog"
"crypto/tls"
"crypto/x509"
metrics "github.com/armon/go-metrics" metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/errwrap" "github.com/hashicorp/errwrap"
multierror "github.com/hashicorp/go-multierror" multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/helper/parseutil" "github.com/hashicorp/vault/sdk/helper/parseutil"
"github.com/hashicorp/vault/sdk/helper/strutil"
"github.com/hashicorp/vault/sdk/helper/tlsutil"
"github.com/hashicorp/vault/sdk/physical" "github.com/hashicorp/vault/sdk/physical"
sr "github.com/hashicorp/vault/serviceregistration"
csr "github.com/hashicorp/vault/serviceregistration/consul"
) )
const ( const (
// checkJitterFactor specifies the jitter factor used to stagger checks
checkJitterFactor = 16
// checkMinBuffer specifies provides a guarantee that a check will not
// be executed too close to the TTL check timeout
checkMinBuffer = 100 * time.Millisecond
// consulRetryInterval specifies the retry duration to use when an
// API call to the Consul agent fails.
consulRetryInterval = 1 * time.Second
// defaultCheckTimeout changes the timeout of TTL checks
defaultCheckTimeout = 5 * time.Second
// DefaultServiceName is the default Consul service name used when
// advertising a Vault instance.
DefaultServiceName = "vault"
// reconcileTimeout is how often Vault should query Consul to detect
// and fix any state drift.
reconcileTimeout = 60 * time.Second
// consistencyModeDefault is the configuration value used to tell // consistencyModeDefault is the configuration value used to tell
// consul to use default consistency. // consul to use default consistency.
consistencyModeDefault = "default" consistencyModeDefault = "default"
@ -65,41 +30,23 @@ const (
consistencyModeStrong = "strong" consistencyModeStrong = "strong"
) )
type notifyEvent struct{}
// Verify ConsulBackend satisfies the correct interfaces // Verify ConsulBackend satisfies the correct interfaces
var _ physical.Backend = (*ConsulBackend)(nil) var _ physical.Backend = (*ConsulBackend)(nil)
var _ physical.HABackend = (*ConsulBackend)(nil) var _ physical.HABackend = (*ConsulBackend)(nil)
var _ physical.Lock = (*ConsulLock)(nil) var _ physical.Lock = (*ConsulLock)(nil)
var _ physical.Transactional = (*ConsulBackend)(nil) var _ physical.Transactional = (*ConsulBackend)(nil)
var _ physical.ServiceDiscovery = (*ConsulBackend)(nil) var _ sr.ServiceRegistration = (*ConsulBackend)(nil)
var (
hostnameRegex = regexp.MustCompile(`^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$`)
)
// ConsulBackend is a physical backend that stores data at specific // ConsulBackend is a physical backend that stores data at specific
// prefix within Consul. It is used for most production situations as // prefix within Consul. It is used for most production situations as
// it allows Vault to run on multiple machines in a highly-available manner. // it allows Vault to run on multiple machines in a highly-available manner.
type ConsulBackend struct { type ConsulBackend struct {
path string *csr.ConsulServiceRegistration
logger log.Logger
client *api.Client
kv *api.KV
permitPool *physical.PermitPool
serviceLock sync.RWMutex
redirectHost string
redirectPort int64
serviceName string
serviceTags []string
serviceAddress *string
disableRegistration bool
checkTimeout time.Duration
consistencyMode string
notifyActiveCh chan notifyEvent path string
notifySealedCh chan notifyEvent kv *api.KV
notifyPerfStandbyCh chan notifyEvent permitPool *physical.PermitPool
consistencyMode string
sessionTTL string sessionTTL string
lockWaitTime time.Duration lockWaitTime time.Duration
@ -108,6 +55,15 @@ type ConsulBackend struct {
// NewConsulBackend constructs a Consul backend using the given API client // NewConsulBackend constructs a Consul backend using the given API client
// and the prefix in the KV store. // and the prefix in the KV store.
func NewConsulBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) { func NewConsulBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) {
// Create the ConsulServiceRegistration struct that we will embed in the
// ConsulBackend
sreg, err := csr.NewConsulServiceRegistration(conf, logger)
if err != nil {
return nil, err
}
csreg := sreg.(*csr.ConsulServiceRegistration)
// Get the path in Consul // Get the path in Consul
path, ok := conf["path"] path, ok := conf["path"]
if !ok { if !ok {
@ -127,67 +83,6 @@ func NewConsulBackend(conf map[string]string, logger log.Logger) (physical.Backe
path = strings.TrimPrefix(path, "/") path = strings.TrimPrefix(path, "/")
} }
// Allow admins to disable consul integration
disableReg, ok := conf["disable_registration"]
var disableRegistration bool
if ok && disableReg != "" {
b, err := parseutil.ParseBool(disableReg)
if err != nil {
return nil, errwrap.Wrapf("failed parsing disable_registration parameter: {{err}}", err)
}
disableRegistration = b
}
if logger.IsDebug() {
logger.Debug("config disable_registration set", "disable_registration", disableRegistration)
}
// Get the service name to advertise in Consul
service, ok := conf["service"]
if !ok {
service = DefaultServiceName
}
if !hostnameRegex.MatchString(service) {
return nil, errors.New("service name must be valid per RFC 1123 and can contain only alphanumeric characters or dashes")
}
if logger.IsDebug() {
logger.Debug("config service set", "service", service)
}
// Get the additional tags to attach to the registered service name
tags := conf["service_tags"]
if logger.IsDebug() {
logger.Debug("config service_tags set", "service_tags", tags)
}
// Get the service-specific address to override the use of the HA redirect address
var serviceAddr *string
serviceAddrStr, ok := conf["service_address"]
if ok {
serviceAddr = &serviceAddrStr
}
if logger.IsDebug() {
logger.Debug("config service_address set", "service_address", serviceAddr)
}
checkTimeout := defaultCheckTimeout
checkTimeoutStr, ok := conf["check_timeout"]
if ok {
d, err := parseutil.ParseDurationSecond(checkTimeoutStr)
if err != nil {
return nil, err
}
min, _ := DurationMinusBufferDomain(d, checkMinBuffer, checkJitterFactor)
if min < checkMinBuffer {
return nil, fmt.Errorf("consul check_timeout must be greater than %v", min)
}
checkTimeout = d
if logger.IsDebug() {
logger.Debug("config check_timeout set", "check_timeout", d)
}
}
sessionTTL := api.DefaultLockSessionTTL sessionTTL := api.DefaultLockSessionTTL
sessionTTLStr, ok := conf["session_ttl"] sessionTTLStr, ok := conf["session_ttl"]
if ok { if ok {
@ -214,63 +109,6 @@ func NewConsulBackend(conf map[string]string, logger log.Logger) (physical.Backe
} }
} }
// Configure the client
consulConf := api.DefaultConfig()
// Set MaxIdleConnsPerHost to the number of processes used in expiration.Restore
consulConf.Transport.MaxIdleConnsPerHost = consts.ExpirationRestoreWorkerCount
if addr, ok := conf["address"]; ok {
consulConf.Address = addr
if logger.IsDebug() {
logger.Debug("config address set", "address", addr)
}
// Copied from the Consul API module; set the Scheme based on
// the protocol field if address looks ike a URL.
// This can enable the TLS configuration below.
parts := strings.SplitN(addr, "://", 2)
if len(parts) == 2 {
if parts[0] == "http" || parts[0] == "https" {
consulConf.Scheme = parts[0]
consulConf.Address = parts[1]
if logger.IsDebug() {
logger.Debug("config address parsed", "scheme", parts[0])
logger.Debug("config scheme parsed", "address", parts[1])
}
} // allow "unix:" or whatever else consul supports in the future
}
}
if scheme, ok := conf["scheme"]; ok {
consulConf.Scheme = scheme
if logger.IsDebug() {
logger.Debug("config scheme set", "scheme", scheme)
}
}
if token, ok := conf["token"]; ok {
consulConf.Token = token
logger.Debug("config token set")
}
if consulConf.Scheme == "https" {
// Use the parsed Address instead of the raw conf['address']
tlsClientConfig, err := setupTLSConfig(conf, consulConf.Address)
if err != nil {
return nil, err
}
consulConf.Transport.TLSClientConfig = tlsClientConfig
if err := http2.ConfigureTransport(consulConf.Transport); err != nil {
return nil, err
}
logger.Debug("configured TLS")
}
consulConf.HttpClient = &http.Client{Transport: consulConf.Transport}
client, err := api.NewClient(consulConf)
if err != nil {
return nil, errwrap.Wrapf("client setup failed: {{err}}", err)
}
maxParStr, ok := conf["max_parallel"] maxParStr, ok := conf["max_parallel"]
var maxParInt int var maxParInt int
if ok { if ok {
@ -296,94 +134,19 @@ func NewConsulBackend(conf map[string]string, logger log.Logger) (physical.Backe
// Setup the backend // Setup the backend
c := &ConsulBackend{ c := &ConsulBackend{
path: path, ConsulServiceRegistration: csreg,
logger: logger,
client: client, path: path,
kv: client.KV(), kv: csreg.Client.KV(),
permitPool: physical.NewPermitPool(maxParInt), permitPool: physical.NewPermitPool(maxParInt),
serviceName: service, consistencyMode: consistencyMode,
serviceTags: strutil.ParseDedupLowercaseAndSortStrings(tags, ","),
serviceAddress: serviceAddr, sessionTTL: sessionTTL,
checkTimeout: checkTimeout, lockWaitTime: lockWaitTime,
disableRegistration: disableRegistration,
consistencyMode: consistencyMode,
notifyActiveCh: make(chan notifyEvent),
notifySealedCh: make(chan notifyEvent),
notifyPerfStandbyCh: make(chan notifyEvent),
sessionTTL: sessionTTL,
lockWaitTime: lockWaitTime,
} }
return c, nil return c, nil
} }
func setupTLSConfig(conf map[string]string, address string) (*tls.Config, error) {
serverName, _, err := net.SplitHostPort(address)
switch {
case err == nil:
case strings.Contains(err.Error(), "missing port"):
serverName = conf["address"]
default:
return nil, err
}
insecureSkipVerify := false
tlsSkipVerify, ok := conf["tls_skip_verify"]
if ok && tlsSkipVerify != "" {
b, err := parseutil.ParseBool(tlsSkipVerify)
if err != nil {
return nil, errwrap.Wrapf("failed parsing tls_skip_verify parameter: {{err}}", err)
}
insecureSkipVerify = b
}
tlsMinVersionStr, ok := conf["tls_min_version"]
if !ok {
// Set the default value
tlsMinVersionStr = "tls12"
}
tlsMinVersion, ok := tlsutil.TLSLookup[tlsMinVersionStr]
if !ok {
return nil, fmt.Errorf("invalid 'tls_min_version'")
}
tlsClientConfig := &tls.Config{
MinVersion: tlsMinVersion,
InsecureSkipVerify: insecureSkipVerify,
ServerName: serverName,
}
_, okCert := conf["tls_cert_file"]
_, okKey := conf["tls_key_file"]
if okCert && okKey {
tlsCert, err := tls.LoadX509KeyPair(conf["tls_cert_file"], conf["tls_key_file"])
if err != nil {
return nil, errwrap.Wrapf("client tls setup failed: {{err}}", err)
}
tlsClientConfig.Certificates = []tls.Certificate{tlsCert}
}
if tlsCaFile, ok := conf["tls_ca_file"]; ok {
caPool := x509.NewCertPool()
data, err := ioutil.ReadFile(tlsCaFile)
if err != nil {
return nil, errwrap.Wrapf("failed to read CA file: {{err}}", err)
}
if !caPool.AppendCertsFromPEM(data) {
return nil, fmt.Errorf("failed to parse CA certificate")
}
tlsClientConfig.RootCAs = caPool
}
return tlsClientConfig, nil
}
// Used to run multiple entries via a transaction // Used to run multiple entries via a transaction
func (c *ConsulBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error { func (c *ConsulBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
if len(txns) == 0 { if len(txns) == 0 {
@ -539,12 +302,12 @@ func (c *ConsulBackend) LockWith(key, value string) (physical.Lock, error) {
SessionTTL: c.sessionTTL, SessionTTL: c.sessionTTL,
LockWaitTime: c.lockWaitTime, LockWaitTime: c.lockWaitTime,
} }
lock, err := c.client.LockOpts(opts) lock, err := c.Client.LockOpts(opts)
if err != nil { if err != nil {
return nil, errwrap.Wrapf("failed to create lock: {{err}}", err) return nil, errwrap.Wrapf("failed to create lock: {{err}}", err)
} }
cl := &ConsulLock{ cl := &ConsulLock{
client: c.client, client: c.Client,
key: c.path + key, key: c.path + key,
lock: lock, lock: lock,
consistencyMode: c.consistencyMode, consistencyMode: c.consistencyMode,
@ -560,7 +323,7 @@ func (c *ConsulBackend) HAEnabled() bool {
// DetectHostAddr is used to detect the host address by asking the Consul agent // DetectHostAddr is used to detect the host address by asking the Consul agent
func (c *ConsulBackend) DetectHostAddr() (string, error) { func (c *ConsulBackend) DetectHostAddr() (string, error) {
agent := c.client.Agent() agent := c.Client.Agent()
self, err := agent.Self() self, err := agent.Self()
if err != nil { if err != nil {
return "", err return "", err
@ -609,322 +372,3 @@ func (c *ConsulLock) Value() (bool, string, error) {
value := string(pair.Value) value := string(pair.Value)
return held, value, nil return held, value, nil
} }
func (c *ConsulBackend) NotifyActiveStateChange() error {
select {
case c.notifyActiveCh <- notifyEvent{}:
default:
// NOTE: If this occurs Vault's active status could be out of
// sync with Consul until reconcileTimer expires.
c.logger.Warn("concurrent state change notify dropped")
}
return nil
}
func (c *ConsulBackend) NotifyPerformanceStandbyStateChange() error {
select {
case c.notifyPerfStandbyCh <- notifyEvent{}:
default:
// NOTE: If this occurs Vault's active status could be out of
// sync with Consul until reconcileTimer expires.
c.logger.Warn("concurrent state change notify dropped")
}
return nil
}
func (c *ConsulBackend) NotifySealedStateChange() error {
select {
case c.notifySealedCh <- notifyEvent{}:
default:
// NOTE: If this occurs Vault's sealed status could be out of
// sync with Consul until checkTimer expires.
c.logger.Warn("concurrent sealed state change notify dropped")
}
return nil
}
func (c *ConsulBackend) checkDuration() time.Duration {
return DurationMinusBuffer(c.checkTimeout, checkMinBuffer, checkJitterFactor)
}
func (c *ConsulBackend) RunServiceDiscovery(waitGroup *sync.WaitGroup, shutdownCh physical.ShutdownChannel, redirectAddr string, activeFunc physical.ActiveFunction, sealedFunc physical.SealedFunction, perfStandbyFunc physical.PerformanceStandbyFunction) (err error) {
if err := c.setRedirectAddr(redirectAddr); err != nil {
return err
}
// 'server' command will wait for the below goroutine to complete
waitGroup.Add(1)
go c.runEventDemuxer(waitGroup, shutdownCh, redirectAddr, activeFunc, sealedFunc, perfStandbyFunc)
return nil
}
func (c *ConsulBackend) runEventDemuxer(waitGroup *sync.WaitGroup, shutdownCh physical.ShutdownChannel, redirectAddr string, activeFunc physical.ActiveFunction, sealedFunc physical.SealedFunction, perfStandbyFunc physical.PerformanceStandbyFunction) {
// This defer statement should be executed last. So push it first.
defer waitGroup.Done()
// Fire the reconcileTimer immediately upon starting the event demuxer
reconcileTimer := time.NewTimer(0)
defer reconcileTimer.Stop()
// Schedule the first check. Consul TTL checks are passing by
// default, checkTimer does not need to be run immediately.
checkTimer := time.NewTimer(c.checkDuration())
defer checkTimer.Stop()
// Use a reactor pattern to handle and dispatch events to singleton
// goroutine handlers for execution. It is not acceptable to drop
// inbound events from Notify*().
//
// goroutines are dispatched if the demuxer can acquire a lock (via
// an atomic CAS incr) on the handler. Handlers are responsible for
// deregistering themselves (atomic CAS decr). Handlers and the
// demuxer share a lock to synchronize information at the beginning
// and end of a handler's life (or after a handler wakes up from
// sleeping during a back-off/retry).
var shutdown bool
var registeredServiceID string
checkLock := new(int32)
serviceRegLock := new(int32)
for !shutdown {
select {
case <-c.notifyActiveCh:
// Run reconcile immediately upon active state change notification
reconcileTimer.Reset(0)
case <-c.notifySealedCh:
// Run check timer immediately upon a seal state change notification
checkTimer.Reset(0)
case <-c.notifyPerfStandbyCh:
// Run check timer immediately upon a seal state change notification
checkTimer.Reset(0)
case <-reconcileTimer.C:
// Unconditionally rearm the reconcileTimer
reconcileTimer.Reset(reconcileTimeout - RandomStagger(reconcileTimeout/checkJitterFactor))
// Abort if service discovery is disabled or a
// reconcile handler is already active
if !c.disableRegistration && atomic.CompareAndSwapInt32(serviceRegLock, 0, 1) {
// Enter handler with serviceRegLock held
go func() {
defer atomic.CompareAndSwapInt32(serviceRegLock, 1, 0)
for !shutdown {
serviceID, err := c.reconcileConsul(registeredServiceID, activeFunc, sealedFunc, perfStandbyFunc)
if err != nil {
if c.logger.IsWarn() {
c.logger.Warn("reconcile unable to talk with Consul backend", "error", err)
}
time.Sleep(consulRetryInterval)
continue
}
c.serviceLock.Lock()
defer c.serviceLock.Unlock()
registeredServiceID = serviceID
return
}
}()
}
case <-checkTimer.C:
checkTimer.Reset(c.checkDuration())
// Abort if service discovery is disabled or a
// reconcile handler is active
if !c.disableRegistration && atomic.CompareAndSwapInt32(checkLock, 0, 1) {
// Enter handler with checkLock held
go func() {
defer atomic.CompareAndSwapInt32(checkLock, 1, 0)
for !shutdown {
sealed := sealedFunc()
if err := c.runCheck(sealed); err != nil {
if c.logger.IsWarn() {
c.logger.Warn("check unable to talk with Consul backend", "error", err)
}
time.Sleep(consulRetryInterval)
continue
}
return
}
}()
}
case <-shutdownCh:
c.logger.Info("shutting down consul backend")
shutdown = true
}
}
c.serviceLock.RLock()
defer c.serviceLock.RUnlock()
if err := c.client.Agent().ServiceDeregister(registeredServiceID); err != nil {
if c.logger.IsWarn() {
c.logger.Warn("service deregistration failed", "error", err)
}
}
}
// checkID returns the ID used for a Consul Check. Assume at least a read
// lock is held.
func (c *ConsulBackend) checkID() string {
return fmt.Sprintf("%s:vault-sealed-check", c.serviceID())
}
// serviceID returns the Vault ServiceID for use in Consul. Assume at least
// a read lock is held.
func (c *ConsulBackend) serviceID() string {
return fmt.Sprintf("%s:%s:%d", c.serviceName, c.redirectHost, c.redirectPort)
}
// reconcileConsul queries the state of Vault Core and Consul and fixes up
// Consul's state according to what's in Vault. reconcileConsul is called
// without any locks held and can be run concurrently, therefore no changes
// to ConsulBackend can be made in this method (i.e. wtb const receiver for
// compiler enforced safety).
func (c *ConsulBackend) reconcileConsul(registeredServiceID string, activeFunc physical.ActiveFunction, sealedFunc physical.SealedFunction, perfStandbyFunc physical.PerformanceStandbyFunction) (serviceID string, err error) {
// Query vault Core for its current state
active := activeFunc()
sealed := sealedFunc()
perfStandby := perfStandbyFunc()
agent := c.client.Agent()
catalog := c.client.Catalog()
serviceID = c.serviceID()
// Get the current state of Vault from Consul
var currentVaultService *api.CatalogService
if services, _, err := catalog.Service(c.serviceName, "", &api.QueryOptions{AllowStale: true}); err == nil {
for _, service := range services {
if serviceID == service.ServiceID {
currentVaultService = service
break
}
}
}
tags := c.fetchServiceTags(active, perfStandby)
var reregister bool
switch {
case currentVaultService == nil, registeredServiceID == "":
reregister = true
default:
switch {
case !strutil.EquivalentSlices(currentVaultService.ServiceTags, tags):
reregister = true
}
}
if !reregister {
// When re-registration is not required, return a valid serviceID
// to avoid registration in the next cycle.
return serviceID, nil
}
// If service address was set explicitly in configuration, use that
// as the service-specific address instead of the HA redirect address.
var serviceAddress string
if c.serviceAddress == nil {
serviceAddress = c.redirectHost
} else {
serviceAddress = *c.serviceAddress
}
service := &api.AgentServiceRegistration{
ID: serviceID,
Name: c.serviceName,
Tags: tags,
Port: int(c.redirectPort),
Address: serviceAddress,
EnableTagOverride: false,
}
checkStatus := api.HealthCritical
if !sealed {
checkStatus = api.HealthPassing
}
sealedCheck := &api.AgentCheckRegistration{
ID: c.checkID(),
Name: "Vault Sealed Status",
Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server",
ServiceID: serviceID,
AgentServiceCheck: api.AgentServiceCheck{
TTL: c.checkTimeout.String(),
Status: checkStatus,
},
}
if err := agent.ServiceRegister(service); err != nil {
return "", errwrap.Wrapf(`service registration failed: {{err}}`, err)
}
if err := agent.CheckRegister(sealedCheck); err != nil {
return serviceID, errwrap.Wrapf(`service check registration failed: {{err}}`, err)
}
return serviceID, nil
}
// runCheck immediately pushes a TTL check.
func (c *ConsulBackend) runCheck(sealed bool) error {
// Run a TTL check
agent := c.client.Agent()
if !sealed {
return agent.PassTTL(c.checkID(), "Vault Unsealed")
} else {
return agent.FailTTL(c.checkID(), "Vault Sealed")
}
}
// fetchServiceTags returns all of the relevant tags for Consul.
func (c *ConsulBackend) fetchServiceTags(active bool, perfStandby bool) []string {
activeTag := "standby"
if active {
activeTag = "active"
}
result := append(c.serviceTags, activeTag)
if perfStandby {
result = append(c.serviceTags, "performance-standby")
}
return result
}
func (c *ConsulBackend) setRedirectAddr(addr string) (err error) {
if addr == "" {
return fmt.Errorf("redirect address must not be empty")
}
url, err := url.Parse(addr)
if err != nil {
return errwrap.Wrapf(fmt.Sprintf("failed to parse redirect URL %q: {{err}}", addr), err)
}
var portStr string
c.redirectHost, portStr, err = net.SplitHostPort(url.Host)
if err != nil {
if url.Scheme == "http" {
portStr = "80"
} else if url.Scheme == "https" {
portStr = "443"
} else if url.Scheme == "unix" {
portStr = "-1"
c.redirectHost = url.Path
} else {
return errwrap.Wrapf(fmt.Sprintf(`failed to find a host:port in redirect address "%v": {{err}}`, url.Host), err)
}
}
c.redirectPort, err = strconv.ParseInt(portStr, 10, 0)
if err != nil || c.redirectPort < -1 || c.redirectPort > 65535 {
return errwrap.Wrapf(fmt.Sprintf(`failed to parse valid port "%v": {{err}}`, portStr), err)
}
return nil
}

View File

@ -15,8 +15,8 @@ import (
log "github.com/hashicorp/go-hclog" log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/testhelpers/consul" "github.com/hashicorp/vault/helper/testhelpers/consul"
"github.com/hashicorp/vault/sdk/helper/logging" "github.com/hashicorp/vault/sdk/helper/logging"
"github.com/hashicorp/vault/sdk/helper/strutil"
"github.com/hashicorp/vault/sdk/physical" "github.com/hashicorp/vault/sdk/physical"
sr "github.com/hashicorp/vault/serviceregistration"
) )
type consulConf map[string]string type consulConf map[string]string
@ -52,7 +52,7 @@ func testConsul_testConsulBackend(t *testing.T) {
} }
} }
func testActiveFunc(activePct float64) physical.ActiveFunction { func testActiveFunc(activePct float64) sr.ActiveFunction {
return func() bool { return func() bool {
var active bool var active bool
standbyProb := rand.Float64() standbyProb := rand.Float64()
@ -63,7 +63,7 @@ func testActiveFunc(activePct float64) physical.ActiveFunction {
} }
} }
func testSealedFunc(sealedPct float64) physical.SealedFunction { func testSealedFunc(sealedPct float64) sr.SealedFunction {
return func() bool { return func() bool {
var sealed bool var sealed bool
unsealedProb := rand.Float64() unsealedProb := rand.Float64()
@ -74,7 +74,7 @@ func testSealedFunc(sealedPct float64) physical.SealedFunction {
} }
} }
func testPerformanceStandbyFunc(perfPct float64) physical.PerformanceStandbyFunction { func testPerformanceStandbyFunc(perfPct float64) sr.PerformanceStandbyFunction {
return func() bool { return func() bool {
var ps bool var ps bool
unsealedProb := rand.Float64() unsealedProb := rand.Float64()
@ -85,98 +85,6 @@ func testPerformanceStandbyFunc(perfPct float64) physical.PerformanceStandbyFunc
} }
} }
func TestConsul_ServiceTags(t *testing.T) {
consulConfig := map[string]string{
"path": "seaTech/",
"service": "astronomy",
"service_tags": "deadbeef, cafeefac, deadc0de, feedface",
"redirect_addr": "http://127.0.0.2:8200",
"check_timeout": "6s",
"address": "127.0.0.2",
"scheme": "https",
"token": "deadbeef-cafeefac-deadc0de-feedface",
"max_parallel": "4",
"disable_registration": "false",
}
logger := logging.NewVaultLogger(log.Debug)
be, err := NewConsulBackend(consulConfig, logger)
if err != nil {
t.Fatal(err)
}
c, ok := be.(*ConsulBackend)
if !ok {
t.Fatalf("failed to create physical Consul backend")
}
expected := []string{"deadbeef", "cafeefac", "deadc0de", "feedface"}
actual := c.fetchServiceTags(false, false)
if !strutil.EquivalentSlices(actual, append(expected, "standby")) {
t.Fatalf("bad: expected:%s actual:%s", append(expected, "standby"), actual)
}
actual = c.fetchServiceTags(true, false)
if !strutil.EquivalentSlices(actual, append(expected, "active")) {
t.Fatalf("bad: expected:%s actual:%s", append(expected, "active"), actual)
}
actual = c.fetchServiceTags(false, true)
if !strutil.EquivalentSlices(actual, append(expected, "performance-standby")) {
t.Fatalf("bad: expected:%s actual:%s", append(expected, "performance-standby"), actual)
}
actual = c.fetchServiceTags(true, true)
if !strutil.EquivalentSlices(actual, append(expected, "performance-standby")) {
t.Fatalf("bad: expected:%s actual:%s", append(expected, "performance-standby"), actual)
}
}
func TestConsul_ServiceAddress(t *testing.T) {
tests := []struct {
consulConfig map[string]string
serviceAddrNil bool
}{
{
consulConfig: map[string]string{
"service_address": "",
},
},
{
consulConfig: map[string]string{
"service_address": "vault.example.com",
},
},
{
serviceAddrNil: true,
},
}
for _, test := range tests {
logger := logging.NewVaultLogger(log.Debug)
be, err := NewConsulBackend(test.consulConfig, logger)
if err != nil {
t.Fatalf("expected Consul to initialize: %v", err)
}
c, ok := be.(*ConsulBackend)
if !ok {
t.Fatalf("Expected ConsulBackend")
}
if test.serviceAddrNil {
if c.serviceAddress != nil {
t.Fatalf("expected service address to be nil")
}
} else {
if c.serviceAddress == nil {
t.Fatalf("did not expect service address to be nil")
}
}
}
}
func TestConsul_newConsulBackend(t *testing.T) { func TestConsul_newConsulBackend(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
@ -294,40 +202,24 @@ func TestConsul_newConsulBackend(t *testing.T) {
if !ok { if !ok {
t.Fatalf("Expected ConsulBackend: %s", test.name) t.Fatalf("Expected ConsulBackend: %s", test.name)
} }
c.disableRegistration = true
if c.disableRegistration == false { var shutdownCh sr.ShutdownChannel
addr := os.Getenv("CONSUL_HTTP_ADDR")
if addr == "" {
continue
}
}
var shutdownCh physical.ShutdownChannel
waitGroup := &sync.WaitGroup{} waitGroup := &sync.WaitGroup{}
if err := c.RunServiceDiscovery(waitGroup, shutdownCh, test.redirectAddr, testActiveFunc(0.5), testSealedFunc(0.5), testPerformanceStandbyFunc(0.5)); err != nil { if err := c.RunServiceRegistration(waitGroup, shutdownCh, test.redirectAddr, testActiveFunc(0.5), testSealedFunc(0.5), testPerformanceStandbyFunc(0.5)); err != nil {
t.Fatalf("bad: %v", err) t.Fatalf("bad: %v", err)
} }
if test.checkTimeout != c.checkTimeout {
t.Errorf("bad: %v != %v", test.checkTimeout, c.checkTimeout)
}
if test.path != c.path { if test.path != c.path {
t.Errorf("bad: %s %v != %v", test.name, test.path, c.path) t.Errorf("bad: %s %v != %v", test.name, test.path, c.path)
} }
if test.service != c.serviceName {
t.Errorf("bad: %v != %v", test.service, c.serviceName)
}
if test.consistencyMode != c.consistencyMode { if test.consistencyMode != c.consistencyMode {
t.Errorf("bad consistency_mode value: %v != %v", test.consistencyMode, c.consistencyMode) t.Errorf("bad consistency_mode value: %v != %v", test.consistencyMode, c.consistencyMode)
} }
// The configuration stored in the Consul "client" object is not exported, so // The configuration stored in the Consul "client" object is not exported, so
// we either have to skip validating it, or add a method to export it, or use reflection. // we either have to skip validating it, or add a method to export it, or use reflection.
consulConfig := reflect.Indirect(reflect.ValueOf(c.client)).FieldByName("config") consulConfig := reflect.Indirect(reflect.ValueOf(c.Client)).FieldByName("config")
consulConfigScheme := consulConfig.FieldByName("Scheme").String() consulConfigScheme := consulConfig.FieldByName("Scheme").String()
consulConfigAddress := consulConfig.FieldByName("Address").String() consulConfigAddress := consulConfig.FieldByName("Address").String()
@ -346,109 +238,6 @@ func TestConsul_newConsulBackend(t *testing.T) {
} }
} }
func TestConsul_serviceTags(t *testing.T) {
tests := []struct {
active bool
perfStandby bool
tags []string
}{
{
active: true,
perfStandby: false,
tags: []string{"active"},
},
{
active: false,
perfStandby: false,
tags: []string{"standby"},
},
{
active: false,
perfStandby: true,
tags: []string{"performance-standby"},
},
{
active: true,
perfStandby: true,
tags: []string{"performance-standby"},
},
}
c := testConsulBackend(t)
for _, test := range tests {
tags := c.fetchServiceTags(test.active, test.perfStandby)
if !reflect.DeepEqual(tags[:], test.tags[:]) {
t.Errorf("Bad %v: %v %v", test.active, tags, test.tags)
}
}
}
func TestConsul_setRedirectAddr(t *testing.T) {
tests := []struct {
addr string
host string
port int64
pass bool
}{
{
addr: "http://127.0.0.1:8200/",
host: "127.0.0.1",
port: 8200,
pass: true,
},
{
addr: "http://127.0.0.1:8200",
host: "127.0.0.1",
port: 8200,
pass: true,
},
{
addr: "https://127.0.0.1:8200",
host: "127.0.0.1",
port: 8200,
pass: true,
},
{
addr: "unix:///tmp/.vault.addr.sock",
host: "/tmp/.vault.addr.sock",
port: -1,
pass: true,
},
{
addr: "127.0.0.1:8200",
pass: false,
},
{
addr: "127.0.0.1",
pass: false,
},
}
for _, test := range tests {
c := testConsulBackend(t)
err := c.setRedirectAddr(test.addr)
if test.pass {
if err != nil {
t.Fatalf("bad: %v", err)
}
} else {
if err == nil {
t.Fatalf("bad, expected fail")
} else {
continue
}
}
if c.redirectHost != test.host {
t.Fatalf("bad: %v != %v", c.redirectHost, test.host)
}
if c.redirectPort != test.port {
t.Fatalf("bad: %v != %v", c.redirectPort, test.port)
}
}
}
func TestConsul_NotifyActiveStateChange(t *testing.T) { func TestConsul_NotifyActiveStateChange(t *testing.T) {
c := testConsulBackend(t) c := testConsulBackend(t)
@ -465,76 +254,6 @@ func TestConsul_NotifySealedStateChange(t *testing.T) {
} }
} }
func TestConsul_serviceID(t *testing.T) {
tests := []struct {
name string
redirectAddr string
serviceName string
expected string
valid bool
}{
{
name: "valid host w/o slash",
redirectAddr: "http://127.0.0.1:8200",
serviceName: "sea-tech-astronomy",
expected: "sea-tech-astronomy:127.0.0.1:8200",
valid: true,
},
{
name: "valid host w/ slash",
redirectAddr: "http://127.0.0.1:8200/",
serviceName: "sea-tech-astronomy",
expected: "sea-tech-astronomy:127.0.0.1:8200",
valid: true,
},
{
name: "valid https host w/ slash",
redirectAddr: "https://127.0.0.1:8200/",
serviceName: "sea-tech-astronomy",
expected: "sea-tech-astronomy:127.0.0.1:8200",
valid: true,
},
{
name: "invalid host name",
redirectAddr: "https://127.0.0.1:8200/",
serviceName: "sea_tech_astronomy",
expected: "",
valid: false,
},
}
logger := logging.NewVaultLogger(log.Debug)
for _, test := range tests {
be, err := NewConsulBackend(consulConf{
"service": test.serviceName,
}, logger)
if !test.valid {
if err == nil {
t.Fatalf("expected an error initializing for name %q", test.serviceName)
}
continue
}
if test.valid && err != nil {
t.Fatalf("expected Consul to initialize: %v", err)
}
c, ok := be.(*ConsulBackend)
if !ok {
t.Fatalf("Expected ConsulBackend")
}
if err := c.setRedirectAddr(test.redirectAddr); err != nil {
t.Fatalf("bad: %s %v", test.name, err)
}
serviceID := c.serviceID()
if serviceID != test.expected {
t.Fatalf("bad: %v != %v", serviceID, test.expected)
}
}
}
func TestConsulBackend(t *testing.T) { func TestConsulBackend(t *testing.T) {
consulToken := os.Getenv("CONSUL_HTTP_TOKEN") consulToken := os.Getenv("CONSUL_HTTP_TOKEN")
addr := os.Getenv("CONSUL_HTTP_ADDR") addr := os.Getenv("CONSUL_HTTP_ADDR")

View File

@ -3,7 +3,6 @@ package physical
import ( import (
"context" "context"
"strings" "strings"
"sync"
log "github.com/hashicorp/go-hclog" log "github.com/hashicorp/go-hclog"
) )
@ -24,9 +23,6 @@ const (
ErrValueTooLarge = "put failed due to value being too large" ErrValueTooLarge = "put failed due to value being too large"
) )
// ShutdownSignal
type ShutdownChannel chan struct{}
// Backend is the interface required for a physical // Backend is the interface required for a physical
// backend. A physical backend is used to durably store // backend. A physical backend is used to durably store
// data outside of Vault. As such, it is completely untrusted, // data outside of Vault. As such, it is completely untrusted,
@ -76,35 +72,6 @@ type RedirectDetect interface {
DetectHostAddr() (string, error) DetectHostAddr() (string, error)
} }
// Callback signatures for RunServiceDiscovery
type ActiveFunction func() bool
type SealedFunction func() bool
type PerformanceStandbyFunction func() bool
// ServiceDiscovery is an optional interface that an HABackend can implement.
// If they do, the state of a backend is advertised to the service discovery
// network.
type ServiceDiscovery interface {
// NotifyActiveStateChange is used by Core to notify a backend
// capable of ServiceDiscovery that this Vault instance has changed
// its status to active or standby.
NotifyActiveStateChange() error
// NotifySealedStateChange is used by Core to notify a backend
// capable of ServiceDiscovery that Vault has changed its Sealed
// status to sealed or unsealed.
NotifySealedStateChange() error
// NotifyPerformanceStandbyStateChange is used by Core to notify a backend
// capable of ServiceDiscovery that this Vault instance has changed it
// status to performance standby or standby.
NotifyPerformanceStandbyStateChange() error
// Run executes any background service discovery tasks until the
// shutdown channel is closed.
RunServiceDiscovery(waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, redirectAddr string, activeFunc ActiveFunction, sealedFunc SealedFunction, perfStandbyFunc PerformanceStandbyFunction) error
}
type Lock interface { type Lock interface {
// Lock is used to acquire the given lock // Lock is used to acquire the given lock
// The stopCh is optional and if closed should interrupt the lock // The stopCh is optional and if closed should interrupt the lock

View File

@ -0,0 +1,641 @@
package consul
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/errwrap"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/helper/parseutil"
"github.com/hashicorp/vault/sdk/helper/strutil"
"github.com/hashicorp/vault/sdk/helper/tlsutil"
sr "github.com/hashicorp/vault/serviceregistration"
"golang.org/x/net/http2"
)
const (
// checkJitterFactor specifies the jitter factor used to stagger checks
checkJitterFactor = 16
// checkMinBuffer specifies provides a guarantee that a check will not
// be executed too close to the TTL check timeout
checkMinBuffer = 100 * time.Millisecond
// consulRetryInterval specifies the retry duration to use when an
// API call to the Consul agent fails.
consulRetryInterval = 1 * time.Second
// defaultCheckTimeout changes the timeout of TTL checks
defaultCheckTimeout = 5 * time.Second
// DefaultServiceName is the default Consul service name used when
// advertising a Vault instance.
DefaultServiceName = "vault"
// reconcileTimeout is how often Vault should query Consul to detect
// and fix any state drift.
reconcileTimeout = 60 * time.Second
)
type notifyEvent struct{}
var _ sr.ServiceRegistration = (*ConsulServiceRegistration)(nil)
var (
hostnameRegex = regexp.MustCompile(`^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$`)
)
// ConsulServiceRegistration is a ServiceRegistration that advertises the state of
// Vault to Consul.
type ConsulServiceRegistration struct {
Client *api.Client
logger log.Logger
serviceLock sync.RWMutex
redirectHost string
redirectPort int64
serviceName string
serviceTags []string
serviceAddress *string
disableRegistration bool
checkTimeout time.Duration
notifyActiveCh chan notifyEvent
notifySealedCh chan notifyEvent
notifyPerfStandbyCh chan notifyEvent
}
// NewConsulServiceRegistration constructs a Consul-based ServiceRegistration.
func NewConsulServiceRegistration(conf map[string]string, logger log.Logger) (sr.ServiceRegistration, error) {
// Allow admins to disable consul integration
disableReg, ok := conf["disable_registration"]
var disableRegistration bool
if ok && disableReg != "" {
b, err := parseutil.ParseBool(disableReg)
if err != nil {
return nil, errwrap.Wrapf("failed parsing disable_registration parameter: {{err}}", err)
}
disableRegistration = b
}
if logger.IsDebug() {
logger.Debug("config disable_registration set", "disable_registration", disableRegistration)
}
// Get the service name to advertise in Consul
service, ok := conf["service"]
if !ok {
service = DefaultServiceName
}
if !hostnameRegex.MatchString(service) {
return nil, errors.New("service name must be valid per RFC 1123 and can contain only alphanumeric characters or dashes")
}
if logger.IsDebug() {
logger.Debug("config service set", "service", service)
}
// Get the additional tags to attach to the registered service name
tags := conf["service_tags"]
if logger.IsDebug() {
logger.Debug("config service_tags set", "service_tags", tags)
}
// Get the service-specific address to override the use of the HA redirect address
var serviceAddr *string
serviceAddrStr, ok := conf["service_address"]
if ok {
serviceAddr = &serviceAddrStr
}
if logger.IsDebug() {
logger.Debug("config service_address set", "service_address", serviceAddr)
}
checkTimeout := defaultCheckTimeout
checkTimeoutStr, ok := conf["check_timeout"]
if ok {
d, err := parseutil.ParseDurationSecond(checkTimeoutStr)
if err != nil {
return nil, err
}
min, _ := durationMinusBufferDomain(d, checkMinBuffer, checkJitterFactor)
if min < checkMinBuffer {
return nil, fmt.Errorf("consul check_timeout must be greater than %v", min)
}
checkTimeout = d
if logger.IsDebug() {
logger.Debug("config check_timeout set", "check_timeout", d)
}
}
// Configure the client
consulConf := api.DefaultConfig()
// Set MaxIdleConnsPerHost to the number of processes used in expiration.Restore
consulConf.Transport.MaxIdleConnsPerHost = consts.ExpirationRestoreWorkerCount
if addr, ok := conf["address"]; ok {
consulConf.Address = addr
if logger.IsDebug() {
logger.Debug("config address set", "address", addr)
}
// Copied from the Consul API module; set the Scheme based on
// the protocol field if address looks ike a URL.
// This can enable the TLS configuration below.
parts := strings.SplitN(addr, "://", 2)
if len(parts) == 2 {
if parts[0] == "http" || parts[0] == "https" {
consulConf.Scheme = parts[0]
consulConf.Address = parts[1]
if logger.IsDebug() {
logger.Debug("config address parsed", "scheme", parts[0])
logger.Debug("config scheme parsed", "address", parts[1])
}
} // allow "unix:" or whatever else consul supports in the future
}
}
if scheme, ok := conf["scheme"]; ok {
consulConf.Scheme = scheme
if logger.IsDebug() {
logger.Debug("config scheme set", "scheme", scheme)
}
}
if token, ok := conf["token"]; ok {
consulConf.Token = token
logger.Debug("config token set")
}
if consulConf.Scheme == "https" {
// Use the parsed Address instead of the raw conf['address']
tlsClientConfig, err := setupTLSConfig(conf, consulConf.Address)
if err != nil {
return nil, err
}
consulConf.Transport.TLSClientConfig = tlsClientConfig
if err := http2.ConfigureTransport(consulConf.Transport); err != nil {
return nil, err
}
logger.Debug("configured TLS")
}
consulConf.HttpClient = &http.Client{Transport: consulConf.Transport}
client, err := api.NewClient(consulConf)
if err != nil {
return nil, errwrap.Wrapf("client setup failed: {{err}}", err)
}
// Setup the backend
c := &ConsulServiceRegistration{
Client: client,
logger: logger,
serviceName: service,
serviceTags: strutil.ParseDedupLowercaseAndSortStrings(tags, ","),
serviceAddress: serviceAddr,
checkTimeout: checkTimeout,
disableRegistration: disableRegistration,
notifyActiveCh: make(chan notifyEvent),
notifySealedCh: make(chan notifyEvent),
notifyPerfStandbyCh: make(chan notifyEvent),
}
return c, nil
}
func setupTLSConfig(conf map[string]string, address string) (*tls.Config, error) {
serverName, _, err := net.SplitHostPort(address)
switch {
case err == nil:
case strings.Contains(err.Error(), "missing port"):
serverName = conf["address"]
default:
return nil, err
}
insecureSkipVerify := false
tlsSkipVerify, ok := conf["tls_skip_verify"]
if ok && tlsSkipVerify != "" {
b, err := parseutil.ParseBool(tlsSkipVerify)
if err != nil {
return nil, errwrap.Wrapf("failed parsing tls_skip_verify parameter: {{err}}", err)
}
insecureSkipVerify = b
}
tlsMinVersionStr, ok := conf["tls_min_version"]
if !ok {
// Set the default value
tlsMinVersionStr = "tls12"
}
tlsMinVersion, ok := tlsutil.TLSLookup[tlsMinVersionStr]
if !ok {
return nil, fmt.Errorf("invalid 'tls_min_version'")
}
tlsClientConfig := &tls.Config{
MinVersion: tlsMinVersion,
InsecureSkipVerify: insecureSkipVerify,
ServerName: serverName,
}
_, okCert := conf["tls_cert_file"]
_, okKey := conf["tls_key_file"]
if okCert && okKey {
tlsCert, err := tls.LoadX509KeyPair(conf["tls_cert_file"], conf["tls_key_file"])
if err != nil {
return nil, errwrap.Wrapf("client tls setup failed: {{err}}", err)
}
tlsClientConfig.Certificates = []tls.Certificate{tlsCert}
}
if tlsCaFile, ok := conf["tls_ca_file"]; ok {
caPool := x509.NewCertPool()
data, err := ioutil.ReadFile(tlsCaFile)
if err != nil {
return nil, errwrap.Wrapf("failed to read CA file: {{err}}", err)
}
if !caPool.AppendCertsFromPEM(data) {
return nil, fmt.Errorf("failed to parse CA certificate")
}
tlsClientConfig.RootCAs = caPool
}
return tlsClientConfig, nil
}
func (c *ConsulServiceRegistration) NotifyActiveStateChange() error {
select {
case c.notifyActiveCh <- notifyEvent{}:
default:
// NOTE: If this occurs Vault's active status could be out of
// sync with Consul until reconcileTimer expires.
c.logger.Warn("concurrent state change notify dropped")
}
return nil
}
func (c *ConsulServiceRegistration) NotifyPerformanceStandbyStateChange() error {
select {
case c.notifyPerfStandbyCh <- notifyEvent{}:
default:
// NOTE: If this occurs Vault's active status could be out of
// sync with Consul until reconcileTimer expires.
c.logger.Warn("concurrent state change notify dropped")
}
return nil
}
func (c *ConsulServiceRegistration) NotifySealedStateChange() error {
select {
case c.notifySealedCh <- notifyEvent{}:
default:
// NOTE: If this occurs Vault's sealed status could be out of
// sync with Consul until checkTimer expires.
c.logger.Warn("concurrent sealed state change notify dropped")
}
return nil
}
func (c *ConsulServiceRegistration) checkDuration() time.Duration {
return durationMinusBuffer(c.checkTimeout, checkMinBuffer, checkJitterFactor)
}
func (c *ConsulServiceRegistration) RunServiceRegistration(waitGroup *sync.WaitGroup, shutdownCh sr.ShutdownChannel, redirectAddr string, activeFunc sr.ActiveFunction, sealedFunc sr.SealedFunction, perfStandbyFunc sr.PerformanceStandbyFunction) (err error) {
if err := c.setRedirectAddr(redirectAddr); err != nil {
return err
}
// 'server' command will wait for the below goroutine to complete
waitGroup.Add(1)
go c.runEventDemuxer(waitGroup, shutdownCh, redirectAddr, activeFunc, sealedFunc, perfStandbyFunc)
return nil
}
func (c *ConsulServiceRegistration) runEventDemuxer(waitGroup *sync.WaitGroup, shutdownCh sr.ShutdownChannel, redirectAddr string, activeFunc sr.ActiveFunction, sealedFunc sr.SealedFunction, perfStandbyFunc sr.PerformanceStandbyFunction) {
// This defer statement should be executed last. So push it first.
defer waitGroup.Done()
// Fire the reconcileTimer immediately upon starting the event demuxer
reconcileTimer := time.NewTimer(0)
defer reconcileTimer.Stop()
// Schedule the first check. Consul TTL checks are passing by
// default, checkTimer does not need to be run immediately.
checkTimer := time.NewTimer(c.checkDuration())
defer checkTimer.Stop()
// Use a reactor pattern to handle and dispatch events to singleton
// goroutine handlers for execution. It is not acceptable to drop
// inbound events from Notify*().
//
// goroutines are dispatched if the demuxer can acquire a lock (via
// an atomic CAS incr) on the handler. Handlers are responsible for
// deregistering themselves (atomic CAS decr). Handlers and the
// demuxer share a lock to synchronize information at the beginning
// and end of a handler's life (or after a handler wakes up from
// sleeping during a back-off/retry).
var shutdown bool
var registeredServiceID string
checkLock := new(int32)
serviceRegLock := new(int32)
for !shutdown {
select {
case <-c.notifyActiveCh:
// Run reconcile immediately upon active state change notification
reconcileTimer.Reset(0)
case <-c.notifySealedCh:
// Run check timer immediately upon a seal state change notification
checkTimer.Reset(0)
case <-c.notifyPerfStandbyCh:
// Run check timer immediately upon a seal state change notification
checkTimer.Reset(0)
case <-reconcileTimer.C:
// Unconditionally rearm the reconcileTimer
reconcileTimer.Reset(reconcileTimeout - randomStagger(reconcileTimeout/checkJitterFactor))
// Abort if service discovery is disabled or a
// reconcile handler is already active
if !c.disableRegistration && atomic.CompareAndSwapInt32(serviceRegLock, 0, 1) {
// Enter handler with serviceRegLock held
go func() {
defer atomic.CompareAndSwapInt32(serviceRegLock, 1, 0)
for !shutdown {
serviceID, err := c.reconcileConsul(registeredServiceID, activeFunc, sealedFunc, perfStandbyFunc)
if err != nil {
if c.logger.IsWarn() {
c.logger.Warn("reconcile unable to talk with Consul backend", "error", err)
}
time.Sleep(consulRetryInterval)
continue
}
c.serviceLock.Lock()
defer c.serviceLock.Unlock()
registeredServiceID = serviceID
return
}
}()
}
case <-checkTimer.C:
checkTimer.Reset(c.checkDuration())
// Abort if service discovery is disabled or a
// reconcile handler is active
if !c.disableRegistration && atomic.CompareAndSwapInt32(checkLock, 0, 1) {
// Enter handler with checkLock held
go func() {
defer atomic.CompareAndSwapInt32(checkLock, 1, 0)
for !shutdown {
sealed := sealedFunc()
if err := c.runCheck(sealed); err != nil {
if c.logger.IsWarn() {
c.logger.Warn("check unable to talk with Consul backend", "error", err)
}
time.Sleep(consulRetryInterval)
continue
}
return
}
}()
}
case <-shutdownCh:
c.logger.Info("shutting down consul backend")
shutdown = true
}
}
c.serviceLock.RLock()
defer c.serviceLock.RUnlock()
if err := c.Client.Agent().ServiceDeregister(registeredServiceID); err != nil {
if c.logger.IsWarn() {
c.logger.Warn("service deregistration failed", "error", err)
}
}
}
// checkID returns the ID used for a Consul Check. Assume at least a read
// lock is held.
func (c *ConsulServiceRegistration) checkID() string {
return fmt.Sprintf("%s:vault-sealed-check", c.serviceID())
}
// serviceID returns the Vault ServiceID for use in Consul. Assume at least
// a read lock is held.
func (c *ConsulServiceRegistration) serviceID() string {
return fmt.Sprintf("%s:%s:%d", c.serviceName, c.redirectHost, c.redirectPort)
}
// reconcileConsul queries the state of Vault Core and Consul and fixes up
// Consul's state according to what's in Vault. reconcileConsul is called
// without any locks held and can be run concurrently, therefore no changes
// to ConsulServiceRegistration can be made in this method (i.e. wtb const receiver for
// compiler enforced safety).
func (c *ConsulServiceRegistration) reconcileConsul(registeredServiceID string, activeFunc sr.ActiveFunction, sealedFunc sr.SealedFunction, perfStandbyFunc sr.PerformanceStandbyFunction) (serviceID string, err error) {
// Query vault Core for its current state
active := activeFunc()
sealed := sealedFunc()
perfStandby := perfStandbyFunc()
agent := c.Client.Agent()
catalog := c.Client.Catalog()
serviceID = c.serviceID()
// Get the current state of Vault from Consul
var currentVaultService *api.CatalogService
if services, _, err := catalog.Service(c.serviceName, "", &api.QueryOptions{AllowStale: true}); err == nil {
for _, service := range services {
if serviceID == service.ServiceID {
currentVaultService = service
break
}
}
}
tags := c.fetchServiceTags(active, perfStandby)
var reregister bool
switch {
case currentVaultService == nil, registeredServiceID == "":
reregister = true
default:
switch {
case !strutil.EquivalentSlices(currentVaultService.ServiceTags, tags):
reregister = true
}
}
if !reregister {
// When re-registration is not required, return a valid serviceID
// to avoid registration in the next cycle.
return serviceID, nil
}
// If service address was set explicitly in configuration, use that
// as the service-specific address instead of the HA redirect address.
var serviceAddress string
if c.serviceAddress == nil {
serviceAddress = c.redirectHost
} else {
serviceAddress = *c.serviceAddress
}
service := &api.AgentServiceRegistration{
ID: serviceID,
Name: c.serviceName,
Tags: tags,
Port: int(c.redirectPort),
Address: serviceAddress,
EnableTagOverride: false,
}
checkStatus := api.HealthCritical
if !sealed {
checkStatus = api.HealthPassing
}
sealedCheck := &api.AgentCheckRegistration{
ID: c.checkID(),
Name: "Vault Sealed Status",
Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server",
ServiceID: serviceID,
AgentServiceCheck: api.AgentServiceCheck{
TTL: c.checkTimeout.String(),
Status: checkStatus,
},
}
if err := agent.ServiceRegister(service); err != nil {
return "", errwrap.Wrapf(`service registration failed: {{err}}`, err)
}
if err := agent.CheckRegister(sealedCheck); err != nil {
return serviceID, errwrap.Wrapf(`service check registration failed: {{err}}`, err)
}
return serviceID, nil
}
// runCheck immediately pushes a TTL check.
func (c *ConsulServiceRegistration) runCheck(sealed bool) error {
// Run a TTL check
agent := c.Client.Agent()
if !sealed {
return agent.PassTTL(c.checkID(), "Vault Unsealed")
} else {
return agent.FailTTL(c.checkID(), "Vault Sealed")
}
}
// fetchServiceTags returns all of the relevant tags for Consul.
func (c *ConsulServiceRegistration) fetchServiceTags(active bool, perfStandby bool) []string {
activeTag := "standby"
if active {
activeTag = "active"
}
result := append(c.serviceTags, activeTag)
if perfStandby {
result = append(c.serviceTags, "performance-standby")
}
return result
}
func (c *ConsulServiceRegistration) setRedirectAddr(addr string) (err error) {
if addr == "" {
return fmt.Errorf("redirect address must not be empty")
}
url, err := url.Parse(addr)
if err != nil {
return errwrap.Wrapf(fmt.Sprintf("failed to parse redirect URL %q: {{err}}", addr), err)
}
var portStr string
c.redirectHost, portStr, err = net.SplitHostPort(url.Host)
if err != nil {
if url.Scheme == "http" {
portStr = "80"
} else if url.Scheme == "https" {
portStr = "443"
} else if url.Scheme == "unix" {
portStr = "-1"
c.redirectHost = url.Path
} else {
return errwrap.Wrapf(fmt.Sprintf(`failed to find a host:port in redirect address "%v": {{err}}`, url.Host), err)
}
}
c.redirectPort, err = strconv.ParseInt(portStr, 10, 0)
if err != nil || c.redirectPort < -1 || c.redirectPort > 65535 {
return errwrap.Wrapf(fmt.Sprintf(`failed to parse valid port "%v": {{err}}`, portStr), err)
}
return nil
}
// durationMinusBuffer returns a duration, minus a buffer and jitter
// subtracted from the duration. This function is used primarily for
// servicing Consul TTL Checks in advance of the TTL.
func durationMinusBuffer(intv time.Duration, buffer time.Duration, jitter int64) time.Duration {
d := intv - buffer
if jitter == 0 {
d -= randomStagger(d)
} else {
d -= randomStagger(time.Duration(int64(d) / jitter))
}
return d
}
// durationMinusBufferDomain returns the domain of valid durations from a
// call to durationMinusBuffer. This function is used to check user
// specified input values to durationMinusBuffer.
func durationMinusBufferDomain(intv time.Duration, buffer time.Duration, jitter int64) (min time.Duration, max time.Duration) {
max = intv - buffer
if jitter == 0 {
min = max
} else {
min = max - time.Duration(int64(max)/jitter)
}
return min, max
}
// randomStagger returns an interval between 0 and the duration
func randomStagger(intv time.Duration) time.Duration {
if intv == 0 {
return 0
}
return time.Duration(uint64(rand.Int63()) % uint64(intv))
}

View File

@ -0,0 +1,632 @@
package consul
import (
"math/rand"
"os"
"reflect"
"sync"
"testing"
"time"
"github.com/go-test/deep"
"github.com/hashicorp/consul/api"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/testhelpers/consul"
"github.com/hashicorp/vault/sdk/helper/logging"
"github.com/hashicorp/vault/sdk/helper/strutil"
"github.com/hashicorp/vault/sdk/physical"
"github.com/hashicorp/vault/sdk/physical/inmem"
sr "github.com/hashicorp/vault/serviceregistration"
"github.com/hashicorp/vault/vault"
)
type consulConf map[string]string
func testConsulServiceRegistration(t *testing.T) *ConsulServiceRegistration {
return testConsulServiceRegistrationConfig(t, &consulConf{})
}
func testConsulServiceRegistrationConfig(t *testing.T, conf *consulConf) *ConsulServiceRegistration {
logger := logging.NewVaultLogger(log.Debug)
be, err := NewConsulServiceRegistration(*conf, logger)
if err != nil {
t.Fatalf("Expected Consul to initialize: %v", err)
}
c, ok := be.(*ConsulServiceRegistration)
if !ok {
t.Fatalf("Expected ConsulServiceRegistration")
}
return c
}
func testActiveFunc(activePct float64) sr.ActiveFunction {
return func() bool {
var active bool
standbyProb := rand.Float64()
if standbyProb > activePct {
active = true
}
return active
}
}
func testSealedFunc(sealedPct float64) sr.SealedFunction {
return func() bool {
var sealed bool
unsealedProb := rand.Float64()
if unsealedProb > sealedPct {
sealed = true
}
return sealed
}
}
func testPerformanceStandbyFunc(perfPct float64) sr.PerformanceStandbyFunction {
return func() bool {
var ps bool
unsealedProb := rand.Float64()
if unsealedProb > perfPct {
ps = true
}
return ps
}
}
// TestConsul_ServiceRegistration tests whether consul ServiceRegistration works
func TestConsul_ServiceRegistration(t *testing.T) {
// Prepare a docker-based consul instance
cleanup, addr, token := consul.PrepareTestContainer(t, "1.4.0-rc1")
defer cleanup()
// Create a consul client
cfg := api.DefaultConfig()
cfg.Address = addr
cfg.Token = token
client, err := api.NewClient(cfg)
if err != nil {
t.Fatal(err)
}
// transitionFrom waits patiently for the services in the Consul catalog to
// transition from a known value, and then returns the new value.
transitionFrom := func(t *testing.T, known map[string][]string) map[string][]string {
t.Helper()
// Wait for up to 10 seconds
for i := 0; i < 10; i++ {
services, _, err := client.Catalog().Services(nil)
if err != nil {
t.Fatal(err)
}
if diff := deep.Equal(services, known); diff != nil {
return services
}
time.Sleep(time.Second)
}
t.Fatalf("Catalog Services never transitioned from %v", known)
return nil
}
// Create a ServiceRegistration that points to our consul instance
logger := logging.NewVaultLogger(log.Trace)
sd, err := NewConsulServiceRegistration(map[string]string{
"address": addr,
"token": token,
}, logger)
if err != nil {
t.Fatal(err)
}
// Create the core
inm, err := inmem.NewInmemHA(nil, logger)
if err != nil {
t.Fatal(err)
}
inmha, err := inmem.NewInmemHA(nil, logger)
if err != nil {
t.Fatal(err)
}
const redirectAddr = "http://127.0.0.1:8200"
core, err := vault.NewCore(&vault.CoreConfig{
ServiceRegistration: sd,
Physical: inm,
HAPhysical: inmha.(physical.HABackend),
RedirectAddr: redirectAddr,
DisableMlock: true,
})
if err != nil {
t.Fatal(err)
}
// Vault should not yet be registered with Consul
services, _, err := client.Catalog().Services(nil)
if err != nil {
t.Fatal(err)
}
if diff := deep.Equal(services, map[string][]string{
"consul": []string{},
}); diff != nil {
t.Fatal(diff)
}
// Run service discovery on the core
wg := &sync.WaitGroup{}
var shutdown chan struct{}
activeFunc := func() bool {
if isLeader, _, _, err := core.Leader(); err == nil {
return isLeader
}
return false
}
err = sd.RunServiceRegistration(
wg, shutdown, redirectAddr, activeFunc, core.Sealed, core.PerfStandby)
if err != nil {
t.Fatal(err)
}
// Vault should soon be registered with Consul in standby mode
services = transitionFrom(t, map[string][]string{
"consul": []string{},
})
if diff := deep.Equal(services, map[string][]string{
"consul": []string{},
"vault": []string{"standby"},
}); diff != nil {
t.Fatal(diff)
}
// Initialize and unseal the core
keys, _ := vault.TestCoreInit(t, core)
for _, key := range keys {
if _, err := vault.TestCoreUnseal(core, vault.TestKeyCopy(key)); err != nil {
t.Fatalf("unseal err: %s", err)
}
}
if core.Sealed() {
t.Fatal("should not be sealed")
}
// Wait for the core to become active
vault.TestWaitActive(t, core)
// Vault should soon be registered with Consul in active mode
services = transitionFrom(t, map[string][]string{
"consul": []string{},
"vault": []string{"standby"},
})
if diff := deep.Equal(services, map[string][]string{
"consul": []string{},
"vault": []string{"active"},
}); diff != nil {
t.Fatal(diff)
}
}
func TestConsul_ServiceTags(t *testing.T) {
consulConfig := map[string]string{
"path": "seaTech/",
"service": "astronomy",
"service_tags": "deadbeef, cafeefac, deadc0de, feedface",
"redirect_addr": "http://127.0.0.2:8200",
"check_timeout": "6s",
"address": "127.0.0.2",
"scheme": "https",
"token": "deadbeef-cafeefac-deadc0de-feedface",
"max_parallel": "4",
"disable_registration": "false",
}
logger := logging.NewVaultLogger(log.Debug)
be, err := NewConsulServiceRegistration(consulConfig, logger)
if err != nil {
t.Fatal(err)
}
c, ok := be.(*ConsulServiceRegistration)
if !ok {
t.Fatalf("failed to create physical Consul backend")
}
expected := []string{"deadbeef", "cafeefac", "deadc0de", "feedface"}
actual := c.fetchServiceTags(false, false)
if !strutil.EquivalentSlices(actual, append(expected, "standby")) {
t.Fatalf("bad: expected:%s actual:%s", append(expected, "standby"), actual)
}
actual = c.fetchServiceTags(true, false)
if !strutil.EquivalentSlices(actual, append(expected, "active")) {
t.Fatalf("bad: expected:%s actual:%s", append(expected, "active"), actual)
}
actual = c.fetchServiceTags(false, true)
if !strutil.EquivalentSlices(actual, append(expected, "performance-standby")) {
t.Fatalf("bad: expected:%s actual:%s", append(expected, "performance-standby"), actual)
}
actual = c.fetchServiceTags(true, true)
if !strutil.EquivalentSlices(actual, append(expected, "performance-standby")) {
t.Fatalf("bad: expected:%s actual:%s", append(expected, "performance-standby"), actual)
}
}
func TestConsul_ServiceAddress(t *testing.T) {
tests := []struct {
consulConfig map[string]string
serviceAddrNil bool
}{
{
consulConfig: map[string]string{
"service_address": "",
},
},
{
consulConfig: map[string]string{
"service_address": "vault.example.com",
},
},
{
serviceAddrNil: true,
},
}
for _, test := range tests {
logger := logging.NewVaultLogger(log.Debug)
be, err := NewConsulServiceRegistration(test.consulConfig, logger)
if err != nil {
t.Fatalf("expected Consul to initialize: %v", err)
}
c, ok := be.(*ConsulServiceRegistration)
if !ok {
t.Fatalf("Expected ConsulServiceRegistration")
}
if test.serviceAddrNil {
if c.serviceAddress != nil {
t.Fatalf("expected service address to be nil")
}
} else {
if c.serviceAddress == nil {
t.Fatalf("did not expect service address to be nil")
}
}
}
}
func TestConsul_newConsulServiceRegistration(t *testing.T) {
tests := []struct {
name string
consulConfig map[string]string
fail bool
redirectAddr string
checkTimeout time.Duration
path string
service string
address string
scheme string
token string
max_parallel int
disableReg bool
consistencyMode string
}{
{
name: "Valid default config",
consulConfig: map[string]string{},
checkTimeout: 5 * time.Second,
redirectAddr: "http://127.0.0.1:8200",
path: "vault/",
service: "vault",
address: "127.0.0.1:8500",
scheme: "http",
token: "",
max_parallel: 4,
disableReg: false,
consistencyMode: "default",
},
{
name: "Valid modified config",
consulConfig: map[string]string{
"path": "seaTech/",
"service": "astronomy",
"redirect_addr": "http://127.0.0.2:8200",
"check_timeout": "6s",
"address": "127.0.0.2",
"scheme": "https",
"token": "deadbeef-cafeefac-deadc0de-feedface",
"max_parallel": "4",
"disable_registration": "false",
"consistency_mode": "strong",
},
checkTimeout: 6 * time.Second,
path: "seaTech/",
service: "astronomy",
redirectAddr: "http://127.0.0.2:8200",
address: "127.0.0.2",
scheme: "https",
token: "deadbeef-cafeefac-deadc0de-feedface",
max_parallel: 4,
consistencyMode: "strong",
},
{
name: "Unix socket",
consulConfig: map[string]string{
"address": "unix:///tmp/.consul.http.sock",
},
address: "/tmp/.consul.http.sock",
scheme: "http", // Default, not overridden?
// Defaults
checkTimeout: 5 * time.Second,
redirectAddr: "http://127.0.0.1:8200",
path: "vault/",
service: "vault",
token: "",
max_parallel: 4,
disableReg: false,
consistencyMode: "default",
},
{
name: "Scheme in address",
consulConfig: map[string]string{
"address": "https://127.0.0.2:5000",
},
address: "127.0.0.2:5000",
scheme: "https",
// Defaults
checkTimeout: 5 * time.Second,
redirectAddr: "http://127.0.0.1:8200",
path: "vault/",
service: "vault",
token: "",
max_parallel: 4,
disableReg: false,
consistencyMode: "default",
},
{
name: "check timeout too short",
fail: true,
consulConfig: map[string]string{
"check_timeout": "99ms",
},
},
}
for _, test := range tests {
logger := logging.NewVaultLogger(log.Debug)
be, err := NewConsulServiceRegistration(test.consulConfig, logger)
if test.fail {
if err == nil {
t.Fatalf(`Expected config "%s" to fail`, test.name)
} else {
continue
}
} else if !test.fail && err != nil {
t.Fatalf("Expected config %s to not fail: %v", test.name, err)
}
c, ok := be.(*ConsulServiceRegistration)
if !ok {
t.Fatalf("Expected ConsulServiceRegistration: %s", test.name)
}
c.disableRegistration = true
if c.disableRegistration == false {
addr := os.Getenv("CONSUL_HTTP_ADDR")
if addr == "" {
continue
}
}
var shutdownCh sr.ShutdownChannel
waitGroup := &sync.WaitGroup{}
if err := c.RunServiceRegistration(waitGroup, shutdownCh, test.redirectAddr, testActiveFunc(0.5), testSealedFunc(0.5), testPerformanceStandbyFunc(0.5)); err != nil {
t.Fatalf("bad: %v", err)
}
if test.checkTimeout != c.checkTimeout {
t.Errorf("bad: %v != %v", test.checkTimeout, c.checkTimeout)
}
if test.service != c.serviceName {
t.Errorf("bad: %v != %v", test.service, c.serviceName)
}
// The configuration stored in the Consul "client" object is not exported, so
// we either have to skip validating it, or add a method to export it, or use reflection.
consulConfig := reflect.Indirect(reflect.ValueOf(c.Client)).FieldByName("config")
consulConfigScheme := consulConfig.FieldByName("Scheme").String()
consulConfigAddress := consulConfig.FieldByName("Address").String()
if test.scheme != consulConfigScheme {
t.Errorf("bad scheme value: %v != %v", test.scheme, consulConfigScheme)
}
if test.address != consulConfigAddress {
t.Errorf("bad address value: %v != %v", test.address, consulConfigAddress)
}
// FIXME(sean@): Unable to test max_parallel
// if test.max_parallel != cap(c.permitPool) {
// t.Errorf("bad: %v != %v", test.max_parallel, cap(c.permitPool))
// }
}
}
func TestConsul_serviceTags(t *testing.T) {
tests := []struct {
active bool
perfStandby bool
tags []string
}{
{
active: true,
perfStandby: false,
tags: []string{"active"},
},
{
active: false,
perfStandby: false,
tags: []string{"standby"},
},
{
active: false,
perfStandby: true,
tags: []string{"performance-standby"},
},
{
active: true,
perfStandby: true,
tags: []string{"performance-standby"},
},
}
c := testConsulServiceRegistration(t)
for _, test := range tests {
tags := c.fetchServiceTags(test.active, test.perfStandby)
if !reflect.DeepEqual(tags[:], test.tags[:]) {
t.Errorf("Bad %v: %v %v", test.active, tags, test.tags)
}
}
}
func TestConsul_setRedirectAddr(t *testing.T) {
tests := []struct {
addr string
host string
port int64
pass bool
}{
{
addr: "http://127.0.0.1:8200/",
host: "127.0.0.1",
port: 8200,
pass: true,
},
{
addr: "http://127.0.0.1:8200",
host: "127.0.0.1",
port: 8200,
pass: true,
},
{
addr: "https://127.0.0.1:8200",
host: "127.0.0.1",
port: 8200,
pass: true,
},
{
addr: "unix:///tmp/.vault.addr.sock",
host: "/tmp/.vault.addr.sock",
port: -1,
pass: true,
},
{
addr: "127.0.0.1:8200",
pass: false,
},
{
addr: "127.0.0.1",
pass: false,
},
}
for _, test := range tests {
c := testConsulServiceRegistration(t)
err := c.setRedirectAddr(test.addr)
if test.pass {
if err != nil {
t.Fatalf("bad: %v", err)
}
} else {
if err == nil {
t.Fatalf("bad, expected fail")
} else {
continue
}
}
if c.redirectHost != test.host {
t.Fatalf("bad: %v != %v", c.redirectHost, test.host)
}
if c.redirectPort != test.port {
t.Fatalf("bad: %v != %v", c.redirectPort, test.port)
}
}
}
func TestConsul_serviceID(t *testing.T) {
tests := []struct {
name string
redirectAddr string
serviceName string
expected string
valid bool
}{
{
name: "valid host w/o slash",
redirectAddr: "http://127.0.0.1:8200",
serviceName: "sea-tech-astronomy",
expected: "sea-tech-astronomy:127.0.0.1:8200",
valid: true,
},
{
name: "valid host w/ slash",
redirectAddr: "http://127.0.0.1:8200/",
serviceName: "sea-tech-astronomy",
expected: "sea-tech-astronomy:127.0.0.1:8200",
valid: true,
},
{
name: "valid https host w/ slash",
redirectAddr: "https://127.0.0.1:8200/",
serviceName: "sea-tech-astronomy",
expected: "sea-tech-astronomy:127.0.0.1:8200",
valid: true,
},
{
name: "invalid host name",
redirectAddr: "https://127.0.0.1:8200/",
serviceName: "sea_tech_astronomy",
expected: "",
valid: false,
},
}
logger := logging.NewVaultLogger(log.Debug)
for _, test := range tests {
be, err := NewConsulServiceRegistration(consulConf{
"service": test.serviceName,
}, logger)
if !test.valid {
if err == nil {
t.Fatalf("expected an error initializing for name %q", test.serviceName)
}
continue
}
if test.valid && err != nil {
t.Fatalf("expected Consul to initialize: %v", err)
}
c, ok := be.(*ConsulServiceRegistration)
if !ok {
t.Fatalf("Expected ConsulServiceRegistration")
}
if err := c.setRedirectAddr(test.redirectAddr); err != nil {
t.Fatalf("bad: %s %v", test.name, err)
}
serviceID := c.serviceID()
if serviceID != test.expected {
t.Fatalf("bad: %v != %v", serviceID, test.expected)
}
}
}

View File

@ -0,0 +1,40 @@
package serviceregistration
import (
"sync"
log "github.com/hashicorp/go-hclog"
)
// Factory is the factory function to create a ServiceRegistration.
type Factory func(config map[string]string, logger log.Logger) (ServiceRegistration, error)
// ServiceRegistration is an interface that advertises the state of Vault to a
// service discovery network.
type ServiceRegistration interface {
// NotifyActiveStateChange is used by Core to notify that this Vault
// instance has changed its status to active or standby.
NotifyActiveStateChange() error
// NotifySealedStateChange is used by Core to notify that Vault has changed
// its Sealed status to sealed or unsealed.
NotifySealedStateChange() error
// NotifyPerformanceStandbyStateChange is used by Core to notify that this
// Vault instance has changed it status to performance standby or standby.
NotifyPerformanceStandbyStateChange() error
// Run executes any background service discovery tasks until the
// shutdown channel is closed.
RunServiceRegistration(
waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, redirectAddr string,
activeFunc ActiveFunction, sealedFunc SealedFunction, perfStandbyFunc PerformanceStandbyFunction) error
}
// Callback signatures for RunServiceRegistration
type ActiveFunction func() bool
type SealedFunction func() bool
type PerformanceStandbyFunction func() bool
// ShutdownChannel is the shutdown signal for RunServiceRegistration
type ShutdownChannel chan struct{}

View File

@ -39,6 +39,7 @@ import (
"github.com/hashicorp/vault/sdk/helper/tlsutil" "github.com/hashicorp/vault/sdk/helper/tlsutil"
"github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/sdk/physical" "github.com/hashicorp/vault/sdk/physical"
sr "github.com/hashicorp/vault/serviceregistration"
"github.com/hashicorp/vault/shamir" "github.com/hashicorp/vault/shamir"
"github.com/hashicorp/vault/vault/cluster" "github.com/hashicorp/vault/vault/cluster"
"github.com/hashicorp/vault/vault/seal" "github.com/hashicorp/vault/vault/seal"
@ -192,6 +193,9 @@ type Core struct {
// physical backend is the un-trusted backend with durable data // physical backend is the un-trusted backend with durable data
physical physical.Backend physical physical.Backend
// serviceRegistration is the ServiceRegistration network
serviceRegistration sr.ServiceRegistration
// underlyingPhysical will always point to the underlying backend // underlyingPhysical will always point to the underlying backend
// implementation. This is an un-trusted backend with durable data // implementation. This is an un-trusted backend with durable data
underlyingPhysical physical.Backend underlyingPhysical physical.Backend
@ -482,60 +486,62 @@ type Core struct {
// CoreConfig is used to parameterize a core // CoreConfig is used to parameterize a core
type CoreConfig struct { type CoreConfig struct {
DevToken string `json:"dev_token" structs:"dev_token" mapstructure:"dev_token"` DevToken string
BuiltinRegistry BuiltinRegistry `json:"builtin_registry" structs:"builtin_registry" mapstructure:"builtin_registry"` BuiltinRegistry BuiltinRegistry
LogicalBackends map[string]logical.Factory `json:"logical_backends" structs:"logical_backends" mapstructure:"logical_backends"` LogicalBackends map[string]logical.Factory
CredentialBackends map[string]logical.Factory `json:"credential_backends" structs:"credential_backends" mapstructure:"credential_backends"` CredentialBackends map[string]logical.Factory
AuditBackends map[string]audit.Factory `json:"audit_backends" structs:"audit_backends" mapstructure:"audit_backends"` AuditBackends map[string]audit.Factory
Physical physical.Backend `json:"physical" structs:"physical" mapstructure:"physical"` Physical physical.Backend
StorageType string `json:"storage_type" structs:"storage_type" mapstructure:"storage_type"` StorageType string
// May be nil, which disables HA operations // May be nil, which disables HA operations
HAPhysical physical.HABackend `json:"ha_physical" structs:"ha_physical" mapstructure:"ha_physical"` HAPhysical physical.HABackend
Seal Seal `json:"seal" structs:"seal" mapstructure:"seal"` ServiceRegistration sr.ServiceRegistration
SecureRandomReader io.Reader `json:"secure_random_reader" structs:"secure_random_reader" mapstructure:"secure_random_reader"` Seal Seal
Logger log.Logger `json:"logger" structs:"logger" mapstructure:"logger"` SecureRandomReader io.Reader
Logger log.Logger
// Disables the LRU cache on the physical backend // Disables the LRU cache on the physical backend
DisableCache bool `json:"disable_cache" structs:"disable_cache" mapstructure:"disable_cache"` DisableCache bool
// Disables mlock syscall // Disables mlock syscall
DisableMlock bool `json:"disable_mlock" structs:"disable_mlock" mapstructure:"disable_mlock"` DisableMlock bool
// Custom cache size for the LRU cache on the physical backend, or zero for default // Custom cache size for the LRU cache on the physical backend, or zero for default
CacheSize int `json:"cache_size" structs:"cache_size" mapstructure:"cache_size"` CacheSize int
// Set as the leader address for HA // Set as the leader address for HA
RedirectAddr string `json:"redirect_addr" structs:"redirect_addr" mapstructure:"redirect_addr"` RedirectAddr string
// Set as the cluster address for HA // Set as the cluster address for HA
ClusterAddr string `json:"cluster_addr" structs:"cluster_addr" mapstructure:"cluster_addr"` ClusterAddr string
DefaultLeaseTTL time.Duration `json:"default_lease_ttl" structs:"default_lease_ttl" mapstructure:"default_lease_ttl"` DefaultLeaseTTL time.Duration
MaxLeaseTTL time.Duration `json:"max_lease_ttl" structs:"max_lease_ttl" mapstructure:"max_lease_ttl"` MaxLeaseTTL time.Duration
ClusterName string `json:"cluster_name" structs:"cluster_name" mapstructure:"cluster_name"` ClusterName string
ClusterCipherSuites string `json:"cluster_cipher_suites" structs:"cluster_cipher_suites" mapstructure:"cluster_cipher_suites"` ClusterCipherSuites string
EnableUI bool `json:"ui" structs:"ui" mapstructure:"ui"` EnableUI bool
// Enable the raw endpoint // Enable the raw endpoint
EnableRaw bool `json:"enable_raw" structs:"enable_raw" mapstructure:"enable_raw"` EnableRaw bool
PluginDirectory string `json:"plugin_directory" structs:"plugin_directory" mapstructure:"plugin_directory"` PluginDirectory string
DisableSealWrap bool `json:"disable_sealwrap" structs:"disable_sealwrap" mapstructure:"disable_sealwrap"` DisableSealWrap bool
RawConfig *server.Config RawConfig *server.Config
@ -569,6 +575,7 @@ func (c *CoreConfig) Clone() *CoreConfig {
AuditBackends: c.AuditBackends, AuditBackends: c.AuditBackends,
Physical: c.Physical, Physical: c.Physical,
HAPhysical: c.HAPhysical, HAPhysical: c.HAPhysical,
ServiceRegistration: c.ServiceRegistration,
Seal: c.Seal, Seal: c.Seal,
Logger: c.Logger, Logger: c.Logger,
DisableCache: c.DisableCache, DisableCache: c.DisableCache,
@ -596,6 +603,26 @@ func (c *CoreConfig) Clone() *CoreConfig {
} }
} }
// GetServiceRegistration returns the config's ServiceRegistration, or nil if it does
// not exist.
func (c *CoreConfig) GetServiceRegistration() sr.ServiceRegistration {
// Check whether there is a ServiceRegistration explictly configured
if c.ServiceRegistration != nil {
return c.ServiceRegistration
}
// Check if HAPhysical is configured and implements ServiceRegistration
if c.HAPhysical != nil && c.HAPhysical.HAEnabled() {
if disc, ok := c.HAPhysical.(sr.ServiceRegistration); ok {
return disc
}
}
// No service discovery is available.
return nil
}
// NewCore is used to construct a new core // NewCore is used to construct a new core
func NewCore(conf *CoreConfig) (*Core, error) { func NewCore(conf *CoreConfig) (*Core, error) {
if conf.HAPhysical != nil && conf.HAPhysical.HAEnabled() { if conf.HAPhysical != nil && conf.HAPhysical.HAEnabled() {
@ -651,6 +678,7 @@ func NewCore(conf *CoreConfig) (*Core, error) {
entCore: entCore{}, entCore: entCore{},
devToken: conf.DevToken, devToken: conf.DevToken,
physical: conf.Physical, physical: conf.Physical,
serviceRegistration: conf.GetServiceRegistration(),
underlyingPhysical: conf.Physical, underlyingPhysical: conf.Physical,
storageType: conf.StorageType, storageType: conf.StorageType,
redirectAddr: conf.RedirectAddr, redirectAddr: conf.RedirectAddr,
@ -1347,13 +1375,10 @@ func (c *Core) unsealInternal(ctx context.Context, masterKey []byte) (bool, erro
c.logger.Info("vault is unsealed") c.logger.Info("vault is unsealed")
} }
if c.ha != nil { if c.serviceRegistration != nil {
sd, ok := c.ha.(physical.ServiceDiscovery) if err := c.serviceRegistration.NotifySealedStateChange(); err != nil {
if ok { if c.logger.IsWarn() {
if err := sd.NotifySealedStateChange(); err != nil { c.logger.Warn("failed to notify unsealed status", "error", err)
if c.logger.IsWarn() {
c.logger.Warn("failed to notify unsealed status", "error", err)
}
} }
} }
} }
@ -1651,13 +1676,10 @@ func (c *Core) sealInternalWithOptions(grabStateLock, keepHALock, shutdownRaft b
return err return err
} }
if c.ha != nil { if c.serviceRegistration != nil {
sd, ok := c.ha.(physical.ServiceDiscovery) if err := c.serviceRegistration.NotifySealedStateChange(); err != nil {
if ok { if c.logger.IsWarn() {
if err := sd.NotifySealedStateChange(); err != nil { c.logger.Warn("failed to notify sealed status", "error", err)
if c.logger.IsWarn() {
c.logger.Warn("failed to notify sealed status", "error", err)
}
} }
} }
} }

View File

@ -3,6 +3,7 @@ package vault
import ( import (
"context" "context"
"reflect" "reflect"
"sync"
"testing" "testing"
"time" "time"
@ -17,6 +18,7 @@ import (
"github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/sdk/physical" "github.com/hashicorp/vault/sdk/physical"
"github.com/hashicorp/vault/sdk/physical/inmem" "github.com/hashicorp/vault/sdk/physical/inmem"
sr "github.com/hashicorp/vault/serviceregistration"
) )
var ( var (
@ -2446,3 +2448,114 @@ func TestCore_HandleRequest_TokenCreate_RegisterAuthFailure(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
} }
// mockServiceRegistration helps test whether standalone ServiceRegistration works
type mockServiceRegistration struct {
notifyActiveCount int
notifySealedCount int
notifyPerfCount int
runDiscoveryCount int
}
func (m *mockServiceRegistration) NotifyActiveStateChange() error {
m.notifyActiveCount++
return nil
}
func (m *mockServiceRegistration) NotifySealedStateChange() error {
m.notifySealedCount++
return nil
}
func (m *mockServiceRegistration) NotifyPerformanceStandbyStateChange() error {
m.notifyPerfCount++
return nil
}
func (m *mockServiceRegistration) RunServiceRegistration(
_ *sync.WaitGroup, _ sr.ShutdownChannel, _ string,
_ sr.ActiveFunction, _ sr.SealedFunction,
_ sr.PerformanceStandbyFunction) error {
m.runDiscoveryCount++
return nil
}
// TestCore_ServiceRegistration tests whether standalone ServiceRegistration works
func TestCore_ServiceRegistration(t *testing.T) {
// Make a mock service discovery
sr := &mockServiceRegistration{}
// Create the core
logger = logging.NewVaultLogger(log.Trace)
inm, err := inmem.NewInmemHA(nil, logger)
if err != nil {
t.Fatal(err)
}
inmha, err := inmem.NewInmemHA(nil, logger)
if err != nil {
t.Fatal(err)
}
const redirectAddr = "http://127.0.0.1:8200"
core, err := NewCore(&CoreConfig{
ServiceRegistration: sr,
Physical: inm,
HAPhysical: inmha.(physical.HABackend),
RedirectAddr: redirectAddr,
DisableMlock: true,
})
if err != nil {
t.Fatal(err)
}
// Vault should not yet be registered
if diff := deep.Equal(sr, &mockServiceRegistration{}); diff != nil {
t.Fatal(diff)
}
// Run service discovery on the core
wg := &sync.WaitGroup{}
var shutdown chan struct{}
activeFunc := func() bool {
if isLeader, _, _, err := core.Leader(); err == nil {
return isLeader
}
return false
}
err = sr.RunServiceRegistration(
wg, shutdown, redirectAddr, activeFunc, core.Sealed, core.PerfStandby)
if err != nil {
t.Fatal(err)
}
// Vault should be registered
if diff := deep.Equal(sr, &mockServiceRegistration{
runDiscoveryCount: 1,
}); diff != nil {
t.Fatal(diff)
}
// Initialize and unseal the core
keys, _ := TestCoreInit(t, core)
for _, key := range keys {
if _, err := TestCoreUnseal(core, TestKeyCopy(key)); err != nil {
t.Fatalf("unseal err: %s", err)
}
}
if core.Sealed() {
t.Fatal("should not be sealed")
}
// Wait for core to become active
TestWaitActive(t, core)
// Vault should be registered, unsealed, and active
if diff := deep.Equal(sr, &mockServiceRegistration{
runDiscoveryCount: 1,
notifyActiveCount: 1,
notifySealedCount: 1,
}); diff != nil {
t.Fatal(diff)
}
}

View File

@ -924,9 +924,8 @@ func (c *Core) advertiseLeader(ctx context.Context, uuid string, leaderLostCh <-
return err return err
} }
sd, ok := c.ha.(physical.ServiceDiscovery) if c.serviceRegistration != nil {
if ok { if err := c.serviceRegistration.NotifyActiveStateChange(); err != nil {
if err := sd.NotifyActiveStateChange(); err != nil {
if c.logger.IsWarn() { if c.logger.IsWarn() {
c.logger.Warn("failed to notify active status", "error", err) c.logger.Warn("failed to notify active status", "error", err)
} }
@ -960,9 +959,8 @@ func (c *Core) clearLeader(uuid string) error {
err := c.barrier.Delete(context.Background(), key) err := c.barrier.Delete(context.Background(), key)
// Advertise ourselves as a standby // Advertise ourselves as a standby
sd, ok := c.ha.(physical.ServiceDiscovery) if c.serviceRegistration != nil {
if ok { if err := c.serviceRegistration.NotifyActiveStateChange(); err != nil {
if err := sd.NotifyActiveStateChange(); err != nil {
if c.logger.IsWarn() { if c.logger.IsWarn() {
c.logger.Warn("failed to notify standby status", "error", err) c.logger.Warn("failed to notify standby status", "error", err)
} }

View File

@ -0,0 +1,231 @@
---
layout: "docs"
page_title: "Consul - Service Registration - Configuration"
sidebar_title: "Consul"
sidebar_current: "docs-configuration-storage-consul"
description: |-
Consul Service Registration registers Vault as a service in Consul with a default
health check.
---
# Consul Service Registration
Consul Service Registration registers Vault as a service in [Consul][consul] with
a default health check.
- **HashiCorp Supported** Consul Service Registration is officially supported
by HashiCorp.
```hcl
service_registration "consul" {
address = "127.0.0.1:8500"
}
```
Once properly configured, an unsealed Vault installation should be available and
accessible at:
```text
active.vault.service.consul
```
Unsealed Vault instances in standby mode are available at:
```text
standby.vault.service.consul
```
All unsealed Vault instances are available as healthy at:
```text
vault.service.consul
```
Sealed Vault instances will mark themselves as unhealthy to avoid being returned
at Consul's service discovery layer.
## `consul` Parameters
- `address` `(string: "127.0.0.1:8500")` Specifies the address of the Consul
agent to communicate with. This can be an IP address, DNS record, or unix
socket. It is recommended that you communicate with a local Consul agent; do
not communicate directly with a server.
- `check_timeout` `(string: "5s")` Specifies the check interval used to send
health check information back to Consul. This is specified using a label
suffix like `"30s"` or `"1h"`.
- `disable_registration` `(string: "false")` Specifies whether Vault should
register itself with Consul.
- `scheme` `(string: "http")` Specifies the scheme to use when communicating
with Consul. This can be set to "http" or "https". It is highly recommended
you communicate with Consul over https over non-local connections. When
communicating over a unix socket, this option is ignored.
- `service` `(string: "vault")` Specifies the name of the service to register
in Consul.
- `service_tags` `(string: "")` Specifies a comma-separated list of tags to
attach to the service registration in Consul.
- `service_address` `(string: nil)` Specifies a service-specific address to
set on the service registration in Consul. If unset, Vault will use what it
knows to be the HA redirect address - which is usually desirable. Setting
this parameter to `""` will tell Consul to leverage the configuration of the
node the service is registered on dynamically. This could be beneficial if
you intend to leverage Consul's
[`translate_wan_addrs`][consul-translate-wan-addrs] parameter.
- `token` `(string: "")` Specifies the [Consul ACL token][consul-acl] with
permission to read and write from the `path` in Consul's key-value store.
This is **not** a Vault token. See the ACL section below for help.
The following settings apply when communicating with Consul via an encrypted
connection. You can read more about encrypting Consul connections on the
[Consul encryption page][consul-encryption].
- `tls_ca_file` `(string: "")` Specifies the path to the CA certificate used
for Consul communication. This defaults to system bundle if not specified.
This should be set according to the
[`ca_file`](https://www.consul.io/docs/agent/options.html#ca_file) setting in
Consul.
- `tls_cert_file` `(string: "")` (optional) Specifies the path to the
certificate for Consul communication. This should be set according to the
[`cert_file`](https://www.consul.io/docs/agent/options.html#cert_file) setting
in Consul.
- `tls_key_file` `(string: "")` Specifies the path to the private key for
Consul communication. This should be set according to the
[`key_file`](https://www.consul.io/docs/agent/options.html#key_file) setting
in Consul.
- `tls_min_version` `(string: "tls12")` Specifies the minimum TLS version to
use. Accepted values are `"tls10"`, `"tls11"` or `"tls12"`.
- `tls_skip_verify` `(string: "false")` Disable verification of TLS certificates.
Using this option is highly discouraged.
## ACLs
If using ACLs in Consul, you'll need appropriate permissions. For Consul 0.8,
the following will work for most use-cases, assuming that your service name is
`vault` and the prefix being used is `vault/`:
```json
{
"key": {
"vault/": {
"policy": "write"
}
},
"node": {
"": {
"policy": "write"
}
},
"service": {
"vault": {
"policy": "write"
}
},
"agent": {
"": {
"policy": "write"
}
},
"session": {
"": {
"policy": "write"
}
}
}
```
For Consul 1.4+, the following example takes into account the changed ACL
language:
```json
{
"key_prefix": {
"vault/": {
"policy": "write"
}
},
"node_prefix": {
"": {
"policy": "write"
}
},
"service": {
"vault": {
"policy": "write"
}
},
"agent_prefix": {
"": {
"policy": "write"
}
},
"session_prefix": {
"": {
"policy": "write"
}
}
}
```
## `consul` Examples
### Local Agent
This example shows a sample configuration which communicates with a local
Consul agent running on `127.0.0.1:8500`.
```hcl
service_registration "consul" {}
```
### Detailed Customization
This example shows communicating with Consul on a custom address with an ACL
token.
```hcl
service_registration "consul" {
address = "10.5.7.92:8194"
token = "abcd1234"
}
```
### Consul via Unix Socket
This example shows communicating with Consul over a local unix socket.
```hcl
service_registration "consul" {
address = "unix:///tmp/.consul.http.sock"
}
```
### Custom TLS
This example shows using a custom CA, certificate, and key file to securely
communicate with Consul over TLS.
```hcl
service_registration "consul" {
scheme = "https"
tls_ca_file = "/etc/pem/vault.ca"
tls_cert_file = "/etc/pem/vault.cert"
tls_key_file = "/etc/pem/vault.key"
}
```
[consul]: https://www.consul.io/ "Consul by HashiCorp"
[consul-acl]: https://www.consul.io/docs/guides/acl.html "Consul ACLs"
[consul-encryption]: https://www.consul.io/docs/agent/encryption.html "Consul Encryption"
[consul-translate-wan-addrs]: https://www.consul.io/docs/agent/options.html#translate_wan_addrs "Consul Configuration"

View File

@ -0,0 +1,67 @@
---
layout: "docs"
page_title: "Service Registration - Configuration"
sidebar_title: "<code>service_registration</code>"
sidebar_current: "docs-configuration-serviceDiscovery"
description: |-
The optional `service_registration` stanza configures Vault's mechanism for
service registration.
---
# `service_registration` Stanza
The optional `service_registration` stanza configures Vault's mechanism for
service registration. The `service_registration` stanza is designed for use cases
where you would like to use a system like [Consul][consul] for [service
discovery][consul-discovery], but use a different system for the [storage
backend][storage-backend].
When Consul is configured as the [storage backend][consul-backend], Vault
implicitly uses Consul for service registration, so the `service_registration` stanza
is not needed.
For times when you would like to use a different storage backend, like
[Raft][raft-backend], but still have service registration available, the
`service_registration` stanza can be used:
```hcl
service_registration "consul" {
address = "127.0.0.1:8500"
}
storage "raft" {
path = "/path/to/raft/data"
node_id = "raft_node_1"
}
```
For information about a specific service registration provider, choose one from
the navigation on the left.
## Configuration
Service registration configuration is done through the Vault configuration file
using the `service_registration` stanza:
```hcl
service_registration [NAME] {
[PARAMETERS...]
}
```
For example:
```hcl
service_registration "consul" {
address = "127.0.0.1:8500"
}
```
For configuration options which also read an environment variable, the
environment variable will take precedence over values in the configuration
file.
[consul]: https://www.consul.io/
[consul-discovery]: https://www.consul.io/discovery.html
[storage-backend]: /docs/configuration/storage/index.html
[consul-backend]: /docs/configuration/storage/consul.html
[raft-backend]: /docs/configuration/storage/raft.html

View File

@ -78,6 +78,11 @@
'swift', 'swift',
'zookeeper' 'zookeeper'
] ]
}, {
category: 'service-registration',
content: [
'consul',
]
}, },
'telemetry', 'telemetry',
{ category: 'ui' }, { category: 'ui' },