resource: `WriteStatus` endpoint (#16886)
This commit is contained in:
parent
f08fc57997
commit
d46543631c
|
@ -58,11 +58,6 @@ func (s *Server) Register(grpcServer *grpc.Server) {
|
|||
pbresource.RegisterResourceServiceServer(grpcServer, s)
|
||||
}
|
||||
|
||||
func (s *Server) WriteStatus(ctx context.Context, req *pbresource.WriteStatusRequest) (*pbresource.WriteStatusResponse, error) {
|
||||
// TODO
|
||||
return &pbresource.WriteStatusResponse{}, nil
|
||||
}
|
||||
|
||||
// Get token from grpc metadata or AnonymounsTokenId if not found
|
||||
func tokenFromContext(ctx context.Context) string {
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
|
|
|
@ -13,6 +13,8 @@ import (
|
|||
"google.golang.org/grpc"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/acl/resolver"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/testutils"
|
||||
|
@ -22,17 +24,8 @@ import (
|
|||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
pbdemov2 "github.com/hashicorp/consul/proto/private/pbdemo/v2"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
||||
func TestWriteStatus_TODO(t *testing.T) {
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
resp, err := client.WriteStatus(context.Background(), &pbresource.WriteStatusRequest{})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
}
|
||||
|
||||
func randomACLIdentity(t *testing.T) structs.ACLIdentity {
|
||||
id, err := uuid.GenerateUUID()
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -10,13 +10,30 @@ import (
|
|||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
// errUseWriteStatus is returned when the user attempts to modify the resource
|
||||
// status using the Write endpoint.
|
||||
//
|
||||
// We only allow modifications to the status using the WriteStatus endpoint
|
||||
// because:
|
||||
//
|
||||
// - Setting statuses should only be done by controllers and requires different
|
||||
// permissions.
|
||||
//
|
||||
// - Status-only updates shouldn't increment the resource generation.
|
||||
//
|
||||
// While we could accomplish both in the Write handler, there's seldom need to
|
||||
// update the resource body and status at the same time, so it makes more sense
|
||||
// to keep them separate.
|
||||
var errUseWriteStatus = status.Error(codes.InvalidArgument, "resource.status can only be set using the WriteStatus endpoint")
|
||||
|
||||
func (s *Server) Write(ctx context.Context, req *pbresource.WriteRequest) (*pbresource.WriteResponse, error) {
|
||||
if err := validateWriteRequiredFields(req); err != nil {
|
||||
if err := validateWriteRequest(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -74,7 +91,9 @@ func (s *Server) Write(ctx context.Context, req *pbresource.WriteRequest) (*pbre
|
|||
case errors.Is(err, storage.ErrNotFound):
|
||||
input.Id.Uid = ulid.Make().String()
|
||||
|
||||
// TODO: Prevent setting statuses in this endpoint.
|
||||
if len(input.Status) != 0 {
|
||||
return errUseWriteStatus
|
||||
}
|
||||
|
||||
// Update path.
|
||||
case err == nil:
|
||||
|
@ -101,13 +120,15 @@ func (s *Server) Write(ctx context.Context, req *pbresource.WriteRequest) (*pbre
|
|||
// - Read returns stale version `v1`
|
||||
// - We carry `v1`'s statuses over (effectively overwriting `v2`'s statuses)
|
||||
// - CAS operation succeeds anyway because user-given version is current
|
||||
//
|
||||
// TODO(boxofrad): add a test for this once the status field has been added.
|
||||
if input.Version != existing.Version {
|
||||
return storage.ErrCASFailure
|
||||
}
|
||||
|
||||
// TODO: Carry over the statuses here.
|
||||
if input.Status == nil {
|
||||
input.Status = existing.Status
|
||||
} else if !resource.EqualStatus(input.Status, existing.Status) {
|
||||
return errUseWriteStatus
|
||||
}
|
||||
|
||||
default:
|
||||
return err
|
||||
|
@ -124,7 +145,10 @@ func (s *Server) Write(ctx context.Context, req *pbresource.WriteRequest) (*pbre
|
|||
case errors.Is(err, storage.ErrWrongUid):
|
||||
return nil, status.Error(codes.FailedPrecondition, err.Error())
|
||||
case err != nil:
|
||||
return nil, status.Errorf(codes.Internal, "failed to write resource: %v", err.Error())
|
||||
if _, ok := status.FromError(err); !ok {
|
||||
err = status.Errorf(codes.Internal, "failed to write resource: %v", err.Error())
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &pbresource.WriteResponse{Resource: result}, nil
|
||||
|
@ -165,7 +189,7 @@ func (s *Server) retryCAS(ctx context.Context, vsn string, cas func() error) err
|
|||
return err
|
||||
}
|
||||
|
||||
func validateWriteRequiredFields(req *pbresource.WriteRequest) error {
|
||||
func validateWriteRequest(req *pbresource.WriteRequest) error {
|
||||
var field string
|
||||
switch {
|
||||
case req.Resource == nil:
|
||||
|
|
|
@ -0,0 +1,132 @@
|
|||
package resource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/oklog/ulid/v2"
|
||||
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
func (s *Server) WriteStatus(ctx context.Context, req *pbresource.WriteStatusRequest) (*pbresource.WriteStatusResponse, error) {
|
||||
if err := validateWriteStatusRequest(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err := s.resolveType(req.Id.Type)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// At the storage backend layer, all writes are CAS operations.
|
||||
//
|
||||
// See comment in write.go for more information.
|
||||
//
|
||||
// Most controllers *won't* do an explicit CAS write of the status because it
|
||||
// doesn't provide much value, and conflicts are fairly likely in the flurry
|
||||
// of activity after a resource is updated.
|
||||
//
|
||||
// Here's why that's okay:
|
||||
//
|
||||
// - Controllers should only update their own status (identified by its key)
|
||||
// and updating separate statuses is commutative.
|
||||
//
|
||||
// - Controllers that make writes should be leader-elected singletons (i.e.
|
||||
// there should only be one instance of the controller running) so we don't
|
||||
// need to worry about multiple instances racing with each other.
|
||||
//
|
||||
// - Only controllers are supposed to write statuses, so you should never be
|
||||
// racing with a user's write of the same status.
|
||||
var result *pbresource.Resource
|
||||
err = s.retryCAS(ctx, req.Version, func() error {
|
||||
resource, err := s.Backend.Read(ctx, storage.EventualConsistency, req.Id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if req.Version != "" && req.Version != resource.Version {
|
||||
return storage.ErrCASFailure
|
||||
}
|
||||
|
||||
resource = clone(resource)
|
||||
if resource.Status == nil {
|
||||
resource.Status = make(map[string]*pbresource.Status)
|
||||
}
|
||||
resource.Status[req.Key] = req.Status
|
||||
|
||||
result, err = s.Backend.WriteCAS(ctx, resource)
|
||||
return err
|
||||
})
|
||||
|
||||
switch {
|
||||
case errors.Is(err, storage.ErrNotFound):
|
||||
return nil, status.Error(codes.NotFound, err.Error())
|
||||
case errors.Is(err, storage.ErrCASFailure):
|
||||
return nil, status.Error(codes.Aborted, err.Error())
|
||||
case err != nil:
|
||||
return nil, status.Errorf(codes.Internal, "failed to write resource: %v", err.Error())
|
||||
}
|
||||
|
||||
return &pbresource.WriteStatusResponse{Resource: result}, nil
|
||||
}
|
||||
|
||||
func validateWriteStatusRequest(req *pbresource.WriteStatusRequest) error {
|
||||
var field string
|
||||
switch {
|
||||
case req.Id == nil:
|
||||
field = "id"
|
||||
case req.Id.Type == nil:
|
||||
field = "id.type"
|
||||
case req.Id.Tenancy == nil:
|
||||
field = "id.tenancy"
|
||||
case req.Id.Name == "":
|
||||
field = "id.name"
|
||||
case req.Id.Uid == "":
|
||||
// We require Uid because only controllers should write statuses and
|
||||
// controllers should *always* refer to a specific incarnation of a
|
||||
// resource using its Uid.
|
||||
field = "id.uid"
|
||||
case req.Key == "":
|
||||
field = "key"
|
||||
case req.Status == nil:
|
||||
field = "status"
|
||||
case req.Status.ObservedGeneration == "":
|
||||
field = "status.observed_generation"
|
||||
default:
|
||||
for i, condition := range req.Status.Conditions {
|
||||
if condition.Type == "" {
|
||||
field = fmt.Sprintf("status.conditions[%d].type", i)
|
||||
break
|
||||
}
|
||||
|
||||
if condition.Resource != nil {
|
||||
switch {
|
||||
case condition.Resource.Type == nil:
|
||||
field = fmt.Sprintf("status.conditions[%d].resource.type", i)
|
||||
break
|
||||
case condition.Resource.Tenancy == nil:
|
||||
field = fmt.Sprintf("status.conditions[%d].resource.tenancy", i)
|
||||
break
|
||||
case condition.Resource.Name == "":
|
||||
field = fmt.Sprintf("status.conditions[%d].resource.name", i)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if field != "" {
|
||||
return status.Errorf(codes.InvalidArgument, "%s is required", field)
|
||||
}
|
||||
|
||||
if _, err := ulid.ParseStrict(req.Status.ObservedGeneration); err != nil {
|
||||
return status.Error(codes.InvalidArgument, "status.observed_generation is not valid")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,237 @@
|
|||
package resource
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/oklog/ulid/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/internal/resource/demo"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
func TestWriteStatus_InputValidation(t *testing.T) {
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
|
||||
demo.Register(server.Registry)
|
||||
|
||||
testCases := map[string]func(*pbresource.WriteStatusRequest){
|
||||
"no id": func(req *pbresource.WriteStatusRequest) { req.Id = nil },
|
||||
"no type": func(req *pbresource.WriteStatusRequest) { req.Id.Type = nil },
|
||||
"no tenancy": func(req *pbresource.WriteStatusRequest) { req.Id.Tenancy = nil },
|
||||
"no name": func(req *pbresource.WriteStatusRequest) { req.Id.Name = "" },
|
||||
"no uid": func(req *pbresource.WriteStatusRequest) { req.Id.Uid = "" },
|
||||
"no key": func(req *pbresource.WriteStatusRequest) { req.Key = "" },
|
||||
"no status": func(req *pbresource.WriteStatusRequest) { req.Status = nil },
|
||||
"no observed generation": func(req *pbresource.WriteStatusRequest) { req.Status.ObservedGeneration = "" },
|
||||
"bad observed generation": func(req *pbresource.WriteStatusRequest) { req.Status.ObservedGeneration = "bogus" },
|
||||
"no condition type": func(req *pbresource.WriteStatusRequest) { req.Status.Conditions[0].Type = "" },
|
||||
"no reference type": func(req *pbresource.WriteStatusRequest) { req.Status.Conditions[0].Resource.Type = nil },
|
||||
"no reference tenancy": func(req *pbresource.WriteStatusRequest) { req.Status.Conditions[0].Resource.Tenancy = nil },
|
||||
"no reference name": func(req *pbresource.WriteStatusRequest) { req.Status.Conditions[0].Resource.Name = "" },
|
||||
}
|
||||
for desc, modFn := range testCases {
|
||||
t.Run(desc, func(t *testing.T) {
|
||||
res, err := demo.GenerateV2Artist()
|
||||
require.NoError(t, err)
|
||||
|
||||
res.Id.Uid = ulid.Make().String()
|
||||
res.Generation = ulid.Make().String()
|
||||
|
||||
req := validWriteStatusRequest(t, res)
|
||||
modFn(req)
|
||||
|
||||
_, err = client.WriteStatus(testContext(t), req)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, codes.InvalidArgument.String(), status.Code(err).String())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteStatus_Success(t *testing.T) {
|
||||
for desc, fn := range map[string]func(*pbresource.WriteStatusRequest){
|
||||
"CAS": func(*pbresource.WriteStatusRequest) {},
|
||||
"Non CAS": func(req *pbresource.WriteStatusRequest) { req.Version = "" },
|
||||
} {
|
||||
t.Run(desc, func(t *testing.T) {
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
|
||||
demo.Register(server.Registry)
|
||||
|
||||
res, err := demo.GenerateV2Artist()
|
||||
require.NoError(t, err)
|
||||
|
||||
writeRsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
|
||||
require.NoError(t, err)
|
||||
res = writeRsp.Resource
|
||||
|
||||
req := validWriteStatusRequest(t, res)
|
||||
fn(req)
|
||||
|
||||
rsp, err := client.WriteStatus(testContext(t), req)
|
||||
require.NoError(t, err)
|
||||
res = rsp.Resource
|
||||
|
||||
req = validWriteStatusRequest(t, res)
|
||||
req.Key = "consul.io/other-controller"
|
||||
fn(req)
|
||||
|
||||
rsp, err = client.WriteStatus(testContext(t), req)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, rsp.Resource.Generation, res.Generation, "generation should not have changed")
|
||||
require.NotEqual(t, rsp.Resource.Version, res.Version, "version should have changed")
|
||||
require.Contains(t, rsp.Resource.Status, "consul.io/other-controller")
|
||||
require.Contains(t, rsp.Resource.Status, "consul.io/artist-controller")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteStatus_CASFailure(t *testing.T) {
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
|
||||
demo.Register(server.Registry)
|
||||
|
||||
res, err := demo.GenerateV2Artist()
|
||||
require.NoError(t, err)
|
||||
|
||||
rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
|
||||
require.NoError(t, err)
|
||||
res = rsp.Resource
|
||||
|
||||
req := validWriteStatusRequest(t, res)
|
||||
req.Version = "nope"
|
||||
|
||||
_, err = client.WriteStatus(testContext(t), req)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, codes.Aborted.String(), status.Code(err).String())
|
||||
}
|
||||
|
||||
func TestWriteStatus_TypeNotFound(t *testing.T) {
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
|
||||
res, err := demo.GenerateV2Artist()
|
||||
require.NoError(t, err)
|
||||
res.Id.Uid = ulid.Make().String()
|
||||
res.Generation = ulid.Make().String()
|
||||
|
||||
_, err = client.WriteStatus(testContext(t), validWriteStatusRequest(t, res))
|
||||
require.Error(t, err)
|
||||
require.Equal(t, codes.InvalidArgument.String(), status.Code(err).String())
|
||||
require.Contains(t, err.Error(), "resource type demo.v2.artist not registered")
|
||||
}
|
||||
|
||||
func TestWriteStatus_ResourceNotFound(t *testing.T) {
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
demo.Register(server.Registry)
|
||||
|
||||
res, err := demo.GenerateV2Artist()
|
||||
require.NoError(t, err)
|
||||
res.Id.Uid = ulid.Make().String()
|
||||
res.Generation = ulid.Make().String()
|
||||
|
||||
_, err = client.WriteStatus(testContext(t), validWriteStatusRequest(t, res))
|
||||
require.Error(t, err)
|
||||
require.Equal(t, codes.NotFound.String(), status.Code(err).String())
|
||||
}
|
||||
|
||||
func TestWriteStatus_WrongUid(t *testing.T) {
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
demo.Register(server.Registry)
|
||||
|
||||
res, err := demo.GenerateV2Artist()
|
||||
require.NoError(t, err)
|
||||
|
||||
rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
|
||||
require.NoError(t, err)
|
||||
res = rsp.Resource
|
||||
|
||||
req := validWriteStatusRequest(t, res)
|
||||
req.Id.Uid = ulid.Make().String()
|
||||
|
||||
_, err = client.WriteStatus(testContext(t), req)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, codes.NotFound.String(), status.Code(err).String())
|
||||
}
|
||||
|
||||
func TestWriteStatus_NonCASUpdate_Retry(t *testing.T) {
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
|
||||
demo.Register(server.Registry)
|
||||
|
||||
res, err := demo.GenerateV2Artist()
|
||||
require.NoError(t, err)
|
||||
|
||||
rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
|
||||
require.NoError(t, err)
|
||||
res = rsp.Resource
|
||||
|
||||
// Simulate conflicting writes by blocking the RPC after it has read the
|
||||
// current version of the resource, but before it tries to make a write.
|
||||
backend := &blockOnceBackend{
|
||||
Backend: server.Backend,
|
||||
|
||||
readCh: make(chan struct{}),
|
||||
blockCh: make(chan struct{}),
|
||||
}
|
||||
server.Backend = backend
|
||||
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
req := validWriteStatusRequest(t, res)
|
||||
req.Version = ""
|
||||
|
||||
_, err := client.WriteStatus(testContext(t), req)
|
||||
errCh <- err
|
||||
}()
|
||||
|
||||
// Wait for the read, to ensure the Write in the goroutine above has read the
|
||||
// current version of the resource.
|
||||
<-backend.readCh
|
||||
|
||||
// Update the resource.
|
||||
_, err = client.Write(testContext(t), &pbresource.WriteRequest{Resource: modifyArtist(t, res)})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Unblock the read.
|
||||
close(backend.blockCh)
|
||||
|
||||
// Check that the write succeeded anyway because of a retry.
|
||||
require.NoError(t, <-errCh)
|
||||
}
|
||||
|
||||
func validWriteStatusRequest(t *testing.T, res *pbresource.Resource) *pbresource.WriteStatusRequest {
|
||||
t.Helper()
|
||||
|
||||
album, err := demo.GenerateV2Album(res.Id)
|
||||
require.NoError(t, err)
|
||||
|
||||
return &pbresource.WriteStatusRequest{
|
||||
Id: res.Id,
|
||||
Version: res.Version,
|
||||
Key: "consul.io/artist-controller",
|
||||
Status: &pbresource.Status{
|
||||
ObservedGeneration: res.Generation,
|
||||
Conditions: []*pbresource.Condition{
|
||||
{
|
||||
Type: "AlbumCreated",
|
||||
State: pbresource.Condition_STATE_TRUE,
|
||||
Reason: "AlbumCreated",
|
||||
Message: fmt.Sprintf("Album '%s' created", album.Id.Name),
|
||||
Resource: resource.Reference(album.Id, ""),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
|
@ -2,9 +2,10 @@ package resource
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/oklog/ulid/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
@ -69,7 +70,7 @@ func TestWrite_TypeNotFound(t *testing.T) {
|
|||
require.Contains(t, err.Error(), "resource type demo.v2.artist not registered")
|
||||
}
|
||||
|
||||
func TestWrite_ResourceCreation(t *testing.T) {
|
||||
func TestWrite_ResourceCreation_Success(t *testing.T) {
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
|
||||
|
@ -107,6 +108,25 @@ func TestWrite_CASUpdate_Success(t *testing.T) {
|
|||
require.NotEqual(t, rsp1.Resource.Generation, rsp2.Resource.Generation)
|
||||
}
|
||||
|
||||
func TestWrite_ResourceCreation_StatusProvided(t *testing.T) {
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
|
||||
demo.Register(server.Registry)
|
||||
|
||||
res, err := demo.GenerateV2Artist()
|
||||
require.NoError(t, err)
|
||||
|
||||
res.Status = map[string]*pbresource.Status{
|
||||
"consul.io/some-controller": {ObservedGeneration: ulid.Make().String()},
|
||||
}
|
||||
|
||||
_, err = client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
|
||||
require.Error(t, err)
|
||||
require.Equal(t, codes.InvalidArgument.String(), status.Code(err).String())
|
||||
require.Contains(t, err.Error(), "WriteStatus endpoint")
|
||||
}
|
||||
|
||||
func TestWrite_CASUpdate_Failure(t *testing.T) {
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
|
@ -149,6 +169,60 @@ func TestWrite_Update_WrongUid(t *testing.T) {
|
|||
require.Contains(t, err.Error(), "uid doesn't match")
|
||||
}
|
||||
|
||||
func TestWrite_Update_StatusModified(t *testing.T) {
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
|
||||
demo.Register(server.Registry)
|
||||
|
||||
res, err := demo.GenerateV2Artist()
|
||||
require.NoError(t, err)
|
||||
|
||||
rsp1, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
|
||||
require.NoError(t, err)
|
||||
|
||||
statusRsp, err := client.WriteStatus(testContext(t), validWriteStatusRequest(t, rsp1.Resource))
|
||||
require.NoError(t, err)
|
||||
res = statusRsp.Resource
|
||||
|
||||
// Passing the staus unmodified should be fine.
|
||||
rsp2, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Attempting to modify the status should return an error.
|
||||
res = rsp2.Resource
|
||||
res.Status["consul.io/other-controller"] = &pbresource.Status{ObservedGeneration: res.Generation}
|
||||
|
||||
_, err = client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
|
||||
require.Error(t, err)
|
||||
require.Equal(t, codes.InvalidArgument.String(), status.Code(err).String())
|
||||
require.Contains(t, err.Error(), "WriteStatus endpoint")
|
||||
}
|
||||
|
||||
func TestWrite_Update_NilStatus(t *testing.T) {
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
|
||||
demo.Register(server.Registry)
|
||||
|
||||
res, err := demo.GenerateV2Artist()
|
||||
require.NoError(t, err)
|
||||
|
||||
rsp1, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
|
||||
require.NoError(t, err)
|
||||
|
||||
statusRsp, err := client.WriteStatus(testContext(t), validWriteStatusRequest(t, rsp1.Resource))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Passing a nil status should be fine (and carry over the old status).
|
||||
res = statusRsp.Resource
|
||||
res.Status = nil
|
||||
|
||||
rsp2, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, rsp2.Resource.Status)
|
||||
}
|
||||
|
||||
func TestWrite_Update_NoUid(t *testing.T) {
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
|
@ -239,7 +313,7 @@ func TestWrite_NonCASUpdate_Retry(t *testing.T) {
|
|||
type blockOnceBackend struct {
|
||||
storage.Backend
|
||||
|
||||
once sync.Once
|
||||
done uint32
|
||||
readCh chan struct{}
|
||||
blockCh chan struct{}
|
||||
}
|
||||
|
@ -247,10 +321,12 @@ type blockOnceBackend struct {
|
|||
func (b *blockOnceBackend) Read(ctx context.Context, consistency storage.ReadConsistency, id *pbresource.ID) (*pbresource.Resource, error) {
|
||||
res, err := b.Backend.Read(ctx, consistency, id)
|
||||
|
||||
b.once.Do(func() {
|
||||
// Block for exactly one call to Read. All subsequent calls (including those
|
||||
// concurrent to the blocked call) will return immediately.
|
||||
if atomic.CompareAndSwapUint32(&b.done, 0, 1) {
|
||||
close(b.readCh)
|
||||
<-b.blockCh
|
||||
})
|
||||
}
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
package resource
|
||||
|
||||
import "github.com/hashicorp/consul/proto-public/pbresource"
|
||||
|
||||
// Reference returns a reference to the resource with the given ID.
|
||||
func Reference(id *pbresource.ID, section string) *pbresource.Reference {
|
||||
return &pbresource.Reference{
|
||||
Type: id.Type,
|
||||
Tenancy: id.Tenancy,
|
||||
Name: id.Name,
|
||||
Section: section,
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
package resource
|
||||
|
||||
import (
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
// EqualStatus compares two status maps for equality.
|
||||
func EqualStatus(a, b map[string]*pbresource.Status) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
}
|
||||
|
||||
compared := make(map[string]struct{})
|
||||
for k, av := range a {
|
||||
bv, ok := b[k]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
if !proto.Equal(av, bv) {
|
||||
return false
|
||||
}
|
||||
compared[k] = struct{}{}
|
||||
}
|
||||
|
||||
for k, bv := range b {
|
||||
if _, skip := compared[k]; skip {
|
||||
continue
|
||||
}
|
||||
|
||||
av, ok := a[k]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
if !proto.Equal(av, bv) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
package resource
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/oklog/ulid/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
func TestEqualStatus(t *testing.T) {
|
||||
generation := ulid.Make().String()
|
||||
|
||||
for idx, tc := range []struct {
|
||||
a, b map[string]*pbresource.Status
|
||||
equal bool
|
||||
}{
|
||||
{nil, nil, true},
|
||||
{nil, map[string]*pbresource.Status{}, true},
|
||||
{
|
||||
map[string]*pbresource.Status{
|
||||
"consul.io/some-controller": {
|
||||
ObservedGeneration: generation,
|
||||
Conditions: []*pbresource.Condition{
|
||||
{
|
||||
Type: "Foo",
|
||||
State: pbresource.Condition_STATE_TRUE,
|
||||
Reason: "Bar",
|
||||
Message: "Foo is true because of Bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
map[string]*pbresource.Status{
|
||||
"consul.io/some-controller": {
|
||||
ObservedGeneration: generation,
|
||||
Conditions: []*pbresource.Condition{
|
||||
{
|
||||
Type: "Foo",
|
||||
State: pbresource.Condition_STATE_TRUE,
|
||||
Reason: "Bar",
|
||||
Message: "Foo is true because of Bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
true,
|
||||
},
|
||||
{
|
||||
map[string]*pbresource.Status{
|
||||
"consul.io/some-controller": {
|
||||
ObservedGeneration: generation,
|
||||
Conditions: []*pbresource.Condition{
|
||||
{
|
||||
Type: "Foo",
|
||||
State: pbresource.Condition_STATE_TRUE,
|
||||
Reason: "Bar",
|
||||
Message: "Foo is true because of Bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
map[string]*pbresource.Status{
|
||||
"consul.io/some-controller": {
|
||||
ObservedGeneration: generation,
|
||||
Conditions: []*pbresource.Condition{
|
||||
{
|
||||
Type: "Foo",
|
||||
State: pbresource.Condition_STATE_FALSE,
|
||||
Reason: "Bar",
|
||||
Message: "Foo is false because of Bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
false,
|
||||
},
|
||||
{
|
||||
map[string]*pbresource.Status{
|
||||
"consul.io/some-controller": {
|
||||
ObservedGeneration: generation,
|
||||
Conditions: []*pbresource.Condition{
|
||||
{
|
||||
Type: "Foo",
|
||||
State: pbresource.Condition_STATE_TRUE,
|
||||
Reason: "Bar",
|
||||
Message: "Foo is true because of Bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
map[string]*pbresource.Status{
|
||||
"consul.io/some-controller": {
|
||||
ObservedGeneration: generation,
|
||||
Conditions: []*pbresource.Condition{
|
||||
{
|
||||
Type: "Foo",
|
||||
State: pbresource.Condition_STATE_TRUE,
|
||||
Reason: "Bar",
|
||||
Message: "Foo is true because of Bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
"consul.io/other-controller": {
|
||||
ObservedGeneration: generation,
|
||||
Conditions: []*pbresource.Condition{
|
||||
{
|
||||
Type: "Foo",
|
||||
State: pbresource.Condition_STATE_TRUE,
|
||||
Reason: "Bar",
|
||||
Message: "Foo is true because of Bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
false,
|
||||
},
|
||||
} {
|
||||
t.Run(fmt.Sprintf("%d", idx), func(t *testing.T) {
|
||||
require.Equal(t, tc.equal, EqualStatus(tc.a, tc.b))
|
||||
require.Equal(t, tc.equal, EqualStatus(tc.b, tc.a))
|
||||
})
|
||||
}
|
||||
}
|
|
@ -47,6 +47,36 @@ func (msg *Resource) UnmarshalBinary(b []byte) error {
|
|||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *Status) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *Status) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *Condition) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *Condition) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *Reference) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *Reference) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *WatchEvent) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
|
@ -117,16 +147,6 @@ func (msg *WriteResponse) UnmarshalBinary(b []byte) error {
|
|||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *WriteStatusResponse) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *WriteStatusResponse) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *WriteStatusRequest) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
|
@ -137,6 +157,16 @@ func (msg *WriteStatusRequest) UnmarshalBinary(b []byte) error {
|
|||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *WriteStatusResponse) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *WriteStatusResponse) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *DeleteRequest) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -34,11 +34,38 @@ message Resource {
|
|||
string generation = 4;
|
||||
|
||||
map<string, string> metadata = 5;
|
||||
reserved 6; // status
|
||||
map<string, Status> status = 6;
|
||||
|
||||
google.protobuf.Any data = 7;
|
||||
}
|
||||
|
||||
message Status {
|
||||
string observed_generation = 1;
|
||||
repeated Condition conditions = 2;
|
||||
}
|
||||
|
||||
message Condition {
|
||||
enum State {
|
||||
// buf:lint:ignore ENUM_ZERO_VALUE_SUFFIX
|
||||
STATE_UNKNOWN = 0;
|
||||
STATE_TRUE = 1;
|
||||
STATE_FALSE = 2;
|
||||
}
|
||||
|
||||
string type = 1;
|
||||
State state = 2;
|
||||
string reason = 3;
|
||||
string message = 4;
|
||||
Reference resource = 5;
|
||||
}
|
||||
|
||||
message Reference {
|
||||
Type type = 1;
|
||||
Tenancy tenancy = 2;
|
||||
string name = 3;
|
||||
string section = 4;
|
||||
}
|
||||
|
||||
message WatchEvent {
|
||||
enum Operation {
|
||||
OPERATION_UNSPECIFIED = 0;
|
||||
|
@ -95,13 +122,6 @@ service ResourceService {
|
|||
}
|
||||
}
|
||||
|
||||
enum Condition {
|
||||
CONDITION_UNSPECIFIED = 0;
|
||||
CONDITION_ACCEPTED = 1;
|
||||
CONDITION_INVALID = 2;
|
||||
CONDITION_PERSISTENT_FAILURE = 3;
|
||||
}
|
||||
|
||||
message ReadRequest {
|
||||
ID id = 1;
|
||||
}
|
||||
|
@ -128,17 +148,15 @@ message WriteResponse {
|
|||
Resource resource = 1;
|
||||
}
|
||||
|
||||
message WriteStatusResponse {
|
||||
Resource resource = 1;
|
||||
}
|
||||
|
||||
message WriteStatusRequest {
|
||||
ID id = 1;
|
||||
string version = 2;
|
||||
string key = 3;
|
||||
Condition condition = 4;
|
||||
string state = 5;
|
||||
repeated string messages = 6;
|
||||
Status status = 4;
|
||||
}
|
||||
|
||||
message WriteStatusResponse {
|
||||
Resource resource = 1;
|
||||
}
|
||||
|
||||
message DeleteRequest {
|
||||
|
|
Loading…
Reference in New Issue