Remove old connect client and proxy implementation

This commit is contained in:
Paul Banks 2018-04-03 12:55:50 +01:00 committed by Mitchell Hashimoto
parent 2d6a2ce1e3
commit 67669abf82
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
18 changed files with 0 additions and 1966 deletions

View File

@@ -1,256 +0,0 @@
package connect
// import (
// "context"
// "crypto/tls"
// "fmt"
// "math/rand"
// "net"
// "github.com/hashicorp/consul/api"
// )
// // CertStatus indicates whether the Client currently has valid certificates for
// // incoming and outgoing connections.
// type CertStatus int
// const (
// // CertStatusUnknown is the zero value for CertStatus which may be returned
// // when a watch channel is closed on shutdown. It has no other meaning.
// CertStatusUnknown CertStatus = iota
// // CertStatusOK indicates the client has valid certificates and trust roots to
// // Authenticate incoming and outgoing connections.
// CertStatusOK
// // CertStatusPending indicates the client is waiting to be issued initial
// // certificates, or that its certificates have expired and it's waiting to be
// // issued new ones. In this state all incoming and outgoing connections will
// // fail.
// CertStatusPending
// )
// func (s CertStatus) String() string {
// switch s {
// case CertStatusOK:
// return "OK"
// case CertStatusPending:
// return "pending"
// case CertStatusUnknown:
// fallthrough
// default:
// return "unknown"
// }
// }
// // Client is the interface a basic client implementation must support.
// type Client interface {
// // TODO(banks): build this and test it
// // CertStatus returns the current status of the client's certificates. It can
// // be used to determine if the Client is able to service requests at the
// // current time.
// //CertStatus() CertStatus
// // TODO(banks): build this and test it
// // WatchCertStatus returns a channel that is notified on all status changes.
// // Note that a message on the channel isn't guaranteed to be different so its
// // value should be inspected. During Client shutdown the channel will be
// // closed returning a zero type which is equivalent to CertStatusUnknown.
// //WatchCertStatus() <-chan CertStatus
// // ServerTLSConfig returns the *tls.Config to be used when creating a TCP
// // listener that should accept Connect connections. It is likely that at
// // startup the tlsCfg returned will not be immediately usable since
// // certificates are typically fetched from the agent asynchronously. In this
// // case it's still safe to listen with the provided config, but auth failures
// // will occur until initial certificate discovery is complete. In general it
// // is possible at any time for certificates to expire before new replacements
// // have been issued due to local network errors, so the server may not have a
// // working certificate configuration at any given moment. However, as soon as
// // valid certs can be issued it will automatically start working again, so
// // the caller should take no action.
// ServerTLSConfig() (*tls.Config, error)
// // DialService opens a new connection to the named service registered in
// // Consul. It will perform service discovery to find healthy instances. If
// // there is an error during connection it is returned and the caller may call
// // again. The client implementation makes a best effort to attempt consecutive
// // Dials against different instances, either by randomising the list and/or
// // maintaining a local memory of which instances recently failed. If the
// // context passed times out before the connection is established and verified,
// // an error is returned.
// DialService(ctx context.Context, namespace, name string) (net.Conn, error)
// // DialPreparedQuery opens a new connection by executing the named Prepared
// // Query against the local Consul agent, and picking one of the returned
// // instances to connect to. It will perform service discovery with the same
// // semantics as DialService.
// DialPreparedQuery(ctx context.Context, namespace, name string) (net.Conn, error)
// }
// /*
// Maybe also convenience wrappers for:
// - listening TLS conn with right config
// - http.ListenAndServeTLS equivalent
// */
// // AgentClient is the primary implementation of a connect.Client which
// // communicates with the local Consul agent.
// type AgentClient struct {
// agent *api.Client
// tlsCfg *ReloadableTLSConfig
// }
// // NewClient returns an AgentClient to allow consuming and providing
// // Connect-enabled network services.
// func NewClient(agent *api.Client) Client {
// // TODO(banks): hook up fetching certs from Agent and updating tlsCfg on cert
// // delivery/change. Perhaps need to make
// return &AgentClient{
// agent: agent,
// tlsCfg: NewReloadableTLSConfig(defaultTLSConfig()),
// }
// }
// // NewInsecureDevClientWithLocalCerts returns an AgentClient that will still do
// // service discovery via the local agent but will use externally provided
// // certificates and skip authorization. This is intended just for development
// // and must never be used in production.
// func NewInsecureDevClientWithLocalCerts(agent *api.Client, caFile, certFile,
// keyFile string) (Client, error) {
// cfg, err := devTLSConfigFromFiles(caFile, certFile, keyFile)
// if err != nil {
// return nil, err
// }
// return &AgentClient{
// agent: agent,
// tlsCfg: NewReloadableTLSConfig(cfg),
// }, nil
// }
// // ServerTLSConfig implements Client
// func (c *AgentClient) ServerTLSConfig() (*tls.Config, error) {
// return c.tlsCfg.ServerTLSConfig(), nil
// }
// // DialService implements Client
// func (c *AgentClient) DialService(ctx context.Context, namespace,
// name string) (net.Conn, error) {
// return c.dial(ctx, "service", namespace, name)
// }
// // DialPreparedQuery implements Client
// func (c *AgentClient) DialPreparedQuery(ctx context.Context, namespace,
// name string) (net.Conn, error) {
// return c.dial(ctx, "prepared_query", namespace, name)
// }
// func (c *AgentClient) dial(ctx context.Context, discoveryType, namespace,
// name string) (net.Conn, error) {
// svcs, err := c.discoverInstances(ctx, discoveryType, namespace, name)
// if err != nil {
// return nil, err
// }
// svc, err := c.pickInstance(svcs)
// if err != nil {
// return nil, err
// }
// if svc == nil {
// return nil, fmt.Errorf("no healthy services discovered")
// }
// // OK we have a service we can dial! We need a ClientAuther that will validate
// // the connection is legit.
// // TODO(banks): implement ClientAuther properly to actually verify connected
// // cert matches the expected service/cluster etc. based on svc.
// auther := &ClientAuther{}
// tlsConfig := c.tlsCfg.TLSConfig(auther)
// // Resolve address TODO(banks): I expected this to happen magically in the
// // agent at registration time if I register with no explicit address but
// // apparently it doesn't. This is a quick hack to make it work for now; need to
// // see if there is a better shared code path for doing this.
// addr := svc.Service.Address
// if addr == "" {
// addr = svc.Node.Address
// }
// var dialer net.Dialer
// tcpConn, err := dialer.DialContext(ctx, "tcp",
// fmt.Sprintf("%s:%d", addr, svc.Service.Port))
// if err != nil {
// return nil, err
// }
// tlsConn := tls.Client(tcpConn, tlsConfig)
// err = tlsConn.Handshake()
// if err != nil {
// tlsConn.Close()
// return nil, err
// }
// return tlsConn, nil
// }
// // pickInstance returns an instance from the given list to try to connect to. It
// // may be made pluggable later; for now it just picks a random one regardless of
// // whether the list is already shuffled.
// func (c *AgentClient) pickInstance(svcs []*api.ServiceEntry) (*api.ServiceEntry, error) {
// if len(svcs) < 1 {
// return nil, nil
// }
// idx := rand.Intn(len(svcs))
// return svcs[idx], nil
// }
// // discoverInstances returns all instances for the given discoveryType,
// // namespace and name. The returned service entries may or may not be shuffled.
// func (c *AgentClient) discoverInstances(ctx context.Context, discoverType,
// namespace, name string) ([]*api.ServiceEntry, error) {
// q := &api.QueryOptions{
// // TODO(banks): make this configurable?
// AllowStale: true,
// }
// q = q.WithContext(ctx)
// switch discoverType {
// case "service":
// svcs, _, err := c.agent.Health().Connect(name, "", true, q)
// if err != nil {
// return nil, err
// }
// return svcs, err
// case "prepared_query":
// // TODO(banks): it's not super clear to me how this should work eventually.
// // How do we distinguish between a PreparedQuery for the actual services and
// // one that should return the connect proxies where that differs? If we
// // can't then we end up with a janky UX where user specifies a reasonable
// // prepared query but we try to connect to non-connect services and fail
// // with a confusing TLS error. Maybe just a way to filter PreparedQuery
// // results by connect-enabled would be sufficient (or even metadata to do
// // that ourselves in the response although less efficient).
// resp, _, err := c.agent.PreparedQuery().Execute(name, q)
// if err != nil {
// return nil, err
// }
// // Awkward, we have a slice of api.ServiceEntry here but want a slice of
// // *api.ServiceEntry for compat with Connect/Service APIs. Have to convert
// // them to keep things type-happy.
// svcs := make([]*api.ServiceEntry, len(resp.Nodes))
// for idx := range resp.Nodes {
// // Take the address of the slice element, not the loop variable;
// // otherwise every entry would point at the same value.
// svcs[idx] = &resp.Nodes[idx]
// }
// return svcs, err
// default:
// return nil, fmt.Errorf("unsupported discovery type: %s", discoverType)
// }
// }
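For reference, a minimal sketch (not part of this commit) of how the removed Client was meant to be consumed: build an AgentClient from an api.Client and dial a service. The service name "web" and the timeout are illustrative only.

package main

import (
	"context"
	"log"
	"time"

	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/connect" // package removed by this commit
)

func main() {
	agent, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	c := connect.NewClient(agent)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// DialService discovers healthy instances via the local agent and
	// returns a verified mTLS connection to one of them.
	conn, err := c.DialService(ctx, "default", "web")
	if err != nil {
		log.Fatalf("dial failed: %s", err)
	}
	defer conn.Close()
}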

View File

@@ -1,148 +0,0 @@
package connect
// import (
// "context"
// "crypto/x509"
// "crypto/x509/pkix"
// "encoding/asn1"
// "io/ioutil"
// "net"
// "net/http"
// "net/http/httptest"
// "net/url"
// "strconv"
// "testing"
// "github.com/hashicorp/consul/api"
// "github.com/hashicorp/consul/testutil"
// "github.com/stretchr/testify/require"
// )
// func TestNewInsecureDevClientWithLocalCerts(t *testing.T) {
// agent, err := api.NewClient(api.DefaultConfig())
// require.Nil(t, err)
// got, err := NewInsecureDevClientWithLocalCerts(agent,
// "testdata/ca1-ca-consul-internal.cert.pem",
// "testdata/ca1-svc-web.cert.pem",
// "testdata/ca1-svc-web.key.pem",
// )
// require.Nil(t, err)
// // Sanity check correct certs were loaded
// serverCfg, err := got.ServerTLSConfig()
// require.Nil(t, err)
// caSubjects := serverCfg.RootCAs.Subjects()
// require.Len(t, caSubjects, 1)
// caSubject, err := testNameFromRawDN(caSubjects[0])
// require.Nil(t, err)
// require.Equal(t, "Consul Internal", caSubject.CommonName)
// require.Len(t, serverCfg.Certificates, 1)
// cert, err := x509.ParseCertificate(serverCfg.Certificates[0].Certificate[0])
// require.Nil(t, err)
// require.Equal(t, "web", cert.Subject.CommonName)
// }
// func testNameFromRawDN(raw []byte) (*pkix.Name, error) {
// var seq pkix.RDNSequence
// if _, err := asn1.Unmarshal(raw, &seq); err != nil {
// return nil, err
// }
// var name pkix.Name
// name.FillFromRDNSequence(&seq)
// return &name, nil
// }
// func testAgent(t *testing.T) (*testutil.TestServer, *api.Client) {
// t.Helper()
// // Make client config
// conf := api.DefaultConfig()
// // Create server
// server, err := testutil.NewTestServerConfigT(t, nil)
// require.Nil(t, err)
// conf.Address = server.HTTPAddr
// // Create client
// agent, err := api.NewClient(conf)
// require.Nil(t, err)
// return server, agent
// }
// func testService(t *testing.T, ca, name string, client *api.Client) *httptest.Server {
// t.Helper()
// // Run a test service to discover
// server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// w.Write([]byte("svc: " + name))
// }))
// server.TLS = TestTLSConfig(t, ca, name)
// server.StartTLS()
// u, err := url.Parse(server.URL)
// require.Nil(t, err)
// port, err := strconv.Atoi(u.Port())
// require.Nil(t, err)
// // If client is passed, register the test service instance
// if client != nil {
// svc := &api.AgentServiceRegistration{
// // TODO(banks): we don't really have a good way to represent
// // connect-native apps yet so we have to pretend our little server is a
// // proxy for now.
// Kind: api.ServiceKindConnectProxy,
// ProxyDestination: name,
// Name: name + "-proxy",
// Address: u.Hostname(),
// Port: port,
// }
// err := client.Agent().ServiceRegister(svc)
// require.Nil(t, err)
// }
// return server
// }
// func TestDialService(t *testing.T) {
// consulServer, agent := testAgent(t)
// defer consulServer.Stop()
// svc := testService(t, "ca1", "web", agent)
// defer svc.Close()
// c, err := NewInsecureDevClientWithLocalCerts(agent,
// "testdata/ca1-ca-consul-internal.cert.pem",
// "testdata/ca1-svc-web.cert.pem",
// "testdata/ca1-svc-web.key.pem",
// )
// require.Nil(t, err)
// conn, err := c.DialService(context.Background(), "default", "web")
// require.Nilf(t, err, "err: %s", err)
// // Inject our conn into http.Transport
// httpClient := &http.Client{
// Transport: &http.Transport{
// DialTLS: func(network, addr string) (net.Conn, error) {
// return conn, nil
// },
// },
// }
// // Don't be fooled: the hostname here is ignored since we did the dialling
// // ourselves
// resp, err := httpClient.Get("https://web.connect.consul/")
// require.Nil(t, err)
// defer resp.Body.Close()
// body, err := ioutil.ReadAll(resp.Body)
// require.Nil(t, err)
// require.Equal(t, "svc: web", string(body))
// }

View File

@@ -151,64 +151,3 @@ func (s *TestService) Close() {
close(s.stopChan)
}
}
/*
// TestCAPool returns an *x509.CertPool containing the named CA certs from the
// testdata dir.
func TestCAPool(t testing.T, caNames ...string) *x509.CertPool {
t.Helper()
pool := x509.NewCertPool()
for _, name := range caNames {
certs, err := filepath.Glob(testDataDir() + "/" + name + "-ca-*.cert.pem")
require.Nil(t, err)
for _, cert := range certs {
caPem, err := ioutil.ReadFile(cert)
require.Nil(t, err)
pool.AppendCertsFromPEM(caPem)
}
}
return pool
}
// TestSvcKeyPair returns a tls.Certificate containing both cert and private
// key for a given service under a given CA from the testdata dir.
func TestSvcKeyPair(t testing.T, ca, name string) tls.Certificate {
t.Helper()
prefix := fmt.Sprintf(testDataDir()+"/%s-svc-%s", ca, name)
cert, err := tls.LoadX509KeyPair(prefix+".cert.pem", prefix+".key.pem")
require.Nil(t, err)
return cert
}
// TestTLSConfig returns a *tls.Config suitable for use during tests.
func TestTLSConfig(t testing.T, ca, svc string) *tls.Config {
t.Helper()
return &tls.Config{
Certificates: []tls.Certificate{TestSvcKeyPair(t, ca, svc)},
MinVersion: tls.VersionTLS12,
RootCAs: TestCAPool(t, ca),
ClientCAs: TestCAPool(t, ca),
ClientAuth: tls.RequireAndVerifyClientCert,
// In real life we'll need to do this too since otherwise Go will attempt to
// verify DNS names match DNS SAN/CN which we don't want, but we'll hook
// VerifyPeerCertificate and do our own x509 path validation as well as
// AuthZ upcall. For now we are just testing the basic proxy mechanism so
// this is fine.
InsecureSkipVerify: true,
}
}
// TestAuther is a simple Auther implementation that does nothing but what you
// tell it to!
type TestAuther struct {
// Return is the value returned from an Auth() call. Set it to nil to have all
// certificates unconditionally accepted or a value to have them fail.
Return error
}
// Auth implements Auther
func (a *TestAuther) Auth(rawCerts [][]byte,
verifiedChains [][]*x509.Certificate) error {
return a.Return
}
*/

View File

@@ -1,111 +0,0 @@
package proxy
import (
"io/ioutil"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/hcl"
)
// Config is the publicly configurable state for an entire proxy instance. It's
// used as the format for the local-file config mode, which is mainly for
// dev/testing. In normal use, different parts of this config are pulled from
// different locations (e.g. command line, agent config endpoint, agent
// certificate endpoints).
type Config struct {
// ProxyID is the identifier for this proxy as registered in Consul. It's only
// guaranteed to be unique per agent.
ProxyID string `json:"proxy_id" hcl:"proxy_id"`
// Token is the authentication token provided for queries to the local agent.
Token string `json:"token" hcl:"token"`
// ProxiedServiceName is the name of the service this proxy is representing.
ProxiedServiceName string `json:"proxied_service_name" hcl:"proxied_service_name"`
// ProxiedServiceNamespace is the namespace of the service this proxy is
// representing.
ProxiedServiceNamespace string `json:"proxied_service_namespace" hcl:"proxied_service_namespace"`
// PublicListener configures the mTLS listener.
PublicListener PublicListenerConfig `json:"public_listener" hcl:"public_listener"`
// Upstreams configures outgoing proxies for remote connect services.
Upstreams []UpstreamConfig `json:"upstreams" hcl:"upstreams"`
// DevCAFile allows passing the file path to PEM encoded root certificate
// bundle to be used in development instead of the ones supplied by Connect.
DevCAFile string `json:"dev_ca_file" hcl:"dev_ca_file"`
// DevServiceCertFile allows passing the file path to PEM encoded service
// certificate (client and server) to be used in development instead of the
// ones supplied by Connect.
DevServiceCertFile string `json:"dev_service_cert_file" hcl:"dev_service_cert_file"`
// DevServiceKeyFile allows passing the file path to PEM encoded service
// private key to be used in development instead of the ones supplied by
// Connect.
DevServiceKeyFile string `json:"dev_service_key_file" hcl:"dev_service_key_file"`
}
// ConfigWatcher is a simple interface to allow dynamic configurations from
// pluggable sources.
type ConfigWatcher interface {
// Watch returns a channel that will deliver new Configs if something external
// provokes it.
Watch() <-chan *Config
}
// StaticConfigWatcher is a simple ConfigWatcher that delivers a static Config
// once and then never changes it.
type StaticConfigWatcher struct {
ch chan *Config
}
// NewStaticConfigWatcher returns a ConfigWatcher for a config that never
// changes. It assumes only one "watcher" will ever call Watch. The config is
// delivered on the first receive but never again, so callers can safely keep
// receiving (e.g. select in a loop).
func NewStaticConfigWatcher(cfg *Config) *StaticConfigWatcher {
sc := &StaticConfigWatcher{
// Buffer it so we can queue up the config for first delivery.
ch: make(chan *Config, 1),
}
sc.ch <- cfg
return sc
}
// Watch implements ConfigWatcher on a static configuration for compatibility.
// It delivers the static config on the channel once and then leaves it open.
func (sc *StaticConfigWatcher) Watch() <-chan *Config {
return sc.ch
}
// ParseConfigFile parses proxy configuration from a file for local dev.
func ParseConfigFile(filename string) (*Config, error) {
bs, err := ioutil.ReadFile(filename)
if err != nil {
return nil, err
}
var cfg Config
err = hcl.Unmarshal(bs, &cfg)
if err != nil {
return nil, err
}
return &cfg, nil
}
// AgentConfigWatcher watches the local Consul agent for proxy config changes.
type AgentConfigWatcher struct {
client *api.Client
}
// Watch implements ConfigWatcher.
func (w *AgentConfigWatcher) Watch() <-chan *Config {
watch := make(chan *Config)
// TODO implement me
return watch
}
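A minimal usage sketch for the file-based config path above, assuming the package import path and a local proxy.hcl like the kitchen-sink example further down:

package main

import (
	"log"

	"github.com/hashicorp/consul/proxy" // import path assumed
)

func main() {
	cfg, err := proxy.ParseConfigFile("proxy.hcl")
	if err != nil {
		log.Fatal(err)
	}

	watcher := proxy.NewStaticConfigWatcher(cfg)
	// The channel is buffered with the config already queued, so the first
	// receive returns immediately; subsequent receives block forever.
	first := <-watcher.Watch()
	log.Printf("proxying %s/%s", first.ProxiedServiceNamespace, first.ProxiedServiceName)
}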

View File

@@ -1,46 +0,0 @@
package proxy
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestParseConfigFile(t *testing.T) {
cfg, err := ParseConfigFile("testdata/config-kitchensink.hcl")
require.Nil(t, err)
expect := &Config{
ProxyID: "foo",
Token: "11111111-2222-3333-4444-555555555555",
ProxiedServiceName: "web",
ProxiedServiceNamespace: "default",
PublicListener: PublicListenerConfig{
BindAddress: ":9999",
LocalServiceAddress: "127.0.0.1:5000",
LocalConnectTimeoutMs: 1000,
HandshakeTimeoutMs: 5000,
},
Upstreams: []UpstreamConfig{
{
LocalBindAddress: "127.0.0.1:6000",
DestinationName: "db",
DestinationNamespace: "default",
DestinationType: "service",
ConnectTimeoutMs: 10000,
},
{
LocalBindAddress: "127.0.0.1:6001",
DestinationName: "geo-cache",
DestinationNamespace: "default",
DestinationType: "prepared_query",
ConnectTimeoutMs: 10000,
},
},
DevCAFile: "connect/testdata/ca1-ca-consul-internal.cert.pem",
DevServiceCertFile: "connect/testdata/ca1-svc-web.cert.pem",
DevServiceKeyFile: "connect/testdata/ca1-svc-web.key.pem",
}
require.Equal(t, expect, cfg)
}

View File

@@ -1,48 +0,0 @@
package proxy
import (
"io"
"net"
"sync/atomic"
)
// Conn represents a single proxied TCP connection.
type Conn struct {
src, dst net.Conn
stopping int32
}
// NewConn returns a conn joining the two given net.Conn
func NewConn(src, dst net.Conn) *Conn {
return &Conn{
src: src,
dst: dst,
stopping: 0,
}
}
// Close closes both connections.
func (c *Conn) Close() {
atomic.StoreInt32(&c.stopping, 1)
c.src.Close()
c.dst.Close()
}
// CopyBytes will continuously copy bytes in both directions between src and dst
// until either connection is closed.
func (c *Conn) CopyBytes() error {
defer c.Close()
go func() {
// Need this since Copy is only guaranteed to stop when its source reader
// (second arg) hits EOF or an error, but either conn might close first,
// possibly causing this goroutine to exit but not the outer one. See TestSc
//defer c.Close()
io.Copy(c.dst, c.src)
}()
_, err := io.Copy(c.src, c.dst)
if atomic.LoadInt32(&c.stopping) == 1 {
return nil
}
return err
}
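A sketch of the intended use of Conn, assuming src is an accepted client conn and dst a dialed backend conn; the helper name is hypothetical:

package proxy

import (
	"log"
	"net"
)

// proxyPair joins two established conns. CopyBytes blocks until either
// side closes; a Close() from elsewhere makes it return nil instead of
// the copy error, distinguishing deliberate shutdown from failure.
func proxyPair(src, dst net.Conn) {
	c := NewConn(src, dst)
	if err := c.CopyBytes(); err != nil {
		log.Printf("proxy session ended with error: %s", err)
	}
}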

View File

@@ -1,119 +0,0 @@
package proxy
import (
"bufio"
"net"
"testing"
"github.com/stretchr/testify/require"
)
// testConnSetup listens on a random TCP port and passes the accepted net.Conn
// back to the test code on a channel. It returns the source and destination
// net.Conns along with a cleanup func.
func testConnSetup(t *testing.T) (net.Conn, net.Conn, func()) {
t.Helper()
l, err := net.Listen("tcp", "localhost:0")
require.Nil(t, err)
ch := make(chan net.Conn, 1)
go func(ch chan net.Conn) {
src, err := l.Accept()
require.Nil(t, err)
ch <- src
}(ch)
dst, err := net.Dial("tcp", l.Addr().String())
require.Nil(t, err)
src := <-ch
stopper := func() {
l.Close()
src.Close()
dst.Close()
}
return src, dst, stopper
}
func TestConn(t *testing.T) {
src, dst, stop := testConnSetup(t)
defer stop()
c := NewConn(src, dst)
retCh := make(chan error, 1)
go func() {
retCh <- c.CopyBytes()
}()
srcR := bufio.NewReader(src)
dstR := bufio.NewReader(dst)
_, err := src.Write([]byte("ping 1\n"))
require.Nil(t, err)
_, err = dst.Write([]byte("ping 2\n"))
require.Nil(t, err)
got, err := dstR.ReadString('\n')
require.Equal(t, "ping 1\n", got)
got, err = srcR.ReadString('\n')
require.Equal(t, "ping 2\n", got)
_, err = src.Write([]byte("pong 1\n"))
require.Nil(t, err)
_, err = dst.Write([]byte("pong 2\n"))
require.Nil(t, err)
got, err = dstR.ReadString('\n')
require.Equal(t, "pong 1\n", got)
got, err = srcR.ReadString('\n')
require.Equal(t, "pong 2\n", got)
c.Close()
ret := <-retCh
require.Nil(t, ret, "Close() should not cause error return")
}
func TestConnSrcClosing(t *testing.T) {
src, dst, stop := testConnSetup(t)
defer stop()
c := NewConn(src, dst)
retCh := make(chan error, 1)
go func() {
retCh <- c.CopyBytes()
}()
// If we close the src conn, we expect CopyBytes to return and dst to be
// closed too. There's no good way to assert the conns are closed other than
// to assume the retCh receive will hang unless CopyBytes exits, and that
// CopyBytes defers closing both. I.e. if this test doesn't time out it's
// good!
src.Close()
<-retCh
}
func TestConnDstClosing(t *testing.T) {
src, dst, stop := testConnSetup(t)
defer stop()
c := NewConn(src, dst)
retCh := make(chan error, 1)
go func() {
retCh <- c.CopyBytes()
}()
// If we close the dst conn, we expect CopyBytes to return and src to be
// closed too. There's no good way to assert the conns are closed other than
// to assume the retCh receive will hang unless CopyBytes exits, and that
// CopyBytes defers closing both. I.e. if this test doesn't time out it's
// good!
dst.Close()
<-retCh
}

View File

@@ -1,140 +0,0 @@
package proxy
import (
"errors"
"log"
"os"
)
var (
// ErrExists is the error returned when adding a proxy that exists already.
ErrExists = errors.New("proxy with that name already exists")
// ErrNotExist is the error returned when removing a proxy that doesn't exist.
ErrNotExist = errors.New("proxy with that name doesn't exist")
)
// Manager implements the logic for configuring and running a set of proxiers.
// Typically it's used to run one PublicListener and zero or more Upstreams.
type Manager struct {
ch chan managerCmd
// stopped is used to signal the caller of StopAll when the run loop exits
// after stopping all runners. It is only ever closed, never sent to.
stopped chan struct{}
// runners holds the currently running instances. It should only be accessed
// from within the `run` goroutine.
runners map[string]*Runner
logger *log.Logger
}
type managerCmd struct {
name string
p Proxier
errCh chan error
}
// NewManager creates a manager of proxier instances.
func NewManager() *Manager {
return NewManagerWithLogger(log.New(os.Stdout, "", log.LstdFlags))
}
// NewManagerWithLogger creates a manager of proxier instances with the
// specified logger.
func NewManagerWithLogger(logger *log.Logger) *Manager {
m := &Manager{
ch: make(chan managerCmd),
stopped: make(chan struct{}),
runners: make(map[string]*Runner),
logger: logger,
}
go m.run()
return m
}
// RunProxier starts a new Proxier instance in the manager. It is safe to call
// from separate goroutines. If there is already a running proxy with the same
// name it returns ErrExists.
func (m *Manager) RunProxier(name string, p Proxier) error {
cmd := managerCmd{
name: name,
p: p,
errCh: make(chan error),
}
m.ch <- cmd
return <-cmd.errCh
}
// StopProxier stops a Proxier instance by name. It is safe to call from
// separate goroutines. If the instance with that name doesn't exist it returns
// ErrNotExist.
func (m *Manager) StopProxier(name string) error {
cmd := managerCmd{
name: name,
p: nil,
errCh: make(chan error),
}
m.ch <- cmd
return <-cmd.errCh
}
// StopAll shuts down the manager instance and stops all running proxies. It is
// safe to call from any goroutine but must only be called once.
func (m *Manager) StopAll() error {
close(m.ch)
<-m.stopped
return nil
}
// run is the main manager processing loop. It keeps all actions in a single
// goroutine triggered by channel commands to keep it simple to reason about
// lifecycle events for each proxy.
func (m *Manager) run() {
defer close(m.stopped)
// range over channel blocks and loops on each message received until channel
// is closed.
for cmd := range m.ch {
if cmd.p == nil {
m.remove(&cmd)
} else {
m.add(&cmd)
}
}
// Shutting down; stop all the runners
for _, r := range m.runners {
r.Stop()
}
}
// add starts the named proxier instance. Should only be called from the
// run loop.
func (m *Manager) add(cmd *managerCmd) {
// Check existing
if _, ok := m.runners[cmd.name]; ok {
cmd.errCh <- ErrExists
return
}
// Start new runner
r := NewRunnerWithLogger(cmd.name, cmd.p, m.logger)
m.runners[cmd.name] = r
go r.Listen()
cmd.errCh <- nil
}
// remove the named proxier instance and stop it. Should only be called from the
// run loop.
func (m *Manager) remove(cmd *managerCmd) {
// Fetch proxier by name
r, ok := m.runners[cmd.name]
if !ok {
cmd.errCh <- ErrNotExist
return
}
err := r.Stop()
delete(m.runners, cmd.name)
cmd.errCh <- err
}
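A short sketch of the Manager lifecycle, using the TestProxier defined later in this diff as a stand-in Proxier; the name and address are illustrative:

package main

import (
	"log"

	"github.com/hashicorp/consul/proxy" // import path assumed
)

func main() {
	m := proxy.NewManager()

	// RunProxier returns once the runner is registered; a duplicate name
	// yields ErrExists rather than replacing the running instance.
	if err := m.RunProxier("echo", &proxy.TestProxier{Addr: "127.0.0.1:9191"}); err != nil {
		log.Fatal(err)
	}

	if err := m.StopProxier("echo"); err != nil {
		log.Printf("stop failed: %s", err) // ErrNotExist for unknown names
	}

	// StopAll shuts the manager down and must only be called once.
	_ = m.StopAll()
}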

View File

@@ -1,76 +0,0 @@
package proxy
import (
"fmt"
"net"
"testing"
"github.com/stretchr/testify/require"
)
func TestManager(t *testing.T) {
m := NewManager()
addrs := TestLocalBindAddrs(t, 3)
for i := 0; i < len(addrs); i++ {
name := fmt.Sprintf("proxier-%d", i)
// Run proxy
err := m.RunProxier(name, &TestProxier{
Addr: addrs[i],
Prefix: name + ": ",
})
require.Nil(t, err)
}
// Make sure each one is echoing correctly now all are running
for i := 0; i < len(addrs); i++ {
conn, err := net.Dial("tcp", addrs[i])
require.Nil(t, err)
TestEchoConn(t, conn, fmt.Sprintf("proxier-%d: ", i))
conn.Close()
}
// Stop first proxier
err := m.StopProxier("proxier-0")
require.Nil(t, err)
// We should fail to dial it now. Note that Runner.Stop is synchronous so
// there should be a strong guarantee that it's stopped listening by now.
_, err = net.Dial("tcp", addrs[0])
require.NotNil(t, err)
// Rest of proxiers should still be running
for i := 1; i < len(addrs); i++ {
conn, err := net.Dial("tcp", addrs[i])
require.Nil(t, err)
TestEchoConn(t, conn, fmt.Sprintf("proxier-%d: ", i))
conn.Close()
}
// Stop non-existent proxier should fail
err = m.StopProxier("foo")
require.Equal(t, ErrNotExist, err)
// Add already-running proxier should fail
err = m.RunProxier("proxier-1", &TestProxier{})
require.Equal(t, ErrExists, err)
// But rest should stay running
for i := 1; i < len(addrs); i++ {
conn, err := net.Dial("tcp", addrs[i])
require.Nil(t, err)
TestEchoConn(t, conn, fmt.Sprintf("proxier-%d: ", i))
conn.Close()
}
// StopAll should stop everything
err = m.StopAll()
require.Nil(t, err)
// Verify failures
for i := 0; i < len(addrs); i++ {
_, err = net.Dial("tcp", addrs[i])
require.NotNilf(t, err, "proxier-%d should not be running", i)
}
}

View File

@@ -1,32 +0,0 @@
package proxy
import (
"errors"
"net"
)
// ErrStopped is returned for operations on a proxy that is stopped
var ErrStopped = errors.New("stopped")
// ErrStopping is returned for operations on a proxy that is stopping
var ErrStopping = errors.New("stopping")
// Proxier is an interface for managing different proxy implementations in a
// standard way. We have at least two different types of Proxier implementations
// needed: one for the incoming mTLS -> local proxy and another for each
// "upstream" service the app needs to talk out to (which listens locally and
// performs service discovery to find a suitable remote service).
type Proxier interface {
// Listener returns a net.Listener that is open and ready for use, the Proxy
// manager will arrange accepting new connections from it and passing them to
// the handler method.
Listener() (net.Listener, error)
// HandleConn is called for each incoming connection accepted by the listener.
// It is called in its own goroutine and should run until it hits an error.
// When stopping the Proxier, the manager will simply close the conn provided
// and expects an error to be eventually returned. Any time spent not blocked
// on the passed conn (for example doing service discovery) should therefore
// be time-bound so that shutdown can't stall forever.
HandleConn(conn net.Conn) error
}
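The TestProxier in testing.go below is the simplest concrete example of this contract. As a hypothetical sketch, a Proxier that forwards every accepted connection to one fixed backend could look like this (staticForwarder is not part of the removed code):

package proxy

import (
	"net"
	"time"
)

// staticForwarder forwards every accepted connection to a fixed backend.
type staticForwarder struct {
	BindAddr, BackendAddr string
}

// Listener implements Proxier.
func (f *staticForwarder) Listener() (net.Listener, error) {
	return net.Listen("tcp", f.BindAddr)
}

// HandleConn implements Proxier. The dial is time-bound so shutdown
// can't stall while we're off-conn, per the interface contract above.
func (f *staticForwarder) HandleConn(conn net.Conn) error {
	defer conn.Close()
	dst, err := net.DialTimeout("tcp", f.BackendAddr, 5*time.Second)
	if err != nil {
		return err
	}
	// Conn (conn.go above) copies bytes both ways until either side closes.
	return NewConn(conn, dst).CopyBytes()
}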

View File

@@ -1,112 +0,0 @@
package proxy
import (
"context"
"log"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/connect"
)
// Proxy implements the built-in connect proxy.
type Proxy struct {
proxyID, token string
connect connect.Client
manager *Manager
cfgWatch ConfigWatcher
cfg *Config
logger *log.Logger
}
// NewFromConfigFile returns a Proxy instance configured just from a local file.
// This is intended mostly for development and bypasses the normal mechanisms
// for fetching config and certificates from the local agent.
func NewFromConfigFile(client *api.Client, filename string,
logger *log.Logger) (*Proxy, error) {
cfg, err := ParseConfigFile(filename)
if err != nil {
return nil, err
}
connect, err := connect.NewInsecureDevClientWithLocalCerts(client,
cfg.DevCAFile, cfg.DevServiceCertFile, cfg.DevServiceKeyFile)
if err != nil {
return nil, err
}
p := &Proxy{
proxyID: cfg.ProxyID,
connect: connect,
manager: NewManagerWithLogger(logger),
cfgWatch: NewStaticConfigWatcher(cfg),
logger: logger,
}
return p, nil
}
// New returns a Proxy with the given id, consuming the provided (configured)
// agent. It is ready to Run().
func New(client *api.Client, proxyID string, logger *log.Logger) (*Proxy, error) {
p := &Proxy{
proxyID: proxyID,
connect: connect.NewClient(client),
manager: NewManagerWithLogger(logger),
cfgWatch: &AgentConfigWatcher{client: client},
logger: logger,
}
return p, nil
}
// Run the proxy instance until a fatal error occurs or ctx is cancelled.
func (p *Proxy) Run(ctx context.Context) error {
defer p.manager.StopAll()
// Watch for config changes (initial setup happens on first "change")
for {
select {
case newCfg := <-p.cfgWatch.Watch():
p.logger.Printf("[DEBUG] got new config")
if p.cfg == nil {
// Initial setup
err := p.startPublicListener(ctx, newCfg.PublicListener)
if err != nil {
return err
}
}
// TODO add/remove upstreams properly based on a diff with current
for _, uc := range newCfg.Upstreams {
uc.Client = p.connect
uc.logger = p.logger
err := p.manager.RunProxier(uc.String(), NewUpstream(uc))
if err == ErrExists {
continue
}
if err != nil {
p.logger.Printf("[ERR] failed to start upstream %s: %s", uc.String(),
err)
}
}
p.cfg = newCfg
case <-ctx.Done():
return nil
}
}
}
func (p *Proxy) startPublicListener(ctx context.Context,
cfg PublicListenerConfig) error {
// Get TLS creds
tlsCfg, err := p.connect.ServerTLSConfig()
if err != nil {
return err
}
cfg.TLSConfig = tlsCfg
cfg.logger = p.logger
return p.manager.RunProxier("public_listener", NewPublicListener(cfg))
}
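A sketch of wiring the whole proxy up from a dev config file; the file name is illustrative and the import path is assumed:

package main

import (
	"context"
	"log"
	"os"

	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/proxy" // import path assumed
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	logger := log.New(os.Stdout, "", log.LstdFlags)

	p, err := proxy.NewFromConfigFile(client, "proxy.hcl", logger)
	if err != nil {
		log.Fatal(err)
	}

	// Run blocks until ctx is cancelled or a fatal error occurs; in real
	// use you would cancel on SIGINT/SIGTERM.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := p.Run(ctx); err != nil {
		log.Fatal(err)
	}
}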

View File

@@ -1,119 +0,0 @@
package proxy
import (
"crypto/tls"
"fmt"
"log"
"net"
"os"
"time"
)
// PublicListener provides an implementation of Proxier that listens for inbound
// mTLS connections, authenticates them with the local agent, and if successful
// forwards them to the locally configured app.
type PublicListener struct {
cfg *PublicListenerConfig
}
// PublicListenerConfig contains the most basic parameters needed to start the
// proxy.
//
// Note that the tls.Configs here are expected to be "dynamic" in the sense that
// they are expected to use `GetConfigForClient` (added in go 1.8) to return
// dynamic config per connection if required.
type PublicListenerConfig struct {
// BindAddress is the host:port the public mTLS listener will bind to.
BindAddress string `json:"bind_address" hcl:"bind_address"`
// LocalServiceAddress is the host:port for the proxied application. This
// should be on loopback or otherwise protected as it's plain TCP.
LocalServiceAddress string `json:"local_service_address" hcl:"local_service_address"`
// TLSConfig is used for the mTLS listener.
TLSConfig *tls.Config
// LocalConnectTimeoutMs is the timeout in milliseconds for establishing
// connections with the local backend. Defaults to 1000 (1s).
LocalConnectTimeoutMs int `json:"local_connect_timeout_ms" hcl:"local_connect_timeout_ms"`
// HandshakeTimeoutMs is the timeout in milliseconds for incoming mTLS
// clients to complete a handshake. Setting this low avoids DoS by malicious
// clients holding resources open. Defaults to 10000 (10s).
HandshakeTimeoutMs int `json:"handshake_timeout_ms" hcl:"handshake_timeout_ms"`
logger *log.Logger
}
func (plc *PublicListenerConfig) applyDefaults() {
if plc.LocalConnectTimeoutMs == 0 {
plc.LocalConnectTimeoutMs = 1000
}
if plc.HandshakeTimeoutMs == 0 {
plc.HandshakeTimeoutMs = 10000
}
if plc.logger == nil {
plc.logger = log.New(os.Stdout, "", log.LstdFlags)
}
}
// NewPublicListener returns a proxy instance with the given config.
func NewPublicListener(cfg PublicListenerConfig) *PublicListener {
p := &PublicListener{
cfg: &cfg,
}
p.cfg.applyDefaults()
return p
}
// Listener implements Proxier
func (p *PublicListener) Listener() (net.Listener, error) {
l, err := net.Listen("tcp", p.cfg.BindAddress)
if err != nil {
return nil, err
}
return tls.NewListener(l, p.cfg.TLSConfig), nil
}
// HandleConn implements Proxier
func (p *PublicListener) HandleConn(conn net.Conn) error {
defer conn.Close()
tlsConn, ok := conn.(*tls.Conn)
if !ok {
return fmt.Errorf("non-TLS conn")
}
// Setup Handshake timer
to := time.Duration(p.cfg.HandshakeTimeoutMs) * time.Millisecond
err := tlsConn.SetDeadline(time.Now().Add(to))
if err != nil {
return err
}
// Force TLS handshake so that abusive clients can't hold resources open
err = tlsConn.Handshake()
if err != nil {
return err
}
// Handshake OK, clear the deadline
err = tlsConn.SetDeadline(time.Time{})
if err != nil {
return err
}
// Huzzah, open a connection to the backend and let them talk
// TODO maybe add a connection pool here?
to = time.Duration(p.cfg.LocalConnectTimeoutMs) * time.Millisecond
dst, err := net.DialTimeout("tcp", p.cfg.LocalServiceAddress, to)
if err != nil {
return fmt.Errorf("failed dialling local app: %s", err)
}
p.cfg.logger.Printf("[DEBUG] accepted connection from %s", conn.RemoteAddr())
// Hand conn and dst over to Conn to manage the byte copying.
c := NewConn(conn, dst)
return c.CopyBytes()
}

View File

@@ -1,38 +0,0 @@
package proxy
import (
"crypto/tls"
"testing"
"github.com/hashicorp/consul/connect"
"github.com/stretchr/testify/require"
)
func TestPublicListener(t *testing.T) {
addrs := TestLocalBindAddrs(t, 2)
cfg := PublicListenerConfig{
BindAddress: addrs[0],
LocalServiceAddress: addrs[1],
HandshakeTimeoutMs: 100,
LocalConnectTimeoutMs: 100,
TLSConfig: connect.TestTLSConfig(t, "ca1", "web"),
}
testApp, err := NewTestTCPServer(t, cfg.LocalServiceAddress)
require.Nil(t, err)
defer testApp.Close()
p := NewPublicListener(cfg)
// Run proxy
r := NewRunner("test", p)
go r.Listen()
defer r.Stop()
// Proxy and backend are running, play the part of a TLS client using same
// cert for now.
conn, err := tls.Dial("tcp", cfg.BindAddress, connect.TestTLSConfig(t, "ca1", "web"))
require.Nil(t, err)
TestEchoConn(t, conn, "")
}

View File

@@ -1,118 +0,0 @@
package proxy
import (
"log"
"net"
"os"
"sync"
"sync/atomic"
)
// Runner manages the lifecycle of one Proxier.
type Runner struct {
name string
p Proxier
// stopping is a flag that is updated and read atomically.
stopping int32
stopCh chan struct{}
// wg is used to signal back to Stop when all goroutines have stopped
wg sync.WaitGroup
logger *log.Logger
}
// NewRunner returns a Runner ready to Listen.
func NewRunner(name string, p Proxier) *Runner {
return NewRunnerWithLogger(name, p, log.New(os.Stdout, "", log.LstdFlags))
}
// NewRunnerWithLogger returns a Runner ready to Listen using the specified
// log.Logger.
func NewRunnerWithLogger(name string, p Proxier, logger *log.Logger) *Runner {
return &Runner{
name: name,
p: p,
stopCh: make(chan struct{}),
logger: logger,
}
}
// Listen starts the proxier instance. It blocks until a fatal error occurs or
// Stop() is called.
func (r *Runner) Listen() error {
if atomic.LoadInt32(&r.stopping) == 1 {
return ErrStopped
}
l, err := r.p.Listener()
if err != nil {
return err
}
r.logger.Printf("[INFO] proxy: %s listening on %s", r.name, l.Addr().String())
// Run goroutine that will close listener on stop
go func() {
<-r.stopCh
l.Close()
r.logger.Printf("[INFO] proxy: %s shutdown", r.name)
}()
// Add one for the accept loop
r.wg.Add(1)
defer r.wg.Done()
for {
conn, err := l.Accept()
if err != nil {
if atomic.LoadInt32(&r.stopping) == 1 {
return nil
}
return err
}
go r.handle(conn)
}
}
func (r *Runner) handle(conn net.Conn) {
r.wg.Add(1)
defer r.wg.Done()
// Start a goroutine that will watch for the Runner stopping and close the
// conn, or watch for the Proxier closing (e.g. because other end hung up) and
// stop the goroutine to avoid leaks
doneCh := make(chan struct{})
defer close(doneCh)
go func() {
select {
case <-r.stopCh:
r.logger.Printf("[DEBUG] proxy: %s: terminating conn", r.name)
conn.Close()
return
case <-doneCh:
// Connection is already closed, this goroutine not needed any more
return
}
}()
err := r.p.HandleConn(conn)
if err != nil {
r.logger.Printf("[DEBUG] proxy: %s: connection terminated: %s", r.name, err)
} else {
r.logger.Printf("[DEBUG] proxy: %s: connection terminated", r.name)
}
}
// Stop stops the Listener and closes any active connections immediately.
func (r *Runner) Stop() error {
old := atomic.SwapInt32(&r.stopping, 1)
if old == 0 {
close(r.stopCh)
}
r.wg.Wait()
return nil
}

View File

@@ -1,36 +0,0 @@
# Example proxy config with everything specified
proxy_id = "foo"
token = "11111111-2222-3333-4444-555555555555"
proxied_service_name = "web"
proxied_service_namespace = "default"
# Assumes running consul in dev mode from the repo root...
dev_ca_file = "connect/testdata/ca1-ca-consul-internal.cert.pem"
dev_service_cert_file = "connect/testdata/ca1-svc-web.cert.pem"
dev_service_key_file = "connect/testdata/ca1-svc-web.key.pem"
public_listener {
bind_address = ":9999"
local_service_address = "127.0.0.1:5000"
local_connect_timeout_ms = 1000
handshake_timeout_ms = 5000
}
upstreams = [
{
local_bind_address = "127.0.0.1:6000"
destination_name = "db"
destination_namespace = "default"
destination_type = "service"
connect_timeout_ms = 10000
},
{
local_bind_address = "127.0.0.1:6001"
destination_name = "geo-cache"
destination_namespace = "default"
destination_type = "prepared_query"
connect_timeout_ms = 10000
}
]

View File

@@ -1,170 +0,0 @@
package proxy
import (
"context"
"crypto/tls"
"fmt"
"io"
"log"
"net"
"sync/atomic"
"github.com/hashicorp/consul/lib/freeport"
"github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
)
// TestLocalBindAddrs returns n localhost address:port strings with free ports
// for binding test listeners to.
func TestLocalBindAddrs(t testing.T, n int) []string {
ports := freeport.GetT(t, n)
addrs := make([]string, n)
for i, p := range ports {
addrs[i] = fmt.Sprintf("localhost:%d", p)
}
return addrs
}
// TestTCPServer is a simple TCP echo server for use during tests.
type TestTCPServer struct {
l net.Listener
stopped int32
accepted, closed, active int32
}
// NewTestTCPServer opens a listening socket on the given address and returns
// a TestTCPServer serving requests to it. The server is already started and can
// be stopped by calling Close().
func NewTestTCPServer(t testing.T, addr string) (*TestTCPServer, error) {
l, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
log.Printf("test tcp server listening on %s", addr)
s := &TestTCPServer{
l: l,
}
go s.accept()
return s, nil
}
// Close stops the server
func (s *TestTCPServer) Close() {
atomic.StoreInt32(&s.stopped, 1)
if s.l != nil {
s.l.Close()
}
}
func (s *TestTCPServer) accept() error {
for {
conn, err := s.l.Accept()
if err != nil {
if atomic.LoadInt32(&s.stopped) == 1 {
log.Printf("test tcp echo server %s stopped", s.l.Addr())
return nil
}
log.Printf("test tcp echo server %s failed: %s", s.l.Addr(), err)
return err
}
atomic.AddInt32(&s.accepted, 1)
atomic.AddInt32(&s.active, 1)
go func(c net.Conn) {
io.Copy(c, c)
atomic.AddInt32(&s.closed, 1)
atomic.AddInt32(&s.active, -1)
}(conn)
}
}
// TestEchoConn attempts to write some bytes to conn and expects to read them
// back. If prefix is not empty we expect it to be present at the start of
// all echoed responses (for example to distinguish between multiple echo
// server instances).
func TestEchoConn(t testing.T, conn net.Conn, prefix string) {
t.Helper()
// Write some bytes and read them back
n, err := conn.Write([]byte("Hello World"))
require.Equal(t, 11, n)
require.Nil(t, err)
expectLen := 11 + len(prefix)
buf := make([]byte, expectLen)
// read until our buffer is full - it might be separate packets if prefix is
// in use.
got := 0
for got < expectLen {
n, err = conn.Read(buf[got:])
require.Nil(t, err)
got += n
}
require.Equal(t, expectLen, got)
require.Equal(t, prefix+"Hello World", string(buf[:]))
}
// TestConnectClient is a testing mock that implements connect.Client but
// stubs the methods to make testing simpler.
type TestConnectClient struct {
Server *TestTCPServer
TLSConfig *tls.Config
Calls []callTuple
}
type callTuple struct {
typ, ns, name string
}
// ServerTLSConfig implements connect.Client
func (c *TestConnectClient) ServerTLSConfig() (*tls.Config, error) {
return c.TLSConfig, nil
}
// DialService implements connect.Client
func (c *TestConnectClient) DialService(ctx context.Context, namespace,
name string) (net.Conn, error) {
c.Calls = append(c.Calls, callTuple{"service", namespace, name})
// Actually returning a vanilla TCP conn not a TLS one but the caller
// shouldn't care for tests since this interface should hide all the TLS
// config and verification.
return net.Dial("tcp", c.Server.l.Addr().String())
}
// DialPreparedQuery implements connect.Client
func (c *TestConnectClient) DialPreparedQuery(ctx context.Context, namespace,
name string) (net.Conn, error) {
c.Calls = append(c.Calls, callTuple{"prepared_query", namespace, name})
// Actually returning a vanilla TCP conn not a TLS one but the caller
// shouldn't care for tests since this interface should hide all the TLS
// config and verification.
return net.Dial("tcp", c.Server.l.Addr().String())
}
// TestProxier is a simple Proxier instance that can be used in tests.
type TestProxier struct {
// Addr to listen on
Addr string
// Prefix to write first before echoing on new connections
Prefix string
}
// Listener implements Proxier
func (p *TestProxier) Listener() (net.Listener, error) {
return net.Listen("tcp", p.Addr)
}
// HandleConn implements Proxier
func (p *TestProxier) HandleConn(conn net.Conn) error {
_, err := conn.Write([]byte(p.Prefix))
if err != nil {
return err
}
_, err = io.Copy(conn, conn)
return err
}

View File

@@ -1,261 +0,0 @@
package proxy
import (
"context"
"fmt"
"log"
"net"
"os"
"time"
"github.com/hashicorp/consul/connect"
)
// Upstream provides an implementation of Proxier that listens for inbound TCP
// connections on the private network shared with the proxied application
// (typically localhost). For each accepted connection from the app, it uses the
// connect.Client to discover an instance and connect over mTLS.
type Upstream struct {
cfg *UpstreamConfig
}
// UpstreamConfig configures the upstream
type UpstreamConfig struct {
// Client is the connect client to perform discovery with
Client connect.Client
// LocalBindAddress is the host:port to listen on for local app connections.
LocalBindAddress string `json:"local_bind_address" hcl:"local_bind_address,attr"`
// DestinationName is the service name of the destination.
DestinationName string `json:"destination_name" hcl:"destination_name,attr"`
// DestinationNamespace is the namespace of the destination.
DestinationNamespace string `json:"destination_namespace" hcl:"destination_namespace,attr"`
// DestinationType determines which service discovery method is used to find a
// candidate instance to connect to.
DestinationType string `json:"destination_type" hcl:"destination_type,attr"`
// ConnectTimeout is the timeout for establishing connections with the remote
// service instance. Defaults to 10,000 (10s).
ConnectTimeoutMs int `json:"connect_timeout_ms" hcl:"connect_timeout_ms,attr"`
logger *log.Logger
}
func (uc *UpstreamConfig) applyDefaults() {
if uc.ConnectTimeoutMs == 0 {
uc.ConnectTimeoutMs = 10000
}
if uc.logger == nil {
uc.logger = log.New(os.Stdout, "", log.LstdFlags)
}
}
// String returns a string that uniquely identifies the Upstream. Used for
// identifying the upstream in log output and map keys.
func (uc *UpstreamConfig) String() string {
return fmt.Sprintf("%s->%s:%s/%s", uc.LocalBindAddress, uc.DestinationType,
uc.DestinationNamespace, uc.DestinationName)
}
// NewUpstream returns an outgoing proxy instance with the given config.
func NewUpstream(cfg UpstreamConfig) *Upstream {
u := &Upstream{
cfg: &cfg,
}
u.cfg.applyDefaults()
return u
}
// String returns a string that uniquely identifies the Upstream. Used for
// identifying the upstream in log output and map keys.
func (u *Upstream) String() string {
return u.cfg.String()
}
// Listener implements Proxier
func (u *Upstream) Listener() (net.Listener, error) {
return net.Listen("tcp", u.cfg.LocalBindAddress)
}
// HandleConn implements Proxier
func (u *Upstream) HandleConn(conn net.Conn) error {
defer conn.Close()
// Discover destination instance
dst, err := u.discoverAndDial()
if err != nil {
return err
}
// Hand conn and dst over to Conn to manage the byte copying.
c := NewConn(conn, dst)
return c.CopyBytes()
}
func (u *Upstream) discoverAndDial() (net.Conn, error) {
to := time.Duration(u.cfg.ConnectTimeoutMs) * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), to)
defer cancel()
switch u.cfg.DestinationType {
case "service":
return u.cfg.Client.DialService(ctx, u.cfg.DestinationNamespace,
u.cfg.DestinationName)
case "prepared_query":
return u.cfg.Client.DialPreparedQuery(ctx, u.cfg.DestinationNamespace,
u.cfg.DestinationName)
default:
return nil, fmt.Errorf("invalid destination type %s", u.cfg.DestinationType)
}
}
/*
// Upstream represents a service that the proxied application needs to connect
// out to. It provides a dedicated local TCP listener (usually listening only
// on loopback) and forwards incoming connections to the proxy to handle.
type Upstream struct {
cfg *UpstreamConfig
wg sync.WaitGroup
proxy *Proxy
fatalErr error
}
// NewUpstream creates an upstream ready to attach to a proxy instance with
// Proxy.AddUpstream. An Upstream can only be attached to single Proxy instance
// at once.
func NewUpstream(p *Proxy, cfg *UpstreamConfig) *Upstream {
return &Upstream{
cfg: cfg,
proxy: p,
shutdown: make(chan struct{}),
}
}
// UpstreamConfig configures the upstream
type UpstreamConfig struct {
// LocalAddress is the host:port to listen on for local app connections.
LocalAddress string
// DestinationName is the service name of the destination.
DestinationName string
// DestinationNamespace is the namespace of the destination.
DestinationNamespace string
// DestinationType determines which service discovery method is used to find a
// candidate instance to connect to.
DestinationType string
}
// String returns a string representation for the upstream for debugging or
// use as a unique key.
func (uc *UpstreamConfig) String() string {
return fmt.Sprintf("%s->%s:%s/%s", uc.LocalAddress, uc.DestinationType,
uc.DestinationNamespace, uc.DestinationName)
}
func (u *Upstream) listen() error {
l, err := net.Listen("tcp", u.cfg.LocalAddress)
if err != nil {
u.fatal(err)
return err
}
for {
conn, err := l.Accept()
if err != nil {
return err
}
go u.discoverAndConnect(conn)
}
}
func (u *Upstream) discoverAndConnect(src net.Conn) {
// First, we need an upstream instance from Consul to connect to
dstAddrs, err := u.discoverInstances()
if err != nil {
u.fatal(fmt.Errorf("failed to discover upstream instances: %s", err))
return
}
if len(dstAddrs) < 1 {
log.Printf("[INFO] no instances found for %s", len(dstAddrs), u)
}
// Attempt connection to first one that works
// TODO: configurable number/deadline?
for idx, addr := range dstAddrs {
err := u.proxy.startProxyingConn(src, addr, false)
if err != nil {
log.Printf("[INFO] failed to connect to %s: %s (%d of %d)", addr, err,
idx+1, len(dstAddrs))
continue
}
return
}
log.Printf("[INFO] failed to connect to all %d instances for %s",
len(dstAddrs), u)
}
func (u *Upstream) discoverInstances() ([]string, error) {
switch u.cfg.DestinationType {
case "service":
svcs, _, err := u.cfg.Consul.Health().Service(u.cfg.DestinationName, "",
true, nil)
if err != nil {
return nil, err
}
addrs := make([]string, len(svcs))
// Shuffle order as we go since the health endpoint doesn't.
perm := rand.Perm(len(addrs))
for i, se := range svcs {
// Pick location in output array based on next permutation position
j := perm[i]
addrs[j] = fmt.Sprintf("%s:%d", se.Service.Address, se.Service.Port)
}
return addrs, nil
case "prepared_query":
pqr, _, err := u.cfg.Consul.PreparedQuery().Execute(u.cfg.DestinationName,
nil)
if err != nil {
return nil, err
}
addrs := make([]string, 0, len(pqr.Nodes))
for _, se := range pqr.Nodes {
addrs = append(addrs, fmt.Sprintf("%s:%d", se.Service.Address,
se.Service.Port))
}
// PreparedQuery execution already shuffles the result
return addrs, nil
default:
u.fatal(fmt.Errorf("invalid destination type %s", u.cfg.DestinationType))
}
}
func (u *Upstream) fatal(err error) {
log.Printf("[ERROR] upstream %s stopping on error: %s", u.cfg.LocalAddress,
err)
u.fatalErr = err
}
// String returns a string representation for the upstream for debugging or
// use as a unique key.
func (u *Upstream) String() string {
return u.cfg.String()
}
*/

View File

@@ -1,75 +0,0 @@
package proxy
import (
"net"
"testing"
"github.com/hashicorp/consul/connect"
"github.com/stretchr/testify/require"
)
func TestUpstream(t *testing.T) {
tests := []struct {
name string
cfg UpstreamConfig
}{
{
name: "service",
cfg: UpstreamConfig{
DestinationType: "service",
DestinationNamespace: "default",
DestinationName: "db",
ConnectTimeoutMs: 100,
},
},
{
name: "prepared_query",
cfg: UpstreamConfig{
DestinationType: "prepared_query",
DestinationNamespace: "default",
DestinationName: "geo-db",
ConnectTimeoutMs: 100,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
addrs := TestLocalBindAddrs(t, 2)
testApp, err := NewTestTCPServer(t, addrs[0])
require.Nil(t, err)
defer testApp.Close()
// Create mock client that will "discover" our test tcp server as a target and
// skip TLS altogether.
client := &TestConnectClient{
Server: testApp,
TLSConfig: connect.TestTLSConfig(t, "ca1", "web"),
}
// Override cfg params
tt.cfg.LocalBindAddress = addrs[1]
tt.cfg.Client = client
u := NewUpstream(tt.cfg)
// Run proxy
r := NewRunner("test", u)
go r.Listen()
defer r.Stop()
// Proxy and fake remote service are running, play the part of the app
// connecting to a remote connect service over TCP.
conn, err := net.Dial("tcp", tt.cfg.LocalBindAddress)
require.Nil(t, err)
TestEchoConn(t, conn, "")
// Validate that discovery actually was called as we expected
require.Len(t, client.Calls, 1)
require.Equal(t, tt.cfg.DestinationType, client.Calls[0].typ)
require.Equal(t, tt.cfg.DestinationNamespace, client.Calls[0].ns)
require.Equal(t, tt.cfg.DestinationName, client.Calls[0].name)
})
}
}