TLS watching integrated into Service with some basic tests.

There are also a lot of small bug fixes found when testing lots of things end-to-end for the first time and some cleanup now it's integrated with real CA code.
This commit is contained in:
Paul Banks 2018-04-26 14:01:20 +01:00 committed by Mitchell Hashimoto
parent dcd277de8a
commit 02ab461dae
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
27 changed files with 868 additions and 270 deletions

View File

@ -28,7 +28,6 @@ import (
"github.com/hashicorp/serf/serf"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
// NOTE(mitcehllh): This is temporary while certs are stubbed out.
)
type Self struct {
@ -1000,14 +999,71 @@ func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http
}
contentHash := fmt.Sprintf("%x", hash)
// Merge globals defaults
config := make(map[string]interface{})
for k, v := range s.agent.config.ConnectProxyDefaultConfig {
if _, ok := config[k]; !ok {
config[k] = v
}
}
execMode := "daemon"
// If there is a global default mode use that instead
if s.agent.config.ConnectProxyDefaultExecMode != "" {
execMode = s.agent.config.ConnectProxyDefaultExecMode
}
// If it's actually set though, use the one set
if proxy.Proxy.ExecMode != structs.ProxyExecModeUnspecified {
execMode = proxy.Proxy.ExecMode.String()
}
// TODO(banks): default the binary to current binary. Probably needs to be
// done deeper though as it will be needed for actually managing proxy
// lifecycle.
command := proxy.Proxy.Command
if command == "" {
if execMode == "daemon" {
command = s.agent.config.ConnectProxyDefaultDaemonCommand
}
if execMode == "script" {
command = s.agent.config.ConnectProxyDefaultScriptCommand
}
}
// No global defaults set either...
if command == "" {
command = "consul connect proxy"
}
// Set defaults for anything that is still not specified but required.
// Note that these are not included in the content hash. Since we expect
// them to be static in general but some like the default target service
// port might not be. In that edge case services can set that explicitly
// when they re-register which will be caught though.
for k, v := range proxy.Proxy.Config {
config[k] = v
}
if _, ok := config["bind_port"]; !ok {
config["bind_port"] = proxy.Proxy.ProxyService.Port
}
if _, ok := config["bind_address"]; !ok {
// Default to binding to the same address the agent is configured to
// bind to.
config["bind_address"] = s.agent.config.BindAddr.String()
}
if _, ok := config["local_service_address"]; !ok {
// Default to localhost and the port the service registered with
config["local_service_address"] = fmt.Sprintf("127.0.0.1:%d",
target.Port)
}
reply := &api.ConnectProxyConfig{
ProxyServiceID: proxy.Proxy.ProxyService.ID,
TargetServiceID: target.ID,
TargetServiceName: target.Service,
ContentHash: contentHash,
ExecMode: api.ProxyExecMode(proxy.Proxy.ExecMode.String()),
Command: proxy.Proxy.Command,
Config: proxy.Proxy.Config,
ExecMode: api.ProxyExecMode(execMode),
Command: command,
Config: config,
}
return contentHash, reply, nil
})
@ -1040,10 +1096,13 @@ func (s *HTTPServer) agentLocalBlockingQuery(resp http.ResponseWriter, hash stri
// Apply a small amount of jitter to the request.
wait += lib.RandomStagger(wait / 16)
timeout = time.NewTimer(wait)
ws = memdb.NewWatchSet()
}
for {
// Must reset this every loop in case the Watch set is already closed but
// hash remains same. In that case we'll need to re-block on ws.Watch()
// again.
ws = memdb.NewWatchSet()
curHash, curResp, err := fn(ws)
if err != nil {
return curResp, err

View File

@ -2316,7 +2316,7 @@ func requireLeafValidUnderCA(t *testing.T, issued *structs.IssuedCert,
require.NoError(t, err)
}
func TestAgentConnectProxy(t *testing.T) {
func TestAgentConnectProxyConfig_Blocking(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
@ -2354,7 +2354,7 @@ func TestAgentConnectProxy(t *testing.T) {
TargetServiceName: "test",
ContentHash: "84346af2031659c9",
ExecMode: "daemon",
Command: "",
Command: "consul connect proxy",
Config: map[string]interface{}{
"upstreams": []interface{}{
map[string]interface{}{
@ -2362,15 +2362,17 @@ func TestAgentConnectProxy(t *testing.T) {
"local_port": float64(3131),
},
},
"bind_port": float64(1234),
"connect_timeout_ms": float64(500),
"bind_address": "127.0.0.1",
"local_service_address": "127.0.0.1:8000",
"bind_port": float64(1234),
"connect_timeout_ms": float64(500),
},
}
ur, err := copystructure.Copy(expectedResponse)
require.NoError(t, err)
updatedResponse := ur.(*api.ConnectProxyConfig)
updatedResponse.ContentHash = "7d53473b0e9db5a"
updatedResponse.ContentHash = "e1e3395f0d00cd41"
upstreams := updatedResponse.Config["upstreams"].([]interface{})
upstreams = append(upstreams,
map[string]interface{}{
@ -2431,6 +2433,41 @@ func TestAgentConnectProxy(t *testing.T) {
wantErr: false,
wantResp: updatedResponse,
},
{
// This test exercises a case that caused a busy loop to eat CPU for the
// entire duration of the blocking query. If a service gets re-registered
// wth same proxy config then the old proxy config chan is closed causing
// blocked watchset.Watch to return false indicating a change. But since
// the hash is the same when the blocking fn is re-called we should just
// keep blocking on the next iteration. The bug hit was that the WatchSet
// ws was not being reset in the loop and so when you try to `Watch` it
// the second time it just returns immediately making the blocking loop
// into a busy-poll!
//
// This test though doesn't catch that because busy poll still has the
// correct external behaviour. I don't want to instrument the loop to
// assert it's not executing too fast here as I can't think of a clean way
// and the issue is fixed now so this test doesn't actually catch the
// error, but does provide an easy way to verify the behaviour by hand:
// 1. Make this test fail e.g. change wantErr to true
// 2. Add a log.Println or similar into the blocking loop/function
// 3. See whether it's called just once or many times in a tight loop.
name: "blocking fetch interrupted with no change (same hash)",
url: "/v1/agent/connect/proxy/test-proxy?wait=200ms&hash=" + expectedResponse.ContentHash,
updateFunc: func() {
time.Sleep(100 * time.Millisecond)
// Re-register with _same_ proxy config
req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(reg))
resp := httptest.NewRecorder()
_, err = a.srv.AgentRegisterService(resp, req)
require.NoError(t, err)
require.Equal(t, 200, resp.Code, "body: %s", resp.Body.String())
},
wantWait: 200 * time.Millisecond,
wantCode: 200,
wantErr: false,
wantResp: expectedResponse,
},
}
for _, tt := range tests {
@ -2479,6 +2516,201 @@ func TestAgentConnectProxy(t *testing.T) {
}
}
func TestAgentConnectProxyConfig_ConfigHandling(t *testing.T) {
t.Parallel()
// Define a local service with a managed proxy. It's registered in the test
// loop to make sure agent state is predictable whatever order tests execute
// since some alter this service config.
reg := &structs.ServiceDefinition{
ID: "test-id",
Name: "test",
Address: "127.0.0.1",
Port: 8000,
Check: structs.CheckType{
TTL: 15 * time.Second,
},
Connect: &structs.ServiceDefinitionConnect{},
}
tests := []struct {
name string
globalConfig string
proxy structs.ServiceDefinitionConnectProxy
wantMode api.ProxyExecMode
wantCommand string
wantConfig map[string]interface{}
}{
{
name: "defaults",
globalConfig: `
bind_addr = "0.0.0.0"
connect {
enabled = true
proxy_defaults = {
bind_min_port = 10000
bind_max_port = 10000
}
}
`,
proxy: structs.ServiceDefinitionConnectProxy{},
wantMode: api.ProxyExecModeDaemon,
wantCommand: "consul connect proxy",
wantConfig: map[string]interface{}{
"bind_address": "0.0.0.0",
"bind_port": 10000, // "randomly" chosen from our range of 1
"local_service_address": "127.0.0.1:8000", // port from service reg
},
},
{
name: "global defaults - script",
globalConfig: `
bind_addr = "0.0.0.0"
connect {
enabled = true
proxy_defaults = {
bind_min_port = 10000
bind_max_port = 10000
exec_mode = "script"
script_command = "script.sh"
}
}
`,
proxy: structs.ServiceDefinitionConnectProxy{},
wantMode: api.ProxyExecModeScript,
wantCommand: "script.sh",
wantConfig: map[string]interface{}{
"bind_address": "0.0.0.0",
"bind_port": 10000, // "randomly" chosen from our range of 1
"local_service_address": "127.0.0.1:8000", // port from service reg
},
},
{
name: "global defaults - daemon",
globalConfig: `
bind_addr = "0.0.0.0"
connect {
enabled = true
proxy_defaults = {
bind_min_port = 10000
bind_max_port = 10000
exec_mode = "daemon"
daemon_command = "daemon.sh"
}
}
`,
proxy: structs.ServiceDefinitionConnectProxy{},
wantMode: api.ProxyExecModeDaemon,
wantCommand: "daemon.sh",
wantConfig: map[string]interface{}{
"bind_address": "0.0.0.0",
"bind_port": 10000, // "randomly" chosen from our range of 1
"local_service_address": "127.0.0.1:8000", // port from service reg
},
},
{
name: "global default config merge",
globalConfig: `
bind_addr = "0.0.0.0"
connect {
enabled = true
proxy_defaults = {
bind_min_port = 10000
bind_max_port = 10000
config = {
connect_timeout_ms = 1000
}
}
}
`,
proxy: structs.ServiceDefinitionConnectProxy{
Config: map[string]interface{}{
"foo": "bar",
},
},
wantMode: api.ProxyExecModeDaemon,
wantCommand: "consul connect proxy",
wantConfig: map[string]interface{}{
"bind_address": "0.0.0.0",
"bind_port": 10000, // "randomly" chosen from our range of 1
"local_service_address": "127.0.0.1:8000", // port from service reg
"connect_timeout_ms": 1000,
"foo": "bar",
},
},
{
name: "overrides in reg",
globalConfig: `
bind_addr = "0.0.0.0"
connect {
enabled = true
proxy_defaults = {
bind_min_port = 10000
bind_max_port = 10000
exec_mode = "daemon"
daemon_command = "daemon.sh"
script_command = "script.sh"
config = {
connect_timeout_ms = 1000
}
}
}
`,
proxy: structs.ServiceDefinitionConnectProxy{
ExecMode: "script",
Command: "foo.sh",
Config: map[string]interface{}{
"connect_timeout_ms": 2000,
"bind_address": "127.0.0.1",
"bind_port": 1024,
"local_service_address": "127.0.0.1:9191",
},
},
wantMode: api.ProxyExecModeScript,
wantCommand: "foo.sh",
wantConfig: map[string]interface{}{
"bind_address": "127.0.0.1",
"bind_port": float64(1024),
"local_service_address": "127.0.0.1:9191",
"connect_timeout_ms": float64(2000),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
a := NewTestAgent(t.Name(), tt.globalConfig)
defer a.Shutdown()
// Register the basic service with the required config
{
reg.Connect.Proxy = &tt.proxy
req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(reg))
resp := httptest.NewRecorder()
_, err := a.srv.AgentRegisterService(resp, req)
require.NoError(err)
require.Equal(200, resp.Code, "body: %s", resp.Body.String())
}
req, _ := http.NewRequest("GET", "/v1/agent/connect/proxy/test-id-proxy", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.AgentConnectProxyConfig(resp, req)
require.NoError(err)
proxyCfg := obj.(*api.ConnectProxyConfig)
assert.Equal("test-id-proxy", proxyCfg.ProxyServiceID)
assert.Equal("test-id", proxyCfg.TargetServiceID)
assert.Equal("test", proxyCfg.TargetServiceName)
assert.Equal(tt.wantMode, proxyCfg.ExecMode)
assert.Equal(tt.wantCommand, proxyCfg.Command)
require.Equal(tt.wantConfig, proxyCfg.Config)
})
}
}
func TestAgentConnectAuthorize_badBody(t *testing.T) {
t.Parallel()

View File

@ -531,6 +531,17 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
connectCAConfig = c.Connect.CAConfig
}
proxyDefaultExecMode := ""
proxyDefaultDaemonCommand := ""
proxyDefaultScriptCommand := ""
proxyDefaultConfig := make(map[string]interface{})
if c.Connect != nil && c.Connect.ProxyDefaults != nil {
proxyDefaultExecMode = b.stringVal(c.Connect.ProxyDefaults.ExecMode)
proxyDefaultDaemonCommand = b.stringVal(c.Connect.ProxyDefaults.DaemonCommand)
proxyDefaultScriptCommand = b.stringVal(c.Connect.ProxyDefaults.ScriptCommand)
proxyDefaultConfig = c.Connect.ProxyDefaults.Config
}
// ----------------------------------------------------------------
// build runtime config
//
@ -638,100 +649,104 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
TelemetryStatsiteAddr: b.stringVal(c.Telemetry.StatsiteAddr),
// Agent
AdvertiseAddrLAN: advertiseAddrLAN,
AdvertiseAddrWAN: advertiseAddrWAN,
BindAddr: bindAddr,
Bootstrap: b.boolVal(c.Bootstrap),
BootstrapExpect: b.intVal(c.BootstrapExpect),
CAFile: b.stringVal(c.CAFile),
CAPath: b.stringVal(c.CAPath),
CertFile: b.stringVal(c.CertFile),
CheckUpdateInterval: b.durationVal("check_update_interval", c.CheckUpdateInterval),
Checks: checks,
ClientAddrs: clientAddrs,
ConnectEnabled: connectEnabled,
ConnectProxyBindMinPort: proxyBindMinPort,
ConnectProxyBindMaxPort: proxyBindMaxPort,
ConnectCAProvider: connectCAProvider,
ConnectCAConfig: connectCAConfig,
DataDir: b.stringVal(c.DataDir),
Datacenter: strings.ToLower(b.stringVal(c.Datacenter)),
DevMode: b.boolVal(b.Flags.DevMode),
DisableAnonymousSignature: b.boolVal(c.DisableAnonymousSignature),
DisableCoordinates: b.boolVal(c.DisableCoordinates),
DisableHostNodeID: b.boolVal(c.DisableHostNodeID),
DisableKeyringFile: b.boolVal(c.DisableKeyringFile),
DisableRemoteExec: b.boolVal(c.DisableRemoteExec),
DisableUpdateCheck: b.boolVal(c.DisableUpdateCheck),
DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput),
DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale),
EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks),
EnableDebug: b.boolVal(c.EnableDebug),
EnableScriptChecks: b.boolVal(c.EnableScriptChecks),
EnableSyslog: b.boolVal(c.EnableSyslog),
EnableUI: b.boolVal(c.UI),
EncryptKey: b.stringVal(c.EncryptKey),
EncryptVerifyIncoming: b.boolVal(c.EncryptVerifyIncoming),
EncryptVerifyOutgoing: b.boolVal(c.EncryptVerifyOutgoing),
KeyFile: b.stringVal(c.KeyFile),
LeaveDrainTime: b.durationVal("performance.leave_drain_time", c.Performance.LeaveDrainTime),
LeaveOnTerm: leaveOnTerm,
LogLevel: b.stringVal(c.LogLevel),
NodeID: types.NodeID(b.stringVal(c.NodeID)),
NodeMeta: c.NodeMeta,
NodeName: b.nodeName(c.NodeName),
NonVotingServer: b.boolVal(c.NonVotingServer),
PidFile: b.stringVal(c.PidFile),
RPCAdvertiseAddr: rpcAdvertiseAddr,
RPCBindAddr: rpcBindAddr,
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
RPCMaxBurst: b.intVal(c.Limits.RPCMaxBurst),
RPCProtocol: b.intVal(c.RPCProtocol),
RPCRateLimit: rate.Limit(b.float64Val(c.Limits.RPCRate)),
RaftProtocol: b.intVal(c.RaftProtocol),
RaftSnapshotThreshold: b.intVal(c.RaftSnapshotThreshold),
RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval),
ReconnectTimeoutLAN: b.durationVal("reconnect_timeout", c.ReconnectTimeoutLAN),
ReconnectTimeoutWAN: b.durationVal("reconnect_timeout_wan", c.ReconnectTimeoutWAN),
RejoinAfterLeave: b.boolVal(c.RejoinAfterLeave),
RetryJoinIntervalLAN: b.durationVal("retry_interval", c.RetryJoinIntervalLAN),
RetryJoinIntervalWAN: b.durationVal("retry_interval_wan", c.RetryJoinIntervalWAN),
RetryJoinLAN: b.expandAllOptionalAddrs("retry_join", c.RetryJoinLAN),
RetryJoinMaxAttemptsLAN: b.intVal(c.RetryJoinMaxAttemptsLAN),
RetryJoinMaxAttemptsWAN: b.intVal(c.RetryJoinMaxAttemptsWAN),
RetryJoinWAN: b.expandAllOptionalAddrs("retry_join_wan", c.RetryJoinWAN),
SegmentName: b.stringVal(c.SegmentName),
Segments: segments,
SerfAdvertiseAddrLAN: serfAdvertiseAddrLAN,
SerfAdvertiseAddrWAN: serfAdvertiseAddrWAN,
SerfBindAddrLAN: serfBindAddrLAN,
SerfBindAddrWAN: serfBindAddrWAN,
SerfPortLAN: serfPortLAN,
SerfPortWAN: serfPortWAN,
ServerMode: b.boolVal(c.ServerMode),
ServerName: b.stringVal(c.ServerName),
ServerPort: serverPort,
Services: services,
SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin),
SkipLeaveOnInt: skipLeaveOnInt,
StartJoinAddrsLAN: b.expandAllOptionalAddrs("start_join", c.StartJoinAddrsLAN),
StartJoinAddrsWAN: b.expandAllOptionalAddrs("start_join_wan", c.StartJoinAddrsWAN),
SyslogFacility: b.stringVal(c.SyslogFacility),
TLSCipherSuites: b.tlsCipherSuites("tls_cipher_suites", c.TLSCipherSuites),
TLSMinVersion: b.stringVal(c.TLSMinVersion),
TLSPreferServerCipherSuites: b.boolVal(c.TLSPreferServerCipherSuites),
TaggedAddresses: c.TaggedAddresses,
TranslateWANAddrs: b.boolVal(c.TranslateWANAddrs),
UIDir: b.stringVal(c.UIDir),
UnixSocketGroup: b.stringVal(c.UnixSocket.Group),
UnixSocketMode: b.stringVal(c.UnixSocket.Mode),
UnixSocketUser: b.stringVal(c.UnixSocket.User),
VerifyIncoming: b.boolVal(c.VerifyIncoming),
VerifyIncomingHTTPS: b.boolVal(c.VerifyIncomingHTTPS),
VerifyIncomingRPC: b.boolVal(c.VerifyIncomingRPC),
VerifyOutgoing: b.boolVal(c.VerifyOutgoing),
VerifyServerHostname: b.boolVal(c.VerifyServerHostname),
Watches: c.Watches,
AdvertiseAddrLAN: advertiseAddrLAN,
AdvertiseAddrWAN: advertiseAddrWAN,
BindAddr: bindAddr,
Bootstrap: b.boolVal(c.Bootstrap),
BootstrapExpect: b.intVal(c.BootstrapExpect),
CAFile: b.stringVal(c.CAFile),
CAPath: b.stringVal(c.CAPath),
CertFile: b.stringVal(c.CertFile),
CheckUpdateInterval: b.durationVal("check_update_interval", c.CheckUpdateInterval),
Checks: checks,
ClientAddrs: clientAddrs,
ConnectEnabled: connectEnabled,
ConnectCAProvider: connectCAProvider,
ConnectCAConfig: connectCAConfig,
ConnectProxyBindMinPort: proxyBindMinPort,
ConnectProxyBindMaxPort: proxyBindMaxPort,
ConnectProxyDefaultExecMode: proxyDefaultExecMode,
ConnectProxyDefaultDaemonCommand: proxyDefaultDaemonCommand,
ConnectProxyDefaultScriptCommand: proxyDefaultScriptCommand,
ConnectProxyDefaultConfig: proxyDefaultConfig,
DataDir: b.stringVal(c.DataDir),
Datacenter: strings.ToLower(b.stringVal(c.Datacenter)),
DevMode: b.boolVal(b.Flags.DevMode),
DisableAnonymousSignature: b.boolVal(c.DisableAnonymousSignature),
DisableCoordinates: b.boolVal(c.DisableCoordinates),
DisableHostNodeID: b.boolVal(c.DisableHostNodeID),
DisableKeyringFile: b.boolVal(c.DisableKeyringFile),
DisableRemoteExec: b.boolVal(c.DisableRemoteExec),
DisableUpdateCheck: b.boolVal(c.DisableUpdateCheck),
DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput),
DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale),
EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks),
EnableDebug: b.boolVal(c.EnableDebug),
EnableScriptChecks: b.boolVal(c.EnableScriptChecks),
EnableSyslog: b.boolVal(c.EnableSyslog),
EnableUI: b.boolVal(c.UI),
EncryptKey: b.stringVal(c.EncryptKey),
EncryptVerifyIncoming: b.boolVal(c.EncryptVerifyIncoming),
EncryptVerifyOutgoing: b.boolVal(c.EncryptVerifyOutgoing),
KeyFile: b.stringVal(c.KeyFile),
LeaveDrainTime: b.durationVal("performance.leave_drain_time", c.Performance.LeaveDrainTime),
LeaveOnTerm: leaveOnTerm,
LogLevel: b.stringVal(c.LogLevel),
NodeID: types.NodeID(b.stringVal(c.NodeID)),
NodeMeta: c.NodeMeta,
NodeName: b.nodeName(c.NodeName),
NonVotingServer: b.boolVal(c.NonVotingServer),
PidFile: b.stringVal(c.PidFile),
RPCAdvertiseAddr: rpcAdvertiseAddr,
RPCBindAddr: rpcBindAddr,
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
RPCMaxBurst: b.intVal(c.Limits.RPCMaxBurst),
RPCProtocol: b.intVal(c.RPCProtocol),
RPCRateLimit: rate.Limit(b.float64Val(c.Limits.RPCRate)),
RaftProtocol: b.intVal(c.RaftProtocol),
RaftSnapshotThreshold: b.intVal(c.RaftSnapshotThreshold),
RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval),
ReconnectTimeoutLAN: b.durationVal("reconnect_timeout", c.ReconnectTimeoutLAN),
ReconnectTimeoutWAN: b.durationVal("reconnect_timeout_wan", c.ReconnectTimeoutWAN),
RejoinAfterLeave: b.boolVal(c.RejoinAfterLeave),
RetryJoinIntervalLAN: b.durationVal("retry_interval", c.RetryJoinIntervalLAN),
RetryJoinIntervalWAN: b.durationVal("retry_interval_wan", c.RetryJoinIntervalWAN),
RetryJoinLAN: b.expandAllOptionalAddrs("retry_join", c.RetryJoinLAN),
RetryJoinMaxAttemptsLAN: b.intVal(c.RetryJoinMaxAttemptsLAN),
RetryJoinMaxAttemptsWAN: b.intVal(c.RetryJoinMaxAttemptsWAN),
RetryJoinWAN: b.expandAllOptionalAddrs("retry_join_wan", c.RetryJoinWAN),
SegmentName: b.stringVal(c.SegmentName),
Segments: segments,
SerfAdvertiseAddrLAN: serfAdvertiseAddrLAN,
SerfAdvertiseAddrWAN: serfAdvertiseAddrWAN,
SerfBindAddrLAN: serfBindAddrLAN,
SerfBindAddrWAN: serfBindAddrWAN,
SerfPortLAN: serfPortLAN,
SerfPortWAN: serfPortWAN,
ServerMode: b.boolVal(c.ServerMode),
ServerName: b.stringVal(c.ServerName),
ServerPort: serverPort,
Services: services,
SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin),
SkipLeaveOnInt: skipLeaveOnInt,
StartJoinAddrsLAN: b.expandAllOptionalAddrs("start_join", c.StartJoinAddrsLAN),
StartJoinAddrsWAN: b.expandAllOptionalAddrs("start_join_wan", c.StartJoinAddrsWAN),
SyslogFacility: b.stringVal(c.SyslogFacility),
TLSCipherSuites: b.tlsCipherSuites("tls_cipher_suites", c.TLSCipherSuites),
TLSMinVersion: b.stringVal(c.TLSMinVersion),
TLSPreferServerCipherSuites: b.boolVal(c.TLSPreferServerCipherSuites),
TaggedAddresses: c.TaggedAddresses,
TranslateWANAddrs: b.boolVal(c.TranslateWANAddrs),
UIDir: b.stringVal(c.UIDir),
UnixSocketGroup: b.stringVal(c.UnixSocket.Group),
UnixSocketMode: b.stringVal(c.UnixSocket.Mode),
UnixSocketUser: b.stringVal(c.UnixSocket.User),
VerifyIncoming: b.boolVal(c.VerifyIncoming),
VerifyIncomingHTTPS: b.boolVal(c.VerifyIncomingHTTPS),
VerifyIncomingRPC: b.boolVal(c.VerifyIncomingRPC),
VerifyOutgoing: b.boolVal(c.VerifyOutgoing),
VerifyServerHostname: b.boolVal(c.VerifyServerHostname),
Watches: c.Watches,
}
if rt.BootstrapExpect == 1 {

View File

@ -633,15 +633,15 @@ type RuntimeConfig struct {
// ConnectProxyDefaultExecMode is used where a registration doesn't include an
// exec_mode. Defaults to daemon.
ConnectProxyDefaultExecMode *string
ConnectProxyDefaultExecMode string
// ConnectProxyDefaultDaemonCommand is used to start proxy in exec_mode =
// daemon if not specified at registration time.
ConnectProxyDefaultDaemonCommand *string
ConnectProxyDefaultDaemonCommand string
// ConnectProxyDefaultScriptCommand is used to start proxy in exec_mode =
// script if not specified at registration time.
ConnectProxyDefaultScriptCommand *string
ConnectProxyDefaultScriptCommand string
// ConnectProxyDefaultConfig is merged with any config specified at
// registration time to allow global control of defaults.

View File

@ -2830,7 +2830,9 @@ func TestFullConfig(t *testing.T) {
script_command = "proxyctl.sh"
config = {
foo = "bar"
connect_timeout_ms = 1000
# hack float since json parses numbers as float and we have to
# assert against the same thing
connect_timeout_ms = 1000.0
pedantic_mode = true
}
}
@ -3423,6 +3425,14 @@ func TestFullConfig(t *testing.T) {
"g4cvJyys": "IRLXE9Ds",
"hyMy9Oxn": "XeBp4Sis",
},
ConnectProxyDefaultExecMode: "script",
ConnectProxyDefaultDaemonCommand: "consul connect proxy",
ConnectProxyDefaultScriptCommand: "proxyctl.sh",
ConnectProxyDefaultConfig: map[string]interface{}{
"foo": "bar",
"connect_timeout_ms": float64(1000),
"pedantic_mode": true,
},
DNSAddrs: []net.Addr{tcpAddr("93.95.95.81:7001"), udpAddr("93.95.95.81:7001")},
DNSARecordLimit: 29907,
DNSAllowStale: true,
@ -4099,9 +4109,9 @@ func TestSanitize(t *testing.T) {
"ConnectProxyBindMaxPort": 0,
"ConnectProxyBindMinPort": 0,
"ConnectProxyDefaultConfig": {},
"ConnectProxyDefaultDaemonCommand": null,
"ConnectProxyDefaultExecMode": null,
"ConnectProxyDefaultScriptCommand": null,
"ConnectProxyDefaultDaemonCommand": "",
"ConnectProxyDefaultExecMode": "",
"ConnectProxyDefaultScriptCommand": "",
"ConsulCoordinateUpdateBatchSize": 0,
"ConsulCoordinateUpdateMaxBatches": 0,
"ConsulCoordinateUpdatePeriod": "15s",

View File

@ -150,7 +150,7 @@ func TestLeaf(t testing.T, service string, root *structs.CARoot) (string, string
spiffeId := &SpiffeIDService{
Host: fmt.Sprintf("%s.consul", testClusterID),
Namespace: "default",
Datacenter: "dc01",
Datacenter: "dc1",
Service: service,
}

View File

@ -9,7 +9,7 @@ func TestSpiffeIDService(t testing.T, service string) *SpiffeIDService {
return &SpiffeIDService{
Host: testClusterID + ".consul",
Namespace: "default",
Datacenter: "dc01",
Datacenter: "dc1",
Service: service,
}
}

View File

@ -48,7 +48,7 @@ func init() {
registerEndpoint("/v1/connect/ca/roots", []string{"GET"}, (*HTTPServer).ConnectCARoots)
registerEndpoint("/v1/connect/intentions", []string{"GET", "POST"}, (*HTTPServer).IntentionEndpoint)
registerEndpoint("/v1/connect/intentions/match", []string{"GET"}, (*HTTPServer).IntentionMatch)
registerEndpoint("/v1/connect/intentions/", []string{"GET"}, (*HTTPServer).IntentionSpecific)
registerEndpoint("/v1/connect/intentions/", []string{"GET", "PUT", "DELETE"}, (*HTTPServer).IntentionSpecific)
registerEndpoint("/v1/coordinate/datacenters", []string{"GET"}, (*HTTPServer).CoordinateDatacenters)
registerEndpoint("/v1/coordinate/nodes", []string{"GET"}, (*HTTPServer).CoordinateNodes)
registerEndpoint("/v1/coordinate/node/", []string{"GET"}, (*HTTPServer).CoordinateNode)

View File

@ -608,7 +608,12 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str
l.Lock()
defer l.Unlock()
// Allocate port if needed (min and max inclusive)
// Does this proxy instance allready exist?
if existing, ok := l.managedProxies[svc.ID]; ok {
svc.Port = existing.Proxy.ProxyService.Port
}
// Allocate port if needed (min and max inclusive).
rangeLen := l.config.ProxyBindMaxPort - l.config.ProxyBindMinPort + 1
if svc.Port < 1 && l.config.ProxyBindMinPort > 0 && rangeLen > 0 {
// This should be a really short list so don't bother optimising lookup yet.

View File

@ -1721,6 +1721,21 @@ func TestStateProxyManagement(t *testing.T) {
// Port is non-deterministic but could be either of 20000 or 20001
assert.Contains([]int{20000, 20001}, svc.Port)
{
// Re-registering same proxy again should not pick a random port but re-use
// the assigned one.
svcDup, err := state.AddProxy(&p1, "fake-token")
require.NoError(err)
assert.Equal("web-proxy", svcDup.ID)
assert.Equal("web-proxy", svcDup.Service)
assert.Equal(structs.ServiceKindConnectProxy, svcDup.Kind)
assert.Equal("web", svcDup.ProxyDestination)
assert.Equal("", svcDup.Address, "should have empty address by default")
// Port must be same as before
assert.Equal(svc.Port, svcDup.Port)
}
// Second proxy should claim other port
p2 := p1
p2.TargetServiceID = "cache"

View File

@ -24,8 +24,11 @@ type ConnectAuthorizeRequest struct {
type ProxyExecMode int
const (
// ProxyExecModeUnspecified uses the global default proxy mode.
ProxyExecModeUnspecified ProxyExecMode = iota
// ProxyExecModeDaemon executes a proxy process as a supervised daemon.
ProxyExecModeDaemon ProxyExecMode = iota
ProxyExecModeDaemon
// ProxyExecModeScript executes a proxy config script on each change to it's
// config.
@ -35,6 +38,8 @@ const (
// String implements Stringer
func (m ProxyExecMode) String() string {
switch m {
case ProxyExecModeUnspecified:
return "global_default"
case ProxyExecModeDaemon:
return "daemon"
case ProxyExecModeScript:

View File

@ -55,10 +55,11 @@ func (s *ServiceDefinition) ConnectManagedProxy() (*ConnectManagedProxy, error)
// which we shouldn't hard code ourselves here...
ns := s.NodeService()
execMode := ProxyExecModeDaemon
execMode := ProxyExecModeUnspecified
switch s.Connect.Proxy.ExecMode {
case "":
execMode = ProxyExecModeDaemon
// Use default
break
case "daemon":
execMode = ProxyExecModeDaemon
case "script":

View File

@ -609,9 +609,6 @@ func (a *Agent) ConnectCARoots(q *QueryOptions) (*CARootList, *QueryMeta, error)
}
// ConnectCALeaf gets the leaf certificate for the given service ID.
//
// TODO(mitchellh): we need to test this better once we have a way to
// configure CAs from the API package (when the CA work is done).
func (a *Agent) ConnectCALeaf(serviceID string, q *QueryOptions) (*LeafCert, *QueryMeta, error) {
r := a.c.newRequest("GET", "/v1/agent/connect/ca/leaf/"+serviceID)
r.setQueryOptions(q)

View File

@ -1049,17 +1049,71 @@ func TestAPI_AgentConnectCARoots_empty(t *testing.T) {
agent := c.Agent()
list, meta, err := agent.ConnectCARoots(nil)
require.Nil(err)
require.NoError(err)
require.Equal(uint64(0), meta.LastIndex)
require.Len(list.Roots, 0)
}
func TestAPI_AgentConnectCARoots_list(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
// Force auto port range to 1 port so we have deterministic response.
c.Connect = map[string]interface{}{
"enabled": true,
}
})
defer s.Stop()
agent := c.Agent()
list, meta, err := agent.ConnectCARoots(nil)
require.NoError(err)
require.True(meta.LastIndex > 0)
require.Len(list.Roots, 1)
}
func TestAPI_AgentConnectCALeaf(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
// Force auto port range to 1 port so we have deterministic response.
c.Connect = map[string]interface{}{
"enabled": true,
}
})
defer s.Stop()
agent := c.Agent()
// Setup service
reg := &AgentServiceRegistration{
Name: "foo",
Tags: []string{"bar", "baz"},
Port: 8000,
}
require.NoError(agent.ServiceRegister(reg))
leaf, meta, err := agent.ConnectCALeaf("foo", nil)
require.NoError(err)
require.True(meta.LastIndex > 0)
// Sanity checks here as we have actual certificate validation checks at many
// other levels.
require.NotEmpty(leaf.SerialNumber)
require.NotEmpty(leaf.CertPEM)
require.NotEmpty(leaf.PrivateKeyPEM)
require.Equal("foo", leaf.Service)
require.True(strings.HasSuffix(leaf.ServiceURI, "/svc/foo"))
require.True(leaf.ModifyIndex > 0)
require.True(leaf.ValidAfter.Before(time.Now()))
require.True(leaf.ValidBefore.After(time.Now()))
}
// TODO(banks): once we have CA stuff setup properly we can probably make this
// much more complete. This is just a sanity check that the agent code basically
// works.
func TestAPI_AgentConnectAuthorize(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t)
defer s.Stop()
@ -1079,7 +1133,15 @@ func TestAPI_AgentConnectAuthorize(t *testing.T) {
func TestAPI_AgentConnectProxyConfig(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
// Force auto port range to 1 port so we have deterministic response.
c.Connect = map[string]interface{}{
"proxy_defaults": map[string]interface{}{
"bind_min_port": 20000,
"bind_max_port": 20000,
},
}
})
defer s.Stop()
agent := c.Agent()
@ -1107,9 +1169,12 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) {
TargetServiceName: "foo",
ContentHash: "e662ea8600d84cf0",
ExecMode: "daemon",
Command: "",
Command: "consul connect proxy",
Config: map[string]interface{}{
"foo": "bar",
"bind_address": "127.0.0.1",
"bind_port": float64(20000),
"foo": "bar",
"local_service_address": "127.0.0.1:8000",
},
}
require.Equal(t, expectConfig, config)

View File

@ -52,11 +52,6 @@ type Config struct {
// private key to be used in development instead of the ones supplied by
// Connect.
DevServiceKeyFile string `json:"dev_service_key_file" hcl:"dev_service_key_file"`
// service is a connect.Service instance representing the proxied service. It
// is created internally by the code responsible for setting up config as it
// may depend on other external dependencies
service *connect.Service
}
// PublicListenerConfig contains the parameters needed for the incoming mTLS
@ -89,6 +84,9 @@ func (plc *PublicListenerConfig) applyDefaults() {
if plc.HandshakeTimeoutMs == 0 {
plc.HandshakeTimeoutMs = 10000
}
if plc.BindAddress == "" {
plc.BindAddress = "0.0.0.0"
}
}
// UpstreamConfig configures an upstream (outgoing) listener.
@ -258,7 +256,6 @@ func NewAgentConfigWatcher(client *api.Client, proxyID string,
func (w *AgentConfigWatcher) handler(blockVal watch.BlockingParamVal,
val interface{}) {
log.Printf("DEBUG: got hash %s", blockVal.(watch.WaitHashVal))
resp, ok := val.(*api.ConnectProxyConfig)
if !ok {
@ -266,25 +263,16 @@ func (w *AgentConfigWatcher) handler(blockVal watch.BlockingParamVal,
return
}
// Setup Service instance now we know target ID etc
service, err := connect.NewService(resp.TargetServiceID, w.client)
if err != nil {
w.logger.Printf("[WARN] proxy config watch failed to initialize"+
" service: %s", err)
return
}
// Create proxy config from the response
cfg := &Config{
ProxyID: w.proxyID,
// Token should be already setup in the client
ProxiedServiceID: resp.TargetServiceID,
ProxiedServiceNamespace: "default",
service: service,
}
// Unmarshal configs
err = mapstructure.Decode(resp.Config, &cfg.PublicListener)
err := mapstructure.Decode(resp.Config, &cfg.PublicListener)
if err != nil {
w.logger.Printf("[ERR] proxy config watch public listener config "+
"couldn't be parsed: %s", err)

View File

@ -175,11 +175,6 @@ func TestAgentConfigWatcher(t *testing.T) {
},
}
// nil this out as comparisons are problematic, we'll explicitly sanity check
// it's reasonable later.
assert.NotNil(t, cfg.service)
cfg.service = nil
assert.Equal(t, expectCfg, cfg)
// TODO(banks): Sanity check the service is viable and gets TLS certs eventually from
@ -213,11 +208,6 @@ func TestAgentConfigWatcher(t *testing.T) {
})
expectCfg.PublicListener.LocalConnectTimeoutMs = 444
// nil this out as comparisons are problematic, we'll explicitly sanity check
// it's reasonable later.
assert.NotNil(t, cfg.service)
cfg.service = nil
assert.Equal(t, expectCfg, cfg)
}

View File

@ -20,8 +20,10 @@ type Listener struct {
// Service is the connect service instance to use.
Service *connect.Service
// listenFunc, dialFunc and bindAddr are set by type-specific constructors
listenFunc func() (net.Listener, error)
dialFunc func() (net.Conn, error)
bindAddr string
stopFlag int32
stopChan chan struct{}
@ -42,17 +44,17 @@ type Listener struct {
// connections and proxy them to the configured local application over TCP.
func NewPublicListener(svc *connect.Service, cfg PublicListenerConfig,
logger *log.Logger) *Listener {
bindAddr := fmt.Sprintf("%s:%d", cfg.BindAddress, cfg.BindPort)
return &Listener{
Service: svc,
listenFunc: func() (net.Listener, error) {
return tls.Listen("tcp",
fmt.Sprintf("%s:%d", cfg.BindAddress, cfg.BindPort),
svc.ServerTLSConfig())
return tls.Listen("tcp", bindAddr, svc.ServerTLSConfig())
},
dialFunc: func() (net.Conn, error) {
return net.DialTimeout("tcp", cfg.LocalServiceAddress,
time.Duration(cfg.LocalConnectTimeoutMs)*time.Millisecond)
},
bindAddr: bindAddr,
stopChan: make(chan struct{}),
listeningChan: make(chan struct{}),
logger: logger,
@ -63,11 +65,11 @@ func NewPublicListener(svc *connect.Service, cfg PublicListenerConfig,
// connections that are proxied to a discovered Connect service instance.
func NewUpstreamListener(svc *connect.Service, cfg UpstreamConfig,
logger *log.Logger) *Listener {
bindAddr := fmt.Sprintf("%s:%d", cfg.LocalBindAddress, cfg.LocalBindPort)
return &Listener{
Service: svc,
listenFunc: func() (net.Listener, error) {
return net.Listen("tcp",
fmt.Sprintf("%s:%d", cfg.LocalBindAddress, cfg.LocalBindPort))
return net.Listen("tcp", bindAddr)
},
dialFunc: func() (net.Conn, error) {
if cfg.resolver == nil {
@ -78,6 +80,7 @@ func NewUpstreamListener(svc *connect.Service, cfg UpstreamConfig,
defer cancel()
return svc.Dial(ctx, cfg.resolver)
},
bindAddr: bindAddr,
stopChan: make(chan struct{}),
listeningChan: make(chan struct{}),
logger: logger,
@ -142,3 +145,8 @@ func (l *Listener) Close() error {
func (l *Listener) Wait() {
<-l.listeningChan
}
// BindAddr returns the address the listen is bound to.
func (l *Listener) BindAddr() string {
return l.bindAddr
}

View File

@ -1,6 +1,8 @@
package proxy
import (
"bytes"
"crypto/x509"
"log"
"github.com/hashicorp/consul/api"
@ -14,6 +16,7 @@ type Proxy struct {
cfgWatcher ConfigWatcher
stopChan chan struct{}
logger *log.Logger
service *connect.Service
}
// NewFromConfigFile returns a Proxy instance configured just from a local file.
@ -27,12 +30,11 @@ func NewFromConfigFile(client *api.Client, filename string,
}
service, err := connect.NewDevServiceFromCertFiles(cfg.ProxiedServiceID,
client, logger, cfg.DevCAFile, cfg.DevServiceCertFile,
logger, cfg.DevCAFile, cfg.DevServiceCertFile,
cfg.DevServiceKeyFile)
if err != nil {
return nil, err
}
cfg.service = service
p := &Proxy{
proxyID: cfg.ProxyID,
@ -40,6 +42,7 @@ func NewFromConfigFile(client *api.Client, filename string,
cfgWatcher: NewStaticConfigWatcher(cfg),
stopChan: make(chan struct{}),
logger: logger,
service: service,
}
return p, nil
}
@ -47,16 +50,18 @@ func NewFromConfigFile(client *api.Client, filename string,
// New returns a Proxy with the given id, consuming the provided (configured)
// agent. It is ready to Run().
func New(client *api.Client, proxyID string, logger *log.Logger) (*Proxy, error) {
cw, err := NewAgentConfigWatcher(client, proxyID, logger)
if err != nil {
return nil, err
}
p := &Proxy{
proxyID: proxyID,
client: client,
cfgWatcher: &AgentConfigWatcher{
client: client,
proxyID: proxyID,
logger: logger,
},
stopChan: make(chan struct{}),
logger: logger,
proxyID: proxyID,
client: client,
cfgWatcher: cw,
stopChan: make(chan struct{}),
logger: logger,
// Can't load service yet as we only have the proxy's ID not the service's
// until initial config fetch happens.
}
return p, nil
}
@ -71,16 +76,29 @@ func (p *Proxy) Serve() error {
select {
case newCfg := <-p.cfgWatcher.Watch():
p.logger.Printf("[DEBUG] got new config")
if newCfg.service == nil {
p.logger.Printf("[ERR] new config has nil service")
continue
}
if cfg == nil {
// Initial setup
// Setup Service instance now we know target ID etc
service, err := connect.NewService(newCfg.ProxiedServiceID, p.client)
if err != nil {
return err
}
p.service = service
go func() {
<-service.ReadyWait()
p.logger.Printf("[INFO] proxy loaded config and ready to serve")
tcfg := service.ServerTLSConfig()
cert, _ := tcfg.GetCertificate(nil)
leaf, _ := x509.ParseCertificate(cert.Certificate[0])
p.logger.Printf("[DEBUG] leaf: %s roots: %s", leaf.URIs[0], bytes.Join(tcfg.RootCAs.Subjects(), []byte(",")))
}()
newCfg.PublicListener.applyDefaults()
l := NewPublicListener(newCfg.service, newCfg.PublicListener, p.logger)
err := p.startListener("public listener", l)
l := NewPublicListener(p.service, newCfg.PublicListener, p.logger)
err = p.startListener("public listener", l)
if err != nil {
return err
}
@ -93,7 +111,13 @@ func (p *Proxy) Serve() error {
uc.applyDefaults()
uc.resolver = UpstreamResolverFromClient(p.client, uc)
l := NewUpstreamListener(newCfg.service, uc, p.logger)
if uc.LocalBindPort < 1 {
p.logger.Printf("[ERR] upstream %s has no local_bind_port. "+
"Can't start upstream.", uc.String())
continue
}
l := NewUpstreamListener(p.service, uc, p.logger)
err := p.startListener(uc.String(), l)
if err != nil {
p.logger.Printf("[ERR] failed to start upstream %s: %s", uc.String(),
@ -110,6 +134,7 @@ func (p *Proxy) Serve() error {
// startPublicListener is run from the internal state machine loop
func (p *Proxy) startListener(name string, l *Listener) error {
p.logger.Printf("[INFO] %s starting on %s", name, l.BindAddr())
go func() {
err := l.Serve()
if err != nil {
@ -122,6 +147,7 @@ func (p *Proxy) startListener(name string, l *Listener) error {
go func() {
<-p.stopChan
l.Close()
}()
return nil
@ -131,4 +157,7 @@ func (p *Proxy) startListener(name string, l *Listener) error {
// called only once.
func (p *Proxy) Close() {
close(p.stopChan)
if p.service != nil {
p.service.Close()
}
}

View File

@ -7,7 +7,6 @@ import (
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/api"
testing "github.com/mitchellh/go-testing-interface"
)
// Resolver is the interface implemented by a service discovery mechanism to get
@ -122,7 +121,12 @@ func (cr *ConsulResolver) resolveService(ctx context.Context) (string, connect.C
// propagating these trust domains we need to actually fetch the trust domain
// somehow. We also need to implement namespaces. Use of test function here is
// temporary pending the work on trust domains.
certURI := connect.TestSpiffeIDService(&testing.RuntimeT{}, cr.Name)
certURI := &connect.SpiffeIDService{
Host: "11111111-2222-3333-4444-555555555555.consul",
Namespace: "default",
Datacenter: svcs[idx].Node.Datacenter,
Service: svcs[idx].Service.ProxyDestination,
}
return fmt.Sprintf("%s:%d", addr, port), certURI, nil
}

View File

@ -41,7 +41,8 @@ type Service struct {
// fetch certificates and print a loud error message. It will not Close() or
// kill the process since that could lead to a crash loop in every service if
// ACL token was revoked. All attempts to dial will error and any incoming
// connections will fail to verify.
// connections will fail to verify. It may be nil if the Service is being
// configured from local files for development or testing.
client *api.Client
// tlsCfg is the dynamic TLS config
@ -63,6 +64,10 @@ type Service struct {
// NewService creates and starts a Service. The caller must close the returned
// service to free resources and allow the program to exit normally. This is
// typically called in a signal handler.
//
// Caller must provide client which is already configured to speak to the local
// Consul agent, and with an ACL token that has `service:write` privileges for
// the serviceID specified.
func NewService(serviceID string, client *api.Client) (*Service, error) {
return NewServiceWithLogger(serviceID, client,
log.New(os.Stderr, "", log.LstdFlags))
@ -89,7 +94,8 @@ func NewServiceWithLogger(serviceID string, client *api.Client,
s.rootsWatch.HybridHandler = s.rootsWatchHandler
p, err = watch.Parse(map[string]interface{}{
"type": "connect_leaf",
"type": "connect_leaf",
"service_id": s.serviceID,
})
if err != nil {
return nil, err
@ -97,26 +103,33 @@ func NewServiceWithLogger(serviceID string, client *api.Client,
s.leafWatch = p
s.leafWatch.HybridHandler = s.leafWatchHandler
//go s.rootsWatch.RunWithClientAndLogger(s.client, s.logger)
//go s.leafWatch.RunWithClientAndLogger(s.client, s.logger)
go s.rootsWatch.RunWithClientAndLogger(client, s.logger)
go s.leafWatch.RunWithClientAndLogger(client, s.logger)
return s, nil
}
// NewDevServiceFromCertFiles creates a Service using certificate and key files
// passed instead of fetching them from the client.
func NewDevServiceFromCertFiles(serviceID string, client *api.Client,
logger *log.Logger, caFile, certFile, keyFile string) (*Service, error) {
s := &Service{
serviceID: serviceID,
client: client,
logger: logger,
}
func NewDevServiceFromCertFiles(serviceID string, logger *log.Logger,
caFile, certFile, keyFile string) (*Service, error) {
tlsCfg, err := devTLSConfigFromFiles(caFile, certFile, keyFile)
if err != nil {
return nil, err
}
s.tlsCfg = newDynamicTLSConfig(tlsCfg)
return NewDevServiceWithTLSConfig(serviceID, logger, tlsCfg)
}
// NewDevServiceWithTLSConfig creates a Service using static TLS config passed.
// It's mostly useful for testing.
func NewDevServiceWithTLSConfig(serviceID string, logger *log.Logger,
tlsCfg *tls.Config) (*Service, error) {
s := &Service{
serviceID: serviceID,
logger: logger,
tlsCfg: newDynamicTLSConfig(tlsCfg),
}
return s, nil
}
@ -274,3 +287,17 @@ func (s *Service) leafWatchHandler(blockParam watch.BlockingParamVal, raw interf
s.tlsCfg.SetLeaf(&cert)
}
// Ready returns whether or not both roots and a leaf certificate are
// configured. If both are non-nil, they are assumed to be valid and usable.
func (s *Service) Ready() bool {
return s.tlsCfg.Ready()
}
// ReadyWait returns a chan that is closed when the the Service becomes ready
// for use. Note that if the Service is ready when it is called it returns a nil
// chan. Ready means that it has root and leaf certificates configured which we
// assume are valid.
func (s *Service) ReadyWait() <-chan struct{} {
return s.tlsCfg.ReadyWait()
}

View File

@ -1,16 +1,21 @@
package connect
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testutil/retry"
"github.com/stretchr/testify/require"
)
@ -111,10 +116,91 @@ func TestService_Dial(t *testing.T) {
}
func TestService_ServerTLSConfig(t *testing.T) {
// TODO(banks): it's mostly meaningless to test this now since we directly set
// the tlsCfg in our TestService helper which is all we'd be asserting on here
// not the actual implementation. Once agent tls fetching is built, it becomes
// more meaningful to actually verify it's returning the correct config.
require := require.New(t)
a := agent.NewTestAgent("007", "")
defer a.Shutdown()
client := a.Client()
agent := client.Agent()
// NewTestAgent setup a CA already by default
// Register a local agent service with a managed proxy
reg := &api.AgentServiceRegistration{
Name: "web",
Port: 8080,
}
err := agent.ServiceRegister(reg)
require.NoError(err)
// Now we should be able to create a service that will eventually get it's TLS
// all by itself!
service, err := NewService("web", client)
require.NoError(err)
// Wait for it to be ready
select {
case <-service.ReadyWait():
// continue with test case below
case <-time.After(1 * time.Second):
t.Fatalf("timeout waiting for Service.ReadyWait after 1s")
}
tlsCfg := service.ServerTLSConfig()
// Sanity check it has a leaf with the right ServiceID and that validates with
// the given roots.
require.NotNil(tlsCfg.GetCertificate)
leaf, err := tlsCfg.GetCertificate(&tls.ClientHelloInfo{})
require.NoError(err)
cert, err := x509.ParseCertificate(leaf.Certificate[0])
require.NoError(err)
require.Len(cert.URIs, 1)
require.True(strings.HasSuffix(cert.URIs[0].String(), "/svc/web"))
// Verify it as a client would
err = clientSideVerifier(tlsCfg, leaf.Certificate)
require.NoError(err)
// Now test that rotating the root updates
{
// Setup a new generated CA
connect.TestCAConfigSet(t, a, nil)
}
// After some time, both root and leaves should be different but both should
// still be correct.
oldRootSubjects := bytes.Join(tlsCfg.RootCAs.Subjects(), []byte(", "))
//oldLeafSerial := connect.HexString(cert.SerialNumber.Bytes())
oldLeafKeyID := connect.HexString(cert.SubjectKeyId)
retry.Run(t, func(r *retry.R) {
updatedCfg := service.ServerTLSConfig()
// Wait until roots are different
rootSubjects := bytes.Join(updatedCfg.RootCAs.Subjects(), []byte(", "))
if bytes.Equal(oldRootSubjects, rootSubjects) {
r.Fatalf("root certificates should have changed, got %s",
rootSubjects)
}
leaf, err := updatedCfg.GetCertificate(&tls.ClientHelloInfo{})
r.Check(err)
cert, err := x509.ParseCertificate(leaf.Certificate[0])
r.Check(err)
// TODO(banks): Current CA implementation resets the serial index when CA
// config changes which means same serial is issued by new CA config failing
// this test. Re-enable once the CA is changed to fix that.
// if oldLeafSerial == connect.HexString(cert.SerialNumber.Bytes()) {
// r.Fatalf("leaf certificate should have changed, got serial %s",
// oldLeafSerial)
// }
if oldLeafKeyID == connect.HexString(cert.SubjectKeyId) {
r.Fatalf("leaf should have a different key, got matching SubjectKeyID = %s",
oldLeafKeyID)
}
})
}
func TestService_HTTPClient(t *testing.T) {

View File

@ -8,6 +8,7 @@ import (
"log"
"net"
"net/http"
"os"
"sync/atomic"
"github.com/hashicorp/consul/agent/connect"
@ -20,16 +21,12 @@ import (
func TestService(t testing.T, service string, ca *structs.CARoot) *Service {
t.Helper()
// Don't need to talk to client since we are setting TLSConfig locally. This
// will cause server verification to skip AuthZ too.
svc, err := NewService(service, nil)
// Don't need to talk to client since we are setting TLSConfig locally
svc, err := NewDevServiceWithTLSConfig(service,
log.New(os.Stderr, "", log.LstdFlags), TestTLSConfig(t, service, ca))
if err != nil {
t.Fatal(err)
}
// Override the tlsConfig hackily.
svc.tlsCfg = newDynamicTLSConfig(TestTLSConfig(t, service, ca))
return svc
}

View File

@ -4,7 +4,9 @@ import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"log"
"sync"
"github.com/hashicorp/consul/agent/connect"
@ -104,7 +106,8 @@ func verifyServerCertMatchesURI(certs []*x509.Certificate,
if cert.URIs[0].String() == expectedStr {
return nil
}
return errors.New("peer certificate mismatch")
return fmt.Errorf("peer certificate mismatch got %s, want %s",
cert.URIs[0].String(), expectedStr)
}
// newServerSideVerifier returns a verifierFunc that wraps the provided
@ -115,21 +118,25 @@ func newServerSideVerifier(client *api.Client, serviceID string) verifierFunc {
return func(tlsCfg *tls.Config, rawCerts [][]byte) error {
leaf, err := verifyChain(tlsCfg, rawCerts, false)
if err != nil {
log.Printf("connect: failed TLS verification: %s", err)
return err
}
// Check leaf is a cert we understand
if len(leaf.URIs) < 1 {
log.Printf("connect: invalid leaf certificate")
return errors.New("connect: invalid leaf certificate")
}
certURI, err := connect.ParseCertURI(leaf.URIs[0])
if err != nil {
log.Printf("connect: invalid leaf certificate URI")
return errors.New("connect: invalid leaf certificate URI")
}
// No AuthZ if there is no client.
if client == nil {
log.Printf("connect: nil client")
return nil
}
@ -148,9 +155,11 @@ func newServerSideVerifier(client *api.Client, serviceID string) verifierFunc {
}
resp, err := client.Agent().ConnectAuthorize(req)
if err != nil {
log.Printf("connect: authz call failed: %s", err)
return errors.New("connect: authz call failed: " + err.Error())
}
if !resp.Authorized {
log.Printf("connect: authz call denied: %s", resp.Reason)
return errors.New("connect: authz denied: " + resp.Reason)
}
return nil
@ -217,9 +226,17 @@ func verifyChain(tlsCfg *tls.Config, rawCerts [][]byte, client bool) (*x509.Cert
type dynamicTLSConfig struct {
base *tls.Config
sync.Mutex
sync.RWMutex
leaf *tls.Certificate
roots *x509.CertPool
// readyCh is closed when the config first gets both leaf and roots set.
// Watchers can wait on this via ReadyWait.
readyCh chan struct{}
}
type tlsCfgUpdate struct {
ch chan struct{}
next *tlsCfgUpdate
}
// newDynamicTLSConfig returns a dynamicTLSConfig constructed from base.
@ -235,6 +252,9 @@ func newDynamicTLSConfig(base *tls.Config) *dynamicTLSConfig {
if base.RootCAs != nil {
cfg.roots = base.RootCAs
}
if !cfg.Ready() {
cfg.readyCh = make(chan struct{})
}
return cfg
}
@ -246,8 +266,8 @@ func newDynamicTLSConfig(base *tls.Config) *dynamicTLSConfig {
// client can use this config for a long time and will still verify against the
// latest roots even though the roots in the struct is has can't change.
func (cfg *dynamicTLSConfig) Get(v verifierFunc) *tls.Config {
cfg.Lock()
defer cfg.Unlock()
cfg.RLock()
defer cfg.RUnlock()
copy := cfg.base.Clone()
copy.RootCAs = cfg.roots
copy.ClientCAs = cfg.roots
@ -281,6 +301,7 @@ func (cfg *dynamicTLSConfig) SetRoots(roots *x509.CertPool) error {
cfg.Lock()
defer cfg.Unlock()
cfg.roots = roots
cfg.notify()
return nil
}
@ -289,19 +310,43 @@ func (cfg *dynamicTLSConfig) SetLeaf(leaf *tls.Certificate) error {
cfg.Lock()
defer cfg.Unlock()
cfg.leaf = leaf
cfg.notify()
return nil
}
// notify is called under lock during an update to check if we are now ready.
func (cfg *dynamicTLSConfig) notify() {
if cfg.readyCh != nil && cfg.leaf != nil && cfg.roots != nil {
close(cfg.readyCh)
cfg.readyCh = nil
}
}
// Roots returns the current CA root CertPool.
func (cfg *dynamicTLSConfig) Roots() *x509.CertPool {
cfg.Lock()
defer cfg.Unlock()
cfg.RLock()
defer cfg.RUnlock()
return cfg.roots
}
// Leaf returns the current Leaf certificate.
func (cfg *dynamicTLSConfig) Leaf() *tls.Certificate {
cfg.Lock()
defer cfg.Unlock()
cfg.RLock()
defer cfg.RUnlock()
return cfg.leaf
}
// Ready returns whether or not both roots and a leaf certificate are
// configured. If both are non-nil, they are assumed to be valid and usable.
func (cfg *dynamicTLSConfig) Ready() bool {
cfg.RLock()
defer cfg.RUnlock()
return cfg.leaf != nil && cfg.roots != nil
}
// ReadyWait returns a chan that is closed when the the tlsConfig becomes Ready
// for use. Note that if the config is ready when it is called it returns a nil
// chan.
func (cfg *dynamicTLSConfig) ReadyWait() <-chan struct{} {
return cfg.readyCh
}

View File

@ -358,3 +358,45 @@ func TestDynamicTLSConfig(t *testing.T) {
requireCorrectVerifier(t, newCfg, gotBefore, v1Ch)
requireCorrectVerifier(t, newCfg, gotAfter, v2Ch)
}
func TestDynamicTLSConfig_Ready(t *testing.T) {
require := require.New(t)
ca1 := connect.TestCA(t, nil)
baseCfg := TestTLSConfig(t, "web", ca1)
c := newDynamicTLSConfig(defaultTLSConfig())
readyCh := c.ReadyWait()
assertBlocked(t, readyCh)
require.False(c.Ready(), "no roots or leaf, should not be ready")
err := c.SetLeaf(&baseCfg.Certificates[0])
require.NoError(err)
assertBlocked(t, readyCh)
require.False(c.Ready(), "no roots, should not be ready")
err = c.SetRoots(baseCfg.RootCAs)
require.NoError(err)
assertNotBlocked(t, readyCh)
require.True(c.Ready(), "should be ready")
}
func assertBlocked(t *testing.T, ch <-chan struct{}) {
t.Helper()
select {
case <-ch:
t.Fatalf("want blocked chan")
default:
return
}
}
func assertNotBlocked(t *testing.T, ch <-chan struct{}) {
t.Helper()
select {
case <-ch:
return
default:
t.Fatalf("want unblocked chan but it blocked")
}
}

View File

@ -17,6 +17,7 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"os"
@ -94,6 +95,7 @@ type TestServerConfig struct {
VerifyIncomingHTTPS bool `json:"verify_incoming_https,omitempty"`
VerifyOutgoing bool `json:"verify_outgoing,omitempty"`
EnableScriptChecks bool `json:"enable_script_checks,omitempty"`
Connect map[string]interface{} `json:"connect,omitempty"`
ReadyTimeout time.Duration `json:"-"`
Stdout, Stderr io.Writer `json:"-"`
Args []string `json:"-"`
@ -211,6 +213,7 @@ func newTestServerConfigT(t *testing.T, cb ServerConfigCallback) (*TestServer, e
return nil, errors.Wrap(err, "failed marshaling json")
}
log.Printf("CONFIG JSON: %s", string(b))
configFile := filepath.Join(tmpdir, "config.json")
if err := ioutil.WriteFile(configFile, b, 0644); err != nil {
defer os.RemoveAll(tmpdir)

View File

@ -236,8 +236,7 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
// connectRootsWatch is used to watch for changes to Connect Root certificates.
func connectRootsWatch(params map[string]interface{}) (WatcherFunc, error) {
// We don't support stale since roots are likely to be cached locally in the
// agent anyway.
// We don't support stale since roots are cached locally in the agent.
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
agent := p.client.Agent()
@ -257,8 +256,7 @@ func connectRootsWatch(params map[string]interface{}) (WatcherFunc, error) {
// connectLeafWatch is used to watch for changes to Connect Leaf certificates
// for given local service id.
func connectLeafWatch(params map[string]interface{}) (WatcherFunc, error) {
// We don't support stale since certs are likely to be cached locally in the
// agent anyway.
// We don't support stale since certs are cached locally in the agent.
var serviceID string
if err := assignValue(params, "service_id", &serviceID); err != nil {

View File

@ -7,8 +7,10 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/connect"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/watch"
"github.com/stretchr/testify/require"
@ -526,14 +528,12 @@ func TestEventWatch(t *testing.T) {
}
func TestConnectRootsWatch(t *testing.T) {
// TODO(banks) enable and make it work once this is supported. Note that this
// test actually passes currently just by busy-polling the roots endpoint
// until it changes.
t.Skip("CA and Leaf implementation don't actually support blocking yet")
t.Parallel()
a := agent.NewTestAgent(t.Name(), ``)
// NewTestAgent will bootstrap a new CA
a := agent.NewTestAgent(t.Name(), "")
defer a.Shutdown()
var originalCAID string
invoke := makeInvokeCh()
plan := mustParse(t, `{"type":"connect_roots"}`)
plan.Handler = func(idx uint64, raw interface{}) {
@ -544,7 +544,14 @@ func TestConnectRootsWatch(t *testing.T) {
if !ok || v == nil {
return // ignore
}
// TODO(banks): verify the right roots came back.
// Only 1 CA is the bootstrapped state (i.e. first response). Ignore this
// state and wait for the new CA to show up too.
if len(v.Roots) == 1 {
originalCAID = v.ActiveRootID
return
}
assert.NotEmpty(t, originalCAID)
assert.NotEqual(t, originalCAID, v.ActiveRootID)
invoke <- nil
}
@ -553,22 +560,8 @@ func TestConnectRootsWatch(t *testing.T) {
go func() {
defer wg.Done()
time.Sleep(20 * time.Millisecond)
// TODO(banks): this is a hack since CA config is in flux. We _did_ expose a
// temporary agent endpoint for PUTing config, but didn't expose it in `api`
// package intentionally. If we are going to hack around with temporary API,
// we can might as well drop right down to the RPC level...
args := structs.CAConfiguration{
Provider: "static",
Config: map[string]interface{}{
"Name": "test-1",
"Generate": true,
},
}
var reply interface{}
if err := a.RPC("ConnectCA.ConfigurationSet", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
// Set a new CA
connect.TestCAConfigSet(t, a, nil)
}()
wg.Add(1)
@ -588,9 +581,8 @@ func TestConnectRootsWatch(t *testing.T) {
}
func TestConnectLeafWatch(t *testing.T) {
// TODO(banks) enable and make it work once this is supported.
t.Skip("CA and Leaf implementation don't actually support blocking yet")
t.Parallel()
// NewTestAgent will bootstrap a new CA
a := agent.NewTestAgent(t.Name(), ``)
defer a.Shutdown()
@ -606,25 +598,10 @@ func TestConnectLeafWatch(t *testing.T) {
require.Nil(t, err)
}
// Setup a new generated CA
//
// TODO(banks): this is a hack since CA config is in flux. We _did_ expose a
// temporary agent endpoint for PUTing config, but didn't expose it in `api`
// package intentionally. If we are going to hack around with temporary API,
// we can might as well drop right down to the RPC level...
args := structs.CAConfiguration{
Provider: "static",
Config: map[string]interface{}{
"Name": "test-1",
"Generate": true,
},
}
var reply interface{}
if err := a.RPC("ConnectCA.ConfigurationSet", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
var lastCert *consulapi.LeafCert
invoke := makeInvokeCh()
//invoke := makeInvokeCh()
invoke := make(chan error)
plan := mustParse(t, `{"type":"connect_leaf", "service_id":"web"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
@ -634,7 +611,18 @@ func TestConnectLeafWatch(t *testing.T) {
if !ok || v == nil {
return // ignore
}
// TODO(banks): verify the right leaf came back.
if lastCert == nil {
// Initial fetch, just store the cert and return
lastCert = v
return
}
// TODO(banks): right now the root rotation actually causes Serial numbers
// to reset so these end up all being the same. That needs fixing but it's
// a bigger task than I want to bite off for this PR.
//assert.NotEqual(t, lastCert.SerialNumber, v.SerialNumber)
assert.NotEqual(t, lastCert.CertPEM, v.CertPEM)
assert.NotEqual(t, lastCert.PrivateKeyPEM, v.PrivateKeyPEM)
assert.NotEqual(t, lastCert.ModifyIndex, v.ModifyIndex)
invoke <- nil
}
@ -643,20 +631,8 @@ func TestConnectLeafWatch(t *testing.T) {
go func() {
defer wg.Done()
time.Sleep(20 * time.Millisecond)
// Change the CA which should eventually trigger a leaf change but probably
// won't now so this test has no way to succeed yet.
args := structs.CAConfiguration{
Provider: "static",
Config: map[string]interface{}{
"Name": "test-2",
"Generate": true,
},
}
var reply interface{}
if err := a.RPC("ConnectCA.ConfigurationSet", &args, &reply); err != nil {
t.Fatalf("err: %v", err)
}
// Change the CA to trigger a leaf change
connect.TestCAConfigSet(t, a, nil)
}()
wg.Add(1)
@ -740,6 +716,7 @@ func TestConnectProxyConfigWatch(t *testing.T) {
}
func mustParse(t *testing.T, q string) *watch.Plan {
t.Helper()
var params map[string]interface{}
if err := json.Unmarshal([]byte(q), &params); err != nil {
t.Fatal(err)