From 67669abf82ef8e21dfa3dc9db0bf0d405b4a61a6 Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Tue, 3 Apr 2018 12:55:50 +0100 Subject: [PATCH] Remove old connect client and proxy implementation --- connect/client.go | 256 ------------------------- connect/client_test.go | 148 --------------- connect/testing.go | 61 ------ proxy/config.go | 111 ----------- proxy/config_test.go | 46 ----- proxy/conn.go | 48 ----- proxy/conn_test.go | 119 ------------ proxy/manager.go | 140 -------------- proxy/manager_test.go | 76 -------- proxy/proxier.go | 32 ---- proxy/proxy.go | 112 ----------- proxy/public_listener.go | 119 ------------ proxy/public_listener_test.go | 38 ---- proxy/runner.go | 118 ------------ proxy/testdata/config-kitchensink.hcl | 36 ---- proxy/testing.go | 170 ----------------- proxy/upstream.go | 261 -------------------------- proxy/upstream_test.go | 75 -------- 18 files changed, 1966 deletions(-) delete mode 100644 connect/client.go delete mode 100644 connect/client_test.go delete mode 100644 proxy/config.go delete mode 100644 proxy/config_test.go delete mode 100644 proxy/conn.go delete mode 100644 proxy/conn_test.go delete mode 100644 proxy/manager.go delete mode 100644 proxy/manager_test.go delete mode 100644 proxy/proxier.go delete mode 100644 proxy/proxy.go delete mode 100644 proxy/public_listener.go delete mode 100644 proxy/public_listener_test.go delete mode 100644 proxy/runner.go delete mode 100644 proxy/testdata/config-kitchensink.hcl delete mode 100644 proxy/testing.go delete mode 100644 proxy/upstream.go delete mode 100644 proxy/upstream_test.go diff --git a/connect/client.go b/connect/client.go deleted file mode 100644 index 18e43f4cb..000000000 --- a/connect/client.go +++ /dev/null @@ -1,256 +0,0 @@ -package connect - -// import ( -// "context" -// "crypto/tls" -// "fmt" -// "math/rand" -// "net" - -// "github.com/hashicorp/consul/api" -// ) - -// // CertStatus indicates whether the Client currently has valid certificates for -// // incoming and outgoing connections. -// type CertStatus int - -// const ( -// // CertStatusUnknown is the zero value for CertStatus which may be returned -// // when a watch channel is closed on shutdown. It has no other meaning. -// CertStatusUnknown CertStatus = iota - -// // CertStatusOK indicates the client has valid certificates and trust roots to -// // Authenticate incoming and outgoing connections. -// CertStatusOK - -// // CertStatusPending indicates the client is waiting to be issued initial -// // certificates, or that it's certificates have expired and it's waiting to be -// // issued new ones. In this state all incoming and outgoing connections will -// // fail. -// CertStatusPending -// ) - -// func (s CertStatus) String() string { -// switch s { -// case CertStatusOK: -// return "OK" -// case CertStatusPending: -// return "pending" -// case CertStatusUnknown: -// fallthrough -// default: -// return "unknown" -// } -// } - -// // Client is the interface a basic client implementation must support. -// type Client interface { -// // TODO(banks): build this and test it -// // CertStatus returns the current status of the client's certificates. It can -// // be used to determine if the Client is able to service requests at the -// // current time. -// //CertStatus() CertStatus - -// // TODO(banks): build this and test it -// // WatchCertStatus returns a channel that is notified on all status changes. -// // Note that a message on the channel isn't guaranteed to be different so it's -// // value should be inspected. During Client shutdown the channel will be -// // closed returning a zero type which is equivalent to CertStatusUnknown. -// //WatchCertStatus() <-chan CertStatus - -// // ServerTLSConfig returns the *tls.Config to be used when creating a TCP -// // listener that should accept Connect connections. It is likely that at -// // startup the tlsCfg returned will not be immediately usable since -// // certificates are typically fetched from the agent asynchronously. In this -// // case it's still safe to listen with the provided config, but auth failures -// // will occur until initial certificate discovery is complete. In general at -// // any time it is possible for certificates to expire before new replacements -// // have been issued due to local network errors so the server may not actually -// // have a working certificate configuration at any time, however as soon as -// // valid certs can be issued it will automatically start working again so -// // should take no action. -// ServerTLSConfig() (*tls.Config, error) - -// // DialService opens a new connection to the named service registered in -// // Consul. It will perform service discovery to find healthy instances. If -// // there is an error during connection it is returned and the caller may call -// // again. The client implementation makes a best effort to make consecutive -// // Dials against different instances either by randomising the list and/or -// // maintaining a local memory of which instances recently failed. If the -// // context passed times out before connection is established and verified an -// // error is returned. -// DialService(ctx context.Context, namespace, name string) (net.Conn, error) - -// // DialPreparedQuery opens a new connection by executing the named Prepared -// // Query against the local Consul agent, and picking one of the returned -// // instances to connect to. It will perform service discovery with the same -// // semantics as DialService. -// DialPreparedQuery(ctx context.Context, namespace, name string) (net.Conn, error) -// } - -// /* - -// Maybe also convenience wrappers for: -// - listening TLS conn with right config -// - http.ListenAndServeTLS equivalent - -// */ - -// // AgentClient is the primary implementation of a connect.Client which -// // communicates with the local Consul agent. -// type AgentClient struct { -// agent *api.Client -// tlsCfg *ReloadableTLSConfig -// } - -// // NewClient returns an AgentClient to allow consuming and providing -// // Connect-enabled network services. -// func NewClient(agent *api.Client) Client { -// // TODO(banks): hook up fetching certs from Agent and updating tlsCfg on cert -// // delivery/change. Perhaps need to make -// return &AgentClient{ -// agent: agent, -// tlsCfg: NewReloadableTLSConfig(defaultTLSConfig()), -// } -// } - -// // NewInsecureDevClientWithLocalCerts returns an AgentClient that will still do -// // service discovery via the local agent but will use externally provided -// // certificates and skip authorization. This is intended just for development -// // and must not be used ever in production. -// func NewInsecureDevClientWithLocalCerts(agent *api.Client, caFile, certFile, -// keyFile string) (Client, error) { - -// cfg, err := devTLSConfigFromFiles(caFile, certFile, keyFile) -// if err != nil { -// return nil, err -// } - -// return &AgentClient{ -// agent: agent, -// tlsCfg: NewReloadableTLSConfig(cfg), -// }, nil -// } - -// // ServerTLSConfig implements Client -// func (c *AgentClient) ServerTLSConfig() (*tls.Config, error) { -// return c.tlsCfg.ServerTLSConfig(), nil -// } - -// // DialService implements Client -// func (c *AgentClient) DialService(ctx context.Context, namespace, -// name string) (net.Conn, error) { -// return c.dial(ctx, "service", namespace, name) -// } - -// // DialPreparedQuery implements Client -// func (c *AgentClient) DialPreparedQuery(ctx context.Context, namespace, -// name string) (net.Conn, error) { -// return c.dial(ctx, "prepared_query", namespace, name) -// } - -// func (c *AgentClient) dial(ctx context.Context, discoveryType, namespace, -// name string) (net.Conn, error) { - -// svcs, err := c.discoverInstances(ctx, discoveryType, namespace, name) -// if err != nil { -// return nil, err -// } - -// svc, err := c.pickInstance(svcs) -// if err != nil { -// return nil, err -// } -// if svc == nil { -// return nil, fmt.Errorf("no healthy services discovered") -// } - -// // OK we have a service we can dial! We need a ClientAuther that will validate -// // the connection is legit. - -// // TODO(banks): implement ClientAuther properly to actually verify connected -// // cert matches the expected service/cluster etc. based on svc. -// auther := &ClientAuther{} -// tlsConfig := c.tlsCfg.TLSConfig(auther) - -// // Resolve address TODO(banks): I expected this to happen magically in the -// // agent at registration time if I register with no explicit address but -// // apparently doesn't. This is a quick hack to make it work for now, need to -// // see if there is a better shared code path for doing this. -// addr := svc.Service.Address -// if addr == "" { -// addr = svc.Node.Address -// } -// var dialer net.Dialer -// tcpConn, err := dialer.DialContext(ctx, "tcp", -// fmt.Sprintf("%s:%d", addr, svc.Service.Port)) -// if err != nil { -// return nil, err -// } - -// tlsConn := tls.Client(tcpConn, tlsConfig) -// err = tlsConn.Handshake() -// if err != nil { -// tlsConn.Close() -// return nil, err -// } - -// return tlsConn, nil -// } - -// // pickInstance returns an instance from the given list to try to connect to. It -// // may be made pluggable later, for now it just picks a random one regardless of -// // whether the list is already shuffled. -// func (c *AgentClient) pickInstance(svcs []*api.ServiceEntry) (*api.ServiceEntry, error) { -// if len(svcs) < 1 { -// return nil, nil -// } -// idx := rand.Intn(len(svcs)) -// return svcs[idx], nil -// } - -// // discoverInstances returns all instances for the given discoveryType, -// // namespace and name. The returned service entries may or may not be shuffled -// func (c *AgentClient) discoverInstances(ctx context.Context, discoverType, -// namespace, name string) ([]*api.ServiceEntry, error) { - -// q := &api.QueryOptions{ -// // TODO(banks): make this configurable? -// AllowStale: true, -// } -// q = q.WithContext(ctx) - -// switch discoverType { -// case "service": -// svcs, _, err := c.agent.Health().Connect(name, "", true, q) -// if err != nil { -// return nil, err -// } -// return svcs, err - -// case "prepared_query": -// // TODO(banks): it's not super clear to me how this should work eventually. -// // How do we distinguise between a PreparedQuery for the actual services and -// // one that should return the connect proxies where that differs? If we -// // can't then we end up with a janky UX where user specifies a reasonable -// // prepared query but we try to connect to non-connect services and fail -// // with a confusing TLS error. Maybe just a way to filter PreparedQuery -// // results by connect-enabled would be sufficient (or even metadata to do -// // that ourselves in the response although less efficient). -// resp, _, err := c.agent.PreparedQuery().Execute(name, q) -// if err != nil { -// return nil, err -// } - -// // Awkward, we have a slice of api.ServiceEntry here but want a slice of -// // *api.ServiceEntry for compat with Connect/Service APIs. Have to convert -// // them to keep things type-happy. -// svcs := make([]*api.ServiceEntry, len(resp.Nodes)) -// for idx, se := range resp.Nodes { -// svcs[idx] = &se -// } -// return svcs, err -// default: -// return nil, fmt.Errorf("unsupported discovery type: %s", discoverType) -// } -// } diff --git a/connect/client_test.go b/connect/client_test.go deleted file mode 100644 index 045bc8fd6..000000000 --- a/connect/client_test.go +++ /dev/null @@ -1,148 +0,0 @@ -package connect - -// import ( -// "context" -// "crypto/x509" -// "crypto/x509/pkix" -// "encoding/asn1" -// "io/ioutil" -// "net" -// "net/http" -// "net/http/httptest" -// "net/url" -// "strconv" -// "testing" - -// "github.com/hashicorp/consul/api" -// "github.com/hashicorp/consul/testutil" -// "github.com/stretchr/testify/require" -// ) - -// func TestNewInsecureDevClientWithLocalCerts(t *testing.T) { - -// agent, err := api.NewClient(api.DefaultConfig()) -// require.Nil(t, err) - -// got, err := NewInsecureDevClientWithLocalCerts(agent, -// "testdata/ca1-ca-consul-internal.cert.pem", -// "testdata/ca1-svc-web.cert.pem", -// "testdata/ca1-svc-web.key.pem", -// ) -// require.Nil(t, err) - -// // Sanity check correct certs were loaded -// serverCfg, err := got.ServerTLSConfig() -// require.Nil(t, err) -// caSubjects := serverCfg.RootCAs.Subjects() -// require.Len(t, caSubjects, 1) -// caSubject, err := testNameFromRawDN(caSubjects[0]) -// require.Nil(t, err) -// require.Equal(t, "Consul Internal", caSubject.CommonName) - -// require.Len(t, serverCfg.Certificates, 1) -// cert, err := x509.ParseCertificate(serverCfg.Certificates[0].Certificate[0]) -// require.Nil(t, err) -// require.Equal(t, "web", cert.Subject.CommonName) -// } - -// func testNameFromRawDN(raw []byte) (*pkix.Name, error) { -// var seq pkix.RDNSequence -// if _, err := asn1.Unmarshal(raw, &seq); err != nil { -// return nil, err -// } - -// var name pkix.Name -// name.FillFromRDNSequence(&seq) -// return &name, nil -// } - -// func testAgent(t *testing.T) (*testutil.TestServer, *api.Client) { -// t.Helper() - -// // Make client config -// conf := api.DefaultConfig() - -// // Create server -// server, err := testutil.NewTestServerConfigT(t, nil) -// require.Nil(t, err) - -// conf.Address = server.HTTPAddr - -// // Create client -// agent, err := api.NewClient(conf) -// require.Nil(t, err) - -// return server, agent -// } - -// func testService(t *testing.T, ca, name string, client *api.Client) *httptest.Server { -// t.Helper() - -// // Run a test service to discover -// server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { -// w.Write([]byte("svc: " + name)) -// })) -// server.TLS = TestTLSConfig(t, ca, name) -// server.StartTLS() - -// u, err := url.Parse(server.URL) -// require.Nil(t, err) - -// port, err := strconv.Atoi(u.Port()) -// require.Nil(t, err) - -// // If client is passed, register the test service instance -// if client != nil { -// svc := &api.AgentServiceRegistration{ -// // TODO(banks): we don't really have a good way to represent -// // connect-native apps yet so we have to pretend out little server is a -// // proxy for now. -// Kind: api.ServiceKindConnectProxy, -// ProxyDestination: name, -// Name: name + "-proxy", -// Address: u.Hostname(), -// Port: port, -// } -// err := client.Agent().ServiceRegister(svc) -// require.Nil(t, err) -// } - -// return server -// } - -// func TestDialService(t *testing.T) { -// consulServer, agent := testAgent(t) -// defer consulServer.Stop() - -// svc := testService(t, "ca1", "web", agent) -// defer svc.Close() - -// c, err := NewInsecureDevClientWithLocalCerts(agent, -// "testdata/ca1-ca-consul-internal.cert.pem", -// "testdata/ca1-svc-web.cert.pem", -// "testdata/ca1-svc-web.key.pem", -// ) -// require.Nil(t, err) - -// conn, err := c.DialService(context.Background(), "default", "web") -// require.Nilf(t, err, "err: %s", err) - -// // Inject our conn into http.Transport -// httpClient := &http.Client{ -// Transport: &http.Transport{ -// DialTLS: func(network, addr string) (net.Conn, error) { -// return conn, nil -// }, -// }, -// } - -// // Don't be fooled the hostname here is ignored since we did the dialling -// // ourselves -// resp, err := httpClient.Get("https://web.connect.consul/") -// require.Nil(t, err) -// defer resp.Body.Close() -// body, err := ioutil.ReadAll(resp.Body) -// require.Nil(t, err) - -// require.Equal(t, "svc: web", string(body)) -// } diff --git a/connect/testing.go b/connect/testing.go index 7e1b9cdac..f6fa438cf 100644 --- a/connect/testing.go +++ b/connect/testing.go @@ -151,64 +151,3 @@ func (s *TestService) Close() { close(s.stopChan) } } - -/* -// TestCAPool returns an *x509.CertPool containing the named CA certs from the -// testdata dir. -func TestCAPool(t testing.T, caNames ...string) *x509.CertPool { - t.Helper() - pool := x509.NewCertPool() - for _, name := range caNames { - certs, err := filepath.Glob(testDataDir() + "/" + name + "-ca-*.cert.pem") - require.Nil(t, err) - for _, cert := range certs { - caPem, err := ioutil.ReadFile(cert) - require.Nil(t, err) - pool.AppendCertsFromPEM(caPem) - } - } - return pool -} - -// TestSvcKeyPair returns an tls.Certificate containing both cert and private -// key for a given service under a given CA from the testdata dir. -func TestSvcKeyPair(t testing.T, ca, name string) tls.Certificate { - t.Helper() - prefix := fmt.Sprintf(testDataDir()+"/%s-svc-%s", ca, name) - cert, err := tls.LoadX509KeyPair(prefix+".cert.pem", prefix+".key.pem") - require.Nil(t, err) - return cert -} - -// TestTLSConfig returns a *tls.Config suitable for use during tests. -func TestTLSConfig(t testing.T, ca, svc string) *tls.Config { - t.Helper() - return &tls.Config{ - Certificates: []tls.Certificate{TestSvcKeyPair(t, ca, svc)}, - MinVersion: tls.VersionTLS12, - RootCAs: TestCAPool(t, ca), - ClientCAs: TestCAPool(t, ca), - ClientAuth: tls.RequireAndVerifyClientCert, - // In real life we'll need to do this too since otherwise Go will attempt to - // verify DNS names match DNS SAN/CN which we don't want, but we'll hook - // VerifyPeerCertificates and do our own x509 path validation as well as - // AuthZ upcall. For now we are just testing the basic proxy mechanism so - // this is fine. - InsecureSkipVerify: true, - } -} - -// TestAuther is a simple Auther implementation that does nothing but what you -// tell it to! -type TestAuther struct { - // Return is the value returned from an Auth() call. Set it to nil to have all - // certificates unconditionally accepted or a value to have them fail. - Return error -} - -// Auth implements Auther -func (a *TestAuther) Auth(rawCerts [][]byte, - verifiedChains [][]*x509.Certificate) error { - return a.Return -} -*/ diff --git a/proxy/config.go b/proxy/config.go deleted file mode 100644 index a5958135a..000000000 --- a/proxy/config.go +++ /dev/null @@ -1,111 +0,0 @@ -package proxy - -import ( - "io/ioutil" - - "github.com/hashicorp/consul/api" - "github.com/hashicorp/hcl" -) - -// Config is the publicly configurable state for an entire proxy instance. It's -// mostly used as the format for the local-file config mode which is mostly for -// dev/testing. In normal use, different parts of this config are pulled from -// different locations (e.g. command line, agent config endpoint, agent -// certificate endpoints). -type Config struct { - // ProxyID is the identifier for this proxy as registered in Consul. It's only - // guaranteed to be unique per agent. - ProxyID string `json:"proxy_id" hcl:"proxy_id"` - - // Token is the authentication token provided for queries to the local agent. - Token string `json:"token" hcl:"token"` - - // ProxiedServiceName is the name of the service this proxy is representing. - ProxiedServiceName string `json:"proxied_service_name" hcl:"proxied_service_name"` - - // ProxiedServiceNamespace is the namespace of the service this proxy is - // representing. - ProxiedServiceNamespace string `json:"proxied_service_namespace" hcl:"proxied_service_namespace"` - - // PublicListener configures the mTLS listener. - PublicListener PublicListenerConfig `json:"public_listener" hcl:"public_listener"` - - // Upstreams configures outgoing proxies for remote connect services. - Upstreams []UpstreamConfig `json:"upstreams" hcl:"upstreams"` - - // DevCAFile allows passing the file path to PEM encoded root certificate - // bundle to be used in development instead of the ones supplied by Connect. - DevCAFile string `json:"dev_ca_file" hcl:"dev_ca_file"` - - // DevServiceCertFile allows passing the file path to PEM encoded service - // certificate (client and server) to be used in development instead of the - // ones supplied by Connect. - DevServiceCertFile string `json:"dev_service_cert_file" hcl:"dev_service_cert_file"` - - // DevServiceKeyFile allows passing the file path to PEM encoded service - // private key to be used in development instead of the ones supplied by - // Connect. - DevServiceKeyFile string `json:"dev_service_key_file" hcl:"dev_service_key_file"` -} - -// ConfigWatcher is a simple interface to allow dynamic configurations from -// plugggable sources. -type ConfigWatcher interface { - // Watch returns a channel that will deliver new Configs if something external - // provokes it. - Watch() <-chan *Config -} - -// StaticConfigWatcher is a simple ConfigWatcher that delivers a static Config -// once and then never changes it. -type StaticConfigWatcher struct { - ch chan *Config -} - -// NewStaticConfigWatcher returns a ConfigWatcher for a config that never -// changes. It assumes only one "watcher" will ever call Watch. The config is -// delivered on the first call but will never be delivered again to allow -// callers to call repeatedly (e.g. select in a loop). -func NewStaticConfigWatcher(cfg *Config) *StaticConfigWatcher { - sc := &StaticConfigWatcher{ - // Buffer it so we can queue up the config for first delivery. - ch: make(chan *Config, 1), - } - sc.ch <- cfg - return sc -} - -// Watch implements ConfigWatcher on a static configuration for compatibility. -// It returns itself on the channel once and then leaves it open. -func (sc *StaticConfigWatcher) Watch() <-chan *Config { - return sc.ch -} - -// ParseConfigFile parses proxy configuration form a file for local dev. -func ParseConfigFile(filename string) (*Config, error) { - bs, err := ioutil.ReadFile(filename) - if err != nil { - return nil, err - } - - var cfg Config - - err = hcl.Unmarshal(bs, &cfg) - if err != nil { - return nil, err - } - - return &cfg, nil -} - -// AgentConfigWatcher watches the local Consul agent for proxy config changes. -type AgentConfigWatcher struct { - client *api.Client -} - -// Watch implements ConfigWatcher. -func (w *AgentConfigWatcher) Watch() <-chan *Config { - watch := make(chan *Config) - // TODO implement me - return watch -} diff --git a/proxy/config_test.go b/proxy/config_test.go deleted file mode 100644 index 89287d573..000000000 --- a/proxy/config_test.go +++ /dev/null @@ -1,46 +0,0 @@ -package proxy - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestParseConfigFile(t *testing.T) { - cfg, err := ParseConfigFile("testdata/config-kitchensink.hcl") - require.Nil(t, err) - - expect := &Config{ - ProxyID: "foo", - Token: "11111111-2222-3333-4444-555555555555", - ProxiedServiceName: "web", - ProxiedServiceNamespace: "default", - PublicListener: PublicListenerConfig{ - BindAddress: ":9999", - LocalServiceAddress: "127.0.0.1:5000", - LocalConnectTimeoutMs: 1000, - HandshakeTimeoutMs: 5000, - }, - Upstreams: []UpstreamConfig{ - { - LocalBindAddress: "127.0.0.1:6000", - DestinationName: "db", - DestinationNamespace: "default", - DestinationType: "service", - ConnectTimeoutMs: 10000, - }, - { - LocalBindAddress: "127.0.0.1:6001", - DestinationName: "geo-cache", - DestinationNamespace: "default", - DestinationType: "prepared_query", - ConnectTimeoutMs: 10000, - }, - }, - DevCAFile: "connect/testdata/ca1-ca-consul-internal.cert.pem", - DevServiceCertFile: "connect/testdata/ca1-svc-web.cert.pem", - DevServiceKeyFile: "connect/testdata/ca1-svc-web.key.pem", - } - - require.Equal(t, expect, cfg) -} diff --git a/proxy/conn.go b/proxy/conn.go deleted file mode 100644 index dfad81db7..000000000 --- a/proxy/conn.go +++ /dev/null @@ -1,48 +0,0 @@ -package proxy - -import ( - "io" - "net" - "sync/atomic" -) - -// Conn represents a single proxied TCP connection. -type Conn struct { - src, dst net.Conn - stopping int32 -} - -// NewConn returns a conn joining the two given net.Conn -func NewConn(src, dst net.Conn) *Conn { - return &Conn{ - src: src, - dst: dst, - stopping: 0, - } -} - -// Close closes both connections. -func (c *Conn) Close() { - atomic.StoreInt32(&c.stopping, 1) - c.src.Close() - c.dst.Close() -} - -// CopyBytes will continuously copy bytes in both directions between src and dst -// until either connection is closed. -func (c *Conn) CopyBytes() error { - defer c.Close() - - go func() { - // Need this since Copy is only guaranteed to stop when it's source reader - // (second arg) hits EOF or error but either conn might close first possibly - // causing this goroutine to exit but not the outer one. See TestSc - //defer c.Close() - io.Copy(c.dst, c.src) - }() - _, err := io.Copy(c.src, c.dst) - if atomic.LoadInt32(&c.stopping) == 1 { - return nil - } - return err -} diff --git a/proxy/conn_test.go b/proxy/conn_test.go deleted file mode 100644 index ac907238d..000000000 --- a/proxy/conn_test.go +++ /dev/null @@ -1,119 +0,0 @@ -package proxy - -import ( - "bufio" - "net" - "testing" - - "github.com/stretchr/testify/require" -) - -// testConnSetup listens on a random TCP port and passes the accepted net.Conn -// back to test code on returned channel. It then creates a source and -// destination Conn. And a cleanup func -func testConnSetup(t *testing.T) (net.Conn, net.Conn, func()) { - t.Helper() - - l, err := net.Listen("tcp", "localhost:0") - require.Nil(t, err) - - ch := make(chan net.Conn, 1) - go func(ch chan net.Conn) { - src, err := l.Accept() - require.Nil(t, err) - ch <- src - }(ch) - - dst, err := net.Dial("tcp", l.Addr().String()) - require.Nil(t, err) - - src := <-ch - - stopper := func() { - l.Close() - src.Close() - dst.Close() - } - - return src, dst, stopper -} - -func TestConn(t *testing.T) { - src, dst, stop := testConnSetup(t) - defer stop() - - c := NewConn(src, dst) - - retCh := make(chan error, 1) - go func() { - retCh <- c.CopyBytes() - }() - - srcR := bufio.NewReader(src) - dstR := bufio.NewReader(dst) - - _, err := src.Write([]byte("ping 1\n")) - require.Nil(t, err) - _, err = dst.Write([]byte("ping 2\n")) - require.Nil(t, err) - - got, err := dstR.ReadString('\n') - require.Equal(t, "ping 1\n", got) - - got, err = srcR.ReadString('\n') - require.Equal(t, "ping 2\n", got) - - _, err = src.Write([]byte("pong 1\n")) - require.Nil(t, err) - _, err = dst.Write([]byte("pong 2\n")) - require.Nil(t, err) - - got, err = dstR.ReadString('\n') - require.Equal(t, "pong 1\n", got) - - got, err = srcR.ReadString('\n') - require.Equal(t, "pong 2\n", got) - - c.Close() - - ret := <-retCh - require.Nil(t, ret, "Close() should not cause error return") -} - -func TestConnSrcClosing(t *testing.T) { - src, dst, stop := testConnSetup(t) - defer stop() - - c := NewConn(src, dst) - retCh := make(chan error, 1) - go func() { - retCh <- c.CopyBytes() - }() - - // If we close the src conn, we expect CopyBytes to return and src to be - // closed too. No good way to assert that the conn is closed really other than - // assume the retCh receive will hand unless CopyBytes exits and that - // CopyBytes defers Closing both. i.e. if this test doesn't time out it's - // good! - src.Close() - <-retCh -} - -func TestConnDstClosing(t *testing.T) { - src, dst, stop := testConnSetup(t) - defer stop() - - c := NewConn(src, dst) - retCh := make(chan error, 1) - go func() { - retCh <- c.CopyBytes() - }() - - // If we close the dst conn, we expect CopyBytes to return and src to be - // closed too. No good way to assert that the conn is closed really other than - // assume the retCh receive will hand unless CopyBytes exits and that - // CopyBytes defers Closing both. i.e. if this test doesn't time out it's - // good! - dst.Close() - <-retCh -} diff --git a/proxy/manager.go b/proxy/manager.go deleted file mode 100644 index c22a1b7ff..000000000 --- a/proxy/manager.go +++ /dev/null @@ -1,140 +0,0 @@ -package proxy - -import ( - "errors" - "log" - "os" -) - -var ( - // ErrExists is the error returned when adding a proxy that exists already. - ErrExists = errors.New("proxy with that name already exists") - // ErrNotExist is the error returned when removing a proxy that doesn't exist. - ErrNotExist = errors.New("proxy with that name doesn't exist") -) - -// Manager implements the logic for configuring and running a set of proxiers. -// Typically it's used to run one PublicListener and zero or more Upstreams. -type Manager struct { - ch chan managerCmd - - // stopped is used to signal the caller of StopAll when the run loop exits - // after stopping all runners. It's only closed. - stopped chan struct{} - - // runners holds the currently running instances. It should only by accessed - // from within the `run` goroutine. - runners map[string]*Runner - - logger *log.Logger -} - -type managerCmd struct { - name string - p Proxier - errCh chan error -} - -// NewManager creates a manager of proxier instances. -func NewManager() *Manager { - return NewManagerWithLogger(log.New(os.Stdout, "", log.LstdFlags)) -} - -// NewManagerWithLogger creates a manager of proxier instances with the -// specified logger. -func NewManagerWithLogger(logger *log.Logger) *Manager { - m := &Manager{ - ch: make(chan managerCmd), - stopped: make(chan struct{}), - runners: make(map[string]*Runner), - logger: logger, - } - go m.run() - return m -} - -// RunProxier starts a new Proxier instance in the manager. It is safe to call -// from separate goroutines. If there is already a running proxy with the same -// name it returns ErrExists. -func (m *Manager) RunProxier(name string, p Proxier) error { - cmd := managerCmd{ - name: name, - p: p, - errCh: make(chan error), - } - m.ch <- cmd - return <-cmd.errCh -} - -// StopProxier stops a Proxier instance by name. It is safe to call from -// separate goroutines. If the instance with that name doesn't exist it returns -// ErrNotExist. -func (m *Manager) StopProxier(name string) error { - cmd := managerCmd{ - name: name, - p: nil, - errCh: make(chan error), - } - m.ch <- cmd - return <-cmd.errCh -} - -// StopAll shuts down the manager instance and stops all running proxies. It is -// safe to call from any goroutine but must only be called once. -func (m *Manager) StopAll() error { - close(m.ch) - <-m.stopped - return nil -} - -// run is the main manager processing loop. It keeps all actions in a single -// goroutine triggered by channel commands to keep it simple to reason about -// lifecycle events for each proxy. -func (m *Manager) run() { - defer close(m.stopped) - - // range over channel blocks and loops on each message received until channel - // is closed. - for cmd := range m.ch { - if cmd.p == nil { - m.remove(&cmd) - } else { - m.add(&cmd) - } - } - - // Shutting down, Stop all the runners - for _, r := range m.runners { - r.Stop() - } -} - -// add the named proxier instance and stop it. Should only be called from the -// run loop. -func (m *Manager) add(cmd *managerCmd) { - // Check existing - if _, ok := m.runners[cmd.name]; ok { - cmd.errCh <- ErrExists - return - } - - // Start new runner - r := NewRunnerWithLogger(cmd.name, cmd.p, m.logger) - m.runners[cmd.name] = r - go r.Listen() - cmd.errCh <- nil -} - -// remove the named proxier instance and stop it. Should only be called from the -// run loop. -func (m *Manager) remove(cmd *managerCmd) { - // Fetch proxier by name - r, ok := m.runners[cmd.name] - if !ok { - cmd.errCh <- ErrNotExist - return - } - err := r.Stop() - delete(m.runners, cmd.name) - cmd.errCh <- err -} diff --git a/proxy/manager_test.go b/proxy/manager_test.go deleted file mode 100644 index d4fa8c5b4..000000000 --- a/proxy/manager_test.go +++ /dev/null @@ -1,76 +0,0 @@ -package proxy - -import ( - "fmt" - "net" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestManager(t *testing.T) { - m := NewManager() - - addrs := TestLocalBindAddrs(t, 3) - - for i := 0; i < len(addrs); i++ { - name := fmt.Sprintf("proxier-%d", i) - // Run proxy - err := m.RunProxier(name, &TestProxier{ - Addr: addrs[i], - Prefix: name + ": ", - }) - require.Nil(t, err) - } - - // Make sure each one is echoing correctly now all are running - for i := 0; i < len(addrs); i++ { - conn, err := net.Dial("tcp", addrs[i]) - require.Nil(t, err) - TestEchoConn(t, conn, fmt.Sprintf("proxier-%d: ", i)) - conn.Close() - } - - // Stop first proxier - err := m.StopProxier("proxier-0") - require.Nil(t, err) - - // We should fail to dial it now. Note that Runner.Stop is synchronous so - // there should be a strong guarantee that it's stopped listening by now. - _, err = net.Dial("tcp", addrs[0]) - require.NotNil(t, err) - - // Rest of proxiers should still be running - for i := 1; i < len(addrs); i++ { - conn, err := net.Dial("tcp", addrs[i]) - require.Nil(t, err) - TestEchoConn(t, conn, fmt.Sprintf("proxier-%d: ", i)) - conn.Close() - } - - // Stop non-existent proxier should fail - err = m.StopProxier("foo") - require.Equal(t, ErrNotExist, err) - - // Add already-running proxier should fail - err = m.RunProxier("proxier-1", &TestProxier{}) - require.Equal(t, ErrExists, err) - - // But rest should stay running - for i := 1; i < len(addrs); i++ { - conn, err := net.Dial("tcp", addrs[i]) - require.Nil(t, err) - TestEchoConn(t, conn, fmt.Sprintf("proxier-%d: ", i)) - conn.Close() - } - - // StopAll should stop everything - err = m.StopAll() - require.Nil(t, err) - - // Verify failures - for i := 0; i < len(addrs); i++ { - _, err = net.Dial("tcp", addrs[i]) - require.NotNilf(t, err, "proxier-%d should not be running", i) - } -} diff --git a/proxy/proxier.go b/proxy/proxier.go deleted file mode 100644 index 23940c6ad..000000000 --- a/proxy/proxier.go +++ /dev/null @@ -1,32 +0,0 @@ -package proxy - -import ( - "errors" - "net" -) - -// ErrStopped is returned for operations on a proxy that is stopped -var ErrStopped = errors.New("stopped") - -// ErrStopping is returned for operations on a proxy that is stopping -var ErrStopping = errors.New("stopping") - -// Proxier is an interface for managing different proxy implementations in a -// standard way. We have at least two different types of Proxier implementations -// needed: one for the incoming mTLS -> local proxy and another for each -// "upstream" service the app needs to talk out to (which listens locally and -// performs service discovery to find a suitable remote service). -type Proxier interface { - // Listener returns a net.Listener that is open and ready for use, the Proxy - // manager will arrange accepting new connections from it and passing them to - // the handler method. - Listener() (net.Listener, error) - - // HandleConn is called for each incoming connection accepted by the listener. - // It is called in it's own goroutine and should run until it hits an error. - // When stopping the Proxier, the manager will simply close the conn provided - // and expects an error to be eventually returned. Any time spent not blocked - // on the passed conn (for example doing service discovery) should therefore - // be time-bound so that shutdown can't stall forever. - HandleConn(conn net.Conn) error -} diff --git a/proxy/proxy.go b/proxy/proxy.go deleted file mode 100644 index a293466b8..000000000 --- a/proxy/proxy.go +++ /dev/null @@ -1,112 +0,0 @@ -package proxy - -import ( - "context" - "log" - - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/connect" -) - -// Proxy implements the built-in connect proxy. -type Proxy struct { - proxyID, token string - - connect connect.Client - manager *Manager - cfgWatch ConfigWatcher - cfg *Config - - logger *log.Logger -} - -// NewFromConfigFile returns a Proxy instance configured just from a local file. -// This is intended mostly for development and bypasses the normal mechanisms -// for fetching config and certificates from the local agent. -func NewFromConfigFile(client *api.Client, filename string, - logger *log.Logger) (*Proxy, error) { - cfg, err := ParseConfigFile(filename) - if err != nil { - return nil, err - } - - connect, err := connect.NewInsecureDevClientWithLocalCerts(client, - cfg.DevCAFile, cfg.DevServiceCertFile, cfg.DevServiceKeyFile) - if err != nil { - return nil, err - } - - p := &Proxy{ - proxyID: cfg.ProxyID, - connect: connect, - manager: NewManagerWithLogger(logger), - cfgWatch: NewStaticConfigWatcher(cfg), - logger: logger, - } - return p, nil -} - -// New returns a Proxy with the given id, consuming the provided (configured) -// agent. It is ready to Run(). -func New(client *api.Client, proxyID string, logger *log.Logger) (*Proxy, error) { - p := &Proxy{ - proxyID: proxyID, - connect: connect.NewClient(client), - manager: NewManagerWithLogger(logger), - cfgWatch: &AgentConfigWatcher{client: client}, - logger: logger, - } - return p, nil -} - -// Run the proxy instance until a fatal error occurs or ctx is cancelled. -func (p *Proxy) Run(ctx context.Context) error { - defer p.manager.StopAll() - - // Watch for config changes (initial setup happens on first "change") - for { - select { - case newCfg := <-p.cfgWatch.Watch(): - p.logger.Printf("[DEBUG] got new config") - if p.cfg == nil { - // Initial setup - err := p.startPublicListener(ctx, newCfg.PublicListener) - if err != nil { - return err - } - } - - // TODO add/remove upstreams properly based on a diff with current - for _, uc := range newCfg.Upstreams { - uc.Client = p.connect - uc.logger = p.logger - err := p.manager.RunProxier(uc.String(), NewUpstream(uc)) - if err == ErrExists { - continue - } - if err != nil { - p.logger.Printf("[ERR] failed to start upstream %s: %s", uc.String(), - err) - } - } - p.cfg = newCfg - - case <-ctx.Done(): - return nil - } - } -} - -func (p *Proxy) startPublicListener(ctx context.Context, - cfg PublicListenerConfig) error { - - // Get TLS creds - tlsCfg, err := p.connect.ServerTLSConfig() - if err != nil { - return err - } - cfg.TLSConfig = tlsCfg - - cfg.logger = p.logger - return p.manager.RunProxier("public_listener", NewPublicListener(cfg)) -} diff --git a/proxy/public_listener.go b/proxy/public_listener.go deleted file mode 100644 index 1942992cf..000000000 --- a/proxy/public_listener.go +++ /dev/null @@ -1,119 +0,0 @@ -package proxy - -import ( - "crypto/tls" - "fmt" - "log" - "net" - "os" - "time" -) - -// PublicListener provides an implementation of Proxier that listens for inbound -// mTLS connections, authenticates them with the local agent, and if successful -// forwards them to the locally configured app. -type PublicListener struct { - cfg *PublicListenerConfig -} - -// PublicListenerConfig contains the most basic parameters needed to start the -// proxy. -// -// Note that the tls.Configs here are expected to be "dynamic" in the sense that -// they are expected to use `GetConfigForClient` (added in go 1.8) to return -// dynamic config per connection if required. -type PublicListenerConfig struct { - // BindAddress is the host:port the public mTLS listener will bind to. - BindAddress string `json:"bind_address" hcl:"bind_address"` - - // LocalServiceAddress is the host:port for the proxied application. This - // should be on loopback or otherwise protected as it's plain TCP. - LocalServiceAddress string `json:"local_service_address" hcl:"local_service_address"` - - // TLSConfig config is used for the mTLS listener. - TLSConfig *tls.Config - - // LocalConnectTimeout is the timeout for establishing connections with the - // local backend. Defaults to 1000 (1s). - LocalConnectTimeoutMs int `json:"local_connect_timeout_ms" hcl:"local_connect_timeout_ms"` - - // HandshakeTimeout is the timeout for incoming mTLS clients to complete a - // handshake. Setting this low avoids DOS by malicious clients holding - // resources open. Defaults to 10000 (10s). - HandshakeTimeoutMs int `json:"handshake_timeout_ms" hcl:"handshake_timeout_ms"` - - logger *log.Logger -} - -func (plc *PublicListenerConfig) applyDefaults() { - if plc.LocalConnectTimeoutMs == 0 { - plc.LocalConnectTimeoutMs = 1000 - } - if plc.HandshakeTimeoutMs == 0 { - plc.HandshakeTimeoutMs = 10000 - } - if plc.logger == nil { - plc.logger = log.New(os.Stdout, "", log.LstdFlags) - } -} - -// NewPublicListener returns a proxy instance with the given config. -func NewPublicListener(cfg PublicListenerConfig) *PublicListener { - p := &PublicListener{ - cfg: &cfg, - } - p.cfg.applyDefaults() - return p -} - -// Listener implements Proxier -func (p *PublicListener) Listener() (net.Listener, error) { - l, err := net.Listen("tcp", p.cfg.BindAddress) - if err != nil { - return nil, err - } - - return tls.NewListener(l, p.cfg.TLSConfig), nil -} - -// HandleConn implements Proxier -func (p *PublicListener) HandleConn(conn net.Conn) error { - defer conn.Close() - tlsConn, ok := conn.(*tls.Conn) - if !ok { - return fmt.Errorf("non-TLS conn") - } - - // Setup Handshake timer - to := time.Duration(p.cfg.HandshakeTimeoutMs) * time.Millisecond - err := tlsConn.SetDeadline(time.Now().Add(to)) - if err != nil { - return err - } - - // Force TLS handshake so that abusive clients can't hold resources open - err = tlsConn.Handshake() - if err != nil { - return err - } - - // Handshake OK, clear the deadline - err = tlsConn.SetDeadline(time.Time{}) - if err != nil { - return err - } - - // Huzzah, open a connection to the backend and let them talk - // TODO maybe add a connection pool here? - to = time.Duration(p.cfg.LocalConnectTimeoutMs) * time.Millisecond - dst, err := net.DialTimeout("tcp", p.cfg.LocalServiceAddress, to) - if err != nil { - return fmt.Errorf("failed dialling local app: %s", err) - } - - p.cfg.logger.Printf("[DEBUG] accepted connection from %s", conn.RemoteAddr()) - - // Hand conn and dst over to Conn to manage the byte copying. - c := NewConn(conn, dst) - return c.CopyBytes() -} diff --git a/proxy/public_listener_test.go b/proxy/public_listener_test.go deleted file mode 100644 index 83e84d658..000000000 --- a/proxy/public_listener_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package proxy - -import ( - "crypto/tls" - "testing" - - "github.com/hashicorp/consul/connect" - "github.com/stretchr/testify/require" -) - -func TestPublicListener(t *testing.T) { - addrs := TestLocalBindAddrs(t, 2) - - cfg := PublicListenerConfig{ - BindAddress: addrs[0], - LocalServiceAddress: addrs[1], - HandshakeTimeoutMs: 100, - LocalConnectTimeoutMs: 100, - TLSConfig: connect.TestTLSConfig(t, "ca1", "web"), - } - - testApp, err := NewTestTCPServer(t, cfg.LocalServiceAddress) - require.Nil(t, err) - defer testApp.Close() - - p := NewPublicListener(cfg) - - // Run proxy - r := NewRunner("test", p) - go r.Listen() - defer r.Stop() - - // Proxy and backend are running, play the part of a TLS client using same - // cert for now. - conn, err := tls.Dial("tcp", cfg.BindAddress, connect.TestTLSConfig(t, "ca1", "web")) - require.Nil(t, err) - TestEchoConn(t, conn, "") -} diff --git a/proxy/runner.go b/proxy/runner.go deleted file mode 100644 index b559b22b7..000000000 --- a/proxy/runner.go +++ /dev/null @@ -1,118 +0,0 @@ -package proxy - -import ( - "log" - "net" - "os" - "sync" - "sync/atomic" -) - -// Runner manages the lifecycle of one Proxier. -type Runner struct { - name string - p Proxier - - // Stopping is if a flag that is updated and read atomically - stopping int32 - stopCh chan struct{} - // wg is used to signal back to Stop when all goroutines have stopped - wg sync.WaitGroup - - logger *log.Logger -} - -// NewRunner returns a Runner ready to Listen. -func NewRunner(name string, p Proxier) *Runner { - return NewRunnerWithLogger(name, p, log.New(os.Stdout, "", log.LstdFlags)) -} - -// NewRunnerWithLogger returns a Runner ready to Listen using the specified -// log.Logger. -func NewRunnerWithLogger(name string, p Proxier, logger *log.Logger) *Runner { - return &Runner{ - name: name, - p: p, - stopCh: make(chan struct{}), - logger: logger, - } -} - -// Listen starts the proxier instance. It blocks until a fatal error occurs or -// Stop() is called. -func (r *Runner) Listen() error { - if atomic.LoadInt32(&r.stopping) == 1 { - return ErrStopped - } - - l, err := r.p.Listener() - if err != nil { - return err - } - r.logger.Printf("[INFO] proxy: %s listening on %s", r.name, l.Addr().String()) - - // Run goroutine that will close listener on stop - go func() { - <-r.stopCh - l.Close() - r.logger.Printf("[INFO] proxy: %s shutdown", r.name) - }() - - // Add one for the accept loop - r.wg.Add(1) - defer r.wg.Done() - - for { - conn, err := l.Accept() - if err != nil { - if atomic.LoadInt32(&r.stopping) == 1 { - return nil - } - return err - } - - go r.handle(conn) - } - - return nil -} - -func (r *Runner) handle(conn net.Conn) { - r.wg.Add(1) - defer r.wg.Done() - - // Start a goroutine that will watch for the Runner stopping and close the - // conn, or watch for the Proxier closing (e.g. because other end hung up) and - // stop the goroutine to avoid leaks - doneCh := make(chan struct{}) - defer close(doneCh) - - go func() { - select { - case <-r.stopCh: - r.logger.Printf("[DEBUG] proxy: %s: terminating conn", r.name) - conn.Close() - return - case <-doneCh: - // Connection is already closed, this goroutine not needed any more - return - } - }() - - err := r.p.HandleConn(conn) - if err != nil { - r.logger.Printf("[DEBUG] proxy: %s: connection terminated: %s", r.name, err) - } else { - r.logger.Printf("[DEBUG] proxy: %s: connection terminated", r.name) - } -} - -// Stop stops the Listener and closes any active connections immediately. -func (r *Runner) Stop() error { - old := atomic.SwapInt32(&r.stopping, 1) - if old == 0 { - close(r.stopCh) - } - r.wg.Wait() - return nil -} diff --git a/proxy/testdata/config-kitchensink.hcl b/proxy/testdata/config-kitchensink.hcl deleted file mode 100644 index 766928353..000000000 --- a/proxy/testdata/config-kitchensink.hcl +++ /dev/null @@ -1,36 +0,0 @@ -# Example proxy config with everything specified - -proxy_id = "foo" -token = "11111111-2222-3333-4444-555555555555" - -proxied_service_name = "web" -proxied_service_namespace = "default" - -# Assumes running consul in dev mode from the repo root... -dev_ca_file = "connect/testdata/ca1-ca-consul-internal.cert.pem" -dev_service_cert_file = "connect/testdata/ca1-svc-web.cert.pem" -dev_service_key_file = "connect/testdata/ca1-svc-web.key.pem" - -public_listener { - bind_address = ":9999" - local_service_address = "127.0.0.1:5000" - local_connect_timeout_ms = 1000 - handshake_timeout_ms = 5000 -} - -upstreams = [ - { - local_bind_address = "127.0.0.1:6000" - destination_name = "db" - destination_namespace = "default" - destination_type = "service" - connect_timeout_ms = 10000 - }, - { - local_bind_address = "127.0.0.1:6001" - destination_name = "geo-cache" - destination_namespace = "default" - destination_type = "prepared_query" - connect_timeout_ms = 10000 - } -] diff --git a/proxy/testing.go b/proxy/testing.go deleted file mode 100644 index bd132b77f..000000000 --- a/proxy/testing.go +++ /dev/null @@ -1,170 +0,0 @@ -package proxy - -import ( - "context" - "crypto/tls" - "fmt" - "io" - "log" - "net" - "sync/atomic" - - "github.com/hashicorp/consul/lib/freeport" - "github.com/mitchellh/go-testing-interface" - "github.com/stretchr/testify/require" -) - -// TestLocalBindAddrs returns n localhost address:port strings with free ports -// for binding test listeners to. -func TestLocalBindAddrs(t testing.T, n int) []string { - ports := freeport.GetT(t, n) - addrs := make([]string, n) - for i, p := range ports { - addrs[i] = fmt.Sprintf("localhost:%d", p) - } - return addrs -} - -// TestTCPServer is a simple TCP echo server for use during tests. -type TestTCPServer struct { - l net.Listener - stopped int32 - accepted, closed, active int32 -} - -// NewTestTCPServer opens as a listening socket on the given address and returns -// a TestTCPServer serving requests to it. The server is already started and can -// be stopped by calling Close(). -func NewTestTCPServer(t testing.T, addr string) (*TestTCPServer, error) { - l, err := net.Listen("tcp", addr) - if err != nil { - return nil, err - } - log.Printf("test tcp server listening on %s", addr) - s := &TestTCPServer{ - l: l, - } - go s.accept() - return s, nil -} - -// Close stops the server -func (s *TestTCPServer) Close() { - atomic.StoreInt32(&s.stopped, 1) - if s.l != nil { - s.l.Close() - } -} - -func (s *TestTCPServer) accept() error { - for { - conn, err := s.l.Accept() - if err != nil { - if atomic.LoadInt32(&s.stopped) == 1 { - log.Printf("test tcp echo server %s stopped", s.l.Addr()) - return nil - } - log.Printf("test tcp echo server %s failed: %s", s.l.Addr(), err) - return err - } - - atomic.AddInt32(&s.accepted, 1) - atomic.AddInt32(&s.active, 1) - - go func(c net.Conn) { - io.Copy(c, c) - atomic.AddInt32(&s.closed, 1) - atomic.AddInt32(&s.active, -1) - }(conn) - } -} - -// TestEchoConn attempts to write some bytes to conn and expects to read them -// back within a short timeout (10ms). If prefix is not empty we expect it to be -// poresent at the start of all echoed responses (for example to distinguish -// between multiple echo server instances). -func TestEchoConn(t testing.T, conn net.Conn, prefix string) { - t.Helper() - - // Write some bytes and read them back - n, err := conn.Write([]byte("Hello World")) - require.Equal(t, 11, n) - require.Nil(t, err) - - expectLen := 11 + len(prefix) - - buf := make([]byte, expectLen) - // read until our buffer is full - it might be separate packets if prefix is - // in use. - got := 0 - for got < expectLen { - n, err = conn.Read(buf[got:]) - require.Nil(t, err) - got += n - } - require.Equal(t, expectLen, got) - require.Equal(t, prefix+"Hello World", string(buf[:])) -} - -// TestConnectClient is a testing mock that implements connect.Client but -// stubs the methods to make testing simpler. -type TestConnectClient struct { - Server *TestTCPServer - TLSConfig *tls.Config - Calls []callTuple -} -type callTuple struct { - typ, ns, name string -} - -// ServerTLSConfig implements connect.Client -func (c *TestConnectClient) ServerTLSConfig() (*tls.Config, error) { - return c.TLSConfig, nil -} - -// DialService implements connect.Client -func (c *TestConnectClient) DialService(ctx context.Context, namespace, - name string) (net.Conn, error) { - - c.Calls = append(c.Calls, callTuple{"service", namespace, name}) - - // Actually returning a vanilla TCP conn not a TLS one but the caller - // shouldn't care for tests since this interface should hide all the TLS - // config and verification. - return net.Dial("tcp", c.Server.l.Addr().String()) -} - -// DialPreparedQuery implements connect.Client -func (c *TestConnectClient) DialPreparedQuery(ctx context.Context, namespace, - name string) (net.Conn, error) { - - c.Calls = append(c.Calls, callTuple{"prepared_query", namespace, name}) - - // Actually returning a vanilla TCP conn not a TLS one but the caller - // shouldn't care for tests since this interface should hide all the TLS - // config and verification. - return net.Dial("tcp", c.Server.l.Addr().String()) -} - -// TestProxier is a simple Proxier instance that can be used in tests. -type TestProxier struct { - // Addr to listen on - Addr string - // Prefix to write first before echoing on new connections - Prefix string -} - -// Listener implements Proxier -func (p *TestProxier) Listener() (net.Listener, error) { - return net.Listen("tcp", p.Addr) -} - -// HandleConn implements Proxier -func (p *TestProxier) HandleConn(conn net.Conn) error { - _, err := conn.Write([]byte(p.Prefix)) - if err != nil { - return err - } - _, err = io.Copy(conn, conn) - return err -} diff --git a/proxy/upstream.go b/proxy/upstream.go deleted file mode 100644 index 1101624be..000000000 --- a/proxy/upstream.go +++ /dev/null @@ -1,261 +0,0 @@ -package proxy - -import ( - "context" - "fmt" - "log" - "net" - "os" - "time" - - "github.com/hashicorp/consul/connect" -) - -// Upstream provides an implementation of Proxier that listens for inbound TCP -// connections on the private network shared with the proxied application -// (typically localhost). For each accepted connection from the app, it uses the -// connect.Client to discover an instance and connect over mTLS. -type Upstream struct { - cfg *UpstreamConfig -} - -// UpstreamConfig configures the upstream -type UpstreamConfig struct { - // Client is the connect client to perform discovery with - Client connect.Client - - // LocalAddress is the host:port to listen on for local app connections. - LocalBindAddress string `json:"local_bind_address" hcl:"local_bind_address,attr"` - - // DestinationName is the service name of the destination. - DestinationName string `json:"destination_name" hcl:"destination_name,attr"` - - // DestinationNamespace is the namespace of the destination. - DestinationNamespace string `json:"destination_namespace" hcl:"destination_namespace,attr"` - - // DestinationType determines which service discovery method is used to find a - // candidate instance to connect to. - DestinationType string `json:"destination_type" hcl:"destination_type,attr"` - - // ConnectTimeout is the timeout for establishing connections with the remote - // service instance. Defaults to 10,000 (10s). - ConnectTimeoutMs int `json:"connect_timeout_ms" hcl:"connect_timeout_ms,attr"` - - logger *log.Logger -} - -func (uc *UpstreamConfig) applyDefaults() { - if uc.ConnectTimeoutMs == 0 { - uc.ConnectTimeoutMs = 10000 - } - if uc.logger == nil { - uc.logger = log.New(os.Stdout, "", log.LstdFlags) - } -} - -// String returns a string that uniquely identifies the Upstream. Used for -// identifying the upstream in log output and map keys. -func (uc *UpstreamConfig) String() string { - return fmt.Sprintf("%s->%s:%s/%s", uc.LocalBindAddress, uc.DestinationType, - uc.DestinationNamespace, uc.DestinationName) -} - -// NewUpstream returns an outgoing proxy instance with the given config. -func NewUpstream(cfg UpstreamConfig) *Upstream { - u := &Upstream{ - cfg: &cfg, - } - u.cfg.applyDefaults() - return u -} - -// String returns a string that uniquely identifies the Upstream. Used for -// identifying the upstream in log output and map keys. -func (u *Upstream) String() string { - return u.cfg.String() -} - -// Listener implements Proxier -func (u *Upstream) Listener() (net.Listener, error) { - return net.Listen("tcp", u.cfg.LocalBindAddress) -} - -// HandleConn implements Proxier -func (u *Upstream) HandleConn(conn net.Conn) error { - defer conn.Close() - - // Discover destination instance - dst, err := u.discoverAndDial() - if err != nil { - return err - } - - // Hand conn and dst over to Conn to manage the byte copying. - c := NewConn(conn, dst) - return c.CopyBytes() -} - -func (u *Upstream) discoverAndDial() (net.Conn, error) { - to := time.Duration(u.cfg.ConnectTimeoutMs) * time.Millisecond - ctx, cancel := context.WithTimeout(context.Background(), to) - defer cancel() - - switch u.cfg.DestinationType { - case "service": - return u.cfg.Client.DialService(ctx, u.cfg.DestinationNamespace, - u.cfg.DestinationName) - - case "prepared_query": - return u.cfg.Client.DialPreparedQuery(ctx, u.cfg.DestinationNamespace, - u.cfg.DestinationName) - - default: - return nil, fmt.Errorf("invalid destination type %s", u.cfg.DestinationType) - } -} - -/* -// Upstream represents a service that the proxied application needs to connect -// out to. It provides a dedication local TCP listener (usually listening only -// on loopback) and forwards incoming connections to the proxy to handle. -type Upstream struct { - cfg *UpstreamConfig - wg sync.WaitGroup - - proxy *Proxy - fatalErr error -} - -// NewUpstream creates an upstream ready to attach to a proxy instance with -// Proxy.AddUpstream. An Upstream can only be attached to single Proxy instance -// at once. -func NewUpstream(p *Proxy, cfg *UpstreamConfig) *Upstream { - return &Upstream{ - cfg: cfg, - proxy: p, - shutdown: make(chan struct{}), - } -} - -// UpstreamConfig configures the upstream -type UpstreamConfig struct { - // LocalAddress is the host:port to listen on for local app connections. - LocalAddress string - - // DestinationName is the service name of the destination. - DestinationName string - - // DestinationNamespace is the namespace of the destination. - DestinationNamespace string - - // DestinationType determines which service discovery method is used to find a - // candidate instance to connect to. - DestinationType string -} - -// String returns a string representation for the upstream for debugging or -// use as a unique key. -func (uc *UpstreamConfig) String() string { - return fmt.Sprintf("%s->%s:%s/%s", uc.LocalAddress, uc.DestinationType, - uc.DestinationNamespace, uc.DestinationName) -} - -func (u *Upstream) listen() error { - l, err := net.Listen("tcp", u.cfg.LocalAddress) - if err != nil { - u.fatal(err) - return - } - - for { - conn, err := l.Accept() - if err != nil { - return err - } - - go u.discoverAndConnect(conn) - } -} - -func (u *Upstream) discoverAndConnect(src net.Conn) { - // First, we need an upstream instance from Consul to connect to - dstAddrs, err := u.discoverInstances() - if err != nil { - u.fatal(fmt.Errorf("failed to discover upstream instances: %s", err)) - return - } - - if len(dstAddrs) < 1 { - log.Printf("[INFO] no instances found for %s", len(dstAddrs), u) - } - - // Attempt connection to first one that works - // TODO: configurable number/deadline? - for idx, addr := range dstAddrs { - err := u.proxy.startProxyingConn(src, addr, false) - if err != nil { - log.Printf("[INFO] failed to connect to %s: %s (%d of %d)", addr, err, - idx+1, len(dstAddrs)) - continue - } - return - } - - log.Printf("[INFO] failed to connect to all %d instances for %s", - len(dstAddrs), u) -} - -func (u *Upstream) discoverInstances() ([]string, error) { - switch u.cfg.DestinationType { - case "service": - svcs, _, err := u.cfg.Consul.Health().Service(u.cfg.DestinationName, "", - true, nil) - if err != nil { - return nil, err - } - - addrs := make([]string, len(svcs)) - - // Shuffle order as we go since health endpoint doesn't - perm := rand.Perm(len(addrs)) - for i, se := range svcs { - // Pick location in output array based on next permutation position - j := perm[i] - addrs[j] = fmt.Sprintf("%s:%d", se.Service.Address, se.Service.Port) - } - - return addrs, nil - - case "prepared_query": - pqr, _, err := u.cfg.Consul.PreparedQuery().Execute(u.cfg.DestinationName, - nil) - if err != nil { - return nil, err - } - - addrs := make([]string, 0, len(svcs)) - for _, se := range pqr.Nodes { - addrs = append(addrs, fmt.Sprintf("%s:%d", se.Service.Address, - se.Service.Port)) - } - - // PreparedQuery execution already shuffles the result - return addrs, nil - - default: - u.fatal(fmt.Errorf("invalid destination type %s", u.cfg.DestinationType)) - } -} - -func (u *Upstream) fatal(err Error) { - log.Printf("[ERROR] upstream %s stopping on error: %s", u.cfg.LocalAddress, - err) - u.fatalErr = err -} - -// String returns a string representation for the upstream for debugging or -// use as a unique key. -func (u *Upstream) String() string { - return u.cfg.String() -} -*/ diff --git a/proxy/upstream_test.go b/proxy/upstream_test.go deleted file mode 100644 index 79bca0136..000000000 --- a/proxy/upstream_test.go +++ /dev/null @@ -1,75 +0,0 @@ -package proxy - -import ( - "net" - "testing" - - "github.com/hashicorp/consul/connect" - "github.com/stretchr/testify/require" -) - -func TestUpstream(t *testing.T) { - tests := []struct { - name string - cfg UpstreamConfig - }{ - { - name: "service", - cfg: UpstreamConfig{ - DestinationType: "service", - DestinationNamespace: "default", - DestinationName: "db", - ConnectTimeoutMs: 100, - }, - }, - { - name: "prepared_query", - cfg: UpstreamConfig{ - DestinationType: "prepared_query", - DestinationNamespace: "default", - DestinationName: "geo-db", - ConnectTimeoutMs: 100, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - addrs := TestLocalBindAddrs(t, 2) - - testApp, err := NewTestTCPServer(t, addrs[0]) - require.Nil(t, err) - defer testApp.Close() - - // Create mock client that will "discover" our test tcp server as a target and - // skip TLS altogether. - client := &TestConnectClient{ - Server: testApp, - TLSConfig: connect.TestTLSConfig(t, "ca1", "web"), - } - - // Override cfg params - tt.cfg.LocalBindAddress = addrs[1] - tt.cfg.Client = client - - u := NewUpstream(tt.cfg) - - // Run proxy - r := NewRunner("test", u) - go r.Listen() - defer r.Stop() - - // Proxy and fake remote service are running, play the part of the app - // connecting to a remote connect service over TCP. - conn, err := net.Dial("tcp", tt.cfg.LocalBindAddress) - require.Nil(t, err) - TestEchoConn(t, conn, "") - - // Validate that discovery actually was called as we expected - require.Len(t, client.Calls, 1) - require.Equal(t, tt.cfg.DestinationType, client.Calls[0].typ) - require.Equal(t, tt.cfg.DestinationNamespace, client.Calls[0].ns) - require.Equal(t, tt.cfg.DestinationName, client.Calls[0].name) - }) - } -}