package consul

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"time"

	metrics "github.com/armon/go-metrics"
	"github.com/hashicorp/consul/agent/structs"
)

const (
	// aclReplicationMaxRetryBackoff is the max number of seconds to sleep between ACL replication RPC errors
	aclReplicationMaxRetryBackoff = 64
)

// aclTypeReplicator allows the machinery of acl replication to be shared between
// types with minimal code duplication (barring generics magically popping into
// existence).
//
// Concrete implementations of this interface should internally contain a
// pointer to the server so that data lookups can occur, and they should
// maintain the smallest quantity of type-specific state they can.
//
// Implementations of this interface are short-lived and recreated on every
// iteration.
type aclTypeReplicator interface {
	// Type is the variant of replication in use. Used for updating the
	// replication status tracker.
	Type() structs.ACLReplicationType

	// SingularNoun is the singular form of the item being replicated.
	SingularNoun() string

	// PluralNoun is the plural form of the item being replicated.
	PluralNoun() string

	// FetchRemote retrieves items newer than the provided index from the
	// remote datacenter (for diffing purposes).
	FetchRemote(srv *Server, lastRemoteIndex uint64) (int, uint64, error)

	// FetchLocal retrieves items from the current datacenter (for diffing
	// purposes).
	FetchLocal(srv *Server) (int, uint64, error)

	// SortState sorts the internal working state output of FetchRemote and
	// FetchLocal so that a sane diff can be performed.
	SortState() (lenLocal, lenRemote int)

	// LocalMeta allows type-agnostic metadata from the sorted local state to
	// be retrieved for the purposes of diffing.
	LocalMeta(i int) (id string, modIndex uint64, hash []byte)

	// RemoteMeta allows type-agnostic metadata from the sorted remote state
	// to be retrieved for the purposes of diffing.
	RemoteMeta(i int) (id string, modIndex uint64, hash []byte)

	// FetchUpdated retrieves the specific items from the remote (during the
	// correction phase).
	FetchUpdated(srv *Server, updates []string) (int, error)

	// LenPendingUpdates should be the size of the data retrieved in
	// FetchUpdated.
	LenPendingUpdates() int

	// PendingUpdateIsRedacted returns true if the update contains redacted
	// data. Really only valid for tokens.
	PendingUpdateIsRedacted(i int) bool

	// PendingUpdateEstimatedSize is the item's EstimatedSize in the state
	// populated by FetchUpdated.
	PendingUpdateEstimatedSize(i int) int

	// UpdateLocalBatch applies a portion of the state populated by
	// FetchUpdated to the current datacenter.
	UpdateLocalBatch(ctx context.Context, srv *Server, start, end int) error

	// DeleteLocalBatch removes items from the current datacenter.
	DeleteLocalBatch(srv *Server, batch []string) error
}

// errContainsRedactedData is the sentinel that replicateACLType checks for
// after FetchUpdated in order to report that the replication token does not
// grant acl:write.
var errContainsRedactedData = errors.New("replication results contain redacted data")

// fetchACLRolesBatch issues an ACL.RoleBatchRead RPC for the given role IDs
// against the configured ACL datacenter, allowing stale reads and
// authenticating with the replication token.
func (s *Server) fetchACLRolesBatch(roleIDs []string) (*structs.ACLRoleBatchResponse, error) {
	req := structs.ACLRoleBatchGetRequest{
		Datacenter: s.config.ACLDatacenter,
		RoleIDs:    roleIDs,
		QueryOptions: structs.QueryOptions{
			AllowStale: true,
			Token:      s.tokens.ReplicationToken(),
		},
	}

	var response structs.ACLRoleBatchResponse
	if err := s.RPC("ACL.RoleBatchRead", &req, &response); err != nil {
		return nil, err
	}

	return &response, nil
}

// fetchACLRoles issues an ACL.RoleList RPC against the configured ACL
// datacenter with MinQueryIndex set to lastRemoteIndex, and records how long
// the call took under the leader/replication/acl/role/fetch metric.
func (s *Server) fetchACLRoles(lastRemoteIndex uint64) (*structs.ACLRoleListResponse, error) {
	defer metrics.MeasureSince([]string{"leader", "replication", "acl", "role", "fetch"}, time.Now())

	req := structs.ACLRoleListRequest{
		Datacenter: s.config.ACLDatacenter,
		QueryOptions: structs.QueryOptions{
			AllowStale:    true,
			MinQueryIndex: lastRemoteIndex,
			Token:         s.tokens.ReplicationToken(),
		},
	}

	var response structs.ACLRoleListResponse
	if err := s.RPC("ACL.RoleList", &req, &response); err != nil {
		return nil, err
	}

	return &response, nil
}

