agent: Use an in-process listener with cache (#12762)

Uses a bufconn listener between consul-template and vault-agent when
caching is enabled and either templates or a listener is defined. This
means no listeners need to be defined in vault-agent for just
templating. Always routes consul-template through the vault-agent
cache (instead of only when persistent cache is enabled).

Uses a local transportDialer interface in config.Cache{}. 

Co-authored-by: Tom Proctor <tomhjp@users.noreply.github.com>
Co-authored-by: Ben Ash <32777270+benashz@users.noreply.github.com>
This commit is contained in:
Theron Voran 2021-10-15 17:22:19 -07:00 committed by GitHub
parent 3428de017a
commit ae79afdd26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 278 additions and 229 deletions

3
changelog/12762.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
agent/cache: Use an in-process listener between consul-template and vault-agent when caching is enabled and either templates or a listener is defined
```

View File

@ -2,6 +2,7 @@ package command
import (
"context"
"crypto/tls"
"flag"
"fmt"
"io"
@ -40,6 +41,8 @@ import (
"github.com/hashicorp/vault/command/agent/sink/inmem"
"github.com/hashicorp/vault/command/agent/template"
"github.com/hashicorp/vault/command/agent/winsvc"
"github.com/hashicorp/vault/internalshared/configutil"
"github.com/hashicorp/vault/internalshared/listenerutil"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/helper/logging"
"github.com/hashicorp/vault/sdk/logical"
@ -48,6 +51,7 @@ import (
"github.com/mitchellh/cli"
"github.com/oklog/run"
"github.com/posener/complete"
"google.golang.org/grpc/test/bufconn"
)
var (
@ -470,7 +474,7 @@ func (c *AgentCommand) Run(args []string) int {
var leaseCache *cache.LeaseCache
var previousToken string
// Parse agent listener configurations
if config.Cache != nil && len(config.Listeners) != 0 {
if config.Cache != nil {
cacheLogger := c.logger.Named("cache")
// Create the API proxier
@ -666,11 +670,25 @@ func (c *AgentCommand) Run(args []string) int {
cacheHandler := cache.Handler(ctx, cacheLogger, leaseCache, inmemSink, proxyVaultToken)
var listeners []net.Listener
// If there are templates, add an in-process listener
if len(config.Templates) > 0 {
config.Listeners = append(config.Listeners, &configutil.Listener{Type: listenerutil.BufConnType})
}
for i, lnConfig := range config.Listeners {
ln, tlsConf, err := cache.StartListener(lnConfig)
if err != nil {
c.UI.Error(fmt.Sprintf("Error starting listener: %v", err))
return 1
var ln net.Listener
var tlsConf *tls.Config
if lnConfig.Type == listenerutil.BufConnType {
inProcListener := bufconn.Listen(1024 * 1024)
config.Cache.InProcDialer = listenerutil.NewBufConnWrapper(inProcListener)
ln = inProcListener
} else {
ln, tlsConf, err = cache.StartListener(lnConfig)
if err != nil {
c.UI.Error(fmt.Sprintf("Error starting listener: %v", err))
return 1
}
}
listeners = append(listeners, ln)

View File

@ -1,9 +1,11 @@
package config
import (
"context"
"errors"
"fmt"
"io/ioutil"
"net"
"os"
"strings"
"time"
@ -64,14 +66,25 @@ type Vault struct {
Retry *Retry `hcl:"retry"`
}
// transportDialer is an interface that allows passing a custom dialer function
// to an HTTP client's transport config
type transportDialer interface {
// Dial is intended to match https://pkg.go.dev/net#Dialer.Dial
Dial(network, address string) (net.Conn, error)
// DialContext is intended to match https://pkg.go.dev/net#Dialer.DialContext
DialContext(ctx context.Context, network, address string) (net.Conn, error)
}
// Cache contains any configuration needed for Cache mode
type Cache struct {
UseAutoAuthTokenRaw interface{} `hcl:"use_auto_auth_token"`
UseAutoAuthToken bool `hcl:"-"`
ForceAutoAuthToken bool `hcl:"-"`
EnforceConsistency string `hcl:"enforce_consistency"`
WhenInconsistent string `hcl:"when_inconsistent"`
Persist *Persist `hcl:"persist"`
UseAutoAuthTokenRaw interface{} `hcl:"use_auto_auth_token"`
UseAutoAuthToken bool `hcl:"-"`
ForceAutoAuthToken bool `hcl:"-"`
EnforceConsistency string `hcl:"enforce_consistency"`
WhenInconsistent string `hcl:"when_inconsistent"`
Persist *Persist `hcl:"persist"`
InProcDialer transportDialer `hcl:"-"`
}
// Persist contains configuration needed for persistent caching
@ -203,8 +216,8 @@ func LoadConfig(path string) (*Config, error) {
}
if result.Cache != nil {
if len(result.Listeners) < 1 {
return nil, fmt.Errorf("at least one listener required when cache enabled")
if len(result.Listeners) < 1 && len(result.Templates) < 1 {
return nil, fmt.Errorf("enabling the cache requires at least 1 template or 1 listener to be defined")
}
if result.Cache.UseAutoAuthToken {

View File

@ -105,6 +105,74 @@ func TestLoadConfigFile_AgentCache(t *testing.T) {
}
}
func TestLoadConfigFile_AgentCache_NoListeners(t *testing.T) {
config, err := LoadConfig("./test-fixtures/config-cache-no-listeners.hcl")
if err != nil {
t.Fatal(err)
}
expected := &Config{
SharedConfig: &configutil.SharedConfig{
PidFile: "./pidfile",
},
AutoAuth: &AutoAuth{
Method: &Method{
Type: "aws",
MountPath: "auth/aws",
Config: map[string]interface{}{
"role": "foobar",
},
},
Sinks: []*Sink{
{
Type: "file",
DHType: "curve25519",
DHPath: "/tmp/file-foo-dhpath",
AAD: "foobar",
Config: map[string]interface{}{
"path": "/tmp/file-foo",
},
},
},
},
Cache: &Cache{
UseAutoAuthToken: true,
UseAutoAuthTokenRaw: true,
ForceAutoAuthToken: false,
Persist: &Persist{
Type: "kubernetes",
Path: "/vault/agent-cache/",
KeepAfterImport: true,
ExitOnErr: true,
ServiceAccountTokenFile: "/tmp/serviceaccount/token",
},
},
Vault: &Vault{
Address: "http://127.0.0.1:1111",
CACert: "config_ca_cert",
CAPath: "config_ca_path",
TLSSkipVerifyRaw: interface{}("true"),
TLSSkipVerify: true,
ClientCert: "config_client_cert",
ClientKey: "config_client_key",
Retry: &Retry{
NumRetries: 12,
},
},
Templates: []*ctconfig.TemplateConfig{
{
Source: pointerutil.StringPtr("/path/on/disk/to/template.ctmpl"),
Destination: pointerutil.StringPtr("/path/on/disk/where/template/will/render.txt"),
},
},
}
config.Prune()
if diff := deep.Equal(config, expected); diff != nil {
t.Fatal(diff)
}
}
func TestLoadConfigFile(t *testing.T) {
if err := os.Setenv("TEST_AAD_ENV", "aad"); err != nil {
t.Fatal(err)
@ -270,7 +338,7 @@ func TestLoadConfigFile_Bad_AgentCache_ForceAutoAuthNoMethod(t *testing.T) {
func TestLoadConfigFile_Bad_AgentCache_NoListeners(t *testing.T) {
_, err := LoadConfig("./test-fixtures/bad-config-cache-no-listeners.hcl")
if err == nil {
t.Fatal("LoadConfig should return an error when cache section present and no listeners present")
t.Fatal("LoadConfig should return an error when cache section present and no listeners present and no templates defined")
}
}

View File

@ -0,0 +1,45 @@
pid_file = "./pidfile"
auto_auth {
method {
type = "aws"
config = {
role = "foobar"
}
}
sink {
type = "file"
config = {
path = "/tmp/file-foo"
}
aad = "foobar"
dh_type = "curve25519"
dh_path = "/tmp/file-foo-dhpath"
}
}
cache {
use_auto_auth_token = true
persist = {
type = "kubernetes"
path = "/vault/agent-cache/"
keep_after_import = true
exit_on_err = true
service_account_token_file = "/tmp/serviceaccount/token"
}
}
vault {
address = "http://127.0.0.1:1111"
ca_cert = "config_ca_cert"
ca_path = "config_ca_path"
tls_skip_verify = "true"
client_cert = "config_client_cert"
client_key = "config_client_key"
}
template {
source = "/path/on/disk/to/template.ctmpl"
destination = "/path/on/disk/where/template/will/render.txt"
}

View File

@ -264,10 +264,7 @@ func newRunnerConfig(sc *ServerConfig, templates ctconfig.TemplateConfigs) (*ctc
}
// Use the cache if available or fallback to the Vault server values.
// For now we're only routing templating through the cache when persistence
// is enabled. The templating engine and the cache have some inconsistencies
// that need to be fixed for 1.7x/1.8
if sc.AgentConfig.Cache != nil && sc.AgentConfig.Cache.Persist != nil && len(sc.AgentConfig.Listeners) != 0 {
if sc.AgentConfig.Cache != nil {
attempts = 0
// If we don't want exit on template retry failure (i.e. unlimited
@ -283,23 +280,18 @@ func newRunnerConfig(sc *ServerConfig, templates ctconfig.TemplateConfigs) (*ctc
attempts = ctconfig.DefaultRetryAttempts
}
scheme := "unix://"
if sc.AgentConfig.Listeners[0].Type == "tcp" {
scheme = "https://"
if sc.AgentConfig.Listeners[0].TLSDisable {
scheme = "http://"
}
if sc.AgentConfig.Cache.InProcDialer == nil {
return nil, fmt.Errorf("missing in-process dialer configuration")
}
address := fmt.Sprintf("%s%s", scheme, sc.AgentConfig.Listeners[0].Address)
conf.Vault.Address = &address
if conf.Vault.Transport == nil {
conf.Vault.Transport = &ctconfig.TransportConfig{}
}
conf.Vault.Transport.CustomDialer = sc.AgentConfig.Cache.InProcDialer
// The in-process dialer ignores the address passed in, but we're still
// setting it here to override the setting at the top of this function,
// and to prevent the vault/http client from defaulting to https.
conf.Vault.Address = pointerutil.StringPtr("http://127.0.0.1:8200")
// Skip verification if its using the cache because they're part of the same agent.
if scheme == "https://" {
if sc.AgentConfig.Listeners[0].TLSRequireAndVerifyClientCert {
return nil, errors.New("template server cannot use local cache when mTLS is enabled")
}
conf.Vault.SSL.Verify = pointerutil.BoolPtr(false)
}
} else if strings.HasPrefix(sc.AgentConfig.Vault.Address, "https") || sc.AgentConfig.Vault.CACert != "" {
skipVerify := sc.AgentConfig.Vault.TLSSkipVerify
verify := !skipVerify

View File

@ -15,8 +15,12 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/command/agent/config"
"github.com/hashicorp/vault/internalshared/configutil"
"github.com/hashicorp/vault/internalshared/listenerutil"
"github.com/hashicorp/vault/sdk/helper/logging"
"github.com/hashicorp/vault/sdk/helper/pointerutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/test/bufconn"
)
// TestNewServer is a simple test to make sure NewServer returns a Server and
@ -77,44 +81,7 @@ func newAgentConfig(listeners []*configutil.Listener, enableCache, enablePersise
return agentConfig
}
func TestCacheConfigUnix(t *testing.T) {
listeners := []*configutil.Listener{
{
Type: "unix",
Address: "foobar",
TLSDisable: true,
SocketMode: "configmode",
SocketUser: "configuser",
SocketGroup: "configgroup",
},
{
Type: "tcp",
Address: "127.0.0.1:8300",
TLSDisable: true,
},
{
Type: "tcp",
Address: "127.0.0.1:8400",
TLSKeyFile: "/path/to/cakey.pem",
TLSCertFile: "/path/to/cacert.pem",
},
}
agentConfig := newAgentConfig(listeners, true, true)
serverConfig := ServerConfig{AgentConfig: agentConfig}
ctConfig, err := newRunnerConfig(&serverConfig, ctconfig.TemplateConfigs{})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
expected := "unix://foobar"
if *ctConfig.Vault.Address != expected {
t.Fatalf("expected %s, got %s", expected, *ctConfig.Vault.Address)
}
}
func TestCacheConfigHTTP(t *testing.T) {
func TestCacheConfig(t *testing.T) {
listeners := []*configutil.Listener{
{
Type: "tcp",
@ -137,132 +104,69 @@ func TestCacheConfigHTTP(t *testing.T) {
},
}
agentConfig := newAgentConfig(listeners, true, true)
serverConfig := ServerConfig{AgentConfig: agentConfig}
ctConfig, err := newRunnerConfig(&serverConfig, ctconfig.TemplateConfigs{})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
expected := "http://127.0.0.1:8300"
if *ctConfig.Vault.Address != expected {
t.Fatalf("expected %s, got %s", expected, *ctConfig.Vault.Address)
}
}
func TestCacheConfigHTTPS(t *testing.T) {
listeners := []*configutil.Listener{
{
Type: "tcp",
Address: "127.0.0.1:8300",
TLSKeyFile: "/path/to/cakey.pem",
TLSCertFile: "/path/to/cacert.pem",
cases := map[string]struct {
cacheEnabled bool
persistentCacheEnabled bool
setDialer bool
expectedErr string
expectCustomDialer bool
}{
"persistent_cache": {
cacheEnabled: true,
persistentCacheEnabled: true,
setDialer: true,
expectedErr: "",
expectCustomDialer: true,
},
{
Type: "unix",
Address: "foobar",
TLSDisable: true,
SocketMode: "configmode",
SocketUser: "configuser",
SocketGroup: "configgroup",
"memory_cache": {
cacheEnabled: true,
persistentCacheEnabled: false,
setDialer: true,
expectedErr: "",
expectCustomDialer: true,
},
{
Type: "tcp",
Address: "127.0.0.1:8400",
TLSDisable: true,
"no_cache": {
cacheEnabled: false,
persistentCacheEnabled: false,
setDialer: false,
expectedErr: "",
expectCustomDialer: false,
},
"cache_no_dialer": {
cacheEnabled: true,
persistentCacheEnabled: false,
setDialer: false,
expectedErr: "missing in-process dialer configuration",
expectCustomDialer: false,
},
}
agentConfig := newAgentConfig(listeners, true, true)
serverConfig := ServerConfig{AgentConfig: agentConfig}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
agentConfig := newAgentConfig(listeners, tc.cacheEnabled, tc.persistentCacheEnabled)
if tc.setDialer && tc.cacheEnabled {
bListener := bufconn.Listen(1024 * 1024)
defer bListener.Close()
agentConfig.Cache.InProcDialer = listenerutil.NewBufConnWrapper(bListener)
}
serverConfig := ServerConfig{AgentConfig: agentConfig}
ctConfig, err := newRunnerConfig(&serverConfig, ctconfig.TemplateConfigs{})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
ctConfig, err := newRunnerConfig(&serverConfig, ctconfig.TemplateConfigs{})
if len(tc.expectedErr) > 0 {
require.Error(t, err, tc.expectedErr)
return
}
expected := "https://127.0.0.1:8300"
if *ctConfig.Vault.Address != expected {
t.Fatalf("expected %s, got %s", expected, *ctConfig.Vault.Address)
}
require.NoError(t, err)
require.NotNil(t, ctConfig)
assert.Equal(t, tc.expectCustomDialer, ctConfig.Vault.Transport.CustomDialer != nil)
if *ctConfig.Vault.SSL.Verify {
t.Fatalf("expected %t, got %t", true, *ctConfig.Vault.SSL.Verify)
}
}
func TestCacheConfigNoCache(t *testing.T) {
listeners := []*configutil.Listener{
{
Type: "tcp",
Address: "127.0.0.1:8300",
TLSKeyFile: "/path/to/cakey.pem",
TLSCertFile: "/path/to/cacert.pem",
},
{
Type: "unix",
Address: "foobar",
TLSDisable: true,
SocketMode: "configmode",
SocketUser: "configuser",
SocketGroup: "configgroup",
},
{
Type: "tcp",
Address: "127.0.0.1:8400",
TLSDisable: true,
},
}
agentConfig := newAgentConfig(listeners, false, false)
serverConfig := ServerConfig{AgentConfig: agentConfig}
ctConfig, err := newRunnerConfig(&serverConfig, ctconfig.TemplateConfigs{})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
expected := "http://127.0.0.1:1111"
if *ctConfig.Vault.Address != expected {
t.Fatalf("expected %s, got %s", expected, *ctConfig.Vault.Address)
}
}
func TestCacheConfigNoPersistentCache(t *testing.T) {
listeners := []*configutil.Listener{
{
Type: "tcp",
Address: "127.0.0.1:8300",
TLSKeyFile: "/path/to/cakey.pem",
TLSCertFile: "/path/to/cacert.pem",
},
{
Type: "unix",
Address: "foobar",
TLSDisable: true,
SocketMode: "configmode",
SocketUser: "configuser",
SocketGroup: "configgroup",
},
{
Type: "tcp",
Address: "127.0.0.1:8400",
TLSDisable: true,
},
}
agentConfig := newAgentConfig(listeners, true, false)
serverConfig := ServerConfig{AgentConfig: agentConfig}
ctConfig, err := newRunnerConfig(&serverConfig, ctconfig.TemplateConfigs{})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
expected := "http://127.0.0.1:1111"
if *ctConfig.Vault.Address != expected {
t.Fatalf("expected %s, got %s", expected, *ctConfig.Vault.Address)
if tc.expectCustomDialer {
assert.Equal(t, "http://127.0.0.1:8200", *ctConfig.Vault.Address)
} else {
assert.Equal(t, "http://127.0.0.1:1111", *ctConfig.Vault.Address)
}
})
}
}
@ -270,6 +174,9 @@ func TestCacheConfigNoListener(t *testing.T) {
listeners := []*configutil.Listener{}
agentConfig := newAgentConfig(listeners, true, true)
bListener := bufconn.Listen(1024 * 1024)
defer bListener.Close()
agentConfig.Cache.InProcDialer = listenerutil.NewBufConnWrapper(bListener)
serverConfig := ServerConfig{AgentConfig: agentConfig}
ctConfig, err := newRunnerConfig(&serverConfig, ctconfig.TemplateConfigs{})
@ -277,43 +184,8 @@ func TestCacheConfigNoListener(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
expected := "http://127.0.0.1:1111"
if *ctConfig.Vault.Address != expected {
t.Fatalf("expected %s, got %s", expected, *ctConfig.Vault.Address)
}
}
func TestCacheConfigRejectMTLS(t *testing.T) {
listeners := []*configutil.Listener{
{
Type: "tcp",
Address: "127.0.0.1:8300",
TLSKeyFile: "/path/to/cakey.pem",
TLSCertFile: "/path/to/cacert.pem",
TLSRequireAndVerifyClientCert: true,
},
{
Type: "unix",
Address: "foobar",
TLSDisable: true,
SocketMode: "configmode",
SocketUser: "configuser",
SocketGroup: "configgroup",
},
{
Type: "tcp",
Address: "127.0.0.1:8400",
TLSDisable: true,
},
}
agentConfig := newAgentConfig(listeners, true, true)
serverConfig := ServerConfig{AgentConfig: agentConfig}
_, err := newRunnerConfig(&serverConfig, ctconfig.TemplateConfigs{})
if err == nil {
t.Fatal("expected error, got none")
}
assert.Equal(t, "http://127.0.0.1:8200", *ctConfig.Vault.Address)
assert.NotNil(t, ctConfig.Vault.Transport.CustomDialer)
}
func TestServerRun(t *testing.T) {

2
go.mod
View File

@ -60,7 +60,7 @@ require (
github.com/google/go-metrics-stackdriver v0.2.0
github.com/gorilla/mux v1.7.3 // indirect
github.com/hashicorp/cap v0.1.0
github.com/hashicorp/consul-template v0.27.1
github.com/hashicorp/consul-template v0.27.2-0.20211014231529-4ff55381f1c4
github.com/hashicorp/consul/api v1.11.0
github.com/hashicorp/errwrap v1.1.0
github.com/hashicorp/go-cleanhttp v0.5.2

4
go.sum
View File

@ -569,8 +569,8 @@ github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hashicorp/cap v0.1.0 h1:uBDfu9NDvmotza/mJW6vtQId+VYid9ztlTnDCW6YUWU=
github.com/hashicorp/cap v0.1.0/go.mod h1:VfBvK2ULRyqsuqAnjgZl7HJ7/CGMC7ro4H5eXiZuun8=
github.com/hashicorp/consul-template v0.27.1 h1:VGQDW2DJeZnmtWO2KvEnMBGxmccGTASEW2DsHHz1QRg=
github.com/hashicorp/consul-template v0.27.1/go.mod h1:cAi5bOqno7Ao5sFHu7O80wMOPnqcF5ADrTApWU4Lqx4=
github.com/hashicorp/consul-template v0.27.2-0.20211014231529-4ff55381f1c4 h1:Heoq6IaSKwqOzAJMDg33LRu0GmNxVswQkIcREBFQD2E=
github.com/hashicorp/consul-template v0.27.2-0.20211014231529-4ff55381f1c4/go.mod h1:cAi5bOqno7Ao5sFHu7O80wMOPnqcF5ADrTApWU4Lqx4=
github.com/hashicorp/consul/api v1.4.0/go.mod h1:xc8u05kyMa3Wjr9eEAsIAo3dg8+LywT5E/Cl7cNS5nU=
github.com/hashicorp/consul/api v1.11.0 h1:Hw/G8TtRvOElqxVIhBzXciiSTbapq8hZ2XKZsXk5ZCE=
github.com/hashicorp/consul/api v1.11.0/go.mod h1:XjsvQN+RJGWI2TWy1/kqaE16HrR2J/FWgkYjdZQsX9M=

View File

@ -0,0 +1,38 @@
package listenerutil
import (
"context"
"net"
"google.golang.org/grpc/test/bufconn"
)
const BufConnType = "bufconn"
// BufConnWrapper implements consul-template's TransportDialer using a
// bufconn listener, to provide a way to Dial the in-memory listener
type BufConnWrapper struct {
listener *bufconn.Listener
}
// NewBufConnWrapper returns a new BufConnWrapper using an
// existing bufconn.Listener
func NewBufConnWrapper(bcl *bufconn.Listener) *BufConnWrapper {
return &BufConnWrapper{
listener: bcl,
}
}
// Dial connects to the listening end of the bufconn (satisfies
// consul-template's TransportDialer interface). This is essentially the client
// side of the bufconn connection.
func (bcl *BufConnWrapper) Dial(_, _ string) (net.Conn, error) {
return bcl.listener.Dial()
}
// DialContext connects to the listening end of the bufconn (satisfies
// consul-template's TransportDialer interface). This is essentially the client
// side of the bufconn connection.
func (bcl *BufConnWrapper) DialContext(ctx context.Context, _, _ string) (net.Conn, error) {
return bcl.listener.DialContext(ctx)
}