266 lines
8.9 KiB
Go
266 lines
8.9 KiB
Go
package consul
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
metrics "github.com/armon/go-metrics"
|
|
"github.com/hashicorp/consul/lib"
|
|
"golang.org/x/time/rate"
|
|
)
|
|
|
|
const (
|
|
// replicationMaxRetryWait is the maximum number of seconds to wait between
|
|
// failed blocking queries when backing off.
|
|
replicationDefaultMaxRetryWait = 120 * time.Second
|
|
|
|
replicationDefaultRate = 1
|
|
)
|
|
|
|
type ReplicatorDelegate interface {
|
|
Replicate(ctx context.Context, lastRemoteIndex uint64) (index uint64, exit bool, err error)
|
|
}
|
|
|
|
type ReplicatorConfig struct {
|
|
// Name to be used in various logging
|
|
Name string
|
|
// Delegate to perform each round of replication
|
|
Delegate ReplicatorDelegate
|
|
// The number of replication rounds per second that are allowed
|
|
Rate int
|
|
// The number of replication rounds that can be done in a burst
|
|
Burst int
|
|
// Minimum number of RPC failures to ignore before backing off
|
|
MinFailures int
|
|
// Maximum wait time between failing RPCs
|
|
MaxRetryWait time.Duration
|
|
// Where to send our logs
|
|
Logger *log.Logger
|
|
}
|
|
|
|
type Replicator struct {
|
|
name string
|
|
limiter *rate.Limiter
|
|
waiter *lib.RetryWaiter
|
|
delegate ReplicatorDelegate
|
|
logger *log.Logger
|
|
lastRemoteIndex uint64
|
|
}
|
|
|
|
func NewReplicator(config *ReplicatorConfig) (*Replicator, error) {
|
|
if config == nil {
|
|
return nil, fmt.Errorf("Cannot create the Replicator without a config")
|
|
}
|
|
if config.Delegate == nil {
|
|
return nil, fmt.Errorf("Cannot create the Replicator without a Delegate set in the config")
|
|
}
|
|
if config.Logger == nil {
|
|
config.Logger = log.New(os.Stderr, "", log.LstdFlags)
|
|
}
|
|
limiter := rate.NewLimiter(rate.Limit(config.Rate), config.Burst)
|
|
|
|
maxWait := config.MaxRetryWait
|
|
if maxWait == 0 {
|
|
maxWait = replicationDefaultMaxRetryWait
|
|
}
|
|
|
|
minFailures := config.MinFailures
|
|
if minFailures < 0 {
|
|
minFailures = 0
|
|
}
|
|
waiter := lib.NewRetryWaiter(minFailures, 0*time.Second, maxWait, lib.NewJitterRandomStagger(10))
|
|
return &Replicator{
|
|
name: config.Name,
|
|
limiter: limiter,
|
|
waiter: waiter,
|
|
delegate: config.Delegate,
|
|
logger: config.Logger,
|
|
}, nil
|
|
}
|
|
|
|
func (r *Replicator) Run(ctx context.Context) error {
|
|
defer r.logger.Printf("[INFO] replication: stopped %s replication", r.name)
|
|
|
|
for {
|
|
// This ensures we aren't doing too many successful replication rounds - mostly useful when
|
|
// the data within the primary datacenter is changing rapidly but we try to limit the amount
|
|
// of resources replication into the secondary datacenter should take
|
|
if err := r.limiter.Wait(ctx); err != nil {
|
|
return nil
|
|
}
|
|
|
|
// Perform a single round of replication
|
|
index, exit, err := r.delegate.Replicate(ctx, atomic.LoadUint64(&r.lastRemoteIndex))
|
|
if exit {
|
|
// the replication function told us to exit
|
|
return nil
|
|
}
|
|
|
|
if err != nil {
|
|
// reset the lastRemoteIndex when there is an RPC failure. This should cause a full sync to be done during
|
|
// the next round of replication
|
|
atomic.StoreUint64(&r.lastRemoteIndex, 0)
|
|
r.logger.Printf("[WARN] replication: %s replication error (will retry if still leader): %v", r.name, err)
|
|
} else {
|
|
atomic.StoreUint64(&r.lastRemoteIndex, index)
|
|
r.logger.Printf("[DEBUG] replication: %s replication completed through remote index %d", r.name, index)
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
// wait some amount of time to prevent churning through many replication rounds while replication is failing
|
|
case <-r.waiter.WaitIfErr(err):
|
|
// do nothing
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Replicator) Index() uint64 {
|
|
return atomic.LoadUint64(&r.lastRemoteIndex)
|
|
}
|
|
|
|
type ReplicatorFunc func(ctx context.Context, lastRemoteIndex uint64) (index uint64, exit bool, err error)
|
|
|
|
type FunctionReplicator struct {
|
|
ReplicateFn ReplicatorFunc
|
|
}
|
|
|
|
func (r *FunctionReplicator) Replicate(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
|
|
return r.ReplicateFn(ctx, lastRemoteIndex)
|
|
}
|
|
|
|
type IndexReplicatorDiff struct {
|
|
NumUpdates int
|
|
Updates interface{}
|
|
NumDeletions int
|
|
Deletions interface{}
|
|
}
|
|
|
|
type IndexReplicatorDelegate interface {
|
|
// SingularNoun is the singular form of the item being replicated.
|
|
SingularNoun() string
|
|
|
|
// PluralNoun is the plural form of the item being replicated.
|
|
PluralNoun() string
|
|
|
|
// Name to use when emitting metrics
|
|
MetricName() string
|
|
|
|
// FetchRemote retrieves items newer than the provided index from the
|
|
// remote datacenter (for diffing purposes).
|
|
FetchRemote(lastRemoteIndex uint64) (int, interface{}, uint64, error)
|
|
|
|
// FetchLocal retrieves items from the current datacenter (for diffing
|
|
// purposes).
|
|
FetchLocal() (int, interface{}, error)
|
|
|
|
DiffRemoteAndLocalState(local interface{}, remote interface{}, lastRemoteIndex uint64) (*IndexReplicatorDiff, error)
|
|
|
|
PerformDeletions(ctx context.Context, deletions interface{}) (exit bool, err error)
|
|
|
|
PerformUpdates(ctx context.Context, updates interface{}) (exit bool, err error)
|
|
}
|
|
|
|
type IndexReplicator struct {
|
|
Delegate IndexReplicatorDelegate
|
|
Logger *log.Logger
|
|
}
|
|
|
|
func (r *IndexReplicator) Replicate(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
|
|
fetchStart := time.Now()
|
|
lenRemote, remote, remoteIndex, err := r.Delegate.FetchRemote(lastRemoteIndex)
|
|
metrics.MeasureSince([]string{"leader", "replication", r.Delegate.MetricName(), "fetch"}, fetchStart)
|
|
|
|
if err != nil {
|
|
return 0, false, fmt.Errorf("failed to retrieve %s: %v", r.Delegate.PluralNoun(), err)
|
|
}
|
|
|
|
r.Logger.Printf("[DEBUG] replication: finished fetching %s: %d", r.Delegate.PluralNoun(), lenRemote)
|
|
|
|
// Need to check if we should be stopping. This will be common as the fetching process is a blocking
|
|
// RPC which could have been hanging around for a long time and during that time leadership could
|
|
// have been lost.
|
|
select {
|
|
case <-ctx.Done():
|
|
return 0, true, nil
|
|
default:
|
|
// do nothing
|
|
}
|
|
|
|
// Measure everything after the remote query, which can block for long
|
|
// periods of time. This metric is a good measure of how expensive the
|
|
// replication process is.
|
|
defer metrics.MeasureSince([]string{"leader", "replication", r.Delegate.MetricName(), "apply"}, time.Now())
|
|
|
|
lenLocal, local, err := r.Delegate.FetchLocal()
|
|
if err != nil {
|
|
return 0, false, fmt.Errorf("failed to retrieve local %s: %v", r.Delegate.PluralNoun(), err)
|
|
}
|
|
|
|
// If the remote index ever goes backwards, it's a good indication that
|
|
// the remote side was rebuilt and we should do a full sync since we
|
|
// can't make any assumptions about what's going on.
|
|
//
|
|
// Resetting lastRemoteIndex to 0 will work because we never consider local
|
|
// raft indices. Instead we compare the raft modify index in the response object
|
|
// with the lastRemoteIndex (only when we already have a config entry of the same kind/name)
|
|
// to determine if an update is needed. Resetting lastRemoteIndex to 0 then has the affect
|
|
// of making us think all the local state is out of date and any matching entries should
|
|
// still be updated.
|
|
//
|
|
// The lastRemoteIndex is not used when the entry exists either only in the local state or
|
|
// only in the remote state. In those situations we need to either delete it or create it.
|
|
if remoteIndex < lastRemoteIndex {
|
|
r.Logger.Printf("[WARN] replication: %[1]s replication remote index moved backwards (%d to %d), forcing a full %[1]s sync", r.Delegate.SingularNoun(), lastRemoteIndex, remoteIndex)
|
|
lastRemoteIndex = 0
|
|
}
|
|
|
|
r.Logger.Printf("[DEBUG] replication: %s replication - local: %d, remote: %d", r.Delegate.SingularNoun(), lenLocal, lenRemote)
|
|
|
|
// Calculate the changes required to bring the state into sync and then
|
|
// apply them.
|
|
diff, err := r.Delegate.DiffRemoteAndLocalState(local, remote, lastRemoteIndex)
|
|
if err != nil {
|
|
return 0, false, fmt.Errorf("failed to diff %s local and remote states: %v", r.Delegate.SingularNoun(), err)
|
|
}
|
|
|
|
r.Logger.Printf("[DEBUG] replication: %s replication - deletions: %d, updates: %d", r.Delegate.SingularNoun(), diff.NumDeletions, diff.NumUpdates)
|
|
|
|
if diff.NumDeletions > 0 {
|
|
r.Logger.Printf("[DEBUG] replication: %s replication - performing %d deletions", r.Delegate.SingularNoun(), diff.NumDeletions)
|
|
|
|
exit, err := r.Delegate.PerformDeletions(ctx, diff.Deletions)
|
|
if exit {
|
|
return 0, true, nil
|
|
}
|
|
|
|
if err != nil {
|
|
return 0, false, fmt.Errorf("failed to apply local %s deletions: %v", r.Delegate.SingularNoun(), err)
|
|
}
|
|
r.Logger.Printf("[DEBUG] replication: %s replication - finished deletions", r.Delegate.SingularNoun())
|
|
}
|
|
|
|
if diff.NumUpdates > 0 {
|
|
r.Logger.Printf("[DEBUG] replication: %s replication - performing %d updates", r.Delegate.SingularNoun(), diff.NumUpdates)
|
|
|
|
exit, err := r.Delegate.PerformUpdates(ctx, diff.Updates)
|
|
if exit {
|
|
return 0, true, nil
|
|
}
|
|
|
|
if err != nil {
|
|
return 0, false, fmt.Errorf("failed to apply local %s updates: %v", r.Delegate.SingularNoun(), err)
|
|
}
|
|
r.Logger.Printf("[DEBUG] replication: %s replication - finished updates", r.Delegate.SingularNoun())
|
|
}
|
|
|
|
// Return the index we got back from the remote side, since we've synced
|
|
// up with the remote state as of that index.
|
|
return remoteIndex, false, nil
|
|
}
|