// fetchACLPoliciesBatch issues an ACL.PolicyBatchRead RPC for the given policy
// IDs against the configured ACL datacenter, allowing stale reads and
// authenticating with the replication token.
func (s *Server) fetchACLPoliciesBatch(policyIDs []string) (*structs.ACLPolicyBatchResponse, error) {
	req := structs.ACLPolicyBatchGetRequest{
		Datacenter: s.config.ACLDatacenter,
		PolicyIDs:  policyIDs,
		QueryOptions: structs.QueryOptions{
			AllowStale: true,
			Token:      s.tokens.ReplicationToken(),
		},
	}

	var response structs.ACLPolicyBatchResponse
	if err := s.RPC("ACL.PolicyBatchRead", &req, &response); err != nil {
		return nil, err
	}

	return &response, nil
}

// fetchACLPolicies issues an ACL.PolicyList RPC against the configured ACL
// datacenter with MinQueryIndex set to lastRemoteIndex, and records how long
// the call took under the leader/replication/acl/policy/fetch metric.
func (s *Server) fetchACLPolicies(lastRemoteIndex uint64) (*structs.ACLPolicyListResponse, error) {
	defer metrics.MeasureSince([]string{"leader", "replication", "acl", "policy", "fetch"}, time.Now())

	req := structs.ACLPolicyListRequest{
		Datacenter: s.config.ACLDatacenter,
		QueryOptions: structs.QueryOptions{
			AllowStale:    true,
			MinQueryIndex: lastRemoteIndex,
			Token:         s.tokens.ReplicationToken(),
		},
	}

	var response structs.ACLPolicyListResponse
	if err := s.RPC("ACL.PolicyList", &req, &response); err != nil {
		return nil, err
	}

	return &response, nil
}

// itemDiffResults is the outcome of diffACLType.
type itemDiffResults struct {
	// LocalDeletes holds IDs present locally but not remotely.
	LocalDeletes []string
	// LocalUpserts holds remote IDs that are missing locally or whose remote
	// copy is newer than the last replicated index and has a different hash.
	LocalUpserts []string
	// LocalSkipped counts local items ignored because their ID was empty.
	LocalSkipped int
	// RemoteSkipped counts remote items ignored because their ID was empty.
	RemoteSkipped int
}

// diffACLType walks the sorted local and remote state held by tr in lockstep
// (a merge-style comparison by ID) and returns the IDs that must be deleted
// from or upserted into the local datacenter.
//
// An item present on both sides is only upserted when its remote modify index
// is greater than lastRemoteIndex and its hash differs from the local hash.
func diffACLType(tr aclTypeReplicator, lastRemoteIndex uint64) itemDiffResults {
	// Note: items with empty IDs will bubble up to the top (like legacy, unmigrated Tokens)

	lenLocal, lenRemote := tr.SortState()

	var res itemDiffResults
	localIdx, remoteIdx := 0, 0
	for localIdx < lenLocal && remoteIdx < lenRemote {
		localID, _, localHash := tr.LocalMeta(localIdx)
		remoteID, remoteMod, remoteHash := tr.RemoteMeta(remoteIdx)

		if localID == "" {
			res.LocalSkipped++
			localIdx++
			continue
		}
		if remoteID == "" {
			res.RemoteSkipped++
			remoteIdx++
			continue
		}

		if localID == remoteID {
			// item is in both the local and remote state - need to check raft indices and the Hash
			if remoteMod > lastRemoteIndex && !bytes.Equal(remoteHash, localHash) {
				res.LocalUpserts = append(res.LocalUpserts, remoteID)
			}
			// increment both indices when equal
			localIdx++
			remoteIdx++
		} else if localID < remoteID {
			// item no longer in remote state - needs deleting
			res.LocalDeletes = append(res.LocalDeletes, localID)

			// increment just the local index
			localIdx++
		} else {
			// local state doesn't have this item - needs updating
			res.LocalUpserts = append(res.LocalUpserts, remoteID)

			// increment just the remote index
			remoteIdx++
		}
	}

	// Anything left on the local side has no remote counterpart.
	for ; localIdx < lenLocal; localIdx++ {
		localID, _, _ := tr.LocalMeta(localIdx)
		if localID != "" {
			res.LocalDeletes = append(res.LocalDeletes, localID)
		} else {
			res.LocalSkipped++
		}
	}

	// Anything left on the remote side is missing locally.
	for ; remoteIdx < lenRemote; remoteIdx++ {
		remoteID, _, _ := tr.RemoteMeta(remoteIdx)
		if remoteID != "" {
			res.LocalUpserts = append(res.LocalUpserts, remoteID)
		} else {
			res.RemoteSkipped++
		}
	}

	return res
}

