Add RPC endpoint for intention upstreams

This commit is contained in:
freddygv 2021-03-13 21:44:24 -07:00
parent e4e14639b2
commit 3b2169b36d
4 changed files with 300 additions and 1 deletions

View File

@ -397,7 +397,7 @@ func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.In
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, services, err := state.ServiceList(ws, &args.EnterpriseMeta)
index, services, err := state.ServiceList(ws, nil, &args.EnterpriseMeta)
if err != nil {
return err
}

View File

@ -910,3 +910,138 @@ func registerTestTopologyEntries(t *testing.T, codec rpc.ClientCodec, token stri
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &out))
}
}
func registetIntentionUpstreamEntries(t *testing.T, codec rpc.ClientCodec, token string) {
t.Helper()
// api and api-proxy on node foo
// web and web-proxy on node foo
// redis and redis-proxy on node foo
// * -> * (deny) intention
// web -> api (allow)
registrations := map[string]*structs.RegisterRequest{
"Node foo": {
Datacenter: "dc1",
Node: "foo",
ID: types.NodeID("e0155642-135d-4739-9853-a1ee6c9f945b"),
Address: "127.0.0.2",
WriteRequest: structs.WriteRequest{Token: token},
},
"Service api on foo": {
Datacenter: "dc1",
Node: "foo",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "api",
Service: "api",
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service api-proxy": {
Datacenter: "dc1",
Node: "foo",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Port: 8443,
Address: "198.18.1.2",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service web on foo": {
Datacenter: "dc1",
Node: "foo",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "web",
Service: "web",
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service web-proxy on foo": {
Datacenter: "dc1",
Node: "foo",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Port: 8080,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service redis on foo": {
Datacenter: "dc1",
Node: "foo",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "redis",
Service: "redis",
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service redis-proxy on foo": {
Datacenter: "dc1",
Node: "foo",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "redis-proxy",
Service: "redis-proxy",
Port: 1234,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "redis",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
}
registerTestCatalogEntriesMap(t, codec, registrations)
// Add intentions: deny all and web -> api
entries := []structs.ConfigEntryRequest{
{
Datacenter: "dc1",
Entry: &structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: "api",
Sources: []*structs.SourceIntention{
{
Name: "web",
Action: structs.IntentionActionAllow,
},
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
{
Datacenter: "dc1",
Entry: &structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: "*",
Sources: []*structs.SourceIntention{
{
Name: "*",
Action: structs.IntentionActionDeny,
},
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
}
for _, req := range entries {
var out bool
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &out))
}
}

View File

@ -188,6 +188,49 @@ func (m *Internal) ServiceTopology(args *structs.ServiceSpecificRequest, reply *
})
}
// IntentionUpstreams returns the upstreams or downstreams of a service. Upstreams and downstreams are inferred from intentions.
// If intentions allow a connection from the target to some candidate service, the candidate service is considered
// an upstream of the target.
func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceList) error {
// Exit early if Connect hasn't been enabled.
if !m.srv.config.ConnectEnabled {
return ErrConnectNotEnabled
}
if args.ServiceName == "" {
return fmt.Errorf("Must provide a service name")
}
if done, err := m.srv.ForwardRPC("Internal.IntentionUpstreams", args, args, reply); done {
return err
}
authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
if err != nil {
return err
}
if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
return m.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
defaultDecision := acl.Allow
if authz != nil {
defaultDecision = authz.IntentionDefaultAllow(nil)
}
sn := structs.NewServiceName(args.ServiceName, &args.EnterpriseMeta)
index, services, err := state.IntentionTopology(ws, sn, false, defaultDecision)
if err != nil {
return err
}
reply.Index, reply.Services = index, services
return m.srv.filterACLWithAuthorizer(authz, reply)
})
}
// GatewayServiceNodes returns all the nodes for services associated with a gateway along with their gateway config
func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceDump) error {
if done, err := m.srv.ForwardRPC("Internal.GatewayServiceDump", args, args, reply); done {

View File

@ -1885,3 +1885,124 @@ service "web" { policy = "read" }
require.True(t, acl.IsErrPermissionDenied(err))
})
}
func TestInternal_IntentionUpstreams(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
codec := rpcClient(t, s1)
defer codec.Close()
// Services:
// api and api-proxy on node foo
// web and web-proxy on node foo
//
// Intentions
// * -> * (deny) intention
// web -> api (allow)
registetIntentionUpstreamEntries(t, codec, "")
t.Run("web", func(t *testing.T) {
retry.Run(t, func(r *retry.R) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
}
var out structs.IndexedServiceList
require.NoError(r, msgpackrpc.CallWithCodec(codec, "Internal.IntentionUpstreams", &args, &out))
// foo/api
require.Len(r, out.Services, 1)
expectUp := structs.ServiceList{
structs.NewServiceName("api", structs.DefaultEnterpriseMeta()),
}
require.Equal(r, expectUp, out.Services)
})
})
}
func TestInternal_IntentionUpstreams_ACL(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = TestDefaultMasterToken
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
codec := rpcClient(t, s1)
defer codec.Close()
// Services:
// api and api-proxy on node foo
// web and web-proxy on node foo
//
// Intentions
// * -> * (deny) intention
// web -> api (allow)
registetIntentionUpstreamEntries(t, codec, TestDefaultMasterToken)
t.Run("valid token", func(t *testing.T) {
// Token grants read to read api service
userToken, err := upsertTestTokenWithPolicyRules(codec, TestDefaultMasterToken, "dc1", `
service_prefix "api" { policy = "read" }
`)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
QueryOptions: structs.QueryOptions{Token: userToken.SecretID},
}
var out structs.IndexedServiceList
require.NoError(r, msgpackrpc.CallWithCodec(codec, "Internal.IntentionUpstreams", &args, &out))
// foo/api
require.Len(r, out.Services, 1)
expectUp := structs.ServiceList{
structs.NewServiceName("api", structs.DefaultEnterpriseMeta()),
}
require.Equal(r, expectUp, out.Services)
})
})
t.Run("invalid token filters results", func(t *testing.T) {
// Token grants read to read an unrelated service, mongo
userToken, err := upsertTestTokenWithPolicyRules(codec, TestDefaultMasterToken, "dc1", `
service_prefix "mongo" { policy = "read" }
`)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
QueryOptions: structs.QueryOptions{Token: userToken.SecretID},
}
var out structs.IndexedServiceList
require.NoError(r, msgpackrpc.CallWithCodec(codec, "Internal.IntentionUpstreams", &args, &out))
// Token can't read api service
require.Empty(r, out.Services)
})
})
}