More cross cluster queue tweaks (#18789)

* Move comment about perf-primary only invalidation

Also remove noisy debug log.

Signed-off-by: Alexander Scheel <alex.scheel@hashicorp.com>

* Remove more noisy log statements during queue processing

Signed-off-by: Alexander Scheel <alex.scheel@hashicorp.com>

* Skip revocation entries from our current cluster

Signed-off-by: Alexander Scheel <alex.scheel@hashicorp.com>
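
A minimal, self-contained sketch of the skip this adds to processRevocationQueue: requests that originated on the local cluster were already validated when they were written, so they can be dropped before doing any storage reads. The request type and filtering helper below are illustrative stand-ins, not Vault's real types; only the cluster-ID comparison mirrors the diff further down.

    package main

    import "fmt"

    // revocationRequest stands in for the queued cross-cluster entry; the real
    // code carries the originating cluster ID alongside the serial.
    type revocationRequest struct {
        Cluster string
        Serial  string
    }

    // filterForeignRequests drops entries whose originating cluster matches the
    // local cluster ID, mirroring the "ourClusterId == req.Cluster" check added
    // in processRevocationQueue. An empty local ID disables the filter.
    func filterForeignRequests(ourClusterID string, queue []revocationRequest) []revocationRequest {
        var out []revocationRequest
        for _, req := range queue {
            if ourClusterID != "" && ourClusterID == req.Cluster {
                // Already checked locally when the request was created.
                continue
            }
            out = append(out, req)
        }
        return out
    }

    func main() {
        queue := []revocationRequest{
            {Cluster: "cluster-a", Serial: "17:ab:cd"},
            {Cluster: "cluster-b", Serial: "2f:00:91"},
        }
        // Running as cluster-a keeps only cluster-b's entry.
        fmt.Println(filterForeignRequests("cluster-a", queue))
    }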

* Add locking and comment about tidying revoke queue

Signed-off-by: Alexander Scheel <alex.scheel@hashicorp.com>
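
Tidy now holds the revocation storage lock while walking the queue, and drops it around each pause_duration sleep so concurrent revocations are not starved. A small sketch of that lock-around-pause pattern, with a generic mutex, pause value, and handler standing in for b.revokeStorageLock, config.PauseDuration, and the per-serial tidy work:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // tidyWithPause walks items under a lock, but releases the lock around each
    // optional pause so other writers can make progress in between items.
    func tidyWithPause(mu *sync.Mutex, pause time.Duration, items []string, handle func(string)) {
        mu.Lock()
        defer mu.Unlock()

        for _, item := range items {
            if pause > 0 {
                // Drop the lock while sleeping; reacquire before touching storage.
                mu.Unlock()
                time.Sleep(pause)
                mu.Lock()
            }
            handle(item)
        }
    }

    func main() {
        var mu sync.Mutex
        tidyWithPause(&mu, 10*time.Millisecond, []string{"serial-1", "serial-2"}, func(s string) {
            fmt.Println("tidied", s)
        })
    }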

* Switch to time.Since for tidy

Signed-off-by: Alexander Scheel <alex.scheel@hashicorp.com>
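
The old check compared RequestedAt against now.Add(-1 * QueueSafetyBuffer); time.Since expresses the same "skip anything still inside the safety buffer" test more directly. A small illustrative sketch of the equivalence (the function names here are invented purely for the comparison):

    package main

    import (
        "fmt"
        "time"
    )

    // tooRecentOld is the pre-change formulation: skip entries requested after
    // (now - buffer).
    func tooRecentOld(requestedAt time.Time, buffer time.Duration) bool {
        now := time.Now()
        afterBuffer := now.Add(-1 * buffer)
        return requestedAt.After(afterBuffer)
    }

    // tooRecentNew is the time.Since formulation: skip entries whose age is
    // still within the buffer.
    func tooRecentNew(requestedAt time.Time, buffer time.Duration) bool {
        return time.Since(requestedAt) <= buffer
    }

    func main() {
        buffer := 48 * time.Hour
        fresh := time.Now().Add(-1 * time.Hour)
        stale := time.Now().Add(-72 * time.Hour)
        fmt.Println(tooRecentOld(fresh, buffer), tooRecentNew(fresh, buffer)) // true true
        fmt.Println(tooRecentOld(stale, buffer), tooRecentNew(stale, buffer)) // false false
    }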

* Refactor tidyStatuses into path_tidy.go

Leaving these in backend.go often causes us to miss adding useful values
to tidyStatus when we add a new config parameter.

Signed-off-by: Alexander Scheel <alex.scheel@hashicorp.com>

* Track the number of deleted revocation requests

Signed-off-by: Alexander Scheel <alex.scheel@hashicorp.com>
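
The new counter follows the same pattern as the existing tidy counters: a field on tidyStatus, bumped under tidyStatusLock and copied into the tidy-status response as revocation_queue_deleted_count. A condensed, self-contained sketch of that pattern; the surrounding types here are stand-ins for the backend's real ones:

    package main

    import (
        "fmt"
        "sync"
    )

    // tidyCounters is a stand-in for the backend's tidyStatus bookkeeping.
    type tidyCounters struct {
        mu                   sync.Mutex
        revQueueDeletedCount uint
    }

    // incRevQueueCount mirrors tidyStatusIncRevQueueCount: take the status
    // lock, bump the counter, release.
    func (t *tidyCounters) incRevQueueCount() {
        t.mu.Lock()
        defer t.mu.Unlock()
        t.revQueueDeletedCount++
    }

    // statusData mirrors how pathTidyStatusRead copies the counter into the
    // response under revocation_queue_deleted_count.
    func (t *tidyCounters) statusData() map[string]interface{} {
        t.mu.Lock()
        defer t.mu.Unlock()
        return map[string]interface{}{
            "revocation_queue_deleted_count": t.revQueueDeletedCount,
        }
    }

    func main() {
        var t tidyCounters
        t.incRevQueueCount()
        t.incRevQueueCount()
        fmt.Println(t.statusData()) // map[revocation_queue_deleted_count:2]
    }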

* Allow tidy to remove confirmed revocation requests

Signed-off-by: Alexander Scheel <alex.scheel@hashicorp.com>
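
Confirmation entries are normally consumed by processRevocationQueue on the performance primary; this change lets tidy clean them up as well, without forcing a plugin reload. A rough sketch of the cleanup order the diff follows, using a simplified in-memory store and an illustrative path prefix in place of Vault's logical.Storage and real constants: delete the request from every participating cluster's queue first, then delete the confirmation marker itself.

    package main

    import "fmt"

    // store is a stand-in for logical.Storage, keyed by path.
    type store map[string]bool

    func (s store) delete(path string) { delete(s, path) }

    // tidyConfirmedRequest mirrors the cleanup order added to
    // doTidyRevocationQueue: remove the revocation request from every
    // participating cluster's queue, then remove the confirmation entry.
    func tidyConfirmedRequest(s store, prefix string, clusters []string, serial, confirmedPath string) {
        for _, cluster := range clusters {
            s.delete(prefix + cluster + "/" + serial)
        }
        s.delete(confirmedPath)
    }

    func main() {
        prefix := "cross-revocation-queue/" // illustrative prefix, not Vault's actual constant
        s := store{
            prefix + "cluster-a/17:ab:cd":           true,
            prefix + "cluster-b/17:ab:cd":           true,
            prefix + "cluster-b/17:ab:cd/confirmed": true,
        }
        tidyConfirmedRequest(s, prefix, []string{"cluster-a", "cluster-b"}, "17:ab:cd",
            prefix+"cluster-b/17:ab:cd/confirmed")
        fmt.Println(len(s)) // 0
    }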

* Add missing field to tidy test

Signed-off-by: Alexander Scheel <alex.scheel@hashicorp.com>

Signed-off-by: Alexander Scheel <alex.scheel@hashicorp.com>
Alexander Scheel authored on 2023-01-23 11:52:38 -05:00; committed by GitHub
parent d0453ed40b
commit ec7502aa44
4 changed files with 132 additions and 55 deletions

View File

@@ -261,41 +261,7 @@ type backend struct {
     issuersLock sync.RWMutex
 }
 
-type (
-    tidyStatusState int
-    roleOperation func(ctx context.Context, req *logical.Request, data *framework.FieldData, role *roleEntry) (*logical.Response, error)
-)
-
-const (
-    tidyStatusInactive tidyStatusState = iota
-    tidyStatusStarted = iota
-    tidyStatusFinished = iota
-    tidyStatusError = iota
-    tidyStatusCancelling = iota
-    tidyStatusCancelled = iota
-)
-
-type tidyStatus struct {
-    // Parameters used to initiate the operation
-    safetyBuffer int
-    issuerSafetyBuffer int
-    tidyCertStore bool
-    tidyRevokedCerts bool
-    tidyRevokedAssocs bool
-    tidyExpiredIssuers bool
-    tidyBackupBundle bool
-    pauseDuration string
-
-    // Status
-    state tidyStatusState
-    err error
-    timeStarted time.Time
-    timeFinished time.Time
-    message string
-    certStoreDeletedCount uint
-    revokedCertDeletedCount uint
-    missingIssuerCertCount uint
-}
+type roleOperation func(ctx context.Context, req *logical.Request, data *framework.FieldData, role *roleEntry) (*logical.Response, error)
 
 const backendHelp = `
 The PKI backend dynamically generates X509 server and client certificates.
@@ -480,19 +446,21 @@ func (b *backend) invalidate(ctx context.Context, key string) {
         if !strings.HasSuffix(key, "/confirmed") {
             cluster := split[len(split)-2]
             serial := split[len(split)-1]
-            // Only process confirmations on the perf primary.
             b.crlBuilder.addCertForRevocationCheck(cluster, serial)
         } else {
             if len(split) >= 3 {
                 cluster := split[len(split)-3]
                 serial := split[len(split)-2]
+                // Only process confirmations on the perf primary. The
+                // performance secondaries cannot remove other clusters'
+                // entries, and so do not need to track them (only to
+                // ignore them). On performance primary nodes though,
+                // we do want to track them to remove them.
                 if !isNotPerfPrimary {
                     b.crlBuilder.addCertForRevocationRemoval(cluster, serial)
                 }
             }
         }
-
-        b.Logger().Debug("got replicated cross-cluster revocation: " + key)
     }
 }

View File

@@ -3952,6 +3952,7 @@ func TestBackend_RevokePlusTidy_Intermediate(t *testing.T) {
         "missing_issuer_cert_count": json.Number("0"),
         "current_cert_store_count": json.Number("0"),
         "current_revoked_cert_count": json.Number("0"),
+        "revocation_queue_deleted_count": json.Number("0"),
     }
     // Let's copy the times from the response so that we can use deep.Equal()
     timeStarted, ok := tidyStatus.Data["time_started"]

View File

@@ -508,9 +508,7 @@ func (cb *crlBuilder) maybeGatherQueueForFirstProcess(sc *storageContext, isNotP
                 cb.revQueue.Add(entry)
             } else if !isNotPerfPrimary {
                 cb.removalQueue.Add(entry)
-            } else {
-                sc.Backend.Logger().Debug(fmt.Sprintf("ignoring confirmed revoked serial %v: %v vs %v ", serial, err, removalEntry))
-            }
+            } // Else, this is a confirmation but we're on a perf secondary so ignore it.
 
             // Overwrite the error; we don't really care about its contents
             // at this step.
@@ -543,12 +541,27 @@ func (cb *crlBuilder) processRevocationQueue(sc *storageContext) error {
     removalQueue := cb.removalQueue.Iterate()
     sc.Backend.Logger().Debug(fmt.Sprintf("gathered %v revocations and %v confirmation entries", len(revQueue), len(removalQueue)))
     crlConfig, err := cb.getConfigWithUpdate(sc)
     if err != nil {
         return err
     }
 
+    ourClusterId, err := sc.Backend.System().ClusterID(sc.Context)
+    if err != nil {
+        return fmt.Errorf("unable to fetch clusterID to ignore local revocation entries: %w", err)
+    }
+
     for _, req := range revQueue {
-        sc.Backend.Logger().Debug(fmt.Sprintf("handling revocation request: %v", req))
+        // Regardless of whether we're on the perf primary or a secondary
+        // cluster, we can safely ignore revocation requests originating
+        // from our node, because we've already checked them once (when
+        // they were created).
+        if ourClusterId != "" && ourClusterId == req.Cluster {
+            continue
+        }
+
+        // Fetch the revocation entry to ensure it exists.
         rPath := crossRevocationPrefix + req.Cluster + "/" + req.Serial
         entry, err := sc.Storage.Get(sc.Context, rPath)
         if err != nil {
@@ -562,7 +575,6 @@ func (cb *crlBuilder) processRevocationQueue(sc *storageContext) error {
         }
 
         resp, err := tryRevokeCertBySerial(sc, crlConfig, req.Serial)
-        sc.Backend.Logger().Debug(fmt.Sprintf("checked local revocation entry: %v / %v", resp, err))
         if err == nil && resp != nil && !resp.IsError() && resp.Data != nil && resp.Data["state"].(string) == "revoked" {
             if isNotPerfPrimary {
                 // Write a revocation queue removal entry.
@@ -597,7 +609,9 @@ func (cb *crlBuilder) processRevocationQueue(sc *storageContext) error {
     }
 
     if isNotPerfPrimary {
-        sc.Backend.Logger().Debug(fmt.Sprintf("not on perf primary so done; ignoring any revocation confirmations"))
+        sc.Backend.Logger().Debug(fmt.Sprintf("not on perf primary so ignoring any revocation confirmations"))
+
+        // See note in pki/backend.go; this should be empty.
         cb.removalQueue.RemoveAll()
         cb.haveInitializedQueue = true
         return nil
@@ -609,7 +623,6 @@ func (cb *crlBuilder) processRevocationQueue(sc *storageContext) error {
     }
 
     for _, entry := range removalQueue {
-        sc.Backend.Logger().Debug(fmt.Sprintf("handling revocation confirmation: %v", entry))
         // First remove the revocation request.
         for cIndex, cluster := range clusters {
             eEntry := crossRevocationPrefix + cluster + entry.Serial

View File

@@ -19,6 +19,40 @@ import (
 var tidyCancelledError = errors.New("tidy operation cancelled")
 
+type tidyStatusState int
+
+const (
+    tidyStatusInactive tidyStatusState = iota
+    tidyStatusStarted = iota
+    tidyStatusFinished = iota
+    tidyStatusError = iota
+    tidyStatusCancelling = iota
+    tidyStatusCancelled = iota
+)
+
+type tidyStatus struct {
+    // Parameters used to initiate the operation
+    safetyBuffer int
+    issuerSafetyBuffer int
+    tidyCertStore bool
+    tidyRevokedCerts bool
+    tidyRevokedAssocs bool
+    tidyExpiredIssuers bool
+    tidyBackupBundle bool
+    pauseDuration string
+
+    // Status
+    state tidyStatusState
+    err error
+    timeStarted time.Time
+    timeFinished time.Time
+    message string
+    certStoreDeletedCount uint
+    revokedCertDeletedCount uint
+    missingIssuerCertCount uint
+    revQueueDeletedCount uint
+}
+
 type tidyConfig struct {
     Enabled bool `json:"enabled"`
     Interval time.Duration `json:"interval_duration"`
@@ -683,8 +717,15 @@ func (b *backend) doTidyRevocationQueue(ctx context.Context, req *logical.Reques
         return fmt.Errorf("failed to list cross-cluster revocation queue participating clusters: %w", err)
     }
 
+    // Grab locks as we're potentially modifying revocation-related storage.
+    b.revokeStorageLock.Lock()
+    defer b.revokeStorageLock.Unlock()
+
     for cIndex, cluster := range clusters {
+        if cluster[len(cluster)-1] == '/' {
             cluster = cluster[0 : len(cluster)-1]
+        }
+
         cPath := crossRevocationPrefix + cluster + "/"
         serials, err := sc.Storage.List(sc.Context, cPath)
         if err != nil {
@@ -692,17 +733,50 @@ func (b *backend) doTidyRevocationQueue(ctx context.Context, req *logical.Reques
         }
 
         for _, serial := range serials {
+            // Check for pause duration to reduce resource consumption.
+            if config.PauseDuration > (0 * time.Second) {
+                b.revokeStorageLock.Unlock()
+                time.Sleep(config.PauseDuration)
+                b.revokeStorageLock.Lock()
+            }
+
             // Confirmation entries _should_ be handled by this cluster's
             // processRevocationQueue(...) invocation; if not, when the plugin
             // reloads, maybeGatherQueueForFirstProcess(...) will remove all
-            // stale confirmation requests.
+            // stale confirmation requests. However, we don't want to force an
+            // operator to reload their in-use plugin, so allow tidy to also
+            // clean up confirmation values without reloading.
             if serial[len(serial)-1] == '/' {
+                // Check if we have a confirmed entry.
+                confirmedPath := cPath + serial + "confirmed"
+                removalEntry, err := sc.Storage.Get(sc.Context, confirmedPath)
+                if err != nil {
+                    return fmt.Errorf("error reading revocation confirmation (%v) during tidy: %w", confirmedPath, err)
+                }
+                if removalEntry == nil {
                     continue
                 }
 
-            // Check for pause duration to reduce resource consumption.
-            if config.PauseDuration > (0 * time.Second) {
-                time.Sleep(config.PauseDuration)
+                // Remove potential revocation requests from all clusters.
+                for _, subCluster := range clusters {
+                    if subCluster[len(subCluster)-1] == '/' {
+                        subCluster = subCluster[0 : len(subCluster)-1]
+                    }
+
+                    reqPath := subCluster + "/" + serial[0:len(serial)-1]
+                    if err := sc.Storage.Delete(sc.Context, reqPath); err != nil {
+                        return fmt.Errorf("failed to remove confirmed revocation request on candidate cluster (%v): %w", reqPath, err)
+                    }
+                }
+
+                // Then delete the confirmation.
+                if err := sc.Storage.Delete(sc.Context, confirmedPath); err != nil {
+                    return fmt.Errorf("failed to remove confirmed revocation confirmation (%v): %w", confirmedPath, err)
+                }
+
+                // No need to handle a revocation request at this path: it can't
+                // still exist on this cluster after we deleted it above.
+                continue
             }
 
             ePath := cPath + serial
@@ -719,9 +793,7 @@ func (b *backend) doTidyRevocationQueue(ctx context.Context, req *logical.Reques
                 return fmt.Errorf("error reading revocation request (%v) to tidy: %w", ePath, err)
             }
 
-            now := time.Now()
-            afterBuffer := now.Add(-1 * config.QueueSafetyBuffer)
-            if revRequest.RequestedAt.After(afterBuffer) {
+            if time.Since(revRequest.RequestedAt) <= config.QueueSafetyBuffer {
                 continue
             }
@@ -729,6 +801,20 @@ func (b *backend) doTidyRevocationQueue(ctx context.Context, req *logical.Reques
             if err := sc.Storage.Delete(sc.Context, ePath); err != nil {
                 return fmt.Errorf("error deleting revocation request (%v): %w", ePath, err)
             }
+
+            // Assumption: there should never be a need to remove this from
+            // the processing queue on this node. We're on the active primary,
+            // so our writes don't cause invalidations. This means we'd have
+            // to have slated it for deletion very quickly after it'd been
+            // sent (i.e., inside of the 1-minute boundary that periodicFunc
+            // executes at). While this is possible, because we grab the
+            // revocationStorageLock above, we can't execute interleaved
+            // with that periodicFunc, so the periodicFunc would've had to
+            // have finished before we actually did this deletion (or it
+            // wouldn't have ignored this serial because our deletion would've
+            // happened prior to it reading the storage entry). Thus we should
+            // be safe to ignore the revocation queue removal here.
+            b.tidyStatusIncRevQueueCount()
         }
     }
@@ -782,6 +868,7 @@ func (b *backend) pathTidyStatusRead(_ context.Context, _ *logical.Request, _ *f
             "missing_issuer_cert_count": nil,
             "current_cert_store_count": nil,
             "current_revoked_cert_count": nil,
+            "revocation_queue_deleted_count": nil,
         },
     }
@@ -802,6 +889,7 @@ func (b *backend) pathTidyStatusRead(_ context.Context, _ *logical.Request, _ *f
     resp.Data["cert_store_deleted_count"] = b.tidyStatus.certStoreDeletedCount
     resp.Data["revoked_cert_deleted_count"] = b.tidyStatus.revokedCertDeletedCount
     resp.Data["missing_issuer_cert_count"] = b.tidyStatus.missingIssuerCertCount
+    resp.Data["revocation_queue_deleted_count"] = b.tidyStatus.revQueueDeletedCount
 
     switch b.tidyStatus.state {
     case tidyStatusStarted:
@@ -1038,6 +1126,13 @@ func (b *backend) tidyStatusIncMissingIssuerCertCount() {
     b.tidyStatus.missingIssuerCertCount++
 }
 
+func (b *backend) tidyStatusIncRevQueueCount() {
+    b.tidyStatusLock.Lock()
+    defer b.tidyStatusLock.Unlock()
+
+    b.tidyStatus.revQueueDeletedCount++
+}
+
 const pathTidyHelpSyn = `
 Tidy up the backend by removing expired certificates, revocation information,
 or both.