// newACLReplicationApplyTicker returns the ticker used to rate limit local
// ACL replication writes to s.config.ACLReplicationApplyLimit batches per
// second.
//
// A non-positive limit is reported as an error instead of being allowed to
// panic: dividing by a zero limit is an integer division by zero, and a
// negative limit yields a negative interval, which time.NewTicker rejects by
// panicking.
func (s *Server) newACLReplicationApplyTicker() (*time.Ticker, error) {
	limit := s.config.ACLReplicationApplyLimit
	if limit <= 0 {
		return nil, fmt.Errorf("invalid ACL replication apply limit: %d", limit)
	}

	interval := time.Second / time.Duration(limit)
	if interval <= 0 {
		// A limit above one batch per nanosecond truncates to a zero
		// interval; clamp to the smallest interval NewTicker accepts.
		interval = time.Nanosecond
	}
	return time.NewTicker(interval), nil
}

// deleteLocalACLType removes the given IDs from the local datacenter in
// batches of at most aclBatchDeleteSize, waiting on the apply-rate ticker
// between batches.
//
// The returned bool is true when ctx was cancelled while waiting for the next
// batch slot; the caller is expected to stop replicating in that case.
func (s *Server) deleteLocalACLType(ctx context.Context, tr aclTypeReplicator, deletions []string) (bool, error) {
	ticker, err := s.newACLReplicationApplyTicker()
	if err != nil {
		return false, err
	}
	defer ticker.Stop()

	for i := 0; i < len(deletions); i += aclBatchDeleteSize {
		var batch []string

		if i+aclBatchDeleteSize > len(deletions) {
			batch = deletions[i:]
		} else {
			batch = deletions[i : i+aclBatchDeleteSize]
		}

		if err := tr.DeleteLocalBatch(s, batch); err != nil {
			return false, fmt.Errorf("Failed to apply %s deletions: %v", tr.SingularNoun(), err)
		}

		// Only wait when there is another batch to submit.
		if i+aclBatchDeleteSize < len(deletions) {
			select {
			case <-ctx.Done():
				return true, nil
			case <-ticker.C:
				// do nothing - ready for the next batch
			}
		}
	}

	return false, nil
}

// updateLocalACLType applies the pending updates held by tr (populated by
// FetchUpdated) to the local datacenter. Items are grouped into batches whose
// accumulated estimated size is checked against aclBatchUpsertSize, and the
// apply-rate ticker is awaited between batches.
//
// The round is aborted with an error as soon as a redacted item is seen. The
// returned bool is true when ctx was cancelled while waiting for the next
// batch slot.
func (s *Server) updateLocalACLType(ctx context.Context, tr aclTypeReplicator) (bool, error) {
	ticker, err := s.newACLReplicationApplyTicker()
	if err != nil {
		return false, err
	}
	defer ticker.Stop()

	lenPending := tr.LenPendingUpdates()

	// outer loop handles submitting a batch
	for batchStart := 0; batchStart < lenPending; {
		// inner loop finds the last element to include in this batch.
		batchSize := 0
		batchEnd := batchStart
		for ; batchEnd < lenPending && batchSize < aclBatchUpsertSize; batchEnd++ {
			if tr.PendingUpdateIsRedacted(batchEnd) {
				return false, fmt.Errorf(
					"Detected redacted %s secrets: stopping %s update round - verify that the replication token in use has acl:write permissions.",
					tr.SingularNoun(),
					tr.SingularNoun(),
				)
			}
			batchSize += tr.PendingUpdateEstimatedSize(batchEnd)
		}

		err := tr.UpdateLocalBatch(ctx, s, batchStart, batchEnd)
		if err != nil {
			return false, fmt.Errorf("Failed to apply %s upserts: %v", tr.SingularNoun(), err)
		}
		s.logger.Printf(
			"[DEBUG] acl: %s replication - upserted 1 batch with %d %s of size %d",
			tr.SingularNoun(),
			batchEnd-batchStart,
			tr.PluralNoun(),
			batchSize,
		)

		// items[batchEnd] wasn't included as the slicing doesn't include the element at the stop index
		batchStart = batchEnd

		// prevent waiting if we are done
		if batchEnd < lenPending {
			select {
			case <-ctx.Done():
				return true, nil
			case <-ticker.C:
				// nothing to do - just rate limiting
			}
		}
	}

	return false, nil
}

