Unified revocation migration code (#18866)
* Unified revocation migration code
  - Add a periodic function that lists the local revocations and, if any are missing from the unified revocation area, forces a write to the unified revocation folder/remote instance.
* PR feedback
  - Do not transfer expired certificates from the local space to the unified space
  - Move the new periodic code into a periodic.go file
  - Add a flag so we only run this once if all is good, with a force flag if we encounter errors or if unified_crl is toggled on
* PR feedback take 2
This commit is contained in:
parent
1cef81f025
commit
d12534c2bd
|
@ -9,6 +9,8 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
|
||||
atomic2 "go.uber.org/atomic"
|
||||
|
||||
"github.com/hashicorp/vault/sdk/helper/consts"
|
||||
|
@ -239,6 +241,8 @@ func Backend(conf *logical.BackendConfig) *backend {
|
|||
b.possibleDoubleCountedSerials = make([]string, 0, 250)
|
||||
b.possibleDoubleCountedRevokedSerials = make([]string, 0, 250)
|
||||
|
||||
b.unifiedTransferStatus = newUnifiedTransferStatus()
|
||||
|
||||
return &b
|
||||
}
|
||||
|
||||
|
@ -255,6 +259,8 @@ type backend struct {
|
|||
tidyStatus *tidyStatus
|
||||
lastTidy time.Time
|
||||
|
||||
unifiedTransferStatus *unifiedTransferStatus
|
||||
|
||||
certCount *uint32
|
||||
revokedCertCount *uint32
|
||||
certsCounted *atomic2.Bool
|
||||
|
@ -563,19 +569,23 @@ func (b *backend) periodicFunc(ctx context.Context, request *logical.Request) er
|
|||
return nil
|
||||
}
|
||||
|
||||
backgroundSc := b.makeStorageContext(context.Background(), b.storage)
|
||||
go runUnifiedTransfer(backgroundSc)
|
||||
|
||||
crlErr := doCRL()
|
||||
tidyErr := doAutoTidy()
|
||||
|
||||
if crlErr != nil && tidyErr != nil {
|
||||
return fmt.Errorf("Error building CRLs:\n - %v\n\nError running auto-tidy:\n - %w\n", crlErr, tidyErr)
|
||||
}
|
||||
|
||||
var errors error
|
||||
if crlErr != nil {
|
||||
return fmt.Errorf("Error building CRLs:\n - %w\n", crlErr)
|
||||
errors = multierror.Append(errors, fmt.Errorf("Error building CRLs:\n - %w\n", crlErr))
|
||||
}
|
||||
|
||||
if tidyErr != nil {
|
||||
return fmt.Errorf("Error running auto-tidy:\n - %w\n", tidyErr)
|
||||
errors = multierror.Append(errors, fmt.Errorf("Error running auto-tidy:\n - %w\n", tidyErr))
|
||||
}
|
||||
|
||||
if errors != nil {
|
||||
return errors
|
||||
}
|
||||
|
||||
// Check if the CRL was invalidated due to issuer swap and update
|
||||
|
|
|
@ -94,6 +94,7 @@ type crlBuilder struct {
|
|||
_config sync.RWMutex
|
||||
dirty *atomic2.Bool
|
||||
config crlConfig
|
||||
haveInitializedConfig bool
|
||||
|
||||
// Whether to invalidate our LastModifiedTime due to write on the
|
||||
// global issuance config.
|
||||
|
@ -148,6 +149,8 @@ func (cb *crlBuilder) reloadConfigIfRequired(sc *storageContext) error {
|
|||
return err
|
||||
}
|
||||
|
||||
previousConfig := cb.config
|
||||
|
||||
// Set the default config if none was returned to us.
|
||||
if config != nil {
|
||||
cb.config = *config
|
||||
|
@ -157,6 +160,16 @@ func (cb *crlBuilder) reloadConfigIfRequired(sc *storageContext) error {
|
|||
|
||||
// Updated the config; unset dirty.
|
||||
cb.dirty.Store(false)
|
||||
triggerChangeNotification := true
|
||||
if !cb.haveInitializedConfig {
|
||||
cb.haveInitializedConfig = true
|
||||
triggerChangeNotification = false // do not trigger on the initial loading of configuration.
|
||||
}
|
||||
|
||||
// Certain things need to be triggered on all server types when crlConfig is loaded.
|
||||
if triggerChangeNotification {
|
||||
cb.notifyOnConfigChange(sc, previousConfig, cb.config)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -736,6 +749,15 @@ func (cb *crlBuilder) processRevocationQueue(sc *storageContext) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (cb *crlBuilder) notifyOnConfigChange(sc *storageContext, priorConfig crlConfig, newConfig crlConfig) {
|
||||
// If you need to hook into a CRL configuration change across different server types
|
||||
// such as primary clusters as well as performance replicas, it is easier to do here than
|
||||
// in two places (API layer and in invalidateFunc)
|
||||
if priorConfig.UnifiedCRL != newConfig.UnifiedCRL && newConfig.UnifiedCRL {
|
||||
sc.Backend.unifiedTransferStatus.forceRun()
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to fetch a map of issuerID->parsed cert for revocation
|
||||
// usage. Unlike other paths, this needs to handle the legacy bundle
|
||||
// more gracefully than rejecting it outright.
|
||||
|
@ -836,32 +858,21 @@ func revokeCert(sc *storageContext, config *crlConfig, cert *x509.Certificate) (
|
|||
}
|
||||
}
|
||||
|
||||
var revInfo revocationInfo
|
||||
revEntry, err := fetchCertBySerial(sc, revokedPath, colonSerial)
|
||||
curRevInfo, err := sc.fetchRevocationInfo(colonSerial)
|
||||
if err != nil {
|
||||
switch err.(type) {
|
||||
case errutil.UserError:
|
||||
return logical.ErrorResponse(err.Error()), nil
|
||||
default:
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if revEntry != nil {
|
||||
// Set the revocation info to the existing values
|
||||
err = revEntry.DecodeJSON(&revInfo)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error decoding existing revocation info")
|
||||
}
|
||||
|
||||
if curRevInfo != nil {
|
||||
resp := &logical.Response{
|
||||
Data: map[string]interface{}{
|
||||
"revocation_time": revInfo.RevocationTime,
|
||||
"revocation_time": curRevInfo.RevocationTime,
|
||||
"state": "revoked",
|
||||
},
|
||||
}
|
||||
if !revInfo.RevocationTimeUTC.IsZero() {
|
||||
resp.Data["revocation_time_rfc3339"] = revInfo.RevocationTimeUTC.Format(time.RFC3339Nano)
|
||||
if !curRevInfo.RevocationTimeUTC.IsZero() {
|
||||
resp.Data["revocation_time_rfc3339"] = curRevInfo.RevocationTimeUTC.Format(time.RFC3339Nano)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
@ -874,15 +885,17 @@ func revokeCert(sc *storageContext, config *crlConfig, cert *x509.Certificate) (
|
|||
}
|
||||
|
||||
currTime := time.Now()
|
||||
revInfo.CertificateBytes = cert.Raw
|
||||
revInfo.RevocationTime = currTime.Unix()
|
||||
revInfo.RevocationTimeUTC = currTime.UTC()
|
||||
revInfo := revocationInfo{
|
||||
CertificateBytes: cert.Raw,
|
||||
RevocationTime: currTime.Unix(),
|
||||
RevocationTimeUTC: currTime.UTC(),
|
||||
}
|
||||
|
||||
// We may not find an issuer with this certificate; that's fine so
|
||||
// ignore the return value.
|
||||
associateRevokedCertWithIsssuer(&revInfo, cert, issuerIDCertMap)
|
||||
|
||||
revEntry, err = logical.StorageEntryJSON(revokedPath+hyphenSerial, revInfo)
|
||||
revEntry, err := logical.StorageEntryJSON(revokedPath+hyphenSerial, revInfo)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating revocation entry")
|
||||
}
|
||||
|
@ -908,8 +921,9 @@ func revokeCert(sc *storageContext, config *crlConfig, cert *x509.Certificate) (
|
|||
if ignoreErr != nil {
|
||||
// Just log the error if we fail to write across clusters, a separate background
|
||||
// thread will reattempt it later on as we have the local write done.
|
||||
sc.Backend.Logger().Debug("Failed to write unified revocation entry",
|
||||
sc.Backend.Logger().Debug("Failed to write unified revocation entry, will re-attempt later",
|
||||
"serial_number", colonSerial, "error", ignoreErr)
|
||||
sc.Backend.unifiedTransferStatus.forceRun()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -273,6 +273,9 @@ func (b *backend) pathCRLWrite(ctx context.Context, req *logical.Request, d *fra
|
|||
b.crlBuilder.markConfigDirty()
|
||||
b.crlBuilder.reloadConfigIfRequired(sc)
|
||||
|
||||
// Note this only affects/happens on the main cluster node, if you need to
|
||||
// notify something based on a configuration change on all server types
|
||||
// have a look at crlBuilder::reloadConfigIfRequired
|
||||
if oldDisable != config.Disable || (oldAutoRebuild && !config.AutoRebuild) || (oldEnableDelta != config.EnableDelta) || (oldUnifiedCRL != config.UnifiedCRL) {
|
||||
// It wasn't disabled but now it is (or equivalently, we were set to
|
||||
// auto-rebuild and we aren't now or equivalently, we changed our
|
||||
|
|
|
@ -0,0 +1,176 @@
|
|||
package pki
|
||||
|
||||
import (
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/vault/sdk/helper/consts"
|
||||
)
|
||||
|
||||
// minUnifiedTransferDelay is the delay runUnifiedTransfer compares against the
// time elapsed since its previous run when deciding whether to honor a forced
// re-run.
const minUnifiedTransferDelay = 30 * time.Minute
|
||||
|
||||
// unifiedTransferStatus holds the bookkeeping for the background process that
// copies local revocation entries into the unified revocation space.
type unifiedTransferStatus struct {
	// isRunning is set while a unified transfer is in progress, so that
	// overlapping invocations can detect each other and bail out.
	isRunning atomic.Bool

	// forceRerun records a request for another transfer pass.
	forceRerun atomic.Bool

	// lastRun is the time the most recent transfer pass finished; it stays at
	// the zero value until a first pass has completed.
	lastRun time.Time
}

// newUnifiedTransferStatus returns a status with no recorded run, no transfer
// in progress and no re-run requested.
func newUnifiedTransferStatus() *unifiedTransferStatus {
	return new(unifiedTransferStatus)
}

// forceRun requests that another unified transfer pass be performed.
func (uts *unifiedTransferStatus) forceRun() {
	uts.forceRerun.Store(true)
}
|
||||
|
||||
// runUnifiedTransfer meant to run as a background, this will process all and
|
||||
// send all missing local revocation entries to the unified space if the feature
|
||||
// is enabled.
|
||||
func runUnifiedTransfer(sc *storageContext) {
|
||||
b := sc.Backend
|
||||
status := b.unifiedTransferStatus
|
||||
|
||||
isPerfStandby := b.System().ReplicationState().HasState(consts.ReplicationDRSecondary | consts.ReplicationPerformanceStandby)
|
||||
|
||||
if isPerfStandby || b.System().LocalMount() {
|
||||
// We only do this on active enterprise nodes, when we aren't a local mount
|
||||
return
|
||||
}
|
||||
|
||||
config, err := b.crlBuilder.getConfigWithUpdate(sc)
|
||||
if err != nil {
|
||||
b.Logger().Error("failed to retrieve crl config from storage for unified transfer background process",
|
||||
"error", err)
|
||||
return
|
||||
}
|
||||
|
||||
if !status.lastRun.IsZero() {
|
||||
// We have run before, we only run again if we have
|
||||
// been requested to forceRerun, and we haven't run since our
|
||||
// minimum delay
|
||||
if !(status.forceRerun.Load() && time.Since(status.lastRun) < minUnifiedTransferDelay) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if !config.UnifiedCRL {
|
||||
// Feature is disabled, no need to run
|
||||
return
|
||||
}
|
||||
|
||||
clusterId, err := b.System().ClusterID(sc.Context)
|
||||
if err != nil {
|
||||
b.Logger().Error("failed to fetch cluster id for unified transfer background process",
|
||||
"error", err)
|
||||
return
|
||||
}
|
||||
|
||||
if !status.isRunning.CompareAndSwap(false, true) {
|
||||
b.Logger().Debug("an existing unified transfer process is already running")
|
||||
return
|
||||
}
|
||||
defer status.isRunning.Store(false)
|
||||
|
||||
// Reset our flag before we begin, we do this before we start as
|
||||
// we can't guarantee that we can properly parse/fix the error from an
|
||||
// error that comes in from the revoke API after that. This will
|
||||
// force another run, which worst case, we will fix it on the next
|
||||
// periodic function call that passes our min delay.
|
||||
status.forceRerun.Store(false)
|
||||
|
||||
err = doUnifiedTransferMissingLocalSerials(sc, clusterId)
|
||||
if err != nil {
|
||||
b.Logger().Error("an error occurred running unified transfer", "error", err.Error())
|
||||
status.forceRerun.Store(true)
|
||||
}
|
||||
status.lastRun = time.Now()
|
||||
}
|
||||
|
||||
func doUnifiedTransferMissingLocalSerials(sc *storageContext, clusterId string) error {
|
||||
localRevokedSerialNums, err := sc.listRevokedCerts()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(localRevokedSerialNums) == 0 {
|
||||
// No local certs to transfer, no further work to do.
|
||||
return nil
|
||||
}
|
||||
|
||||
unifiedSerials, err := listClusterSpecificUnifiedRevokedCerts(sc, clusterId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
unifiedCertLookup := sliceToMapKey(unifiedSerials)
|
||||
|
||||
errCount := 0
|
||||
for i, serialNum := range localRevokedSerialNums {
|
||||
if i%25 == 0 {
|
||||
config, _ := sc.Backend.crlBuilder.getConfigWithUpdate(sc)
|
||||
if config != nil && !config.UnifiedCRL {
|
||||
return errors.New("unified crl has been disabled after we started, stopping")
|
||||
}
|
||||
}
|
||||
if _, ok := unifiedCertLookup[serialNum]; !ok {
|
||||
err := readRevocationEntryAndTransfer(sc, serialNum)
|
||||
if err != nil {
|
||||
errCount++
|
||||
sc.Backend.Logger().Debug("Failed transferring local revocation to unified space",
|
||||
"serial", serialNum, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if errCount > 0 {
|
||||
sc.Backend.Logger().Warn(fmt.Sprintf("Failed transfering %d local serials to unified storage", errCount))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func readRevocationEntryAndTransfer(sc *storageContext, serial string) error {
|
||||
hyphenSerial := normalizeSerial(serial)
|
||||
revInfo, err := sc.fetchRevocationInfo(hyphenSerial)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed loading revocation entry for serial: %s: %w", serial, err)
|
||||
}
|
||||
if revInfo == nil {
|
||||
sc.Backend.Logger().Debug("no certificate revocation entry for serial", "serial", serial)
|
||||
return nil
|
||||
}
|
||||
cert, err := x509.ParseCertificate(revInfo.CertificateBytes)
|
||||
if err != nil {
|
||||
sc.Backend.Logger().Debug("failed parsing certificate stored in revocation entry for serial",
|
||||
"serial", serial, "error", err)
|
||||
return nil
|
||||
}
|
||||
if revInfo.CertificateIssuer == "" {
|
||||
// No certificate issuer assigned to this serial yet, just drop it for now,
|
||||
// as a crl rebuild/tidy needs to happen
|
||||
return nil
|
||||
}
|
||||
|
||||
revocationTime := revInfo.RevocationTimeUTC
|
||||
if revInfo.RevocationTimeUTC.IsZero() {
|
||||
// Legacy revocation entries only had this field and not revocationTimeUTC set...
|
||||
revocationTime = time.Unix(revInfo.RevocationTime, 0)
|
||||
}
|
||||
|
||||
if time.Now().After(cert.NotAfter) {
|
||||
// ignore transferring this entry as it has already expired.
|
||||
return nil
|
||||
}
|
||||
|
||||
entry := &unifiedRevocationEntry{
|
||||
SerialNumber: hyphenSerial,
|
||||
CertExpiration: cert.NotAfter,
|
||||
RevocationTimeUTC: revocationTime,
|
||||
CertificateIssuer: revInfo.CertificateIssuer,
|
||||
}
|
||||
|
||||
return writeUnifiedRevocationEntry(sc, entry)
|
||||
}
|
|
@ -1389,3 +1389,19 @@ func (sc *storageContext) writeClusterConfig(config *clusterConfigEntry) error {
|
|||
|
||||
return sc.Storage.Put(sc.Context, entry)
|
||||
}
|
||||
|
||||
func (sc *storageContext) fetchRevocationInfo(serial string) (*revocationInfo, error) {
|
||||
var revInfo *revocationInfo
|
||||
revEntry, err := fetchCertBySerial(sc, revokedPath, serial)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if revEntry != nil {
|
||||
err = revEntry.DecodeJSON(&revInfo)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error decoding existing revocation info")
|
||||
}
|
||||
}
|
||||
|
||||
return revInfo, nil
|
||||
}
|
||||
|
|
|
@ -488,3 +488,13 @@ func (q *revocationQueue) Iterate() []*revocationQueueEntry {
|
|||
|
||||
return ret
|
||||
}
|
||||
|
||||
// sliceToMapKey returns a set-style map whose keys are the entries of the
// given slice. Duplicate entries collapse into a single key, and the result is
// always a non-nil map, even for a nil or empty slice.
func sliceToMapKey(s []string) map[string]struct{} {
	set := make(map[string]struct{}, len(s))
	for i := range s {
		set[s[i]] = struct{}{}
	}
	return set
}
|
||||
|
|
Loading…
Reference in New Issue