Add -sidecar-for and new /agent/service/:service_id endpoint (#4691)

- A new endpoint `/v1/agent/service/:service_id` which is a generic way to look up the service for a single instance. The primary value here is that it:
   - **supports hash-based blocking** and so;
   - **replaces `/agent/connect/proxy/:proxy_id`** as the mechanism the built-in proxy uses to read its config.
   - It's not proxy specific and so works for any service.
   - It has a temporary shim to call through to the existing endpoint to preserve current managed proxy config defaulting behaviour until that is removed entirely (tested).
 - The built-in proxy now uses the new endpoint exclusively for it's config
 - The built-in proxy now has a `-sidecar-for` flag that allows the service ID of the _target_ service to be specified, on the condition that there is exactly one "sidecar" proxy (that is one that has `Proxy.DestinationServiceID` set) for the service registered.
 - Several fixes for edge cases for SidecarService
 - A fix for `Alias` checks - when running locally they didn't update their state until some external thing updated the target. If the target service has no checks registered as below, then the alias never made it past critical.
This commit is contained in:
Paul Banks 2018-09-27 15:00:51 +01:00
parent 7038fe6b71
commit 979e1c9c94
26 changed files with 1380 additions and 121 deletions

View File

@ -306,8 +306,9 @@ func (s *StateSyncer) Paused() bool {
return s.paused != 0
}
// Resume re-enables sync runs.
func (s *StateSyncer) Resume() {
// Resume re-enables sync runs. It returns true if it was the last pause/resume
// pair on the stack and so actually caused the state syncer to resume.
func (s *StateSyncer) Resume() bool {
s.pauseLock.Lock()
s.paused--
if s.paused < 0 {
@ -318,4 +319,5 @@ func (s *StateSyncer) Resume() {
if trigger {
s.SyncChanges.Trigger()
}
return trigger
}

View File

@ -11,6 +11,7 @@ import (
"time"
"github.com/hashicorp/consul/lib"
"github.com/stretchr/testify/assert"
)
func TestAE_scaleFactor(t *testing.T) {
@ -47,14 +48,16 @@ func TestAE_Pause_nestedPauseResume(t *testing.T) {
if l.Paused() != true {
t.Fatal("syncer should STILL be Paused after second call to Pause()")
}
l.Resume()
gotR := l.Resume()
if l.Paused() != true {
t.Fatal("syncer should STILL be Paused after FIRST call to Resume()")
}
l.Resume()
assert.False(t, gotR)
gotR = l.Resume()
if l.Paused() != false {
t.Fatal("syncer should NOT be Paused after SECOND call to Resume()")
}
assert.True(t, gotR)
defer func() {
err := recover()

View File

@ -127,6 +127,11 @@ type Agent struct {
// and the remote state.
sync *ae.StateSyncer
// syncMu and syncCh are used to coordinate agent endpoints that are blocking
// on local state during a config reload.
syncMu sync.Mutex
syncCh chan struct{}
// cache is the in-memory cache for data the Agent requests.
cache *cache.Cache
@ -1490,14 +1495,53 @@ func (a *Agent) StartSync() {
a.logger.Printf("[INFO] agent: started state syncer")
}
// PauseSync is used to pause anti-entropy while bulk changes are make
// PauseSync is used to pause anti-entropy while bulk changes are made. It also
// sets state that agent-local watches use to "ride out" config reloads and bulk
// updates which might spuriously unload state and reload it again.
func (a *Agent) PauseSync() {
// Do this outside of lock as it has it's own locking
a.sync.Pause()
// Coordinate local state watchers
a.syncMu.Lock()
defer a.syncMu.Unlock()
if a.syncCh == nil {
a.syncCh = make(chan struct{})
}
}
// ResumeSync is used to unpause anti-entropy after bulk changes are make
func (a *Agent) ResumeSync() {
a.sync.Resume()
// a.sync maintains a stack/ref count of Pause calls since we call
// Pause/Resume in nested way during a reload and AddService. We only want to
// trigger local state watchers if this Resume call actually started sync back
// up again (i.e. was the last resume on the stack). We could check that
// separately with a.sync.Paused but that is racey since another Pause call
// might be made between our Resume and checking Paused.
resumed := a.sync.Resume()
if !resumed {
// Return early so we don't notify local watchers until we are actually
// resumed.
return
}
// Coordinate local state watchers
a.syncMu.Lock()
defer a.syncMu.Unlock()
if a.syncCh != nil {
close(a.syncCh)
a.syncCh = nil
}
}
// syncPausedCh returns either a channel or nil. If nil sync is not paused. If
// non-nil, the channel will be closed when sync resumes.
func (a *Agent) syncPausedCh() <-chan struct{} {
a.syncMu.Lock()
defer a.syncMu.Unlock()
return a.syncCh
}
// GetLANCoordinate returns the coordinates of this node in the local pools

View File

@ -1,8 +1,10 @@
package agent
import (
"errors"
"fmt"
"log"
"net"
"net/http"
"net/url"
"strconv"
@ -230,6 +232,154 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request)
return agentSvcs, nil
}
// GET /v1/agent/service/:service_id
//
// Returns the service definition for a single local services and allows
// blocking watch using hash-based blocking.
func (s *HTTPServer) AgentService(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Get the proxy ID. Note that this is the ID of a proxy's service instance.
id := strings.TrimPrefix(req.URL.Path, "/v1/agent/service/")
// DEPRECATED(managed-proxies) - remove this whole hack.
//
// Support managed proxies until they are removed entirely. Since built-in
// proxy will now use this endpoint, in order to not break managed proxies in
// the interim until they are removed, we need to mirror the default-setting
// behaviour they had. Rather than thread that through this whole method as
// special cases that need to be unwound later (and duplicate logic in the
// proxy config endpoint) just defer to that and then translater the response.
if managedProxy := s.agent.State.Proxy(id); managedProxy != nil {
// This is for a managed proxy, use the old endpoint's behaviour
req.URL.Path = "/v1/agent/connect/proxy/" + id
obj, err := s.AgentConnectProxyConfig(resp, req)
if err != nil {
return obj, err
}
proxyCfg, ok := obj.(*api.ConnectProxyConfig)
if !ok {
return nil, errors.New("internal error")
}
// These are all set by defaults so type checks are just sanity checks that
// should never fail.
port, ok := proxyCfg.Config["bind_port"].(int)
if !ok || port < 1 {
return nil, errors.New("invalid proxy config")
}
addr, ok := proxyCfg.Config["bind_address"].(string)
if !ok || addr == "" {
return nil, errors.New("invalid proxy config")
}
localAddr, ok := proxyCfg.Config["local_service_address"].(string)
if !ok || localAddr == "" {
return nil, errors.New("invalid proxy config")
}
// Old local_service_address was a host:port
localAddress, localPortRaw, err := net.SplitHostPort(localAddr)
if err != nil {
return nil, err
}
localPort, err := strconv.Atoi(localPortRaw)
if err != nil {
return nil, err
}
reply := &api.AgentService{
Kind: api.ServiceKindConnectProxy,
ID: proxyCfg.ProxyServiceID,
Service: managedProxy.Proxy.ProxyService.Service,
Port: port,
Address: addr,
ContentHash: proxyCfg.ContentHash,
Proxy: &api.AgentServiceConnectProxyConfig{
DestinationServiceName: proxyCfg.TargetServiceName,
DestinationServiceID: proxyCfg.TargetServiceID,
LocalServiceAddress: localAddress,
LocalServicePort: localPort,
Config: proxyCfg.Config,
Upstreams: proxyCfg.Upstreams,
},
}
return reply, nil
}
// Maybe block
var queryOpts structs.QueryOptions
if parseWait(resp, req, &queryOpts) {
// parseWait returns an error itself
return nil, nil
}
// Parse the token
var token string
s.parseToken(req, &token)
// Parse hash specially. Eventually this should happen in parseWait and end up
// in QueryOptions but I didn't want to make very general changes right away.
hash := req.URL.Query().Get("hash")
return s.agentLocalBlockingQuery(resp, hash, &queryOpts,
func(ws memdb.WatchSet) (string, interface{}, error) {
svcState := s.agent.State.ServiceState(id)
if svcState == nil {
resp.WriteHeader(http.StatusNotFound)
fmt.Fprintf(resp, "unknown proxy service ID: %s", id)
return "", nil, nil
}
svc := svcState.Service
// Setup watch on the service
ws.Add(svcState.WatchCh)
// Check ACLs.
rule, err := s.agent.resolveToken(token)
if err != nil {
return "", nil, err
}
if rule != nil && !rule.ServiceRead(svc.Service) {
return "", nil, acl.ErrPermissionDenied
}
var connect *api.AgentServiceConnect
var proxy *api.AgentServiceConnectProxyConfig
if svc.Connect.Native {
connect = &api.AgentServiceConnect{
Native: svc.Connect.Native,
}
}
if svc.Kind == structs.ServiceKindConnectProxy {
proxy = svc.Proxy.ToAPI()
}
// Calculate the content hash over the response, minus the hash field
reply := &api.AgentService{
Kind: api.ServiceKind(svc.Kind),
ID: svc.ID,
Service: svc.Service,
Tags: svc.Tags,
Meta: svc.Meta,
Port: svc.Port,
Address: svc.Address,
EnableTagOverride: svc.EnableTagOverride,
Proxy: proxy,
Connect: connect,
}
rawHash, err := hashstructure.Hash(reply, nil)
if err != nil {
return "", nil, err
}
// Include the ContentHash in the response body
reply.ContentHash = fmt.Sprintf("%x", rawHash)
return reply.ContentHash, reply, nil
})
}
func (s *HTTPServer) AgentChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any.
var token string
@ -1254,7 +1404,17 @@ func (s *HTTPServer) agentLocalBlockingQuery(resp http.ResponseWriter, hash stri
return curResp, err
}
// Watch returned false indicating a change was detected, loop and repeat
// the callback to load the new value.
// the callback to load the new value. If agent sync is paused it means
// local state is currently being bulk-edited e.g. config reload. In this
// case it's likely that local state just got unloaded and may or may not be
// reloaded yet. Wait a short amount of time for Sync to resume to ride out
// typical config reloads.
if syncPauseCh := s.agent.syncPausedCh(); syncPauseCh != nil {
select {
case <-syncPauseCh:
case <-timeout.C:
}
}
}
}

View File

@ -225,6 +225,367 @@ func TestAgent_Services_ACLFilter(t *testing.T) {
})
}
func TestAgent_Service(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), TestACLConfig()+`
services {
name = "web"
port = 8181
}
`)
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
proxy := structs.TestConnectProxyConfig(t)
proxy.DestinationServiceID = "web1"
// Define a valid local sidecar proxy service
sidecarProxy := &structs.ServiceDefinition{
Kind: structs.ServiceKindConnectProxy,
Name: "web-sidecar-proxy",
Check: structs.CheckType{
TCP: "127.0.0.1:8000",
Interval: 10 * time.Second,
},
Port: 8000,
Proxy: &proxy,
}
// Define an updated version. Be careful to copy it.
updatedProxy := *sidecarProxy
updatedProxy.Port = 9999
// Mangle the proxy config/upstreams into the expected for with defaults and
// API struct types.
expectProxy := proxy
expectProxy.Upstreams =
structs.TestAddDefaultsToUpstreams(t, sidecarProxy.Proxy.Upstreams)
expectedResponse := &api.AgentService{
Kind: api.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
Service: "web-sidecar-proxy",
Port: 8000,
Proxy: expectProxy.ToAPI(),
ContentHash: "26959a754e182054",
}
// Copy and modify
updatedResponse := *expectedResponse
updatedResponse.Port = 9999
updatedResponse.ContentHash = "1bdcf042660b33f6"
// Simple response for non-proxy service regustered in TestAgent config
expectWebResponse := &api.AgentService{
ID: "web",
Service: "web",
Port: 8181,
ContentHash: "7be2b0411161d3b1",
}
tests := []struct {
name string
tokenRules string
url string
updateFunc func()
wantWait time.Duration
wantCode int
wantErr string
wantResp *api.AgentService
}{
{
name: "simple fetch - proxy",
url: "/v1/agent/service/web-sidecar-proxy",
wantCode: 200,
wantResp: expectedResponse,
},
{
name: "simple fetch - non-proxy",
url: "/v1/agent/service/web",
wantCode: 200,
wantResp: expectWebResponse,
},
{
name: "blocking fetch timeout, no change",
url: "/v1/agent/service/web-sidecar-proxy?hash=" + expectedResponse.ContentHash + "&wait=100ms",
wantWait: 100 * time.Millisecond,
wantCode: 200,
wantResp: expectedResponse,
},
{
name: "blocking fetch old hash should return immediately",
url: "/v1/agent/service/web-sidecar-proxy?hash=123456789abcd&wait=10m",
wantCode: 200,
wantResp: expectedResponse,
},
{
name: "blocking fetch returns change",
url: "/v1/agent/service/web-sidecar-proxy?hash=" + expectedResponse.ContentHash,
updateFunc: func() {
time.Sleep(100 * time.Millisecond)
// Re-register with new proxy config, make sure we copy the struct so we
// don't alter it and affect later test cases.
req, _ := http.NewRequest("PUT", "/v1/agent/service/register?token=root", jsonReader(updatedProxy))
resp := httptest.NewRecorder()
_, err := a.srv.AgentRegisterService(resp, req)
require.NoError(t, err)
require.Equal(t, 200, resp.Code, "body: %s", resp.Body.String())
},
wantWait: 100 * time.Millisecond,
wantCode: 200,
wantResp: &updatedResponse,
},
{
// This test exercises a case that caused a busy loop to eat CPU for the
// entire duration of the blocking query. If a service gets re-registered
// wth same proxy config then the old proxy config chan is closed causing
// blocked watchset.Watch to return false indicating a change. But since
// the hash is the same when the blocking fn is re-called we should just
// keep blocking on the next iteration. The bug hit was that the WatchSet
// ws was not being reset in the loop and so when you try to `Watch` it
// the second time it just returns immediately making the blocking loop
// into a busy-poll!
//
// This test though doesn't catch that because busy poll still has the
// correct external behaviour. I don't want to instrument the loop to
// assert it's not executing too fast here as I can't think of a clean way
// and the issue is fixed now so this test doesn't actually catch the
// error, but does provide an easy way to verify the behaviour by hand:
// 1. Make this test fail e.g. change wantErr to true
// 2. Add a log.Println or similar into the blocking loop/function
// 3. See whether it's called just once or many times in a tight loop.
name: "blocking fetch interrupted with no change (same hash)",
url: "/v1/agent/service/web-sidecar-proxy?wait=200ms&hash=" + expectedResponse.ContentHash,
updateFunc: func() {
time.Sleep(100 * time.Millisecond)
// Re-register with _same_ proxy config
req, _ := http.NewRequest("PUT", "/v1/agent/service/register?token=root", jsonReader(sidecarProxy))
resp := httptest.NewRecorder()
_, err := a.srv.AgentRegisterService(resp, req)
require.NoError(t, err)
require.Equal(t, 200, resp.Code, "body: %s", resp.Body.String())
},
wantWait: 200 * time.Millisecond,
wantCode: 200,
wantResp: expectedResponse,
},
{
// When we reload config, the agent pauses Anti-entropy, then clears all
// services (which causes their watch chans to be closed) before loading
// state from config/snapshot again). If we do that naively then we don't
// just get a spurios wakeup on the watch if the service didn't change,
// but we get it wakeup and then race with the reload and probably see no
// services and return a 404 error which is gross. This test excercises
// that - even though the registrations were from API not config, they are
// persisted and cleared/reloaded from snapshot which has same effect.
//
// The fix for this test is to allow the same mechanism that pauses
// Anti-entropy during reload to also pause the hash blocking loop so we
// don't resume until the state is reloaded and we get a chance to see if
// it actually changed or not.
name: "blocking fetch interrupted by reload shouldn't 404 - no change",
url: "/v1/agent/service/web-sidecar-proxy?wait=200ms&hash=" + expectedResponse.ContentHash,
updateFunc: func() {
time.Sleep(100 * time.Millisecond)
// Reload
require.NoError(t, a.ReloadConfig(a.Config))
},
// Should eventually timeout since there is no actual change
wantWait: 200 * time.Millisecond,
wantCode: 200,
wantResp: expectedResponse,
},
{
// As above but test actually altering the service with the config reload.
// This simulates the API registration being overridden by a different one
// on disk during reload.
name: "blocking fetch interrupted by reload shouldn't 404 - changes",
url: "/v1/agent/service/web-sidecar-proxy?wait=10m&hash=" + expectedResponse.ContentHash,
updateFunc: func() {
time.Sleep(100 * time.Millisecond)
// Reload
newConfig := *a.Config
newConfig.Services = append(newConfig.Services, &updatedProxy)
require.NoError(t, a.ReloadConfig(&newConfig))
},
wantWait: 100 * time.Millisecond,
wantCode: 200,
wantResp: &updatedResponse,
},
{
name: "err: non-existent proxy",
url: "/v1/agent/service/nope",
wantCode: 404,
},
{
name: "err: bad ACL for service",
url: "/v1/agent/service/web-sidecar-proxy",
// Limited token doesn't grant read to the service
tokenRules: `
key "" {
policy = "read"
}
`,
// Note that because we return ErrPermissionDenied and handle writing
// status at a higher level helper this actually gets a 200 in this test
// case so just assert that it was an error.
wantErr: "Permission denied",
},
{
name: "good ACL for service",
url: "/v1/agent/service/web-sidecar-proxy",
// Limited token doesn't grant read to the service
tokenRules: `
service "web-sidecar-proxy" {
policy = "read"
}
`,
wantCode: 200,
wantResp: expectedResponse,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
// Register the basic service to ensure it's in a known state to start.
{
req, _ := http.NewRequest("PUT", "/v1/agent/service/register?token=root", jsonReader(sidecarProxy))
resp := httptest.NewRecorder()
_, err := a.srv.AgentRegisterService(resp, req)
require.NoError(err)
require.Equal(200, resp.Code, "body: %s", resp.Body.String())
}
req, _ := http.NewRequest("GET", tt.url, nil)
// Inject the root token for tests that don't care about ACL
var token = "root"
if tt.tokenRules != "" {
// Create new token and use that.
token = testCreateToken(t, a, tt.tokenRules)
}
req.Header.Set("X-Consul-Token", token)
resp := httptest.NewRecorder()
if tt.updateFunc != nil {
go tt.updateFunc()
}
start := time.Now()
obj, err := a.srv.AgentService(resp, req)
elapsed := time.Now().Sub(start)
if tt.wantErr != "" {
require.Error(err)
require.Contains(strings.ToLower(err.Error()), strings.ToLower(tt.wantErr))
} else {
require.NoError(err)
}
if tt.wantCode != 0 {
require.Equal(tt.wantCode, resp.Code, "body: %s", resp.Body.String())
}
if tt.wantWait != 0 {
assert.True(elapsed >= tt.wantWait, "should have waited at least %s, "+
"took %s", tt.wantWait, elapsed)
} else {
assert.True(elapsed < 10*time.Millisecond, "should not have waited, "+
"took %s", elapsed)
}
if tt.wantResp != nil {
assert.Equal(tt.wantResp, obj)
assert.Equal(tt.wantResp.ContentHash, resp.Header().Get("X-Consul-ContentHash"))
} else {
// Janky but Equal doesn't help here because nil !=
// *api.AgentService((*api.AgentService)(nil))
assert.Nil(obj)
}
})
}
}
// DEPRECATED(managed-proxies) - remove this In the interim, we need the newer
// /agent/service/service to work for managed proxies so we can swithc the built
// in proxy to use only that without breaking managed proxies early.
func TestAgent_Service_DeprecatedManagedProxy(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), `
connect {
proxy {
allow_managed_api_registration = true
}
}
`)
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
svc := &structs.ServiceDefinition{
Name: "web",
Port: 8000,
Check: structs.CheckType{
TTL: 10 * time.Second,
},
Connect: &structs.ServiceConnect{
Proxy: &structs.ServiceDefinitionConnectProxy{
// Fix the command otherwise the executable path ends up being random
// temp dir in every test run so the ContentHash will never match.
Command: []string{"foo"},
Config: map[string]interface{}{
"foo": "bar",
"bind_address": "10.10.10.10",
"bind_port": 9999, // make this deterministic
},
Upstreams: structs.TestUpstreams(t),
},
},
}
require := require.New(t)
rr := httptest.NewRecorder()
req, _ := http.NewRequest("POST", "/v1/agent/services/register", jsonReader(svc))
_, err := a.srv.AgentRegisterService(rr, req)
require.NoError(err)
require.Equal(200, rr.Code, "body:\n"+rr.Body.String())
rr = httptest.NewRecorder()
req, _ = http.NewRequest("GET", "/v1/agent/service/web-proxy", nil)
obj, err := a.srv.AgentService(rr, req)
require.NoError(err)
require.Equal(200, rr.Code, "body:\n"+rr.Body.String())
gotService, ok := obj.(*api.AgentService)
require.True(ok)
expect := &api.AgentService{
Kind: api.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Port: 9999,
Address: "10.10.10.10",
ContentHash: "e24f099e42e88317",
Proxy: &api.AgentServiceConnectProxyConfig{
DestinationServiceID: "web",
DestinationServiceName: "web",
LocalServiceAddress: "127.0.0.1",
LocalServicePort: 8000,
Config: map[string]interface{}{
"foo": "bar",
"bind_port": 9999,
"bind_address": "10.10.10.10",
"local_service_address": "127.0.0.1:8000",
},
Upstreams: structs.TestAddDefaultsToUpstreams(t, svc.Connect.Proxy.Upstreams).ToAPI(),
},
}
require.Equal(expect, gotService)
}
func TestAgent_Checks(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
@ -1593,15 +1954,28 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) {
},
},
},
SidecarService: &structs.ServiceDefinition{
Name: "test-proxy",
// The sidecar service is nilled since it is only config sugar and
// shouldn't be represented in state. We assert that the translations
// there worked by inspecting the registered sidecar below.
SidecarService: nil,
},
}
got := a.State.Service("test")
require.Equal(t, svc, got)
sidecarSvc := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "test-sidecar-proxy",
Service: "test-proxy",
Meta: map[string]string{
"some": "meta",
"enable_tag_override": "sidecar_service.meta is 'opaque' so should not get translated",
}, Port: 8001,
},
Port: 8001,
EnableTagOverride: true,
Kind: structs.ServiceKindConnectProxy,
Proxy: &structs.ConnectProxyConfig{
LocallyRegisteredAsSidecar: true,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "test",
DestinationServiceID: "test",
LocalServiceAddress: "127.0.0.1",
@ -1619,12 +1993,10 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) {
},
},
},
},
},
}
got := a.State.Service("test")
require.Equal(t, svc, got)
gotSidecar := a.State.Service("test-sidecar-proxy")
require.Equal(t, sidecarSvc, gotSidecar)
}
func TestAgent_RegisterService_ACLDeny(t *testing.T) {
@ -1978,6 +2350,21 @@ func testDefaultSidecar(svc string, port int, fns ...func(*structs.NodeService))
return ns
}
func testCreateToken(t *testing.T, a *TestAgent, rules string) string {
args := map[string]interface{}{
"Name": "User Token",
"Type": "client",
"Rules": rules,
}
req, _ := http.NewRequest("PUT", "/v1/acl/create?token=root", jsonReader(args))
resp := httptest.NewRecorder()
obj, err := a.srv.ACLCreate(resp, req)
require.NoError(t, err)
require.NotNil(t, obj)
aclResp := obj.(aclCreateResponse)
return aclResp.ID
}
// This tests local agent service registration with a sidecar service. Note we
// only test simple defaults for the sidecar here since the actual logic for
// handling sidecar defaults and port assignment is tested thoroughly in
@ -2361,18 +2748,7 @@ func TestAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T) {
// Create an ACL token with require policy
var token string
if tt.enableACL && tt.tokenRules != "" {
args := map[string]interface{}{
"Name": "User Token",
"Type": "client",
"Rules": tt.tokenRules,
}
req, _ := http.NewRequest("PUT", "/v1/acl/create?token=root", jsonReader(args))
resp := httptest.NewRecorder()
obj, err := a.srv.ACLCreate(resp, req)
require.NoError(err)
require.NotNil(obj)
aclResp := obj.(aclCreateResponse)
token = aclResp.ID
token = testCreateToken(t, a, tt.tokenRules)
}
br := bytes.NewBufferString(tt.json)

View File

@ -85,16 +85,22 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) {
c.Notify.AddAliasCheck(c.CheckID, c.ServiceID, notifyCh)
defer c.Notify.RemoveAliasCheck(c.CheckID, c.ServiceID)
for {
select {
case <-notifyCh:
updateStatus := func() {
checks := c.Notify.Checks()
checksList := make([]*structs.HealthCheck, 0, len(checks))
for _, chk := range checks {
checksList = append(checksList, chk)
}
c.processChecks(checksList)
}
// Immediately run to get the current state of the target service
updateStatus()
for {
select {
case <-notifyCh:
updateStatus()
case <-stopCh:
return
}

View File

@ -435,3 +435,31 @@ func (m *mockRPC) RPC(method string, args interface{}, reply interface{}) error
return nil
}
// Test that local checks immediately reflect the subject states when added and
// don't require an update to the subject before being accurate.
func TestCheckAlias_localInitialStatus(t *testing.T) {
t.Parallel()
notify := newMockAliasNotify()
chkID := types.CheckID("foo")
rpc := &mockRPC{}
chk := &CheckAlias{
ServiceID: "web",
CheckID: chkID,
Notify: notify,
RPC: rpc,
}
chk.Start()
defer chk.Stop()
// Don't touch the aliased service or it's checks (there are none but this is
// valid and should be consisded "passing").
retry.Run(t, func(r *retry.R) {
if got, want := notify.State(chkID), api.HealthPassing; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
}

View File

@ -18,6 +18,7 @@ func init() {
registerEndpoint("/v1/agent/monitor", []string{"GET"}, (*HTTPServer).AgentMonitor)
registerEndpoint("/v1/agent/metrics", []string{"GET"}, (*HTTPServer).AgentMetrics)
registerEndpoint("/v1/agent/services", []string{"GET"}, (*HTTPServer).AgentServices)
registerEndpoint("/v1/agent/service/", []string{"GET"}, (*HTTPServer).AgentService)
registerEndpoint("/v1/agent/checks", []string{"GET"}, (*HTTPServer).AgentChecks)
registerEndpoint("/v1/agent/members", []string{"GET"}, (*HTTPServer).AgentMembers)
registerEndpoint("/v1/agent/join/", []string{"PUT"}, (*HTTPServer).AgentJoin)

View File

@ -51,10 +51,16 @@ type ServiceState struct {
// Deleted is true when the service record has been marked as deleted
// but has not been removed on the server yet.
Deleted bool
// WatchCh is closed when the service state changes suitable for use in a
// memdb.WatchSet when watching agent local changes with hash-based blocking.
WatchCh chan struct{}
}
// Clone returns a shallow copy of the object. The service record still
// points to the original service record and must not be modified.
// Clone returns a shallow copy of the object. The service record still points
// to the original service record and must not be modified. The WatchCh is also
// still pointing to the original so the clone will be update when the original
// is.
func (s *ServiceState) Clone() *ServiceState {
s2 := new(ServiceState)
*s2 = *s
@ -279,6 +285,10 @@ func (l *State) RemoveService(id string) error {
// entry around until it is actually removed.
s.InSync = false
s.Deleted = true
if s.WatchCh != nil {
close(s.WatchCh)
s.WatchCh = nil
}
l.TriggerSyncChanges()
return nil
@ -313,9 +323,10 @@ func (l *State) Services() map[string]*structs.NodeService {
return m
}
// ServiceState returns a shallow copy of the current service state
// record. The service record still points to the original service
// record and must not be modified.
// ServiceState returns a shallow copy of the current service state record. The
// service record still points to the original service record and must not be
// modified. The WatchCh for the copy returned will also be closed when the
// actual service state is changed.
func (l *State) ServiceState(id string) *ServiceState {
l.RLock()
defer l.RUnlock()
@ -334,7 +345,15 @@ func (l *State) SetServiceState(s *ServiceState) {
l.Lock()
defer l.Unlock()
s.WatchCh = make(chan struct{})
old, hasOld := l.services[s.Service.ID]
l.services[s.Service.ID] = s
if hasOld && old.WatchCh != nil {
close(old.WatchCh)
}
l.TriggerSyncChanges()
}
@ -408,13 +427,13 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
return nil
}
// AddAliasCheck creates an alias check. When any check for the srcServiceID
// is changed, checkID will reflect that using the same semantics as
// AddAliasCheck creates an alias check. When any check for the srcServiceID is
// changed, checkID will reflect that using the same semantics as
// checks.CheckAlias.
//
// This is a local optimization so that the Alias check doesn't need to
// use blocking queries against the remote server for check updates for
// local services.
// This is a local optimization so that the Alias check doesn't need to use
// blocking queries against the remote server for check updates for local
// services.
func (l *State) AddAliasCheck(checkID types.CheckID, srcServiceID string, notifyCh chan<- struct{}) error {
l.Lock()
defer l.Unlock()

View File

@ -417,6 +417,91 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
assert.Nil(servicesInSync(a.State, 3))
}
func TestAgent_ServiceWatchCh(t *testing.T) {
t.Parallel()
a := &agent.TestAgent{Name: t.Name()}
a.Start()
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
require := require.New(t)
// register a local service
srv1 := &structs.NodeService{
ID: "svc_id1",
Service: "svc1",
Tags: []string{"tag1"},
Port: 6100,
}
require.NoError(a.State.AddService(srv1, ""))
verifyState := func(ss *local.ServiceState) {
require.NotNil(ss)
require.NotNil(ss.WatchCh)
// Sanity check WatchCh blocks
select {
case <-ss.WatchCh:
t.Fatal("should block until service changes")
default:
}
}
// Should be able to get a ServiceState
ss := a.State.ServiceState(srv1.ID)
verifyState(ss)
// Update service in another go routine
go func() {
srv2 := srv1
srv2.Port = 6200
require.NoError(a.State.AddService(srv2, ""))
}()
// We should observe WatchCh close
select {
case <-ss.WatchCh:
// OK!
case <-time.After(500 * time.Millisecond):
t.Fatal("timeout waiting for WatchCh to close")
}
// Should also fire for state being set explicitly
ss = a.State.ServiceState(srv1.ID)
verifyState(ss)
go func() {
a.State.SetServiceState(&local.ServiceState{
Service: ss.Service,
Token: "foo",
})
}()
// We should observe WatchCh close
select {
case <-ss.WatchCh:
// OK!
case <-time.After(500 * time.Millisecond):
t.Fatal("timeout waiting for WatchCh to close")
}
// Should also fire for service being removed
ss = a.State.ServiceState(srv1.ID)
verifyState(ss)
go func() {
require.NoError(a.State.RemoveService(srv1.ID))
}()
// We should observe WatchCh close
select {
case <-ss.WatchCh:
// OK!
case <-time.After(500 * time.Millisecond):
t.Fatal("timeout waiting for WatchCh to close")
}
}
func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
t.Parallel()
a := &agent.TestAgent{Name: t.Name()}

View File

@ -20,6 +20,11 @@ const (
// EnvProxyToken is the name of the environment variable that is passed
// to managed proxies containing the proxy token.
EnvProxyToken = "CONNECT_PROXY_TOKEN"
// EnvSidecarFor is the name of the environment variable that is set for
// sidecar proxies containing the service ID of their target on the local
// agent
EnvSidecarFor = "CONNECT_SIDECAR_FOR"
)
// Proxy is the interface implemented by all types of managed proxies.

View File

@ -2,7 +2,6 @@ package agent
import (
"fmt"
"math/rand"
"time"
"github.com/hashicorp/consul/agent/structs"
@ -88,22 +87,47 @@ func (a *Agent) sidecarServiceFromNodeService(ns *structs.NodeService, token str
// Allocate port if needed (min and max inclusive).
rangeLen := a.config.ConnectSidecarMaxPort - a.config.ConnectSidecarMinPort + 1
if sidecar.Port < 1 && a.config.ConnectSidecarMinPort > 0 && rangeLen > 0 {
// This should be a really short list so don't bother optimising lookup yet.
OUTER:
for _, offset := range rand.Perm(rangeLen) {
p := a.config.ConnectSidecarMinPort + offset
// See if this port was already allocated to another service
// This did pick at random which was simpler but consul reload would assign
// new ports to all the sidecar since it unloads all state and re-populates.
// It also made this more difficult to test (have to pin the range to one
// etc.). Instead we assign sequentially, but rather than N^2 lookups, just
// iterated services once and find the set of used ports in allocation
// range. We could maintain this state permenantly in agent but it doesn't
// seem to be necessary - even with thousands of services this is not
// expensive to compute.
usedPorts := make(map[int]struct{})
for _, otherNS := range a.State.Services() {
if otherNS.Port == p {
// already taken, skip to next random pick in the range
continue OUTER
// Check if other port is in auto-assign range
if otherNS.Port >= a.config.ConnectSidecarMinPort &&
otherNS.Port <= a.config.ConnectSidecarMaxPort {
if otherNS.ID == sidecar.ID {
// This sidecar is already registered with an auto-port and is just
// being updated so pick the same port as before rather than allocate
// a new one.
sidecar.Port = otherNS.Port
break
}
usedPorts[otherNS.Port] = struct{}{}
}
// We made it through all existing proxies without a match so claim this one
// Note that the proxy might already be registered with a port that was
// not in the auto range or the auto range has moved. In either case we
// want to allocate a new one so it's no different from ignoring that it
// already exists as we do now.
}
// Check we still need to assign a port and didn't find we already had one
// allocated.
if sidecar.Port < 1 {
// Iterate until we find lowest unused port
for p := a.config.ConnectSidecarMinPort; p <= a.config.ConnectSidecarMaxPort; p++ {
_, used := usedPorts[p]
if !used {
sidecar.Port = p
break
}
}
}
}
// If no ports left (or auto ports disabled) fail
if sidecar.Port < 1 {
// If ports are set to zero explicitly, config builder switches them to

View File

@ -1,6 +1,7 @@
package agent
import (
"fmt"
"testing"
"time"
@ -11,6 +12,7 @@ import (
func TestAgent_sidecarServiceFromNodeService(t *testing.T) {
tests := []struct {
name string
maxPort int
preRegister *structs.ServiceDefinition
sd *structs.ServiceDefinition
token string
@ -200,17 +202,74 @@ func TestAgent_sidecarServiceFromNodeService(t *testing.T) {
token: "foo",
wantErr: "reserved for internal use",
},
{
name: "re-registering same sidecar with no port should pick same one",
// Allow multiple ports to be sure we get the right one
maxPort: 2500,
// Pre register the sidecar we want
preRegister: &structs.ServiceDefinition{
Kind: structs.ServiceKindConnectProxy,
ID: "web1-sidecar-proxy",
Name: "web-sidecar-proxy",
Port: 2222,
Proxy: &structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web1",
LocalServiceAddress: "127.0.0.1",
LocalServicePort: 1111,
},
},
// Register same again but with different service port
sd: &structs.ServiceDefinition{
ID: "web1",
Name: "web",
Port: 1112,
Connect: &structs.ServiceConnect{
SidecarService: &structs.ServiceDefinition{},
},
},
token: "foo",
wantNS: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web1-sidecar-proxy",
Service: "web-sidecar-proxy",
Port: 2222, // Should claim the same port as before
LocallyRegisteredAsSidecar: true,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web1",
LocalServiceAddress: "127.0.0.1",
LocalServicePort: 1112,
},
},
wantChecks: []*structs.CheckType{
&structs.CheckType{
Name: "Connect Sidecar Listening",
TCP: "127.0.0.1:2222",
Interval: 10 * time.Second,
},
&structs.CheckType{
Name: "Connect Sidecar Aliasing web1",
AliasService: "web1",
},
},
wantToken: "foo",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Set port range to make it deterministic. This allows a single assigned
// port at 2222 thanks to being inclusive at both ends.
hcl := `
// Set port range to be tiny (one availabl) to test consuming all of it.
// This allows a single assigned port at 2222 thanks to being inclusive at
// both ends.
if tt.maxPort == 0 {
tt.maxPort = 2222
}
hcl := fmt.Sprintf(`
ports {
sidecar_min_port = 2222
sidecar_max_port = 2222
sidecar_max_port = %d
}
`
`, tt.maxPort)
if tt.autoPortsDisabled {
hcl = `
ports {

View File

@ -39,7 +39,7 @@ func TestUpstreams(t testing.T) Upstreams {
// TestUpstreams) and adds default values that are populated during
// refigistration. Use this for generating the expected Upstreams value after
// registration.
func TestAddDefaultsToUpstreams(t testing.T, upstreams []Upstream) []Upstream {
func TestAddDefaultsToUpstreams(t testing.T, upstreams []Upstream) Upstreams {
ups := make([]Upstream, len(upstreams))
for i := range upstreams {
ups[i] = upstreams[i]

View File

@ -82,6 +82,7 @@ type AgentService struct {
EnableTagOverride bool
CreateIndex uint64 `json:",omitempty"`
ModifyIndex uint64 `json:",omitempty"`
ContentHash string `json:",omitempty"`
// DEPRECATED (ProxyDestination) - remove this field
ProxyDestination string `json:",omitempty"`
Proxy *AgentServiceConnectProxyConfig `json:",omitempty"`
@ -262,8 +263,10 @@ type ConnectProxyConfig struct {
TargetServiceID string
TargetServiceName string
ContentHash string
ExecMode ProxyExecMode
Command []string
// DEPRECATED(managed-proxies) - this struct is re-used for sidecar configs
// but they don't need ExecMode or Command
ExecMode ProxyExecMode `json:",omitempty"`
Command []string `json:",omitempty"`
Config map[string]interface{}
Upstreams []Upstream
}
@ -384,6 +387,33 @@ func (a *Agent) Services() (map[string]*AgentService, error) {
return out, nil
}
// Service returns a locally registered service instance and allows for
// hash-based blocking.
//
// Note that this uses an unconventional blocking mechanism since it's
// agent-local state. That means there is no persistent raft index so we block
// based on object hash instead.
func (a *Agent) Service(serviceID string, q *QueryOptions) (*AgentService, *QueryMeta, error) {
r := a.c.newRequest("GET", "/v1/agent/service/"+serviceID)
r.setQueryOptions(q)
rtt, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var out *AgentService
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, qm, nil
}
// Members returns the known gossip members. The WAN
// flag can be used to query a server for WAN members.
func (a *Agent) Members(wan bool) ([]*AgentMember, error) {

View File

@ -642,6 +642,57 @@ func TestAPI_AgentServices_MultipleChecks(t *testing.T) {
}
}
func TestAPI_AgentService(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
require := require.New(t)
reg := &AgentServiceRegistration{
Name: "foo",
Tags: []string{"bar", "baz"},
Port: 8000,
Checks: AgentServiceChecks{
&AgentServiceCheck{
TTL: "15s",
},
&AgentServiceCheck{
TTL: "30s",
},
},
}
require.NoError(agent.ServiceRegister(reg))
got, qm, err := agent.Service("foo", nil)
require.NoError(err)
expect := &AgentService{
ID: "foo",
Service: "foo",
Tags: []string{"bar", "baz"},
ContentHash: "bf5bd67c5d74b26d",
Port: 8000,
}
require.Equal(expect, got)
require.Equal(expect.ContentHash, qm.LastContentHash)
// Sanity check blocking behaviour - this is more thoroughly tested in the
// agent endpoint tests but this ensures that the API package is at least
// passing the hash param properly.
opts := QueryOptions{
WaitHash: "bf5bd67c5d74b26d",
WaitTime: 100 * time.Millisecond, // Just long enough to be reliably measurable
}
start := time.Now()
got, qm, err = agent.Service("foo", &opts)
elapsed := time.Since(start)
require.NoError(err)
require.True(elapsed >= opts.WaitTime)
}
func TestAPI_AgentSetTTLStatus(t *testing.T) {
t.Parallel()
c, s := makeClient(t)

View File

@ -11,6 +11,7 @@ import (
"os"
"sort"
"strconv"
"strings"
proxyAgent "github.com/hashicorp/consul/agent/proxyprocess"
"github.com/hashicorp/consul/api"
@ -51,6 +52,7 @@ type cmd struct {
logLevel string
cfgFile string
proxyID string
sidecarFor string
pprofAddr string
service string
serviceAddr string
@ -69,6 +71,12 @@ func (c *cmd) init() {
c.flags.StringVar(&c.proxyID, "proxy-id", "",
"The proxy's ID on the local agent.")
c.flags.StringVar(&c.sidecarFor, "sidecar-for", "",
"The ID of a service instance on the local agent that this proxy should "+
"become a sidecar for. It requires that the proxy service is registered "+
"with the agent as a connect-proxy with Proxy.DestinationServiceID set "+
"to this value. If more than one such proxy is registered it will fail.")
c.flags.StringVar(&c.logLevel, "log-level", "INFO",
"Specifies the log level.")
@ -118,6 +126,9 @@ func (c *cmd) Run(args []string) int {
if c.proxyID == "" {
c.proxyID = os.Getenv(proxyAgent.EnvProxyID)
}
if c.sidecarFor == "" {
c.sidecarFor = os.Getenv(proxyAgent.EnvSidecarFor)
}
if c.http.Token() == "" {
c.http.SetToken(os.Getenv(proxyAgent.EnvProxyToken))
}
@ -200,6 +211,32 @@ func (c *cmd) Run(args []string) int {
return 0
}
func (c *cmd) lookupProxyIDForSidecar(client *api.Client) (string, error) {
svcs, err := client.Agent().Services()
if err != nil {
return "", fmt.Errorf("Failed looking up sidecar proxy info for %s: %s",
c.sidecarFor, err)
}
var proxyIDs []string
for _, svc := range svcs {
if svc.Kind == api.ServiceKindConnectProxy && svc.Proxy != nil &&
strings.ToLower(svc.Proxy.DestinationServiceID) == c.sidecarFor {
proxyIDs = append(proxyIDs, svc.ID)
}
}
if len(proxyIDs) == 0 {
return "", fmt.Errorf("No sidecar proxy registereded for %s", c.sidecarFor)
}
if len(proxyIDs) > 1 {
return "", fmt.Errorf("More than one sidecar proxy registereded for %s.\n"+
" Start proxy with -proxy-id and one of the following IDs: %s",
c.sidecarFor, strings.Join(proxyIDs, ", "))
}
return proxyIDs[0], nil
}
func (c *cmd) configWatcher(client *api.Client) (proxyImpl.ConfigWatcher, error) {
// Use the configured proxy ID
if c.proxyID != "" {
@ -208,6 +245,21 @@ func (c *cmd) configWatcher(client *api.Client) (proxyImpl.ConfigWatcher, error)
return proxyImpl.NewAgentConfigWatcher(client, c.proxyID, c.logger)
}
if c.sidecarFor != "" {
// Running as a sidecar, we need to find the proxy-id for the requested
// service
var err error
c.proxyID, err = c.lookupProxyIDForSidecar(client)
if err != nil {
return nil, err
}
c.UI.Info("Configuration mode: Agent API")
c.UI.Info(fmt.Sprintf(" Sidecar for ID: %s", c.sidecarFor))
c.UI.Info(fmt.Sprintf(" Proxy ID: %s", c.proxyID))
return proxyImpl.NewAgentConfigWatcher(client, c.proxyID, c.logger)
}
// Otherwise, we're representing a manually specified service.
if c.service == "" {
return nil, fmt.Errorf(

View File

@ -17,24 +17,25 @@ func TestCommandConfigWatcher(t *testing.T) {
Name string
Flags []string
Test func(*testing.T, *proxy.Config)
WantErr string
}{
{
"-service flag only",
[]string{"-service", "web"},
func(t *testing.T, cfg *proxy.Config) {
Name: "-service flag only",
Flags: []string{"-service", "web"},
Test: func(t *testing.T, cfg *proxy.Config) {
require.Equal(t, 0, cfg.PublicListener.BindPort)
require.Len(t, cfg.Upstreams, 0)
},
},
{
"-service flag with upstreams",
[]string{
Name: "-service flag with upstreams",
Flags: []string{
"-service", "web",
"-upstream", "db:1234",
"-upstream", "db2:2345",
},
func(t *testing.T, cfg *proxy.Config) {
Test: func(t *testing.T, cfg *proxy.Config) {
require.Equal(t, 0, cfg.PublicListener.BindPort)
require.Len(t, cfg.Upstreams, 2)
require.Equal(t, 1234, cfg.Upstreams[0].LocalBindPort)
@ -43,9 +44,9 @@ func TestCommandConfigWatcher(t *testing.T) {
},
{
"-service flag with -service-addr",
[]string{"-service", "web"},
func(t *testing.T, cfg *proxy.Config) {
Name: "-service flag with -service-addr",
Flags: []string{"-service", "web"},
Test: func(t *testing.T, cfg *proxy.Config) {
// -service-addr has no affect since -listen isn't set
require.Equal(t, 0, cfg.PublicListener.BindPort)
require.Len(t, cfg.Upstreams, 0)
@ -53,13 +54,13 @@ func TestCommandConfigWatcher(t *testing.T) {
},
{
"-service, -service-addr, -listen",
[]string{
Name: "-service, -service-addr, -listen",
Flags: []string{
"-service", "web",
"-service-addr", "127.0.0.1:1234",
"-listen", ":4567",
},
func(t *testing.T, cfg *proxy.Config) {
Test: func(t *testing.T, cfg *proxy.Config) {
require.Len(t, cfg.Upstreams, 0)
require.Equal(t, "", cfg.PublicListener.BindAddress)
@ -67,13 +68,81 @@ func TestCommandConfigWatcher(t *testing.T) {
require.Equal(t, "127.0.0.1:1234", cfg.PublicListener.LocalServiceAddress)
},
},
{
Name: "-sidecar-for, no sidecar",
Flags: []string{
"-sidecar-for", "no-sidecar",
},
WantErr: "No sidecar proxy registered",
},
{
Name: "-sidecar-for, multiple sidecars",
Flags: []string{
"-sidecar-for", "two-sidecars",
},
// Order is non-deterministic so don't assert the list of proxy IDs here
WantErr: `More than one sidecar proxy registereded for two-sidecars.
Start proxy with -proxy-id and one of the following IDs: `,
},
{
Name: "-sidecar-for, non-existent",
Flags: []string{
"-sidecar-for", "foo",
},
WantErr: "No sidecar proxy registered",
},
{
Name: "-sidecar-for, one sidecar",
Flags: []string{
"-sidecar-for", "one-sidecar",
},
Test: func(t *testing.T, cfg *proxy.Config) {
// Sanity check we got the right instance.
require.Equal(t, 9999, cfg.PublicListener.BindPort)
},
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
require := require.New(t)
a := agent.NewTestAgent(t.Name(), ``)
// Registere a few services with 0, 1 and 2 sidecars
a := agent.NewTestAgent(t.Name(), `
services {
name = "no-sidecar"
port = 1111
}
services {
name = "one-sidecar"
port = 2222
connect {
sidecar_service {
port = 9999
}
}
}
services {
name = "two-sidecars"
port = 3333
connect {
sidecar_service {}
}
}
services {
kind = "connect-proxy"
name = "other-sidecar-for-two-sidecars"
port = 4444
proxy {
destination_service_id = "two-sidecars"
destination_service_name = "two-sidecars"
}
}
`)
defer a.Shutdown()
client := a.Client()
@ -81,16 +150,24 @@ func TestCommandConfigWatcher(t *testing.T) {
c := New(ui, make(chan struct{}))
c.testNoStart = true
// Run and purposely fail the command
// Run the command
code := c.Run(append([]string{
"-http-addr=" + a.HTTPAddr(),
}, tc.Flags...))
if tc.WantErr == "" {
require.Equal(0, code, ui.ErrorWriter.String())
} else {
require.Equal(1, code, ui.ErrorWriter.String())
require.Contains(ui.ErrorWriter.String(), tc.WantErr)
return
}
// Get the configuration watcher
cw, err := c.configWatcher(client)
require.NoError(err)
if tc.Test != nil {
tc.Test(t, testConfig(t, cw))
}
})
}
}

View File

@ -135,7 +135,7 @@ func (c *cmd) Run(args []string) int {
return 1
}
if strings.HasPrefix(wp.Type, "connect_") {
if strings.HasPrefix(wp.Type, "connect_") || strings.HasPrefix(wp.Type, "agent_") {
c.UI.Error(fmt.Sprintf("Type %s is not supported in the CLI tool", wp.Type))
return 1
}

View File

@ -57,3 +57,23 @@ func TestWatchCommandNoConnect(t *testing.T) {
t.Fatalf("bad: %#v", ui.ErrorWriter.String())
}
}
func TestWatchCommandNoAgentService(t *testing.T) {
t.Parallel()
a := agent.NewTestAgent(t.Name(), ``)
defer a.Shutdown()
ui := cli.NewMockUi()
c := New(ui, nil)
args := []string{"-http-addr=" + a.HTTPAddr(), "-type=agent_service"}
code := c.Run(args)
if code != 1 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
if !strings.Contains(ui.ErrorWriter.String(),
"Type agent_service is not supported in the CLI tool") {
t.Fatalf("bad: %#v", ui.ErrorWriter.String())
}
}

View File

@ -194,8 +194,8 @@ func NewAgentConfigWatcher(client *api.Client, proxyID string,
// Setup watch plan for config
plan, err := watch.Parse(map[string]interface{}{
"type": "connect_proxy_config",
"proxy_service_id": w.proxyID,
"type": "agent_service",
"service_id": w.proxyID,
})
if err != nil {
return nil, err
@ -209,20 +209,26 @@ func NewAgentConfigWatcher(client *api.Client, proxyID string,
func (w *AgentConfigWatcher) handler(blockVal watch.BlockingParamVal,
val interface{}) {
resp, ok := val.(*api.ConnectProxyConfig)
resp, ok := val.(*api.AgentService)
if !ok {
w.logger.Printf("[WARN] proxy config watch returned bad response: %v", val)
return
}
if resp.Kind != api.ServiceKindConnectProxy {
w.logger.Printf("[ERR] service with id %s is not a valid connect proxy",
w.proxyID)
return
}
// Create proxy config from the response
cfg := &Config{
// Token should be already setup in the client
ProxiedServiceName: resp.TargetServiceName,
ProxiedServiceName: resp.Proxy.DestinationServiceName,
ProxiedServiceNamespace: "default",
}
if tRaw, ok := resp.Config["telemetry"]; ok {
if tRaw, ok := resp.Proxy.Config["telemetry"]; ok {
err := mapstructure.Decode(tRaw, &cfg.Telemetry)
if err != nil {
w.logger.Printf("[WARN] proxy telemetry config failed to parse: %s", err)
@ -230,15 +236,18 @@ func (w *AgentConfigWatcher) handler(blockVal watch.BlockingParamVal,
}
// Unmarshal configs
err := mapstructure.Decode(resp.Config, &cfg.PublicListener)
err := mapstructure.Decode(resp.Proxy.Config, &cfg.PublicListener)
if err != nil {
w.logger.Printf("[ERR] proxy config watch public listener config "+
"couldn't be parsed: %s", err)
return
w.logger.Printf("[ERR] failed to parse public listener config: %s", err)
}
cfg.PublicListener.BindAddress = resp.Address
cfg.PublicListener.BindPort = resp.Port
cfg.PublicListener.LocalServiceAddress = fmt.Sprintf("%s:%d",
resp.Proxy.LocalServiceAddress, resp.Proxy.LocalServicePort)
cfg.PublicListener.applyDefaults()
for _, u := range resp.Upstreams {
for _, u := range resp.Proxy.Upstreams {
uc := UpstreamConfig(u)
uc.applyDefaults()
cfg.Upstreams = append(cfg.Upstreams, uc)

View File

@ -79,7 +79,7 @@ func TestUpstreamResolverFuncFromClient(t *testing.T) {
}
}
func TestAgentConfigWatcher(t *testing.T) {
func TestAgentConfigWatcherManagedProxy(t *testing.T) {
t.Parallel()
a := agent.NewTestAgent("agent_smith", `
@ -95,7 +95,6 @@ func TestAgentConfigWatcher(t *testing.T) {
client := a.Client()
agent := client.Agent()
// Register a service with a proxy
// Register a local agent service with a managed proxy
reg := &api.AgentServiceRegistration{
Name: "web",
@ -178,6 +177,96 @@ func TestAgentConfigWatcher(t *testing.T) {
assert.Equal(t, expectCfg, cfg)
}
func TestAgentConfigWatcherSidecarProxy(t *testing.T) {
t.Parallel()
a := agent.NewTestAgent("agent_smith", ``)
defer a.Shutdown()
client := a.Client()
agent := client.Agent()
// Register a local agent service with a managed proxy
reg := &api.AgentServiceRegistration{
Name: "web",
Port: 8080,
Connect: &api.AgentServiceConnect{
SidecarService: &api.AgentServiceRegistration{
Proxy: &api.AgentServiceConnectProxyConfig{
Config: map[string]interface{}{
"handshake_timeout_ms": 999,
},
Upstreams: []api.Upstream{
{
DestinationName: "db",
LocalBindPort: 9191,
},
},
},
},
},
}
err := agent.ServiceRegister(reg)
require.NoError(t, err)
w, err := NewAgentConfigWatcher(client, "web-sidecar-proxy",
log.New(os.Stderr, "", log.LstdFlags))
require.NoError(t, err)
cfg := testGetConfigValTimeout(t, w, 500*time.Millisecond)
expectCfg := &Config{
ProxiedServiceName: "web",
ProxiedServiceNamespace: "default",
PublicListener: PublicListenerConfig{
BindAddress: "0.0.0.0",
BindPort: 21000,
LocalServiceAddress: "127.0.0.1:8080",
HandshakeTimeoutMs: 999,
LocalConnectTimeoutMs: 1000, // from applyDefaults
},
Upstreams: []UpstreamConfig{
{
DestinationName: "db",
DestinationNamespace: "default",
DestinationType: "service",
LocalBindPort: 9191,
LocalBindAddress: "127.0.0.1",
},
},
}
require.Equal(t, expectCfg, cfg)
// Now keep watching and update the config.
go func() {
// Wait for watcher to be watching
time.Sleep(20 * time.Millisecond)
reg.Connect.SidecarService.Proxy.Upstreams = append(reg.Connect.SidecarService.Proxy.Upstreams,
api.Upstream{
DestinationName: "cache",
LocalBindPort: 9292,
LocalBindAddress: "127.10.10.10",
})
reg.Connect.SidecarService.Proxy.Config["local_connect_timeout_ms"] = 444
err := agent.ServiceRegister(reg)
require.NoError(t, err)
}()
cfg = testGetConfigValTimeout(t, w, 2*time.Second)
expectCfg.Upstreams = append(expectCfg.Upstreams, UpstreamConfig{
DestinationName: "cache",
DestinationNamespace: "default",
DestinationType: "service",
LocalBindPort: 9292,
LocalBindAddress: "127.10.10.10",
})
expectCfg.PublicListener.LocalConnectTimeoutMs = 444
assert.Equal(t, expectCfg, cfg)
}
func testGetConfigValTimeout(t *testing.T, w ConfigWatcher,
timeout time.Duration) *Config {
t.Helper()

View File

@ -1,7 +1,6 @@
package proxy
import (
"bytes"
"crypto/x509"
"log"
@ -69,12 +68,17 @@ func (p *Proxy) Serve() error {
go func() {
<-service.ReadyWait()
p.logger.Printf("[INFO] proxy loaded config and ready to serve")
p.logger.Printf("[INFO] Proxy loaded config and ready to serve")
tcfg := service.ServerTLSConfig()
cert, _ := tcfg.GetCertificate(nil)
leaf, _ := x509.ParseCertificate(cert.Certificate[0])
p.logger.Printf("[DEBUG] leaf: %s roots: %s", leaf.URIs[0],
bytes.Join(tcfg.RootCAs.Subjects(), []byte(",")))
p.logger.Printf("[INFO] TLS Identity: %s", leaf.URIs[0])
roots, err := connect.CommonNamesFromCertPool(tcfg.RootCAs)
if err != nil {
p.logger.Printf("[ERR] Failed to parse root subjects: %s", err)
} else {
p.logger.Printf("[INFO] TLS Roots : %v", roots)
}
// Only start a listener if we have a port set. This allows
// the configuration to disable our public listener.
@ -87,6 +91,7 @@ func (p *Proxy) Serve() error {
p.logger.Printf("[ERR] failed to start public listener: %s", err)
failCh <- err
}
}
}()
}

View File

@ -3,6 +3,8 @@ package connect
import (
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/asn1"
"errors"
"fmt"
"io/ioutil"
@ -107,6 +109,33 @@ func devTLSConfigFromFiles(caFile, certFile,
return cfg, nil
}
// PKIXNameFromRawSubject attempts to parse a DER encoded "Subject" as a PKIX
// Name. It's useful for inspecting root certificates in an x509.CertPool which
// only expose RawSubject via the Subjects method.
func PKIXNameFromRawSubject(raw []byte) (*pkix.Name, error) {
var subject pkix.RDNSequence
if _, err := asn1.Unmarshal(raw, &subject); err != nil {
return nil, err
}
var name pkix.Name
name.FillFromRDNSequence(&subject)
return &name, nil
}
// CommonNamesFromCertPool returns the common names of the certificates in the
// cert pool.
func CommonNamesFromCertPool(p *x509.CertPool) ([]string, error) {
var names []string
for _, rawSubj := range p.Subjects() {
n, err := PKIXNameFromRawSubject(rawSubj)
if err != nil {
return nil, err
}
names = append(names, n.CommonName)
}
return names, nil
}
// CertURIFromConn is a helper to extract the service identifier URI from a
// net.Conn. If the net.Conn is not a *tls.Conn then an error is always
// returned. If the *tls.Conn didn't present a valid connect certificate, or is

View File

@ -26,6 +26,7 @@ func init() {
"connect_roots": connectRootsWatch,
"connect_leaf": connectLeafWatch,
"connect_proxy_config": connectProxyConfigWatch,
"agent_service": agentServiceWatch,
}
}
@ -305,6 +306,33 @@ func connectProxyConfigWatch(params map[string]interface{}) (WatcherFunc, error)
return fn, nil
}
// agentServiceWatch is used to watch for changes to a single service instance
// on the local agent. Note that this state is agent-local so the watch
// mechanism uses `hash` rather than `index` for deciding whether to block.
func agentServiceWatch(params map[string]interface{}) (WatcherFunc, error) {
// We don't support consistency modes since it's agent local data
var serviceID string
if err := assignValue(params, "service_id", &serviceID); err != nil {
return nil, err
}
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
agent := p.client.Agent()
opts := makeQueryOptionsWithContext(p, false)
defer p.cancelFunc()
svc, _, err := agent.Service(serviceID, &opts)
if err != nil {
return nil, nil, err
}
// Return string ContentHash since we don't have Raft indexes to block on.
return WaitHashVal(svc.ContentHash), svc, err
}
return fn, nil
}
func makeQueryOptionsWithContext(p *Plan, stale bool) consulapi.QueryOptions {
ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel

View File

@ -736,6 +736,63 @@ func TestConnectProxyConfigWatch(t *testing.T) {
wg.Wait()
}
func TestAgentServiceWatch(t *testing.T) {
t.Parallel()
a := agent.NewTestAgent(t.Name(), ``)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
// Register a local agent service
reg := &consulapi.AgentServiceRegistration{
Name: "web",
Port: 8080,
}
client := a.Client()
agent := client.Agent()
err := agent.ServiceRegister(reg)
require.NoError(t, err)
invoke := makeInvokeCh()
plan := mustParse(t, `{"type":"agent_service", "service_id":"web"}`)
plan.HybridHandler = func(blockParamVal watch.BlockingParamVal, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.(*consulapi.AgentService)
if !ok || v == nil {
return // ignore
}
invoke <- nil
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(20 * time.Millisecond)
// Change the service definition
reg.Port = 9090
err := agent.ServiceRegister(reg)
require.NoError(t, err)
}()
wg.Add(1)
go func() {
defer wg.Done()
if err := plan.Run(a.HTTPAddr()); err != nil {
t.Fatalf("err: %v", err)
}
}()
if err := <-invoke; err != nil {
t.Fatalf("err: %v", err)
}
plan.Stop()
wg.Wait()
}
func mustParse(t *testing.T, q string) *watch.Plan {
t.Helper()
var params map[string]interface{}