// fetchACLTokensBatch issues an ACL.TokenBatchRead RPC for the given token
// accessor IDs against the configured ACL datacenter, allowing stale reads and
// authenticating with the replication token.
func (s *Server) fetchACLTokensBatch(tokenIDs []string) (*structs.ACLTokenBatchResponse, error) {
	req := structs.ACLTokenBatchGetRequest{
		Datacenter:  s.config.ACLDatacenter,
		AccessorIDs: tokenIDs,
		QueryOptions: structs.QueryOptions{
			AllowStale: true,
			Token:      s.tokens.ReplicationToken(),
		},
	}

	var response structs.ACLTokenBatchResponse
	if err := s.RPC("ACL.TokenBatchRead", &req, &response); err != nil {
		return nil, err
	}

	return &response, nil
}

// fetchACLTokens issues an ACL.TokenList RPC against the configured ACL
// datacenter with MinQueryIndex set to lastRemoteIndex, requesting global
// tokens only (IncludeGlobal is set, IncludeLocal is not). The call duration
// is recorded under the leader/replication/acl/token/fetch metric.
func (s *Server) fetchACLTokens(lastRemoteIndex uint64) (*structs.ACLTokenListResponse, error) {
	defer metrics.MeasureSince([]string{"leader", "replication", "acl", "token", "fetch"}, time.Now())

	req := structs.ACLTokenListRequest{
		Datacenter: s.config.ACLDatacenter,
		QueryOptions: structs.QueryOptions{
			AllowStale:    true,
			MinQueryIndex: lastRemoteIndex,
			Token:         s.tokens.ReplicationToken(),
		},
		IncludeLocal:  false,
		IncludeGlobal: true,
	}

	var response structs.ACLTokenListResponse
	if err := s.RPC("ACL.TokenList", &req, &response); err != nil {
		return nil, err
	}

	return &response, nil
}

// replicateACLTokens runs one replication round for tokens; see
// replicateACLType for the meaning of the return values.
func (s *Server) replicateACLTokens(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
	tr := &aclTokenReplicator{}
	return s.replicateACLType(ctx, tr, lastRemoteIndex)
}

// replicateACLPolicies runs one replication round for policies; see
// replicateACLType for the meaning of the return values.
func (s *Server) replicateACLPolicies(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
	tr := &aclPolicyReplicator{}
	return s.replicateACLType(ctx, tr, lastRemoteIndex)
}

// replicateACLRoles runs one replication round for roles; see
// replicateACLType for the meaning of the return values.
func (s *Server) replicateACLRoles(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
	tr := &aclRoleReplicator{}
	return s.replicateACLType(ctx, tr, lastRemoteIndex)
}

