package consul

import (
	"fmt"
	"sort"
	"time"

	"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/consul/structs"
"github.com/hashicorp/consul/lib"
)
// aclIterator simplifies the algorithm below by providing a basic iterator that
// moves through a list of ACLs and returns nil when it's exhausted. It also has
// methods for pre-sorting the ACLs being iterated over by ID, which should
// already be true, but since this is crucial for correctness and we are taking
// input from other servers, we sort to make sure.
type aclIterator struct {
	acls structs.ACLs

	// index is the current position of the iterator.
	index int
}

// newACLIterator returns a new ACL iterator.
func newACLIterator(acls structs.ACLs) *aclIterator {
	return &aclIterator{acls: acls}
}

// See sort.Interface.
func (a *aclIterator) Len() int {
	return len(a.acls)
}

// See sort.Interface.
func (a *aclIterator) Swap(i, j int) {
	a.acls[i], a.acls[j] = a.acls[j], a.acls[i]
}

// See sort.Interface.
func (a *aclIterator) Less(i, j int) bool {
	return a.acls[i].ID < a.acls[j].ID
}

// Front returns the item at the current index position, or nil if the list is
// exhausted.
func (a *aclIterator) Front() *structs.ACL {
	if a.index < len(a.acls) {
		return a.acls[a.index]
	}
	return nil
}

// Next advances the iterator to the next index.
func (a *aclIterator) Next() {
	a.index++
}
// reconcileACLs takes the local and remote ACL state, and produces a list of
// changes required in order to bring the local ACLs into sync with the remote
// ACLs. You can supply lastRemoteIndex as a hint that replication has succeeded
// up to that remote index and it will make this process more efficient by only
// comparing ACL entries modified after that index. Setting this to 0 will force
// a full compare of all existing ACLs.
func reconcileACLs(local, remote structs.ACLs, lastRemoteIndex uint64) structs.ACLRequests {
	// Sorting the lists is crucial for correctness, but we are depending on
	// data coming from other servers that could be running a different
	// version of Consul, and sorted-ness is a subtle property of the state
	// store indexing, so it's prudent to make sure things are sorted before
	// we begin.
	localIter, remoteIter := newACLIterator(local), newACLIterator(remote)
	sort.Sort(localIter)
	sort.Sort(remoteIter)

	// Run through both lists and reconcile them.
	var changes structs.ACLRequests
	for localIter.Front() != nil || remoteIter.Front() != nil {
		// If the local list is exhausted, then process this as a remote
		// add. We know from the loop condition that there's something
		// in the remote list.
		if localIter.Front() == nil {
			changes = append(changes, &structs.ACLRequest{
				Op:  structs.ACLSet,
				ACL: *(remoteIter.Front()),
			})
			remoteIter.Next()
			continue
		}

		// If the remote list is exhausted, then process this as a local
		// delete. We know from the loop condition that there's something
		// in the local list.
		if remoteIter.Front() == nil {
			changes = append(changes, &structs.ACLRequest{
				Op:  structs.ACLDelete,
				ACL: *(localIter.Front()),
			})
			localIter.Next()
			continue
		}

		// At this point we know there's something at the front of each
		// list we need to resolve.

		// If the remote list has something local doesn't, we add it.
		if localIter.Front().ID > remoteIter.Front().ID {
			changes = append(changes, &structs.ACLRequest{
				Op:  structs.ACLSet,
				ACL: *(remoteIter.Front()),
			})
			remoteIter.Next()
			continue
		}

		// If local has something remote doesn't, we delete it.
		if localIter.Front().ID < remoteIter.Front().ID {
			changes = append(changes, &structs.ACLRequest{
				Op:  structs.ACLDelete,
				ACL: *(localIter.Front()),
			})
			localIter.Next()
			continue
		}

		// Local and remote have an ACL with the same ID, so we might
		// need to compare them.
		l, r := localIter.Front(), remoteIter.Front()
		if r.RaftIndex.ModifyIndex > lastRemoteIndex && !r.IsSame(l) {
			changes = append(changes, &structs.ACLRequest{
				Op:  structs.ACLSet,
				ACL: *r,
			})
		}
		localIter.Next()
		remoteIter.Next()
	}
	return changes
}
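
// exampleReconcileACLs is an illustrative sketch only (it is not called by the
// replication code): it shows, on a pair of hand-built sorted inputs with
// hypothetical IDs, the kind of change list reconcileACLs produces.
func exampleReconcileACLs() structs.ACLRequests {
	local := structs.ACLs{
		&structs.ACL{ID: "a1"},
		&structs.ACL{ID: "b2"},
	}
	remote := structs.ACLs{
		&structs.ACL{ID: "b2"},
		&structs.ACL{ID: "c3"},
	}

	// With lastRemoteIndex of 0 this forces a full compare: "a1" (local
	// only) comes back as an ACLDelete, "c3" (remote only) as an ACLSet,
	// and "b2" would be re-set only if its remote ModifyIndex were newer
	// than lastRemoteIndex and IsSame reported a difference (here it is
	// left alone).
	return reconcileACLs(local, remote, 0)
}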
// fetchLocalACLs returns the ACLs in the local state store.
func (s *Server) fetchLocalACLs() (structs.ACLs, error) {
	_, local, err := s.fsm.State().ACLList(nil)
	if err != nil {
		return nil, err
	}
	return local, nil
}
// fetchRemoteACLs is used to get the remote set of ACLs from the ACL
// datacenter. The lastRemoteIndex parameter is a hint about which remote
// index we have replicated to, so this is expected to block until something
// changes.
func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, error) {
	defer metrics.MeasureSince([]string{"consul", "leader", "fetchRemoteACLs"}, time.Now())

	args := structs.DCSpecificRequest{
		Datacenter: s.config.ACLDatacenter,
		QueryOptions: structs.QueryOptions{
			Token:         s.config.ACLReplicationToken,
			MinQueryIndex: lastRemoteIndex,
			AllowStale:    true,
		},
	}
	var remote structs.IndexedACLs
	if err := s.RPC("ACL.List", &args, &remote); err != nil {
		return nil, err
	}
	return &remote, nil
}
// updateLocalACLs is given a list of changes to apply in order to bring the
// local ACLs in line with the remote ACLs from the ACL datacenter.
func (s *Server) updateLocalACLs(changes structs.ACLRequests) error {
	defer metrics.MeasureSince([]string{"consul", "leader", "updateLocalACLs"}, time.Now())

	minTimePerOp := time.Second / time.Duration(s.config.ACLReplicationApplyLimit)
	for _, change := range changes {
		// Note that we are using the single ACL interface here and not
		// performing all this inside a single transaction. This is OK
		// for two reasons. First, there's nothing other than this
		// replication routine that alters the local ACLs, so there's
		// nothing to contend with locally. Second, if an apply fails
		// in the middle (most likely due to losing leadership), the
		// next replication pass will clean up and check everything
		// again.
		var reply string
		start := time.Now()
		if err := aclApplyInternal(s, change, &reply); err != nil {
			return err
		}

		// Do a smooth rate limit to wait out the min time allowed for
		// each op. If this op took longer than the min, then the sleep
		// time will be negative and we will just move on.
		elapsed := time.Now().Sub(start)
		time.Sleep(minTimePerOp - elapsed)
	}
	return nil
}
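
// exampleApplyPad is a minimal sketch (not used by the server) of the
// rate-limit math above. The apply limit value passed in is hypothetical; for
// example, a limit of 100 ops/sec yields a minimum of 10ms per op, and an op
// that already ran longer than that gets a negative pad, which time.Sleep
// treats as "don't sleep at all".
func exampleApplyPad(applyLimit int, opDuration time.Duration) time.Duration {
	minTimePerOp := time.Second / time.Duration(applyLimit)
	return minTimePerOp - opDuration
}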
// replicateACLs runs one pass of the algorithm for replicating ACLs from a
// remote ACL datacenter to local state. If there's any error, this will return
// 0 for the lastRemoteIndex, which will cause us to immediately do a full sync
// next time.
func (s *Server) replicateACLs(lastRemoteIndex uint64) (uint64, error) {
	remote, err := s.fetchRemoteACLs(lastRemoteIndex)
	if err != nil {
		return 0, fmt.Errorf("failed to retrieve remote ACLs: %v", err)
	}

	// This will be pretty common because we will be blocking for a long
	// time and may have lost leadership, so let's control the message here
	// instead of returning deeper error messages from Raft.
	if !s.IsLeader() {
		return 0, fmt.Errorf("no longer cluster leader")
	}

	// Measure everything after the remote query, which can block for long
	// periods of time. This metric is a good measure of how expensive the
	// replication process is.
	defer metrics.MeasureSince([]string{"consul", "leader", "replicateACLs"}, time.Now())

	local, err := s.fetchLocalACLs()
	if err != nil {
		return 0, fmt.Errorf("failed to retrieve local ACLs: %v", err)
	}

	// If the remote index ever goes backwards, it's a good indication that
	// the remote side was rebuilt and we should do a full sync since we
	// can't make any assumptions about what's going on.
	if remote.QueryMeta.Index < lastRemoteIndex {
		s.logger.Printf("[WARN] consul: ACL replication remote index moved backwards (%d to %d), forcing a full ACL sync", lastRemoteIndex, remote.QueryMeta.Index)
		lastRemoteIndex = 0
	}

	// Calculate the changes required to bring the state into sync and then
	// apply them.
	changes := reconcileACLs(local, remote.ACLs, lastRemoteIndex)
	if err := s.updateLocalACLs(changes); err != nil {
		return 0, fmt.Errorf("failed to sync ACL changes: %v", err)
	}

	// Return the index we got back from the remote side, since we've synced
	// up with the remote state as of that index.
	return remote.QueryMeta.Index, nil
}
// IsACLReplicationEnabled returns true if ACL replication is enabled.
func (s *Server) IsACLReplicationEnabled() bool {
	authDC := s.config.ACLDatacenter
	return len(authDC) > 0 && (authDC != s.config.Datacenter) &&
		len(s.config.ACLReplicationToken) > 0
}
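
// exampleReplicationEnabled is an illustrative sketch (hypothetical datacenter
// names and token): replication is considered enabled only on servers outside
// the ACL datacenter that also have a replication token configured.
func exampleReplicationEnabled() bool {
	s := &Server{
		config: &Config{
			Datacenter:          "dc2", // local datacenter
			ACLDatacenter:       "dc1", // authoritative ACL datacenter
			ACLReplicationToken: "hypothetical-replication-token",
		},
	}

	// Returns true here; a server whose Datacenter is "dc1" itself, or one
	// with no replication token, would report false.
	return s.IsACLReplicationEnabled()
}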
// updateACLReplicationStatus safely updates the ACL replication status.
func (s *Server) updateACLReplicationStatus(status structs.ACLReplicationStatus) {
	// Fixup the times to shed some useless precision to ease formatting,
	// and always report UTC.
	status.LastError = status.LastError.Round(time.Second).UTC()
	status.LastSuccess = status.LastSuccess.Round(time.Second).UTC()

	// Set the shared state.
	s.aclReplicationStatusLock.Lock()
	s.aclReplicationStatus = status
	s.aclReplicationStatusLock.Unlock()
}
// runACLReplication is a long-running goroutine that will attempt to replicate
// ACLs while the server is the leader, until the shutdown channel closes.
func (s *Server) runACLReplication() {
	var status structs.ACLReplicationStatus
	status.Enabled = true
	status.SourceDatacenter = s.config.ACLDatacenter
	s.updateACLReplicationStatus(status)

	// Show that it's not running on the way out.
	defer func() {
		status.Running = false
		s.updateACLReplicationStatus(status)
	}()

	// Give each server's replicator a random initial phase for good
	// measure.
	select {
	case <-s.shutdownCh:
		return

	case <-time.After(lib.RandomStagger(s.config.ACLReplicationInterval)):
	}

	// We are fairly conservative with the lastRemoteIndex so that after a
	// leadership change or an error we re-sync everything (we also don't
	// want to block the first time after one of these events so we can
	// show a successful sync in the status endpoint).
	var lastRemoteIndex uint64
	replicate := func() {
		if !status.Running {
			lastRemoteIndex = 0 // Re-sync everything.
			status.Running = true
			s.updateACLReplicationStatus(status)
			s.logger.Printf("[INFO] consul: ACL replication started")
		}

		index, err := s.replicateACLs(lastRemoteIndex)
		if err != nil {
			lastRemoteIndex = 0 // Re-sync everything.
			status.LastError = time.Now()
			s.updateACLReplicationStatus(status)
			s.logger.Printf("[WARN] consul: ACL replication error (will retry if still leader): %v", err)
		} else {
			lastRemoteIndex = index
			status.ReplicatedIndex = index
			status.LastSuccess = time.Now()
			s.updateACLReplicationStatus(status)
			s.logger.Printf("[DEBUG] consul: ACL replication completed through remote index %d", index)
		}
	}
	pause := func() {
		if status.Running {
			lastRemoteIndex = 0 // Re-sync everything.
			status.Running = false
			s.updateACLReplicationStatus(status)
			s.logger.Printf("[INFO] consul: ACL replication stopped (no longer leader)")
		}
	}

	// This will slowly poll to see if replication should be active. Once it
	// is and we've caught up, the replicate() call will begin to block and
	// only wake up when the query timer expires or there are new ACLs to
	// replicate. We've chosen this design so that the ACLReplicationInterval
	// is the lower bound for how quickly we will replicate, no matter how
	// much ACL churn is happening on the remote side.
	//
	// The blocking query inside replicate() respects the shutdown channel,
	// so we won't get stuck in here as things are torn down.
	for {
		select {
		case <-s.shutdownCh:
			return

		case <-time.After(s.config.ACLReplicationInterval):
			if s.IsLeader() {
				replicate()
			} else {
				pause()
			}
		}
	}
}