Controller Supervision (#17016)
This commit is contained in:
parent
5979752994
commit
f7c4f04060
|
@ -68,6 +68,7 @@ import (
|
||||||
"github.com/hashicorp/consul/agent/rpc/peering"
|
"github.com/hashicorp/consul/agent/rpc/peering"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/agent/token"
|
"github.com/hashicorp/consul/agent/token"
|
||||||
|
"github.com/hashicorp/consul/internal/controller"
|
||||||
"github.com/hashicorp/consul/internal/resource"
|
"github.com/hashicorp/consul/internal/resource"
|
||||||
"github.com/hashicorp/consul/internal/resource/demo"
|
"github.com/hashicorp/consul/internal/resource/demo"
|
||||||
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
|
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
|
||||||
|
@ -435,6 +436,10 @@ type Server struct {
|
||||||
// with the Resource Service in-process (i.e. not via the network) without auth.
|
// with the Resource Service in-process (i.e. not via the network) without auth.
|
||||||
// It should only be used for purely-internal workloads, such as controllers.
|
// It should only be used for purely-internal workloads, such as controllers.
|
||||||
internalResourceServiceClient pbresource.ResourceServiceClient
|
internalResourceServiceClient pbresource.ResourceServiceClient
|
||||||
|
|
||||||
|
// controllerManager schedules the execution of controllers.
|
||||||
|
controllerManager *controller.Manager
|
||||||
|
|
||||||
// handles metrics reporting to HashiCorp
|
// handles metrics reporting to HashiCorp
|
||||||
reportingManager *reporting.ReportingManager
|
reportingManager *reporting.ReportingManager
|
||||||
}
|
}
|
||||||
|
@ -500,6 +505,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
|
||||||
incomingRPCLimiter: incomingRPCLimiter,
|
incomingRPCLimiter: incomingRPCLimiter,
|
||||||
routineManager: routine.NewManager(logger.Named(logging.ConsulServer)),
|
routineManager: routine.NewManager(logger.Named(logging.ConsulServer)),
|
||||||
typeRegistry: resource.NewRegistry(),
|
typeRegistry: resource.NewRegistry(),
|
||||||
|
controllerManager: controller.NewManager(logger.Named(logging.ControllerRuntime)),
|
||||||
}
|
}
|
||||||
incomingRPCLimiter.Register(s)
|
incomingRPCLimiter.Register(s)
|
||||||
|
|
||||||
|
@ -824,8 +830,10 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.config.DevMode {
|
if s.config.DevMode {
|
||||||
demo.Register(s.typeRegistry)
|
demo.RegisterTypes(s.typeRegistry)
|
||||||
|
demo.RegisterControllers(s.controllerManager)
|
||||||
}
|
}
|
||||||
|
go s.controllerManager.Run(&lib.StopChannelContext{StopCh: shutdownCh})
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
@ -1951,6 +1959,7 @@ func (s *Server) trackLeaderChanges() {
|
||||||
s.grpcLeaderForwarder.UpdateLeaderAddr(s.config.Datacenter, string(leaderObs.LeaderAddr))
|
s.grpcLeaderForwarder.UpdateLeaderAddr(s.config.Datacenter, string(leaderObs.LeaderAddr))
|
||||||
s.peeringBackend.SetLeaderAddress(string(leaderObs.LeaderAddr))
|
s.peeringBackend.SetLeaderAddress(string(leaderObs.LeaderAddr))
|
||||||
s.raftStorageBackend.LeaderChanged()
|
s.raftStorageBackend.LeaderChanged()
|
||||||
|
s.controllerManager.SetRaftLeader(s.IsLeader())
|
||||||
|
|
||||||
// Trigger sending an update to HCP status
|
// Trigger sending an update to HCP status
|
||||||
s.hcpManager.SendUpdate()
|
s.hcpManager.SendUpdate()
|
||||||
|
|
|
@ -22,7 +22,7 @@ func TestDelete_InputValidation(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
testCases := map[string]func(*pbresource.DeleteRequest){
|
testCases := map[string]func(*pbresource.DeleteRequest){
|
||||||
"no id": func(req *pbresource.DeleteRequest) { req.Id = nil },
|
"no id": func(req *pbresource.DeleteRequest) { req.Id = nil },
|
||||||
|
@ -101,7 +101,7 @@ func TestDelete_ACLs(t *testing.T) {
|
||||||
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
|
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
|
||||||
Return(tc.authz, nil)
|
Return(tc.authz, nil)
|
||||||
server.ACLResolver = mockACLResolver
|
server.ACLResolver = mockACLResolver
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -122,8 +122,7 @@ func TestDelete_Success(t *testing.T) {
|
||||||
for desc, tc := range deleteTestCases() {
|
for desc, tc := range deleteTestCases() {
|
||||||
t.Run(desc, func(t *testing.T) {
|
t.Run(desc, func(t *testing.T) {
|
||||||
server, client, ctx := testDeps(t)
|
server, client, ctx := testDeps(t)
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -150,7 +149,7 @@ func TestDelete_NotFound(t *testing.T) {
|
||||||
for desc, tc := range deleteTestCases() {
|
for desc, tc := range deleteTestCases() {
|
||||||
t.Run(desc, func(t *testing.T) {
|
t.Run(desc, func(t *testing.T) {
|
||||||
server, client, ctx := testDeps(t)
|
server, client, ctx := testDeps(t)
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -165,7 +164,7 @@ func TestDelete_VersionMismatch(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
server, client, ctx := testDeps(t)
|
server, client, ctx := testDeps(t)
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
rsp, err := client.Write(ctx, &pbresource.WriteRequest{Resource: artist})
|
rsp, err := client.Write(ctx, &pbresource.WriteRequest{Resource: artist})
|
||||||
|
|
|
@ -40,7 +40,7 @@ func TestList_Empty(t *testing.T) {
|
||||||
for desc, tc := range listTestCases() {
|
for desc, tc := range listTestCases() {
|
||||||
t.Run(desc, func(t *testing.T) {
|
t.Run(desc, func(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
rsp, err := client.List(tc.ctx, &pbresource.ListRequest{
|
rsp, err := client.List(tc.ctx, &pbresource.ListRequest{
|
||||||
|
@ -58,7 +58,7 @@ func TestList_Many(t *testing.T) {
|
||||||
for desc, tc := range listTestCases() {
|
for desc, tc := range listTestCases() {
|
||||||
t.Run(desc, func(t *testing.T) {
|
t.Run(desc, func(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
resources := make([]*pbresource.Resource, 10)
|
resources := make([]*pbresource.Resource, 10)
|
||||||
|
@ -89,7 +89,7 @@ func TestList_GroupVersionMismatch(t *testing.T) {
|
||||||
for desc, tc := range listTestCases() {
|
for desc, tc := range listTestCases() {
|
||||||
t.Run(desc, func(t *testing.T) {
|
t.Run(desc, func(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
|
@ -116,7 +116,7 @@ func TestList_VerifyReadConsistencyArg(t *testing.T) {
|
||||||
mockBackend := NewMockBackend(t)
|
mockBackend := NewMockBackend(t)
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
server.Backend = mockBackend
|
server.Backend = mockBackend
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -133,7 +133,7 @@ func TestList_VerifyReadConsistencyArg(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// N.B. Uses key ACLs for now. See demo.Register()
|
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
|
||||||
func TestList_ACL_ListDenied(t *testing.T) {
|
func TestList_ACL_ListDenied(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
@ -146,7 +146,7 @@ func TestList_ACL_ListDenied(t *testing.T) {
|
||||||
require.Contains(t, err.Error(), "lacks permission 'key:list'")
|
require.Contains(t, err.Error(), "lacks permission 'key:list'")
|
||||||
}
|
}
|
||||||
|
|
||||||
// N.B. Uses key ACLs for now. See demo.Register()
|
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
|
||||||
func TestList_ACL_ListAllowed_ReadDenied(t *testing.T) {
|
func TestList_ACL_ListAllowed_ReadDenied(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
@ -160,7 +160,7 @@ func TestList_ACL_ListAllowed_ReadDenied(t *testing.T) {
|
||||||
require.Empty(t, rsp.Resources)
|
require.Empty(t, rsp.Resources)
|
||||||
}
|
}
|
||||||
|
|
||||||
// N.B. Uses key ACLs for now. See demo.Register()
|
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
|
||||||
func TestList_ACL_ListAllowed_ReadAllowed(t *testing.T) {
|
func TestList_ACL_ListAllowed_ReadAllowed(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
@ -184,7 +184,7 @@ func roundTripList(t *testing.T, authz acl.Authorizer) (*pbresource.Resource, *p
|
||||||
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
|
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
|
||||||
Return(authz, nil)
|
Return(authz, nil)
|
||||||
server.ACLResolver = mockACLResolver
|
server.ACLResolver = mockACLResolver
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -25,7 +25,7 @@ func TestRead_InputValidation(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
testCases := map[string]func(*pbresource.ReadRequest){
|
testCases := map[string]func(*pbresource.ReadRequest){
|
||||||
"no id": func(req *pbresource.ReadRequest) { req.Id = nil },
|
"no id": func(req *pbresource.ReadRequest) { req.Id = nil },
|
||||||
|
@ -79,7 +79,7 @@ func TestRead_ResourceNotFound(t *testing.T) {
|
||||||
t.Run(desc, func(t *testing.T) {
|
t.Run(desc, func(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
|
@ -98,7 +98,7 @@ func TestRead_GroupVersionMismatch(t *testing.T) {
|
||||||
t.Run(desc, func(t *testing.T) {
|
t.Run(desc, func(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
|
@ -123,7 +123,7 @@ func TestRead_Success(t *testing.T) {
|
||||||
t.Run(desc, func(t *testing.T) {
|
t.Run(desc, func(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
|
@ -146,7 +146,7 @@ func TestRead_VerifyReadConsistencyArg(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
mockBackend := NewMockBackend(t)
|
mockBackend := NewMockBackend(t)
|
||||||
server.Backend = mockBackend
|
server.Backend = mockBackend
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -162,7 +162,7 @@ func TestRead_VerifyReadConsistencyArg(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// N.B. Uses key ACLs for now. See demo.Register()
|
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
|
||||||
func TestRead_ACLs(t *testing.T) {
|
func TestRead_ACLs(t *testing.T) {
|
||||||
type testCase struct {
|
type testCase struct {
|
||||||
authz resolver.Result
|
authz resolver.Result
|
||||||
|
@ -188,7 +188,7 @@ func TestRead_ACLs(t *testing.T) {
|
||||||
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
|
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
|
||||||
Return(tc.authz, nil)
|
Return(tc.authz, nil)
|
||||||
server.ACLResolver = mockACLResolver
|
server.ACLResolver = mockACLResolver
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -46,7 +46,7 @@ func TestWatchList_GroupVersionMatches(t *testing.T) {
|
||||||
|
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// create a watch
|
// create a watch
|
||||||
|
@ -90,7 +90,7 @@ func TestWatchList_GroupVersionMismatch(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
@ -123,7 +123,7 @@ func TestWatchList_GroupVersionMismatch(t *testing.T) {
|
||||||
mustGetNoResource(t, rspCh)
|
mustGetNoResource(t, rspCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
// N.B. Uses key ACLs for now. See demo.Register()
|
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
|
||||||
func TestWatchList_ACL_ListDenied(t *testing.T) {
|
func TestWatchList_ACL_ListDenied(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
@ -137,7 +137,7 @@ func TestWatchList_ACL_ListDenied(t *testing.T) {
|
||||||
require.Contains(t, err.Error(), "lacks permission 'key:list'")
|
require.Contains(t, err.Error(), "lacks permission 'key:list'")
|
||||||
}
|
}
|
||||||
|
|
||||||
// N.B. Uses key ACLs for now. See demo.Register()
|
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
|
||||||
func TestWatchList_ACL_ListAllowed_ReadDenied(t *testing.T) {
|
func TestWatchList_ACL_ListAllowed_ReadDenied(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
@ -152,7 +152,7 @@ func TestWatchList_ACL_ListAllowed_ReadDenied(t *testing.T) {
|
||||||
mustGetNoResource(t, rspCh)
|
mustGetNoResource(t, rspCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
// N.B. Uses key ACLs for now. See demo.Register()
|
// N.B. Uses key ACLs for now. See demo.RegisterTypes()
|
||||||
func TestWatchList_ACL_ListAllowed_ReadAllowed(t *testing.T) {
|
func TestWatchList_ACL_ListAllowed_ReadAllowed(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
@ -177,7 +177,7 @@ func roundTripACL(t *testing.T, authz acl.Authorizer) (<-chan resourceOrError, *
|
||||||
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
|
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
|
||||||
Return(authz, nil)
|
Return(authz, nil)
|
||||||
server.ACLResolver = mockACLResolver
|
server.ACLResolver = mockACLResolver
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -69,7 +69,7 @@ func TestWriteStatus_InputValidation(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
testCases := map[string]func(*pbresource.WriteStatusRequest){
|
testCases := map[string]func(*pbresource.WriteStatusRequest){
|
||||||
"no id": func(req *pbresource.WriteStatusRequest) { req.Id = nil },
|
"no id": func(req *pbresource.WriteStatusRequest) { req.Id = nil },
|
||||||
|
@ -113,7 +113,7 @@ func TestWriteStatus_Success(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
res, err := demo.GenerateV2Artist()
|
res, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -148,7 +148,7 @@ func TestWriteStatus_CASFailure(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
res, err := demo.GenerateV2Artist()
|
res, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -183,7 +183,7 @@ func TestWriteStatus_TypeNotFound(t *testing.T) {
|
||||||
func TestWriteStatus_ResourceNotFound(t *testing.T) {
|
func TestWriteStatus_ResourceNotFound(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
res, err := demo.GenerateV2Artist()
|
res, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -198,7 +198,7 @@ func TestWriteStatus_ResourceNotFound(t *testing.T) {
|
||||||
func TestWriteStatus_WrongUid(t *testing.T) {
|
func TestWriteStatus_WrongUid(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
res, err := demo.GenerateV2Artist()
|
res, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -219,7 +219,7 @@ func TestWriteStatus_NonCASUpdate_Retry(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
res, err := demo.GenerateV2Artist()
|
res, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -26,7 +26,7 @@ func TestWrite_InputValidation(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
testCases := map[string]func(*pbresource.WriteRequest){
|
testCases := map[string]func(*pbresource.WriteRequest){
|
||||||
"no resource": func(req *pbresource.WriteRequest) { req.Resource = nil },
|
"no resource": func(req *pbresource.WriteRequest) { req.Resource = nil },
|
||||||
|
@ -79,7 +79,7 @@ func TestWrite_OwnerValidation(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
type testCase struct {
|
type testCase struct {
|
||||||
modReqFn func(req *pbresource.WriteRequest)
|
modReqFn func(req *pbresource.WriteRequest)
|
||||||
|
@ -183,7 +183,7 @@ func TestWrite_ACLs(t *testing.T) {
|
||||||
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
|
mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything).
|
||||||
Return(tc.authz, nil)
|
Return(tc.authz, nil)
|
||||||
server.ACLResolver = mockACLResolver
|
server.ACLResolver = mockACLResolver
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -198,7 +198,7 @@ func TestWrite_ACLs(t *testing.T) {
|
||||||
func TestWrite_Mutate(t *testing.T) {
|
func TestWrite_Mutate(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -224,7 +224,7 @@ func TestWrite_ResourceCreation_Success(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
res, err := demo.GenerateV2Artist()
|
res, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -240,7 +240,7 @@ func TestWrite_CASUpdate_Success(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
res, err := demo.GenerateV2Artist()
|
res, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -262,7 +262,7 @@ func TestWrite_ResourceCreation_StatusProvided(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
res, err := demo.GenerateV2Artist()
|
res, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -281,7 +281,7 @@ func TestWrite_CASUpdate_Failure(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
res, err := demo.GenerateV2Artist()
|
res, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -302,7 +302,7 @@ func TestWrite_Update_WrongUid(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
res, err := demo.GenerateV2Artist()
|
res, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -323,7 +323,7 @@ func TestWrite_Update_StatusModified(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
res, err := demo.GenerateV2Artist()
|
res, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -353,7 +353,7 @@ func TestWrite_Update_NilStatus(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
res, err := demo.GenerateV2Artist()
|
res, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -377,7 +377,7 @@ func TestWrite_Update_NoUid(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
res, err := demo.GenerateV2Artist()
|
res, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -396,7 +396,7 @@ func TestWrite_NonCASUpdate_Success(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
res, err := demo.GenerateV2Artist()
|
res, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -417,7 +417,7 @@ func TestWrite_NonCASUpdate_Retry(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
res, err := demo.GenerateV2Artist()
|
res, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -467,7 +467,7 @@ func TestWrite_Owner_Immutable(t *testing.T) {
|
||||||
server := testServer(t)
|
server := testServer(t)
|
||||||
client := testClient(t, server)
|
client := testClient(t, server)
|
||||||
|
|
||||||
demo.Register(server.Registry)
|
demo.RegisterTypes(server.Registry)
|
||||||
|
|
||||||
artist, err := demo.GenerateV2Artist()
|
artist, err := demo.GenerateV2Artist()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ForType begins building a Controller for the given resource type.
|
||||||
|
func ForType(managedType *pbresource.Type) Controller {
|
||||||
|
return Controller{managedType: managedType}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Controller runs a reconciliation loop to respond to changes in resources and
|
||||||
|
// their dependencies. It is heavily inspired by Kubernetes' controller pattern:
|
||||||
|
// https://kubernetes.io/docs/concepts/architecture/controller/
|
||||||
|
//
|
||||||
|
// Use the builder methods in this package (starting with ForType) to construct
|
||||||
|
// a controller, and then pass it to a Manager to be executed.
|
||||||
|
type Controller struct {
|
||||||
|
managedType *pbresource.Type
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
)
|
||||||
|
|
||||||
|
// controllerRunner contains the actual implementation of running a controller
|
||||||
|
// including creating watches, calling the reconciler, handling retries, etc.
|
||||||
|
type controllerRunner struct {
|
||||||
|
ctrl Controller
|
||||||
|
logger hclog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *controllerRunner) run(ctx context.Context) error {
|
||||||
|
c.logger.Debug("controller running")
|
||||||
|
defer c.logger.Debug("controller stopping")
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
|
@ -0,0 +1,24 @@
|
||||||
|
package controller
|
||||||
|
|
||||||
|
// Lease is used to ensure controllers are run as singletons (i.e. one leader-
|
||||||
|
// elected instance per cluster).
|
||||||
|
//
|
||||||
|
// Currently, this is just an abstraction over Raft leadership. In the future,
|
||||||
|
// we'll build a backend-agnostic leasing system into the Resource Service which
|
||||||
|
// will allow us to balance controllers between many servers.
|
||||||
|
type Lease interface {
|
||||||
|
// Held returns whether we are the current lease-holders.
|
||||||
|
Held() bool
|
||||||
|
|
||||||
|
// Changed returns a channel on which you can receive notifications whenever
|
||||||
|
// the lease is acquired or lost.
|
||||||
|
Changed() <-chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type raftLease struct {
|
||||||
|
m *Manager
|
||||||
|
ch <-chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *raftLease) Held() bool { return l.m.raftLeader.Load() }
|
||||||
|
func (l *raftLease) Changed() <-chan struct{} { return l.ch }
|
|
@ -0,0 +1,89 @@
|
||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/internal/resource"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Manager is responsible for scheduling the execution of controllers.
|
||||||
|
type Manager struct {
|
||||||
|
logger hclog.Logger
|
||||||
|
|
||||||
|
raftLeader atomic.Bool
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
running bool
|
||||||
|
controllers []Controller
|
||||||
|
leaseChans []chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewManager creates a Manager. logger will be used by the Manager, and as the
|
||||||
|
// base logger for controllers when one is not specified using WithLogger.
|
||||||
|
func NewManager(logger hclog.Logger) *Manager {
|
||||||
|
return &Manager{logger: logger}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register the given controller to be executed by the Manager. Cannot be called
|
||||||
|
// once the Manager is running.
|
||||||
|
func (m *Manager) Register(ctrl Controller) {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
if m.running {
|
||||||
|
panic("cannot register additional controllers after calling Run")
|
||||||
|
}
|
||||||
|
|
||||||
|
m.controllers = append(m.controllers, ctrl)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run the Manager and start executing controllers until the given context is
|
||||||
|
// canceled. Cannot be called more than once.
|
||||||
|
func (m *Manager) Run(ctx context.Context) {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
if m.running {
|
||||||
|
panic("cannot call Run more than once")
|
||||||
|
}
|
||||||
|
m.running = true
|
||||||
|
|
||||||
|
for _, desc := range m.controllers {
|
||||||
|
runner := &controllerRunner{
|
||||||
|
ctrl: desc,
|
||||||
|
logger: m.logger.With("managed_type", resource.ToGVK(desc.managedType)),
|
||||||
|
}
|
||||||
|
go newSupervisor(runner.run, m.newLeaseLocked()).run(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetRaftLeader notifies the Manager of Raft leadership changes. Controllers
|
||||||
|
// are currently only executed on the Raft leader, so calling this method will
|
||||||
|
// cause the Manager to spin them up/down accordingly.
|
||||||
|
func (m *Manager) SetRaftLeader(leader bool) {
|
||||||
|
m.raftLeader.Store(leader)
|
||||||
|
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
for _, ch := range m.leaseChans {
|
||||||
|
select {
|
||||||
|
case ch <- struct{}{}:
|
||||||
|
default:
|
||||||
|
// Do not block if there's nothing receiving on ch (because the supervisor is
|
||||||
|
// busy doing something else). Note that ch has a buffer of 1, so we'll never
|
||||||
|
// miss the notification that something has changed so we need to re-evaluate
|
||||||
|
// the lease.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) newLeaseLocked() Lease {
|
||||||
|
ch := make(chan struct{}, 1)
|
||||||
|
m.leaseChans = append(m.leaseChans, ch)
|
||||||
|
return &raftLease{m: m, ch: ch}
|
||||||
|
}
|
|
@ -0,0 +1,140 @@
|
||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/lib/retry"
|
||||||
|
)
|
||||||
|
|
||||||
|
// flapThreshold is the minimum amount of time between restarts for us *not* to
|
||||||
|
// consider a controller to be stuck in a crash-loop.
|
||||||
|
const flapThreshold = 2 * time.Second
|
||||||
|
|
||||||
|
// supervisor keeps a task running, restarting it on-error, for as long as the
|
||||||
|
// given lease is held. When the lease is lost, the context given to the task
|
||||||
|
// will be canceled. If the task persistently fails (i.e. the controller is in
|
||||||
|
// a crash-loop) supervisor will use exponential backoff to delay restarts.
|
||||||
|
type supervisor struct {
|
||||||
|
task task
|
||||||
|
lease Lease
|
||||||
|
|
||||||
|
running bool
|
||||||
|
startedAt time.Time
|
||||||
|
errCh chan error
|
||||||
|
cancelTask context.CancelFunc
|
||||||
|
|
||||||
|
backoff *retry.Waiter
|
||||||
|
backoffUntil time.Time
|
||||||
|
backoffTimerCh <-chan time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSupervisor(task task, lease Lease) *supervisor {
|
||||||
|
return &supervisor{
|
||||||
|
task: task,
|
||||||
|
lease: lease,
|
||||||
|
errCh: make(chan error),
|
||||||
|
backoff: &retry.Waiter{
|
||||||
|
MinFailures: 1,
|
||||||
|
MinWait: 500 * time.Millisecond,
|
||||||
|
MaxWait: time.Minute,
|
||||||
|
Jitter: retry.NewJitter(25),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type task func(context.Context) error
|
||||||
|
|
||||||
|
func (s *supervisor) run(ctx context.Context) {
|
||||||
|
for {
|
||||||
|
if s.shouldStart() {
|
||||||
|
s.startTask(ctx)
|
||||||
|
} else if s.shouldStop() {
|
||||||
|
s.stopTask()
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
// Outer context canceled.
|
||||||
|
case <-ctx.Done():
|
||||||
|
if s.cancelTask != nil {
|
||||||
|
s.cancelTask()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
|
||||||
|
// Task stopped running.
|
||||||
|
case err := <-s.errCh:
|
||||||
|
stopBackoffTimer := s.handleError(err)
|
||||||
|
if stopBackoffTimer != nil {
|
||||||
|
defer stopBackoffTimer()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unblock when the lease is acquired/lost, or the backoff timer fires.
|
||||||
|
case <-s.lease.Changed():
|
||||||
|
case <-s.backoffTimerCh:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *supervisor) shouldStart() bool {
|
||||||
|
if s.running {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if !s.lease.Held() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if time.Now().Before(s.backoffUntil) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *supervisor) startTask(ctx context.Context) {
|
||||||
|
if s.cancelTask != nil {
|
||||||
|
s.cancelTask()
|
||||||
|
}
|
||||||
|
|
||||||
|
taskCtx, cancelTask := context.WithCancel(ctx)
|
||||||
|
s.cancelTask = cancelTask
|
||||||
|
s.startedAt = time.Now()
|
||||||
|
s.running = true
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
err := s.task(taskCtx)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case s.errCh <- err:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *supervisor) shouldStop() bool {
|
||||||
|
return s.running && !s.lease.Held()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *supervisor) stopTask() {
|
||||||
|
s.cancelTask()
|
||||||
|
s.backoff.Reset()
|
||||||
|
s.running = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *supervisor) handleError(err error) func() bool {
|
||||||
|
s.running = false
|
||||||
|
|
||||||
|
if time.Since(s.startedAt) > flapThreshold {
|
||||||
|
s.backoff.Reset()
|
||||||
|
s.backoffUntil = time.Time{}
|
||||||
|
} else {
|
||||||
|
delay := s.backoff.WaitDuration()
|
||||||
|
s.backoffUntil = time.Now().Add(delay)
|
||||||
|
|
||||||
|
timer := time.NewTimer(delay)
|
||||||
|
s.backoffTimerCh = timer.C
|
||||||
|
return timer.Stop
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,118 @@
|
||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSupervise(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
runCh := make(chan struct{})
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
errCh := make(chan error)
|
||||||
|
|
||||||
|
task := func(taskCtx context.Context) error {
|
||||||
|
runCh <- struct{}{}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
return err
|
||||||
|
case <-taskCtx.Done():
|
||||||
|
stopCh <- struct{}{}
|
||||||
|
return taskCtx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
lease := newTestLease()
|
||||||
|
|
||||||
|
go newSupervisor(task, lease).run(ctx)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-runCh:
|
||||||
|
t.Fatal("task should not be running before lease is held")
|
||||||
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
lease.acquired()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-runCh:
|
||||||
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
t.Fatal("task not running after lease is acquired")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-stopCh:
|
||||||
|
t.Fatal("task should not have stopped before lease is lost")
|
||||||
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
lease.lost()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-stopCh:
|
||||||
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
t.Fatal("task still running after lease was lost")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-runCh:
|
||||||
|
t.Fatal("task should not be run again before lease is re-acquired")
|
||||||
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
lease.acquired()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-runCh:
|
||||||
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
t.Fatal("task not running after lease is re-acquired")
|
||||||
|
}
|
||||||
|
|
||||||
|
errCh <- errors.New("KABOOM")
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-runCh:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("task was not restarted")
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-stopCh:
|
||||||
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
t.Fatal("task still running after parent context was canceled")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestLease() *testLease {
|
||||||
|
return &testLease{ch: make(chan struct{}, 1)}
|
||||||
|
}
|
||||||
|
|
||||||
|
type testLease struct {
|
||||||
|
held atomic.Bool
|
||||||
|
ch chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *testLease) Held() bool { return l.held.Load() }
|
||||||
|
func (l *testLease) Changed() <-chan struct{} { return l.ch }
|
||||||
|
|
||||||
|
func (l *testLease) acquired() { l.setHeld(true) }
|
||||||
|
func (l *testLease) lost() { l.setHeld(false) }
|
||||||
|
|
||||||
|
func (l *testLease) setHeld(held bool) {
|
||||||
|
l.held.Store(held)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case l.ch <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,13 @@
|
||||||
|
package demo
|
||||||
|
|
||||||
|
import "github.com/hashicorp/consul/internal/controller"
|
||||||
|
|
||||||
|
// RegisterControllers registers controllers for the demo types. Should only be
|
||||||
|
// called in dev mode.
|
||||||
|
func RegisterControllers(mgr *controller.Manager) {
|
||||||
|
mgr.Register(artistController())
|
||||||
|
}
|
||||||
|
|
||||||
|
func artistController() controller.Controller {
|
||||||
|
return controller.ForType(TypeV2Artist)
|
||||||
|
}
|
|
@ -66,12 +66,12 @@ const (
|
||||||
ArtistV2ListPolicy = `key_prefix "resource/" { policy = "list" }`
|
ArtistV2ListPolicy = `key_prefix "resource/" { policy = "list" }`
|
||||||
)
|
)
|
||||||
|
|
||||||
// Register demo types. Should only be called in tests and dev mode.
|
// RegisterTypes registers the demo types. Should only be called in tests and
|
||||||
// acls are optional.
|
// dev mode.
|
||||||
//
|
//
|
||||||
// TODO(spatel): We're standing-in key ACLs for demo resources until our ACL
|
// TODO(spatel): We're standing-in key ACLs for demo resources until our ACL
|
||||||
// system can be more modularly extended (or support generic resource permissions).
|
// system can be more modularly extended (or support generic resource permissions).
|
||||||
func Register(r resource.Registry) {
|
func RegisterTypes(r resource.Registry) {
|
||||||
readACL := func(authz acl.Authorizer, id *pbresource.ID) error {
|
readACL := func(authz acl.Authorizer, id *pbresource.ID) error {
|
||||||
key := fmt.Sprintf("resource/%s/%s", resource.ToGVK(id.Type), id.Name)
|
key := fmt.Sprintf("resource/%s/%s", resource.ToGVK(id.Type), id.Name)
|
||||||
return authz.ToAllowAuthorizer().KeyReadAllowed(key, &acl.AuthorizerContext{})
|
return authz.ToAllowAuthorizer().KeyReadAllowed(key, &acl.AuthorizerContext{})
|
||||||
|
|
|
@ -104,8 +104,8 @@ func (w *Waiter) Failures() int {
|
||||||
// such as when the context is canceled. This makes it suitable for
|
// such as when the context is canceled. This makes it suitable for
|
||||||
// long-running routines that do not get re-initialized, such as replication.
|
// long-running routines that do not get re-initialized, such as replication.
|
||||||
func (w *Waiter) Wait(ctx context.Context) error {
|
func (w *Waiter) Wait(ctx context.Context) error {
|
||||||
w.failures++
|
delay := w.WaitDuration()
|
||||||
timer := time.NewTimer(w.delay())
|
timer := time.NewTimer(delay)
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
|
@ -115,6 +115,15 @@ func (w *Waiter) Wait(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitDuration increases the number of failures by one, and returns the
|
||||||
|
// duration the caller must wait for. This is an alternative to the Wait
|
||||||
|
// method for cases where you want to handle the timer yourself (e.g. as
|
||||||
|
// part of a larger select statement).
|
||||||
|
func (w *Waiter) WaitDuration() time.Duration {
|
||||||
|
w.failures++
|
||||||
|
return w.delay()
|
||||||
|
}
|
||||||
|
|
||||||
// NextWait returns the period the next call to Wait with block for assuming
|
// NextWait returns the period the next call to Wait with block for assuming
|
||||||
// it's context is not cancelled. It's useful for informing a user how long
|
// it's context is not cancelled. It's useful for informing a user how long
|
||||||
// it will be before the next attempt is made.
|
// it will be before the next attempt is made.
|
||||||
|
|
|
@ -20,6 +20,7 @@ const (
|
||||||
Consul string = "consul"
|
Consul string = "consul"
|
||||||
ConsulClient string = "client"
|
ConsulClient string = "client"
|
||||||
ConsulServer string = "server"
|
ConsulServer string = "server"
|
||||||
|
ControllerRuntime string = "controller-runtime"
|
||||||
Coordinate string = "coordinate"
|
Coordinate string = "coordinate"
|
||||||
DNS string = "dns"
|
DNS string = "dns"
|
||||||
Envoy string = "envoy"
|
Envoy string = "envoy"
|
||||||
|
|
Loading…
Reference in New Issue