Basics of Cert-Count Non-Locking Telemetry (#16676)

Basics of Cert-Count Telemetry, changelog,  "best attempt" slice to capture (and test for) duplicates, Move sorting of possibleDoubleCountedRevokedSerials to after compare of entries. Add values to counter when still initializing.
Set lists to nil after use, Fix atomic2 import, Delay reporting metrics until after deduplication has completed, 
The test works now, Move string slice to helper function; Add backendUUID to gauge name.
This commit is contained in:
Kit Haines 2022-09-20 13:32:20 -04:00 committed by GitHub
parent 2e197fcfcd
commit f2adbb3e47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 380 additions and 16 deletions

View File

@ -207,7 +207,7 @@ func Test_combinePolicyDocuments(t *testing.T) {
`{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "NotAction": "ec2:DescribeAvailabilityZones", "Resource": "*"}]}`,
},
expectedOutput: `{"Version": "2012-10-17","Statement":[{"Effect": "Allow","NotAction": "ec2:DescribeAvailabilityZones", "Resource": "*"}]}`,
expectedErr: false,
expectedErr: false,
},
{
description: "one blank policy",

View File

@ -3,11 +3,14 @@ package pki
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
atomic2 "go.uber.org/atomic"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/armon/go-metrics"
@ -203,6 +206,13 @@ func Backend(conf *logical.BackendConfig) *backend {
// Delay the first tidy until after we've started up.
b.lastTidy = time.Now()
// Metrics initialization for count of certificates in storage
b.certsCounted = atomic2.NewBool(false)
b.certCount = new(uint32)
b.revokedCertCount = new(uint32)
b.possibleDoubleCountedSerials = make([]string, 0, 250)
b.possibleDoubleCountedRevokedSerials = make([]string, 0, 250)
return &b
}
@ -219,6 +229,12 @@ type backend struct {
tidyStatus *tidyStatus
lastTidy time.Time
certCount *uint32
revokedCertCount *uint32
certsCounted *atomic2.Bool
possibleDoubleCountedSerials []string
possibleDoubleCountedRevokedSerials []string
pkiStorageVersion atomic.Value
crlBuilder *crlBuilder
@ -330,6 +346,21 @@ func (b *backend) initialize(ctx context.Context, _ *logical.InitializationReque
return err
}
err := b.initializePKIIssuersStorage(ctx)
if err != nil {
return err
}
// Initialize also needs to populate our certificate and revoked certificate count
err = b.initializeStoredCertificateCounts(ctx)
if err != nil {
return err
}
return nil
}
func (b *backend) initializePKIIssuersStorage(ctx context.Context) error {
// Grab the lock prior to the updating of the storage lock preventing us flipping
// the storage flag midway through the request stream of other requests.
b.issuersLock.Lock()
@ -532,3 +563,160 @@ func (b *backend) periodicFunc(ctx context.Context, request *logical.Request) er
// All good!
return nil
}
func (b *backend) initializeStoredCertificateCounts(ctx context.Context) error {
b.tidyStatusLock.RLock()
defer b.tidyStatusLock.RUnlock()
// For performance reasons, we can't lock on issuance/storage of certs until a list operation completes,
// but we want to limit possible miscounts / double-counts to over-counting, so we take the tidy lock which
// prevents (most) deletions - in particular we take a read lock (sufficient to block the write lock in
// tidyStatusStart while allowing tidy to still acquire a read lock to report via its endpoint)
entries, err := b.storage.List(ctx, "certs/")
if err != nil {
return err
}
atomic.AddUint32(b.certCount, uint32(len(entries)))
revokedEntries, err := b.storage.List(ctx, "revoked/")
if err != nil {
return err
}
atomic.AddUint32(b.revokedCertCount, uint32(len(revokedEntries)))
b.certsCounted.Store(true)
// Now that the metrics are set, we can switch from appending newly-stored certificates to the possible double-count
// list, and instead have them update the counter directly. We need to do this so that we are looking at a static
// slice of possibly double counted serials. Note that certsCounted is computed before the storage operation, so
// there may be some delay here.
// Sort the listed-entries first, to accommodate that delay.
sort.Slice(entries, func(i, j int) bool {
return entries[i] < entries[j]
})
sort.Slice(revokedEntries, func(i, j int) bool {
return revokedEntries[i] < revokedEntries[j]
})
// We assume here that these lists are now complete.
sort.Slice(b.possibleDoubleCountedSerials, func(i, j int) bool {
return b.possibleDoubleCountedSerials[i] < b.possibleDoubleCountedSerials[j]
})
listEntriesIndex := 0
possibleDoubleCountIndex := 0
for {
if listEntriesIndex >= len(entries) {
break
}
if possibleDoubleCountIndex >= len(b.possibleDoubleCountedSerials) {
break
}
if entries[listEntriesIndex] == b.possibleDoubleCountedSerials[possibleDoubleCountIndex] {
// This represents a double-counted entry
b.decrementTotalCertificatesCountNoReport()
listEntriesIndex = listEntriesIndex + 1
possibleDoubleCountIndex = possibleDoubleCountIndex + 1
continue
}
if entries[listEntriesIndex] < b.possibleDoubleCountedSerials[possibleDoubleCountIndex] {
listEntriesIndex = listEntriesIndex + 1
continue
}
if entries[listEntriesIndex] > b.possibleDoubleCountedSerials[possibleDoubleCountIndex] {
possibleDoubleCountIndex = possibleDoubleCountIndex + 1
continue
}
}
sort.Slice(b.possibleDoubleCountedRevokedSerials, func(i, j int) bool {
return b.possibleDoubleCountedRevokedSerials[i] < b.possibleDoubleCountedRevokedSerials[j]
})
listRevokedEntriesIndex := 0
possibleRevokedDoubleCountIndex := 0
for {
if listRevokedEntriesIndex >= len(revokedEntries) {
break
}
if possibleRevokedDoubleCountIndex >= len(b.possibleDoubleCountedRevokedSerials) {
break
}
if revokedEntries[listRevokedEntriesIndex] == b.possibleDoubleCountedRevokedSerials[possibleRevokedDoubleCountIndex] {
// This represents a double-counted revoked entry
b.decrementTotalRevokedCertificatesCountNoReport()
listRevokedEntriesIndex = listRevokedEntriesIndex + 1
possibleRevokedDoubleCountIndex = possibleRevokedDoubleCountIndex + 1
continue
}
if revokedEntries[listRevokedEntriesIndex] < b.possibleDoubleCountedRevokedSerials[possibleRevokedDoubleCountIndex] {
listRevokedEntriesIndex = listRevokedEntriesIndex + 1
continue
}
if revokedEntries[listRevokedEntriesIndex] > b.possibleDoubleCountedRevokedSerials[possibleRevokedDoubleCountIndex] {
possibleRevokedDoubleCountIndex = possibleRevokedDoubleCountIndex + 1
continue
}
}
b.possibleDoubleCountedRevokedSerials = nil
b.possibleDoubleCountedSerials = nil
metrics.SetGauge([]string{"secrets", "pki", b.backendUUID, "total_certificates_stored"}, float32(*b.certCount))
metrics.SetGauge([]string{"secrets", "pki", b.backendUUID, "total_revoked_certificates_stored"}, float32(*b.revokedCertCount))
return nil
}
// The "certsCounted" boolean here should be loaded from the backend certsCounted before the corresponding storage call:
// eg. certsCounted := b.certsCounted.Load()
func (b *backend) incrementTotalCertificatesCount(certsCounted bool, newSerial string) {
atomic.AddUint32(b.certCount, 1)
switch {
case !certsCounted:
// This is unsafe, but a good best-attempt
if strings.HasPrefix(newSerial, "certs/") {
newSerial = newSerial[6:]
}
b.possibleDoubleCountedSerials = append(b.possibleDoubleCountedSerials, newSerial)
default:
metrics.SetGauge([]string{"secrets", "pki", b.backendUUID, "total_certificates_stored"}, float32(*b.certCount))
}
}
func (b *backend) decrementTotalCertificatesCountReport() {
b.decrementTotalCertificatesCountNoReport()
metrics.SetGauge([]string{"secrets", "pki", b.backendUUID, "total_certificates_stored"}, float32(*b.certCount))
}
// Called directly only by the initialize function to deduplicate the count, when we don't have a full count yet
func (b *backend) decrementTotalCertificatesCountNoReport() {
atomic.AddUint32(b.certCount, ^uint32(0))
}
// The "certsCounted" boolean here should be loaded from the backend certsCounted before the corresponding storage call:
// eg. certsCounted := b.certsCounted.Load()
func (b *backend) incrementTotalRevokedCertificatesCount(certsCounted bool, newSerial string) {
atomic.AddUint32(b.revokedCertCount, 1)
switch {
case !certsCounted:
// This is unsafe, but a good best-attempt
if strings.HasPrefix(newSerial, "revoked/") { // allow passing in the path (revoked/serial) OR the serial
newSerial = newSerial[8:]
}
b.possibleDoubleCountedRevokedSerials = append(b.possibleDoubleCountedRevokedSerials, newSerial)
default:
metrics.SetGauge([]string{"secrets", "pki", b.backendUUID, "total_revoked_certificates_stored"}, float32(*b.revokedCertCount))
}
}
func (b *backend) decrementTotalRevokedCertificatesCountReport() {
b.decrementTotalRevokedCertificatesCountNoReport()
metrics.SetGauge([]string{"secrets", "pki", b.backendUUID, "total_revoked_certificates_stored"}, float32(*b.revokedCertCount))
}
// Called directly only by the initialize function to deduplicate the count, when we don't have a full count yet
func (b *backend) decrementTotalRevokedCertificatesCountNoReport() {
atomic.AddUint32(b.revokedCertCount, ^uint32(0))
}

View File

@ -27,6 +27,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@ -3456,7 +3457,7 @@ func TestBackend_AllowedDomainsTemplate(t *testing.T) {
t.Fatal(err)
}
// Issue certificate for foobar.com to verify allowed_domain_templae doesnt break plain domains.
// Issue certificate for foobar.com to verify allowed_domain_template doesn't break plain domains.
_, err = client.Logical().Write("pki/issue/test", map[string]interface{}{"common_name": "foobar.com"})
if err != nil {
t.Fatal(err)
@ -3731,7 +3732,10 @@ func TestBackend_RevokePlusTidy_Intermediate(t *testing.T) {
metricsConf.EnableServiceLabel = false
metricsConf.EnableTypePrefix = false
metrics.NewGlobal(metricsConf, inmemSink)
_, err := metrics.NewGlobal(metricsConf, inmemSink)
if err != nil {
t.Fatal(err)
}
// Enable PKI secret engine
coreConfig := &vault.CoreConfig{
@ -3748,8 +3752,6 @@ func TestBackend_RevokePlusTidy_Intermediate(t *testing.T) {
vault.TestWaitActive(t, cores[0].Core)
client := cores[0].Client
var err error
// Mount /pki as a root CA
err = client.Sys().Mount("pki", &api.MountInput{
Type: "pki",
@ -3762,6 +3764,22 @@ func TestBackend_RevokePlusTidy_Intermediate(t *testing.T) {
t.Fatal(err)
}
// Check the metrics initialized in order to calculate backendUUID for /pki
// BackendUUID not consistent during tests with UUID from /sys/mounts/pki
metricsSuffix := "total_certificates_stored"
backendUUID := ""
mostRecentInterval := inmemSink.Data()[len(inmemSink.Data())-1]
for _, existingGauge := range mostRecentInterval.Gauges {
if strings.HasSuffix(existingGauge.Name, metricsSuffix) {
expandedGaugeName := existingGauge.Name
backendUUID = strings.Split(expandedGaugeName, ".")[2]
break
}
}
if backendUUID == "" {
t.Fatalf("No Gauge Found ending with %s", metricsSuffix)
}
// Set the cluster's certificate as the root CA in /pki
pemBundleRootCA := string(cluster.CACertPEM) + string(cluster.CAKeyPEM)
_, err = client.Logical().Write("pki/config/ca", map[string]interface{}{
@ -3819,6 +3837,21 @@ func TestBackend_RevokePlusTidy_Intermediate(t *testing.T) {
t.Fatal(err)
}
// Check the cert-count metrics
expectedCertCountGaugeMetrics := map[string]float32{
"secrets.pki." + backendUUID + ".total_revoked_certificates_stored": 1,
"secrets.pki." + backendUUID + ".total_certificates_stored": 1,
}
mostRecentInterval = inmemSink.Data()[len(inmemSink.Data())-1]
for gauge, value := range expectedCertCountGaugeMetrics {
if _, ok := mostRecentInterval.Gauges[gauge]; !ok {
t.Fatalf("Expected metrics to include a value for gauge %s", gauge)
}
if value != mostRecentInterval.Gauges[gauge].Value {
t.Fatalf("Expected value metric %s to be %f but got %f", gauge, value, mostRecentInterval.Gauges[gauge].Value)
}
}
// Revoke adds a fixed 2s buffer, so we sleep for a bit longer to ensure
// the revocation time is past the current time.
time.Sleep(3 * time.Second)
@ -3886,6 +3919,8 @@ func TestBackend_RevokePlusTidy_Intermediate(t *testing.T) {
"cert_store_deleted_count": json.Number("1"),
"revoked_cert_deleted_count": json.Number("1"),
"missing_issuer_cert_count": json.Number("0"),
"current_cert_store_count": json.Number("0"),
"current_revoked_cert_count": json.Number("0"),
}
// Let's copy the times from the response so that we can use deep.Equal()
timeStarted, ok := tidyStatus.Data["time_started"]
@ -3907,13 +3942,15 @@ func TestBackend_RevokePlusTidy_Intermediate(t *testing.T) {
{
// Map of gauges to expected value
expectedGauges := map[string]float32{
"secrets.pki.tidy.cert_store_current_entry": 0,
"secrets.pki.tidy.cert_store_total_entries": 1,
"secrets.pki.tidy.revoked_cert_current_entry": 0,
"secrets.pki.tidy.revoked_cert_total_entries": 1,
"secrets.pki.tidy.start_time_epoch": 0,
"secrets.pki.tidy.cert_store_total_entries_remaining": 0,
"secrets.pki.tidy.revoked_cert_total_entries_remaining": 0,
"secrets.pki.tidy.cert_store_current_entry": 0,
"secrets.pki.tidy.cert_store_total_entries": 1,
"secrets.pki.tidy.revoked_cert_current_entry": 0,
"secrets.pki.tidy.revoked_cert_total_entries": 1,
"secrets.pki.tidy.start_time_epoch": 0,
"secrets.pki." + backendUUID + ".total_certificates_stored": 0,
"secrets.pki." + backendUUID + ".total_revoked_certificates_stored": 0,
"secrets.pki.tidy.cert_store_total_entries_remaining": 0,
"secrets.pki.tidy.revoked_cert_total_entries_remaining": 0,
}
// Map of counters to the sum of the metrics for that counter
expectedCounters := map[string]float64{
@ -3923,7 +3960,7 @@ func TestBackend_RevokePlusTidy_Intermediate(t *testing.T) {
// Note that "secrets.pki.tidy.failure" won't be in the captured metrics
}
// If the metrics span mnore than one interval, skip the checks
// If the metrics span more than one interval, skip the checks
intervals := inmemSink.Data()
if len(intervals) == 1 {
interval := inmemSink.Data()[0]
@ -5462,6 +5499,109 @@ func TestBackend_IfModifiedSinceHeaders(t *testing.T) {
}
}
func TestBackend_InitializeCertificateCounts(t *testing.T) {
t.Parallel()
b, s := createBackendWithStorage(t)
ctx := context.Background()
// Set up an Issuer and Role
// We need a root certificate to write/revoke certificates with
resp, err := CBWrite(b, s, "root/generate/internal", map[string]interface{}{
"common_name": "myvault.com",
})
if err != nil {
t.Fatal(err)
}
if resp == nil {
t.Fatal("expected ca info")
}
// Create a role
_, err = CBWrite(b, s, "roles/example", map[string]interface{}{
"allowed_domains": "myvault.com",
"allow_bare_domains": true,
"allow_subdomains": true,
"max_ttl": "2h",
})
if err != nil {
t.Fatal(err)
}
// Put certificates A, B, C, D, E in backend
var certificates []string = []string{"a", "b", "c", "d", "e"}
serials := make([]string, 5)
for i, cn := range certificates {
resp, err = CBWrite(b, s, "issue/example", map[string]interface{}{
"common_name": cn + ".myvault.com",
})
if err != nil {
t.Fatal(err)
}
serials[i] = resp.Data["serial_number"].(string)
}
// Revoke certificates A + B
revocations := serials[0:2]
for _, key := range revocations {
resp, err = CBWrite(b, s, "revoke", map[string]interface{}{
"serial_number": key,
})
if err != nil {
t.Fatal(err)
}
}
// Assert initialize from clean is correct:
b.initializeStoredCertificateCounts(ctx)
if *b.certCount != 6 {
t.Fatalf("Failed to count six certificates root,A,B,C,D,E, instead counted %d certs", *b.certCount)
}
if *b.revokedCertCount != 2 {
t.Fatalf("Failed to count two revoked certificates A+B, instead counted %d certs", *b.revokedCertCount)
}
// Simulates listing while initialize in progress, by "restarting it"
atomic.StoreUint32(b.certCount, 0)
atomic.StoreUint32(b.revokedCertCount, 0)
b.certsCounted.Store(false)
// Revoke certificates C, D
dirtyRevocations := serials[2:4]
for _, key := range dirtyRevocations {
resp, err = CBWrite(b, s, "revoke", map[string]interface{}{
"serial_number": key,
})
if err != nil {
t.Fatal(err)
}
}
// Put certificates F, G in the backend
dirtyCertificates := []string{"f", "g"}
for _, cn := range dirtyCertificates {
resp, err = CBWrite(b, s, "issue/example", map[string]interface{}{
"common_name": cn + ".myvault.com",
})
if err != nil {
t.Fatal(err)
}
}
// Run initialize
b.initializeStoredCertificateCounts(ctx)
// Test certificate count
if *(b.certCount) != 8 {
t.Fatalf("Failed to initialize count of certificates root, A,B,C,D,E,F,G counted %d certs", *(b.certCount))
}
if *(b.revokedCertCount) != 4 {
t.Fatalf("Failed to count revoked certificates A,B,C,D counted %d certs", *(b.revokedCertCount))
}
return
}
// Verify that our default values are consistent when creating an issuer and when we do an
// empty POST update to it. This will hopefully identify if we have different default values
// for fields across the two APIs.

View File

@ -226,10 +226,18 @@ func fetchCertBySerial(ctx context.Context, b *backend, req *logical.Request, pr
// Update old-style paths to new-style paths
certEntry.Key = path
certsCounted := b.certsCounted.Load()
if err = req.Storage.Put(ctx, certEntry); err != nil {
return nil, errutil.InternalError{Err: fmt.Sprintf("error saving certificate with serial %s to new location", serial)}
}
if err = req.Storage.Delete(ctx, legacyPath); err != nil {
// If we fail here, we have an extra (copy) of a cert in storage, add to metrics:
switch {
case strings.HasPrefix(prefix, "revoked/"):
b.incrementTotalRevokedCertificatesCount(certsCounted, path)
default:
b.incrementTotalCertificatesCount(certsCounted, path)
}
return nil, errutil.InternalError{Err: fmt.Sprintf("error deleting certificate with serial %s from old location", serial)}
}

View File

@ -577,10 +577,12 @@ func revokeCert(ctx context.Context, b *backend, req *logical.Request, serial st
return nil, fmt.Errorf("error creating revocation entry")
}
certsCounted := b.certsCounted.Load()
err = req.Storage.Put(ctx, revEntry)
if err != nil {
return nil, fmt.Errorf("error saving revoked certificate to new location")
}
b.incrementTotalRevokedCertificatesCount(certsCounted, revEntry.Key)
}
// Fetch the config and see if we need to rebuild the CRL. If we have

View File

@ -409,13 +409,16 @@ func (b *backend) pathIssueSignCert(ctx context.Context, req *logical.Request, d
}
if !role.NoStore {
key := "certs/" + normalizeSerial(cb.SerialNumber)
certsCounted := b.certsCounted.Load()
err = req.Storage.Put(ctx, &logical.StorageEntry{
Key: "certs/" + normalizeSerial(cb.SerialNumber),
Key: key,
Value: parsedBundle.CertificateBytes,
})
if err != nil {
return nil, fmt.Errorf("unable to store certificate locally: %w", err)
}
b.incrementTotalCertificatesCount(certsCounted, key)
}
if useCSR {

View File

@ -257,13 +257,16 @@ func (b *backend) pathCAGenerateRoot(ctx context.Context, req *logical.Request,
// Also store it as just the certificate identified by serial number, so it
// can be revoked
key := "certs/" + normalizeSerial(cb.SerialNumber)
certsCounted := b.certsCounted.Load()
err = req.Storage.Put(ctx, &logical.StorageEntry{
Key: "certs/" + normalizeSerial(cb.SerialNumber),
Key: key,
Value: parsedBundle.CertificateBytes,
})
if err != nil {
return nil, fmt.Errorf("unable to store certificate locally: %w", err)
}
b.incrementTotalCertificatesCount(certsCounted, key)
// Build a fresh CRL
err = b.crlBuilder.rebuild(ctx, b, req, true)
@ -441,13 +444,16 @@ func (b *backend) pathIssuerSignIntermediate(ctx context.Context, req *logical.R
return nil, fmt.Errorf("unsupported format argument: %s", format)
}
key := "certs/" + normalizeSerial(cb.SerialNumber)
certsCounted := b.certsCounted.Load()
err = req.Storage.Put(ctx, &logical.StorageEntry{
Key: "certs/" + normalizeSerial(cb.SerialNumber),
Key: key,
Value: parsedBundle.CertificateBytes,
})
if err != nil {
return nil, fmt.Errorf("unable to store certificate locally: %w", err)
}
b.incrementTotalCertificatesCount(certsCounted, key)
if parsedBundle.Certificate.MaxPathLen == 0 {
resp.AddWarning("Max path length of the signed certificate is zero. This certificate cannot be used to issue intermediate CA certificates.")

View File

@ -493,6 +493,8 @@ func (b *backend) pathTidyStatusRead(_ context.Context, _ *logical.Request, _ *f
"cert_store_deleted_count": nil,
"revoked_cert_deleted_count": nil,
"missing_issuer_cert_count": nil,
"current_cert_store_count": nil,
"current_revoked_cert_count": nil,
},
}
@ -531,6 +533,14 @@ func (b *backend) pathTidyStatusRead(_ context.Context, _ *logical.Request, _ *f
resp.Data["time_finished"] = b.tidyStatus.timeFinished
}
resp.Data["current_cert_store_count"] = b.certCount
resp.Data["current_revoked_cert_count"] = b.revokedCertCount
if !b.certsCounted.Load() {
resp.AddWarning("Certificates in storage are still being counted, current counts provided may be " +
"inaccurate")
}
return resp, nil
}
@ -665,6 +675,8 @@ func (b *backend) tidyStatusIncCertStoreCount() {
defer b.tidyStatusLock.Unlock()
b.tidyStatus.certStoreDeletedCount++
b.decrementTotalCertificatesCountReport()
}
func (b *backend) tidyStatusIncRevokedCertCount() {
@ -672,6 +684,8 @@ func (b *backend) tidyStatusIncRevokedCertCount() {
defer b.tidyStatusLock.Unlock()
b.tidyStatus.revokedCertDeletedCount++
b.decrementTotalRevokedCertificatesCountReport()
}
func (b *backend) tidyStatusIncMissingIssuerCertCount() {

3
changelog/16676.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
secrets/pki: Added gauge metrics "secrets.pki.total_revoked_certificates_stored" and "secrets.pki.total_certificates_stored" to track the number of certificates in storage.
```