VAULT-14735: repeated and segmented activity log clients (#20699)
* add repeated, segmented, and writing * simplify * pr fixes * remove comment * Update vault/logical_system_activity_write_testonly.go Co-authored-by: Mike Palmiotto <mike.palmiotto@hashicorp.com> --------- Co-authored-by: Mike Palmiotto <mike.palmiotto@hashicorp.com>
This commit is contained in:
parent
4da72c45ce
commit
fdecd99d26
|
@ -7,7 +7,6 @@ package vault
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
@ -61,6 +60,11 @@ func (b *SystemBackend) handleActivityWriteData(ctx context.Context, request *lo
|
|||
type singleMonthActivityClients struct {
|
||||
// clients are indexed by ID
|
||||
clients []*activity.EntityRecord
|
||||
// predefinedSegments map from the segment number to the client's index in
|
||||
// the clients slice
|
||||
predefinedSegments map[int][]int
|
||||
// generationParameters holds the generation request
|
||||
generationParameters *generation.Data
|
||||
}
|
||||
|
||||
// multipleMonthsActivityClients holds multiple month's data
|
||||
|
@ -69,9 +73,80 @@ type multipleMonthsActivityClients struct {
|
|||
months []*singleMonthActivityClients
|
||||
}
|
||||
|
||||
func (s *singleMonthActivityClients) addEntityRecord(record *activity.EntityRecord, segmentIndex *int) {
|
||||
s.clients = append(s.clients, record)
|
||||
if segmentIndex != nil {
|
||||
index := len(s.clients) - 1
|
||||
s.predefinedSegments[*segmentIndex] = append(s.predefinedSegments[*segmentIndex], index)
|
||||
}
|
||||
}
|
||||
|
||||
// populateSegments converts a month of clients into a segmented map. The map's
|
||||
// keys are the segment index, and the value are the clients that were seen in
|
||||
// that index. If the value is an empty slice, then it's an empty index. If the
|
||||
// value is nil, then it's a skipped index
|
||||
func (s *singleMonthActivityClients) populateSegments() (map[int][]*activity.EntityRecord, error) {
|
||||
segments := make(map[int][]*activity.EntityRecord)
|
||||
ignoreIndexes := make(map[int]struct{})
|
||||
skipIndexes := s.generationParameters.SkipSegmentIndexes
|
||||
emptyIndexes := s.generationParameters.EmptySegmentIndexes
|
||||
|
||||
for _, i := range skipIndexes {
|
||||
segments[int(i)] = nil
|
||||
ignoreIndexes[int(i)] = struct{}{}
|
||||
}
|
||||
for _, i := range emptyIndexes {
|
||||
segments[int(i)] = make([]*activity.EntityRecord, 0, 0)
|
||||
ignoreIndexes[int(i)] = struct{}{}
|
||||
}
|
||||
|
||||
// if we have predefined segments, then we can construct the map using those
|
||||
if len(s.predefinedSegments) > 0 {
|
||||
for segment, clientIndexes := range s.predefinedSegments {
|
||||
clientsInSegment := make([]*activity.EntityRecord, 0, len(clientIndexes))
|
||||
for _, idx := range clientIndexes {
|
||||
clientsInSegment = append(clientsInSegment, s.clients[idx])
|
||||
}
|
||||
segments[segment] = clientsInSegment
|
||||
}
|
||||
return segments, nil
|
||||
}
|
||||
|
||||
totalSegmentCount := 1
|
||||
if s.generationParameters.GetNumSegments() > 0 {
|
||||
totalSegmentCount = int(s.generationParameters.GetNumSegments())
|
||||
}
|
||||
numNonUsable := len(skipIndexes) + len(emptyIndexes)
|
||||
usableSegmentCount := totalSegmentCount - numNonUsable
|
||||
if usableSegmentCount <= 0 {
|
||||
return nil, fmt.Errorf("num segments %d is too low, it must be greater than %d (%d skipped indexes + %d empty indexes)", totalSegmentCount, numNonUsable, len(skipIndexes), len(emptyIndexes))
|
||||
}
|
||||
|
||||
// determine how many clients should be in each segment
|
||||
segmentSizes := len(s.clients) / usableSegmentCount
|
||||
if len(s.clients)%usableSegmentCount != 0 {
|
||||
segmentSizes++
|
||||
}
|
||||
|
||||
clientIndex := 0
|
||||
for i := 0; i < totalSegmentCount; i++ {
|
||||
if clientIndex >= len(s.clients) {
|
||||
break
|
||||
}
|
||||
if _, ok := ignoreIndexes[i]; ok {
|
||||
continue
|
||||
}
|
||||
for len(segments[i]) < segmentSizes && clientIndex < len(s.clients) {
|
||||
segments[i] = append(segments[i], s.clients[clientIndex])
|
||||
clientIndex++
|
||||
}
|
||||
}
|
||||
return segments, nil
|
||||
}
|
||||
|
||||
// addNewClients generates clients according to the given parameters, and adds them to the month
|
||||
// the client will always have the mountAccessor as its mount accessor
|
||||
func (s *singleMonthActivityClients) addNewClients(c *generation.Client, mountAccessor string) error {
|
||||
func (s *singleMonthActivityClients) addNewClients(c *generation.Client, mountAccessor string, segmentIndex *int) error {
|
||||
count := 1
|
||||
if c.Count > 1 {
|
||||
count = int(c.Count)
|
||||
|
@ -95,17 +170,13 @@ func (s *singleMonthActivityClients) addNewClients(c *generation.Client, mountAc
|
|||
return err
|
||||
}
|
||||
}
|
||||
s.clients = append(s.clients, record)
|
||||
s.addEntityRecord(record, segmentIndex)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// processMonth populates a month of client data
|
||||
func (m *multipleMonthsActivityClients) processMonth(ctx context.Context, core *Core, month *generation.Data) error {
|
||||
if month.GetAll() == nil {
|
||||
return errors.New("segmented monthly data is not yet supported")
|
||||
}
|
||||
|
||||
// default to using the root namespace and the first mount on the root namespace
|
||||
mounts, err := core.ListMounts()
|
||||
if err != nil {
|
||||
|
@ -118,52 +189,101 @@ func (m *multipleMonthsActivityClients) processMonth(ctx context.Context, core *
|
|||
break
|
||||
}
|
||||
}
|
||||
addingTo := m.months[month.GetMonthsAgo()]
|
||||
m.months[month.GetMonthsAgo()].generationParameters = month
|
||||
add := func(c []*generation.Client, segmentIndex *int) error {
|
||||
for _, clients := range c {
|
||||
|
||||
for _, clients := range month.GetAll().Clients {
|
||||
if clients.Repeated || clients.RepeatedFromMonth > 0 {
|
||||
return errors.New("repeated clients are not yet supported")
|
||||
}
|
||||
|
||||
if clients.Namespace == "" {
|
||||
clients.Namespace = namespace.RootNamespaceID
|
||||
}
|
||||
|
||||
// verify that the namespace exists
|
||||
ns, err := core.NamespaceByID(ctx, clients.Namespace)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// verify that the mount exists
|
||||
if clients.Mount != "" {
|
||||
nctx := namespace.ContextWithNamespace(ctx, ns)
|
||||
mountEntry := core.router.MatchingMountEntry(nctx, clients.Mount)
|
||||
if mountEntry == nil {
|
||||
return fmt.Errorf("unable to find matching mount in namespace %s", clients.Namespace)
|
||||
if clients.Namespace == "" {
|
||||
clients.Namespace = namespace.RootNamespaceID
|
||||
}
|
||||
}
|
||||
|
||||
mountAccessor := defaultMountAccessorRootNS
|
||||
if clients.Namespace != namespace.RootNamespaceID && clients.Mount == "" {
|
||||
// if we're not using the root namespace, find a mount on the namespace that we are using
|
||||
found := false
|
||||
for _, mount := range mounts {
|
||||
if mount.NamespaceID == clients.Namespace {
|
||||
mountAccessor = mount.Accessor
|
||||
found = true
|
||||
break
|
||||
// verify that the namespace exists
|
||||
ns, err := core.NamespaceByID(ctx, clients.Namespace)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// verify that the mount exists
|
||||
if clients.Mount != "" {
|
||||
nctx := namespace.ContextWithNamespace(ctx, ns)
|
||||
mountEntry := core.router.MatchingMountEntry(nctx, clients.Mount)
|
||||
if mountEntry == nil {
|
||||
return fmt.Errorf("unable to find matching mount in namespace %s", clients.Namespace)
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return fmt.Errorf("unable to find matching mount in namespace %s", clients.Namespace)
|
||||
|
||||
mountAccessor := defaultMountAccessorRootNS
|
||||
if clients.Namespace != namespace.RootNamespaceID && clients.Mount == "" {
|
||||
// if we're not using the root namespace, find a mount on the namespace that we are using
|
||||
found := false
|
||||
for _, mount := range mounts {
|
||||
if mount.NamespaceID == clients.Namespace {
|
||||
mountAccessor = mount.Accessor
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return fmt.Errorf("unable to find matching mount in namespace %s", clients.Namespace)
|
||||
}
|
||||
}
|
||||
|
||||
err = m.addClientToMonth(month.GetMonthsAgo(), clients, mountAccessor, segmentIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = addingTo.addNewClients(clients, mountAccessor)
|
||||
return nil
|
||||
}
|
||||
|
||||
if month.GetAll() != nil {
|
||||
return add(month.GetAll().GetClients(), nil)
|
||||
}
|
||||
predefinedSegments := month.GetSegments()
|
||||
for i, segment := range predefinedSegments.GetSegments() {
|
||||
index := i
|
||||
if segment.SegmentIndex != nil {
|
||||
index = int(*segment.SegmentIndex)
|
||||
}
|
||||
err = add(segment.GetClients().GetClients(), &index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *multipleMonthsActivityClients) addClientToMonth(monthsAgo int32, c *generation.Client, mountAccessor string, segmentIndex *int) error {
|
||||
if c.Repeated || c.RepeatedFromMonth > 0 {
|
||||
return m.addRepeatedClients(monthsAgo, c, mountAccessor, segmentIndex)
|
||||
}
|
||||
return m.months[monthsAgo].addNewClients(c, mountAccessor, segmentIndex)
|
||||
}
|
||||
|
||||
func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *generation.Client, mountAccessor string, segmentIndex *int) error {
|
||||
addingTo := m.months[monthsAgo]
|
||||
repeatedFromMonth := monthsAgo + 1
|
||||
if c.RepeatedFromMonth > 0 {
|
||||
repeatedFromMonth = c.RepeatedFromMonth
|
||||
}
|
||||
repeatedFrom := m.months[repeatedFromMonth]
|
||||
numClients := 1
|
||||
if c.Count > 0 {
|
||||
numClients = int(c.Count)
|
||||
}
|
||||
for _, client := range repeatedFrom.clients {
|
||||
if c.NonEntity == client.NonEntity && mountAccessor == client.MountAccessor && c.Namespace == client.NamespaceID {
|
||||
addingTo.addEntityRecord(client, segmentIndex)
|
||||
numClients--
|
||||
if numClients == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if numClients > 0 {
|
||||
return fmt.Errorf("missing repeated %d clients matching given parameters", numClients)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -172,7 +292,9 @@ func newMultipleMonthsActivityClients(numberOfMonths int) *multipleMonthsActivit
|
|||
months: make([]*singleMonthActivityClients, numberOfMonths),
|
||||
}
|
||||
for i := 0; i < numberOfMonths; i++ {
|
||||
m.months[i] = new(singleMonthActivityClients)
|
||||
m.months[i] = &singleMonthActivityClients{
|
||||
predefinedSegments: make(map[int][]int),
|
||||
}
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/vault/helper/namespace"
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
"github.com/hashicorp/vault/vault/activity"
|
||||
"github.com/hashicorp/vault/vault/activity/generation"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -88,8 +89,10 @@ func TestSystemBackend_handleActivityWriteData(t *testing.T) {
|
|||
// Test_singleMonthActivityClients_addNewClients verifies that new clients are
|
||||
// created correctly, adhering to the requested parameters. The clients should
|
||||
// use the inputted mount and a generated ID if one is not supplied. The new
|
||||
// client should be added to the month's `clients` slice
|
||||
// client should be added to the month's `clients` slice and segment map, if
|
||||
// a segment index is supplied
|
||||
func Test_singleMonthActivityClients_addNewClients(t *testing.T) {
|
||||
segmentIndex := 0
|
||||
tests := []struct {
|
||||
name string
|
||||
mount string
|
||||
|
@ -97,6 +100,7 @@ func Test_singleMonthActivityClients_addNewClients(t *testing.T) {
|
|||
wantNamespace string
|
||||
wantMount string
|
||||
wantID string
|
||||
segmentIndex *int
|
||||
}{
|
||||
{
|
||||
name: "default mount is used",
|
||||
|
@ -133,18 +137,25 @@ func Test_singleMonthActivityClients_addNewClients(t *testing.T) {
|
|||
NonEntity: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "added to segment",
|
||||
clients: &generation.Client{},
|
||||
segmentIndex: &segmentIndex,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
m := &singleMonthActivityClients{}
|
||||
err := m.addNewClients(tt.clients, tt.mount)
|
||||
m := &singleMonthActivityClients{
|
||||
predefinedSegments: make(map[int][]int),
|
||||
}
|
||||
err := m.addNewClients(tt.clients, tt.mount, tt.segmentIndex)
|
||||
require.NoError(t, err)
|
||||
numNew := tt.clients.Count
|
||||
if numNew == 0 {
|
||||
numNew = 1
|
||||
}
|
||||
require.Len(t, m.clients, int(numNew))
|
||||
for _, rec := range m.clients {
|
||||
for i, rec := range m.clients {
|
||||
require.NotNil(t, rec)
|
||||
require.Equal(t, tt.wantNamespace, rec.NamespaceID)
|
||||
require.Equal(t, tt.wantMount, rec.MountAccessor)
|
||||
|
@ -154,6 +165,9 @@ func Test_singleMonthActivityClients_addNewClients(t *testing.T) {
|
|||
} else {
|
||||
require.NotEqual(t, "", rec.ClientID)
|
||||
}
|
||||
if tt.segmentIndex != nil {
|
||||
require.Contains(t, m.predefinedSegments[*tt.segmentIndex], i)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -245,3 +259,172 @@ func Test_multipleMonthsActivityClients_processMonth(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Test_multipleMonthsActivityClients_processMonth_segmented verifies that segments
|
||||
// are filled correctly when a month is processed with segmented data. The clients
|
||||
// should be in the clients array, and should also be in the predefinedSegments map
|
||||
// at the correct segment index
|
||||
func Test_multipleMonthsActivityClients_processMonth_segmented(t *testing.T) {
|
||||
index7 := int32(7)
|
||||
data := &generation.Data{
|
||||
Clients: &generation.Data_Segments{
|
||||
Segments: &generation.Segments{
|
||||
Segments: []*generation.Segment{
|
||||
{
|
||||
Clients: &generation.Clients{Clients: []*generation.Client{
|
||||
{},
|
||||
}},
|
||||
},
|
||||
{
|
||||
Clients: &generation.Clients{Clients: []*generation.Client{{}}},
|
||||
},
|
||||
{
|
||||
SegmentIndex: &index7,
|
||||
Clients: &generation.Clients{Clients: []*generation.Client{{}}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
m := newMultipleMonthsActivityClients(1)
|
||||
core, _, _ := TestCoreUnsealed(t)
|
||||
require.NoError(t, m.processMonth(context.Background(), core, data))
|
||||
require.Len(t, m.months[0].predefinedSegments, 3)
|
||||
require.Len(t, m.months[0].clients, 3)
|
||||
|
||||
// segment indexes are correct
|
||||
require.Contains(t, m.months[0].predefinedSegments, 0)
|
||||
require.Contains(t, m.months[0].predefinedSegments, 1)
|
||||
require.Contains(t, m.months[0].predefinedSegments, 7)
|
||||
|
||||
// the data in each segment is correct
|
||||
require.Contains(t, m.months[0].predefinedSegments[0], 0)
|
||||
require.Contains(t, m.months[0].predefinedSegments[1], 1)
|
||||
require.Contains(t, m.months[0].predefinedSegments[7], 2)
|
||||
}
|
||||
|
||||
// Test_multipleMonthsActivityClients_addRepeatedClients adds repeated clients
|
||||
// from 1 month ago and 2 months ago, and verifies that the correct clients are
|
||||
// added based on namespace, mount, and non-entity attributes
|
||||
func Test_multipleMonthsActivityClients_addRepeatedClients(t *testing.T) {
|
||||
m := newMultipleMonthsActivityClients(3)
|
||||
defaultMount := "default"
|
||||
|
||||
require.NoError(t, m.addClientToMonth(2, &generation.Client{Count: 2}, "identity", nil))
|
||||
require.NoError(t, m.addClientToMonth(2, &generation.Client{Count: 2, Namespace: "other_ns"}, defaultMount, nil))
|
||||
require.NoError(t, m.addClientToMonth(1, &generation.Client{Count: 2}, defaultMount, nil))
|
||||
require.NoError(t, m.addClientToMonth(1, &generation.Client{Count: 2, NonEntity: true}, defaultMount, nil))
|
||||
|
||||
month2Clients := m.months[2].clients
|
||||
month1Clients := m.months[1].clients
|
||||
|
||||
thisMonth := m.months[0]
|
||||
// this will match the first client in month 1
|
||||
require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, Repeated: true}, defaultMount, nil))
|
||||
require.Contains(t, month1Clients, thisMonth.clients[0])
|
||||
|
||||
// this will match the 3rd client in month 1
|
||||
require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, Repeated: true, NonEntity: true}, defaultMount, nil))
|
||||
require.Equal(t, month1Clients[2], thisMonth.clients[1])
|
||||
|
||||
// this will match the first two clients in month 1
|
||||
require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 2, Repeated: true}, defaultMount, nil))
|
||||
require.Equal(t, month1Clients[0:2], thisMonth.clients[2:4])
|
||||
|
||||
// this will match the first client in month 2
|
||||
require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, RepeatedFromMonth: 2}, "identity", nil))
|
||||
require.Equal(t, month2Clients[0], thisMonth.clients[4])
|
||||
|
||||
// this will match the 3rd client in month 2
|
||||
require.NoError(t, m.addRepeatedClients(0, &generation.Client{Count: 1, RepeatedFromMonth: 2, Namespace: "other_ns"}, defaultMount, nil))
|
||||
require.Equal(t, month2Clients[2], thisMonth.clients[5])
|
||||
|
||||
require.Error(t, m.addRepeatedClients(0, &generation.Client{Count: 1, RepeatedFromMonth: 2, Namespace: "other_ns"}, "other_mount", nil))
|
||||
}
|
||||
|
||||
// Test_singleMonthActivityClients_populateSegments calls populateSegments for a
|
||||
// collection of 5 clients, segmented in various ways. The test ensures that the
|
||||
// resulting map has the correct clients for each segment index
|
||||
func Test_singleMonthActivityClients_populateSegments(t *testing.T) {
|
||||
clients := []*activity.EntityRecord{
|
||||
{ClientID: "a"},
|
||||
{ClientID: "b"},
|
||||
{ClientID: "c"},
|
||||
{ClientID: "d"},
|
||||
{ClientID: "e"},
|
||||
}
|
||||
cases := []struct {
|
||||
name string
|
||||
segments map[int][]int
|
||||
numSegments int
|
||||
emptyIndexes []int32
|
||||
skipIndexes []int32
|
||||
wantSegments map[int][]*activity.EntityRecord
|
||||
}{
|
||||
{
|
||||
name: "segmented",
|
||||
segments: map[int][]int{
|
||||
0: {0, 1},
|
||||
1: {2, 3},
|
||||
2: {4},
|
||||
},
|
||||
wantSegments: map[int][]*activity.EntityRecord{
|
||||
0: {{ClientID: "a"}, {ClientID: "b"}},
|
||||
1: {{ClientID: "c"}, {ClientID: "d"}},
|
||||
2: {{ClientID: "e"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "segmented with skip and empty",
|
||||
segments: map[int][]int{
|
||||
0: {0, 1},
|
||||
2: {0, 1},
|
||||
},
|
||||
emptyIndexes: []int32{1, 4},
|
||||
skipIndexes: []int32{3},
|
||||
wantSegments: map[int][]*activity.EntityRecord{
|
||||
0: {{ClientID: "a"}, {ClientID: "b"}},
|
||||
1: {},
|
||||
2: {{ClientID: "a"}, {ClientID: "b"}},
|
||||
3: nil,
|
||||
4: {},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "all clients",
|
||||
numSegments: 0,
|
||||
wantSegments: map[int][]*activity.EntityRecord{
|
||||
0: {{ClientID: "a"}, {ClientID: "b"}, {ClientID: "c"}, {ClientID: "d"}, {ClientID: "e"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "all clients split",
|
||||
numSegments: 2,
|
||||
wantSegments: map[int][]*activity.EntityRecord{
|
||||
0: {{ClientID: "a"}, {ClientID: "b"}, {ClientID: "c"}},
|
||||
1: {{ClientID: "d"}, {ClientID: "e"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "all clients with skip and empty",
|
||||
numSegments: 5,
|
||||
skipIndexes: []int32{0, 3},
|
||||
emptyIndexes: []int32{2},
|
||||
wantSegments: map[int][]*activity.EntityRecord{
|
||||
0: nil,
|
||||
1: {{ClientID: "a"}, {ClientID: "b"}, {ClientID: "c"}},
|
||||
2: {},
|
||||
3: nil,
|
||||
4: {{ClientID: "d"}, {ClientID: "e"}},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
s := singleMonthActivityClients{predefinedSegments: tc.segments, clients: clients, generationParameters: &generation.Data{EmptySegmentIndexes: tc.emptyIndexes, SkipSegmentIndexes: tc.skipIndexes, NumSegments: int32(tc.numSegments)}}
|
||||
gotSegments, err := s.populateSegments()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.wantSegments, gotSegments)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue