225 lines
5.9 KiB
Go
225 lines
5.9 KiB
Go
|
package consul
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"time"
|
||
|
|
||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||
|
"github.com/hashicorp/consul/agent/structs"
|
||
|
memdb "github.com/hashicorp/go-memdb"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
// federationStatePruneInterval is how often we check for stale federation
|
||
|
// states to remove should a datacenter be removed from the WAN.
|
||
|
federationStatePruneInterval = time.Hour
|
||
|
)
|
||
|
|
||
|
func (s *Server) startFederationStateAntiEntropy() {
|
||
|
if s.config.DisableFederationStateAntiEntropy {
|
||
|
return
|
||
|
}
|
||
|
s.leaderRoutineManager.Start(federationStateAntiEntropyRoutineName, s.federationStateAntiEntropySync)
|
||
|
|
||
|
// If this is the primary, then also prune any stale datacenters from the
|
||
|
// list of federation states.
|
||
|
if s.config.PrimaryDatacenter == s.config.Datacenter {
|
||
|
s.leaderRoutineManager.Start(federationStatePruningRoutineName, s.federationStatePruning)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *Server) stopFederationStateAntiEntropy() {
|
||
|
if s.config.DisableFederationStateAntiEntropy {
|
||
|
return
|
||
|
}
|
||
|
s.leaderRoutineManager.Stop(federationStateAntiEntropyRoutineName)
|
||
|
if s.config.PrimaryDatacenter == s.config.Datacenter {
|
||
|
s.leaderRoutineManager.Stop(federationStatePruningRoutineName)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *Server) federationStateAntiEntropySync(ctx context.Context) error {
|
||
|
var lastFetchIndex uint64
|
||
|
|
||
|
retryLoopBackoff(ctx.Done(), func() error {
|
||
|
idx, err := s.federationStateAntiEntropyMaybeSync(ctx, lastFetchIndex)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
lastFetchIndex = idx
|
||
|
return nil
|
||
|
}, func(err error) {
|
||
|
s.logger.Error("error performing anti-entropy sync of federation state", "error", err)
|
||
|
})
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *Server) federationStateAntiEntropyMaybeSync(ctx context.Context, lastFetchIndex uint64) (uint64, error) {
|
||
|
queryOpts := &structs.QueryOptions{
|
||
|
MinQueryIndex: lastFetchIndex,
|
||
|
RequireConsistent: true,
|
||
|
// This is just for a local blocking query so no token is needed.
|
||
|
}
|
||
|
idx, prev, curr, err := s.fetchFederationStateAntiEntropyDetails(queryOpts)
|
||
|
if err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
|
||
|
// We should check to see if our context was cancelled while we were blocked.
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return 0, ctx.Err()
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
if prev != nil && prev.IsSame(curr) {
|
||
|
s.logger.Trace("federation state anti-entropy sync skipped; already up to date")
|
||
|
return idx, nil
|
||
|
}
|
||
|
|
||
|
if err := s.updateOurFederationState(curr); err != nil {
|
||
|
return 0, fmt.Errorf("error performing federation state anti-entropy sync: %v", err)
|
||
|
}
|
||
|
|
||
|
s.logger.Info("federation state anti-entropy synced")
|
||
|
|
||
|
return idx, nil
|
||
|
}
|
||
|
|
||
|
func (s *Server) updateOurFederationState(curr *structs.FederationState) error {
|
||
|
if curr.Datacenter != s.config.Datacenter { // sanity check
|
||
|
return fmt.Errorf("cannot use this mechanism to update federation states for other datacenters")
|
||
|
}
|
||
|
|
||
|
curr.UpdatedAt = time.Now().UTC()
|
||
|
|
||
|
args := structs.FederationStateRequest{
|
||
|
Op: structs.FederationStateUpsert,
|
||
|
State: curr,
|
||
|
}
|
||
|
|
||
|
if s.config.Datacenter == s.config.PrimaryDatacenter {
|
||
|
// We are the primary, so we can't do an RPC as we don't have a replication token.
|
||
|
resp, err := s.raftApply(structs.FederationStateRequestType, args)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if respErr, ok := resp.(error); ok {
|
||
|
return respErr
|
||
|
}
|
||
|
} else {
|
||
|
args.WriteRequest = structs.WriteRequest{
|
||
|
Token: s.tokens.ReplicationToken(),
|
||
|
}
|
||
|
ignored := false
|
||
|
if err := s.forwardDC("FederationState.Apply", s.config.PrimaryDatacenter, &args, &ignored); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *Server) fetchFederationStateAntiEntropyDetails(
|
||
|
queryOpts *structs.QueryOptions,
|
||
|
) (uint64, *structs.FederationState, *structs.FederationState, error) {
|
||
|
var (
|
||
|
prevFedState, currFedState *structs.FederationState
|
||
|
queryMeta structs.QueryMeta
|
||
|
)
|
||
|
err := s.blockingQuery(
|
||
|
queryOpts,
|
||
|
&queryMeta,
|
||
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||
|
// Get the existing stored version of this FedState that has replicated down.
|
||
|
// We could phone home to get this but that would incur extra WAN traffic
|
||
|
// when we already have enough information locally to figure it out
|
||
|
// (assuming that our replicator is still functioning).
|
||
|
idx1, prev, err := state.FederationStateGet(ws, s.config.Datacenter)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// Fetch our current list of all mesh gateways.
|
||
|
entMeta := structs.WildcardEnterpriseMeta()
|
||
|
idx2, raw, err := state.ServiceDump(ws, structs.ServiceKindMeshGateway, true, entMeta)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
curr := &structs.FederationState{
|
||
|
Datacenter: s.config.Datacenter,
|
||
|
MeshGateways: raw,
|
||
|
}
|
||
|
|
||
|
// Compute the maximum index seen.
|
||
|
if idx2 > idx1 {
|
||
|
queryMeta.Index = idx2
|
||
|
} else {
|
||
|
queryMeta.Index = idx1
|
||
|
}
|
||
|
|
||
|
prevFedState = prev
|
||
|
currFedState = curr
|
||
|
|
||
|
return nil
|
||
|
})
|
||
|
if err != nil {
|
||
|
return 0, nil, nil, err
|
||
|
}
|
||
|
|
||
|
return queryMeta.Index, prevFedState, currFedState, nil
|
||
|
}
|
||
|
|
||
|
func (s *Server) federationStatePruning(ctx context.Context) error {
|
||
|
ticker := time.NewTicker(federationStatePruneInterval)
|
||
|
defer ticker.Stop()
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return nil
|
||
|
case <-ticker.C:
|
||
|
if err := s.pruneStaleFederationStates(); err != nil {
|
||
|
s.logger.Error("error pruning stale federation states", "error", err)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *Server) pruneStaleFederationStates() error {
|
||
|
state := s.fsm.State()
|
||
|
_, fedStates, err := state.FederationStateList(nil)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
for _, fedState := range fedStates {
|
||
|
dc := fedState.Datacenter
|
||
|
if s.router.HasDatacenter(dc) {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
s.logger.Info("pruning stale federation state", "datacenter", dc)
|
||
|
|
||
|
req := structs.FederationStateRequest{
|
||
|
Op: structs.FederationStateDelete,
|
||
|
State: &structs.FederationState{
|
||
|
Datacenter: dc,
|
||
|
},
|
||
|
}
|
||
|
resp, err := s.raftApply(structs.FederationStateRequestType, &req)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("Failed to delete federation state %s: %v", dc, err)
|
||
|
}
|
||
|
if respErr, ok := resp.(error); ok && err != nil {
|
||
|
return fmt.Errorf("Failed to delete federation state %s: %v", dc, respErr)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|