From ebbb234ecb7ed21e917269c23eb17b2d343f3f17 Mon Sep 17 00:00:00 2001 From: Freddy Date: Mon, 11 May 2020 11:35:17 -0600 Subject: [PATCH] Gateway Services Nodes UI Endpoint (#7685) The endpoint supports queries for both Ingress Gateways and Terminating Gateways. Used to display a gateway's linked services in the UI. --- .../cache-types/catalog_service_list_test.go | 4 +- agent/consul/acl.go | 47 ++ agent/consul/internal_endpoint.go | 78 ++ agent/consul/internal_endpoint_test.go | 669 +++++++++++++++++- agent/consul/state/catalog.go | 2 +- agent/consul/state/catalog_test.go | 9 +- agent/http_register.go | 1 + agent/structs/structs.go | 34 +- agent/ui_endpoint.go | 60 +- agent/ui_endpoint_test.go | 238 +++++++ 10 files changed, 1125 insertions(+), 17 deletions(-) diff --git a/agent/cache-types/catalog_service_list_test.go b/agent/cache-types/catalog_service_list_test.go index f7ac28e73..0bd0b602c 100644 --- a/agent/cache-types/catalog_service_list_test.go +++ b/agent/cache-types/catalog_service_list_test.go @@ -26,10 +26,10 @@ func TestCatalogServiceList(t *testing.T) { reply := args.Get(2).(*structs.IndexedServiceList) reply.Services = structs.ServiceList{ - structs.ServiceInfo{ + structs.ServiceName{ Name: "foo", }, - structs.ServiceInfo{ + structs.ServiceName{ Name: "bar", }, } diff --git a/agent/consul/acl.go b/agent/consul/acl.go index e1e45056a..cccb54a02 100644 --- a/agent/consul/acl.go +++ b/agent/consul/acl.go @@ -1171,6 +1171,23 @@ func (f *aclFilter) allowNode(node string, ent *acl.AuthorizerContext) bool { return f.authorizer.NodeRead(node, ent) == acl.Allow } +// allowNode is used to determine if the gateway and service are accessible for an ACL +func (f *aclFilter) allowGateway(gs *structs.GatewayService) bool { + var authzContext acl.AuthorizerContext + + // Need read on service and gateway. Gateway may have different EnterpriseMeta so we fill authzContext twice + gs.Gateway.FillAuthzContext(&authzContext) + if !f.allowService(gs.Gateway.ID, &authzContext) { + return false + } + + gs.Service.FillAuthzContext(&authzContext) + if !f.allowService(gs.Service.ID, &authzContext) { + return false + } + return true +} + // allowService is used to determine if a service is accessible for an ACL. func (f *aclFilter) allowService(service string, ent *acl.AuthorizerContext) bool { if service == "" { @@ -1438,6 +1455,33 @@ func (f *aclFilter) filterNodeDump(dump *structs.NodeDump) { *dump = nd } +// filterServiceDump is used to filter nodes based on ACL rules. +func (f *aclFilter) filterServiceDump(services *structs.ServiceDump) { + svcs := *services + var authzContext acl.AuthorizerContext + + for i := 0; i < len(svcs); i++ { + service := svcs[i] + + if f.allowGateway(service.GatewayService) { + // ServiceDump might only have gateway config and no node information + if service.Node == nil { + continue + } + + service.Service.FillAuthzContext(&authzContext) + if f.allowNode(service.Node.Node, &authzContext) { + continue + } + } + + f.logger.Debug("dropping service from result due to ACLs", "service", service.GatewayService.Service) + svcs = append(svcs[:i], svcs[i+1:]...) + i-- + } + *services = svcs +} + // filterNodes is used to filter through all parts of a node list and remove // elements the provided ACL token cannot access. func (f *aclFilter) filterNodes(nodes *structs.Nodes) { @@ -1749,6 +1793,9 @@ func (r *ACLResolver) filterACLWithAuthorizer(authorizer acl.Authorizer, subj in case *structs.IndexedNodeDump: filt.filterNodeDump(&v.Dump) + case *structs.IndexedServiceDump: + filt.filterServiceDump(&v.Dump) + case *structs.IndexedNodes: filt.filterNodes(&v.Nodes) diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 6f052e409..31a084c79 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -127,6 +127,84 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs. }) } +// GatewayServiceNodes returns all the nodes for services associated with a gateway along with their gateway config +func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceDump) error { + if done, err := m.srv.forward("Internal.GatewayServiceDump", args, args, reply); done { + return err + } + + // Verify the arguments + if args.ServiceName == "" { + return fmt.Errorf("Must provide gateway name") + } + + var authzContext acl.AuthorizerContext + authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext) + if err != nil { + return err + } + + if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil { + return err + } + + // We need read access to the gateway we're trying to find services for, so check that first. + if authz != nil && authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow { + return acl.ErrPermissionDenied + } + + err = m.srv.blockingQuery( + &args.QueryOptions, + &reply.QueryMeta, + func(ws memdb.WatchSet, state *state.Store) error { + var maxIdx uint64 + idx, gatewayServices, err := state.GatewayServices(ws, args.ServiceName, &args.EnterpriseMeta) + if err != nil { + return err + } + if idx > maxIdx { + maxIdx = idx + } + + // Loop over the gateway <-> serviceName mappings and fetch all service instances for each + var result structs.ServiceDump + for _, gs := range gatewayServices { + idx, instances, err := state.CheckServiceNodes(ws, gs.Service.ID, &gs.Service.EnterpriseMeta) + if err != nil { + return err + } + if idx > maxIdx { + maxIdx = idx + } + for _, n := range instances { + svc := structs.ServiceInfo{ + Node: n.Node, + Service: n.Service, + Checks: n.Checks, + GatewayService: gs, + } + result = append(result, &svc) + } + + // Ensure we store the gateway <-> service mapping even if there are no instances of the service + if len(instances) == 0 { + svc := structs.ServiceInfo{ + GatewayService: gs, + } + result = append(result, &svc) + } + } + reply.Index, reply.Dump = maxIdx, result + + if err := m.srv.filterACL(args.Token, reply); err != nil { + return err + } + return nil + }) + + return err +} + // EventFire is a bit of an odd endpoint, but it allows for a cross-DC RPC // call to fire an event. The primary use case is to enable user events being // triggered in a remote DC. diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index f36ea42d1..1b5c03e2b 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -6,13 +6,13 @@ import ( "strings" "testing" - "github.com/hashicorp/consul/sdk/testutil/retry" - "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" + "github.com/hashicorp/consul/types" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1146,3 +1146,668 @@ service "gateway" { assert.Equal(r, expect, resp.Services) }) } + +func TestInternal_GatewayServiceDump_Terminating(t *testing.T) { + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + + // Register gateway and two service instances that will be associated with it + { + arg := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "terminating-gateway", + Service: "terminating-gateway", + Kind: structs.ServiceKindTerminatingGateway, + Port: 443, + }, + Check: &structs.HealthCheck{ + Name: "terminating connect", + Status: api.HealthPassing, + ServiceID: "terminating-gateway", + }, + } + var out struct{} + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)) + + arg = structs.RegisterRequest{ + Datacenter: "dc1", + Node: "bar", + Address: "127.0.0.2", + Service: &structs.NodeService{ + ID: "db", + Service: "db", + }, + Check: &structs.HealthCheck{ + Name: "db-warning", + Status: api.HealthWarning, + ServiceID: "db", + }, + } + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)) + + arg = structs.RegisterRequest{ + Datacenter: "dc1", + Node: "baz", + Address: "127.0.0.3", + Service: &structs.NodeService{ + ID: "db2", + Service: "db", + }, + Check: &structs.HealthCheck{ + Name: "db2-passing", + Status: api.HealthPassing, + ServiceID: "db2", + }, + } + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)) + } + + // Register terminating-gateway config entry, linking it to db, api, and redis (dne) + { + args := &structs.TerminatingGatewayConfigEntry{ + Name: "terminating-gateway", + Kind: structs.TerminatingGateway, + Services: []structs.LinkedService{ + { + Name: "db", + }, + { + Name: "redis", + CAFile: "/etc/certs/ca.pem", + CertFile: "/etc/certs/cert.pem", + KeyFile: "/etc/certs/key.pem", + }, + }, + } + + req := structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: args, + } + var configOutput bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &configOutput)) + require.True(t, configOutput) + } + + var out structs.IndexedServiceDump + req := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "terminating-gateway", + } + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServiceDump", &req, &out)) + + dump := out.Dump + + // Reset raft indices to facilitate assertion + for i := 0; i < len(dump); i++ { + svc := dump[i] + if svc.Node != nil { + svc.Node.RaftIndex = structs.RaftIndex{} + } + if svc.Service != nil { + svc.Service.RaftIndex = structs.RaftIndex{} + } + if len(svc.Checks) > 0 && svc.Checks[0] != nil { + svc.Checks[0].RaftIndex = structs.RaftIndex{} + } + if svc.GatewayService != nil { + svc.GatewayService.RaftIndex = structs.RaftIndex{} + } + } + + expect := structs.ServiceDump{ + { + Node: &structs.Node{ + Node: "baz", + Address: "127.0.0.3", + Datacenter: "dc1", + }, + Service: &structs.NodeService{ + ID: "db2", + Service: "db", + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + }, + Checks: structs.HealthChecks{ + { + Node: "baz", + CheckID: types.CheckID("db2-passing"), + Name: "db2-passing", + Status: "passing", + ServiceID: "db2", + ServiceName: "db", + }, + }, + GatewayService: &structs.GatewayService{ + Gateway: structs.ServiceID{ID: "terminating-gateway"}, + Service: structs.ServiceID{ID: "db"}, + GatewayKind: "terminating-gateway", + }, + }, + { + Node: &structs.Node{ + Node: "bar", + Address: "127.0.0.2", + Datacenter: "dc1", + }, + Service: &structs.NodeService{ + ID: "db", + Service: "db", + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + }, + Checks: structs.HealthChecks{ + { + Node: "bar", + CheckID: types.CheckID("db-warning"), + Name: "db-warning", + Status: "warning", + ServiceID: "db", + ServiceName: "db", + }, + }, + GatewayService: &structs.GatewayService{ + Gateway: structs.ServiceID{ID: "terminating-gateway"}, + Service: structs.ServiceID{ID: "db"}, + GatewayKind: "terminating-gateway", + }, + }, + { + // Only GatewayService should be returned when linked service isn't registered + GatewayService: &structs.GatewayService{ + Gateway: structs.ServiceID{ID: "terminating-gateway"}, + Service: structs.ServiceID{ID: "redis"}, + GatewayKind: "terminating-gateway", + CAFile: "/etc/certs/ca.pem", + CertFile: "/etc/certs/cert.pem", + KeyFile: "/etc/certs/key.pem", + }, + }, + } + assert.ElementsMatch(t, expect, dump) +} + +func TestInternal_GatewayServiceDump_Terminating_ACL(t *testing.T) { + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.ACLEnforceVersion8 = true + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForTestAgent(t, s1.RPC, "dc1", testrpc.WithToken("root")) + + // Create the ACL. + token, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", ` + service "db" { policy = "read" } + service "terminating-gateway" { policy = "read" } + node_prefix "" { policy = "read" }`) + require.NoError(t, err) + + // Register gateway and two service instances that will be associated with it + { + arg := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "terminating-gateway", + Service: "terminating-gateway", + Kind: structs.ServiceKindTerminatingGateway, + Port: 443, + }, + Check: &structs.HealthCheck{ + Name: "terminating connect", + Status: api.HealthPassing, + ServiceID: "terminating-gateway", + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var out struct{} + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)) + + arg = structs.RegisterRequest{ + Datacenter: "dc1", + Node: "bar", + Address: "127.0.0.2", + Service: &structs.NodeService{ + ID: "db", + Service: "db", + }, + Check: &structs.HealthCheck{ + Name: "db-warning", + Status: api.HealthWarning, + ServiceID: "db", + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)) + + arg = structs.RegisterRequest{ + Datacenter: "dc1", + Node: "baz", + Address: "127.0.0.3", + Service: &structs.NodeService{ + ID: "api", + Service: "api", + }, + Check: &structs.HealthCheck{ + Name: "api-passing", + Status: api.HealthPassing, + ServiceID: "api", + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)) + } + + // Register terminating-gateway config entry, linking it to db and api + { + args := &structs.TerminatingGatewayConfigEntry{ + Name: "terminating-gateway", + Kind: structs.TerminatingGateway, + Services: []structs.LinkedService{ + {Name: "db"}, + {Name: "api"}, + }, + } + + req := structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: args, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &out)) + require.True(t, out) + } + + var out structs.IndexedServiceDump + + // Not passing a token with service:read on Gateway leads to PermissionDenied + req := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "terminating-gateway", + } + err = msgpackrpc.CallWithCodec(codec, "Internal.GatewayServiceDump", &req, &out) + require.Error(t, err, acl.ErrPermissionDenied) + + // Passing a token without service:read on api leads to it getting filtered out + req = structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "terminating-gateway", + QueryOptions: structs.QueryOptions{Token: token.SecretID}, + } + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServiceDump", &req, &out)) + + nodes := out.Dump + require.Len(t, nodes, 1) + require.Equal(t, nodes[0].Node.Node, "bar") + require.Equal(t, nodes[0].Service.Service, "db") + require.Equal(t, nodes[0].Checks[0].Status, api.HealthWarning) +} + +func TestInternal_GatewayServiceDump_Ingress(t *testing.T) { + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + + // Register gateway and service instance that will be associated with it + { + arg := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "ingress-gateway", + Service: "ingress-gateway", + Kind: structs.ServiceKindIngressGateway, + Port: 8443, + }, + Check: &structs.HealthCheck{ + Name: "ingress connect", + Status: api.HealthPassing, + ServiceID: "ingress-gateway", + }, + } + var regOutput struct{} + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, ®Output)) + + arg = structs.RegisterRequest{ + Datacenter: "dc1", + Node: "bar", + Address: "127.0.0.2", + Service: &structs.NodeService{ + ID: "db", + Service: "db", + }, + Check: &structs.HealthCheck{ + Name: "db-warning", + Status: api.HealthWarning, + ServiceID: "db", + }, + } + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, ®Output)) + + arg = structs.RegisterRequest{ + Datacenter: "dc1", + Node: "baz", + Address: "127.0.0.3", + Service: &structs.NodeService{ + ID: "db2", + Service: "db", + }, + Check: &structs.HealthCheck{ + Name: "db2-passing", + Status: api.HealthPassing, + ServiceID: "db2", + }, + } + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, ®Output)) + + // Register ingress-gateway config entry, linking it to db and redis (dne) + args := &structs.IngressGatewayConfigEntry{ + Name: "ingress-gateway", + Kind: structs.IngressGateway, + Listeners: []structs.IngressListener{ + { + Port: 8888, + Protocol: "tcp", + Services: []structs.IngressService{ + { + Name: "db", + }, + }, + }, + { + Port: 8080, + Protocol: "tcp", + Services: []structs.IngressService{ + { + Name: "web", + }, + }, + }, + }, + } + + req := structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: args, + } + var configOutput bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &configOutput)) + require.True(t, configOutput) + } + + var out structs.IndexedServiceDump + req := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "ingress-gateway", + } + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServiceDump", &req, &out)) + + dump := out.Dump + + // Reset raft indices to facilitate assertion + for i := 0; i < len(dump); i++ { + svc := dump[i] + if svc.Node != nil { + svc.Node.RaftIndex = structs.RaftIndex{} + } + if svc.Service != nil { + svc.Service.RaftIndex = structs.RaftIndex{} + } + if len(svc.Checks) > 0 && svc.Checks[0] != nil { + svc.Checks[0].RaftIndex = structs.RaftIndex{} + } + if svc.GatewayService != nil { + svc.GatewayService.RaftIndex = structs.RaftIndex{} + } + } + + expect := structs.ServiceDump{ + { + Node: &structs.Node{ + Node: "bar", + Address: "127.0.0.2", + Datacenter: "dc1", + }, + Service: &structs.NodeService{ + Kind: "", + ID: "db", + Service: "db", + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + }, + Checks: structs.HealthChecks{ + { + Node: "bar", + CheckID: types.CheckID("db-warning"), + Name: "db-warning", + Status: "warning", + ServiceID: "db", + ServiceName: "db", + }, + }, + GatewayService: &structs.GatewayService{ + Gateway: structs.ServiceID{ID: "ingress-gateway"}, + Service: structs.ServiceID{ID: "db"}, + GatewayKind: "ingress-gateway", + Port: 8888, + }, + }, + { + Node: &structs.Node{ + Node: "baz", + Address: "127.0.0.3", + Datacenter: "dc1", + }, + Service: &structs.NodeService{ + ID: "db2", + Service: "db", + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + }, + Checks: structs.HealthChecks{ + { + Node: "baz", + CheckID: types.CheckID("db2-passing"), + Name: "db2-passing", + Status: "passing", + ServiceID: "db2", + ServiceName: "db", + }, + }, + GatewayService: &structs.GatewayService{ + Gateway: structs.ServiceID{ID: "ingress-gateway"}, + Service: structs.ServiceID{ID: "db"}, + GatewayKind: "ingress-gateway", + Port: 8888, + }, + }, + { + // Only GatewayService should be returned when upstream isn't registered + GatewayService: &structs.GatewayService{ + Gateway: structs.ServiceID{ID: "ingress-gateway"}, + Service: structs.ServiceID{ID: "web"}, + GatewayKind: "ingress-gateway", + Port: 8080, + }, + }, + } + assert.ElementsMatch(t, expect, dump) +} + +func TestInternal_GatewayServiceDump_Ingress_ACL(t *testing.T) { + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.ACLEnforceVersion8 = true + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForTestAgent(t, s1.RPC, "dc1", testrpc.WithToken("root")) + + // Create the ACL. + token, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", ` + service "db" { policy = "read" } + service "ingress-gateway" { policy = "read" } + node_prefix "" { policy = "read" }`) + require.NoError(t, err) + + // Register gateway and two service instances that will be associated with it + { + arg := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "ingress-gateway", + Service: "ingress-gateway", + Kind: structs.ServiceKindIngressGateway, + }, + Check: &structs.HealthCheck{ + Name: "ingress connect", + Status: api.HealthPassing, + ServiceID: "ingress-gateway", + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var out struct{} + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)) + + arg = structs.RegisterRequest{ + Datacenter: "dc1", + Node: "bar", + Address: "127.0.0.2", + Service: &structs.NodeService{ + ID: "db", + Service: "db", + }, + Check: &structs.HealthCheck{ + Name: "db-warning", + Status: api.HealthWarning, + ServiceID: "db", + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)) + + arg = structs.RegisterRequest{ + Datacenter: "dc1", + Node: "baz", + Address: "127.0.0.3", + Service: &structs.NodeService{ + ID: "api", + Service: "api", + }, + Check: &structs.HealthCheck{ + Name: "api-passing", + Status: api.HealthPassing, + ServiceID: "api", + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)) + } + + // Register ingress-gateway config entry, linking it to db and api + { + args := &structs.IngressGatewayConfigEntry{ + Name: "ingress-gateway", + Kind: structs.IngressGateway, + Listeners: []structs.IngressListener{ + { + Port: 8888, + Protocol: "tcp", + Services: []structs.IngressService{ + { + Name: "db", + }, + }, + }, + { + Port: 8080, + Protocol: "tcp", + Services: []structs.IngressService{ + { + Name: "web", + }, + }, + }, + }, + } + + req := structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: args, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &out)) + require.True(t, out) + } + + var out structs.IndexedServiceDump + + // Not passing a token with service:read on Gateway leads to PermissionDenied + req := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "ingress-gateway", + } + err = msgpackrpc.CallWithCodec(codec, "Internal.GatewayServiceDump", &req, &out) + require.Error(t, err, acl.ErrPermissionDenied) + + // Passing a token without service:read on api leads to it getting filtered out + req = structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "ingress-gateway", + QueryOptions: structs.QueryOptions{Token: token.SecretID}, + } + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServiceDump", &req, &out)) + + nodes := out.Dump + require.Len(t, nodes, 1) + require.Equal(t, nodes[0].Node.Node, "bar") + require.Equal(t, nodes[0].Service.Service, "db") + require.Equal(t, nodes[0].Checks[0].Status, api.HealthWarning) +} diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 19c8e7af7..2e078267a 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -882,7 +882,7 @@ func (s *Store) serviceListTxn(tx *memdb.Txn, ws memdb.WatchSet, entMeta *struct results := make(structs.ServiceList, 0, len(unique)) for sid, _ := range unique { - results = append(results, structs.ServiceInfo{Name: sid.ID, EnterpriseMeta: sid.EnterpriseMeta}) + results = append(results, structs.ServiceName{Name: sid.ID, EnterpriseMeta: sid.EnterpriseMeta}) } return idx, results, nil diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 9a108541f..6f678d263 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -2,11 +2,6 @@ package state import ( "fmt" - "reflect" - "sort" - "strings" - "testing" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" @@ -16,6 +11,10 @@ import ( "github.com/pascaldekloe/goe/verify" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "reflect" + "sort" + "strings" + "testing" ) func makeRandomNodeID(t *testing.T) types.NodeID { diff --git a/agent/http_register.go b/agent/http_register.go index d18dd4fff..fc2f84ee4 100644 --- a/agent/http_register.go +++ b/agent/http_register.go @@ -94,6 +94,7 @@ func init() { registerEndpoint("/v1/internal/ui/nodes", []string{"GET"}, (*HTTPServer).UINodes) registerEndpoint("/v1/internal/ui/node/", []string{"GET"}, (*HTTPServer).UINodeInfo) registerEndpoint("/v1/internal/ui/services", []string{"GET"}, (*HTTPServer).UIServices) + registerEndpoint("/v1/internal/ui/gateway-services-nodes/", []string{"GET"}, (*HTTPServer).UIGatewayServicesNodes) registerEndpoint("/v1/internal/acl/authorize", []string{"POST"}, (*HTTPServer).ACLAuthorize) registerEndpoint("/v1/kv/", []string{"GET", "PUT", "DELETE"}, (*HTTPServer).KVSEndpoint) registerEndpoint("/v1/operator/raft/configuration", []string{"GET"}, (*HTTPServer).OperatorRaftConfiguration) diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 13a250ec1..23a392a08 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1554,6 +1554,20 @@ func (nodes CheckServiceNodes) Shuffle() { } } +func (nodes CheckServiceNodes) ToServiceDump() ServiceDump { + var ret ServiceDump + for i := range nodes { + svc := ServiceInfo{ + Node: nodes[i].Node, + Service: nodes[i].Service, + Checks: nodes[i].Checks, + GatewayService: nil, + } + ret = append(ret, &svc) + } + return ret +} + // ShallowClone duplicates the slice and underlying array. func (nodes CheckServiceNodes) ShallowClone() CheckServiceNodes { dup := make(CheckServiceNodes, len(nodes)) @@ -1617,6 +1631,15 @@ type NodeInfo struct { // as it is rather expensive to generate. type NodeDump []*NodeInfo +type ServiceInfo struct { + Node *Node + Service *NodeService + Checks HealthChecks + GatewayService *GatewayService +} + +type ServiceDump []*ServiceInfo + type CheckID struct { ID types.CheckID EnterpriseMeta @@ -1702,16 +1725,16 @@ type IndexedServices struct { QueryMeta } -type ServiceInfo struct { +type ServiceName struct { Name string EnterpriseMeta } -func (si *ServiceInfo) ToServiceID() ServiceID { +func (si *ServiceName) ToServiceID() ServiceID { return ServiceID{ID: si.Name, EnterpriseMeta: si.EnterpriseMeta} } -type ServiceList []ServiceInfo +type ServiceList []ServiceName type IndexedServiceList struct { Services ServiceList @@ -1755,6 +1778,11 @@ type IndexedNodeDump struct { QueryMeta } +type IndexedServiceDump struct { + Dump ServiceDump + QueryMeta +} + type IndexedGatewayServices struct { Services GatewayServices QueryMeta diff --git a/agent/ui_endpoint.go b/agent/ui_endpoint.go index 8e38458e5..4fef9e44a 100644 --- a/agent/ui_endpoint.go +++ b/agent/ui_endpoint.go @@ -15,6 +15,10 @@ import ( // to extract this. const metaExternalSource = "external-source" +type GatewayConfig struct { + ListenerPort int +} + // ServiceSummary is used to summarize a service type ServiceSummary struct { Kind structs.ServiceKind `json:",omitempty"` @@ -29,6 +33,7 @@ type ServiceSummary struct { ChecksCritical int ExternalSources []string externalSourceSet map[string]struct{} // internal to track uniqueness + GatewayConfig GatewayConfig `json:",omitempty"` structs.EnterpriseMeta } @@ -155,10 +160,45 @@ RPC: } // Generate the summary - return summarizeServices(out.Nodes), nil + // TODO (gateways) (freddy) Have Internal.ServiceDump return ServiceDump instead. Need to add bexpr filtering for type. + return summarizeServices(out.Nodes.ToServiceDump()), nil } -func summarizeServices(dump structs.CheckServiceNodes) []*ServiceSummary { +// UIGatewayServices is used to query all the nodes for services associated with a gateway along with their gateway config +func (s *HTTPServer) UIGatewayServicesNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Parse arguments + args := structs.ServiceSpecificRequest{} + if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil { + return nil, err + } + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + + // Pull out the service name + args.ServiceName = strings.TrimPrefix(req.URL.Path, "/v1/internal/ui/gateway-services-nodes/") + if args.ServiceName == "" { + resp.WriteHeader(http.StatusBadRequest) + fmt.Fprint(resp, "Missing gateway name") + return nil, nil + } + + // Make the RPC request + var out structs.IndexedServiceDump + defer setMeta(resp, &out.QueryMeta) +RPC: + if err := s.agent.RPC("Internal.GatewayServiceDump", &args, &out); err != nil { + // Retry the request allowing stale data if no leader + if strings.Contains(err.Error(), structs.ErrNoLeader.Error()) && !args.AllowStale { + args.AllowStale = true + goto RPC + } + return nil, err + } + return summarizeServices(out.Dump), nil +} + +func summarizeServices(dump structs.ServiceDump) []*ServiceSummary { // Collect the summary information var services []structs.ServiceID summary := make(map[structs.ServiceID]*ServiceSummary) @@ -179,8 +219,19 @@ func summarizeServices(dump structs.CheckServiceNodes) []*ServiceSummary { } for _, csn := range dump { + if csn.GatewayService != nil { + sum := getService(csn.GatewayService.Service) + sum.GatewayConfig.ListenerPort = csn.GatewayService.Port + } + + // Will happen in cases where we only have the GatewayServices mapping + if csn.Service == nil { + continue + } + sid := structs.NewServiceID(csn.Service.Service, &csn.Service.EnterpriseMeta) + sum := getService(sid) + svc := csn.Service - sum := getService(structs.NewServiceID(svc.Service, &svc.EnterpriseMeta)) sum.Nodes = append(sum.Nodes, csn.Node.Node) sum.Kind = svc.Kind sum.InstanceCount += 1 @@ -240,9 +291,10 @@ func summarizeServices(dump structs.CheckServiceNodes) []*ServiceSummary { }) output := make([]*ServiceSummary, len(summary)) for idx, service := range services { - // Sort the nodes + // Sort the nodes and tags sum := summary[service] sort.Strings(sum.Nodes) + sort.Strings(sum.Tags) output[idx] = sum } return output diff --git a/agent/ui_endpoint_test.go b/agent/ui_endpoint_test.go index b9d95cc55..960bc8d65 100644 --- a/agent/ui_endpoint_test.go +++ b/agent/ui_endpoint_test.go @@ -17,6 +17,7 @@ import ( "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/testrpc" cleanhttp "github.com/hashicorp/go-cleanhttp" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -415,3 +416,240 @@ func TestUiServices(t *testing.T) { require.ElementsMatch(t, expected, summary) }) } + +func TestUIGatewayServiceNodes_Terminating(t *testing.T) { + t.Parallel() + + a := NewTestAgent(t, "") + defer a.Shutdown() + + // Register terminating gateway and a service that will be associated with it + { + arg := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "terminating-gateway", + Service: "terminating-gateway", + Kind: structs.ServiceKindTerminatingGateway, + Port: 443, + }, + Check: &structs.HealthCheck{ + Name: "terminating connect", + Status: api.HealthPassing, + ServiceID: "terminating-gateway", + }, + } + var regOutput struct{} + require.NoError(t, a.RPC("Catalog.Register", &arg, ®Output)) + + arg = structs.RegisterRequest{ + Datacenter: "dc1", + Node: "bar", + Address: "127.0.0.2", + Service: &structs.NodeService{ + ID: "db", + Service: "db", + Tags: []string{"primary"}, + }, + Check: &structs.HealthCheck{ + Name: "db-warning", + Status: api.HealthWarning, + ServiceID: "db", + }, + } + require.NoError(t, a.RPC("Catalog.Register", &arg, ®Output)) + + arg = structs.RegisterRequest{ + Datacenter: "dc1", + Node: "baz", + Address: "127.0.0.3", + Service: &structs.NodeService{ + ID: "db2", + Service: "db", + Tags: []string{"backup"}, + }, + Check: &structs.HealthCheck{ + Name: "db2-passing", + Status: api.HealthPassing, + ServiceID: "db2", + }, + } + require.NoError(t, a.RPC("Catalog.Register", &arg, ®Output)) + + // Register terminating-gateway config entry, linking it to db and redis (does not exist) + args := &structs.TerminatingGatewayConfigEntry{ + Name: "terminating-gateway", + Kind: structs.TerminatingGateway, + Services: []structs.LinkedService{ + { + Name: "db", + }, + { + Name: "redis", + CAFile: "/etc/certs/ca.pem", + CertFile: "/etc/certs/cert.pem", + KeyFile: "/etc/certs/key.pem", + }, + }, + } + + req := structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: args, + } + var configOutput bool + require.NoError(t, a.RPC("ConfigEntry.Apply", &req, &configOutput)) + require.True(t, configOutput) + } + + // Request + req, _ := http.NewRequest("GET", "/v1/internal/ui/gateway-services-nodes/terminating-gateway", nil) + resp := httptest.NewRecorder() + obj, err := a.srv.UIGatewayServicesNodes(resp, req) + assert.Nil(t, err) + assertIndex(t, resp) + + dump := obj.([]*ServiceSummary) + expect := []*ServiceSummary{ + { + Name: "redis", + }, + { + Name: "db", + Tags: []string{"backup", "primary"}, + Nodes: []string{"bar", "baz"}, + InstanceCount: 2, + ChecksPassing: 1, + ChecksWarning: 1, + ChecksCritical: 0, + }, + } + assert.ElementsMatch(t, expect, dump) +} + +func TestUIGatewayServiceNodes_Ingress(t *testing.T) { + t.Parallel() + + a := NewTestAgent(t, "") + defer a.Shutdown() + + // Register ingress gateway and a service that will be associated with it + { + arg := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "ingress-gateway", + Service: "ingress-gateway", + Kind: structs.ServiceKindIngressGateway, + Port: 8443, + }, + Check: &structs.HealthCheck{ + Name: "ingress connect", + Status: api.HealthPassing, + ServiceID: "ingress-gateway", + }, + } + var regOutput struct{} + require.NoError(t, a.RPC("Catalog.Register", &arg, ®Output)) + + arg = structs.RegisterRequest{ + Datacenter: "dc1", + Node: "bar", + Address: "127.0.0.2", + Service: &structs.NodeService{ + ID: "db", + Service: "db", + Tags: []string{"primary"}, + }, + Check: &structs.HealthCheck{ + Name: "db-warning", + Status: api.HealthWarning, + ServiceID: "db", + }, + } + require.NoError(t, a.RPC("Catalog.Register", &arg, ®Output)) + + arg = structs.RegisterRequest{ + Datacenter: "dc1", + Node: "baz", + Address: "127.0.0.3", + Service: &structs.NodeService{ + ID: "db2", + Service: "db", + Tags: []string{"backup"}, + }, + Check: &structs.HealthCheck{ + Name: "db2-passing", + Status: api.HealthPassing, + ServiceID: "db2", + }, + } + require.NoError(t, a.RPC("Catalog.Register", &arg, ®Output)) + + // Register ingress-gateway config entry, linking it to db and redis (does not exist) + args := &structs.IngressGatewayConfigEntry{ + Name: "ingress-gateway", + Kind: structs.IngressGateway, + Listeners: []structs.IngressListener{ + { + Port: 8888, + Protocol: "tcp", + Services: []structs.IngressService{ + { + Name: "db", + }, + }, + }, + { + Port: 8080, + Protocol: "tcp", + Services: []structs.IngressService{ + { + Name: "web", + }, + }, + }, + }, + } + + req := structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: args, + } + var configOutput bool + require.NoError(t, a.RPC("ConfigEntry.Apply", &req, &configOutput)) + require.True(t, configOutput) + } + + // Request + req, _ := http.NewRequest("GET", "/v1/internal/ui/gateway-services-nodes/ingress-gateway", nil) + resp := httptest.NewRecorder() + obj, err := a.srv.UIGatewayServicesNodes(resp, req) + assert.Nil(t, err) + assertIndex(t, resp) + + dump := obj.([]*ServiceSummary) + expect := []*ServiceSummary{ + { + Name: "web", + GatewayConfig: GatewayConfig{ListenerPort: 8080}, + }, + { + Name: "db", + Tags: []string{"backup", "primary"}, + Nodes: []string{"bar", "baz"}, + InstanceCount: 2, + ChecksPassing: 1, + ChecksWarning: 1, + ChecksCritical: 0, + GatewayConfig: GatewayConfig{ListenerPort: 8888}, + }, + } + assert.ElementsMatch(t, expect, dump) +}