From 4f7d1b4700ab88d817da0375b8463968e0928148 Mon Sep 17 00:00:00 2001 From: Poonam Jadhav Date: Tue, 18 Apr 2023 11:03:05 -0400 Subject: [PATCH] feat: set up reporting agent (#16991) --- agent/consul/acl_server.go | 2 +- agent/consul/leader.go | 8 +++++- agent/consul/leader_connect.go | 8 +++--- agent/consul/leader_intentions.go | 4 +-- agent/consul/leader_intentions_test.go | 2 +- agent/consul/leader_test.go | 6 ++-- agent/consul/reporting/reporting.go | 38 +++++++++++++++++++++++++ agent/consul/reporting/reporting_oss.go | 21 ++++++++++++++ agent/consul/server.go | 5 ++++ agent/consul/server_oss.go | 6 ++++ agent/consul/system_metadata.go | 4 +-- agent/consul/system_metadata_test.go | 8 +++--- 12 files changed, 94 insertions(+), 18 deletions(-) create mode 100644 agent/consul/reporting/reporting.go create mode 100644 agent/consul/reporting/reporting_oss.go diff --git a/agent/consul/acl_server.go b/agent/consul/acl_server.go index f068bd5b2..b6d0ef4aa 100644 --- a/agent/consul/acl_server.go +++ b/agent/consul/acl_server.go @@ -113,7 +113,7 @@ type serverACLResolverBackend struct { } func (s *serverACLResolverBackend) IsServerManagementToken(token string) bool { - mgmt, err := s.getSystemMetadata(structs.ServerManagementTokenAccessorID) + mgmt, err := s.GetSystemMetadata(structs.ServerManagementTokenAccessorID) if err != nil { s.logger.Debug("failed to fetch server management token: %w", err) return false diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 8334704fd..d2e25d0e9 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -341,6 +341,10 @@ func (s *Server) establishLeadership(ctx context.Context) error { s.startLogVerification(ctx) } + if s.config.Reporting.License.Enabled && s.reportingManager != nil { + s.reportingManager.StartReportingAgent() + } + s.logger.Debug("successfully established leadership", "duration", time.Since(start)) return nil } @@ -377,6 +381,8 @@ func (s *Server) revokeLeadership() { s.resetConsistentReadReady() s.autopilot.DisableReconciliation() + + s.reportingManager.StopReportingAgent() } // initializeACLs is used to setup the ACLs if we are the leader @@ -525,7 +531,7 @@ func (s *Server) initializeACLs(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to generate the secret ID for the server management token: %w", err) } - if err := s.setSystemMetadataKey(structs.ServerManagementTokenAccessorID, secretID); err != nil { + if err := s.SetSystemMetadataKey(structs.ServerManagementTokenAccessorID, secretID); err != nil { return fmt.Errorf("failed to persist server management token: %w", err) } diff --git a/agent/consul/leader_connect.go b/agent/consul/leader_connect.go index f4779744f..f872508bb 100644 --- a/agent/consul/leader_connect.go +++ b/agent/consul/leader_connect.go @@ -207,7 +207,7 @@ func (s *Server) setVirtualIPFlags() (bool, error) { } func (s *Server) setVirtualIPVersionFlag() (bool, error) { - val, err := s.getSystemMetadata(structs.SystemMetadataVirtualIPsEnabled) + val, err := s.GetSystemMetadata(structs.SystemMetadataVirtualIPsEnabled) if err != nil { return false, err } @@ -220,7 +220,7 @@ func (s *Server) setVirtualIPVersionFlag() (bool, error) { minVirtualIPVersion.String()) } - if err := s.setSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true"); err != nil { + if err := s.SetSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true"); err != nil { return false, nil } @@ -228,7 +228,7 @@ func (s *Server) setVirtualIPVersionFlag() (bool, error) { } func (s *Server) setVirtualIPTerminatingGatewayVersionFlag() (bool, error) { - val, err := s.getSystemMetadata(structs.SystemMetadataTermGatewayVirtualIPsEnabled) + val, err := s.GetSystemMetadata(structs.SystemMetadataTermGatewayVirtualIPsEnabled) if err != nil { return false, err } @@ -241,7 +241,7 @@ func (s *Server) setVirtualIPTerminatingGatewayVersionFlag() (bool, error) { minVirtualIPTerminatingGatewayVersion.String()) } - if err := s.setSystemMetadataKey(structs.SystemMetadataTermGatewayVirtualIPsEnabled, "true"); err != nil { + if err := s.SetSystemMetadataKey(structs.SystemMetadataTermGatewayVirtualIPsEnabled, "true"); err != nil { return false, nil } diff --git a/agent/consul/leader_intentions.go b/agent/consul/leader_intentions.go index 41d4529fb..cf0844d3f 100644 --- a/agent/consul/leader_intentions.go +++ b/agent/consul/leader_intentions.go @@ -25,7 +25,7 @@ func (s *Server) startIntentionConfigEntryMigration(ctx context.Context) error { // Check for the system metadata first, as that's the most trustworthy in // both the primary and secondaries. - intentionFormat, err := s.getSystemMetadata(structs.SystemMetadataIntentionFormatKey) + intentionFormat, err := s.GetSystemMetadata(structs.SystemMetadataIntentionFormatKey) if err != nil { return err } @@ -243,7 +243,7 @@ func (s *Server) legacyIntentionMigrationInSecondaryDC(ctx context.Context) erro // error. for { // Check for the system metadata first, as that's the most trustworthy. - intentionFormat, err := s.getSystemMetadata(structs.SystemMetadataIntentionFormatKey) + intentionFormat, err := s.GetSystemMetadata(structs.SystemMetadataIntentionFormatKey) if err != nil { return err } diff --git a/agent/consul/leader_intentions_test.go b/agent/consul/leader_intentions_test.go index 2975872a2..2de1b97dd 100644 --- a/agent/consul/leader_intentions_test.go +++ b/agent/consul/leader_intentions_test.go @@ -523,7 +523,7 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) { // Wait until the migration routine is complete. retry.Run(t, func(r *retry.R) { - intentionFormat, err := s1.getSystemMetadata(structs.SystemMetadataIntentionFormatKey) + intentionFormat, err := s1.GetSystemMetadata(structs.SystemMetadataIntentionFormatKey) require.NoError(r, err) if intentionFormat != structs.SystemMetadataIntentionFormatConfigValue { r.Fatal("intention migration is not yet complete") diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 72f70a9ec..09d9d6de5 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -1303,7 +1303,7 @@ func TestLeader_ACL_Initialization(t *testing.T) { require.NoError(t, err) require.NotNil(t, policy) - serverToken, err := s1.getSystemMetadata(structs.ServerManagementTokenAccessorID) + serverToken, err := s1.GetSystemMetadata(structs.ServerManagementTokenAccessorID) require.NoError(t, err) require.NotEmpty(t, serverToken) @@ -1341,14 +1341,14 @@ func TestLeader_ACL_Initialization_SecondaryDC(t *testing.T) { testrpc.WaitForTestAgent(t, s2.RPC, "dc2") // Check dc1's management token - serverToken1, err := s1.getSystemMetadata(structs.ServerManagementTokenAccessorID) + serverToken1, err := s1.GetSystemMetadata(structs.ServerManagementTokenAccessorID) require.NoError(t, err) require.NotEmpty(t, serverToken1) _, err = uuid.ParseUUID(serverToken1) require.NoError(t, err) // Check dc2's management token - serverToken2, err := s2.getSystemMetadata(structs.ServerManagementTokenAccessorID) + serverToken2, err := s2.GetSystemMetadata(structs.ServerManagementTokenAccessorID) require.NoError(t, err) require.NotEmpty(t, serverToken2) _, err = uuid.ParseUUID(serverToken2) diff --git a/agent/consul/reporting/reporting.go b/agent/consul/reporting/reporting.go new file mode 100644 index 000000000..982ac0225 --- /dev/null +++ b/agent/consul/reporting/reporting.go @@ -0,0 +1,38 @@ +package reporting + +import ( + "sync" + + "github.com/hashicorp/go-hclog" +) + +type ReportingManager struct { + logger hclog.Logger + server ServerDelegate + EntDeps + sync.RWMutex +} + +const ( + SystemMetadataReportingProcessID = "reporting-process-id" +) + +//go:generate mockery --name ServerDelegate --inpackage +type ServerDelegate interface { + GetSystemMetadata(key string) (string, error) + SetSystemMetadataKey(key, val string) error + IsLeader() bool +} + +func NewReportingManager(logger hclog.Logger, deps EntDeps, server ServerDelegate) *ReportingManager { + rm := &ReportingManager{ + logger: logger.Named("reporting"), + server: server, + } + err := rm.initEnterpriseReporting(deps) + if err != nil { + rm.logger.Error("Error initializing reporting manager", "error", err) + return nil + } + return rm +} diff --git a/agent/consul/reporting/reporting_oss.go b/agent/consul/reporting/reporting_oss.go new file mode 100644 index 000000000..673559902 --- /dev/null +++ b/agent/consul/reporting/reporting_oss.go @@ -0,0 +1,21 @@ +//go:build !consulent +// +build !consulent + +package reporting + +type EntDeps struct{} + +func (rm *ReportingManager) initEnterpriseReporting(entDeps EntDeps) error { + // no op + return nil +} + +func (rm *ReportingManager) StartReportingAgent() error { + // no op + return nil +} + +func (rm *ReportingManager) StopReportingAgent() error { + // no op + return nil +} diff --git a/agent/consul/server.go b/agent/consul/server.go index 25b8384f9..8b67cc372 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -44,6 +44,7 @@ import ( "github.com/hashicorp/consul/agent/consul/fsm" "github.com/hashicorp/consul/agent/consul/multilimiter" rpcRate "github.com/hashicorp/consul/agent/consul/rate" + "github.com/hashicorp/consul/agent/consul/reporting" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/usagemetrics" @@ -434,6 +435,8 @@ type Server struct { // with the Resource Service in-process (i.e. not via the network) without auth. // It should only be used for purely-internal workloads, such as controllers. internalResourceServiceClient pbresource.ResourceServiceClient + // handles metrics reporting to HashiCorp + reportingManager *reporting.ReportingManager } type connHandler interface { @@ -760,6 +763,8 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom s.overviewManager = NewOverviewManager(s.logger, s.fsm, s.config.MetricsReportingInterval) go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) + s.reportingManager = reporting.NewReportingManager(s.logger, getEnterpriseReportingDeps(flat), s) + // Initialize external gRPC server s.setupExternalGRPC(config, logger) diff --git a/agent/consul/server_oss.go b/agent/consul/server_oss.go index 44ffebf5d..e31539efb 100644 --- a/agent/consul/server_oss.go +++ b/agent/consul/server_oss.go @@ -18,6 +18,7 @@ import ( "google.golang.org/grpc" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/reporting" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" ) @@ -186,3 +187,8 @@ func addSerfMetricsLabels(conf *serf.Config, wan bool, segment string, partition func (s *Server) updateReportingConfig(config ReloadableConfig) { // no-op } + +func getEnterpriseReportingDeps(deps Deps) reporting.EntDeps { + // no-op + return reporting.EntDeps{} +} diff --git a/agent/consul/system_metadata.go b/agent/consul/system_metadata.go index 3c25b2729..40185294b 100644 --- a/agent/consul/system_metadata.go +++ b/agent/consul/system_metadata.go @@ -7,7 +7,7 @@ import ( "github.com/hashicorp/consul/agent/structs" ) -func (s *Server) getSystemMetadata(key string) (string, error) { +func (s *Server) GetSystemMetadata(key string) (string, error) { _, entry, err := s.fsm.State().SystemMetadataGet(nil, key) if err != nil { return "", err @@ -19,7 +19,7 @@ func (s *Server) getSystemMetadata(key string) (string, error) { return entry.Value, nil } -func (s *Server) setSystemMetadataKey(key, val string) error { +func (s *Server) SetSystemMetadataKey(key, val string) error { args := &structs.SystemMetadataRequest{ Op: structs.SystemMetadataUpsert, Entry: &structs.SystemMetadataEntry{Key: key, Value: val}, diff --git a/agent/consul/system_metadata_test.go b/agent/consul/system_metadata_test.go index 084bad046..7c4eb30e4 100644 --- a/agent/consul/system_metadata_test.go +++ b/agent/consul/system_metadata_test.go @@ -42,9 +42,9 @@ func TestLeader_SystemMetadata_CRUD(t *testing.T) { require.Len(t, entries, 0) // Create 3 - require.NoError(t, srv.setSystemMetadataKey("key1", "val1")) - require.NoError(t, srv.setSystemMetadataKey("key2", "val2")) - require.NoError(t, srv.setSystemMetadataKey("key3", "")) + require.NoError(t, srv.SetSystemMetadataKey("key1", "val1")) + require.NoError(t, srv.SetSystemMetadataKey("key2", "val2")) + require.NoError(t, srv.SetSystemMetadataKey("key3", "")) mapify := func(entries []*structs.SystemMetadataEntry) map[string]string { m := make(map[string]string) @@ -65,7 +65,7 @@ func TestLeader_SystemMetadata_CRUD(t *testing.T) { }, mapify(entries)) // Update one and delete one. - require.NoError(t, srv.setSystemMetadataKey("key3", "val3")) + require.NoError(t, srv.SetSystemMetadataKey("key3", "val3")) require.NoError(t, srv.deleteSystemMetadataKey("key1")) _, entries, err = state.SystemMetadataList(nil)