From 3f7c072b75e7d979b306ef996658dce21db84360 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Fri, 5 Jun 2020 14:54:29 -0700 Subject: [PATCH] Add connect expose CLI command --- agent/config_endpoint.go | 4 +- command/commands_oss.go | 4 +- command/connect/expose/expose.go | 236 +++++++++++++++++++++ command/connect/expose/expose_test.go | 289 ++++++++++++++++++++++++++ command/intention/create/create.go | 8 +- 5 files changed, 535 insertions(+), 6 deletions(-) create mode 100644 command/connect/expose/expose.go create mode 100644 command/connect/expose/expose_test.go diff --git a/agent/config_endpoint.go b/agent/config_endpoint.go index be8c11e33..3b1967eba 100644 --- a/agent/config_endpoint.go +++ b/agent/config_endpoint.go @@ -9,6 +9,8 @@ import ( "github.com/hashicorp/consul/agent/structs" ) +const ConfigEntryNotFoundErr string = "Config entry not found" + // Config switches on the different CRUD operations for config entries. func (s *HTTPServer) Config(resp http.ResponseWriter, req *http.Request) (interface{}, error) { switch req.Method { @@ -48,7 +50,7 @@ func (s *HTTPServer) configGet(resp http.ResponseWriter, req *http.Request) (int setMeta(resp, &reply.QueryMeta) if reply.Entry == nil { - return nil, NotFoundError{Reason: fmt.Sprintf("Config entry not found for %q / %q", pathArgs[0], pathArgs[1])} + return nil, NotFoundError{Reason: fmt.Sprintf("%s for %q / %q", ConfigEntryNotFoundErr, pathArgs[0], pathArgs[1])} } return reply.Entry, nil diff --git a/command/commands_oss.go b/command/commands_oss.go index cf419cad4..e7237a188 100644 --- a/command/commands_oss.go +++ b/command/commands_oss.go @@ -51,7 +51,8 @@ import ( caget "github.com/hashicorp/consul/command/connect/ca/get" caset "github.com/hashicorp/consul/command/connect/ca/set" "github.com/hashicorp/consul/command/connect/envoy" - "github.com/hashicorp/consul/command/connect/envoy/pipe-bootstrap" + pipebootstrap "github.com/hashicorp/consul/command/connect/envoy/pipe-bootstrap" + "github.com/hashicorp/consul/command/connect/expose" "github.com/hashicorp/consul/command/connect/proxy" "github.com/hashicorp/consul/command/debug" "github.com/hashicorp/consul/command/event" @@ -169,6 +170,7 @@ func init() { Register("connect proxy", func(ui cli.Ui) (cli.Command, error) { return proxy.New(ui, MakeShutdownCh()), nil }) Register("connect envoy", func(ui cli.Ui) (cli.Command, error) { return envoy.New(ui), nil }) Register("connect envoy pipe-bootstrap", func(ui cli.Ui) (cli.Command, error) { return pipebootstrap.New(ui), nil }) + Register("connect expose", func(ui cli.Ui) (cli.Command, error) { return expose.New(ui), nil }) Register("debug", func(ui cli.Ui) (cli.Command, error) { return debug.New(ui, MakeShutdownCh()), nil }) Register("event", func(ui cli.Ui) (cli.Command, error) { return event.New(ui), nil }) Register("exec", func(ui cli.Ui) (cli.Command, error) { return exec.New(ui, MakeShutdownCh()), nil }) diff --git a/command/connect/expose/expose.go b/command/connect/expose/expose.go new file mode 100644 index 000000000..3f0c81c08 --- /dev/null +++ b/command/connect/expose/expose.go @@ -0,0 +1,236 @@ +package expose + +import ( + "flag" + "fmt" + "strconv" + "strings" + + "github.com/hashicorp/consul/agent" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/command/flags" + "github.com/hashicorp/consul/command/intention/create" + "github.com/hashicorp/consul/command/intention/finder" + "github.com/mitchellh/cli" +) + +func New(ui cli.Ui) *cmd { + c := &cmd{UI: ui} + c.init() + return c +} + +type cmd struct { + UI cli.Ui + flags *flag.FlagSet + http *flags.HTTPFlags + help string + + // flags + ingressGateway string + service string + portRaw string + port int + protocol string +} + +func (c *cmd) init() { + c.flags = flag.NewFlagSet("", flag.ContinueOnError) + c.flags.StringVar(&c.ingressGateway, "ingress-gateway", "", + "The name of the ingress gateway service to use. Required.") + + c.flags.StringVar(&c.service, "service", "", + "The name of destination service to expose. Required.") + + c.flags.StringVar(&c.portRaw, "port", "", + "The listener port to use for the service on the Ingress gateway. Required.") + + c.flags.StringVar(&c.protocol, "protocol", "tcp", + "The protocol for the service. Defaults to 'tcp'. Optional.") + + c.http = &flags.HTTPFlags{} + flags.Merge(c.flags, c.http.ClientFlags()) + flags.Merge(c.flags, c.http.ServerFlags()) + c.help = flags.Usage(help, c.flags) +} + +func (c *cmd) Run(args []string) int { + if err := c.flags.Parse(args); err != nil { + if err == flag.ErrHelp { + return 0 + } + c.UI.Error(fmt.Sprintf("Failed to parse args: %v", err)) + return 1 + } + + // Set up a client. + client, err := c.http.APIClient() + if err != nil { + c.UI.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + // Check for any missing or invalid flag values. + if c.service == "" { + c.UI.Error("A service name must be given via the -service flag.") + return 1 + } + svc, svcNamespace, err := create.ParseIntentionTarget(c.service) + if err != nil { + c.UI.Error(fmt.Sprintf("Invalid service name: %s", err)) + return 1 + } + + if c.ingressGateway == "" { + c.UI.Error("An ingress gateway service must be given via the -ingress-gateway flag.") + return 1 + } + gateway, gatewayNamespace, err := create.ParseIntentionTarget(c.ingressGateway) + if err != nil { + c.UI.Error(fmt.Sprintf("Invalid ingress gateway name: %s", err)) + return 1 + } + + if c.portRaw == "" { + c.UI.Error("A port must be provided via the -port flag.") + return 1 + } else { + c.port, err = strconv.Atoi(c.portRaw) + if err != nil { + c.UI.Error(fmt.Sprintf("Error parsing port: %s", err)) + return 1 + } + } + + // First get the config entry for the ingress gateway, if it exists. Don't error if it's a 404 as that + // just means we'll need to create a new config entry. + conf, _, err := client.ConfigEntries().Get(api.IngressGateway, gateway, nil) + if err != nil && !strings.Contains(err.Error(), agent.ConfigEntryNotFoundErr) { + c.UI.Error(fmt.Sprintf("Error fetching existing ingress gateway configuration: %s", err)) + return 1 + } + if conf == nil { + conf = &api.IngressGatewayConfigEntry{ + Kind: api.IngressGateway, + Name: gateway, + Namespace: gatewayNamespace, + } + } + + // Make sure the flags don't conflict with existing config. + ingressConf, ok := conf.(*api.IngressGatewayConfigEntry) + if !ok { + // This should never happen + c.UI.Error(fmt.Sprintf("Config entry is an invalid type: %T", conf)) + return 1 + } + + listenerIdx := -1 + for i, listener := range ingressConf.Listeners { + // Make sure the service isn't already exposed in this gateway + for _, service := range listener.Services { + if service.Name == svc { + c.UI.Error(fmt.Sprintf("Service %q already exposed through listener with port %d", svc, listener.Port)) + goto CREATE_INTENTION + } + } + + // If there's already a listener for the given port, make sure the protocol matches. + if listener.Port == c.port { + listenerIdx = i + if listener.Protocol != c.protocol { + c.UI.Error(fmt.Sprintf("Listener on port %d already configured with conflicting protocol %q", listener.Port, listener.Protocol)) + return 1 + } + } + } + + // Add a service to the existing listener for the port if one exists, or make a new listener. + if listenerIdx >= 0 { + ingressConf.Listeners[listenerIdx].Services = append(ingressConf.Listeners[listenerIdx].Services, api.IngressService{ + Name: svc, + Namespace: svcNamespace, + }) + } else { + ingressConf.Listeners = append(ingressConf.Listeners, api.IngressListener{ + Port: c.port, + Protocol: c.protocol, + Services: []api.IngressService{ + { + Name: svc, + Namespace: svcNamespace, + }, + }, + }) + } + + // Write the updated config entry using a check-and-set, so it fails if the entry + // has been changed since we looked it up. + { + succeeded, _, err := client.ConfigEntries().CAS(ingressConf, ingressConf.GetModifyIndex(), nil) + if err != nil { + c.UI.Error(fmt.Sprintf("Error writing ingress config entry: %v", err)) + return 1 + } + if !succeeded { + c.UI.Error("Ingress config entry was changed while attempting to update, please try again.") + return 1 + } + c.UI.Output(fmt.Sprintf("Successfully updated config entry for ingress service %q", gateway)) + } + +CREATE_INTENTION: + // Check for an existing intention. + ixnFinder := finder.Finder{Client: client} + existing, err := ixnFinder.Find(c.ingressGateway, c.service) + if err != nil { + c.UI.Error(fmt.Sprintf("Error looking up existing intention: %s", err)) + return 1 + } + if existing != nil && existing.Action == api.IntentionActionAllow { + c.UI.Error(fmt.Sprintf("Intention already exists for %q -> %q", c.ingressGateway, c.service)) + return 0 + } + + // Add the intention between the gateway service and the destination. + ixn := &api.Intention{ + SourceName: gateway, + SourceNS: gatewayNamespace, + DestinationName: svc, + DestinationNS: svcNamespace, + SourceType: api.IntentionSourceConsul, + Action: api.IntentionActionAllow, + } + if existing == nil { + _, _, err = client.Connect().IntentionCreate(ixn, nil) + if err != nil { + c.UI.Error(fmt.Sprintf("Error creating intention: %s", err)) + return 1 + } + } else { + _, err = client.Connect().IntentionUpdate(ixn, nil) + if err != nil { + c.UI.Error(fmt.Sprintf("Error updating intention: %s", err)) + return 1 + } + } + + c.UI.Output(fmt.Sprintf("Successfully set up intention for %q -> %q", c.ingressGateway, c.service)) + return 0 +} + +func (c *cmd) Synopsis() string { + return synopsis +} + +func (c *cmd) Help() string { + return c.help +} + +const synopsis = "Expose a Connect-enabled service through an Ingress gateway" +const help = ` +Usage: consul connect expose [options] + + Exposes a Connect-enabled service through the given ingress gateway, using the + given protocol and port. +` diff --git a/command/connect/expose/expose_test.go b/command/connect/expose/expose_test.go new file mode 100644 index 000000000..18b650138 --- /dev/null +++ b/command/connect/expose/expose_test.go @@ -0,0 +1,289 @@ +package expose + +import ( + "testing" + + "github.com/hashicorp/consul/agent" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/testrpc" + "github.com/mitchellh/cli" + "github.com/stretchr/testify/require" +) + +func TestConnectExpose(t *testing.T) { + t.Parallel() + require := require.New(t) + a := agent.NewTestAgent(t, ``) + client := a.Client() + defer a.Shutdown() + + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + { + ui := cli.NewMockUi() + c := New(ui) + args := []string{ + "-http-addr=" + a.HTTPAddr(), + "-service=foo", + "-ingress-gateway=ingress", + "-port=8888", + "-protocol=tcp", + } + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + } + + // Make sure the config entry and intention have been created. + entry, _, err := client.ConfigEntries().Get(api.IngressGateway, "ingress", nil) + require.NoError(err) + expected := &api.IngressGatewayConfigEntry{ + Kind: api.IngressGateway, + Name: "ingress", + Listeners: []api.IngressListener{ + { + Port: 8888, + Protocol: "tcp", + Services: []api.IngressService{ + { + Name: "foo", + }, + }, + }, + }, + } + expected.CreateIndex = entry.GetCreateIndex() + expected.ModifyIndex = entry.GetModifyIndex() + require.Equal(expected, entry) + + ixns, _, err := client.Connect().Intentions(nil) + require.NoError(err) + require.Len(ixns, 1) + require.Equal("ingress", ixns[0].SourceName) + require.Equal("foo", ixns[0].DestinationName) + + // Run the command again with a different port, make sure the config entry + // and intentions aren't modified. + { + ui := cli.NewMockUi() + c := New(ui) + args := []string{ + "-http-addr=" + a.HTTPAddr(), + "-service=foo", + "-ingress-gateway=ingress", + "-port=7777", + "-protocol=tcp", + } + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + + // Make sure the config entry/intention weren't affected. + entry, _, err = client.ConfigEntries().Get(api.IngressGateway, "ingress", nil) + require.NoError(err) + require.Equal(expected, entry) + + ixns, _, err = client.Connect().Intentions(nil) + require.NoError(err) + require.Len(ixns, 1) + require.Equal("ingress", ixns[0].SourceName) + require.Equal("foo", ixns[0].DestinationName) + } + + // Run the command again with a conflicting protocol, should exit with an error and + // cause no changes to config entry/intentions. + { + ui := cli.NewMockUi() + c := New(ui) + args := []string{ + "-http-addr=" + a.HTTPAddr(), + "-service=bar", + "-ingress-gateway=ingress", + "-port=8888", + "-protocol=http", + } + + code := c.Run(args) + if code != 1 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + require.Contains(ui.ErrorWriter.String(), `conflicting protocol "tcp"`) + + // Make sure the config entry/intention weren't affected. + entry, _, err = client.ConfigEntries().Get(api.IngressGateway, "ingress", nil) + require.NoError(err) + require.Equal(expected, entry) + + ixns, _, err = client.Connect().Intentions(nil) + require.NoError(err) + require.Len(ixns, 1) + require.Equal("ingress", ixns[0].SourceName) + require.Equal("foo", ixns[0].DestinationName) + } +} + +func TestConnectExpose_invalidFlags(t *testing.T) { + t.Parallel() + require := require.New(t) + a := agent.NewTestAgent(t, ``) + defer a.Shutdown() + + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + t.Run("missing service", func(t *testing.T) { + ui := cli.NewMockUi() + c := New(ui) + args := []string{ + "-http-addr=" + a.HTTPAddr(), + } + + code := c.Run(args) + if code != 1 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + require.Contains(ui.ErrorWriter.String(), "A service name must be given") + }) + t.Run("missing gateway", func(t *testing.T) { + ui := cli.NewMockUi() + c := New(ui) + args := []string{ + "-http-addr=" + a.HTTPAddr(), + "-service=foo", + } + + code := c.Run(args) + if code != 1 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + require.Contains(ui.ErrorWriter.String(), "An ingress gateway service must be given") + }) + t.Run("missing port", func(t *testing.T) { + ui := cli.NewMockUi() + c := New(ui) + args := []string{ + "-http-addr=" + a.HTTPAddr(), + "-service=foo", + "-ingress-gateway=ingress", + } + + code := c.Run(args) + if code != 1 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + require.Contains(ui.ErrorWriter.String(), "A port must be provided") + }) +} + +func TestConnectExpose_existingConfig(t *testing.T) { + t.Parallel() + require := require.New(t) + a := agent.NewTestAgent(t, ``) + client := a.Client() + defer a.Shutdown() + + // Create some service config entries to set their protocol. + for _, service := range []string{"bar", "zoo"} { + _, _, err := client.ConfigEntries().Set(&api.ServiceConfigEntry{ + Kind: "service-defaults", + Name: service, + Protocol: "http", + }, nil) + require.NoError(err) + } + + // Create an existing ingress config entry with some services. + ingressConf := &api.IngressGatewayConfigEntry{ + Kind: api.IngressGateway, + Name: "ingress", + Listeners: []api.IngressListener{ + { + Port: 8888, + Protocol: "tcp", + Services: []api.IngressService{ + { + Name: "foo", + }, + }, + }, + { + Port: 9999, + Protocol: "http", + Services: []api.IngressService{ + { + Name: "bar", + }, + }, + }, + }, + } + _, _, err := client.ConfigEntries().Set(ingressConf, nil) + require.NoError(err) + + // Add a service on a new port. + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + { + ui := cli.NewMockUi() + c := New(ui) + args := []string{ + "-http-addr=" + a.HTTPAddr(), + "-service=baz", + "-ingress-gateway=ingress", + "-port=10000", + "-protocol=tcp", + } + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + + // Make sure the ingress config was updated and existing services preserved. + entry, _, err := client.ConfigEntries().Get(api.IngressGateway, "ingress", nil) + require.NoError(err) + + ingressConf.Listeners = append(ingressConf.Listeners, api.IngressListener{ + Port: 10000, + Protocol: "tcp", + Services: []api.IngressService{ + { + Name: "baz", + }, + }, + }) + ingressConf.CreateIndex = entry.GetCreateIndex() + ingressConf.ModifyIndex = entry.GetModifyIndex() + require.Equal(ingressConf, entry) + } + + // Add an service on a port shared with an existing listener. + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + { + ui := cli.NewMockUi() + c := New(ui) + args := []string{ + "-http-addr=" + a.HTTPAddr(), + "-service=zoo", + "-ingress-gateway=ingress", + "-port=9999", + "-protocol=http", + } + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + + // Make sure the ingress config was updated and existing services preserved. + entry, _, err := client.ConfigEntries().Get(api.IngressGateway, "ingress", nil) + require.NoError(err) + + ingressConf.Listeners[1].Services = append(ingressConf.Listeners[1].Services, api.IngressService{ + Name: "zoo", + }) + ingressConf.CreateIndex = entry.GetCreateIndex() + ingressConf.ModifyIndex = entry.GetModifyIndex() + require.Equal(ingressConf, entry) + } +} diff --git a/command/intention/create/create.go b/command/intention/create/create.go index 04ee8153e..2f3b0b15a 100644 --- a/command/intention/create/create.go +++ b/command/intention/create/create.go @@ -136,10 +136,10 @@ func (c *cmd) Run(args []string) int { return 0 } -// parseIntentionTarget parses a target of the form / and returns +// ParseIntentionTarget parses a target of the form / and returns // the two distinct parts. In some cases the namespace may be elided and this function // will return the empty string for the namespace then. -func parseIntentionTarget(input string) (name string, namespace string, err error) { +func ParseIntentionTarget(input string) (name string, namespace string, err error) { // Get the index to the '/'. If it doesn't exist, we have just a name // so just set that and return. idx := strings.IndexByte(input, '/') @@ -171,12 +171,12 @@ func (c *cmd) ixnsFromArgs(args []string) ([]*api.Intention, error) { return nil, fmt.Errorf("Must specify two arguments: source and destination") } - srcName, srcNamespace, err := parseIntentionTarget(args[0]) + srcName, srcNamespace, err := ParseIntentionTarget(args[0]) if err != nil { return nil, fmt.Errorf("Invalid intention source: %v", err) } - dstName, dstNamespace, err := parseIntentionTarget(args[1]) + dstName, dstNamespace, err := ParseIntentionTarget(args[1]) if err != nil { return nil, fmt.Errorf("Invalid intention destination: %v", err) }