diff --git a/changelog/23781.txt b/changelog/23781.txt
new file mode 100644
index 000000000..32d3b51e9
--- /dev/null
+++ b/changelog/23781.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+core/activity: Fixes loss of segment fragments caused by exceeding the storage entry size limit
+```
\ No newline at end of file
diff --git a/vault/activity_log.go b/vault/activity_log.go
index 216f0c752..03bad4ce6 100644
--- a/vault/activity_log.go
+++ b/vault/activity_log.go
@@ -60,18 +60,18 @@ const (
 	activitySegmentWriteTimeout = 1 * time.Minute
 
 	// Number of client records to store per segment. Each ClientRecord may
-	// consume upto 99 bytes; rounding it to 100bytes. Considering the storage
-	// limit of 512KB per storage entry, we can roughly store 512KB/100bytes =
-	// 5241 clients; rounding down to 5000 clients.
-	activitySegmentClientCapacity = 5000
+	// consume up to 99 bytes; round that up to 100 bytes. These records undergo JSON
+	// marshalling before being written to storage, which increases their size by approximately 4/3. Considering the storage
+	// limit of 512KB per storage entry, we can roughly store 512KB/(100 bytes * 4/3), yielding approximately 3820 records.
+	ActivitySegmentClientCapacity = 3820
 
 	// Maximum number of segments per month. This allows for 700K entities per
-	// month; 700K/5K. These limits are geared towards controlling the storage
+	// month; 700K/3820 (ActivitySegmentClientCapacity). These limits are geared towards controlling the storage
 	// implications of persisting activity logs. If we hit a scenario where the
 	// storage consequences are less important in comparison to the accuracy of
 	// the client activity, these limits can be further relaxed or even be
 	// removed.
-	activityLogMaxSegmentPerMonth = 140
+	activityLogMaxSegmentPerMonth = 184
 
 	// trackedTWESegmentPeriod is a time period of a little over a month, and represents
 	// the amount of time that needs to pass after a 1.9 or later upgrade to result in
@@ -351,7 +351,7 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
 	}
 
 	// Will all new entities fit? If not, roll over to a new segment.
-	available := activitySegmentClientCapacity - len(a.currentSegment.currentClients.Clients)
+	available := ActivitySegmentClientCapacity - len(a.currentSegment.currentClients.Clients)
 	remaining := available - len(newEntities)
 	excess := 0
 	if remaining < 0 {
@@ -389,9 +389,9 @@ func (a *ActivityLog) saveCurrentSegmentToStorageLocked(ctx context.Context, for
 
 	// Rotate to next segment
 	a.currentSegment.clientSequenceNumber += 1
-	if len(excessClients) > activitySegmentClientCapacity {
+	if len(excessClients) > ActivitySegmentClientCapacity {
 		a.logger.Warn("too many new active clients, dropping tail", "clients", len(excessClients))
-		excessClients = excessClients[:activitySegmentClientCapacity]
+		excessClients = excessClients[:ActivitySegmentClientCapacity]
 	}
 	a.currentSegment.currentClients.Clients = excessClients
 	err := a.saveCurrentSegmentInternal(ctx, force)
diff --git a/vault/activity_log_test.go b/vault/activity_log_test.go
index 7baf4a709..f0686fe70 100644
--- a/vault/activity_log_test.go
+++ b/vault/activity_log_test.go
@@ -663,9 +663,11 @@ func TestActivityLog_availableLogs(t *testing.T) {
 	}
 }
 
-// TestActivityLog_MultipleFragmentsAndSegments adds 4000 clients to a fragment and saves it and reads it. The test then
-// adds 4000 more clients and calls receivedFragment with 200 more entities. The current segment is saved to storage and
-// read back. The test verifies that there are 5000 clients in the first segment index, then the rest in the second index.
+// TestActivityLog_MultipleFragmentsAndSegments adds 4000 clients to a fragment,
+// saves it, and reads it back. The test then adds 4000 more clients and calls
+// receivedFragment with 200 more entities. The current segment is saved to
+// storage and read back. The test verifies that the first and second segment
+// indexes each contain ActivitySegmentClientCapacity clients, with the rest in the third index.
 func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
 	core, _, _ := TestCoreUnsealed(t)
 	a := core.activityLog
@@ -685,6 +687,7 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
 	startTimestamp := a.GetStartTimestamp()
 	path0 := fmt.Sprintf("sys/counters/activity/log/entity/%d/0", startTimestamp)
 	path1 := fmt.Sprintf("sys/counters/activity/log/entity/%d/1", startTimestamp)
+	path2 := fmt.Sprintf("sys/counters/activity/log/entity/%d/2", startTimestamp)
 	tokenPath := fmt.Sprintf("sys/counters/activity/log/directtokens/%d/0", startTimestamp)
 
 	genID := func(i int) string {
@@ -692,7 +695,7 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
 	}
 	ts := time.Now().Unix()
 
-	// First 4000 should fit in one segment
+	// First ActivitySegmentClientCapacity clients should fit in one segment
 	for i := 0; i < 4000; i++ {
 		a.AddEntityToFragment(genID(i), "root", ts)
 	}
@@ -705,7 +708,7 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
 	default:
 	}
 
-	// Save incomplete segment
+	// Save segment
 	err := a.saveCurrentSegmentToStorage(context.Background(), false)
 	if err != nil {
 		t.Fatalf("got error writing entities to storage: %v", err)
@@ -717,8 +720,8 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
 	if err != nil {
 		t.Fatalf("could not unmarshal protobuf: %v", err)
 	}
-	if len(entityLog0.Clients) != 4000 {
-		t.Fatalf("unexpected entity length. Expected %d, got %d", 4000, len(entityLog0.Clients))
+	if len(entityLog0.Clients) != ActivitySegmentClientCapacity {
+		t.Fatalf("unexpected entity length. Expected %d, got %d", ActivitySegmentClientCapacity, len(entityLog0.Clients))
 	}
 
 	// 4000 more local entities
@@ -778,8 +781,8 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
 	}
 
 	seqNum := a.GetEntitySequenceNumber()
-	if seqNum != 1 {
-		t.Fatalf("expected sequence number 1, got %v", seqNum)
+	if seqNum != 2 {
+		t.Fatalf("expected sequence number 2, got %v", seqNum)
 	}
 
 	protoSegment0 = readSegmentFromStorage(t, core, path0)
@@ -787,8 +790,8 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
 	if err != nil {
 		t.Fatalf("could not unmarshal protobuf: %v", err)
 	}
-	if len(entityLog0.Clients) != activitySegmentClientCapacity {
-		t.Fatalf("unexpected client length. Expected %d, got %d", activitySegmentClientCapacity,
+	if len(entityLog0.Clients) != ActivitySegmentClientCapacity {
+		t.Fatalf("unexpected client length. Expected %d, got %d", ActivitySegmentClientCapacity,
 			len(entityLog0.Clients))
 	}
 
@@ -798,8 +801,19 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) {
 	if err != nil {
 		t.Fatalf("could not unmarshal protobuf: %v", err)
 	}
-	expectedCount := 8100 - activitySegmentClientCapacity
-	if len(entityLog1.Clients) != expectedCount {
+	if len(entityLog1.Clients) != ActivitySegmentClientCapacity {
+		t.Fatalf("unexpected entity length. Expected %d, got %d", ActivitySegmentClientCapacity,
Expected %d, got %d", ActivitySegmentClientCapacity, + len(entityLog1.Clients)) + } + + protoSegment2 := readSegmentFromStorage(t, core, path2) + entityLog2 := activity.EntityActivityLog{} + err = proto.Unmarshal(protoSegment2.Value, &entityLog2) + if err != nil { + t.Fatalf("could not unmarshal protobuf: %v", err) + } + expectedCount := 8100 - (ActivitySegmentClientCapacity * 2) + if len(entityLog2.Clients) != expectedCount { t.Fatalf("unexpected entity length. Expected %d, got %d", expectedCount, len(entityLog1.Clients)) } @@ -811,6 +825,9 @@ func TestActivityLog_MultipleFragmentsAndSegments(t *testing.T) { for _, e := range entityLog1.Clients { entityPresent[e.ClientID] = struct{}{} } + for _, e := range entityLog2.Clients { + entityPresent[e.ClientID] = struct{}{} + } for i := 0; i < 8100; i++ { expectedID := genID(i) if _, present := entityPresent[expectedID]; !present {