diff --git a/agent/cache-types/intention_upstreams_destination.go b/agent/cache-types/intention_upstreams_destination.go new file mode 100644 index 000000000..ae1012c35 --- /dev/null +++ b/agent/cache-types/intention_upstreams_destination.go @@ -0,0 +1,52 @@ +package cachetype + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" +) + +// IntentionUpstreamsDestinationName Recommended name for registration. +const IntentionUpstreamsDestinationName = "intention-upstreams-destination" + +// IntentionUpstreamsDestination supports fetching upstreams for a given gateway name. +type IntentionUpstreamsDestination struct { + RegisterOptionsBlockingRefresh + RPC RPC +} + +func (i *IntentionUpstreamsDestination) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { + var result cache.FetchResult + + // The request should be a ServiceSpecificRequest. + reqReal, ok := req.(*structs.ServiceSpecificRequest) + if !ok { + return result, fmt.Errorf( + "Internal cache failure: request wrong type: %T", req) + } + + // Lightweight copy this object so that manipulating QueryOptions doesn't race. + dup := *reqReal + reqReal = &dup + + // Set the minimum query index to our current index so we block + reqReal.QueryOptions.MinQueryIndex = opts.MinIndex + reqReal.QueryOptions.MaxQueryTime = opts.Timeout + + // Always allow stale - there's no point in hitting leader if the request is + // going to be served from cache and end up arbitrarily stale anyway. This + // allows cached service-discover to automatically read scale across all + // servers too. + reqReal.AllowStale = true + + // Fetch + var reply structs.IndexedServiceList + if err := i.RPC.RPC("Internal.IntentionUpstreamsDestination", reqReal, &reply); err != nil { + return result, err + } + + result.Value = &reply + result.Index = reply.QueryMeta.Index + return result, nil +} diff --git a/agent/cache-types/intention_upstreams_destination_test.go b/agent/cache-types/intention_upstreams_destination_test.go new file mode 100644 index 000000000..7aa2d02ef --- /dev/null +++ b/agent/cache-types/intention_upstreams_destination_test.go @@ -0,0 +1,52 @@ +package cachetype + +import ( + "testing" + "time" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestIntentionUpstreamsDestination(t *testing.T) { + rpc := TestRPC(t) + typ := &IntentionUpstreamsDestination{RPC: rpc} + + // Expect the proper RPC call. This also sets the expected value + // since that is return-by-pointer in the arguments. + var resp *structs.IndexedServiceList + rpc.On("RPC", "Internal.IntentionUpstreamsDestination", mock.Anything, mock.Anything).Return(nil). + Run(func(args mock.Arguments) { + req := args.Get(1).(*structs.ServiceSpecificRequest) + require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) + require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) + require.True(t, req.AllowStale) + require.Equal(t, "foo", req.ServiceName) + + services := structs.ServiceList{ + {Name: "foo"}, + } + reply := args.Get(2).(*structs.IndexedServiceList) + reply.Services = services + reply.QueryMeta.Index = 48 + resp = reply + }) + + // Fetch + resultA, err := typ.Fetch(cache.FetchOptions{ + MinIndex: 24, + Timeout: 1 * time.Second, + }, &structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "foo", + }) + require.NoError(t, err) + require.Equal(t, cache.FetchResult{ + Value: resp, + Index: 48, + }, resultA) + + rpc.AssertExpectations(t) +} diff --git a/agent/consul/helper_test.go b/agent/consul/helper_test.go index 807bb8be2..75048b913 100644 --- a/agent/consul/helper_test.go +++ b/agent/consul/helper_test.go @@ -1307,7 +1307,7 @@ func registerIntentionUpstreamEntries(t *testing.T, codec rpc.ClientCodec, token } registerTestCatalogEntriesMap(t, codec, registrations) - // Add intentions: deny all and web -> api + // Add intentions: deny all and web -> api and web -> api.example.com entries := []structs.ConfigEntryRequest{ { Datacenter: "dc1", @@ -1323,6 +1323,20 @@ func registerIntentionUpstreamEntries(t *testing.T, codec rpc.ClientCodec, token }, WriteRequest: structs.WriteRequest{Token: token}, }, + { + Datacenter: "dc1", + Entry: &structs.ServiceIntentionsConfigEntry{ + Kind: structs.ServiceIntentions, + Name: "api.example.com", + Sources: []*structs.SourceIntention{ + { + Name: "web", + Action: structs.IntentionActionAllow, + }, + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, { Datacenter: "dc1", Entry: &structs.ServiceIntentionsConfigEntry{ @@ -1342,4 +1356,36 @@ func registerIntentionUpstreamEntries(t *testing.T, codec rpc.ClientCodec, token var out bool require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &out)) } + + // Add destinations + dests := []structs.ConfigEntryRequest{ + { + Datacenter: "dc1", + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "api.example.com", + Destination: &structs.DestinationConfig{ + Address: "api.example.com", + Port: 443, + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + { + Datacenter: "dc1", + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "kafka.store.com", + Destination: &structs.DestinationConfig{ + Address: "172.168.2.1", + Port: 9003, + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + } + for _, req := range dests { + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &out)) + } } diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 113abd2dd..f1fda470f 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -224,6 +224,27 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl if done, err := m.srv.ForwardRPC("Internal.IntentionUpstreams", args, reply); done { return err } + return m.internalUpstreams(args, reply, structs.IntentionTargetService) +} + +// IntentionUpstreamsDestination returns the upstreams of a service. Upstreams are inferred from intentions. +// If intentions allow a connection from the target to some candidate destination, the candidate destination is considered +// an upstream of the target.this is performs the same logic as IntentionUpstreams endpoint but for destination upstreams only. +func (m *Internal) IntentionUpstreamsDestination(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceList) error { + // Exit early if Connect hasn't been enabled. + if !m.srv.config.ConnectEnabled { + return ErrConnectNotEnabled + } + if args.ServiceName == "" { + return fmt.Errorf("Must provide a service name") + } + if done, err := m.srv.ForwardRPC("Internal.IntentionUpstreamsDestination", args, reply); done { + return err + } + return m.internalUpstreams(args, reply, structs.IntentionTargetDestination) +} + +func (m *Internal) internalUpstreams(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceList, intentionTarget structs.IntentionTargetType) error { authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil) if err != nil { @@ -244,7 +265,7 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl defaultDecision := authz.IntentionDefaultAllow(nil) sn := structs.NewServiceName(args.ServiceName, &args.EnterpriseMeta) - index, services, err := state.IntentionTopology(ws, sn, false, defaultDecision) + index, services, err := state.IntentionTopology(ws, sn, false, defaultDecision, intentionTarget) if err != nil { return err } @@ -272,7 +293,7 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl }) } -// GatewayServiceNodes returns all the nodes for services associated with a gateway along with their gateway config +// GatewayServiceDump returns all the nodes for services associated with a gateway along with their gateway config func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceDump) error { if done, err := m.srv.ForwardRPC("Internal.GatewayServiceDump", args, reply); done { return err @@ -350,7 +371,7 @@ func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, repl return err } -// Match returns the set of intentions that match the given source/destination. +// GatewayIntentions Match returns the set of intentions that match the given source/destination. func (m *Internal) GatewayIntentions(args *structs.IntentionQueryRequest, reply *structs.IndexedIntentions) error { // Forward if necessary if done, err := m.srv.ForwardRPC("Internal.GatewayIntentions", args, reply); done { diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index 9354b4f66..3737f3a08 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -2323,6 +2323,50 @@ func TestInternal_IntentionUpstreams(t *testing.T) { }) } +func TestInternal_IntentionUpstreamsDestination(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + codec := rpcClient(t, s1) + defer codec.Close() + + // Services: + // api and api-proxy on node foo + // web and web-proxy on node foo + // + // Intentions + // * -> * (deny) intention + // web -> api (allow) + registerIntentionUpstreamEntries(t, codec, "") + + t.Run("api.example.com", func(t *testing.T) { + retry.Run(t, func(r *retry.R) { + args := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + } + var out structs.IndexedServiceList + require.NoError(r, msgpackrpc.CallWithCodec(codec, "Internal.IntentionUpstreamsDestination", &args, &out)) + + // foo/api + require.Len(r, out.Services, 1) + + expectUp := structs.ServiceList{ + structs.NewServiceName("api.example.com", structs.DefaultEnterpriseMetaInDefaultPartition()), + } + require.Equal(r, expectUp, out.Services) + }) + }) +} + func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index ee1623956..d76f87bf6 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -3975,7 +3975,7 @@ func (s *Store) ServiceTopology( // Only transparent proxies / connect native services have upstreams from intentions if hasTransparent || connectNative { - idx, intentionUpstreams, err := s.intentionTopologyTxn(tx, ws, sn, false, defaultAllow) + idx, intentionUpstreams, err := s.intentionTopologyTxn(tx, ws, sn, false, defaultAllow, structs.IntentionTargetService) if err != nil { return 0, nil, err } @@ -4092,7 +4092,7 @@ func (s *Store) ServiceTopology( downstreamSources[dn.String()] = structs.TopologySourceRegistration } - idx, intentionDownstreams, err := s.intentionTopologyTxn(tx, ws, sn, true, defaultAllow) + idx, intentionDownstreams, err := s.intentionTopologyTxn(tx, ws, sn, true, defaultAllow, structs.IntentionTargetService) if err != nil { return 0, nil, err } diff --git a/agent/consul/state/config_entry_intention.go b/agent/consul/state/config_entry_intention.go index bd539489a..ebbc9e7b7 100644 --- a/agent/consul/state/config_entry_intention.go +++ b/agent/consul/state/config_entry_intention.go @@ -299,7 +299,8 @@ func readSourceIntentionsFromConfigEntriesForServiceTxn( results = append(results, entry.ToIntention(src)) } case structs.IntentionTargetDestination: - if kind == structs.GatewayServiceKindDestination { + // wildcard is needed here to be able to consider destinations in the wildcard intentions + if kind == structs.GatewayServiceKindDestination || entry.HasWildcardDestination() { results = append(results, entry.ToIntention(src)) } default: diff --git a/agent/consul/state/intention.go b/agent/consul/state/intention.go index 89f7304f6..cff5ed353 100644 --- a/agent/consul/state/intention.go +++ b/agent/consul/state/intention.go @@ -957,11 +957,12 @@ func (s *Store) IntentionTopology( target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, + intentionTarget structs.IntentionTargetType, ) (uint64, structs.ServiceList, error) { tx := s.db.ReadTxn() defer tx.Abort() - idx, services, err := s.intentionTopologyTxn(tx, ws, target, downstreams, defaultDecision) + idx, services, err := s.intentionTopologyTxn(tx, ws, target, downstreams, defaultDecision, intentionTarget) if err != nil { requested := "upstreams" if downstreams { @@ -982,6 +983,7 @@ func (s *Store) intentionTopologyTxn( target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, + intentionTarget structs.IntentionTargetType, ) (uint64, []ServiceWithDecision, error) { var maxIdx uint64 @@ -998,7 +1000,7 @@ func (s *Store) intentionTopologyTxn( Partition: target.PartitionOrDefault(), Name: target.Name, } - index, intentions, err := compatIntentionMatchOneTxn(tx, ws, entry, intentionMatchType, structs.IntentionTargetService) + index, intentions, err := compatIntentionMatchOneTxn(tx, ws, entry, intentionMatchType, intentionTarget) if err != nil { return 0, nil, fmt.Errorf("failed to query intentions for %s", target.String()) } @@ -1010,7 +1012,13 @@ func (s *Store) intentionTopologyTxn( // Ideally those should be excluded as well, since they can't be upstreams/downstreams without a proxy. // Maybe narrow serviceNamesOfKindTxn to services represented by proxies? (ingress, sidecar- wildcardMeta := structs.WildcardEnterpriseMetaInPartition(structs.WildcardSpecifier) - index, services, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, *wildcardMeta) + var services []*KindServiceName + if intentionTarget == structs.IntentionTargetService { + index, services, err = serviceNamesOfKindTxn(tx, ws, structs.ServiceKindTypical, *wildcardMeta) + } else { + // destinations can only ever be upstream, since they are only allowed as intention destination. + index, services, err = serviceNamesOfKindTxn(tx, ws, structs.ServiceKindDestination, *wildcardMeta) + } if err != nil { return index, nil, fmt.Errorf("failed to list ingress service names: %v", err) } @@ -1028,16 +1036,6 @@ func (s *Store) intentionTopologyTxn( maxIdx = index } services = append(services, ingress...) - } else { - // destinations can only ever be upstream, since they are only allowed as intention destination. - index, destinations, err := serviceNamesOfKindTxn(tx, ws, structs.ServiceKindDestination, *wildcardMeta) - if err != nil { - return index, nil, fmt.Errorf("failed to list destination names: %v", err) - } - if index > maxIdx { - maxIdx = index - } - services = append(services, destinations...) } // When checking authorization to upstreams, the match type for the decision is `destination` because we are deciding diff --git a/agent/consul/state/intention_test.go b/agent/consul/state/intention_test.go index 4a369d166..18ba63914 100644 --- a/agent/consul/state/intention_test.go +++ b/agent/consul/state/intention_test.go @@ -2185,7 +2185,197 @@ func TestStore_IntentionTopology(t *testing.T) { idx++ } - idx, got, err := s.IntentionTopology(nil, tt.target, tt.downstreams, tt.defaultDecision) + idx, got, err := s.IntentionTopology(nil, tt.target, tt.downstreams, tt.defaultDecision, structs.IntentionTargetService) + require.NoError(t, err) + require.Equal(t, tt.expect.idx, idx) + + // ServiceList is from a map, so it is not deterministically sorted + sort.Slice(got, func(i, j int) bool { + return got[i].String() < got[j].String() + }) + require.Equal(t, tt.expect.services, got) + }) + } +} + +func TestStore_IntentionTopology_Destination(t *testing.T) { + node := structs.Node{ + Node: "foo", + Address: "127.0.0.1", + } + + services := []structs.NodeService{ + { + ID: structs.ConsulServiceID, + Service: structs.ConsulServiceName, + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + { + ID: "web-1", + Service: "web", + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + { + ID: "mysql-1", + Service: "mysql", + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + } + destinations := []structs.ServiceConfigEntry{ + { + Name: "api.test.com", + Destination: &structs.DestinationConfig{}, + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + { + Name: "kafka.store.org", + Destination: &structs.DestinationConfig{}, + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + } + + type expect struct { + idx uint64 + services structs.ServiceList + } + tests := []struct { + name string + defaultDecision acl.EnforcementDecision + intentions []structs.ServiceIntentionsConfigEntry + target structs.ServiceName + downstreams bool + expect expect + }{ + { + name: "(upstream) acl allow all but intentions deny one, destination target", + defaultDecision: acl.Allow, + intentions: []structs.ServiceIntentionsConfigEntry{ + { + Kind: structs.ServiceIntentions, + Name: "api.test.com", + Sources: []*structs.SourceIntention{ + { + Name: "web", + Action: structs.IntentionActionDeny, + }, + }, + }, + }, + target: structs.NewServiceName("web", nil), + downstreams: false, + expect: expect{ + idx: 7, + services: structs.ServiceList{ + { + Name: "kafka.store.org", + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + }, + }, + }, + { + name: "(upstream) acl deny all intentions allow one, destination target", + defaultDecision: acl.Deny, + intentions: []structs.ServiceIntentionsConfigEntry{ + { + Kind: structs.ServiceIntentions, + Name: "kafka.store.org", + Sources: []*structs.SourceIntention{ + { + Name: "web", + Action: structs.IntentionActionAllow, + }, + }, + }, + }, + target: structs.NewServiceName("web", nil), + downstreams: false, + expect: expect{ + idx: 7, + services: structs.ServiceList{ + { + Name: "kafka.store.org", + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + }, + }, + }, + { + name: "(upstream) acl deny all check only destinations show, service target", + defaultDecision: acl.Deny, + intentions: []structs.ServiceIntentionsConfigEntry{ + { + Kind: structs.ServiceIntentions, + Name: "api", + Sources: []*structs.SourceIntention{ + { + Name: "web", + Action: structs.IntentionActionAllow, + }, + }, + }, + }, + target: structs.NewServiceName("web", nil), + downstreams: false, + expect: expect{ + idx: 7, + services: structs.ServiceList{}, + }, + }, + { + name: "(upstream) acl allow all check only destinations show, service target", + defaultDecision: acl.Allow, + intentions: []structs.ServiceIntentionsConfigEntry{ + { + Kind: structs.ServiceIntentions, + Name: "api", + Sources: []*structs.SourceIntention{ + { + Name: "web", + Action: structs.IntentionActionAllow, + }, + }, + }, + }, + target: structs.NewServiceName("web", nil), + downstreams: false, + expect: expect{ + idx: 7, + services: structs.ServiceList{ + { + Name: "api.test.com", + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + { + Name: "kafka.store.org", + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := testConfigStateStore(t) + + var idx uint64 = 1 + require.NoError(t, s.EnsureNode(idx, &node)) + idx++ + + for _, svc := range services { + require.NoError(t, s.EnsureService(idx, "foo", &svc)) + idx++ + } + for _, d := range destinations { + require.NoError(t, s.EnsureConfigEntry(idx, &d)) + idx++ + } + for _, ixn := range tt.intentions { + require.NoError(t, s.EnsureConfigEntry(idx, &ixn)) + idx++ + } + + idx, got, err := s.IntentionTopology(nil, tt.target, tt.downstreams, tt.defaultDecision, structs.IntentionTargetDestination) require.NoError(t, err) require.Equal(t, tt.expect.idx, idx) @@ -2211,7 +2401,7 @@ func TestStore_IntentionTopology_Watches(t *testing.T) { target := structs.NewServiceName("web", structs.DefaultEnterpriseMetaInDefaultPartition()) ws := memdb.NewWatchSet() - index, got, err := s.IntentionTopology(ws, target, false, acl.Deny) + index, got, err := s.IntentionTopology(ws, target, false, acl.Deny, structs.IntentionTargetService) require.NoError(t, err) require.Equal(t, uint64(0), index) require.Empty(t, got) @@ -2233,7 +2423,7 @@ func TestStore_IntentionTopology_Watches(t *testing.T) { // Reset the WatchSet ws = memdb.NewWatchSet() - index, got, err = s.IntentionTopology(ws, target, false, acl.Deny) + index, got, err = s.IntentionTopology(ws, target, false, acl.Deny, structs.IntentionTargetService) require.NoError(t, err) require.Equal(t, uint64(2), index) require.Empty(t, got) @@ -2255,7 +2445,7 @@ func TestStore_IntentionTopology_Watches(t *testing.T) { // require.False(t, watchFired(ws)) // Result should not have changed - index, got, err = s.IntentionTopology(ws, target, false, acl.Deny) + index, got, err = s.IntentionTopology(ws, target, false, acl.Deny, structs.IntentionTargetService) require.NoError(t, err) require.Equal(t, uint64(3), index) require.Empty(t, got) @@ -2270,7 +2460,7 @@ func TestStore_IntentionTopology_Watches(t *testing.T) { require.True(t, watchFired(ws)) // Reset the WatchSet - index, got, err = s.IntentionTopology(nil, target, false, acl.Deny) + index, got, err = s.IntentionTopology(nil, target, false, acl.Deny, structs.IntentionTargetService) require.NoError(t, err) require.Equal(t, uint64(4), index)