Added connect proxy config and local agent state setup on boot.
This commit is contained in:
parent
280382c25f
commit
78e48fd547
|
@ -246,6 +246,8 @@ func LocalConfig(cfg *config.RuntimeConfig) local.Config {
|
||||||
NodeID: cfg.NodeID,
|
NodeID: cfg.NodeID,
|
||||||
NodeName: cfg.NodeName,
|
NodeName: cfg.NodeName,
|
||||||
TaggedAddresses: map[string]string{},
|
TaggedAddresses: map[string]string{},
|
||||||
|
ProxyBindMinPort: cfg.ConnectProxyBindMinPort,
|
||||||
|
ProxyBindMaxPort: cfg.ConnectProxyBindMaxPort,
|
||||||
}
|
}
|
||||||
for k, v := range cfg.TaggedAddresses {
|
for k, v := range cfg.TaggedAddresses {
|
||||||
lc.TaggedAddresses[k] = v
|
lc.TaggedAddresses[k] = v
|
||||||
|
@ -328,6 +330,9 @@ func (a *Agent) Start() error {
|
||||||
if err := a.loadServices(c); err != nil {
|
if err := a.loadServices(c); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if err := a.loadProxies(c); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
if err := a.loadChecks(c); err != nil {
|
if err := a.loadChecks(c); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1973,6 +1978,58 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddProxy adds a new local Connect Proxy instance to be managed by the agent.
|
||||||
|
//
|
||||||
|
// It REQUIRES that the service that is being proxied is already present in the
|
||||||
|
// local state. Note that this is only used for agent-managed proxies so we can
|
||||||
|
// ensure that we always make this true. For externally managed and registered
|
||||||
|
// proxies we explicitly allow the proxy to be registered first to make
|
||||||
|
// bootstrap ordering of a new service simpler but the same is not true here
|
||||||
|
// since this is only ever called when setting up a _managed_ proxy which was
|
||||||
|
// registered as part of a service registration either from config or HTTP API
|
||||||
|
// call.
|
||||||
|
func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool) error {
|
||||||
|
// Lookup the target service token in state if there is one.
|
||||||
|
token := a.State.ServiceToken(proxy.TargetServiceID)
|
||||||
|
|
||||||
|
// Add the proxy to local state first since we may need to assign a port which
|
||||||
|
// needs to be coordinate under state lock. AddProxy will generate the
|
||||||
|
// NodeService for the proxy populated with the allocated (or configured) port
|
||||||
|
// and an ID, but it doesn't add it to the agent directly since that could
|
||||||
|
// deadlock and we may need to coordinate adding it and persisting etc.
|
||||||
|
proxyService, err := a.State.AddProxy(proxy, token)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(banks): register proxy health checks.
|
||||||
|
err = a.AddService(proxyService, nil, persist, token)
|
||||||
|
if err != nil {
|
||||||
|
// Remove the state too
|
||||||
|
a.State.RemoveProxy(proxyService.ID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(banks): persist some of the local proxy state (not the _proxy_ token).
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveProxy stops and removes a local proxy instance.
|
||||||
|
func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
|
||||||
|
// Validate proxyID
|
||||||
|
if proxyID == "" {
|
||||||
|
return fmt.Errorf("proxyID missing")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := a.State.RemoveProxy(proxyID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(banks): unpersist proxy
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (a *Agent) cancelCheckMonitors(checkID types.CheckID) {
|
func (a *Agent) cancelCheckMonitors(checkID types.CheckID) {
|
||||||
// Stop any monitors
|
// Stop any monitors
|
||||||
delete(a.checkReapAfter, checkID)
|
delete(a.checkReapAfter, checkID)
|
||||||
|
@ -2366,6 +2423,25 @@ func (a *Agent) unloadChecks() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// loadProxies will load connect proxy definitions from configuration and
|
||||||
|
// persisted definitions on disk, and load them into the local agent.
|
||||||
|
func (a *Agent) loadProxies(conf *config.RuntimeConfig) error {
|
||||||
|
for _, proxy := range conf.ConnectProxies {
|
||||||
|
if err := a.AddProxy(proxy, false); err != nil {
|
||||||
|
return fmt.Errorf("failed adding proxy: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(banks): persist proxy state and re-load it here?
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// unloadProxies will deregister all proxies known to the local agent.
|
||||||
|
func (a *Agent) unloadProxies() error {
|
||||||
|
// TODO(banks): implement me
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// snapshotCheckState is used to snapshot the current state of the health
|
// snapshotCheckState is used to snapshot the current state of the health
|
||||||
// checks. This is done before we reload our checks, so that we can properly
|
// checks. This is done before we reload our checks, so that we can properly
|
||||||
// restore into the same state.
|
// restore into the same state.
|
||||||
|
@ -2514,6 +2590,9 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
|
||||||
if err := a.loadServices(newCfg); err != nil {
|
if err := a.loadServices(newCfg); err != nil {
|
||||||
return fmt.Errorf("Failed reloading services: %s", err)
|
return fmt.Errorf("Failed reloading services: %s", err)
|
||||||
}
|
}
|
||||||
|
if err := a.loadProxies(newCfg); err != nil {
|
||||||
|
return fmt.Errorf("Failed reloading proxies: %s", err)
|
||||||
|
}
|
||||||
if err := a.loadChecks(newCfg); err != nil {
|
if err := a.loadChecks(newCfg); err != nil {
|
||||||
return fmt.Errorf("Failed reloading checks: %s", err)
|
return fmt.Errorf("Failed reloading checks: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,8 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/checks"
|
"github.com/hashicorp/consul/agent/checks"
|
||||||
"github.com/hashicorp/consul/agent/consul"
|
"github.com/hashicorp/consul/agent/consul"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
@ -2235,3 +2237,103 @@ func TestAgent_reloadWatchesHTTPS(t *testing.T) {
|
||||||
t.Fatalf("bad: %s", err)
|
t.Fatalf("bad: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAgent_AddProxy(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
a := NewTestAgent(t.Name(), `
|
||||||
|
node_name = "node1"
|
||||||
|
`)
|
||||||
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
// Register a target service we can use
|
||||||
|
reg := &structs.NodeService{
|
||||||
|
Service: "web",
|
||||||
|
Port: 8080,
|
||||||
|
}
|
||||||
|
require.NoError(t, a.AddService(reg, nil, false, ""))
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
desc string
|
||||||
|
proxy *structs.ConnectManagedProxy
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "basic proxy adding, unregistered service",
|
||||||
|
proxy: &structs.ConnectManagedProxy{
|
||||||
|
ExecMode: structs.ProxyExecModeDaemon,
|
||||||
|
Command: "consul connect proxy",
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"foo": "bar",
|
||||||
|
},
|
||||||
|
TargetServiceID: "db", // non-existent service.
|
||||||
|
},
|
||||||
|
// Target service must be registered.
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "basic proxy adding, unregistered service",
|
||||||
|
proxy: &structs.ConnectManagedProxy{
|
||||||
|
ExecMode: structs.ProxyExecModeDaemon,
|
||||||
|
Command: "consul connect proxy",
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"foo": "bar",
|
||||||
|
},
|
||||||
|
TargetServiceID: "web",
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.desc, func(t *testing.T) {
|
||||||
|
require := require.New(t)
|
||||||
|
|
||||||
|
err := a.AddProxy(tt.proxy, false)
|
||||||
|
if tt.wantErr {
|
||||||
|
require.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
// Test the ID was created as we expect.
|
||||||
|
got := a.State.Proxy("web-proxy")
|
||||||
|
require.Equal(tt.proxy, got)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAgent_RemoveProxy(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
a := NewTestAgent(t.Name(), `
|
||||||
|
node_name = "node1"
|
||||||
|
`)
|
||||||
|
defer a.Shutdown()
|
||||||
|
require := require.New(t)
|
||||||
|
|
||||||
|
// Register a target service we can use
|
||||||
|
reg := &structs.NodeService{
|
||||||
|
Service: "web",
|
||||||
|
Port: 8080,
|
||||||
|
}
|
||||||
|
require.NoError(a.AddService(reg, nil, false, ""))
|
||||||
|
|
||||||
|
// Add a proxy for web
|
||||||
|
pReg := &structs.ConnectManagedProxy{
|
||||||
|
TargetServiceID: "web",
|
||||||
|
}
|
||||||
|
require.NoError(a.AddProxy(pReg, false))
|
||||||
|
|
||||||
|
// Test the ID was created as we expect.
|
||||||
|
gotProxy := a.State.Proxy("web-proxy")
|
||||||
|
require.Equal(pReg, gotProxy)
|
||||||
|
|
||||||
|
err := a.RemoveProxy("web-proxy", false)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
gotProxy = a.State.Proxy("web-proxy")
|
||||||
|
require.Nil(gotProxy)
|
||||||
|
|
||||||
|
// Removing invalid proxy should be an error
|
||||||
|
err = a.RemoveProxy("foobar", false)
|
||||||
|
require.Error(err)
|
||||||
|
}
|
||||||
|
|
|
@ -322,8 +322,15 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var services []*structs.ServiceDefinition
|
var services []*structs.ServiceDefinition
|
||||||
|
var proxies []*structs.ConnectManagedProxy
|
||||||
for _, service := range c.Services {
|
for _, service := range c.Services {
|
||||||
services = append(services, b.serviceVal(&service))
|
services = append(services, b.serviceVal(&service))
|
||||||
|
// Register any connect proxies requested
|
||||||
|
if proxy := b.connectManagedProxyVal(&service); proxy != nil {
|
||||||
|
proxies = append(proxies, proxy)
|
||||||
|
}
|
||||||
|
// TODO(banks): support connect-native registrations (v.Connect.Enabled ==
|
||||||
|
// true)
|
||||||
}
|
}
|
||||||
if c.Service != nil {
|
if c.Service != nil {
|
||||||
services = append(services, b.serviceVal(c.Service))
|
services = append(services, b.serviceVal(c.Service))
|
||||||
|
@ -520,6 +527,9 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
||||||
consulRaftHeartbeatTimeout := b.durationVal("consul.raft.heartbeat_timeout", c.Consul.Raft.HeartbeatTimeout) * time.Duration(performanceRaftMultiplier)
|
consulRaftHeartbeatTimeout := b.durationVal("consul.raft.heartbeat_timeout", c.Consul.Raft.HeartbeatTimeout) * time.Duration(performanceRaftMultiplier)
|
||||||
consulRaftLeaderLeaseTimeout := b.durationVal("consul.raft.leader_lease_timeout", c.Consul.Raft.LeaderLeaseTimeout) * time.Duration(performanceRaftMultiplier)
|
consulRaftLeaderLeaseTimeout := b.durationVal("consul.raft.leader_lease_timeout", c.Consul.Raft.LeaderLeaseTimeout) * time.Duration(performanceRaftMultiplier)
|
||||||
|
|
||||||
|
// Connect proxy defaults.
|
||||||
|
proxyBindMinPort, proxyBindMaxPort := b.connectProxyPortRange(c.Connect)
|
||||||
|
|
||||||
// ----------------------------------------------------------------
|
// ----------------------------------------------------------------
|
||||||
// build runtime config
|
// build runtime config
|
||||||
//
|
//
|
||||||
|
@ -638,6 +648,9 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
||||||
CheckUpdateInterval: b.durationVal("check_update_interval", c.CheckUpdateInterval),
|
CheckUpdateInterval: b.durationVal("check_update_interval", c.CheckUpdateInterval),
|
||||||
Checks: checks,
|
Checks: checks,
|
||||||
ClientAddrs: clientAddrs,
|
ClientAddrs: clientAddrs,
|
||||||
|
ConnectProxies: proxies,
|
||||||
|
ConnectProxyBindMinPort: proxyBindMinPort,
|
||||||
|
ConnectProxyBindMaxPort: proxyBindMaxPort,
|
||||||
DataDir: b.stringVal(c.DataDir),
|
DataDir: b.stringVal(c.DataDir),
|
||||||
Datacenter: strings.ToLower(b.stringVal(c.Datacenter)),
|
Datacenter: strings.ToLower(b.stringVal(c.Datacenter)),
|
||||||
DevMode: b.boolVal(b.Flags.DevMode),
|
DevMode: b.boolVal(b.Flags.DevMode),
|
||||||
|
@ -1010,6 +1023,75 @@ func (b *Builder) serviceVal(v *ServiceDefinition) *structs.ServiceDefinition {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *Builder) connectManagedProxyVal(v *ServiceDefinition) *structs.ConnectManagedProxy {
|
||||||
|
if v.Connect == nil || v.Connect.Proxy == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
p := v.Connect.Proxy
|
||||||
|
|
||||||
|
targetID := b.stringVal(v.ID)
|
||||||
|
if targetID == "" {
|
||||||
|
targetID = b.stringVal(v.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
execMode := structs.ProxyExecModeDaemon
|
||||||
|
if p.ExecMode != nil {
|
||||||
|
switch *p.ExecMode {
|
||||||
|
case "daemon":
|
||||||
|
execMode = structs.ProxyExecModeDaemon
|
||||||
|
case "script":
|
||||||
|
execMode = structs.ProxyExecModeScript
|
||||||
|
default:
|
||||||
|
b.err = multierror.Append(fmt.Errorf(
|
||||||
|
"service[%s]: invalid connect proxy exec_mode: %s", targetID,
|
||||||
|
*p.ExecMode))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &structs.ConnectManagedProxy{
|
||||||
|
ExecMode: execMode,
|
||||||
|
Command: b.stringVal(p.Command),
|
||||||
|
Config: p.Config,
|
||||||
|
// ProxyService will be setup when the agent registers the configured
|
||||||
|
// proxies and starts them etc. We could do it here but we may need to do
|
||||||
|
// things like probe the OS for a free port etc. And we have enough info to
|
||||||
|
// resolve all this later.
|
||||||
|
ProxyService: nil,
|
||||||
|
TargetServiceID: targetID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Builder) connectProxyPortRange(v *Connect) (int, int) {
|
||||||
|
// Choose this default range just because. There are zero "safe" ranges that
|
||||||
|
// don't have something somewhere that uses them which is why this is
|
||||||
|
// configurable. We rely on the host not having any of these ports for non
|
||||||
|
// agent managed proxies. I went with 20k because I know of at least one
|
||||||
|
// super-common server memcached that defaults to the 10k range.
|
||||||
|
start := 20000
|
||||||
|
end := 20256 // 256 proxies on a host is enough for anyone ;)
|
||||||
|
|
||||||
|
if v == nil || v.ProxyDefaults == nil {
|
||||||
|
return start, end
|
||||||
|
}
|
||||||
|
|
||||||
|
min, max := v.ProxyDefaults.BindMinPort, v.ProxyDefaults.BindMaxPort
|
||||||
|
if min == nil && max == nil {
|
||||||
|
return start, end
|
||||||
|
}
|
||||||
|
|
||||||
|
// If either was set show a warning if the overall range was invalid
|
||||||
|
if min == nil || max == nil || *max < *min {
|
||||||
|
b.warn("Connect proxy_defaults bind_min_port and bind_max_port must both "+
|
||||||
|
"be set with max >= min. To disable automatic port allocation set both "+
|
||||||
|
"to 0. Using default range %d..%d.", start, end)
|
||||||
|
return start, end
|
||||||
|
}
|
||||||
|
|
||||||
|
return *min, *max
|
||||||
|
}
|
||||||
|
|
||||||
func (b *Builder) boolVal(v *bool) bool {
|
func (b *Builder) boolVal(v *bool) bool {
|
||||||
if v == nil {
|
if v == nil {
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -159,6 +159,7 @@ type Config struct {
|
||||||
CheckUpdateInterval *string `json:"check_update_interval,omitempty" hcl:"check_update_interval" mapstructure:"check_update_interval"`
|
CheckUpdateInterval *string `json:"check_update_interval,omitempty" hcl:"check_update_interval" mapstructure:"check_update_interval"`
|
||||||
Checks []CheckDefinition `json:"checks,omitempty" hcl:"checks" mapstructure:"checks"`
|
Checks []CheckDefinition `json:"checks,omitempty" hcl:"checks" mapstructure:"checks"`
|
||||||
ClientAddr *string `json:"client_addr,omitempty" hcl:"client_addr" mapstructure:"client_addr"`
|
ClientAddr *string `json:"client_addr,omitempty" hcl:"client_addr" mapstructure:"client_addr"`
|
||||||
|
Connect *Connect `json:"connect,omitempty" hcl:"connect" mapstructure:"connect"`
|
||||||
DNS DNS `json:"dns_config,omitempty" hcl:"dns_config" mapstructure:"dns_config"`
|
DNS DNS `json:"dns_config,omitempty" hcl:"dns_config" mapstructure:"dns_config"`
|
||||||
DNSDomain *string `json:"domain,omitempty" hcl:"domain" mapstructure:"domain"`
|
DNSDomain *string `json:"domain,omitempty" hcl:"domain" mapstructure:"domain"`
|
||||||
DNSRecursors []string `json:"recursors,omitempty" hcl:"recursors" mapstructure:"recursors"`
|
DNSRecursors []string `json:"recursors,omitempty" hcl:"recursors" mapstructure:"recursors"`
|
||||||
|
@ -324,6 +325,7 @@ type ServiceDefinition struct {
|
||||||
Checks []CheckDefinition `json:"checks,omitempty" hcl:"checks" mapstructure:"checks"`
|
Checks []CheckDefinition `json:"checks,omitempty" hcl:"checks" mapstructure:"checks"`
|
||||||
Token *string `json:"token,omitempty" hcl:"token" mapstructure:"token"`
|
Token *string `json:"token,omitempty" hcl:"token" mapstructure:"token"`
|
||||||
EnableTagOverride *bool `json:"enable_tag_override,omitempty" hcl:"enable_tag_override" mapstructure:"enable_tag_override"`
|
EnableTagOverride *bool `json:"enable_tag_override,omitempty" hcl:"enable_tag_override" mapstructure:"enable_tag_override"`
|
||||||
|
Connect *ServiceConnect `json:"connect,omitempty" hcl:"connect" mapstructure:"connect"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type CheckDefinition struct {
|
type CheckDefinition struct {
|
||||||
|
@ -349,6 +351,47 @@ type CheckDefinition struct {
|
||||||
DeregisterCriticalServiceAfter *string `json:"deregister_critical_service_after,omitempty" hcl:"deregister_critical_service_after" mapstructure:"deregister_critical_service_after"`
|
DeregisterCriticalServiceAfter *string `json:"deregister_critical_service_after,omitempty" hcl:"deregister_critical_service_after" mapstructure:"deregister_critical_service_after"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ServiceConnect is the connect block within a service registration
|
||||||
|
type ServiceConnect struct {
|
||||||
|
// TODO(banks) add way to specify that the app is connect-native
|
||||||
|
// Proxy configures a connect proxy instance for the service
|
||||||
|
Proxy *ServiceConnectProxy `json:"proxy,omitempty" hcl:"proxy" mapstructure:"proxy"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ServiceConnectProxy struct {
|
||||||
|
Command *string `json:"command,omitempty" hcl:"command" mapstructure:"command"`
|
||||||
|
ExecMode *string `json:"exec_mode,omitempty" hcl:"exec_mode" mapstructure:"exec_mode"`
|
||||||
|
Config map[string]interface{} `json:"config,omitempty" hcl:"config" mapstructure:"config"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect is the agent-global connect configuration.
|
||||||
|
type Connect struct {
|
||||||
|
// Enabled opts the agent into connect. It should be set on all clients and
|
||||||
|
// servers in a cluster for correct connect operation. TODO(banks) review that.
|
||||||
|
Enabled bool `json:"enabled,omitempty" hcl:"enabled" mapstructure:"enabled"`
|
||||||
|
ProxyDefaults *ConnectProxyDefaults `json:"proxy_defaults,omitempty" hcl:"proxy_defaults" mapstructure:"proxy_defaults"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConnectProxyDefaults is the agent-global connect proxy configuration.
|
||||||
|
type ConnectProxyDefaults struct {
|
||||||
|
// BindMinPort, BindMaxPort are the inclusive lower and upper bounds on the
|
||||||
|
// port range allocated to the agent to assign to connect proxies that have no
|
||||||
|
// bind_port specified.
|
||||||
|
BindMinPort *int `json:"bind_min_port,omitempty" hcl:"bind_min_port" mapstructure:"bind_min_port"`
|
||||||
|
BindMaxPort *int `json:"bind_max_port,omitempty" hcl:"bind_max_port" mapstructure:"bind_max_port"`
|
||||||
|
// ExecMode is used where a registration doesn't include an exec_mode.
|
||||||
|
// Defaults to daemon.
|
||||||
|
ExecMode *string `json:"exec_mode,omitempty" hcl:"exec_mode" mapstructure:"exec_mode"`
|
||||||
|
// DaemonCommand is used to start proxy in exec_mode = daemon if not specified
|
||||||
|
// at registration time.
|
||||||
|
DaemonCommand *string `json:"daemon_command,omitempty" hcl:"daemon_command" mapstructure:"daemon_command"`
|
||||||
|
// ScriptCommand is used to start proxy in exec_mode = script if not specified
|
||||||
|
// at registration time.
|
||||||
|
ScriptCommand *string `json:"script_command,omitempty" hcl:"script_command" mapstructure:"script_command"`
|
||||||
|
// Config is merged into an Config specified at registration time.
|
||||||
|
Config map[string]interface{} `json:"config,omitempty" hcl:"config" mapstructure:"config"`
|
||||||
|
}
|
||||||
|
|
||||||
type DNS struct {
|
type DNS struct {
|
||||||
AllowStale *bool `json:"allow_stale,omitempty" hcl:"allow_stale" mapstructure:"allow_stale"`
|
AllowStale *bool `json:"allow_stale,omitempty" hcl:"allow_stale" mapstructure:"allow_stale"`
|
||||||
ARecordLimit *int `json:"a_record_limit,omitempty" hcl:"a_record_limit" mapstructure:"a_record_limit"`
|
ARecordLimit *int `json:"a_record_limit,omitempty" hcl:"a_record_limit" mapstructure:"a_record_limit"`
|
||||||
|
|
|
@ -616,6 +616,41 @@ type RuntimeConfig struct {
|
||||||
// flag: -client string
|
// flag: -client string
|
||||||
ClientAddrs []*net.IPAddr
|
ClientAddrs []*net.IPAddr
|
||||||
|
|
||||||
|
// ConnectEnabled opts the agent into connect. It should be set on all clients
|
||||||
|
// and servers in a cluster for correct connect operation. TODO(banks) review
|
||||||
|
// that.
|
||||||
|
ConnectEnabled bool
|
||||||
|
|
||||||
|
// ConnectProxies is a list of configured proxies taken from the "connect"
|
||||||
|
// block of service registrations.
|
||||||
|
ConnectProxies []*structs.ConnectManagedProxy
|
||||||
|
|
||||||
|
// ConnectProxyBindMinPort is the inclusive start of the range of ports
|
||||||
|
// allocated to the agent for starting proxy listeners on where no explicit
|
||||||
|
// port is specified.
|
||||||
|
ConnectProxyBindMinPort int
|
||||||
|
|
||||||
|
// ConnectProxyBindMaxPort is the inclusive end of the range of ports
|
||||||
|
// allocated to the agent for starting proxy listeners on where no explicit
|
||||||
|
// port is specified.
|
||||||
|
ConnectProxyBindMaxPort int
|
||||||
|
|
||||||
|
// ConnectProxyDefaultExecMode is used where a registration doesn't include an
|
||||||
|
// exec_mode. Defaults to daemon.
|
||||||
|
ConnectProxyDefaultExecMode *string
|
||||||
|
|
||||||
|
// ConnectProxyDefaultDaemonCommand is used to start proxy in exec_mode =
|
||||||
|
// daemon if not specified at registration time.
|
||||||
|
ConnectProxyDefaultDaemonCommand *string
|
||||||
|
|
||||||
|
// ConnectProxyDefaultScriptCommand is used to start proxy in exec_mode =
|
||||||
|
// script if not specified at registration time.
|
||||||
|
ConnectProxyDefaultScriptCommand *string
|
||||||
|
|
||||||
|
// ConnectProxyDefaultConfig is merged with any config specified at
|
||||||
|
// registration time to allow global control of defaults.
|
||||||
|
ConnectProxyDefaultConfig map[string]interface{}
|
||||||
|
|
||||||
// DNSAddrs contains the list of TCP and UDP addresses the DNS server will
|
// DNSAddrs contains the list of TCP and UDP addresses the DNS server will
|
||||||
// bind to. If the DNS endpoint is disabled (ports.dns <= 0) the list is
|
// bind to. If the DNS endpoint is disabled (ports.dns <= 0) the list is
|
||||||
// empty.
|
// empty.
|
||||||
|
|
|
@ -2353,6 +2353,21 @@ func TestFullConfig(t *testing.T) {
|
||||||
],
|
],
|
||||||
"check_update_interval": "16507s",
|
"check_update_interval": "16507s",
|
||||||
"client_addr": "93.83.18.19",
|
"client_addr": "93.83.18.19",
|
||||||
|
"connect": {
|
||||||
|
"enabled": true,
|
||||||
|
"proxy_defaults": {
|
||||||
|
"bind_min_port": 2000,
|
||||||
|
"bind_max_port": 3000,
|
||||||
|
"exec_mode": "script",
|
||||||
|
"daemon_command": "consul connect proxy",
|
||||||
|
"script_command": "proxyctl.sh",
|
||||||
|
"config": {
|
||||||
|
"foo": "bar",
|
||||||
|
"connect_timeout_ms": 1000,
|
||||||
|
"pedantic_mode": true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
"data_dir": "` + dataDir + `",
|
"data_dir": "` + dataDir + `",
|
||||||
"datacenter": "rzo029wg",
|
"datacenter": "rzo029wg",
|
||||||
"disable_anonymous_signature": true,
|
"disable_anonymous_signature": true,
|
||||||
|
@ -2613,7 +2628,16 @@ func TestFullConfig(t *testing.T) {
|
||||||
"ttl": "11222s",
|
"ttl": "11222s",
|
||||||
"deregister_critical_service_after": "68482s"
|
"deregister_critical_service_after": "68482s"
|
||||||
}
|
}
|
||||||
]
|
],
|
||||||
|
"connect": {
|
||||||
|
"proxy": {
|
||||||
|
"exec_mode": "daemon",
|
||||||
|
"command": "awesome-proxy",
|
||||||
|
"config": {
|
||||||
|
"foo": "qux"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"session_ttl_min": "26627s",
|
"session_ttl_min": "26627s",
|
||||||
|
@ -2786,6 +2810,21 @@ func TestFullConfig(t *testing.T) {
|
||||||
]
|
]
|
||||||
check_update_interval = "16507s"
|
check_update_interval = "16507s"
|
||||||
client_addr = "93.83.18.19"
|
client_addr = "93.83.18.19"
|
||||||
|
connect {
|
||||||
|
enabled = true
|
||||||
|
proxy_defaults {
|
||||||
|
bind_min_port = 2000
|
||||||
|
bind_max_port = 3000
|
||||||
|
exec_mode = "script"
|
||||||
|
daemon_command = "consul connect proxy"
|
||||||
|
script_command = "proxyctl.sh"
|
||||||
|
config = {
|
||||||
|
foo = "bar"
|
||||||
|
connect_timeout_ms = 1000
|
||||||
|
pedantic_mode = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
data_dir = "` + dataDir + `"
|
data_dir = "` + dataDir + `"
|
||||||
datacenter = "rzo029wg"
|
datacenter = "rzo029wg"
|
||||||
disable_anonymous_signature = true
|
disable_anonymous_signature = true
|
||||||
|
@ -3047,6 +3086,15 @@ func TestFullConfig(t *testing.T) {
|
||||||
deregister_critical_service_after = "68482s"
|
deregister_critical_service_after = "68482s"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
connect {
|
||||||
|
proxy {
|
||||||
|
exec_mode = "daemon"
|
||||||
|
command = "awesome-proxy"
|
||||||
|
config = {
|
||||||
|
foo = "qux"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
session_ttl_min = "26627s"
|
session_ttl_min = "26627s"
|
||||||
|
@ -3355,8 +3403,23 @@ func TestFullConfig(t *testing.T) {
|
||||||
DeregisterCriticalServiceAfter: 13209 * time.Second,
|
DeregisterCriticalServiceAfter: 13209 * time.Second,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
CheckUpdateInterval: 16507 * time.Second,
|
CheckUpdateInterval: 16507 * time.Second,
|
||||||
ClientAddrs: []*net.IPAddr{ipAddr("93.83.18.19")},
|
ClientAddrs: []*net.IPAddr{ipAddr("93.83.18.19")},
|
||||||
|
ConnectProxies: []*structs.ConnectManagedProxy{
|
||||||
|
{
|
||||||
|
ExecMode: structs.ProxyExecModeDaemon,
|
||||||
|
Command: "awesome-proxy",
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"foo": "qux", // Overriden by service
|
||||||
|
// Note globals are not merged here but on rendering to the proxy
|
||||||
|
// endpoint. That's because proxies can be added later too so merging
|
||||||
|
// at config time is redundant if we have to do it later anyway.
|
||||||
|
},
|
||||||
|
TargetServiceID: "MRHVMZuD",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ConnectProxyBindMinPort: 2000,
|
||||||
|
ConnectProxyBindMaxPort: 3000,
|
||||||
DNSAddrs: []net.Addr{tcpAddr("93.95.95.81:7001"), udpAddr("93.95.95.81:7001")},
|
DNSAddrs: []net.Addr{tcpAddr("93.95.95.81:7001"), udpAddr("93.95.95.81:7001")},
|
||||||
DNSARecordLimit: 29907,
|
DNSARecordLimit: 29907,
|
||||||
DNSAllowStale: true,
|
DNSAllowStale: true,
|
||||||
|
@ -4018,6 +4081,14 @@ func TestSanitize(t *testing.T) {
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"ClientAddrs": [],
|
"ClientAddrs": [],
|
||||||
|
"ConnectEnabled": false,
|
||||||
|
"ConnectProxies": [],
|
||||||
|
"ConnectProxyBindMaxPort": 0,
|
||||||
|
"ConnectProxyBindMinPort": 0,
|
||||||
|
"ConnectProxyDefaultConfig": {},
|
||||||
|
"ConnectProxyDefaultDaemonCommand": null,
|
||||||
|
"ConnectProxyDefaultExecMode": null,
|
||||||
|
"ConnectProxyDefaultScriptCommand": null,
|
||||||
"ConsulCoordinateUpdateBatchSize": 0,
|
"ConsulCoordinateUpdateBatchSize": 0,
|
||||||
"ConsulCoordinateUpdateMaxBatches": 0,
|
"ConsulCoordinateUpdateMaxBatches": 0,
|
||||||
"ConsulCoordinateUpdatePeriod": "15s",
|
"ConsulCoordinateUpdatePeriod": "15s",
|
||||||
|
@ -4150,9 +4221,11 @@ func TestSanitize(t *testing.T) {
|
||||||
"Checks": [],
|
"Checks": [],
|
||||||
"EnableTagOverride": false,
|
"EnableTagOverride": false,
|
||||||
"ID": "",
|
"ID": "",
|
||||||
|
"Kind": "",
|
||||||
"Meta": {},
|
"Meta": {},
|
||||||
"Name": "foo",
|
"Name": "foo",
|
||||||
"Port": 0,
|
"Port": 0,
|
||||||
|
"ProxyDestination": "",
|
||||||
"Tags": [],
|
"Tags": [],
|
||||||
"Token": "hidden"
|
"Token": "hidden"
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package local
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"math/rand"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -10,6 +11,8 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-uuid"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/agent/token"
|
"github.com/hashicorp/consul/agent/token"
|
||||||
|
@ -27,6 +30,8 @@ type Config struct {
|
||||||
NodeID types.NodeID
|
NodeID types.NodeID
|
||||||
NodeName string
|
NodeName string
|
||||||
TaggedAddresses map[string]string
|
TaggedAddresses map[string]string
|
||||||
|
ProxyBindMinPort int
|
||||||
|
ProxyBindMaxPort int
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServiceState describes the state of a service record.
|
// ServiceState describes the state of a service record.
|
||||||
|
@ -107,6 +112,21 @@ type rpc interface {
|
||||||
RPC(method string, args interface{}, reply interface{}) error
|
RPC(method string, args interface{}, reply interface{}) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ManagedProxy represents the local state for a registered proxy instance.
|
||||||
|
type ManagedProxy struct {
|
||||||
|
Proxy *structs.ConnectManagedProxy
|
||||||
|
|
||||||
|
// ProxyToken is a special local-only security token that grants the bearer
|
||||||
|
// access to the proxy's config as well as allowing it to request certificates
|
||||||
|
// on behalf of the TargetService. Certain connect endpoints will validate
|
||||||
|
// against this token and if it matches will then use the TargetService.Token
|
||||||
|
// to actually authenticate the upstream RPC on behalf of the service. This
|
||||||
|
// token is passed securely to the proxy process via ENV vars and should never
|
||||||
|
// be exposed any other way. Unmanaged proxies will never see this and need to
|
||||||
|
// use service-scoped ACL tokens distributed externally.
|
||||||
|
ProxyToken string
|
||||||
|
}
|
||||||
|
|
||||||
// State is used to represent the node's services,
|
// State is used to represent the node's services,
|
||||||
// and checks. We use it to perform anti-entropy with the
|
// and checks. We use it to perform anti-entropy with the
|
||||||
// catalog representation
|
// catalog representation
|
||||||
|
@ -150,17 +170,28 @@ type State struct {
|
||||||
|
|
||||||
// tokens contains the ACL tokens
|
// tokens contains the ACL tokens
|
||||||
tokens *token.Store
|
tokens *token.Store
|
||||||
|
|
||||||
|
// managedProxies is a map of all manged connect proxies registered locally on
|
||||||
|
// this agent. This is NOT kept in sync with servers since it's agent-local
|
||||||
|
// config only. Proxy instances have separate service registrations in the
|
||||||
|
// services map above which are kept in sync via anti-entropy. Un-managed
|
||||||
|
// proxies (that registered themselves separately from the service
|
||||||
|
// registration) do not appear here as the agent doesn't need to manage their
|
||||||
|
// process nor config. The _do_ still exist in services above though as
|
||||||
|
// services with Kind == connect-proxy.
|
||||||
|
managedProxies map[string]*ManagedProxy
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLocalState creates a new local state for the agent.
|
// NewState creates a new local state for the agent.
|
||||||
func NewState(c Config, lg *log.Logger, tokens *token.Store) *State {
|
func NewState(c Config, lg *log.Logger, tokens *token.Store) *State {
|
||||||
l := &State{
|
l := &State{
|
||||||
config: c,
|
config: c,
|
||||||
logger: lg,
|
logger: lg,
|
||||||
services: make(map[string]*ServiceState),
|
services: make(map[string]*ServiceState),
|
||||||
checks: make(map[types.CheckID]*CheckState),
|
checks: make(map[types.CheckID]*CheckState),
|
||||||
metadata: make(map[string]string),
|
metadata: make(map[string]string),
|
||||||
tokens: tokens,
|
tokens: tokens,
|
||||||
|
managedProxies: make(map[string]*ManagedProxy),
|
||||||
}
|
}
|
||||||
l.SetDiscardCheckOutput(c.DiscardCheckOutput)
|
l.SetDiscardCheckOutput(c.DiscardCheckOutput)
|
||||||
return l
|
return l
|
||||||
|
@ -529,6 +560,142 @@ func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState {
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddProxy is used to add a connect proxy entry to the local state. This
|
||||||
|
// assumes the proxy's NodeService is already registered via Agent.AddService
|
||||||
|
// (since that has to do other book keeping). The token passed here is the ACL
|
||||||
|
// token the service used to register itself so must have write on service
|
||||||
|
// record.
|
||||||
|
func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*structs.NodeService, error) {
|
||||||
|
if proxy == nil {
|
||||||
|
return nil, fmt.Errorf("no proxy")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lookup the local service
|
||||||
|
target := l.Service(proxy.TargetServiceID)
|
||||||
|
if target == nil {
|
||||||
|
return nil, fmt.Errorf("target service ID %s not registered",
|
||||||
|
proxy.TargetServiceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get bind info from config
|
||||||
|
cfg, err := proxy.ParseConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Construct almost all of the NodeService that needs to be registered by the
|
||||||
|
// caller outside of the lock.
|
||||||
|
svc := &structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindConnectProxy,
|
||||||
|
ID: target.ID + "-proxy",
|
||||||
|
Service: target.ID + "-proxy",
|
||||||
|
ProxyDestination: target.Service,
|
||||||
|
Address: cfg.BindAddress,
|
||||||
|
Port: cfg.BindPort,
|
||||||
|
}
|
||||||
|
|
||||||
|
pToken, err := uuid.GenerateUUID()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lock now. We can't lock earlier as l.Service would deadlock and shouldn't
|
||||||
|
// anyway to minimise the critical section.
|
||||||
|
l.Lock()
|
||||||
|
defer l.Unlock()
|
||||||
|
|
||||||
|
// Allocate port if needed (min and max inclusive)
|
||||||
|
rangeLen := l.config.ProxyBindMaxPort - l.config.ProxyBindMinPort + 1
|
||||||
|
if svc.Port < 1 && l.config.ProxyBindMinPort > 0 && rangeLen > 0 {
|
||||||
|
// This should be a really short list so don't bother optimising lookup yet.
|
||||||
|
OUTER:
|
||||||
|
for _, offset := range rand.Perm(rangeLen) {
|
||||||
|
p := l.config.ProxyBindMinPort + offset
|
||||||
|
// See if this port was already allocated to another proxy
|
||||||
|
for _, other := range l.managedProxies {
|
||||||
|
if other.Proxy.ProxyService.Port == p {
|
||||||
|
// allready taken, skip to next random pick in the range
|
||||||
|
continue OUTER
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// We made it through all existing proxies without a match so claim this one
|
||||||
|
svc.Port = p
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// If no ports left (or auto ports disabled) fail
|
||||||
|
if svc.Port < 1 {
|
||||||
|
return nil, fmt.Errorf("no port provided for proxy bind_port and none "+
|
||||||
|
" left in the allocated range [%d, %d]", l.config.ProxyBindMinPort,
|
||||||
|
l.config.ProxyBindMaxPort)
|
||||||
|
}
|
||||||
|
|
||||||
|
proxy.ProxyService = svc
|
||||||
|
|
||||||
|
// All set, add the proxy and return the service
|
||||||
|
l.managedProxies[svc.ID] = &ManagedProxy{
|
||||||
|
Proxy: proxy,
|
||||||
|
ProxyToken: pToken,
|
||||||
|
}
|
||||||
|
|
||||||
|
// No need to trigger sync as proxy state is local only.
|
||||||
|
return svc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveProxy is used to remove a proxy entry from the local state.
|
||||||
|
func (l *State) RemoveProxy(id string) error {
|
||||||
|
l.Lock()
|
||||||
|
defer l.Unlock()
|
||||||
|
|
||||||
|
p := l.managedProxies[id]
|
||||||
|
if p == nil {
|
||||||
|
return fmt.Errorf("Proxy %s does not exist", id)
|
||||||
|
}
|
||||||
|
delete(l.managedProxies, id)
|
||||||
|
|
||||||
|
// No need to trigger sync as proxy state is local only.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Proxy returns the local proxy state.
|
||||||
|
func (l *State) Proxy(id string) *structs.ConnectManagedProxy {
|
||||||
|
l.RLock()
|
||||||
|
defer l.RUnlock()
|
||||||
|
|
||||||
|
p := l.managedProxies[id]
|
||||||
|
if p == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return p.Proxy
|
||||||
|
}
|
||||||
|
|
||||||
|
// Proxies returns the locally registered proxies.
|
||||||
|
func (l *State) Proxies() map[string]*structs.ConnectManagedProxy {
|
||||||
|
l.RLock()
|
||||||
|
defer l.RUnlock()
|
||||||
|
|
||||||
|
m := make(map[string]*structs.ConnectManagedProxy)
|
||||||
|
for id, p := range l.managedProxies {
|
||||||
|
m[id] = p.Proxy
|
||||||
|
}
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProxyToken returns the local proxy token for a given proxy. Note this is not
|
||||||
|
// an ACL token so it won't fallback to using the agent-configured default ACL
|
||||||
|
// token. If the proxy doesn't exist an error is returned, otherwise the token
|
||||||
|
// is guaranteed to exist.
|
||||||
|
func (l *State) ProxyToken(id string) (string, error) {
|
||||||
|
l.RLock()
|
||||||
|
defer l.RUnlock()
|
||||||
|
|
||||||
|
p := l.managedProxies[id]
|
||||||
|
if p == nil {
|
||||||
|
return "", fmt.Errorf("proxy %s not registered", id)
|
||||||
|
}
|
||||||
|
return p.ProxyToken, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Metadata returns the local node metadata fields that the
|
// Metadata returns the local node metadata fields that the
|
||||||
// agent is aware of and are being kept in sync with the server
|
// agent is aware of and are being kept in sync with the server
|
||||||
func (l *State) Metadata() map[string]string {
|
func (l *State) Metadata() map[string]string {
|
||||||
|
|
|
@ -3,10 +3,14 @@ package local_test
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent"
|
"github.com/hashicorp/consul/agent"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
"github.com/hashicorp/consul/agent/local"
|
"github.com/hashicorp/consul/agent/local"
|
||||||
|
@ -1664,3 +1668,128 @@ func checksInSync(state *local.State, wantChecks int) error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStateProxyManagement(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
state := local.NewState(local.Config{
|
||||||
|
ProxyPortRangeStart: 20000,
|
||||||
|
ProxyPortRangeEnd: 20002,
|
||||||
|
}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
|
||||||
|
|
||||||
|
// Stub state syncing
|
||||||
|
state.TriggerSyncChanges = func() {}
|
||||||
|
|
||||||
|
p1 := structs.ConnectManagedProxy{
|
||||||
|
ExecMode: structs.ProxyExecModeDaemon,
|
||||||
|
Command: "consul connect proxy",
|
||||||
|
TargetServiceID: "web",
|
||||||
|
}
|
||||||
|
|
||||||
|
require := require.New(t)
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
_, err := state.AddProxy(&p1, "fake-token")
|
||||||
|
require.Error(err, "should fail as the target service isn't registered")
|
||||||
|
|
||||||
|
// Sanity check done, lets add a couple of target services to the state
|
||||||
|
err = state.AddService(&structs.NodeService{
|
||||||
|
Service: "web",
|
||||||
|
}, "fake-token-web")
|
||||||
|
require.NoError(err)
|
||||||
|
err = state.AddService(&structs.NodeService{
|
||||||
|
Service: "cache",
|
||||||
|
}, "fake-token-cache")
|
||||||
|
require.NoError(err)
|
||||||
|
require.NoError(err)
|
||||||
|
err = state.AddService(&structs.NodeService{
|
||||||
|
Service: "db",
|
||||||
|
}, "fake-token-db")
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
// Should work now
|
||||||
|
svc, err := state.AddProxy(&p1, "fake-token")
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
assert.Equal("web-proxy", svc.ID)
|
||||||
|
assert.Equal("web-proxy", svc.Service)
|
||||||
|
assert.Equal(structs.ServiceKindConnectProxy, svc.Kind)
|
||||||
|
assert.Equal("web", svc.ProxyDestination)
|
||||||
|
assert.Equal("", svc.Address, "should have empty address by default")
|
||||||
|
// Port is non-deterministic but could be either of 20000 or 20001
|
||||||
|
assert.Contains([]int{20000, 20001}, svc.Port)
|
||||||
|
|
||||||
|
// Second proxy should claim other port
|
||||||
|
p2 := p1
|
||||||
|
p2.TargetServiceID = "cache"
|
||||||
|
svc2, err := state.AddProxy(&p2, "fake-token")
|
||||||
|
require.NoError(err)
|
||||||
|
assert.Contains([]int{20000, 20001}, svc2.Port)
|
||||||
|
assert.NotEqual(svc.Port, svc2.Port)
|
||||||
|
|
||||||
|
// Just saving this for later...
|
||||||
|
p2Token, err := state.ProxyToken(svc2.ID)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
// Third proxy should fail as all ports are used
|
||||||
|
p3 := p1
|
||||||
|
p3.TargetServiceID = "db"
|
||||||
|
_, err = state.AddProxy(&p3, "fake-token")
|
||||||
|
require.Error(err)
|
||||||
|
|
||||||
|
// But if we set a port explicitly it should be OK
|
||||||
|
p3.Config = map[string]interface{}{
|
||||||
|
"bind_port": 1234,
|
||||||
|
"bind_address": "0.0.0.0",
|
||||||
|
}
|
||||||
|
svc3, err := state.AddProxy(&p3, "fake-token")
|
||||||
|
require.NoError(err)
|
||||||
|
require.Equal("0.0.0.0", svc3.Address)
|
||||||
|
require.Equal(1234, svc3.Port)
|
||||||
|
|
||||||
|
// Remove one of the auto-assigned proxies
|
||||||
|
err = state.RemoveProxy(svc2.ID)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
// Should be able to create a new proxy for that service with the port (it
|
||||||
|
// should have been "freed").
|
||||||
|
p4 := p2
|
||||||
|
svc4, err := state.AddProxy(&p4, "fake-token")
|
||||||
|
require.NoError(err)
|
||||||
|
assert.Contains([]int{20000, 20001}, svc2.Port)
|
||||||
|
assert.Equal(svc4.Port, svc2.Port, "should get the same port back that we freed")
|
||||||
|
|
||||||
|
// Remove a proxy that doesn't exist should error
|
||||||
|
err = state.RemoveProxy("nope")
|
||||||
|
require.Error(err)
|
||||||
|
|
||||||
|
assert.Equal(&p4, state.Proxy(p4.ProxyService.ID),
|
||||||
|
"should fetch the right proxy details")
|
||||||
|
assert.Nil(state.Proxy("nope"))
|
||||||
|
|
||||||
|
proxies := state.Proxies()
|
||||||
|
assert.Len(proxies, 3)
|
||||||
|
assert.Equal(&p1, proxies[svc.ID])
|
||||||
|
assert.Equal(&p4, proxies[svc4.ID])
|
||||||
|
assert.Equal(&p3, proxies[svc3.ID])
|
||||||
|
|
||||||
|
tokens := make([]string, 4)
|
||||||
|
tokens[0], err = state.ProxyToken(svc.ID)
|
||||||
|
require.NoError(err)
|
||||||
|
// p2 not registered anymore but lets make sure p4 got a new token when it
|
||||||
|
// re-registered with same ID.
|
||||||
|
tokens[1] = p2Token
|
||||||
|
tokens[2], err = state.ProxyToken(svc3.ID)
|
||||||
|
require.NoError(err)
|
||||||
|
tokens[3], err = state.ProxyToken(svc4.ID)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
// Quick check all are distinct
|
||||||
|
for i := 0; i < len(tokens)-1; i++ {
|
||||||
|
assert.Len(tokens[i], 36) // Sanity check for UUIDish thing.
|
||||||
|
for j := i + 1; j < len(tokens); j++ {
|
||||||
|
assert.NotEqual(tokens[i], tokens[j], "tokens for proxy %d and %d match",
|
||||||
|
i+1, j+1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,5 +1,9 @@
|
||||||
package structs
|
package structs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/mitchellh/mapstructure"
|
||||||
|
)
|
||||||
|
|
||||||
// ConnectAuthorizeRequest is the structure of a request to authorize
|
// ConnectAuthorizeRequest is the structure of a request to authorize
|
||||||
// a connection.
|
// a connection.
|
||||||
type ConnectAuthorizeRequest struct {
|
type ConnectAuthorizeRequest struct {
|
||||||
|
@ -15,3 +19,75 @@ type ConnectAuthorizeRequest struct {
|
||||||
ClientCertURI string
|
ClientCertURI string
|
||||||
ClientCertSerial string
|
ClientCertSerial string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ProxyExecMode encodes the mode for running a managed connect proxy.
|
||||||
|
type ProxyExecMode int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// ProxyExecModeDaemon executes a proxy process as a supervised daemon.
|
||||||
|
ProxyExecModeDaemon ProxyExecMode = iota
|
||||||
|
|
||||||
|
// ProxyExecModeScript executes a proxy config script on each change to it's
|
||||||
|
// config.
|
||||||
|
ProxyExecModeScript
|
||||||
|
)
|
||||||
|
|
||||||
|
// ConnectManagedProxy represents the agent-local state for a configured proxy
|
||||||
|
// instance. This is never stored or sent to the servers and is only used to
|
||||||
|
// store the config for the proxy that the agent needs to track. For now it's
|
||||||
|
// really generic with only the fields the agent needs to act on defined while
|
||||||
|
// the rest of the proxy config is passed as opaque bag of attributes to support
|
||||||
|
// arbitrary config params for third-party proxy integrations. "External"
|
||||||
|
// proxies by definition register themselves and manage their own config
|
||||||
|
// externally so are never represented in agent state.
|
||||||
|
type ConnectManagedProxy struct {
|
||||||
|
// ExecMode is one of daemon or script.
|
||||||
|
ExecMode ProxyExecMode
|
||||||
|
|
||||||
|
// Command is the command to execute. Empty defaults to self-invoking the same
|
||||||
|
// consul binary with proxy subcomand for ProxyExecModeDaemon and is an error
|
||||||
|
// for ProxyExecModeScript.
|
||||||
|
Command string
|
||||||
|
|
||||||
|
// Config is the arbitrary configuration data provided with the registration.
|
||||||
|
Config map[string]interface{}
|
||||||
|
|
||||||
|
// ProxyService is a pointer to the local proxy's service record for
|
||||||
|
// convenience. The proxies ID and name etc. can be read from there. It may be
|
||||||
|
// nil if the agent is starting up and hasn't registered the service yet.
|
||||||
|
ProxyService *NodeService
|
||||||
|
|
||||||
|
// TargetServiceID is the ID of the target service on the localhost. It may
|
||||||
|
// not exist yet since bootstrapping is allowed to happen in either order.
|
||||||
|
TargetServiceID string
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConnectManagedProxyConfig represents the parts of the proxy config the agent
|
||||||
|
// needs to understand. It's bad UX to make the user specify these separately
|
||||||
|
// just to make parsing simpler for us so this encapsulates the fields in
|
||||||
|
// ConnectManagedProxy.Config that we care about. They are all optoinal anyway
|
||||||
|
// and this is used to decode them with mapstructure.
|
||||||
|
type ConnectManagedProxyConfig struct {
|
||||||
|
BindAddress string `mapstructure:"bind_address"`
|
||||||
|
BindPort int `mapstructure:"bind_port"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseConfig attempts to read the fields we care about from the otherwise
|
||||||
|
// opaque config map. They are all optional but it may fail if one is specified
|
||||||
|
// but an invalid value.
|
||||||
|
func (p *ConnectManagedProxy) ParseConfig() (*ConnectManagedProxyConfig, error) {
|
||||||
|
var cfg ConnectManagedProxyConfig
|
||||||
|
d, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
|
||||||
|
ErrorUnused: false,
|
||||||
|
WeaklyTypedInput: true, // allow string port etc.
|
||||||
|
Result: &cfg,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
err = d.Decode(p.Config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &cfg, nil
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,115 @@
|
||||||
|
package structs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestConnectManagedProxy_ParseConfig(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
config map[string]interface{}
|
||||||
|
want *ConnectManagedProxyConfig
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "empty",
|
||||||
|
config: nil,
|
||||||
|
want: &ConnectManagedProxyConfig{},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "specified",
|
||||||
|
config: map[string]interface{}{
|
||||||
|
"bind_address": "127.0.0.1",
|
||||||
|
"bind_port": 1234,
|
||||||
|
},
|
||||||
|
want: &ConnectManagedProxyConfig{
|
||||||
|
BindAddress: "127.0.0.1",
|
||||||
|
BindPort: 1234,
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "stringy port",
|
||||||
|
config: map[string]interface{}{
|
||||||
|
"bind_address": "127.0.0.1",
|
||||||
|
"bind_port": "1234",
|
||||||
|
},
|
||||||
|
want: &ConnectManagedProxyConfig{
|
||||||
|
BindAddress: "127.0.0.1",
|
||||||
|
BindPort: 1234,
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty addr",
|
||||||
|
config: map[string]interface{}{
|
||||||
|
"bind_address": "",
|
||||||
|
"bind_port": "1234",
|
||||||
|
},
|
||||||
|
want: &ConnectManagedProxyConfig{
|
||||||
|
BindAddress: "",
|
||||||
|
BindPort: 1234,
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty port",
|
||||||
|
config: map[string]interface{}{
|
||||||
|
"bind_address": "127.0.0.1",
|
||||||
|
"bind_port": "",
|
||||||
|
},
|
||||||
|
want: nil,
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "junk address",
|
||||||
|
config: map[string]interface{}{
|
||||||
|
"bind_address": 42,
|
||||||
|
"bind_port": "",
|
||||||
|
},
|
||||||
|
want: nil,
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "zero port, missing addr",
|
||||||
|
config: map[string]interface{}{
|
||||||
|
"bind_port": 0,
|
||||||
|
},
|
||||||
|
want: &ConnectManagedProxyConfig{
|
||||||
|
BindPort: 0,
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "extra fields present",
|
||||||
|
config: map[string]interface{}{
|
||||||
|
"bind_port": 1234,
|
||||||
|
"flamingos": true,
|
||||||
|
"upstream": []map[string]interface{}{
|
||||||
|
{"foo": "bar"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
want: &ConnectManagedProxyConfig{
|
||||||
|
BindPort: 1234,
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
p := &ConnectManagedProxy{
|
||||||
|
Config: tt.config,
|
||||||
|
}
|
||||||
|
got, err := p.ParseConfig()
|
||||||
|
if (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("ConnectManagedProxy.ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(got, tt.want) {
|
||||||
|
t.Errorf("ConnectManagedProxy.ParseConfig() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue