533 lines
16 KiB
Go
533 lines
16 KiB
Go
package proxycfg
|
|
|
|
import (
|
|
"path"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/mitchellh/copystructure"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/hashicorp/consul/agent/cache"
|
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
|
"github.com/hashicorp/consul/agent/connect"
|
|
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
|
"github.com/hashicorp/consul/agent/local"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/agent/token"
|
|
"github.com/hashicorp/consul/sdk/testutil"
|
|
)
|
|
|
|
func mustCopyProxyConfig(t *testing.T, ns *structs.NodeService) structs.ConnectProxyConfig {
|
|
cfg, err := copyProxyConfig(ns)
|
|
require.NoError(t, err)
|
|
return cfg
|
|
}
|
|
|
|
// assertLastReqArgs verifies that each request type had the correct source
|
|
// parameters (e.g. Datacenter name) and token.
|
|
func assertLastReqArgs(t *testing.T, types *TestCacheTypes, token string, source *structs.QuerySource) {
|
|
t.Helper()
|
|
// Roots needs correct DC and token
|
|
rootReq := types.roots.lastReq.Load()
|
|
require.IsType(t, rootReq, &structs.DCSpecificRequest{})
|
|
require.Equal(t, token, rootReq.(*structs.DCSpecificRequest).Token)
|
|
require.Equal(t, source.Datacenter, rootReq.(*structs.DCSpecificRequest).Datacenter)
|
|
|
|
// Leaf needs correct DC and token
|
|
leafReq := types.leaf.lastReq.Load()
|
|
require.IsType(t, leafReq, &cachetype.ConnectCALeafRequest{})
|
|
require.Equal(t, token, leafReq.(*cachetype.ConnectCALeafRequest).Token)
|
|
require.Equal(t, source.Datacenter, leafReq.(*cachetype.ConnectCALeafRequest).Datacenter)
|
|
|
|
// Intentions needs correct DC and token
|
|
intReq := types.intentions.lastReq.Load()
|
|
require.IsType(t, intReq, &structs.IntentionQueryRequest{})
|
|
require.Equal(t, token, intReq.(*structs.IntentionQueryRequest).Token)
|
|
require.Equal(t, source.Datacenter, intReq.(*structs.IntentionQueryRequest).Datacenter)
|
|
}
|
|
|
|
func TestManager_BasicLifecycle(t *testing.T) {
|
|
// Create a bunch of common data for the various test cases.
|
|
roots, leaf := TestCerts(t)
|
|
|
|
dbDefaultChain := func() *structs.CompiledDiscoveryChain {
|
|
return discoverychain.TestCompileConfigEntries(t, "db", "default", "dc1", connect.TestClusterID+".consul", "dc1",
|
|
func(req *discoverychain.CompileRequest) {
|
|
// This is because structs.TestUpstreams uses an opaque config
|
|
// to override connect timeouts.
|
|
req.OverrideConnectTimeout = 1 * time.Second
|
|
},
|
|
&structs.ServiceResolverConfigEntry{
|
|
Kind: structs.ServiceResolver,
|
|
Name: "db",
|
|
},
|
|
)
|
|
}
|
|
dbSplitChain := func() *structs.CompiledDiscoveryChain {
|
|
return discoverychain.TestCompileConfigEntries(t, "db", "default", "dc1", "trustdomain.consul", "dc1",
|
|
func(req *discoverychain.CompileRequest) {
|
|
// This is because structs.TestUpstreams uses an opaque config
|
|
// to override connect timeouts.
|
|
req.OverrideConnectTimeout = 1 * time.Second
|
|
},
|
|
&structs.ProxyConfigEntry{
|
|
Kind: structs.ProxyDefaults,
|
|
Name: structs.ProxyConfigGlobal,
|
|
Config: map[string]interface{}{
|
|
"protocol": "http",
|
|
},
|
|
},
|
|
&structs.ServiceResolverConfigEntry{
|
|
Kind: structs.ServiceResolver,
|
|
Name: "db",
|
|
Subsets: map[string]structs.ServiceResolverSubset{
|
|
"v1": {
|
|
Filter: "Service.Meta.version == v1",
|
|
},
|
|
"v2": {
|
|
Filter: "Service.Meta.version == v2",
|
|
},
|
|
},
|
|
},
|
|
&structs.ServiceSplitterConfigEntry{
|
|
Kind: structs.ServiceSplitter,
|
|
Name: "db",
|
|
Splits: []structs.ServiceSplit{
|
|
{Weight: 60, ServiceSubset: "v1"},
|
|
{Weight: 40, ServiceSubset: "v2"},
|
|
},
|
|
},
|
|
)
|
|
}
|
|
webProxy := &structs.NodeService{
|
|
Kind: structs.ServiceKindConnectProxy,
|
|
ID: "web-sidecar-proxy",
|
|
Service: "web-sidecar-proxy",
|
|
Port: 9999,
|
|
Meta: map[string]string{},
|
|
Proxy: structs.ConnectProxyConfig{
|
|
DestinationServiceID: "web",
|
|
DestinationServiceName: "web",
|
|
LocalServiceAddress: "127.0.0.1",
|
|
LocalServicePort: 8080,
|
|
Config: map[string]interface{}{
|
|
"foo": "bar",
|
|
},
|
|
Upstreams: structs.TestUpstreams(t),
|
|
},
|
|
}
|
|
|
|
rootsCacheKey := testGenCacheKey(&structs.DCSpecificRequest{
|
|
Datacenter: "dc1",
|
|
QueryOptions: structs.QueryOptions{Token: "my-token"},
|
|
})
|
|
leafCacheKey := testGenCacheKey(&cachetype.ConnectCALeafRequest{
|
|
Datacenter: "dc1",
|
|
Token: "my-token",
|
|
Service: "web",
|
|
})
|
|
intentionCacheKey := testGenCacheKey(&structs.IntentionQueryRequest{
|
|
Datacenter: "dc1",
|
|
QueryOptions: structs.QueryOptions{Token: "my-token"},
|
|
Match: &structs.IntentionQueryMatch{
|
|
Type: structs.IntentionMatchDestination,
|
|
Entries: []structs.IntentionMatchEntry{
|
|
{
|
|
Namespace: structs.IntentionDefaultNamespace,
|
|
Name: "web",
|
|
},
|
|
},
|
|
},
|
|
})
|
|
|
|
dbChainCacheKey := testGenCacheKey(&structs.DiscoveryChainRequest{
|
|
Name: "db",
|
|
EvaluateInDatacenter: "dc1",
|
|
EvaluateInNamespace: "default",
|
|
// This is because structs.TestUpstreams uses an opaque config
|
|
// to override connect timeouts.
|
|
OverrideConnectTimeout: 1 * time.Second,
|
|
Datacenter: "dc1",
|
|
QueryOptions: structs.QueryOptions{Token: "my-token"},
|
|
})
|
|
|
|
dbHealthCacheKey := testGenCacheKey(&structs.ServiceSpecificRequest{
|
|
Datacenter: "dc1",
|
|
QueryOptions: structs.QueryOptions{Token: "my-token", Filter: ""},
|
|
ServiceName: "db",
|
|
Connect: true,
|
|
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
|
})
|
|
db_v1_HealthCacheKey := testGenCacheKey(&structs.ServiceSpecificRequest{
|
|
Datacenter: "dc1",
|
|
QueryOptions: structs.QueryOptions{Token: "my-token",
|
|
Filter: "Service.Meta.version == v1",
|
|
},
|
|
ServiceName: "db",
|
|
Connect: true,
|
|
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
|
})
|
|
db_v2_HealthCacheKey := testGenCacheKey(&structs.ServiceSpecificRequest{
|
|
Datacenter: "dc1",
|
|
QueryOptions: structs.QueryOptions{Token: "my-token",
|
|
Filter: "Service.Meta.version == v2",
|
|
},
|
|
ServiceName: "db",
|
|
Connect: true,
|
|
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
|
})
|
|
|
|
// Create test cases using some of the common data above.
|
|
tests := []*testcase_BasicLifecycle{
|
|
{
|
|
name: "simple-default-resolver",
|
|
setup: func(t *testing.T, types *TestCacheTypes) {
|
|
// Note that we deliberately leave the 'geo-cache' prepared query to time out
|
|
types.health.Set(dbHealthCacheKey, &structs.IndexedCheckServiceNodes{
|
|
Nodes: TestUpstreamNodes(t),
|
|
})
|
|
types.compiledChain.Set(dbChainCacheKey, &structs.DiscoveryChainResponse{
|
|
Chain: dbDefaultChain(),
|
|
})
|
|
},
|
|
expectSnap: &ConfigSnapshot{
|
|
Kind: structs.ServiceKindConnectProxy,
|
|
Service: webProxy.Service,
|
|
ProxyID: webProxy.CompoundServiceID(),
|
|
Address: webProxy.Address,
|
|
Port: webProxy.Port,
|
|
Proxy: mustCopyProxyConfig(t, webProxy),
|
|
ServiceMeta: webProxy.Meta,
|
|
TaggedAddresses: make(map[string]structs.ServiceAddress),
|
|
Roots: roots,
|
|
ConnectProxy: configSnapshotConnectProxy{
|
|
ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{
|
|
Leaf: leaf,
|
|
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
|
|
"db": dbDefaultChain(),
|
|
},
|
|
WatchedUpstreams: nil, // Clone() clears this out
|
|
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
|
|
"db": {
|
|
"db.default.dc1": TestUpstreamNodes(t),
|
|
},
|
|
},
|
|
WatchedGateways: nil, // Clone() clears this out
|
|
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
|
|
"db": {},
|
|
},
|
|
},
|
|
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},
|
|
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
|
|
},
|
|
Datacenter: "dc1",
|
|
},
|
|
},
|
|
{
|
|
name: "chain-resolver-with-version-split",
|
|
setup: func(t *testing.T, types *TestCacheTypes) {
|
|
// Note that we deliberately leave the 'geo-cache' prepared query to time out
|
|
types.health.Set(db_v1_HealthCacheKey, &structs.IndexedCheckServiceNodes{
|
|
Nodes: TestUpstreamNodes(t),
|
|
})
|
|
types.health.Set(db_v2_HealthCacheKey, &structs.IndexedCheckServiceNodes{
|
|
Nodes: TestUpstreamNodesAlternate(t),
|
|
})
|
|
types.compiledChain.Set(dbChainCacheKey, &structs.DiscoveryChainResponse{
|
|
Chain: dbSplitChain(),
|
|
})
|
|
},
|
|
expectSnap: &ConfigSnapshot{
|
|
Kind: structs.ServiceKindConnectProxy,
|
|
Service: webProxy.Service,
|
|
ProxyID: webProxy.CompoundServiceID(),
|
|
Address: webProxy.Address,
|
|
Port: webProxy.Port,
|
|
Proxy: mustCopyProxyConfig(t, webProxy),
|
|
ServiceMeta: webProxy.Meta,
|
|
TaggedAddresses: make(map[string]structs.ServiceAddress),
|
|
Roots: roots,
|
|
ConnectProxy: configSnapshotConnectProxy{
|
|
ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{
|
|
Leaf: leaf,
|
|
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
|
|
"db": dbSplitChain(),
|
|
},
|
|
WatchedUpstreams: nil, // Clone() clears this out
|
|
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
|
|
"db": {
|
|
"v1.db.default.dc1": TestUpstreamNodes(t),
|
|
"v2.db.default.dc1": TestUpstreamNodesAlternate(t),
|
|
},
|
|
},
|
|
WatchedGateways: nil, // Clone() clears this out
|
|
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
|
|
"db": {},
|
|
},
|
|
},
|
|
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},
|
|
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
|
|
},
|
|
Datacenter: "dc1",
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
require.NotNil(t, tt.setup)
|
|
require.NotNil(t, tt.expectSnap)
|
|
|
|
// Use a mocked cache to make life simpler
|
|
types := NewTestCacheTypes(t)
|
|
|
|
// Setup initial values
|
|
types.roots.Set(rootsCacheKey, roots)
|
|
types.leaf.Set(leafCacheKey, leaf)
|
|
types.intentions.Set(intentionCacheKey, TestIntentions(t))
|
|
tt.setup(t, types)
|
|
|
|
expectSnapCopy, err := copystructure.Copy(tt.expectSnap)
|
|
require.NoError(t, err)
|
|
|
|
webProxyCopy, err := copystructure.Copy(webProxy)
|
|
require.NoError(t, err)
|
|
|
|
testManager_BasicLifecycle(t, tt, types,
|
|
rootsCacheKey, leafCacheKey,
|
|
roots, leaf,
|
|
webProxyCopy.(*structs.NodeService),
|
|
expectSnapCopy.(*ConfigSnapshot),
|
|
)
|
|
})
|
|
}
|
|
}
|
|
|
|
type testcase_BasicLifecycle struct {
|
|
name string
|
|
setup func(t *testing.T, types *TestCacheTypes)
|
|
webProxy *structs.NodeService
|
|
expectSnap *ConfigSnapshot
|
|
}
|
|
|
|
func testManager_BasicLifecycle(
|
|
t *testing.T,
|
|
tt *testcase_BasicLifecycle,
|
|
types *TestCacheTypes,
|
|
rootsCacheKey, leafCacheKey string,
|
|
roots *structs.IndexedCARoots,
|
|
leaf *structs.IssuedCert,
|
|
webProxy *structs.NodeService,
|
|
expectSnap *ConfigSnapshot,
|
|
) {
|
|
c := TestCacheWithTypes(t, types)
|
|
|
|
require := require.New(t)
|
|
logger := testutil.Logger(t)
|
|
state := local.NewState(local.Config{}, logger, &token.Store{})
|
|
source := &structs.QuerySource{
|
|
Node: "node1",
|
|
Datacenter: "dc1",
|
|
}
|
|
|
|
// Stub state syncing
|
|
state.TriggerSyncChanges = func() {}
|
|
|
|
// Create manager
|
|
m, err := NewManager(ManagerConfig{c, state, source, logger, nil})
|
|
require.NoError(err)
|
|
|
|
// And run it
|
|
go func() {
|
|
err := m.Run()
|
|
require.NoError(err)
|
|
}()
|
|
|
|
// BEFORE we register, we should be able to get a watch channel
|
|
wCh, cancel := m.Watch(webProxy.CompoundServiceID())
|
|
defer cancel()
|
|
|
|
// And it should block with nothing sent on it yet
|
|
assertWatchChanBlocks(t, wCh)
|
|
|
|
require.NoError(state.AddService(webProxy, "my-token"))
|
|
|
|
// We should see the initial config delivered but not until after the
|
|
// coalesce timeout
|
|
start := time.Now()
|
|
assertWatchChanRecvs(t, wCh, expectSnap)
|
|
require.True(time.Since(start) >= coalesceTimeout)
|
|
|
|
assertLastReqArgs(t, types, "my-token", source)
|
|
|
|
// Update NodeConfig
|
|
webProxy.Port = 7777
|
|
require.NoError(state.AddService(webProxy, "my-token"))
|
|
|
|
expectSnap.Port = 7777
|
|
assertWatchChanRecvs(t, wCh, expectSnap)
|
|
|
|
// Register a second watcher
|
|
wCh2, cancel2 := m.Watch(webProxy.CompoundServiceID())
|
|
defer cancel2()
|
|
|
|
// New watcher should immediately receive the current state
|
|
assertWatchChanRecvs(t, wCh2, expectSnap)
|
|
|
|
// Change token
|
|
require.NoError(state.AddService(webProxy, "other-token"))
|
|
assertWatchChanRecvs(t, wCh, expectSnap)
|
|
assertWatchChanRecvs(t, wCh2, expectSnap)
|
|
|
|
// This is actually sort of timing dependent - the cache background fetcher
|
|
// will still be fetching with the old token, but we rely on the fact that our
|
|
// mock type will have been blocked on those for a while.
|
|
assertLastReqArgs(t, types, "other-token", source)
|
|
// Update roots
|
|
newRoots, newLeaf := TestCerts(t)
|
|
newRoots.Roots = append(newRoots.Roots, roots.Roots...)
|
|
types.roots.Set(rootsCacheKey, newRoots)
|
|
|
|
// Expect new roots in snapshot
|
|
expectSnap.Roots = newRoots
|
|
assertWatchChanRecvs(t, wCh, expectSnap)
|
|
assertWatchChanRecvs(t, wCh2, expectSnap)
|
|
|
|
// Update leaf
|
|
types.leaf.Set(leafCacheKey, newLeaf)
|
|
|
|
// Expect new roots in snapshot
|
|
expectSnap.ConnectProxy.Leaf = newLeaf
|
|
assertWatchChanRecvs(t, wCh, expectSnap)
|
|
assertWatchChanRecvs(t, wCh2, expectSnap)
|
|
|
|
// Remove the proxy
|
|
state.RemoveService(webProxy.CompoundServiceID())
|
|
|
|
// Chan should NOT close
|
|
assertWatchChanBlocks(t, wCh)
|
|
assertWatchChanBlocks(t, wCh2)
|
|
|
|
// Re-add the proxy with another new port
|
|
webProxy.Port = 3333
|
|
require.NoError(state.AddService(webProxy, "other-token"))
|
|
|
|
// Same watch chan should be notified again
|
|
expectSnap.Port = 3333
|
|
assertWatchChanRecvs(t, wCh, expectSnap)
|
|
assertWatchChanRecvs(t, wCh2, expectSnap)
|
|
|
|
// Cancel watch
|
|
cancel()
|
|
|
|
// Watch chan should be closed
|
|
assertWatchChanRecvs(t, wCh, nil)
|
|
|
|
// We specifically don't remove the proxy or cancel the second watcher to
|
|
// ensure both are cleaned up by close.
|
|
require.NoError(m.Close())
|
|
|
|
// Sanity check the state is clean
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
require.Len(m.proxies, 0)
|
|
require.Len(m.watchers, 0)
|
|
}
|
|
|
|
func assertWatchChanBlocks(t *testing.T, ch <-chan *ConfigSnapshot) {
|
|
t.Helper()
|
|
|
|
select {
|
|
case <-ch:
|
|
t.Fatal("Should be nothing sent on watch chan yet")
|
|
default:
|
|
}
|
|
}
|
|
|
|
func assertWatchChanRecvs(t *testing.T, ch <-chan *ConfigSnapshot, expect *ConfigSnapshot) {
|
|
t.Helper()
|
|
|
|
select {
|
|
case got, ok := <-ch:
|
|
require.Equal(t, expect, got)
|
|
if expect == nil {
|
|
require.False(t, ok, "watch chan should be closed")
|
|
}
|
|
case <-time.After(100*time.Millisecond + coalesceTimeout):
|
|
t.Fatal("recv timeout")
|
|
}
|
|
}
|
|
|
|
func TestManager_deliverLatest(t *testing.T) {
|
|
// None of these need to do anything to test this method just be valid
|
|
logger := testutil.Logger(t)
|
|
cfg := ManagerConfig{
|
|
Cache: cache.New(nil),
|
|
State: local.NewState(local.Config{}, logger, &token.Store{}),
|
|
Source: &structs.QuerySource{
|
|
Node: "node1",
|
|
Datacenter: "dc1",
|
|
},
|
|
Logger: logger,
|
|
}
|
|
require := require.New(t)
|
|
|
|
m, err := NewManager(cfg)
|
|
require.NoError(err)
|
|
|
|
snap1 := &ConfigSnapshot{
|
|
ProxyID: structs.NewServiceID("test-proxy", nil),
|
|
Port: 1111,
|
|
}
|
|
snap2 := &ConfigSnapshot{
|
|
ProxyID: structs.NewServiceID("test-proxy", nil),
|
|
Port: 2222,
|
|
}
|
|
|
|
// Put an overall time limit on this test case so we don't have to guard every
|
|
// call to ensure the whole test doesn't deadlock.
|
|
time.AfterFunc(100*time.Millisecond, func() {
|
|
t.Fatal("test timed out")
|
|
})
|
|
|
|
// test 1 buffered chan
|
|
ch1 := make(chan *ConfigSnapshot, 1)
|
|
|
|
// Sending to an unblocked chan should work
|
|
m.deliverLatest(snap1, ch1)
|
|
|
|
// Check it was delivered
|
|
require.Equal(snap1, <-ch1)
|
|
|
|
// Now send both without reading simulating a slow client
|
|
m.deliverLatest(snap1, ch1)
|
|
m.deliverLatest(snap2, ch1)
|
|
|
|
// Check we got the _second_ one
|
|
require.Equal(snap2, <-ch1)
|
|
|
|
// Same again for 5-buffered chan
|
|
ch5 := make(chan *ConfigSnapshot, 5)
|
|
|
|
// Sending to an unblocked chan should work
|
|
m.deliverLatest(snap1, ch5)
|
|
|
|
// Check it was delivered
|
|
require.Equal(snap1, <-ch5)
|
|
|
|
// Now send enough to fill the chan simulating a slow client
|
|
for i := 0; i < 5; i++ {
|
|
m.deliverLatest(snap1, ch5)
|
|
}
|
|
m.deliverLatest(snap2, ch5)
|
|
|
|
// Check we got the _second_ one
|
|
require.Equal(snap2, <-ch5)
|
|
}
|
|
|
|
func testGenCacheKey(req cache.Request) string {
|
|
info := req.CacheInfo()
|
|
return path.Join(info.Key, info.Datacenter)
|
|
}
|