// replicateACLType performs a single replication round for the item type
// handled by tr: fetch the remote and local state, diff them, download the
// items that need upserting, then apply deletions followed by upserts to the
// local datacenter.
//
// It returns the remote index that was synced to, a flag that is true when
// the round was abandoned because ctx was cancelled, and any error. The index
// is only meaningful when the flag is false and the error is nil; it is zero
// otherwise.
func (s *Server) replicateACLType(ctx context.Context, tr aclTypeReplicator, lastRemoteIndex uint64) (uint64, bool, error) {
	lenRemote, remoteIndex, err := tr.FetchRemote(s, lastRemoteIndex)
	if err != nil {
		return 0, false, fmt.Errorf("failed to retrieve remote ACL %s: %v", tr.PluralNoun(), err)
	}

	s.logger.Printf("[DEBUG] acl: finished fetching %s: %d", tr.PluralNoun(), lenRemote)

	// Need to check if we should be stopping. This will be common as the fetching process is a blocking
	// RPC which could have been hanging around for a long time and during that time leadership could
	// have been lost.
	select {
	case <-ctx.Done():
		return 0, true, nil
	default:
		// do nothing
	}

	// Measure everything after the remote query, which can block for long
	// periods of time. This metric is a good measure of how expensive the
	// replication process is.
	defer metrics.MeasureSince([]string{"leader", "replication", "acl", tr.SingularNoun(), "apply"}, time.Now())

	lenLocal, _, err := tr.FetchLocal(s)
	if err != nil {
		return 0, false, fmt.Errorf("failed to retrieve local ACL %s: %v", tr.PluralNoun(), err)
	}

	// If the remote index ever goes backwards, it's a good indication that
	// the remote side was rebuilt and we should do a full sync since we
	// can't make any assumptions about what's going on.
	if remoteIndex < lastRemoteIndex {
		s.logger.Printf(
			"[WARN] consul: ACL %s replication remote index moved backwards (%d to %d), forcing a full ACL %s sync",
			tr.SingularNoun(),
			lastRemoteIndex,
			remoteIndex,
			tr.SingularNoun(),
		)
		lastRemoteIndex = 0
	}

	s.logger.Printf(
		"[DEBUG] acl: %s replication - local: %d, remote: %d",
		tr.SingularNoun(),
		lenLocal,
		lenRemote,
	)

	// Calculate the changes required to bring the state into sync and then apply them.
	res := diffACLType(tr, lastRemoteIndex)
	if res.LocalSkipped > 0 || res.RemoteSkipped > 0 {
		s.logger.Printf(
			"[DEBUG] acl: %s replication - deletions: %d, updates: %d, skipped: %d, skippedRemote: %d",
			tr.SingularNoun(),
			len(res.LocalDeletes),
			len(res.LocalUpserts),
			res.LocalSkipped,
			res.RemoteSkipped,
		)
	} else {
		s.logger.Printf(
			"[DEBUG] acl: %s replication - deletions: %d, updates: %d",
			tr.SingularNoun(),
			len(res.LocalDeletes),
			len(res.LocalUpserts),
		)
	}

	// Download the full items for everything that needs upserting before any
	// local state is modified.
	if len(res.LocalUpserts) > 0 {
		lenUpdated, err := tr.FetchUpdated(s, res.LocalUpserts)
		if err == errContainsRedactedData {
			return 0, false, fmt.Errorf("failed to retrieve unredacted %s - replication token in use does not grant acl:write", tr.PluralNoun())
		} else if err != nil {
			return 0, false, fmt.Errorf("failed to retrieve ACL %s updates: %v", tr.SingularNoun(), err)
		}
		s.logger.Printf(
			"[DEBUG] acl: %s replication - downloaded %d %s",
			tr.SingularNoun(),
			lenUpdated,
			tr.PluralNoun(),
		)
	}

	if len(res.LocalDeletes) > 0 {
		s.logger.Printf(
			"[DEBUG] acl: %s replication - performing deletions",
			tr.SingularNoun(),
		)

		exit, err := s.deleteLocalACLType(ctx, tr, res.LocalDeletes)
		if exit {
			return 0, true, nil
		}
		if err != nil {
			return 0, false, fmt.Errorf("failed to delete local ACL %s: %v", tr.PluralNoun(), err)
		}
		s.logger.Printf("[DEBUG] acl: %s replication - finished deletions", tr.SingularNoun())
	}

	if len(res.LocalUpserts) > 0 {
		s.logger.Printf("[DEBUG] acl: %s replication - performing updates", tr.SingularNoun())
		exit, err := s.updateLocalACLType(ctx, tr)
		if exit {
			return 0, true, nil
		}
		if err != nil {
			return 0, false, fmt.Errorf("failed to update local ACL %s: %v", tr.PluralNoun(), err)
		}
		s.logger.Printf("[DEBUG] acl: %s replication - finished updates", tr.SingularNoun())
	}

	// Return the index we got back from the remote side, since we've synced
	// up with the remote state as of that index.
	return remoteIndex, false, nil
}

