open-consul/agent/consul/acl_replication.go

556 lines
17 KiB
Go

package consul
import (
"bytes"
"context"
"errors"
"fmt"
"time"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/structs"
)
const (
// aclReplicationMaxRetryBackoff is the max number of seconds to sleep between ACL replication RPC errors
aclReplicationMaxRetryBackoff = 64
)
// aclTypeReplicator allows the machinery of acl replication to be shared between
// types with minimal code duplication (barring generics magically popping into
// existence).
//
// Concrete implementations of this interface should internally contain a
// pointer to the server so that data lookups can occur, and they should
// maintain the smallest quantity of type-specific state they can.
//
// Implementations of this interface are short-lived and recreated on every
// iteration.
type aclTypeReplicator interface {
// Type is variant of replication in use. Used for updating the replication
// status tracker.
Type() structs.ACLReplicationType
// SingularNoun is the singular form of the item being replicated.
SingularNoun() string
// PluralNoun is the plural form of the item being replicated.
PluralNoun() string
// FetchRemote retrieves items newer than the provided index from the
// remote datacenter (for diffing purposes).
FetchRemote(srv *Server, lastRemoteIndex uint64) (int, uint64, error)
// FetchLocal retrieves items from the current datacenter (for diffing
// purposes).
FetchLocal(srv *Server) (int, uint64, error)
// SortState sorts the internal working state output of FetchRemote and
// FetchLocal so that a sane diff can be performed.
SortState() (lenLocal, lenRemote int)
// LocalMeta allows for type-agnostic metadata from the sorted local state
// can be retrieved for the purposes of diffing.
LocalMeta(i int) (id string, modIndex uint64, hash []byte)
// RemoteMeta allows for type-agnostic metadata from the sorted remote
// state can be retrieved for the purposes of diffing.
RemoteMeta(i int) (id string, modIndex uint64, hash []byte)
// FetchUpdated retrieves the specific items from the remote (during the
// correction phase).
FetchUpdated(srv *Server, updates []string) (int, error)
// LenPendingUpdates should be the size of the data retrieved in
// FetchUpdated.
LenPendingUpdates() int
// PendingUpdateIsRedacted returns true if the update contains redacted
// data. Really only valid for tokens.
PendingUpdateIsRedacted(i int) bool
// PendingUpdateEstimatedSize is the item's EstimatedSize in the state
// populated by FetchUpdated.
PendingUpdateEstimatedSize(i int) int
// UpdateLocalBatch applies a portion of the state populated by
// FetchUpdated to the current datacenter.
UpdateLocalBatch(ctx context.Context, srv *Server, start, end int) error
// DeleteLocalBatch removes items from the current datacenter.
DeleteLocalBatch(srv *Server, batch []string) error
}
var errContainsRedactedData = errors.New("replication results contain redacted data")
func (s *Server) fetchACLRolesBatch(roleIDs []string) (*structs.ACLRoleBatchResponse, error) {
req := structs.ACLRoleBatchGetRequest{
Datacenter: s.config.ACLDatacenter,
RoleIDs: roleIDs,
QueryOptions: structs.QueryOptions{
AllowStale: true,
Token: s.tokens.ReplicationToken(),
},
}
var response structs.ACLRoleBatchResponse
if err := s.RPC("ACL.RoleBatchRead", &req, &response); err != nil {
return nil, err
}
return &response, nil
}
func (s *Server) fetchACLRoles(lastRemoteIndex uint64) (*structs.ACLRoleListResponse, error) {
defer metrics.MeasureSince([]string{"leader", "replication", "acl", "role", "fetch"}, time.Now())
req := structs.ACLRoleListRequest{
Datacenter: s.config.ACLDatacenter,
QueryOptions: structs.QueryOptions{
AllowStale: true,
MinQueryIndex: lastRemoteIndex,
Token: s.tokens.ReplicationToken(),
},
}
var response structs.ACLRoleListResponse
if err := s.RPC("ACL.RoleList", &req, &response); err != nil {
return nil, err
}
return &response, nil
}
func (s *Server) fetchACLPoliciesBatch(policyIDs []string) (*structs.ACLPolicyBatchResponse, error) {
req := structs.ACLPolicyBatchGetRequest{
Datacenter: s.config.ACLDatacenter,
PolicyIDs: policyIDs,
QueryOptions: structs.QueryOptions{
AllowStale: true,
Token: s.tokens.ReplicationToken(),
},
}
var response structs.ACLPolicyBatchResponse
if err := s.RPC("ACL.PolicyBatchRead", &req, &response); err != nil {
return nil, err
}
return &response, nil
}
func (s *Server) fetchACLPolicies(lastRemoteIndex uint64) (*structs.ACLPolicyListResponse, error) {
defer metrics.MeasureSince([]string{"leader", "replication", "acl", "policy", "fetch"}, time.Now())
req := structs.ACLPolicyListRequest{
Datacenter: s.config.ACLDatacenter,
QueryOptions: structs.QueryOptions{
AllowStale: true,
MinQueryIndex: lastRemoteIndex,
Token: s.tokens.ReplicationToken(),
},
}
var response structs.ACLPolicyListResponse
if err := s.RPC("ACL.PolicyList", &req, &response); err != nil {
return nil, err
}
return &response, nil
}
type itemDiffResults struct {
LocalDeletes []string
LocalUpserts []string
LocalSkipped int
RemoteSkipped int
}
func diffACLType(tr aclTypeReplicator, lastRemoteIndex uint64) itemDiffResults {
// Note: items with empty IDs will bubble up to the top (like legacy, unmigrated Tokens)
lenLocal, lenRemote := tr.SortState()
var res itemDiffResults
var localIdx int
var remoteIdx int
for localIdx, remoteIdx = 0, 0; localIdx < lenLocal && remoteIdx < lenRemote; {
localID, _, localHash := tr.LocalMeta(localIdx)
remoteID, remoteMod, remoteHash := tr.RemoteMeta(remoteIdx)
if localID == "" {
res.LocalSkipped++
localIdx += 1
continue
}
if remoteID == "" {
res.RemoteSkipped++
remoteIdx += 1
continue
}
if localID == remoteID {
// item is in both the local and remote state - need to check raft indices and the Hash
if remoteMod > lastRemoteIndex && !bytes.Equal(remoteHash, localHash) {
res.LocalUpserts = append(res.LocalUpserts, remoteID)
}
// increment both indices when equal
localIdx += 1
remoteIdx += 1
} else if localID < remoteID {
// item no longer in remote state - needs deleting
res.LocalDeletes = append(res.LocalDeletes, localID)
// increment just the local index
localIdx += 1
} else {
// local state doesn't have this item - needs updating
res.LocalUpserts = append(res.LocalUpserts, remoteID)
// increment just the remote index
remoteIdx += 1
}
}
for ; localIdx < lenLocal; localIdx += 1 {
localID, _, _ := tr.LocalMeta(localIdx)
if localID != "" {
res.LocalDeletes = append(res.LocalDeletes, localID)
} else {
res.LocalSkipped++
}
}
for ; remoteIdx < lenRemote; remoteIdx += 1 {
remoteID, _, _ := tr.RemoteMeta(remoteIdx)
if remoteID != "" {
res.LocalUpserts = append(res.LocalUpserts, remoteID)
} else {
res.RemoteSkipped++
}
}
return res
}
func (s *Server) deleteLocalACLType(ctx context.Context, tr aclTypeReplicator, deletions []string) (bool, error) {
ticker := time.NewTicker(time.Second / time.Duration(s.config.ACLReplicationApplyLimit))
defer ticker.Stop()
for i := 0; i < len(deletions); i += aclBatchDeleteSize {
var batch []string
if i+aclBatchDeleteSize > len(deletions) {
batch = deletions[i:]
} else {
batch = deletions[i : i+aclBatchDeleteSize]
}
if err := tr.DeleteLocalBatch(s, batch); err != nil {
return false, fmt.Errorf("Failed to apply %s deletions: %v", tr.SingularNoun(), err)
}
if i+aclBatchDeleteSize < len(deletions) {
select {
case <-ctx.Done():
return true, nil
case <-ticker.C:
// do nothing - ready for the next batch
}
}
}
return false, nil
}
func (s *Server) updateLocalACLType(ctx context.Context, tr aclTypeReplicator) (bool, error) {
ticker := time.NewTicker(time.Second / time.Duration(s.config.ACLReplicationApplyLimit))
defer ticker.Stop()
lenPending := tr.LenPendingUpdates()
// outer loop handles submitting a batch
for batchStart := 0; batchStart < lenPending; {
// inner loop finds the last element to include in this batch.
batchSize := 0
batchEnd := batchStart
for ; batchEnd < lenPending && batchSize < aclBatchUpsertSize; batchEnd += 1 {
if tr.PendingUpdateIsRedacted(batchEnd) {
return false, fmt.Errorf(
"Detected redacted %s secrets: stopping %s update round - verify that the replication token in use has acl:write permissions.",
tr.SingularNoun(),
tr.SingularNoun(),
)
}
batchSize += tr.PendingUpdateEstimatedSize(batchEnd)
}
err := tr.UpdateLocalBatch(ctx, s, batchStart, batchEnd)
if err != nil {
return false, fmt.Errorf("Failed to apply %s upserts: %v", tr.SingularNoun(), err)
}
s.logger.Printf(
"[DEBUG] acl: %s replication - upserted 1 batch with %d %s of size %d",
tr.SingularNoun(),
batchEnd-batchStart,
tr.PluralNoun(),
batchSize,
)
// items[batchEnd] wasn't include as the slicing doesn't include the element at the stop index
batchStart = batchEnd
// prevent waiting if we are done
if batchEnd < lenPending {
select {
case <-ctx.Done():
return true, nil
case <-ticker.C:
// nothing to do - just rate limiting
}
}
}
return false, nil
}
func (s *Server) fetchACLTokensBatch(tokenIDs []string) (*structs.ACLTokenBatchResponse, error) {
req := structs.ACLTokenBatchGetRequest{
Datacenter: s.config.ACLDatacenter,
AccessorIDs: tokenIDs,
QueryOptions: structs.QueryOptions{
AllowStale: true,
Token: s.tokens.ReplicationToken(),
},
}
var response structs.ACLTokenBatchResponse
if err := s.RPC("ACL.TokenBatchRead", &req, &response); err != nil {
return nil, err
}
return &response, nil
}
func (s *Server) fetchACLTokens(lastRemoteIndex uint64) (*structs.ACLTokenListResponse, error) {
defer metrics.MeasureSince([]string{"leader", "replication", "acl", "token", "fetch"}, time.Now())
req := structs.ACLTokenListRequest{
Datacenter: s.config.ACLDatacenter,
QueryOptions: structs.QueryOptions{
AllowStale: true,
MinQueryIndex: lastRemoteIndex,
Token: s.tokens.ReplicationToken(),
},
IncludeLocal: false,
IncludeGlobal: true,
}
var response structs.ACLTokenListResponse
if err := s.RPC("ACL.TokenList", &req, &response); err != nil {
return nil, err
}
return &response, nil
}
func (s *Server) replicateACLTokens(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
tr := &aclTokenReplicator{}
return s.replicateACLType(ctx, tr, lastRemoteIndex)
}
func (s *Server) replicateACLPolicies(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
tr := &aclPolicyReplicator{}
return s.replicateACLType(ctx, tr, lastRemoteIndex)
}
func (s *Server) replicateACLRoles(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
tr := &aclRoleReplicator{}
return s.replicateACLType(ctx, tr, lastRemoteIndex)
}
func (s *Server) replicateACLType(ctx context.Context, tr aclTypeReplicator, lastRemoteIndex uint64) (uint64, bool, error) {
lenRemote, remoteIndex, err := tr.FetchRemote(s, lastRemoteIndex)
if err != nil {
return 0, false, fmt.Errorf("failed to retrieve remote ACL %s: %v", tr.PluralNoun(), err)
}
s.logger.Printf("[DEBUG] acl: finished fetching %s: %d", tr.PluralNoun(), lenRemote)
// Need to check if we should be stopping. This will be common as the fetching process is a blocking
// RPC which could have been hanging around for a long time and during that time leadership could
// have been lost.
select {
case <-ctx.Done():
return 0, true, nil
default:
// do nothing
}
// Measure everything after the remote query, which can block for long
// periods of time. This metric is a good measure of how expensive the
// replication process is.
defer metrics.MeasureSince([]string{"leader", "replication", "acl", tr.SingularNoun(), "apply"}, time.Now())
lenLocal, _, err := tr.FetchLocal(s)
if err != nil {
return 0, false, fmt.Errorf("failed to retrieve local ACL %s: %v", tr.PluralNoun(), err)
}
// If the remote index ever goes backwards, it's a good indication that
// the remote side was rebuilt and we should do a full sync since we
// can't make any assumptions about what's going on.
if remoteIndex < lastRemoteIndex {
s.logger.Printf(
"[WARN] consul: ACL %s replication remote index moved backwards (%d to %d), forcing a full ACL %s sync",
tr.SingularNoun(),
lastRemoteIndex,
remoteIndex,
tr.SingularNoun(),
)
lastRemoteIndex = 0
}
s.logger.Printf(
"[DEBUG] acl: %s replication - local: %d, remote: %d",
tr.SingularNoun(),
lenLocal,
lenRemote,
)
// Calculate the changes required to bring the state into sync and then apply them.
res := diffACLType(tr, lastRemoteIndex)
if res.LocalSkipped > 0 || res.RemoteSkipped > 0 {
s.logger.Printf(
"[DEBUG] acl: %s replication - deletions: %d, updates: %d, skipped: %d, skippedRemote: %d",
tr.SingularNoun(),
len(res.LocalDeletes),
len(res.LocalUpserts),
res.LocalSkipped,
res.RemoteSkipped,
)
} else {
s.logger.Printf(
"[DEBUG] acl: %s replication - deletions: %d, updates: %d",
tr.SingularNoun(),
len(res.LocalDeletes),
len(res.LocalUpserts),
)
}
if len(res.LocalUpserts) > 0 {
lenUpdated, err := tr.FetchUpdated(s, res.LocalUpserts)
if err == errContainsRedactedData {
return 0, false, fmt.Errorf("failed to retrieve unredacted %s - replication token in use does not grant acl:write", tr.PluralNoun())
} else if err != nil {
return 0, false, fmt.Errorf("failed to retrieve ACL %s updates: %v", tr.SingularNoun(), err)
}
s.logger.Printf(
"[DEBUG] acl: %s replication - downloaded %d %s",
tr.SingularNoun(),
lenUpdated,
tr.PluralNoun(),
)
}
if len(res.LocalDeletes) > 0 {
s.logger.Printf(
"[DEBUG] acl: %s replication - performing deletions",
tr.SingularNoun(),
)
exit, err := s.deleteLocalACLType(ctx, tr, res.LocalDeletes)
if exit {
return 0, true, nil
}
if err != nil {
return 0, false, fmt.Errorf("failed to delete local ACL %s: %v", tr.PluralNoun(), err)
}
s.logger.Printf("[DEBUG] acl: %s replication - finished deletions", tr.SingularNoun())
}
if len(res.LocalUpserts) > 0 {
s.logger.Printf("[DEBUG] acl: %s replication - performing updates", tr.SingularNoun())
exit, err := s.updateLocalACLType(ctx, tr)
if exit {
return 0, true, nil
}
if err != nil {
return 0, false, fmt.Errorf("failed to update local ACL %s: %v", tr.PluralNoun(), err)
}
s.logger.Printf("[DEBUG] acl: %s replication - finished updates", tr.SingularNoun())
}
// Return the index we got back from the remote side, since we've synced
// up with the remote state as of that index.
return remoteIndex, false, nil
}
// IsACLReplicationEnabled returns true if ACL replication is enabled.
// DEPRECATED (ACL-Legacy-Compat) - with new ACLs at least policy replication is required
func (s *Server) IsACLReplicationEnabled() bool {
authDC := s.config.ACLDatacenter
return len(authDC) > 0 && (authDC != s.config.Datacenter) &&
s.config.ACLTokenReplication
}
func (s *Server) updateACLReplicationStatusError() {
s.aclReplicationStatusLock.Lock()
defer s.aclReplicationStatusLock.Unlock()
s.aclReplicationStatus.LastError = time.Now().Round(time.Second).UTC()
}
func (s *Server) updateACLReplicationStatusIndex(replicationType structs.ACLReplicationType, index uint64) {
s.aclReplicationStatusLock.Lock()
defer s.aclReplicationStatusLock.Unlock()
s.aclReplicationStatus.LastSuccess = time.Now().Round(time.Second).UTC()
switch replicationType {
case structs.ACLReplicateLegacy:
s.aclReplicationStatus.ReplicatedIndex = index
case structs.ACLReplicateTokens:
s.aclReplicationStatus.ReplicatedTokenIndex = index
case structs.ACLReplicatePolicies:
s.aclReplicationStatus.ReplicatedIndex = index
case structs.ACLReplicateRoles:
s.aclReplicationStatus.ReplicatedRoleIndex = index
default:
panic("unknown replication type: " + replicationType.SingularNoun())
}
}
func (s *Server) initReplicationStatus() {
s.aclReplicationStatusLock.Lock()
defer s.aclReplicationStatusLock.Unlock()
s.aclReplicationStatus.Enabled = true
s.aclReplicationStatus.Running = true
s.aclReplicationStatus.SourceDatacenter = s.config.ACLDatacenter
}
func (s *Server) updateACLReplicationStatusStopped() {
s.aclReplicationStatusLock.Lock()
defer s.aclReplicationStatusLock.Unlock()
s.aclReplicationStatus.Running = false
}
func (s *Server) updateACLReplicationStatusRunning(replicationType structs.ACLReplicationType) {
s.aclReplicationStatusLock.Lock()
defer s.aclReplicationStatusLock.Unlock()
// The running state represents which type of overall replication has been
// configured. Though there are various types of internal plumbing for acl
// replication, to the end user there are only 3 distinctly configurable
// variants: legacy, policy, token. Roles replicate with policies so we
// round that up here.
if replicationType == structs.ACLReplicateRoles {
replicationType = structs.ACLReplicatePolicies
}
s.aclReplicationStatus.Running = true
s.aclReplicationStatus.ReplicationType = replicationType
}
func (s *Server) getACLReplicationStatusRunningType() (structs.ACLReplicationType, bool) {
s.aclReplicationStatusLock.RLock()
defer s.aclReplicationStatusLock.RUnlock()
return s.aclReplicationStatus.ReplicationType, s.aclReplicationStatus.Running
}