9542fdc9bc
Roles are named and can express the same bundle of permissions that can currently be assigned to a Token (lists of Policies and Service Identities). The difference with a Role is that it is not itself a bearer token, but just another entity that can be tied to a Token. This lets an operator potentially curate a set of smaller reusable Policies and compose them together into reusable Roles, rather than always exploding that same list of Policies on any Token that needs similar permissions. This also refactors the acl replication code to be semi-generic to avoid 3x copypasta.
556 lines
17 KiB
Go
556 lines
17 KiB
Go
package consul
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
metrics "github.com/armon/go-metrics"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
)
|
|
|
|
// aclReplicationMaxRetryBackoff caps, in seconds, how long ACL replication
// sleeps between attempts after an RPC error.
const aclReplicationMaxRetryBackoff = 64
|
|
|
|
// aclTypeReplicator allows the machinery of acl replication to be shared between
|
|
// types with minimal code duplication (barring generics magically popping into
|
|
// existence).
|
|
//
|
|
// Concrete implementations of this interface should internally contain a
|
|
// pointer to the server so that data lookups can occur, and they should
|
|
// maintain the smallest quantity of type-specific state they can.
|
|
//
|
|
// Implementations of this interface are short-lived and recreated on every
|
|
// iteration.
|
|
type aclTypeReplicator interface {
|
|
// Type is variant of replication in use. Used for updating the replication
|
|
// status tracker.
|
|
Type() structs.ACLReplicationType
|
|
|
|
// SingularNoun is the singular form of the item being replicated.
|
|
SingularNoun() string
|
|
|
|
// PluralNoun is the plural form of the item being replicated.
|
|
PluralNoun() string
|
|
|
|
// FetchRemote retrieves items newer than the provided index from the
|
|
// remote datacenter (for diffing purposes).
|
|
FetchRemote(srv *Server, lastRemoteIndex uint64) (int, uint64, error)
|
|
|
|
// FetchLocal retrieves items from the current datacenter (for diffing
|
|
// purposes).
|
|
FetchLocal(srv *Server) (int, uint64, error)
|
|
|
|
// SortState sorts the internal working state output of FetchRemote and
|
|
// FetchLocal so that a sane diff can be performed.
|
|
SortState() (lenLocal, lenRemote int)
|
|
|
|
// LocalMeta allows for type-agnostic metadata from the sorted local state
|
|
// can be retrieved for the purposes of diffing.
|
|
LocalMeta(i int) (id string, modIndex uint64, hash []byte)
|
|
|
|
// RemoteMeta allows for type-agnostic metadata from the sorted remote
|
|
// state can be retrieved for the purposes of diffing.
|
|
RemoteMeta(i int) (id string, modIndex uint64, hash []byte)
|
|
|
|
// FetchUpdated retrieves the specific items from the remote (during the
|
|
// correction phase).
|
|
FetchUpdated(srv *Server, updates []string) (int, error)
|
|
|
|
// LenPendingUpdates should be the size of the data retrieved in
|
|
// FetchUpdated.
|
|
LenPendingUpdates() int
|
|
|
|
// PendingUpdateIsRedacted returns true if the update contains redacted
|
|
// data. Really only valid for tokens.
|
|
PendingUpdateIsRedacted(i int) bool
|
|
|
|
// PendingUpdateEstimatedSize is the item's EstimatedSize in the state
|
|
// populated by FetchUpdated.
|
|
PendingUpdateEstimatedSize(i int) int
|
|
|
|
// UpdateLocalBatch applies a portion of the state populated by
|
|
// FetchUpdated to the current datacenter.
|
|
UpdateLocalBatch(ctx context.Context, srv *Server, start, end int) error
|
|
|
|
// DeleteLocalBatch removes items from the current datacenter.
|
|
DeleteLocalBatch(srv *Server, batch []string) error
|
|
}
|
|
|
|
// errContainsRedactedData is the sentinel error that replicateACLType compares
// against after calling FetchUpdated; when it matches, the round fails with a
// message saying the replication token does not grant acl:write.
var errContainsRedactedData = errors.New("replication results contain redacted data")
|
|
|
|
func (s *Server) fetchACLRolesBatch(roleIDs []string) (*structs.ACLRoleBatchResponse, error) {
|
|
req := structs.ACLRoleBatchGetRequest{
|
|
Datacenter: s.config.ACLDatacenter,
|
|
RoleIDs: roleIDs,
|
|
QueryOptions: structs.QueryOptions{
|
|
AllowStale: true,
|
|
Token: s.tokens.ReplicationToken(),
|
|
},
|
|
}
|
|
|
|
var response structs.ACLRoleBatchResponse
|
|
if err := s.RPC("ACL.RoleBatchRead", &req, &response); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &response, nil
|
|
}
|
|
|
|
func (s *Server) fetchACLRoles(lastRemoteIndex uint64) (*structs.ACLRoleListResponse, error) {
|
|
defer metrics.MeasureSince([]string{"leader", "replication", "acl", "role", "fetch"}, time.Now())
|
|
|
|
req := structs.ACLRoleListRequest{
|
|
Datacenter: s.config.ACLDatacenter,
|
|
QueryOptions: structs.QueryOptions{
|
|
AllowStale: true,
|
|
MinQueryIndex: lastRemoteIndex,
|
|
Token: s.tokens.ReplicationToken(),
|
|
},
|
|
}
|
|
|
|
var response structs.ACLRoleListResponse
|
|
if err := s.RPC("ACL.RoleList", &req, &response); err != nil {
|
|
return nil, err
|
|
}
|
|
return &response, nil
|
|
}
|
|
|
|
func (s *Server) fetchACLPoliciesBatch(policyIDs []string) (*structs.ACLPolicyBatchResponse, error) {
|
|
req := structs.ACLPolicyBatchGetRequest{
|
|
Datacenter: s.config.ACLDatacenter,
|
|
PolicyIDs: policyIDs,
|
|
QueryOptions: structs.QueryOptions{
|
|
AllowStale: true,
|
|
Token: s.tokens.ReplicationToken(),
|
|
},
|
|
}
|
|
|
|
var response structs.ACLPolicyBatchResponse
|
|
if err := s.RPC("ACL.PolicyBatchRead", &req, &response); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &response, nil
|
|
}
|
|
|
|
func (s *Server) fetchACLPolicies(lastRemoteIndex uint64) (*structs.ACLPolicyListResponse, error) {
|
|
defer metrics.MeasureSince([]string{"leader", "replication", "acl", "policy", "fetch"}, time.Now())
|
|
|
|
req := structs.ACLPolicyListRequest{
|
|
Datacenter: s.config.ACLDatacenter,
|
|
QueryOptions: structs.QueryOptions{
|
|
AllowStale: true,
|
|
MinQueryIndex: lastRemoteIndex,
|
|
Token: s.tokens.ReplicationToken(),
|
|
},
|
|
}
|
|
|
|
var response structs.ACLPolicyListResponse
|
|
if err := s.RPC("ACL.PolicyList", &req, &response); err != nil {
|
|
return nil, err
|
|
}
|
|
return &response, nil
|
|
}
|
|
|
|
// itemDiffResults is the outcome of comparing the local and remote state of
// one replicated ACL type.
type itemDiffResults struct {
	// LocalDeletes lists the IDs found locally but not remotely; LocalUpserts
	// lists the IDs that are missing locally or differ from the remote copy.
	LocalDeletes, LocalUpserts []string

	// LocalSkipped and RemoteSkipped count the entries on each side that were
	// ignored because they carry an empty ID.
	LocalSkipped, RemoteSkipped int
}
|
|
|
|
func diffACLType(tr aclTypeReplicator, lastRemoteIndex uint64) itemDiffResults {
|
|
// Note: items with empty IDs will bubble up to the top (like legacy, unmigrated Tokens)
|
|
|
|
lenLocal, lenRemote := tr.SortState()
|
|
|
|
var res itemDiffResults
|
|
var localIdx int
|
|
var remoteIdx int
|
|
for localIdx, remoteIdx = 0, 0; localIdx < lenLocal && remoteIdx < lenRemote; {
|
|
localID, _, localHash := tr.LocalMeta(localIdx)
|
|
remoteID, remoteMod, remoteHash := tr.RemoteMeta(remoteIdx)
|
|
|
|
if localID == "" {
|
|
res.LocalSkipped++
|
|
localIdx += 1
|
|
continue
|
|
}
|
|
if remoteID == "" {
|
|
res.RemoteSkipped++
|
|
remoteIdx += 1
|
|
continue
|
|
}
|
|
|
|
if localID == remoteID {
|
|
// item is in both the local and remote state - need to check raft indices and the Hash
|
|
if remoteMod > lastRemoteIndex && !bytes.Equal(remoteHash, localHash) {
|
|
res.LocalUpserts = append(res.LocalUpserts, remoteID)
|
|
}
|
|
// increment both indices when equal
|
|
localIdx += 1
|
|
remoteIdx += 1
|
|
} else if localID < remoteID {
|
|
// item no longer in remote state - needs deleting
|
|
res.LocalDeletes = append(res.LocalDeletes, localID)
|
|
|
|
// increment just the local index
|
|
localIdx += 1
|
|
} else {
|
|
// local state doesn't have this item - needs updating
|
|
res.LocalUpserts = append(res.LocalUpserts, remoteID)
|
|
|
|
// increment just the remote index
|
|
remoteIdx += 1
|
|
}
|
|
}
|
|
|
|
for ; localIdx < lenLocal; localIdx += 1 {
|
|
localID, _, _ := tr.LocalMeta(localIdx)
|
|
if localID != "" {
|
|
res.LocalDeletes = append(res.LocalDeletes, localID)
|
|
} else {
|
|
res.LocalSkipped++
|
|
}
|
|
}
|
|
|
|
for ; remoteIdx < lenRemote; remoteIdx += 1 {
|
|
remoteID, _, _ := tr.RemoteMeta(remoteIdx)
|
|
if remoteID != "" {
|
|
res.LocalUpserts = append(res.LocalUpserts, remoteID)
|
|
} else {
|
|
res.RemoteSkipped++
|
|
}
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
func (s *Server) deleteLocalACLType(ctx context.Context, tr aclTypeReplicator, deletions []string) (bool, error) {
|
|
ticker := time.NewTicker(time.Second / time.Duration(s.config.ACLReplicationApplyLimit))
|
|
defer ticker.Stop()
|
|
|
|
for i := 0; i < len(deletions); i += aclBatchDeleteSize {
|
|
var batch []string
|
|
|
|
if i+aclBatchDeleteSize > len(deletions) {
|
|
batch = deletions[i:]
|
|
} else {
|
|
batch = deletions[i : i+aclBatchDeleteSize]
|
|
}
|
|
|
|
if err := tr.DeleteLocalBatch(s, batch); err != nil {
|
|
return false, fmt.Errorf("Failed to apply %s deletions: %v", tr.SingularNoun(), err)
|
|
}
|
|
|
|
if i+aclBatchDeleteSize < len(deletions) {
|
|
select {
|
|
case <-ctx.Done():
|
|
return true, nil
|
|
case <-ticker.C:
|
|
// do nothing - ready for the next batch
|
|
}
|
|
}
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
func (s *Server) updateLocalACLType(ctx context.Context, tr aclTypeReplicator) (bool, error) {
|
|
ticker := time.NewTicker(time.Second / time.Duration(s.config.ACLReplicationApplyLimit))
|
|
defer ticker.Stop()
|
|
|
|
lenPending := tr.LenPendingUpdates()
|
|
|
|
// outer loop handles submitting a batch
|
|
for batchStart := 0; batchStart < lenPending; {
|
|
// inner loop finds the last element to include in this batch.
|
|
batchSize := 0
|
|
batchEnd := batchStart
|
|
for ; batchEnd < lenPending && batchSize < aclBatchUpsertSize; batchEnd += 1 {
|
|
if tr.PendingUpdateIsRedacted(batchEnd) {
|
|
return false, fmt.Errorf(
|
|
"Detected redacted %s secrets: stopping %s update round - verify that the replication token in use has acl:write permissions.",
|
|
tr.SingularNoun(),
|
|
tr.SingularNoun(),
|
|
)
|
|
}
|
|
batchSize += tr.PendingUpdateEstimatedSize(batchEnd)
|
|
}
|
|
|
|
err := tr.UpdateLocalBatch(ctx, s, batchStart, batchEnd)
|
|
if err != nil {
|
|
return false, fmt.Errorf("Failed to apply %s upserts: %v", tr.SingularNoun(), err)
|
|
}
|
|
s.logger.Printf(
|
|
"[DEBUG] acl: %s replication - upserted 1 batch with %d %s of size %d",
|
|
tr.SingularNoun(),
|
|
batchEnd-batchStart,
|
|
tr.PluralNoun(),
|
|
batchSize,
|
|
)
|
|
|
|
// items[batchEnd] wasn't include as the slicing doesn't include the element at the stop index
|
|
batchStart = batchEnd
|
|
|
|
// prevent waiting if we are done
|
|
if batchEnd < lenPending {
|
|
select {
|
|
case <-ctx.Done():
|
|
return true, nil
|
|
case <-ticker.C:
|
|
// nothing to do - just rate limiting
|
|
}
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func (s *Server) fetchACLTokensBatch(tokenIDs []string) (*structs.ACLTokenBatchResponse, error) {
|
|
req := structs.ACLTokenBatchGetRequest{
|
|
Datacenter: s.config.ACLDatacenter,
|
|
AccessorIDs: tokenIDs,
|
|
QueryOptions: structs.QueryOptions{
|
|
AllowStale: true,
|
|
Token: s.tokens.ReplicationToken(),
|
|
},
|
|
}
|
|
|
|
var response structs.ACLTokenBatchResponse
|
|
if err := s.RPC("ACL.TokenBatchRead", &req, &response); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &response, nil
|
|
}
|
|
|
|
func (s *Server) fetchACLTokens(lastRemoteIndex uint64) (*structs.ACLTokenListResponse, error) {
|
|
defer metrics.MeasureSince([]string{"leader", "replication", "acl", "token", "fetch"}, time.Now())
|
|
|
|
req := structs.ACLTokenListRequest{
|
|
Datacenter: s.config.ACLDatacenter,
|
|
QueryOptions: structs.QueryOptions{
|
|
AllowStale: true,
|
|
MinQueryIndex: lastRemoteIndex,
|
|
Token: s.tokens.ReplicationToken(),
|
|
},
|
|
IncludeLocal: false,
|
|
IncludeGlobal: true,
|
|
}
|
|
|
|
var response structs.ACLTokenListResponse
|
|
if err := s.RPC("ACL.TokenList", &req, &response); err != nil {
|
|
return nil, err
|
|
}
|
|
return &response, nil
|
|
}
|
|
|
|
func (s *Server) replicateACLTokens(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
|
|
tr := &aclTokenReplicator{}
|
|
return s.replicateACLType(ctx, tr, lastRemoteIndex)
|
|
}
|
|
|
|
func (s *Server) replicateACLPolicies(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
|
|
tr := &aclPolicyReplicator{}
|
|
return s.replicateACLType(ctx, tr, lastRemoteIndex)
|
|
}
|
|
|
|
func (s *Server) replicateACLRoles(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
|
|
tr := &aclRoleReplicator{}
|
|
return s.replicateACLType(ctx, tr, lastRemoteIndex)
|
|
}
|
|
|
|
func (s *Server) replicateACLType(ctx context.Context, tr aclTypeReplicator, lastRemoteIndex uint64) (uint64, bool, error) {
|
|
lenRemote, remoteIndex, err := tr.FetchRemote(s, lastRemoteIndex)
|
|
if err != nil {
|
|
return 0, false, fmt.Errorf("failed to retrieve remote ACL %s: %v", tr.PluralNoun(), err)
|
|
}
|
|
|
|
s.logger.Printf("[DEBUG] acl: finished fetching %s: %d", tr.PluralNoun(), lenRemote)
|
|
|
|
// Need to check if we should be stopping. This will be common as the fetching process is a blocking
|
|
// RPC which could have been hanging around for a long time and during that time leadership could
|
|
// have been lost.
|
|
select {
|
|
case <-ctx.Done():
|
|
return 0, true, nil
|
|
default:
|
|
// do nothing
|
|
}
|
|
|
|
// Measure everything after the remote query, which can block for long
|
|
// periods of time. This metric is a good measure of how expensive the
|
|
// replication process is.
|
|
defer metrics.MeasureSince([]string{"leader", "replication", "acl", tr.SingularNoun(), "apply"}, time.Now())
|
|
|
|
lenLocal, _, err := tr.FetchLocal(s)
|
|
if err != nil {
|
|
return 0, false, fmt.Errorf("failed to retrieve local ACL %s: %v", tr.PluralNoun(), err)
|
|
}
|
|
|
|
// If the remote index ever goes backwards, it's a good indication that
|
|
// the remote side was rebuilt and we should do a full sync since we
|
|
// can't make any assumptions about what's going on.
|
|
if remoteIndex < lastRemoteIndex {
|
|
s.logger.Printf(
|
|
"[WARN] consul: ACL %s replication remote index moved backwards (%d to %d), forcing a full ACL %s sync",
|
|
tr.SingularNoun(),
|
|
lastRemoteIndex,
|
|
remoteIndex,
|
|
tr.SingularNoun(),
|
|
)
|
|
lastRemoteIndex = 0
|
|
}
|
|
|
|
s.logger.Printf(
|
|
"[DEBUG] acl: %s replication - local: %d, remote: %d",
|
|
tr.SingularNoun(),
|
|
lenLocal,
|
|
lenRemote,
|
|
)
|
|
// Calculate the changes required to bring the state into sync and then apply them.
|
|
res := diffACLType(tr, lastRemoteIndex)
|
|
if res.LocalSkipped > 0 || res.RemoteSkipped > 0 {
|
|
s.logger.Printf(
|
|
"[DEBUG] acl: %s replication - deletions: %d, updates: %d, skipped: %d, skippedRemote: %d",
|
|
tr.SingularNoun(),
|
|
len(res.LocalDeletes),
|
|
len(res.LocalUpserts),
|
|
res.LocalSkipped,
|
|
res.RemoteSkipped,
|
|
)
|
|
} else {
|
|
s.logger.Printf(
|
|
"[DEBUG] acl: %s replication - deletions: %d, updates: %d",
|
|
tr.SingularNoun(),
|
|
len(res.LocalDeletes),
|
|
len(res.LocalUpserts),
|
|
)
|
|
}
|
|
|
|
if len(res.LocalUpserts) > 0 {
|
|
lenUpdated, err := tr.FetchUpdated(s, res.LocalUpserts)
|
|
if err == errContainsRedactedData {
|
|
return 0, false, fmt.Errorf("failed to retrieve unredacted %s - replication token in use does not grant acl:write", tr.PluralNoun())
|
|
} else if err != nil {
|
|
return 0, false, fmt.Errorf("failed to retrieve ACL %s updates: %v", tr.SingularNoun(), err)
|
|
}
|
|
s.logger.Printf(
|
|
"[DEBUG] acl: %s replication - downloaded %d %s",
|
|
tr.SingularNoun(),
|
|
lenUpdated,
|
|
tr.PluralNoun(),
|
|
)
|
|
}
|
|
|
|
if len(res.LocalDeletes) > 0 {
|
|
s.logger.Printf(
|
|
"[DEBUG] acl: %s replication - performing deletions",
|
|
tr.SingularNoun(),
|
|
)
|
|
|
|
exit, err := s.deleteLocalACLType(ctx, tr, res.LocalDeletes)
|
|
if exit {
|
|
return 0, true, nil
|
|
}
|
|
if err != nil {
|
|
return 0, false, fmt.Errorf("failed to delete local ACL %s: %v", tr.PluralNoun(), err)
|
|
}
|
|
s.logger.Printf("[DEBUG] acl: %s replication - finished deletions", tr.SingularNoun())
|
|
}
|
|
|
|
if len(res.LocalUpserts) > 0 {
|
|
s.logger.Printf("[DEBUG] acl: %s replication - performing updates", tr.SingularNoun())
|
|
exit, err := s.updateLocalACLType(ctx, tr)
|
|
if exit {
|
|
return 0, true, nil
|
|
}
|
|
if err != nil {
|
|
return 0, false, fmt.Errorf("failed to update local ACL %s: %v", tr.PluralNoun(), err)
|
|
}
|
|
s.logger.Printf("[DEBUG] acl: %s replication - finished updates", tr.SingularNoun())
|
|
}
|
|
|
|
// Return the index we got back from the remote side, since we've synced
|
|
// up with the remote state as of that index.
|
|
return remoteIndex, false, nil
|
|
}
|
|
|
|
// IsACLReplicationEnabled returns true if ACL replication is enabled.
|
|
// DEPRECATED (ACL-Legacy-Compat) - with new ACLs at least policy replication is required
|
|
func (s *Server) IsACLReplicationEnabled() bool {
|
|
authDC := s.config.ACLDatacenter
|
|
return len(authDC) > 0 && (authDC != s.config.Datacenter) &&
|
|
s.config.ACLTokenReplication
|
|
}
|
|
|
|
func (s *Server) updateACLReplicationStatusError() {
|
|
s.aclReplicationStatusLock.Lock()
|
|
defer s.aclReplicationStatusLock.Unlock()
|
|
|
|
s.aclReplicationStatus.LastError = time.Now().Round(time.Second).UTC()
|
|
}
|
|
|
|
func (s *Server) updateACLReplicationStatusIndex(replicationType structs.ACLReplicationType, index uint64) {
|
|
s.aclReplicationStatusLock.Lock()
|
|
defer s.aclReplicationStatusLock.Unlock()
|
|
|
|
s.aclReplicationStatus.LastSuccess = time.Now().Round(time.Second).UTC()
|
|
switch replicationType {
|
|
case structs.ACLReplicateLegacy:
|
|
s.aclReplicationStatus.ReplicatedIndex = index
|
|
case structs.ACLReplicateTokens:
|
|
s.aclReplicationStatus.ReplicatedTokenIndex = index
|
|
case structs.ACLReplicatePolicies:
|
|
s.aclReplicationStatus.ReplicatedIndex = index
|
|
case structs.ACLReplicateRoles:
|
|
s.aclReplicationStatus.ReplicatedRoleIndex = index
|
|
default:
|
|
panic("unknown replication type: " + replicationType.SingularNoun())
|
|
}
|
|
}
|
|
|
|
func (s *Server) initReplicationStatus() {
|
|
s.aclReplicationStatusLock.Lock()
|
|
defer s.aclReplicationStatusLock.Unlock()
|
|
|
|
s.aclReplicationStatus.Enabled = true
|
|
s.aclReplicationStatus.Running = true
|
|
s.aclReplicationStatus.SourceDatacenter = s.config.ACLDatacenter
|
|
}
|
|
|
|
func (s *Server) updateACLReplicationStatusStopped() {
|
|
s.aclReplicationStatusLock.Lock()
|
|
defer s.aclReplicationStatusLock.Unlock()
|
|
|
|
s.aclReplicationStatus.Running = false
|
|
}
|
|
|
|
func (s *Server) updateACLReplicationStatusRunning(replicationType structs.ACLReplicationType) {
|
|
s.aclReplicationStatusLock.Lock()
|
|
defer s.aclReplicationStatusLock.Unlock()
|
|
|
|
// The running state represents which type of overall replication has been
|
|
// configured. Though there are various types of internal plumbing for acl
|
|
// replication, to the end user there are only 3 distinctly configurable
|
|
// variants: legacy, policy, token. Roles replicate with policies so we
|
|
// round that up here.
|
|
if replicationType == structs.ACLReplicateRoles {
|
|
replicationType = structs.ACLReplicatePolicies
|
|
}
|
|
|
|
s.aclReplicationStatus.Running = true
|
|
s.aclReplicationStatus.ReplicationType = replicationType
|
|
}
|
|
|
|
func (s *Server) getACLReplicationStatusRunningType() (structs.ACLReplicationType, bool) {
|
|
s.aclReplicationStatusLock.RLock()
|
|
defer s.aclReplicationStatusLock.RUnlock()
|
|
return s.aclReplicationStatus.ReplicationType, s.aclReplicationStatus.Running
|
|
}
|