Agent Connect Proxy config endpoint with hash-based blocking

This commit is contained in:
Paul Banks 2018-04-18 21:05:30 +01:00 committed by Mitchell Hashimoto
parent c2266b134a
commit 44afb5c699
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
5 changed files with 386 additions and 45 deletions

View File

@ -7,6 +7,7 @@ import (
"net/url"
"strconv"
"strings"
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/checks"
@ -26,6 +27,7 @@ import (
// NOTE(mitcehllh): This is temporary while certs are stubbed out.
"github.com/mitchellh/go-testing-interface"
"github.com/mitchellh/hashstructure"
)
type Self struct {
@ -896,6 +898,123 @@ func (s *HTTPServer) AgentConnectCALeafCert(resp http.ResponseWriter, req *http.
return &reply, nil
}
// GET /v1/agent/connect/proxy/:proxy_service_id
//
// Returns the local proxy config for the identified proxy. Requires token=
// param with the correct local ProxyToken (not ACL token).
func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Get the proxy ID. Note that this is the ID of a proxy's service instance.
id := strings.TrimPrefix(req.URL.Path, "/v1/agent/connect/proxy/")
// Maybe block
var queryOpts structs.QueryOptions
if parseWait(resp, req, &queryOpts) {
// parseWait returns an error itself
return nil, nil
}
// Parse hash specially since it's only this endpoint that uses it currently.
// Eventually this should happen in parseWait and end up in QueryOptions but I
// didn't want to make very general changes right away.
hash := req.URL.Query().Get("hash")
return s.agentLocalBlockingQuery(hash, &queryOpts,
func(updateCh chan struct{}) (string, interface{}, error) {
// Retrieve the proxy specified
proxy := s.agent.State.Proxy(id)
if proxy == nil {
resp.WriteHeader(http.StatusNotFound)
fmt.Fprintf(resp, "unknown proxy service ID: %s", id)
return "", nil, nil
}
// Lookup the target service as a convenience
target := s.agent.State.Service(proxy.Proxy.TargetServiceID)
if target == nil {
// Not found since this endpoint is only useful for agent-managed proxies so
// service missing means the service was deregistered racily with this call.
resp.WriteHeader(http.StatusNotFound)
fmt.Fprintf(resp, "unknown target service ID: %s", proxy.Proxy.TargetServiceID)
return "", nil, nil
}
// Setup "watch" on the proxy being modified and respond on chan if it is.
go func() {
select {
case <-updateCh:
// blocking query timedout or was cancelled. Abort
return
case <-proxy.WatchCh:
// Proxy was updated or removed, report it
updateCh <- struct{}{}
}
}()
hash, err := hashstructure.Hash(proxy.Proxy, nil)
if err != nil {
return "", nil, err
}
contentHash := fmt.Sprintf("%x", hash)
reply := &structs.ConnectManageProxyResponse{
ProxyServiceID: proxy.Proxy.ProxyService.ID,
TargetServiceID: target.ID,
TargetServiceName: target.Service,
ContentHash: contentHash,
ExecMode: proxy.Proxy.ExecMode.String(),
Command: proxy.Proxy.Command,
Config: proxy.Proxy.Config,
}
return contentHash, reply, nil
})
return nil, nil
}
type agentLocalBlockingFunc func(updateCh chan struct{}) (string, interface{}, error)
func (s *HTTPServer) agentLocalBlockingQuery(hash string,
queryOpts *structs.QueryOptions, fn agentLocalBlockingFunc) (interface{}, error) {
var timer *time.Timer
if hash != "" {
// TODO(banks) at least define these defaults somewhere in a const. Would be
// nice not to duplicate the ones in consul/rpc.go too...
wait := queryOpts.MaxQueryTime
if wait == 0 {
wait = 5 * time.Minute
}
if wait > 10*time.Minute {
wait = 10 * time.Minute
}
// Apply a small amount of jitter to the request.
wait += lib.RandomStagger(wait / 16)
timer = time.NewTimer(wait)
}
ch := make(chan struct{})
for {
curHash, curResp, err := fn(ch)
if err != nil {
return curResp, err
}
// Hash was passed and matches current one, wait for update or timeout.
if timer != nil && hash == curHash {
select {
case <-ch:
// Update happened, loop to fetch a new value
continue
case <-timer.C:
// Timeout, stop the watcher goroutine and return what we have
close(ch)
break
}
}
return curResp, err
}
}
// AgentConnectAuthorize
//
// POST /v1/agent/connect/authorize

View File

@ -24,6 +24,7 @@ import (
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/copystructure"
"github.com/pascaldekloe/goe/verify"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -1428,9 +1429,9 @@ func TestAgent_RegisterService_ManagedConnectProxy(t *testing.T) {
// Ensure proxy itself was registered
proxy := a.State.Proxy("web-proxy")
require.NotNil(proxy)
assert.Equal(structs.ProxyExecModeScript, proxy.ExecMode)
assert.Equal("proxy.sh", proxy.Command)
assert.Equal(args.Connect.Proxy.Config, proxy.Config)
assert.Equal(structs.ProxyExecModeScript, proxy.Proxy.ExecMode)
assert.Equal("proxy.sh", proxy.Proxy.Command)
assert.Equal(args.Connect.Proxy.Config, proxy.Proxy.Config)
// Ensure the token was configured
assert.Equal("abc123", a.State.ServiceToken("web"))
@ -2200,6 +2201,167 @@ func TestAgentConnectCALeafCert_good(t *testing.T) {
// TODO(mitchellh): verify the private key matches the cert
}
func TestAgentConnectProxy(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
// Define a local service with a managed proxy. It's registered in the test
// loop to make sure agent state is predictable whatever order tests execute
// since some alter this service config.
reg := &structs.ServiceDefinition{
Name: "test",
Address: "127.0.0.1",
Port: 8000,
Check: structs.CheckType{
TTL: 15 * time.Second,
},
Connect: &structs.ServiceDefinitionConnect{
Proxy: &structs.ServiceDefinitionConnectProxy{
Config: map[string]interface{}{
"bind_port": 1234,
"connect_timeout_ms": 500,
"upstreams": []map[string]interface{}{
{
"destination_name": "db",
"local_port": 3131,
},
},
},
},
},
}
expectedResponse := &structs.ConnectManageProxyResponse{
ProxyServiceID: "test-proxy",
TargetServiceID: "test",
TargetServiceName: "test",
ContentHash: "a15dccb216d38a6e",
ExecMode: "daemon",
Command: "",
Config: map[string]interface{}{
"upstreams": []interface{}{
map[string]interface{}{
"destination_name": "db",
"local_port": float64(3131),
},
},
"bind_port": float64(1234),
"connect_timeout_ms": float64(500),
},
}
ur, err := copystructure.Copy(expectedResponse)
require.NoError(t, err)
updatedResponse := ur.(*structs.ConnectManageProxyResponse)
updatedResponse.ContentHash = "22bc9233a52c08fd"
upstreams := updatedResponse.Config["upstreams"].([]interface{})
upstreams = append(upstreams,
map[string]interface{}{
"destination_name": "cache",
"local_port": float64(4242),
})
updatedResponse.Config["upstreams"] = upstreams
tests := []struct {
name string
url string
updateFunc func()
wantWait time.Duration
wantCode int
wantErr bool
wantResp *structs.ConnectManageProxyResponse
}{
{
name: "simple fetch",
url: "/v1/agent/connect/proxy/test-proxy",
wantCode: 200,
wantErr: false,
wantResp: expectedResponse,
},
{
name: "blocking fetch timeout, no change",
url: "/v1/agent/connect/proxy/test-proxy?hash=a15dccb216d38a6e&wait=100ms",
wantWait: 100 * time.Millisecond,
wantCode: 200,
wantErr: false,
wantResp: expectedResponse,
},
{
name: "blocking fetch old hash should return immediately",
url: "/v1/agent/connect/proxy/test-proxy?hash=123456789abcd&wait=10m",
wantCode: 200,
wantErr: false,
wantResp: expectedResponse,
},
{
name: "blocking fetch returns change",
url: "/v1/agent/connect/proxy/test-proxy?hash=a15dccb216d38a6e",
updateFunc: func() {
time.Sleep(100 * time.Millisecond)
// Re-register with new proxy config
r2, err := copystructure.Copy(reg)
require.NoError(t, err)
reg2 := r2.(*structs.ServiceDefinition)
reg2.Connect.Proxy.Config = updatedResponse.Config
req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(r2))
resp := httptest.NewRecorder()
_, err = a.srv.AgentRegisterService(resp, req)
require.NoError(t, err)
require.Equal(t, 200, resp.Code, "body: %s", resp.Body.String())
},
wantWait: 100 * time.Millisecond,
wantCode: 200,
wantErr: false,
wantResp: updatedResponse,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
// Register the basic service to ensure it's in a known state to start.
{
req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(reg))
resp := httptest.NewRecorder()
_, err := a.srv.AgentRegisterService(resp, req)
require.NoError(err)
require.Equal(200, resp.Code, "body: %s", resp.Body.String())
}
req, _ := http.NewRequest("GET", tt.url, nil)
resp := httptest.NewRecorder()
if tt.updateFunc != nil {
go tt.updateFunc()
}
start := time.Now()
obj, err := a.srv.AgentConnectProxyConfig(resp, req)
elapsed := time.Now().Sub(start)
if tt.wantErr {
require.Error(err)
} else {
require.NoError(err)
}
if tt.wantCode != 0 {
require.Equal(tt.wantCode, resp.Code, "body: %s", resp.Body.String())
}
if tt.wantWait != 0 {
assert.True(elapsed >= tt.wantWait, "should have waited at least %s, "+
"took %s", tt.wantWait, elapsed)
} else {
assert.True(elapsed < 10*time.Millisecond, "should not have waited, "+
"took %s", elapsed)
}
assert.Equal(tt.wantResp, obj)
})
}
}
func TestAgentConnectAuthorize_badBody(t *testing.T) {
t.Parallel()

View File

@ -125,6 +125,10 @@ type ManagedProxy struct {
// be exposed any other way. Unmanaged proxies will never see this and need to
// use service-scoped ACL tokens distributed externally.
ProxyToken string
// WatchCh is a close-only chan that is closed when the proxy is removed or
// updated.
WatchCh chan struct{}
}
// State is used to represent the node's services,
@ -171,7 +175,7 @@ type State struct {
// tokens contains the ACL tokens
tokens *token.Store
// managedProxies is a map of all manged connect proxies registered locally on
// managedProxies is a map of all managed connect proxies registered locally on
// this agent. This is NOT kept in sync with servers since it's agent-local
// config only. Proxy instances have separate service registrations in the
// services map above which are kept in sync via anti-entropy. Un-managed
@ -633,9 +637,17 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str
proxy.ProxyService = svc
// All set, add the proxy and return the service
if old, ok := l.managedProxies[svc.ID]; ok {
// Notify watchers of the existing proxy config that it's changing. Note
// this is safe here even before the map is updated since we still hold the
// state lock and the watcher can't re-read the new config until we return
// anyway.
close(old.WatchCh)
}
l.managedProxies[svc.ID] = &ManagedProxy{
Proxy: proxy,
ProxyToken: pToken,
WatchCh: make(chan struct{}),
}
// No need to trigger sync as proxy state is local only.
@ -653,49 +665,32 @@ func (l *State) RemoveProxy(id string) error {
}
delete(l.managedProxies, id)
// Notify watchers of the existing proxy config that it's changed.
close(p.WatchCh)
// No need to trigger sync as proxy state is local only.
return nil
}
// Proxy returns the local proxy state.
func (l *State) Proxy(id string) *structs.ConnectManagedProxy {
func (l *State) Proxy(id string) *ManagedProxy {
l.RLock()
defer l.RUnlock()
p := l.managedProxies[id]
if p == nil {
return nil
}
return p.Proxy
return l.managedProxies[id]
}
// Proxies returns the locally registered proxies.
func (l *State) Proxies() map[string]*structs.ConnectManagedProxy {
func (l *State) Proxies() map[string]*ManagedProxy {
l.RLock()
defer l.RUnlock()
m := make(map[string]*structs.ConnectManagedProxy)
m := make(map[string]*ManagedProxy)
for id, p := range l.managedProxies {
m[id] = p.Proxy
m[id] = p
}
return m
}
// ProxyToken returns the local proxy token for a given proxy. Note this is not
// an ACL token so it won't fallback to using the agent-configured default ACL
// token. If the proxy doesn't exist an error is returned, otherwise the token
// is guaranteed to exist.
func (l *State) ProxyToken(id string) (string, error) {
l.RLock()
defer l.RUnlock()
p := l.managedProxies[id]
if p == nil {
return "", fmt.Errorf("proxy %s not registered", id)
}
return p.ProxyToken, nil
}
// Metadata returns the local node metadata fields that the
// agent is aware of and are being kept in sync with the server
func (l *State) Metadata() map[string]string {

View File

@ -6,6 +6,7 @@ import (
"log"
"os"
"reflect"
"sync"
"testing"
"time"
@ -1673,8 +1674,8 @@ func TestStateProxyManagement(t *testing.T) {
t.Parallel()
state := local.NewState(local.Config{
ProxyPortRangeStart: 20000,
ProxyPortRangeEnd: 20002,
ProxyBindMinPort: 20000,
ProxyBindMaxPort: 20001,
}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
// Stub state syncing
@ -1707,6 +1708,20 @@ func TestStateProxyManagement(t *testing.T) {
}, "fake-token-db")
require.NoError(err)
// Record initial local modify index
lastModifyIndex := state.LocalModifyIndex()
assertModIndexUpdate := func(id string) {
t.Helper()
nowIndex := state.LocalModifyIndex()
assert.True(lastModifyIndex < nowIndex)
if id != "" {
p := state.Proxy(id)
require.NotNil(p)
assert.True(lastModifyIndex < p.ModifyIndex)
}
lastModifyIndex = nowIndex
}
// Should work now
svc, err := state.AddProxy(&p1, "fake-token")
require.NoError(err)
@ -1718,6 +1733,7 @@ func TestStateProxyManagement(t *testing.T) {
assert.Equal("", svc.Address, "should have empty address by default")
// Port is non-deterministic but could be either of 20000 or 20001
assert.Contains([]int{20000, 20001}, svc.Port)
assertModIndexUpdate(svc.ID)
// Second proxy should claim other port
p2 := p1
@ -1726,10 +1742,10 @@ func TestStateProxyManagement(t *testing.T) {
require.NoError(err)
assert.Contains([]int{20000, 20001}, svc2.Port)
assert.NotEqual(svc.Port, svc2.Port)
assertModIndexUpdate(svc2.ID)
// Just saving this for later...
p2Token, err := state.ProxyToken(svc2.ID)
require.NoError(err)
// Store this for later
p2token := state.Proxy(svc2.ID).ProxyToken
// Third proxy should fail as all ports are used
p3 := p1
@ -1746,6 +1762,32 @@ func TestStateProxyManagement(t *testing.T) {
require.NoError(err)
require.Equal("0.0.0.0", svc3.Address)
require.Equal(1234, svc3.Port)
assertModIndexUpdate(svc3.ID)
// Update config of an already registered proxy should work
p3updated := p3
p3updated.Config["foo"] = "bar"
// Setup multiple watchers who should all witness the change
gotP3 := state.Proxy(svc3.ID)
require.NotNil(gotP3)
var watchWg sync.WaitGroup
for i := 0; i < 3; i++ {
watchWg.Add(1)
go func() {
<-gotP3.WatchCh
watchWg.Done()
}()
}
svc3, err = state.AddProxy(&p3updated, "fake-token")
require.NoError(err)
require.Equal("0.0.0.0", svc3.Address)
require.Equal(1234, svc3.Port)
gotProxy3 := state.Proxy(svc3.ID)
require.NotNil(gotProxy3)
require.Equal(p3updated.Config, gotProxy3.Proxy.Config)
assertModIndexUpdate(svc3.ID) // update must change mod index
// All watchers should have fired so this should not hang the test!
watchWg.Wait()
// Remove one of the auto-assigned proxies
err = state.RemoveProxy(svc2.ID)
@ -1758,31 +1800,29 @@ func TestStateProxyManagement(t *testing.T) {
require.NoError(err)
assert.Contains([]int{20000, 20001}, svc2.Port)
assert.Equal(svc4.Port, svc2.Port, "should get the same port back that we freed")
assertModIndexUpdate(svc4.ID)
// Remove a proxy that doesn't exist should error
err = state.RemoveProxy("nope")
require.Error(err)
assert.Equal(&p4, state.Proxy(p4.ProxyService.ID),
assert.Equal(&p4, state.Proxy(p4.ProxyService.ID).Proxy,
"should fetch the right proxy details")
assert.Nil(state.Proxy("nope"))
proxies := state.Proxies()
assert.Len(proxies, 3)
assert.Equal(&p1, proxies[svc.ID])
assert.Equal(&p4, proxies[svc4.ID])
assert.Equal(&p3, proxies[svc3.ID])
assert.Equal(&p1, proxies[svc.ID].Proxy)
assert.Equal(&p4, proxies[svc4.ID].Proxy)
assert.Equal(&p3, proxies[svc3.ID].Proxy)
tokens := make([]string, 4)
tokens[0], err = state.ProxyToken(svc.ID)
require.NoError(err)
tokens[0] = state.Proxy(svc.ID).ProxyToken
// p2 not registered anymore but lets make sure p4 got a new token when it
// re-registered with same ID.
tokens[1] = p2Token
tokens[2], err = state.ProxyToken(svc3.ID)
require.NoError(err)
tokens[3], err = state.ProxyToken(svc4.ID)
require.NoError(err)
tokens[1] = p2token
tokens[2] = state.Proxy(svc2.ID).ProxyToken
tokens[3] = state.Proxy(svc3.ID).ProxyToken
// Quick check all are distinct
for i := 0; i < len(tokens)-1; i++ {

View File

@ -32,6 +32,18 @@ const (
ProxyExecModeScript
)
// String implements Stringer
func (m ProxyExecMode) String() string {
switch m {
case ProxyExecModeDaemon:
return "daemon"
case ProxyExecModeScript:
return "script"
default:
return "unknown"
}
}
// ConnectManagedProxy represents the agent-local state for a configured proxy
// instance. This is never stored or sent to the servers and is only used to
// store the config for the proxy that the agent needs to track. For now it's
@ -91,3 +103,16 @@ func (p *ConnectManagedProxy) ParseConfig() (*ConnectManagedProxyConfig, error)
}
return &cfg, nil
}
// ConnectManageProxyResponse is the public response object we return for
// queries on local proxy config state. It's similar to ConnectManagedProxy but
// with some fields re-arranged.
type ConnectManageProxyResponse struct {
ProxyServiceID string
TargetServiceID string
TargetServiceName string
ContentHash string
ExecMode string
Command string
Config map[string]interface{}
}