feat: set up reporting agent (#16991)
This commit is contained in:
parent
41064eb20b
commit
4f7d1b4700
|
@ -113,7 +113,7 @@ type serverACLResolverBackend struct {
|
|||
}
|
||||
|
||||
func (s *serverACLResolverBackend) IsServerManagementToken(token string) bool {
|
||||
mgmt, err := s.getSystemMetadata(structs.ServerManagementTokenAccessorID)
|
||||
mgmt, err := s.GetSystemMetadata(structs.ServerManagementTokenAccessorID)
|
||||
if err != nil {
|
||||
s.logger.Debug("failed to fetch server management token: %w", err)
|
||||
return false
|
||||
|
|
|
@ -341,6 +341,10 @@ func (s *Server) establishLeadership(ctx context.Context) error {
|
|||
s.startLogVerification(ctx)
|
||||
}
|
||||
|
||||
if s.config.Reporting.License.Enabled && s.reportingManager != nil {
|
||||
s.reportingManager.StartReportingAgent()
|
||||
}
|
||||
|
||||
s.logger.Debug("successfully established leadership", "duration", time.Since(start))
|
||||
return nil
|
||||
}
|
||||
|
@ -377,6 +381,8 @@ func (s *Server) revokeLeadership() {
|
|||
s.resetConsistentReadReady()
|
||||
|
||||
s.autopilot.DisableReconciliation()
|
||||
|
||||
s.reportingManager.StopReportingAgent()
|
||||
}
|
||||
|
||||
// initializeACLs is used to setup the ACLs if we are the leader
|
||||
|
@ -525,7 +531,7 @@ func (s *Server) initializeACLs(ctx context.Context) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed to generate the secret ID for the server management token: %w", err)
|
||||
}
|
||||
if err := s.setSystemMetadataKey(structs.ServerManagementTokenAccessorID, secretID); err != nil {
|
||||
if err := s.SetSystemMetadataKey(structs.ServerManagementTokenAccessorID, secretID); err != nil {
|
||||
return fmt.Errorf("failed to persist server management token: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -207,7 +207,7 @@ func (s *Server) setVirtualIPFlags() (bool, error) {
|
|||
}
|
||||
|
||||
func (s *Server) setVirtualIPVersionFlag() (bool, error) {
|
||||
val, err := s.getSystemMetadata(structs.SystemMetadataVirtualIPsEnabled)
|
||||
val, err := s.GetSystemMetadata(structs.SystemMetadataVirtualIPsEnabled)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -220,7 +220,7 @@ func (s *Server) setVirtualIPVersionFlag() (bool, error) {
|
|||
minVirtualIPVersion.String())
|
||||
}
|
||||
|
||||
if err := s.setSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true"); err != nil {
|
||||
if err := s.SetSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true"); err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
|
@ -228,7 +228,7 @@ func (s *Server) setVirtualIPVersionFlag() (bool, error) {
|
|||
}
|
||||
|
||||
func (s *Server) setVirtualIPTerminatingGatewayVersionFlag() (bool, error) {
|
||||
val, err := s.getSystemMetadata(structs.SystemMetadataTermGatewayVirtualIPsEnabled)
|
||||
val, err := s.GetSystemMetadata(structs.SystemMetadataTermGatewayVirtualIPsEnabled)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -241,7 +241,7 @@ func (s *Server) setVirtualIPTerminatingGatewayVersionFlag() (bool, error) {
|
|||
minVirtualIPTerminatingGatewayVersion.String())
|
||||
}
|
||||
|
||||
if err := s.setSystemMetadataKey(structs.SystemMetadataTermGatewayVirtualIPsEnabled, "true"); err != nil {
|
||||
if err := s.SetSystemMetadataKey(structs.SystemMetadataTermGatewayVirtualIPsEnabled, "true"); err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ func (s *Server) startIntentionConfigEntryMigration(ctx context.Context) error {
|
|||
|
||||
// Check for the system metadata first, as that's the most trustworthy in
|
||||
// both the primary and secondaries.
|
||||
intentionFormat, err := s.getSystemMetadata(structs.SystemMetadataIntentionFormatKey)
|
||||
intentionFormat, err := s.GetSystemMetadata(structs.SystemMetadataIntentionFormatKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -243,7 +243,7 @@ func (s *Server) legacyIntentionMigrationInSecondaryDC(ctx context.Context) erro
|
|||
// error.
|
||||
for {
|
||||
// Check for the system metadata first, as that's the most trustworthy.
|
||||
intentionFormat, err := s.getSystemMetadata(structs.SystemMetadataIntentionFormatKey)
|
||||
intentionFormat, err := s.GetSystemMetadata(structs.SystemMetadataIntentionFormatKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -523,7 +523,7 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
|||
|
||||
// Wait until the migration routine is complete.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
intentionFormat, err := s1.getSystemMetadata(structs.SystemMetadataIntentionFormatKey)
|
||||
intentionFormat, err := s1.GetSystemMetadata(structs.SystemMetadataIntentionFormatKey)
|
||||
require.NoError(r, err)
|
||||
if intentionFormat != structs.SystemMetadataIntentionFormatConfigValue {
|
||||
r.Fatal("intention migration is not yet complete")
|
||||
|
|
|
@ -1303,7 +1303,7 @@ func TestLeader_ACL_Initialization(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.NotNil(t, policy)
|
||||
|
||||
serverToken, err := s1.getSystemMetadata(structs.ServerManagementTokenAccessorID)
|
||||
serverToken, err := s1.GetSystemMetadata(structs.ServerManagementTokenAccessorID)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, serverToken)
|
||||
|
||||
|
@ -1341,14 +1341,14 @@ func TestLeader_ACL_Initialization_SecondaryDC(t *testing.T) {
|
|||
testrpc.WaitForTestAgent(t, s2.RPC, "dc2")
|
||||
|
||||
// Check dc1's management token
|
||||
serverToken1, err := s1.getSystemMetadata(structs.ServerManagementTokenAccessorID)
|
||||
serverToken1, err := s1.GetSystemMetadata(structs.ServerManagementTokenAccessorID)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, serverToken1)
|
||||
_, err = uuid.ParseUUID(serverToken1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check dc2's management token
|
||||
serverToken2, err := s2.getSystemMetadata(structs.ServerManagementTokenAccessorID)
|
||||
serverToken2, err := s2.GetSystemMetadata(structs.ServerManagementTokenAccessorID)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, serverToken2)
|
||||
_, err = uuid.ParseUUID(serverToken2)
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
package reporting
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
type ReportingManager struct {
|
||||
logger hclog.Logger
|
||||
server ServerDelegate
|
||||
EntDeps
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
const (
|
||||
SystemMetadataReportingProcessID = "reporting-process-id"
|
||||
)
|
||||
|
||||
//go:generate mockery --name ServerDelegate --inpackage
|
||||
type ServerDelegate interface {
|
||||
GetSystemMetadata(key string) (string, error)
|
||||
SetSystemMetadataKey(key, val string) error
|
||||
IsLeader() bool
|
||||
}
|
||||
|
||||
func NewReportingManager(logger hclog.Logger, deps EntDeps, server ServerDelegate) *ReportingManager {
|
||||
rm := &ReportingManager{
|
||||
logger: logger.Named("reporting"),
|
||||
server: server,
|
||||
}
|
||||
err := rm.initEnterpriseReporting(deps)
|
||||
if err != nil {
|
||||
rm.logger.Error("Error initializing reporting manager", "error", err)
|
||||
return nil
|
||||
}
|
||||
return rm
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
//go:build !consulent
|
||||
// +build !consulent
|
||||
|
||||
package reporting
|
||||
|
||||
type EntDeps struct{}
|
||||
|
||||
func (rm *ReportingManager) initEnterpriseReporting(entDeps EntDeps) error {
|
||||
// no op
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rm *ReportingManager) StartReportingAgent() error {
|
||||
// no op
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rm *ReportingManager) StopReportingAgent() error {
|
||||
// no op
|
||||
return nil
|
||||
}
|
|
@ -44,6 +44,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/fsm"
|
||||
"github.com/hashicorp/consul/agent/consul/multilimiter"
|
||||
rpcRate "github.com/hashicorp/consul/agent/consul/rate"
|
||||
"github.com/hashicorp/consul/agent/consul/reporting"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/consul/usagemetrics"
|
||||
|
@ -434,6 +435,8 @@ type Server struct {
|
|||
// with the Resource Service in-process (i.e. not via the network) without auth.
|
||||
// It should only be used for purely-internal workloads, such as controllers.
|
||||
internalResourceServiceClient pbresource.ResourceServiceClient
|
||||
// handles metrics reporting to HashiCorp
|
||||
reportingManager *reporting.ReportingManager
|
||||
}
|
||||
|
||||
type connHandler interface {
|
||||
|
@ -760,6 +763,8 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
|
|||
s.overviewManager = NewOverviewManager(s.logger, s.fsm, s.config.MetricsReportingInterval)
|
||||
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
|
||||
|
||||
s.reportingManager = reporting.NewReportingManager(s.logger, getEnterpriseReportingDeps(flat), s)
|
||||
|
||||
// Initialize external gRPC server
|
||||
s.setupExternalGRPC(config, logger)
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/reporting"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
)
|
||||
|
@ -186,3 +187,8 @@ func addSerfMetricsLabels(conf *serf.Config, wan bool, segment string, partition
|
|||
func (s *Server) updateReportingConfig(config ReloadableConfig) {
|
||||
// no-op
|
||||
}
|
||||
|
||||
func getEnterpriseReportingDeps(deps Deps) reporting.EntDeps {
|
||||
// no-op
|
||||
return reporting.EntDeps{}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
func (s *Server) getSystemMetadata(key string) (string, error) {
|
||||
func (s *Server) GetSystemMetadata(key string) (string, error) {
|
||||
_, entry, err := s.fsm.State().SystemMetadataGet(nil, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
@ -19,7 +19,7 @@ func (s *Server) getSystemMetadata(key string) (string, error) {
|
|||
return entry.Value, nil
|
||||
}
|
||||
|
||||
func (s *Server) setSystemMetadataKey(key, val string) error {
|
||||
func (s *Server) SetSystemMetadataKey(key, val string) error {
|
||||
args := &structs.SystemMetadataRequest{
|
||||
Op: structs.SystemMetadataUpsert,
|
||||
Entry: &structs.SystemMetadataEntry{Key: key, Value: val},
|
||||
|
|
|
@ -42,9 +42,9 @@ func TestLeader_SystemMetadata_CRUD(t *testing.T) {
|
|||
require.Len(t, entries, 0)
|
||||
|
||||
// Create 3
|
||||
require.NoError(t, srv.setSystemMetadataKey("key1", "val1"))
|
||||
require.NoError(t, srv.setSystemMetadataKey("key2", "val2"))
|
||||
require.NoError(t, srv.setSystemMetadataKey("key3", ""))
|
||||
require.NoError(t, srv.SetSystemMetadataKey("key1", "val1"))
|
||||
require.NoError(t, srv.SetSystemMetadataKey("key2", "val2"))
|
||||
require.NoError(t, srv.SetSystemMetadataKey("key3", ""))
|
||||
|
||||
mapify := func(entries []*structs.SystemMetadataEntry) map[string]string {
|
||||
m := make(map[string]string)
|
||||
|
@ -65,7 +65,7 @@ func TestLeader_SystemMetadata_CRUD(t *testing.T) {
|
|||
}, mapify(entries))
|
||||
|
||||
// Update one and delete one.
|
||||
require.NoError(t, srv.setSystemMetadataKey("key3", "val3"))
|
||||
require.NoError(t, srv.SetSystemMetadataKey("key3", "val3"))
|
||||
require.NoError(t, srv.deleteSystemMetadataKey("key1"))
|
||||
|
||||
_, entries, err = state.SystemMetadataList(nil)
|
||||
|
|
Loading…
Reference in New Issue