// IsACLReplicationEnabled returns true if ACL replication is enabled.
// DEPRECATED (ACL-Legacy-Compat) - with new ACLs at least policy replication is required
func (s *Server) IsACLReplicationEnabled() bool {
	authDC := s.config.ACLDatacenter
	return len(authDC) > 0 && (authDC != s.config.Datacenter) &&
		s.config.ACLTokenReplication
}

// updateACLReplicationStatusError stamps the replication status with the
// current time (rounded to the second, in UTC) as the time of the last error.
func (s *Server) updateACLReplicationStatusError() {
	s.aclReplicationStatusLock.Lock()
	defer s.aclReplicationStatusLock.Unlock()

	s.aclReplicationStatus.LastError = time.Now().Round(time.Second).UTC()
}

// updateACLReplicationStatusIndex stamps the replication status with the
// current time as the last success and stores index in the status field that
// corresponds to replicationType. It panics on an unknown replication type.
func (s *Server) updateACLReplicationStatusIndex(replicationType structs.ACLReplicationType, index uint64) {
	s.aclReplicationStatusLock.Lock()
	defer s.aclReplicationStatusLock.Unlock()

	s.aclReplicationStatus.LastSuccess = time.Now().Round(time.Second).UTC()
	switch replicationType {
	case structs.ACLReplicateLegacy:
		s.aclReplicationStatus.ReplicatedIndex = index
	case structs.ACLReplicateTokens:
		s.aclReplicationStatus.ReplicatedTokenIndex = index
	case structs.ACLReplicatePolicies:
		// Policies are tracked in the same field as legacy replication.
		s.aclReplicationStatus.ReplicatedIndex = index
	case structs.ACLReplicateRoles:
		s.aclReplicationStatus.ReplicatedRoleIndex = index
	default:
		panic("unknown replication type: " + replicationType.SingularNoun())
	}
}

// initReplicationStatus marks replication as enabled and running and records
// the configured ACL datacenter as the replication source.
func (s *Server) initReplicationStatus() {
	s.aclReplicationStatusLock.Lock()
	defer s.aclReplicationStatusLock.Unlock()

	s.aclReplicationStatus.Enabled = true
	s.aclReplicationStatus.Running = true
	s.aclReplicationStatus.SourceDatacenter = s.config.ACLDatacenter
}

// updateACLReplicationStatusStopped marks replication as no longer running.
func (s *Server) updateACLReplicationStatusStopped() {
	s.aclReplicationStatusLock.Lock()
	defer s.aclReplicationStatusLock.Unlock()

	s.aclReplicationStatus.Running = false
}

// updateACLReplicationStatusRunning marks replication as running and records
// which type of replication is in use.
func (s *Server) updateACLReplicationStatusRunning(replicationType structs.ACLReplicationType) {
	s.aclReplicationStatusLock.Lock()
	defer s.aclReplicationStatusLock.Unlock()

	// The running state represents which type of overall replication has been
	// configured. Though there are various types of internal plumbing for acl
	// replication, to the end user there are only 3 distinctly configurable
	// variants: legacy, policy, token. Roles replicate with policies so we
	// round that up here.
	if replicationType == structs.ACLReplicateRoles {
		replicationType = structs.ACLReplicatePolicies
	}

	s.aclReplicationStatus.Running = true
	s.aclReplicationStatus.ReplicationType = replicationType
}

// getACLReplicationStatusRunningType returns the recorded replication type
// together with whether replication is currently marked as running.
func (s *Server) getACLReplicationStatusRunningType() (structs.ACLReplicationType, bool) {
	s.aclReplicationStatusLock.RLock()
	defer s.aclReplicationStatusLock.RUnlock()

	return s.aclReplicationStatus.ReplicationType, s.aclReplicationStatus.Running
}

// getACLReplicationStatus returns the current replication status by value,
// read under the status lock.
func (s *Server) getACLReplicationStatus() structs.ACLReplicationStatus {
	s.aclReplicationStatusLock.RLock()
	defer s.aclReplicationStatusLock.RUnlock()

	return s.aclReplicationStatus
}