open-vault/serviceregistration/kubernetes/retry_handler.go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package kubernetes

import (
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/hashicorp/go-hclog"
	sr "github.com/hashicorp/vault/serviceregistration"
	"github.com/hashicorp/vault/serviceregistration/kubernetes/client"
	"github.com/oklog/run"
)

// How often to retry sending a state update if it fails.
var retryFreq = 5 * time.Second

// retryHandler executes retries.
// It is thread-safe.
type retryHandler struct {
	// These don't need a mutex because they're never mutated.
	logger             hclog.Logger
	namespace, podName string

	// To synchronize setInitialState and patchesToRetry.
	lock sync.Mutex

	// initialStateSet records whether an initial state has been set
	// successfully or whether a state already exists.
	initialStateSet bool

	// initialState stores the initial state to be set.
	initialState sr.State

	// patchesToRetry maps the path of each label being updated to its pending
	// patch. For any particular label it either holds nothing, or holds _the
	// last_ state we were aware of. It should only be updated after the
	// initial state has been set.
	patchesToRetry map[string]*client.Patch

	// client is the Client to use when making API calls against Kubernetes.
	client *client.Client
}
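
// Note: patchesToRetry is written to by Notify and updateState without a nil
// check, so whatever constructs a retryHandler is expected to initialize the
// map, roughly like the following (illustrative sketch, not this package's
// actual constructor):
//
//	r := &retryHandler{
//		logger:         logger,
//		namespace:      namespace,
//		podName:        podName,
//		patchesToRetry: make(map[string]*client.Patch),
//		client:         c,
//	}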

// Run must be called for retries to be started.
func (r *retryHandler) Run(shutdownCh <-chan struct{}, wait *sync.WaitGroup) {
	r.setInitialState(shutdownCh)

	// Run this in a go func so this call doesn't block.
	wait.Add(1)
	go func() {
		// Make sure Vault will give us time to finish up here.
		defer wait.Done()

		var g run.Group

		// This run group watches for the shutdownCh.
		shutdownActorStop := make(chan struct{})
		g.Add(func() error {
			select {
			case <-shutdownCh:
			case <-shutdownActorStop:
			}
			return nil
		}, func(error) {
			close(shutdownActorStop)
		})

		// This run group performs the periodic state updates, and shuts the
		// client down when the group is interrupted.
		checkUpdateStateStop := make(chan struct{})
		g.Add(func() error {
			r.periodicUpdateState(checkUpdateStateStop)
			return nil
		}, func(error) {
			close(checkUpdateStateStop)
			r.client.Shutdown()
		})

		if err := g.Run(); err != nil {
			r.logger.Error("error encountered during periodic state update", "error", err)
		}
	}()
}
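
// A minimal usage sketch for Run (hypothetical caller, assuming r is a fully
// initialized *retryHandler):
//
//	var wg sync.WaitGroup
//	shutdownCh := make(chan struct{})
//	r.Run(shutdownCh, &wg)
//	// ... later, on shutdown:
//	close(shutdownCh)
//	wg.Wait()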

// setInitialState makes a single attempt to set the initial state, returning
// once the attempt completes or shutdown is triggered, whichever comes first.
func (r *retryHandler) setInitialState(shutdownCh <-chan struct{}) {
	r.lock.Lock()
	defer r.lock.Unlock()

	doneCh := make(chan struct{})
	go func() {
		if err := r.setInitialStateInternal(); err != nil {
			if r.logger.IsWarn() {
				r.logger.Warn(fmt.Sprintf("unable to set initial state due to %s, will retry", err.Error()))
			}
		}
		close(doneCh)
	}()

	// Wait until the state is set or shutdown happens.
	select {
	case <-doneCh:
	case <-shutdownCh:
	}
}

// Notify adds a patch to be retried until it's either completed without
// error, or no longer needed.
func (r *retryHandler) Notify(patch *client.Patch) {
	r.lock.Lock()
	defer r.lock.Unlock()

	// Initial state must be set first, or subsequent notifications we've
	// received could get smashed by a late-arriving initial state.
	// We will store this to retry it when appropriate.
	if !r.initialStateSet {
		if r.logger.IsWarn() {
			r.logger.Warn(fmt.Sprintf("cannot notify of present state for %s because initial state is unset", patch.Path))
		}
		r.patchesToRetry[patch.Path] = patch
		return
	}

	// Initial state has been sent, so it's OK to attempt a patch immediately.
	if err := r.client.PatchPod(r.namespace, r.podName, patch); err != nil {
		if r.logger.IsWarn() {
			r.logger.Warn(fmt.Sprintf("unable to update state for %s due to %s, will retry", patch.Path, err.Error()))
		}
		r.patchesToRetry[patch.Path] = patch
	}
}
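
// For example, a caller reporting a seal-status change might pass a patch
// shaped like the following (illustrative values; the label constants are
// defined elsewhere in this package):
//
//	r.Notify(&client.Patch{
//		Operation: client.Replace,
//		Path:      pathToLabels + labelSealed,
//		Value:     strconv.FormatBool(true),
//	})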

// setInitialStateInternal sets the initial state remotely. This should be
// called with the lock held.
func (r *retryHandler) setInitialStateInternal() error {
	// If the initial state has already been set, this is a no-op.
	if r.initialStateSet {
		return nil
	}

	// Verify that the pod exists and our configuration looks good.
	pod, err := r.client.GetPod(r.namespace, r.podName)
	if err != nil {
		return err
	}

	// Now to initially label our pod.
	if pod.Metadata == nil {
		// This should never happen IRL, just being defensive.
		return fmt.Errorf("no pod metadata on %+v", pod)
	}
	if pod.Metadata.Labels == nil {
		// Add the labels field, and the labels themselves, as part of that one
		// call. The reason we must take a different approach to adding them is
		// discussed here:
		// https://stackoverflow.com/questions/57480205/error-while-applying-json-patch-to-kubernetes-custom-resource
		if err := r.client.PatchPod(r.namespace, r.podName, &client.Patch{
			Operation: client.Add,
			Path:      "/metadata/labels",
			Value: map[string]string{
				labelVaultVersion: r.initialState.VaultVersion,
				labelActive:       strconv.FormatBool(r.initialState.IsActive),
				labelSealed:       strconv.FormatBool(r.initialState.IsSealed),
				labelPerfStandby:  strconv.FormatBool(r.initialState.IsPerformanceStandby),
				labelInitialized:  strconv.FormatBool(r.initialState.IsInitialized),
			},
		}); err != nil {
			return err
		}
	} else {
		// Create the labels through a patch to each individual field.
		patches := []*client.Patch{
			{
				Operation: client.Replace,
				Path:      pathToLabels + labelVaultVersion,
				Value:     r.initialState.VaultVersion,
			},
			{
				Operation: client.Replace,
				Path:      pathToLabels + labelActive,
				Value:     strconv.FormatBool(r.initialState.IsActive),
			},
			{
				Operation: client.Replace,
				Path:      pathToLabels + labelSealed,
				Value:     strconv.FormatBool(r.initialState.IsSealed),
			},
			{
				Operation: client.Replace,
				Path:      pathToLabels + labelPerfStandby,
				Value:     strconv.FormatBool(r.initialState.IsPerformanceStandby),
			},
			{
				Operation: client.Replace,
				Path:      pathToLabels + labelInitialized,
				Value:     strconv.FormatBool(r.initialState.IsInitialized),
			},
		}
		if err := r.client.PatchPod(r.namespace, r.podName, patches...); err != nil {
			return err
		}
	}
	r.initialStateSet = true
	return nil
}
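
// Assuming client.Patch maps onto JSON Patch operations (as the Stack Overflow
// link above suggests), the first branch corresponds roughly to a single "add"
// of the whole labels object, e.g.:
//
//	[{"op": "add", "path": "/metadata/labels", "value": {"<labelVaultVersion>": "...", ...}}]
//
// while the second branch issues one "replace" per label path. This is
// illustrative only; the exact wire format is determined by client.PatchPod.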

func (r *retryHandler) periodicUpdateState(stopCh chan struct{}) {
	retry := time.NewTicker(retryFreq)
	defer retry.Stop()

	for {
		// Call updateState immediately so we don't wait for the first tick
		// before setting the initial state.
		r.updateState()

		select {
		case <-stopCh:
			return
		case <-retry.C:
		}
	}
}
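
// retryFreq is declared above as a package-level var rather than a const, so a
// test in this package could, in principle, shorten the retry interval
// (illustrative sketch, not an existing test):
//
//	originalFreq := retryFreq
//	retryFreq = 10 * time.Millisecond
//	defer func() { retryFreq = originalFreq }()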

func (r *retryHandler) updateState() {
	r.lock.Lock()
	defer r.lock.Unlock()

	// Initial state must be set first, or subsequent notifications we've
	// received could get smashed by a late-arriving initial state.
	// If the state is already set, this is a no-op.
	if err := r.setInitialStateInternal(); err != nil {
		if r.logger.IsWarn() {
			r.logger.Warn(fmt.Sprintf("unable to set initial state due to %s, will retry", err.Error()))
		}
		// On failure, the initial state remains unset so it will be retried
		// on the next tick.
		return
	}

	if len(r.patchesToRetry) == 0 {
		// Nothing further to do here.
		return
	}

	patches := make([]*client.Patch, len(r.patchesToRetry))
	i := 0
	for _, patch := range r.patchesToRetry {
		patches[i] = patch
		i++
	}

	if err := r.client.PatchPod(r.namespace, r.podName, patches...); err != nil {
		if r.logger.IsWarn() {
			r.logger.Warn(fmt.Sprintf("unable to update state due to %s, will retry", err.Error()))
		}
		return
	}
	r.patchesToRetry = make(map[string]*client.Patch)
}