Vault 2823 cc namespace (#12393)

* vault-2823 adding changes

* VAULT-2823 adding alias

* Vault-2823 addressing comments

* Vault-2823 removing comments

* Vault-2823 removing comments

* vault-2823 removing q debug

* adding changelog

* Vault-2823 updating external test

* adding approved changes

* fixing returns

* fixing returns
This commit is contained in:
akshya96 2021-09-07 09:16:12 -07:00 committed by GitHub
parent 45a83d8e0f
commit f4bd14ed3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 226 additions and 92 deletions

3
changelog/12393.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note: improvement
core: observe the client counts broken down by namespace for partial month client count
```

View File

@ -67,6 +67,11 @@ type segmentInfo struct {
entitySequenceNumber uint64
}
type clients struct {
distinctEntities uint64
nonEntityTokens uint64
}
// ActivityLog tracks unique entity counts and non-entity token counts.
// It handles assembling log fragments (and sending them to the active
// node), writing log segments, and precomputing queries.
@ -119,10 +124,6 @@ type ActivityLog struct {
// Channel to stop background processing
doneCh chan struct{}
// All known active entities this month; use fragmentLock read-locked
// to check whether it already exists.
activeEntities map[string]struct{}
// track metadata and contents of the most recent log segment
currentSegment segmentInfo
@ -143,6 +144,16 @@ type ActivityLog struct {
// for testing: is config currently being invalidated. protected by l
configInvalidationInProgress bool
// entityTracker tracks active entities this month. Protected by fragmentLock.
entityTracker *EntityTracker
}
type EntityTracker struct {
// All known active entities this month; use fragmentLock read-locked
// to check whether it already exists.
activeEntities map[string]struct{}
entityCountByNamespaceID map[string]uint64
}
// These non-persistent configuration options allow us to disable
@ -174,7 +185,10 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
sendCh: make(chan struct{}, 1), // buffered so it can be triggered by fragment size
writeCh: make(chan struct{}, 1), // same for full segment
doneCh: make(chan struct{}, 1),
activeEntities: make(map[string]struct{}),
entityTracker: &EntityTracker{
activeEntities: make(map[string]struct{}),
entityCountByNamespaceID: make(map[string]uint64),
},
currentSegment: segmentInfo{
startTimestamp: 0,
currentEntities: &activity.EntityActivityLog{
@ -531,7 +545,7 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time
// Or the feature has been disabled.
if a.enabled && startTime.Unix() == a.currentSegment.startTimestamp {
for _, ent := range out.Entities {
a.activeEntities[ent.EntityID] = struct{}{}
a.entityTracker.addEntity(ent)
}
}
a.fragmentLock.Unlock()
@ -571,7 +585,7 @@ func (a *ActivityLog) loadCurrentEntitySegment(ctx context.Context, startTime ti
}
for _, ent := range out.Entities {
a.activeEntities[ent.EntityID] = struct{}{}
a.entityTracker.addEntity(ent)
}
return nil
@ -683,7 +697,8 @@ func (a *ActivityLog) resetCurrentLog() {
a.currentSegment.entitySequenceNumber = 0
a.fragment = nil
a.activeEntities = make(map[string]struct{})
a.entityTracker.activeEntities = make(map[string]struct{})
a.entityTracker.entityCountByNamespaceID = make(map[string]uint64)
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
}
@ -1094,7 +1109,8 @@ func (a *ActivityLog) perfStandbyFragmentWorker() {
// clear active entity set
a.fragmentLock.Lock()
a.activeEntities = make(map[string]struct{})
a.entityTracker.activeEntities = make(map[string]struct{})
a.entityTracker.entityCountByNamespaceID = make(map[string]uint64)
a.fragmentLock.Unlock()
// Set timer for next month.
@ -1282,7 +1298,7 @@ func (a *ActivityLog) AddEntityToFragment(entityID string, namespaceID string, t
a.fragmentLock.RLock()
if a.enabled {
_, present = a.activeEntities[entityID]
_, present = a.entityTracker.activeEntities[entityID]
} else {
present = true
}
@ -1296,20 +1312,20 @@ func (a *ActivityLog) AddEntityToFragment(entityID string, namespaceID string, t
defer a.fragmentLock.Unlock()
// Re-check entity ID after re-acquiring lock
_, present = a.activeEntities[entityID]
_, present = a.entityTracker.activeEntities[entityID]
if present {
return
}
a.createCurrentFragment()
a.fragment.Entities = append(a.fragment.Entities,
&activity.EntityRecord{
EntityID: entityID,
NamespaceID: namespaceID,
Timestamp: timestamp,
})
a.activeEntities[entityID] = struct{}{}
entityRecord := &activity.EntityRecord{
EntityID: entityID,
NamespaceID: namespaceID,
Timestamp: timestamp,
}
a.fragment.Entities = append(a.fragment.Entities, entityRecord)
a.entityTracker.addEntity(entityRecord)
}
func (a *ActivityLog) AddTokenToFragment(namespaceID string) {
@ -1353,7 +1369,7 @@ func (a *ActivityLog) receivedFragment(fragment *activity.LogFragment) {
}
for _, e := range fragment.Entities {
a.activeEntities[e.EntityID] = struct{}{}
a.entityTracker.addEntity(e)
}
a.standbyFragmentsReceived = append(a.standbyFragmentsReceived, fragment)
@ -1770,7 +1786,7 @@ func (a *ActivityLog) PartialMonthMetrics(ctx context.Context) ([]metricsutil.Ga
// Empty list
return []metricsutil.GaugeLabelValues{}, nil
}
count := len(a.activeEntities)
count := len(a.entityTracker.activeEntities)
return []metricsutil.GaugeLabelValues{
{
@ -1792,26 +1808,94 @@ func (c *Core) activeEntityGaugeCollector(ctx context.Context) ([]metricsutil.Ga
// partialMonthClientCount returns the number of clients used so far this month.
// If activity log is not enabled, the response will be nil
func (a *ActivityLog) partialMonthClientCount(ctx context.Context) map[string]interface{} {
func (a *ActivityLog) partialMonthClientCount(ctx context.Context) (map[string]interface{}, error) {
a.fragmentLock.RLock()
defer a.fragmentLock.RUnlock()
if !a.enabled {
// nothing to count
return nil
return nil, nil
}
entityCount := len(a.activeEntities)
var tokenCount int
for _, countByNS := range a.currentSegment.tokenCount.CountByNamespaceID {
tokenCount += int(countByNS)
}
clientCount := entityCount + tokenCount
byNamespace := make([]*ClientCountInNamespace, 0)
responseData := make(map[string]interface{})
responseData["distinct_entities"] = entityCount
responseData["non_entity_tokens"] = tokenCount
responseData["clients"] = clientCount
totalEntities := 0
totalTokens := 0
return responseData
clientCountTable := createClientCountTable(a.entityTracker.entityCountByNamespaceID, a.currentSegment.tokenCount.CountByNamespaceID)
queryNS, err := namespace.FromContext(ctx)
if err != nil {
return nil, err
}
for nsID, clients := range clientCountTable {
ns, err := NamespaceByID(ctx, nsID, a.core)
if err != nil {
return nil, err
}
// Only include namespaces that are the queryNS or within it. If queryNS is the
// root namespace, include all namespaces, even those which have been deleted.
if a.includeInResponse(queryNS, ns) {
var displayPath string
if ns == nil {
displayPath = fmt.Sprintf("deleted namespace %q", nsID)
} else {
displayPath = ns.Path
}
byNamespace = append(byNamespace, &ClientCountInNamespace{
NamespaceID: nsID,
NamespacePath: displayPath,
Counts: ClientCountResponse{
DistinctEntities: int(clients.distinctEntities),
NonEntityTokens: int(clients.nonEntityTokens),
Clients: int(clients.distinctEntities + clients.nonEntityTokens),
},
})
totalEntities += int(clients.distinctEntities)
totalTokens += int(clients.nonEntityTokens)
}
}
sort.Slice(byNamespace, func(i, j int) bool {
return byNamespace[i].NamespaceID < byNamespace[j].NamespaceID
})
responseData["by_namespace"] = byNamespace
responseData["distinct_entities"] = totalEntities
responseData["non_entity_tokens"] = totalTokens
responseData["clients"] = totalEntities + totalTokens
return responseData, nil
}
//createClientCountTable maps the entitycount and token count to the namespace id
func createClientCountTable(entityMap map[string]uint64, tokenMap map[string]uint64) map[string]*clients {
clientCountTable := make(map[string]*clients)
for nsID, count := range entityMap {
if _, ok := clientCountTable[nsID]; !ok {
clientCountTable[nsID] = &clients{distinctEntities: 0, nonEntityTokens: 0}
}
clientCountTable[nsID].distinctEntities += count
}
for nsID, count := range tokenMap {
if _, ok := clientCountTable[nsID]; !ok {
clientCountTable[nsID] = &clients{distinctEntities: 0, nonEntityTokens: 0}
}
clientCountTable[nsID].nonEntityTokens += count
}
return clientCountTable
}
func (et *EntityTracker) addEntity(e *activity.EntityRecord) {
if _, ok := et.activeEntities[e.EntityID]; !ok {
et.activeEntities[e.EntityID] = struct{}{}
et.entityCountByNamespaceID[e.NamespaceID] += 1
}
}

View File

@ -15,10 +15,12 @@ import (
"github.com/go-test/deep"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/vault/helper/constants"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/helper/timeutil"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault/activity"
"github.com/mitchellh/mapstructure"
)
func TestActivityLog_Creation(t *testing.T) {
@ -1005,7 +1007,7 @@ func (a *ActivityLog) resetEntitiesInMemory(t *testing.T) {
entitySequenceNumber: 0,
}
a.activeEntities = make(map[string]struct{})
a.entityTracker.activeEntities = make(map[string]struct{})
}
func TestActivityLog_loadCurrentEntitySegment(t *testing.T) {
@ -1188,7 +1190,7 @@ func TestActivityLog_loadPriorEntitySegment(t *testing.T) {
if tc.refresh {
a.l.Lock()
a.fragmentLock.Lock()
a.activeEntities = make(map[string]struct{})
a.entityTracker.activeEntities = make(map[string]struct{})
a.currentSegment.startTimestamp = tc.time
a.fragmentLock.Unlock()
a.l.Unlock()
@ -1326,54 +1328,54 @@ func setupActivityRecordsInStorage(t *testing.T, base time.Time, includeEntities
entityRecords = []*activity.EntityRecord{
{
EntityID: "11111111-1111-1111-1111-111111111111",
NamespaceID: "root",
NamespaceID: namespace.RootNamespaceID,
Timestamp: time.Now().Unix(),
},
{
EntityID: "22222222-2222-2222-2222-222222222222",
NamespaceID: "root",
NamespaceID: namespace.RootNamespaceID,
Timestamp: time.Now().Unix(),
},
{
EntityID: "33333333-2222-2222-2222-222222222222",
NamespaceID: "root",
NamespaceID: namespace.RootNamespaceID,
Timestamp: time.Now().Unix(),
},
}
if constants.IsEnterprise {
entityRecords = append(entityRecords, []*activity.EntityRecord{
{
EntityID: "44444444-1111-1111-1111-111111111111",
NamespaceID: "ns1",
Timestamp: time.Now().Unix(),
},
}...)
}
for i, entityRecord := range entityRecords {
entityData, err := proto.Marshal(&activity.EntityActivityLog{
Entities: []*activity.EntityRecord{entityRecord},
})
if err != nil {
t.Fatalf(err.Error())
}
if i == 0 {
WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData)
} else {
WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(base.Unix())+"/"+strconv.Itoa(i-1), entityData)
}
testEntities1 := &activity.EntityActivityLog{
Entities: entityRecords[:1],
}
entityData1, err := proto.Marshal(testEntities1)
if err != nil {
t.Fatalf(err.Error())
}
testEntities2 := &activity.EntityActivityLog{
Entities: entityRecords[1:2],
}
entityData2, err := proto.Marshal(testEntities2)
if err != nil {
t.Fatalf(err.Error())
}
testEntities3 := &activity.EntityActivityLog{
Entities: entityRecords[2:],
}
entityData3, err := proto.Marshal(testEntities3)
if err != nil {
t.Fatalf(err.Error())
}
WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(monthsAgo.Unix())+"/0", entityData1)
WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(base.Unix())+"/0", entityData2)
WriteToStorage(t, core, ActivityLogPrefix+"entity/"+fmt.Sprint(base.Unix())+"/1", entityData3)
}
var tokenRecords map[string]uint64
if includeTokens {
tokenRecords = make(map[string]uint64)
for i := 1; i < 4; i++ {
nsID := "ns" + strconv.Itoa(i)
tokenRecords[nsID] = uint64(i)
tokenRecords[namespace.RootNamespaceID] = uint64(1)
if constants.IsEnterprise {
for i := 1; i < 4; i++ {
nsID := "ns" + strconv.Itoa(i)
tokenRecords[nsID] = uint64(i)
}
}
tokenCount := &activity.TokenCount{
CountByNamespaceID: tokenRecords,
@ -1405,7 +1407,7 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) {
Entities: expectedEntityRecords[1:],
}
expectedCurrent := &activity.EntityActivityLog{
Entities: expectedEntityRecords[2:],
Entities: expectedEntityRecords[len(expectedEntityRecords)-1:],
}
currentEntities := a.GetCurrentEntities()
@ -1446,7 +1448,7 @@ func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testi
wg.Wait()
expected := &activity.EntityActivityLog{
Entities: expectedEntityRecords[2:],
Entities: expectedEntityRecords[len(expectedEntityRecords)-1:],
}
currentEntities := a.GetCurrentEntities()
@ -1496,7 +1498,7 @@ func TestActivityLog_refreshFromStoredLogNoTokens(t *testing.T) {
Entities: expectedEntityRecords[1:],
}
expectedCurrent := &activity.EntityActivityLog{
Entities: expectedEntityRecords[2:],
Entities: expectedEntityRecords[len(expectedEntityRecords)-1:],
}
currentEntities := a.GetCurrentEntities()
@ -1595,7 +1597,7 @@ func TestActivityLog_refreshFromStoredLogPreviousMonth(t *testing.T) {
Entities: expectedEntityRecords[1:],
}
expectedCurrent := &activity.EntityActivityLog{
Entities: expectedEntityRecords[2:],
Entities: expectedEntityRecords[len(expectedEntityRecords)-1:],
}
currentEntities := a.GetCurrentEntities()
@ -2499,9 +2501,15 @@ func TestActivityLog_Deletion(t *testing.T) {
func TestActivityLog_partialMonthClientCount(t *testing.T) {
timeutil.SkipAtEndOfMonth(t)
ctx := context.Background()
ctx := namespace.RootContext(nil)
now := time.Now().UTC()
a, entities, tokenCounts := setupActivityRecordsInStorage(t, timeutil.StartOfMonth(now), true, true)
entities = entities[1:]
entityCounts := make(map[string]uint64)
for _, entity := range entities {
entityCounts[entity.NamespaceID] += 1
}
a.SetEnable(true)
var wg sync.WaitGroup
@ -2512,7 +2520,7 @@ func TestActivityLog_partialMonthClientCount(t *testing.T) {
wg.Wait()
// entities[0] is from a previous month
partialMonthEntityCount := len(entities[1:])
partialMonthEntityCount := len(entities)
var partialMonthTokenCount int
for _, countByNS := range tokenCounts {
partialMonthTokenCount += int(countByNS)
@ -2520,11 +2528,42 @@ func TestActivityLog_partialMonthClientCount(t *testing.T) {
expectedClientCount := partialMonthEntityCount + partialMonthTokenCount
results := a.partialMonthClientCount(ctx)
results, err := a.partialMonthClientCount(ctx)
if err != nil {
t.Fatal(err)
}
if results == nil {
t.Fatal("no results to test")
}
byNamespace, ok := results["by_namespace"]
if !ok {
t.Fatalf("malformed results. got %v", results)
}
clientCountResponse := make([]*ClientCountInNamespace, 0)
err = mapstructure.Decode(byNamespace, &clientCountResponse)
if err != nil {
t.Fatal(err)
}
for _, clientCount := range clientCountResponse {
if int(entityCounts[clientCount.NamespaceID]) != clientCount.Counts.DistinctEntities {
t.Errorf("bad entity count for namespace %s . expected %d, got %d", clientCount.NamespaceID, int(entityCounts[clientCount.NamespaceID]), clientCount.Counts.DistinctEntities)
}
if int(tokenCounts[clientCount.NamespaceID]) != clientCount.Counts.NonEntityTokens {
t.Errorf("bad token count for namespace %s . expected %d, got %d", clientCount.NamespaceID, int(tokenCounts[clientCount.NamespaceID]), clientCount.Counts.NonEntityTokens)
}
totalCount := int(entityCounts[clientCount.NamespaceID] + tokenCounts[clientCount.NamespaceID])
if totalCount != clientCount.Counts.Clients {
t.Errorf("bad client count for namespace %s . expected %d, got %d", clientCount.NamespaceID, totalCount, clientCount.Counts.Clients)
}
}
entityCount, ok := results["distinct_entities"]
if !ok {
t.Fatalf("malformed results. got %v", results)

View File

@ -4,6 +4,7 @@ import (
"context"
"testing"
"github.com/hashicorp/vault/helper/constants"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault/activity"
)
@ -11,18 +12,20 @@ import (
// InjectActivityLogDataThisMonth populates the in-memory client store
// with some entities and tokens, overriding what was already there
// It is currently used for API integration tests
func (c *Core) InjectActivityLogDataThisMonth(t *testing.T) (map[string]struct{}, map[string]uint64) {
func (c *Core) InjectActivityLogDataThisMonth(t *testing.T) (map[string]uint64, map[string]uint64) {
t.Helper()
tokens := make(map[string]uint64, 0)
entitiesByNS := make(map[string]uint64, 0)
tokens["root"] = 5
entitiesByNS["root"] = 5
activeEntities := map[string]struct{}{
"entity0": {},
"entity1": {},
"entity2": {},
}
tokens := map[string]uint64{
"ns0": 5,
"ns1": 1,
"ns2": 10,
if constants.IsEnterprise {
tokens["ns0"] = 5
tokens["ns1"] = 1
tokens["ns2"] = 1
entitiesByNS["ns0"] = 1
entitiesByNS["ns1"] = 1
entitiesByNS["ns2"] = 1
}
c.activityLog.l.Lock()
@ -30,10 +33,9 @@ func (c *Core) InjectActivityLogDataThisMonth(t *testing.T) (map[string]struct{}
c.activityLog.fragmentLock.Lock()
defer c.activityLog.fragmentLock.Unlock()
c.activityLog.activeEntities = activeEntities
c.activityLog.currentSegment.tokenCount.CountByNamespaceID = tokens
return activeEntities, tokens
c.activityLog.entityTracker.entityCountByNamespaceID = entitiesByNS
return entitiesByNS, tokens
}
// Return the in-memory activeEntities from an activity log
@ -42,7 +44,7 @@ func (c *Core) GetActiveEntities() map[string]struct{} {
c.stateLock.RLock()
c.activityLog.fragmentLock.RLock()
for k, v := range c.activityLog.activeEntities {
for k, v := range c.activityLog.entityTracker.activeEntities {
out[k] = v
}
c.activityLog.fragmentLock.RUnlock()
@ -104,7 +106,7 @@ func (a *ActivityLog) ExpectCurrentSegmentRefreshed(t *testing.T, expectedStart
if a.currentSegment.currentEntities.Entities == nil {
t.Errorf("expected non-nil currentSegment.currentEntities.Entities")
}
if a.activeEntities == nil {
if a.entityTracker.activeEntities == nil {
t.Errorf("expected non-nil activeEntities")
}
if a.currentSegment.tokenCount == nil {
@ -117,8 +119,8 @@ func (a *ActivityLog) ExpectCurrentSegmentRefreshed(t *testing.T, expectedStart
if len(a.currentSegment.currentEntities.Entities) > 0 {
t.Errorf("expected no current entity segment to be loaded. got: %v", a.currentSegment.currentEntities)
}
if len(a.activeEntities) > 0 {
t.Errorf("expected no active entity segment to be loaded. got: %v", a.activeEntities)
if len(a.entityTracker.activeEntities) > 0 {
t.Errorf("expected no active entity segment to be loaded. got: %v", a.entityTracker.activeEntities)
}
if len(a.currentSegment.tokenCount.CountByNamespaceID) > 0 {
t.Errorf("expected no token counts to be loaded. got: %v", a.currentSegment.tokenCount.CountByNamespaceID)

View File

@ -88,7 +88,10 @@ func TestActivityLog_MonthlyActivityApi(t *testing.T) {
// inject some data and query the API
entities, tokens := core.InjectActivityLogDataThisMonth(t)
expectedEntities := len(entities)
var expectedEntities int
for _, entityCount := range entities {
expectedEntities += int(entityCount)
}
var expectedTokens int
for _, tokenCount := range tokens {
expectedTokens += int(tokenCount)

View File

@ -142,7 +142,10 @@ func (b *SystemBackend) handleMonthlyActivityCount(ctx context.Context, req *log
return logical.ErrorResponse("no activity log present"), nil
}
results := a.partialMonthClientCount(ctx)
results, err := a.partialMonthClientCount(ctx)
if err != nil {
return nil, err
}
if results == nil {
return logical.RespondWithStatusCode(nil, req, http.StatusNoContent)
}