From 9f607d4970563861a6b85ae193017772f35c4a30 Mon Sep 17 00:00:00 2001 From: Semir Patel Date: Mon, 27 Mar 2023 10:35:39 -0500 Subject: [PATCH] Read(...) endpoint for the resource service (#16655) --- .../services/resource/mock_Backend.go | 176 ++++++++++++++++++ agent/grpc-external/services/resource/read.go | 54 ++++++ .../services/resource/read_test.go | 171 +++++++++++++++++ .../grpc-external/services/resource/server.go | 14 +- .../services/resource/server_test.go | 8 - 5 files changed, 410 insertions(+), 13 deletions(-) create mode 100644 agent/grpc-external/services/resource/mock_Backend.go create mode 100644 agent/grpc-external/services/resource/read.go create mode 100644 agent/grpc-external/services/resource/read_test.go diff --git a/agent/grpc-external/services/resource/mock_Backend.go b/agent/grpc-external/services/resource/mock_Backend.go new file mode 100644 index 000000000..eb133455d --- /dev/null +++ b/agent/grpc-external/services/resource/mock_Backend.go @@ -0,0 +1,176 @@ +// Code generated by mockery v2.20.0. DO NOT EDIT. + +package resource + +import ( + context "context" + + pbresource "github.com/hashicorp/consul/proto-public/pbresource" + mock "github.com/stretchr/testify/mock" + + storage "github.com/hashicorp/consul/internal/storage" +) + +// MockBackend is an autogenerated mock type for the Backend type +type MockBackend struct { + mock.Mock +} + +// DeleteCAS provides a mock function with given fields: ctx, id, version +func (_m *MockBackend) DeleteCAS(ctx context.Context, id *pbresource.ID, version string) error { + ret := _m.Called(ctx, id, version) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *pbresource.ID, string) error); ok { + r0 = rf(ctx, id, version) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// List provides a mock function with given fields: ctx, consistency, resType, tenancy, namePrefix +func (_m *MockBackend) List(ctx context.Context, consistency storage.ReadConsistency, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) { + ret := _m.Called(ctx, consistency, resType, tenancy, namePrefix) + + var r0 []*pbresource.Resource + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, storage.ReadConsistency, storage.UnversionedType, *pbresource.Tenancy, string) ([]*pbresource.Resource, error)); ok { + return rf(ctx, consistency, resType, tenancy, namePrefix) + } + if rf, ok := ret.Get(0).(func(context.Context, storage.ReadConsistency, storage.UnversionedType, *pbresource.Tenancy, string) []*pbresource.Resource); ok { + r0 = rf(ctx, consistency, resType, tenancy, namePrefix) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*pbresource.Resource) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, storage.ReadConsistency, storage.UnversionedType, *pbresource.Tenancy, string) error); ok { + r1 = rf(ctx, consistency, resType, tenancy, namePrefix) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// OwnerReferences provides a mock function with given fields: ctx, id +func (_m *MockBackend) OwnerReferences(ctx context.Context, id *pbresource.ID) ([]*pbresource.ID, error) { + ret := _m.Called(ctx, id) + + var r0 []*pbresource.ID + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *pbresource.ID) ([]*pbresource.ID, error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, *pbresource.ID) []*pbresource.ID); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*pbresource.ID) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *pbresource.ID) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Read provides a mock function with given fields: ctx, consistency, id +func (_m *MockBackend) Read(ctx context.Context, consistency storage.ReadConsistency, id *pbresource.ID) (*pbresource.Resource, error) { + ret := _m.Called(ctx, consistency, id) + + var r0 *pbresource.Resource + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, storage.ReadConsistency, *pbresource.ID) (*pbresource.Resource, error)); ok { + return rf(ctx, consistency, id) + } + if rf, ok := ret.Get(0).(func(context.Context, storage.ReadConsistency, *pbresource.ID) *pbresource.Resource); ok { + r0 = rf(ctx, consistency, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*pbresource.Resource) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, storage.ReadConsistency, *pbresource.ID) error); ok { + r1 = rf(ctx, consistency, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// WatchList provides a mock function with given fields: ctx, resType, tenancy, namePrefix +func (_m *MockBackend) WatchList(ctx context.Context, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) (storage.Watch, error) { + ret := _m.Called(ctx, resType, tenancy, namePrefix) + + var r0 storage.Watch + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, storage.UnversionedType, *pbresource.Tenancy, string) (storage.Watch, error)); ok { + return rf(ctx, resType, tenancy, namePrefix) + } + if rf, ok := ret.Get(0).(func(context.Context, storage.UnversionedType, *pbresource.Tenancy, string) storage.Watch); ok { + r0 = rf(ctx, resType, tenancy, namePrefix) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(storage.Watch) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, storage.UnversionedType, *pbresource.Tenancy, string) error); ok { + r1 = rf(ctx, resType, tenancy, namePrefix) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// WriteCAS provides a mock function with given fields: ctx, res +func (_m *MockBackend) WriteCAS(ctx context.Context, res *pbresource.Resource) (*pbresource.Resource, error) { + ret := _m.Called(ctx, res) + + var r0 *pbresource.Resource + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *pbresource.Resource) (*pbresource.Resource, error)); ok { + return rf(ctx, res) + } + if rf, ok := ret.Get(0).(func(context.Context, *pbresource.Resource) *pbresource.Resource); ok { + r0 = rf(ctx, res) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*pbresource.Resource) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *pbresource.Resource) error); ok { + r1 = rf(ctx, res) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewMockBackend interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockBackend creates a new instance of MockBackend. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockBackend(t mockConstructorTestingTNewMockBackend) *MockBackend { + mock := &MockBackend{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/agent/grpc-external/services/resource/read.go b/agent/grpc-external/services/resource/read.go new file mode 100644 index 000000000..ad84ae2b2 --- /dev/null +++ b/agent/grpc-external/services/resource/read.go @@ -0,0 +1,54 @@ +package resource + +import ( + "context" + "errors" + "fmt" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +func (s *Server) Read(ctx context.Context, req *pbresource.ReadRequest) (*pbresource.ReadResponse, error) { + // check type exists + _, ok := s.registry.Resolve(req.Id.Type) + if !ok { + return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("resource type %s not registered", resource.ToGVK(req.Id.Type))) + } + + consistency := storage.EventualConsistency + if isConsistentRead(ctx) { + consistency = storage.StrongConsistency + } + + resource, err := s.backend.Read(ctx, consistency, req.Id) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + return nil, status.Error(codes.NotFound, err.Error()) + } + if errors.As(err, &storage.GroupVersionMismatchError{}) { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + return nil, err + } + return &pbresource.ReadResponse{Resource: resource}, nil +} + +func isConsistentRead(ctx context.Context) bool { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return false + } + + vals := md.Get("x-consul-consistency-mode") + if len(vals) == 0 { + return false + } + + return vals[0] == "consistent" +} diff --git a/agent/grpc-external/services/resource/read_test.go b/agent/grpc-external/services/resource/read_test.go new file mode 100644 index 000000000..a61686e2e --- /dev/null +++ b/agent/grpc-external/services/resource/read_test.go @@ -0,0 +1,171 @@ +package resource + +import ( + "context" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/internal/storage/inmem" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" +) + +func TestRead_TypeNotFound(t *testing.T) { + server := NewServer(Config{registry: resource.NewRegistry()}) + client := testClient(t, server) + + _, err := client.Read(context.Background(), &pbresource.ReadRequest{Id: id1}) + require.Error(t, err) + require.Equal(t, codes.InvalidArgument.String(), status.Code(err).String()) + require.Contains(t, err.Error(), "resource type mesh/v1/service not registered") +} + +func TestRead_ResourceNotFound(t *testing.T) { + for desc, tc := range readTestCases() { + t.Run(desc, func(t *testing.T) { + server := testServer(t) + server.registry.Register(resource.Registration{Type: typev1}) + client := testClient(t, server) + + _, err := client.Read(tc.ctx, &pbresource.ReadRequest{Id: id1}) + require.Error(t, err) + require.Equal(t, codes.NotFound.String(), status.Code(err).String()) + require.Contains(t, err.Error(), "resource not found") + }) + } +} + +func TestRead_GroupVersionMismatch(t *testing.T) { + for desc, tc := range readTestCases() { + t.Run(desc, func(t *testing.T) { + server := testServer(t) + server.registry.Register(resource.Registration{Type: typev1}) + server.registry.Register(resource.Registration{Type: typev2}) + client := testClient(t, server) + + resource1 := &pbresource.Resource{Id: id1, Version: ""} + _, err := server.backend.WriteCAS(tc.ctx, resource1) + require.NoError(t, err) + + _, err = client.Read(tc.ctx, &pbresource.ReadRequest{Id: id2}) + require.Error(t, err) + require.Equal(t, codes.InvalidArgument.String(), status.Code(err).String()) + require.Contains(t, err.Error(), "resource was requested with GroupVersion") + }) + } +} + +func TestRead_Success(t *testing.T) { + for desc, tc := range readTestCases() { + t.Run(desc, func(t *testing.T) { + server := testServer(t) + server.registry.Register(resource.Registration{Type: typev1}) + client := testClient(t, server) + resource1 := &pbresource.Resource{Id: id1, Version: ""} + resource1, err := server.backend.WriteCAS(tc.ctx, resource1) + require.NoError(t, err) + + rsp, err := client.Read(tc.ctx, &pbresource.ReadRequest{Id: id1}) + require.NoError(t, err) + prototest.AssertDeepEqual(t, resource1, rsp.Resource) + }) + } +} + +func TestRead_VerifyReadConsistencyArg(t *testing.T) { + // Uses a mockBackend instead of the inmem Backend to verify the ReadConsistency argument is set correctly. + for desc, tc := range readTestCases() { + t.Run(desc, func(t *testing.T) { + mockBackend := NewMockBackend(t) + server := NewServer(Config{ + registry: resource.NewRegistry(), + backend: mockBackend, + }) + server.registry.Register(resource.Registration{Type: typev1}) + resource1 := &pbresource.Resource{Id: id1, Version: "1"} + mockBackend.On("Read", mock.Anything, mock.Anything, mock.Anything).Return(resource1, nil) + client := testClient(t, server) + + rsp, err := client.Read(tc.ctx, &pbresource.ReadRequest{Id: id1}) + require.NoError(t, err) + prototest.AssertDeepEqual(t, resource1, rsp.Resource) + mockBackend.AssertCalled(t, "Read", mock.Anything, tc.consistency, mock.Anything) + }) + } +} + +type readTestCase struct { + consistency storage.ReadConsistency + ctx context.Context +} + +func readTestCases() map[string]readTestCase { + return map[string]readTestCase{ + "eventually consistent read": { + consistency: storage.EventualConsistency, + ctx: context.Background(), + }, + "strongly consistent read": { + consistency: storage.StrongConsistency, + ctx: metadata.NewOutgoingContext( + context.Background(), + metadata.New(map[string]string{"x-consul-consistency-mode": "consistent"}), + ), + }, + } + +} + +func testServer(t *testing.T) *Server { + backend, err := inmem.NewBackend() + require.NoError(t, err) + ctx := testContext(t) + go backend.Run(ctx) + return NewServer(Config{ + registry: resource.NewRegistry(), + backend: backend, + }) +} + +func testContext(t *testing.T) context.Context { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + return ctx +} + +var ( + typev1 = &pbresource.Type{ + Group: "mesh", + GroupVersion: "v1", + Kind: "service", + } + typev2 = &pbresource.Type{ + Group: "mesh", + GroupVersion: "v2", + Kind: "service", + } + tenancy = &pbresource.Tenancy{ + Partition: "default", + Namespace: "default", + PeerName: "local", + } + id1 = &pbresource.ID{ + Uid: "abcd", + Name: "billing", + Type: typev1, + Tenancy: tenancy, + } + id2 = &pbresource.ID{ + Uid: "abcd", + Name: "billing", + Type: typev2, + Tenancy: tenancy, + } +) diff --git a/agent/grpc-external/services/resource/server.go b/agent/grpc-external/services/resource/server.go index 0fac97e46..6c12171bb 100644 --- a/agent/grpc-external/services/resource/server.go +++ b/agent/grpc-external/services/resource/server.go @@ -5,6 +5,8 @@ import ( "google.golang.org/grpc" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/storage" "github.com/hashicorp/consul/proto-public/pbresource" ) @@ -13,6 +15,13 @@ type Server struct { } type Config struct { + registry resource.Registry + backend storage.Backend +} + +//go:generate mockery --name Backend --inpackage +type Backend interface { + storage.Backend } func NewServer(cfg Config) *Server { @@ -25,11 +34,6 @@ func (s *Server) Register(grpcServer *grpc.Server) { pbresource.RegisterResourceServiceServer(grpcServer, s) } -func (s *Server) Read(ctx context.Context, req *pbresource.ReadRequest) (*pbresource.ReadResponse, error) { - // TODO - return &pbresource.ReadResponse{}, nil -} - func (s *Server) Write(ctx context.Context, req *pbresource.WriteRequest) (*pbresource.WriteResponse, error) { // TODO return &pbresource.WriteResponse{}, nil diff --git a/agent/grpc-external/services/resource/server_test.go b/agent/grpc-external/services/resource/server_test.go index a07b181e0..f0adb1362 100644 --- a/agent/grpc-external/services/resource/server_test.go +++ b/agent/grpc-external/services/resource/server_test.go @@ -26,14 +26,6 @@ func testClient(t *testing.T, server *Server) pbresource.ResourceServiceClient { return pbresource.NewResourceServiceClient(conn) } -func TestRead_TODO(t *testing.T) { - server := NewServer(Config{}) - client := testClient(t, server) - resp, err := client.Read(context.Background(), &pbresource.ReadRequest{}) - require.NoError(t, err) - require.NotNil(t, resp) -} - func TestWrite_TODO(t *testing.T) { server := NewServer(Config{}) client := testClient(t, server)