diff --git a/agent/consul/server.go b/agent/consul/server.go index 8b67cc372..e5140fcc2 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -68,6 +68,7 @@ import ( "github.com/hashicorp/consul/agent/rpc/peering" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" + "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/internal/resource/demo" raftstorage "github.com/hashicorp/consul/internal/storage/raft" @@ -435,6 +436,10 @@ type Server struct { // with the Resource Service in-process (i.e. not via the network) without auth. // It should only be used for purely-internal workloads, such as controllers. internalResourceServiceClient pbresource.ResourceServiceClient + + // controllerManager schedules the execution of controllers. + controllerManager *controller.Manager + // handles metrics reporting to HashiCorp reportingManager *reporting.ReportingManager } @@ -500,6 +505,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom incomingRPCLimiter: incomingRPCLimiter, routineManager: routine.NewManager(logger.Named(logging.ConsulServer)), typeRegistry: resource.NewRegistry(), + controllerManager: controller.NewManager(logger.Named(logging.ControllerRuntime)), } incomingRPCLimiter.Register(s) @@ -824,8 +830,10 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom } if s.config.DevMode { - demo.Register(s.typeRegistry) + demo.RegisterTypes(s.typeRegistry) + demo.RegisterControllers(s.controllerManager) } + go s.controllerManager.Run(&lib.StopChannelContext{StopCh: shutdownCh}) return s, nil } @@ -1951,6 +1959,7 @@ func (s *Server) trackLeaderChanges() { s.grpcLeaderForwarder.UpdateLeaderAddr(s.config.Datacenter, string(leaderObs.LeaderAddr)) s.peeringBackend.SetLeaderAddress(string(leaderObs.LeaderAddr)) s.raftStorageBackend.LeaderChanged() + s.controllerManager.SetRaftLeader(s.IsLeader()) // Trigger sending an update to HCP status s.hcpManager.SendUpdate() diff --git a/agent/grpc-external/services/resource/delete_test.go b/agent/grpc-external/services/resource/delete_test.go index 3147fb5b3..39af826eb 100644 --- a/agent/grpc-external/services/resource/delete_test.go +++ b/agent/grpc-external/services/resource/delete_test.go @@ -22,7 +22,7 @@ func TestDelete_InputValidation(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) testCases := map[string]func(*pbresource.DeleteRequest){ "no id": func(req *pbresource.DeleteRequest) { req.Id = nil }, @@ -101,7 +101,7 @@ func TestDelete_ACLs(t *testing.T) { mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything). Return(tc.authz, nil) server.ACLResolver = mockACLResolver - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) artist, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -122,8 +122,7 @@ func TestDelete_Success(t *testing.T) { for desc, tc := range deleteTestCases() { t.Run(desc, func(t *testing.T) { server, client, ctx := testDeps(t) - demo.Register(server.Registry) - + demo.RegisterTypes(server.Registry) artist, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -150,7 +149,7 @@ func TestDelete_NotFound(t *testing.T) { for desc, tc := range deleteTestCases() { t.Run(desc, func(t *testing.T) { server, client, ctx := testDeps(t) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) artist, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -165,7 +164,7 @@ func TestDelete_VersionMismatch(t *testing.T) { t.Parallel() server, client, ctx := testDeps(t) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) artist, err := demo.GenerateV2Artist() require.NoError(t, err) rsp, err := client.Write(ctx, &pbresource.WriteRequest{Resource: artist}) diff --git a/agent/grpc-external/services/resource/list_test.go b/agent/grpc-external/services/resource/list_test.go index ef0ba96ce..b476c82ae 100644 --- a/agent/grpc-external/services/resource/list_test.go +++ b/agent/grpc-external/services/resource/list_test.go @@ -40,7 +40,7 @@ func TestList_Empty(t *testing.T) { for desc, tc := range listTestCases() { t.Run(desc, func(t *testing.T) { server := testServer(t) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) client := testClient(t, server) rsp, err := client.List(tc.ctx, &pbresource.ListRequest{ @@ -58,7 +58,7 @@ func TestList_Many(t *testing.T) { for desc, tc := range listTestCases() { t.Run(desc, func(t *testing.T) { server := testServer(t) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) client := testClient(t, server) resources := make([]*pbresource.Resource, 10) @@ -89,7 +89,7 @@ func TestList_GroupVersionMismatch(t *testing.T) { for desc, tc := range listTestCases() { t.Run(desc, func(t *testing.T) { server := testServer(t) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) client := testClient(t, server) artist, err := demo.GenerateV2Artist() @@ -116,7 +116,7 @@ func TestList_VerifyReadConsistencyArg(t *testing.T) { mockBackend := NewMockBackend(t) server := testServer(t) server.Backend = mockBackend - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) artist, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -133,7 +133,7 @@ func TestList_VerifyReadConsistencyArg(t *testing.T) { } } -// N.B. Uses key ACLs for now. See demo.Register() +// N.B. Uses key ACLs for now. See demo.RegisterTypes() func TestList_ACL_ListDenied(t *testing.T) { t.Parallel() @@ -146,7 +146,7 @@ func TestList_ACL_ListDenied(t *testing.T) { require.Contains(t, err.Error(), "lacks permission 'key:list'") } -// N.B. Uses key ACLs for now. See demo.Register() +// N.B. Uses key ACLs for now. See demo.RegisterTypes() func TestList_ACL_ListAllowed_ReadDenied(t *testing.T) { t.Parallel() @@ -160,7 +160,7 @@ func TestList_ACL_ListAllowed_ReadDenied(t *testing.T) { require.Empty(t, rsp.Resources) } -// N.B. Uses key ACLs for now. See demo.Register() +// N.B. Uses key ACLs for now. See demo.RegisterTypes() func TestList_ACL_ListAllowed_ReadAllowed(t *testing.T) { t.Parallel() @@ -184,7 +184,7 @@ func roundTripList(t *testing.T, authz acl.Authorizer) (*pbresource.Resource, *p mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything). Return(authz, nil) server.ACLResolver = mockACLResolver - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) artist, err := demo.GenerateV2Artist() require.NoError(t, err) diff --git a/agent/grpc-external/services/resource/read_test.go b/agent/grpc-external/services/resource/read_test.go index e7265043f..237895eac 100644 --- a/agent/grpc-external/services/resource/read_test.go +++ b/agent/grpc-external/services/resource/read_test.go @@ -25,7 +25,7 @@ func TestRead_InputValidation(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) testCases := map[string]func(*pbresource.ReadRequest){ "no id": func(req *pbresource.ReadRequest) { req.Id = nil }, @@ -79,7 +79,7 @@ func TestRead_ResourceNotFound(t *testing.T) { t.Run(desc, func(t *testing.T) { server := testServer(t) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) client := testClient(t, server) artist, err := demo.GenerateV2Artist() @@ -98,7 +98,7 @@ func TestRead_GroupVersionMismatch(t *testing.T) { t.Run(desc, func(t *testing.T) { server := testServer(t) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) client := testClient(t, server) artist, err := demo.GenerateV2Artist() @@ -123,7 +123,7 @@ func TestRead_Success(t *testing.T) { t.Run(desc, func(t *testing.T) { server := testServer(t) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) client := testClient(t, server) artist, err := demo.GenerateV2Artist() @@ -146,7 +146,7 @@ func TestRead_VerifyReadConsistencyArg(t *testing.T) { server := testServer(t) mockBackend := NewMockBackend(t) server.Backend = mockBackend - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) artist, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -162,7 +162,7 @@ func TestRead_VerifyReadConsistencyArg(t *testing.T) { } } -// N.B. Uses key ACLs for now. See demo.Register() +// N.B. Uses key ACLs for now. See demo.RegisterTypes() func TestRead_ACLs(t *testing.T) { type testCase struct { authz resolver.Result @@ -188,7 +188,7 @@ func TestRead_ACLs(t *testing.T) { mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything). Return(tc.authz, nil) server.ACLResolver = mockACLResolver - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) artist, err := demo.GenerateV2Artist() require.NoError(t, err) diff --git a/agent/grpc-external/services/resource/watch_test.go b/agent/grpc-external/services/resource/watch_test.go index af6f68e58..b62dc8a40 100644 --- a/agent/grpc-external/services/resource/watch_test.go +++ b/agent/grpc-external/services/resource/watch_test.go @@ -46,7 +46,7 @@ func TestWatchList_GroupVersionMatches(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) ctx := context.Background() // create a watch @@ -90,7 +90,7 @@ func TestWatchList_GroupVersionMismatch(t *testing.T) { t.Parallel() server := testServer(t) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) client := testClient(t, server) ctx := context.Background() @@ -123,7 +123,7 @@ func TestWatchList_GroupVersionMismatch(t *testing.T) { mustGetNoResource(t, rspCh) } -// N.B. Uses key ACLs for now. See demo.Register() +// N.B. Uses key ACLs for now. See demo.RegisterTypes() func TestWatchList_ACL_ListDenied(t *testing.T) { t.Parallel() @@ -137,7 +137,7 @@ func TestWatchList_ACL_ListDenied(t *testing.T) { require.Contains(t, err.Error(), "lacks permission 'key:list'") } -// N.B. Uses key ACLs for now. See demo.Register() +// N.B. Uses key ACLs for now. See demo.RegisterTypes() func TestWatchList_ACL_ListAllowed_ReadDenied(t *testing.T) { t.Parallel() @@ -152,7 +152,7 @@ func TestWatchList_ACL_ListAllowed_ReadDenied(t *testing.T) { mustGetNoResource(t, rspCh) } -// N.B. Uses key ACLs for now. See demo.Register() +// N.B. Uses key ACLs for now. See demo.RegisterTypes() func TestWatchList_ACL_ListAllowed_ReadAllowed(t *testing.T) { t.Parallel() @@ -177,7 +177,7 @@ func roundTripACL(t *testing.T, authz acl.Authorizer) (<-chan resourceOrError, * mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything). Return(authz, nil) server.ACLResolver = mockACLResolver - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) artist, err := demo.GenerateV2Artist() require.NoError(t, err) diff --git a/agent/grpc-external/services/resource/write_status_test.go b/agent/grpc-external/services/resource/write_status_test.go index 35eb26385..c64a277e7 100644 --- a/agent/grpc-external/services/resource/write_status_test.go +++ b/agent/grpc-external/services/resource/write_status_test.go @@ -69,7 +69,7 @@ func TestWriteStatus_InputValidation(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) testCases := map[string]func(*pbresource.WriteStatusRequest){ "no id": func(req *pbresource.WriteStatusRequest) { req.Id = nil }, @@ -113,7 +113,7 @@ func TestWriteStatus_Success(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) res, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -148,7 +148,7 @@ func TestWriteStatus_CASFailure(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) res, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -183,7 +183,7 @@ func TestWriteStatus_TypeNotFound(t *testing.T) { func TestWriteStatus_ResourceNotFound(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) res, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -198,7 +198,7 @@ func TestWriteStatus_ResourceNotFound(t *testing.T) { func TestWriteStatus_WrongUid(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) res, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -219,7 +219,7 @@ func TestWriteStatus_NonCASUpdate_Retry(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) res, err := demo.GenerateV2Artist() require.NoError(t, err) diff --git a/agent/grpc-external/services/resource/write_test.go b/agent/grpc-external/services/resource/write_test.go index 666937c12..f8620f4a3 100644 --- a/agent/grpc-external/services/resource/write_test.go +++ b/agent/grpc-external/services/resource/write_test.go @@ -26,7 +26,7 @@ func TestWrite_InputValidation(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) testCases := map[string]func(*pbresource.WriteRequest){ "no resource": func(req *pbresource.WriteRequest) { req.Resource = nil }, @@ -79,7 +79,7 @@ func TestWrite_OwnerValidation(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) type testCase struct { modReqFn func(req *pbresource.WriteRequest) @@ -183,7 +183,7 @@ func TestWrite_ACLs(t *testing.T) { mockACLResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything). Return(tc.authz, nil) server.ACLResolver = mockACLResolver - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) artist, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -198,7 +198,7 @@ func TestWrite_ACLs(t *testing.T) { func TestWrite_Mutate(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) artist, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -224,7 +224,7 @@ func TestWrite_ResourceCreation_Success(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) res, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -240,7 +240,7 @@ func TestWrite_CASUpdate_Success(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) res, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -262,7 +262,7 @@ func TestWrite_ResourceCreation_StatusProvided(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) res, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -281,7 +281,7 @@ func TestWrite_CASUpdate_Failure(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) res, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -302,7 +302,7 @@ func TestWrite_Update_WrongUid(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) res, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -323,7 +323,7 @@ func TestWrite_Update_StatusModified(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) res, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -353,7 +353,7 @@ func TestWrite_Update_NilStatus(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) res, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -377,7 +377,7 @@ func TestWrite_Update_NoUid(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) res, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -396,7 +396,7 @@ func TestWrite_NonCASUpdate_Success(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) res, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -417,7 +417,7 @@ func TestWrite_NonCASUpdate_Retry(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) res, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -467,7 +467,7 @@ func TestWrite_Owner_Immutable(t *testing.T) { server := testServer(t) client := testClient(t, server) - demo.Register(server.Registry) + demo.RegisterTypes(server.Registry) artist, err := demo.GenerateV2Artist() require.NoError(t, err) diff --git a/internal/controller/api.go b/internal/controller/api.go new file mode 100644 index 000000000..8545d339a --- /dev/null +++ b/internal/controller/api.go @@ -0,0 +1,20 @@ +package controller + +import ( + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// ForType begins building a Controller for the given resource type. +func ForType(managedType *pbresource.Type) Controller { + return Controller{managedType: managedType} +} + +// Controller runs a reconciliation loop to respond to changes in resources and +// their dependencies. It is heavily inspired by Kubernetes' controller pattern: +// https://kubernetes.io/docs/concepts/architecture/controller/ +// +// Use the builder methods in this package (starting with ForType) to construct +// a controller, and then pass it to a Manager to be executed. +type Controller struct { + managedType *pbresource.Type +} diff --git a/internal/controller/controller.go b/internal/controller/controller.go new file mode 100644 index 000000000..11933b39f --- /dev/null +++ b/internal/controller/controller.go @@ -0,0 +1,22 @@ +package controller + +import ( + "context" + + "github.com/hashicorp/go-hclog" +) + +// controllerRunner contains the actual implementation of running a controller +// including creating watches, calling the reconciler, handling retries, etc. +type controllerRunner struct { + ctrl Controller + logger hclog.Logger +} + +func (c *controllerRunner) run(ctx context.Context) error { + c.logger.Debug("controller running") + defer c.logger.Debug("controller stopping") + + <-ctx.Done() + return ctx.Err() +} diff --git a/internal/controller/lease.go b/internal/controller/lease.go new file mode 100644 index 000000000..33e284a69 --- /dev/null +++ b/internal/controller/lease.go @@ -0,0 +1,24 @@ +package controller + +// Lease is used to ensure controllers are run as singletons (i.e. one leader- +// elected instance per cluster). +// +// Currently, this is just an abstraction over Raft leadership. In the future, +// we'll build a backend-agnostic leasing system into the Resource Service which +// will allow us to balance controllers between many servers. +type Lease interface { + // Held returns whether we are the current lease-holders. + Held() bool + + // Changed returns a channel on which you can receive notifications whenever + // the lease is acquired or lost. + Changed() <-chan struct{} +} + +type raftLease struct { + m *Manager + ch <-chan struct{} +} + +func (l *raftLease) Held() bool { return l.m.raftLeader.Load() } +func (l *raftLease) Changed() <-chan struct{} { return l.ch } diff --git a/internal/controller/manager.go b/internal/controller/manager.go new file mode 100644 index 000000000..90b9f2994 --- /dev/null +++ b/internal/controller/manager.go @@ -0,0 +1,89 @@ +package controller + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/consul/internal/resource" +) + +// Manager is responsible for scheduling the execution of controllers. +type Manager struct { + logger hclog.Logger + + raftLeader atomic.Bool + + mu sync.Mutex + running bool + controllers []Controller + leaseChans []chan struct{} +} + +// NewManager creates a Manager. logger will be used by the Manager, and as the +// base logger for controllers when one is not specified using WithLogger. +func NewManager(logger hclog.Logger) *Manager { + return &Manager{logger: logger} +} + +// Register the given controller to be executed by the Manager. Cannot be called +// once the Manager is running. +func (m *Manager) Register(ctrl Controller) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.running { + panic("cannot register additional controllers after calling Run") + } + + m.controllers = append(m.controllers, ctrl) +} + +// Run the Manager and start executing controllers until the given context is +// canceled. Cannot be called more than once. +func (m *Manager) Run(ctx context.Context) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.running { + panic("cannot call Run more than once") + } + m.running = true + + for _, desc := range m.controllers { + runner := &controllerRunner{ + ctrl: desc, + logger: m.logger.With("managed_type", resource.ToGVK(desc.managedType)), + } + go newSupervisor(runner.run, m.newLeaseLocked()).run(ctx) + } +} + +// SetRaftLeader notifies the Manager of Raft leadership changes. Controllers +// are currently only executed on the Raft leader, so calling this method will +// cause the Manager to spin them up/down accordingly. +func (m *Manager) SetRaftLeader(leader bool) { + m.raftLeader.Store(leader) + + m.mu.Lock() + defer m.mu.Unlock() + + for _, ch := range m.leaseChans { + select { + case ch <- struct{}{}: + default: + // Do not block if there's nothing receiving on ch (because the supervisor is + // busy doing something else). Note that ch has a buffer of 1, so we'll never + // miss the notification that something has changed so we need to re-evaluate + // the lease. + } + } +} + +func (m *Manager) newLeaseLocked() Lease { + ch := make(chan struct{}, 1) + m.leaseChans = append(m.leaseChans, ch) + return &raftLease{m: m, ch: ch} +} diff --git a/internal/controller/supervisor.go b/internal/controller/supervisor.go new file mode 100644 index 000000000..ed9fdccdb --- /dev/null +++ b/internal/controller/supervisor.go @@ -0,0 +1,140 @@ +package controller + +import ( + "context" + "time" + + "github.com/hashicorp/consul/lib/retry" +) + +// flapThreshold is the minimum amount of time between restarts for us *not* to +// consider a controller to be stuck in a crash-loop. +const flapThreshold = 2 * time.Second + +// supervisor keeps a task running, restarting it on-error, for as long as the +// given lease is held. When the lease is lost, the context given to the task +// will be canceled. If the task persistently fails (i.e. the controller is in +// a crash-loop) supervisor will use exponential backoff to delay restarts. +type supervisor struct { + task task + lease Lease + + running bool + startedAt time.Time + errCh chan error + cancelTask context.CancelFunc + + backoff *retry.Waiter + backoffUntil time.Time + backoffTimerCh <-chan time.Time +} + +func newSupervisor(task task, lease Lease) *supervisor { + return &supervisor{ + task: task, + lease: lease, + errCh: make(chan error), + backoff: &retry.Waiter{ + MinFailures: 1, + MinWait: 500 * time.Millisecond, + MaxWait: time.Minute, + Jitter: retry.NewJitter(25), + }, + } +} + +type task func(context.Context) error + +func (s *supervisor) run(ctx context.Context) { + for { + if s.shouldStart() { + s.startTask(ctx) + } else if s.shouldStop() { + s.stopTask() + } + + select { + // Outer context canceled. + case <-ctx.Done(): + if s.cancelTask != nil { + s.cancelTask() + } + return + + // Task stopped running. + case err := <-s.errCh: + stopBackoffTimer := s.handleError(err) + if stopBackoffTimer != nil { + defer stopBackoffTimer() + } + + // Unblock when the lease is acquired/lost, or the backoff timer fires. + case <-s.lease.Changed(): + case <-s.backoffTimerCh: + } + } +} + +func (s *supervisor) shouldStart() bool { + if s.running { + return false + } + + if !s.lease.Held() { + return false + } + + if time.Now().Before(s.backoffUntil) { + return false + } + + return true +} + +func (s *supervisor) startTask(ctx context.Context) { + if s.cancelTask != nil { + s.cancelTask() + } + + taskCtx, cancelTask := context.WithCancel(ctx) + s.cancelTask = cancelTask + s.startedAt = time.Now() + s.running = true + + go func() { + err := s.task(taskCtx) + + select { + case s.errCh <- err: + case <-ctx.Done(): + } + }() +} + +func (s *supervisor) shouldStop() bool { + return s.running && !s.lease.Held() +} + +func (s *supervisor) stopTask() { + s.cancelTask() + s.backoff.Reset() + s.running = false +} + +func (s *supervisor) handleError(err error) func() bool { + s.running = false + + if time.Since(s.startedAt) > flapThreshold { + s.backoff.Reset() + s.backoffUntil = time.Time{} + } else { + delay := s.backoff.WaitDuration() + s.backoffUntil = time.Now().Add(delay) + + timer := time.NewTimer(delay) + s.backoffTimerCh = timer.C + return timer.Stop + } + + return nil +} diff --git a/internal/controller/supervisor_test.go b/internal/controller/supervisor_test.go new file mode 100644 index 000000000..1792b8b95 --- /dev/null +++ b/internal/controller/supervisor_test.go @@ -0,0 +1,118 @@ +package controller + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" +) + +func TestSupervise(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + runCh := make(chan struct{}) + stopCh := make(chan struct{}) + errCh := make(chan error) + + task := func(taskCtx context.Context) error { + runCh <- struct{}{} + + select { + case err := <-errCh: + return err + case <-taskCtx.Done(): + stopCh <- struct{}{} + return taskCtx.Err() + } + } + + lease := newTestLease() + + go newSupervisor(task, lease).run(ctx) + + select { + case <-runCh: + t.Fatal("task should not be running before lease is held") + case <-time.After(500 * time.Millisecond): + } + + lease.acquired() + + select { + case <-runCh: + case <-time.After(500 * time.Millisecond): + t.Fatal("task not running after lease is acquired") + } + + select { + case <-stopCh: + t.Fatal("task should not have stopped before lease is lost") + case <-time.After(500 * time.Millisecond): + } + + lease.lost() + + select { + case <-stopCh: + case <-time.After(500 * time.Millisecond): + t.Fatal("task still running after lease was lost") + } + + select { + case <-runCh: + t.Fatal("task should not be run again before lease is re-acquired") + case <-time.After(500 * time.Millisecond): + } + + lease.acquired() + + select { + case <-runCh: + case <-time.After(500 * time.Millisecond): + t.Fatal("task not running after lease is re-acquired") + } + + errCh <- errors.New("KABOOM") + + select { + case <-runCh: + case <-time.After(2 * time.Second): + t.Fatal("task was not restarted") + } + + cancel() + + select { + case <-stopCh: + case <-time.After(500 * time.Millisecond): + t.Fatal("task still running after parent context was canceled") + } +} + +func newTestLease() *testLease { + return &testLease{ch: make(chan struct{}, 1)} +} + +type testLease struct { + held atomic.Bool + ch chan struct{} +} + +func (l *testLease) Held() bool { return l.held.Load() } +func (l *testLease) Changed() <-chan struct{} { return l.ch } + +func (l *testLease) acquired() { l.setHeld(true) } +func (l *testLease) lost() { l.setHeld(false) } + +func (l *testLease) setHeld(held bool) { + l.held.Store(held) + + select { + case l.ch <- struct{}{}: + default: + } +} diff --git a/internal/resource/demo/controller.go b/internal/resource/demo/controller.go new file mode 100644 index 000000000..f2172f0f8 --- /dev/null +++ b/internal/resource/demo/controller.go @@ -0,0 +1,13 @@ +package demo + +import "github.com/hashicorp/consul/internal/controller" + +// RegisterControllers registers controllers for the demo types. Should only be +// called in dev mode. +func RegisterControllers(mgr *controller.Manager) { + mgr.Register(artistController()) +} + +func artistController() controller.Controller { + return controller.ForType(TypeV2Artist) +} diff --git a/internal/resource/demo/demo.go b/internal/resource/demo/demo.go index 20c630d5e..fde9272ae 100644 --- a/internal/resource/demo/demo.go +++ b/internal/resource/demo/demo.go @@ -66,12 +66,12 @@ const ( ArtistV2ListPolicy = `key_prefix "resource/" { policy = "list" }` ) -// Register demo types. Should only be called in tests and dev mode. -// acls are optional. +// RegisterTypes registers the demo types. Should only be called in tests and +// dev mode. // // TODO(spatel): We're standing-in key ACLs for demo resources until our ACL // system can be more modularly extended (or support generic resource permissions). -func Register(r resource.Registry) { +func RegisterTypes(r resource.Registry) { readACL := func(authz acl.Authorizer, id *pbresource.ID) error { key := fmt.Sprintf("resource/%s/%s", resource.ToGVK(id.Type), id.Name) return authz.ToAllowAuthorizer().KeyReadAllowed(key, &acl.AuthorizerContext{}) diff --git a/lib/retry/retry.go b/lib/retry/retry.go index 2a3589661..ee9a8b235 100644 --- a/lib/retry/retry.go +++ b/lib/retry/retry.go @@ -104,8 +104,8 @@ func (w *Waiter) Failures() int { // such as when the context is canceled. This makes it suitable for // long-running routines that do not get re-initialized, such as replication. func (w *Waiter) Wait(ctx context.Context) error { - w.failures++ - timer := time.NewTimer(w.delay()) + delay := w.WaitDuration() + timer := time.NewTimer(delay) select { case <-ctx.Done(): timer.Stop() @@ -115,6 +115,15 @@ func (w *Waiter) Wait(ctx context.Context) error { } } +// WaitDuration increases the number of failures by one, and returns the +// duration the caller must wait for. This is an alternative to the Wait +// method for cases where you want to handle the timer yourself (e.g. as +// part of a larger select statement). +func (w *Waiter) WaitDuration() time.Duration { + w.failures++ + return w.delay() +} + // NextWait returns the period the next call to Wait with block for assuming // it's context is not cancelled. It's useful for informing a user how long // it will be before the next attempt is made. diff --git a/logging/names.go b/logging/names.go index 955a1bf26..1c7ecc448 100644 --- a/logging/names.go +++ b/logging/names.go @@ -20,6 +20,7 @@ const ( Consul string = "consul" ConsulClient string = "client" ConsulServer string = "server" + ControllerRuntime string = "controller-runtime" Coordinate string = "coordinate" DNS string = "dns" Envoy string = "envoy"