Adds basic ACL replication plumbing.

James Phillips 2016-08-02 22:04:11 -07:00
parent cf9eeec223
commit 9cece515c0
GPG Key ID: 77183E682AC5FC11
10 changed files with 507 additions and 66 deletions

View File

@ -339,6 +339,9 @@ func (a *Agent) consulConfig() *consul.Config {
if a.config.ACLDownPolicy != "" {
base.ACLDownPolicy = a.config.ACLDownPolicy
}
if a.config.ACLReplicationToken != "" {
base.ACLReplicationToken = a.config.ACLReplicationToken
}
if a.config.SessionTTLMinRaw != "" {
base.SessionTTLMin = a.config.SessionTTLMin
}

View File

@ -452,6 +452,12 @@ type Config struct {
// this acts like deny.
ACLDownPolicy string `mapstructure:"acl_down_policy"`
// ACLReplicationToken is used to fetch ACLs from the ACLDatacenter in
// order to replicate them locally. Setting this to a non-empty value
// also enables replication. Replication is only available in datacenters
// other than the ACLDatacenter.
ACLReplicationToken string `mapstructure:"acl_replication_token"`
// Watches are used to monitor various endpoints and to invoke a
// handler to act appropriately. These are managed entirely in the
// agent layer using the standard APIs.
@ -1319,6 +1325,9 @@ func MergeConfig(a, b *Config) *Config {
if b.ACLDefaultPolicy != "" {
result.ACLDefaultPolicy = b.ACLDefaultPolicy
}
if b.ACLReplicationToken != "" {
result.ACLReplicationToken = b.ACLReplicationToken
}
if len(b.Watches) != 0 {
result.Watches = append(result.Watches, b.Watches...)
}

View File

@ -622,7 +622,8 @@ func TestDecodeConfig(t *testing.T) {
// ACLs
input = `{"acl_token": "1234", "acl_datacenter": "dc2",
"acl_ttl": "60s", "acl_down_policy": "deny",
"acl_default_policy": "deny", "acl_master_token": "2345",
"acl_replication_token": "8675309"}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
@ -646,6 +647,9 @@ func TestDecodeConfig(t *testing.T) {
if config.ACLDefaultPolicy != "deny" {
t.Fatalf("bad: %#v", config)
}
if config.ACLReplicationToken != "8675309" {
t.Fatalf("bad: %#v", config)
}
// Watches
input = `{"watches": [{"type":"keyprefix", "prefix":"foo/", "handler":"foobar"}]}`
@ -1432,6 +1436,7 @@ func TestMergeConfig(t *testing.T) {
ACLTTLRaw: "15s",
ACLDownPolicy: "deny",
ACLDefaultPolicy: "deny",
ACLReplicationToken: "8765309",
Watches: []map[string]interface{}{
map[string]interface{}{
"type": "keyprefix",

View File

@ -74,7 +74,7 @@ func (s *Server) aclFault(id string) (string, string, error) {
return s.config.ACLDefaultPolicy, acl.Rules, nil
}
// resolveToken is used to resolve an ACL if any is appropriate
func (s *Server) resolveToken(id string) (acl.ACL, error) {
// Check if there is no ACL datacenter (ACL's disabled)
authDC := s.config.ACLDatacenter

View File

@ -15,26 +15,13 @@ type ACL struct {
srv *Server
}
// aclApplyInternal is used to apply an ACL request after it has been vetted that
// this is a valid operation. It is used when users are updating ACLs, in which
// case we check their token to make sure they have management privileges. It is
// also used for ACL replication. We want to run the replicated ACLs through the
// same checks on the change itself. If an operation needs to generate an ID,
// this routine will fill in an ID with the args as part of the request.
func aclApplyInternal(srv *Server, args *structs.ACLRequest, reply *string) error {
switch args.Op {
case structs.ACLSet:
// Verify the ACL type
@ -61,16 +48,16 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
// deterministic. Once the entry is in the log, the state update MUST
// be deterministic or the followers will not converge.
if args.ACL.ID == "" {
state := srv.fsm.State()
for {
if args.ACL.ID, err = uuid.GenerateUUID(); err != nil {
srv.logger.Printf("[ERR] consul.acl: UUID generation failed: %v", err)
return err
}
_, acl, err := state.ACLGet(args.ACL.ID)
if err != nil {
srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err)
return err
}
if acl == nil {
@ -91,24 +78,53 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
}
// Apply the update
resp, err := srv.raftApply(structs.ACLRequestType, args)
if err != nil {
srv.logger.Printf("[ERR] consul.acl: Apply failed: %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Check if the return type is a string
if respString, ok := resp.(string); ok {
*reply = respString
}
return nil
}
// Apply is used to apply a modifying request to the data store. This should
// only be used for operations that modify the data
func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
if done, err := a.srv.forward("ACL.Apply", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "acl", "apply"}, time.Now())
// Verify we are allowed to serve this request
if a.srv.config.ACLDatacenter != a.srv.config.Datacenter {
return fmt.Errorf(aclDisabled)
}
// Verify token is permitted to modify ACLs
if acl, err := a.srv.resolveToken(args.Token); err != nil {
return err
} else if acl == nil || !acl.ACLModify() {
return permissionDeniedErr
}
// Do the apply now that this update is vetted.
if err := aclApplyInternal(a.srv, args, reply); err != nil {
return err
}
// Clear the cache if applicable
if args.ACL.ID != "" {
a.srv.aclAuthCache.ClearACL(args.ACL.ID)
}
return nil
}

consul/acl_replication.go (new file, 309 additions)
View File

@ -0,0 +1,309 @@
package consul
import (
"fmt"
"sort"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
)
// aclIDSorter is used to make sure a given list of ACLs is sorted by token ID.
// This should always be true, but since this is crucial for correctness and we
// are accepting input from another server, we sort to make sure.
type aclIDSorter struct {
acls structs.ACLs
}
// See sort.Interface.
func (a *aclIDSorter) Len() int {
return len(a.acls)
}
// See sort.Interface.
func (a *aclIDSorter) Swap(i, j int) {
a.acls[i], a.acls[j] = a.acls[j], a.acls[i]
}
// See sort.Interface.
func (a *aclIDSorter) Less(i, j int) bool {
return a.acls[i].ID < a.acls[j].ID
}
// aclIterator simplifies the algorithm below by providing a basic iterator that
// moves through a list of ACLs and returns nil when it's exhausted.
type aclIterator struct {
acls structs.ACLs
// index is the current position of the iterator.
index int
}
// Front returns the item at index position, or nil if the list is exhausted.
func (a *aclIterator) Front() *structs.ACL {
if a.index < len(a.acls) {
return a.acls[a.index]
}
return nil
}
// Next advances the iterator to the next index.
func (a *aclIterator) Next() {
a.index++
}
// reconcileACLs takes the local and remote ACL state, and produces a list of
// changes required in order to bring the local ACLs into sync with the remote
// ACLs. You can supply lastRemoteIndex as a hint that replication has succeeded
// up to that remote index and it will make this process more efficient by only
// comparing ACL entries modified after that index. Setting this to 0 will force
// a full compare of all existing ACLs.
func reconcileACLs(local, remote structs.ACLs, lastRemoteIndex uint64) structs.ACLRequests {
// Since sorting the lists is crucial for correctness, we are depending
// on data coming from other servers potentially running a different
// version of Consul, and sorted-ness is kind of a subtle property of
// the state store indexing, it's prudent to make sure things are sorted
// before we begin.
sort.Sort(&aclIDSorter{local})
sort.Sort(&aclIDSorter{remote})
// Run through both lists and reconcile them.
var changes structs.ACLRequests
localIter, remoteIter := &aclIterator{local, 0}, &aclIterator{remote, 0}
for localIter.Front() != nil || remoteIter.Front() != nil {
// If the local list is exhausted, then process this as a remote
// add. We know from the loop condition that there's something
// in the remote list.
if localIter.Front() == nil {
changes = append(changes, &structs.ACLRequest{
Op: structs.ACLSet,
ACL: *(remoteIter.Front()),
})
remoteIter.Next()
continue
}
// If the remote list is exhausted, then process this as a local
// delete. We know from the loop condition that there's something
// in the local list.
if remoteIter.Front() == nil {
changes = append(changes, &structs.ACLRequest{
Op: structs.ACLDelete,
ACL: *(localIter.Front()),
})
localIter.Next()
continue
}
// At this point we know there's something at the front of each
// list we need to resolve.
// If the remote list has something local doesn't, we add it.
if localIter.Front().ID > remoteIter.Front().ID {
changes = append(changes, &structs.ACLRequest{
Op: structs.ACLSet,
ACL: *(remoteIter.Front()),
})
remoteIter.Next()
continue
}
// If local has something remote doesn't, we delete it.
if localIter.Front().ID < remoteIter.Front().ID {
changes = append(changes, &structs.ACLRequest{
Op: structs.ACLDelete,
ACL: *(localIter.Front()),
})
localIter.Next()
continue
}
// Local and remote have an ACL with the same ID, so we might
// need to compare them.
l, r := localIter.Front(), remoteIter.Front()
if r.RaftIndex.ModifyIndex > lastRemoteIndex && !r.IsSame(l) {
changes = append(changes, &structs.ACLRequest{
Op: structs.ACLSet,
ACL: *r,
})
}
localIter.Next()
remoteIter.Next()
}
return changes
}
// fetchLocalACLs returns the ACLs in the local state store.
func (s *Server) fetchLocalACLs() (structs.ACLs, error) {
_, local, err := s.fsm.State().ACLList()
if err != nil {
return nil, err
}
return local, nil
}
// fetchRemoteACLs is used to get the remote set of ACLs from the ACL
// datacenter. The lastRemoteIndex parameter is a hint about which remote index we
// have replicated to, so this is expected to block until something changes.
func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, error) {
args := structs.DCSpecificRequest{
Datacenter: s.config.ACLDatacenter,
QueryOptions: structs.QueryOptions{
Token: s.config.ACLReplicationToken,
MinQueryIndex: lastRemoteIndex,
AllowStale: true,
},
}
var remote structs.IndexedACLs
if err := s.RPC("ACL.List", &args, &remote); err != nil {
return nil, err
}
return &remote, nil
}
// updateLocalACLs is given a list of changes to apply in order to bring the
// local ACLs in-line with the remote ACLs from the ACL datacenter.
func (s *Server) updateLocalACLs(changes structs.ACLRequests) error {
var ops int
start := time.Now()
for _, change := range changes {
// Do a very simple rate limit algorithm where we check every N
// operations and wait out the rest of the second before we continue. If
// it's going slower than that, the sleep time will be negative
// so we will just keep going without delay.
if ops > s.config.ACLReplicationApplyLimit {
elapsed := time.Now().Sub(start)
time.Sleep(1*time.Second - elapsed)
ops, start = 0, time.Now()
}
// Note that we are using the single ACL interface here and not
// performing all this inside a single transaction. This is OK
// for two reasons. First, there's nothing else other than this
// replication routine that alters the local ACLs, so there's
// nothing to contend with locally. Second, if an apply fails
// in the middle (most likely due to losing leadership), the
// next replication pass will clean up and check everything
// again.
var reply string
if err := aclApplyInternal(s, change, &reply); err != nil {
return err
}
ops++
}
return nil
}
// replicateACLs runs one pass of the algorithm for replicating ACLs from
// a remote ACL datacenter to local state. If there's any error, this will return
// 0 for the lastRemoteIndex, which will cause us to immediately do a full sync
// next time.
func (s *Server) replicateACLs(lastRemoteIndex uint64) (uint64, error) {
remote, err := s.fetchRemoteACLs(lastRemoteIndex)
if err != nil {
return 0, fmt.Errorf("failed to retrieve remote ACLs: %v", err)
}
// This will be pretty common because we will be blocking for a long time
// and may have lost leadership, so let's control the message here instead
// of returning deeper error messages from Raft.
if !s.IsLeader() {
return 0, fmt.Errorf("no longer cluster leader")
}
// Measure everything after the remote query, which can block for long
// periods of time. This metric is a good measure of how expensive the
// replication process is.
defer metrics.MeasureSince([]string{"consul", "leader", "replicateACLs"}, time.Now())
local, err := s.fetchLocalACLs()
if err != nil {
return 0, fmt.Errorf("failed to retrieve local ACLs: %v", err)
}
// If the remote index ever goes backwards, it's a good indication that
// the remote side was rebuilt and we should do a full sync since we
// can't make any assumptions about what's going on.
if remote.QueryMeta.Index < lastRemoteIndex {
s.logger.Printf("[WARN] consul: ACL replication remote index moved backwards (%d to %d), forcing a full ACL sync", lastRemoteIndex, remote.QueryMeta.Index)
lastRemoteIndex = 0
}
changes := reconcileACLs(local, remote.ACLs, lastRemoteIndex)
if err := s.updateLocalACLs(changes); err != nil {
return 0, fmt.Errorf("failed to sync ACL changes: %v", err)
}
// Return the index we got back from the remote side, since we've synced
// up with the remote state as of that index.
return remote.QueryMeta.Index, nil
}
// IsACLReplicationEnabled returns true if ACL replication is enabled.
func (s *Server) IsACLReplicationEnabled() bool {
authDC := s.config.ACLDatacenter
return len(authDC) > 0 && (authDC != s.config.Datacenter) &&
len(s.config.ACLReplicationToken) > 0
}
// runACLReplication is a long-running goroutine that will attempt to replicate
// ACLs while the server is the leader, until the shutdown channel closes.
func (s *Server) runACLReplication() {
defer s.shutdownWait.Done()
// Give each server's replicator a random initial phase for good
// measure.
select {
case <-time.After(lib.RandomStagger(s.config.ACLReplicationInterval)):
case <-s.shutdownCh:
}
var lastRemoteIndex uint64
var wasActive bool
replicate := func() {
if !wasActive {
s.logger.Printf("[INFO] consul: ACL replication started")
wasActive = true
}
var err error
lastRemoteIndex, err = s.replicateACLs(lastRemoteIndex)
if err != nil {
s.logger.Printf("[WARN] consul: ACL replication error (will retry if still leader): %v", err)
} else {
s.logger.Printf("[DEBUG] consul: ACL replication completed through index %d", lastRemoteIndex)
}
}
pause := func() {
if wasActive {
s.logger.Printf("[INFO] consul: ACL replication stopped (no longer leader)")
wasActive = false
}
}
// This will slowly poll to see if replication should be active. Once it
// is and we've caught up, the replicate() call will begin to block and
// only wake up when the query timer expires or there are new ACLs to
// replicate. We've chosen this design so that the ACLReplicationInterval
// is the lower bound for how quickly we will replicate, no matter how
// much ACL churn is happening on the remote side.
//
// The blocking query inside replicate() respects the shutdown channel,
// so we won't get stuck in here as things are torn down.
for {
select {
case <-s.shutdownCh:
return
case <-time.After(s.config.ACLReplicationInterval):
if s.IsLeader() {
replicate()
} else {
pause()
}
}
}
}
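To make the reconcile pass concrete, here is a small test-style sketch that is not part of this commit: it runs reconcileACLs over a local set containing one stale entry and one local-only entry. The ACL IDs, names, and Raft indexes are invented for illustration.

package consul

import (
	"testing"

	"github.com/hashicorp/consul/consul/structs"
)

func TestReconcileACLs_Sketch(t *testing.T) {
	local := structs.ACLs{
		&structs.ACL{ID: "a", Name: "first"},
		&structs.ACL{ID: "b", Name: "stale"},
		&structs.ACL{ID: "c", Name: "local-only"},
	}
	remote := structs.ACLs{
		&structs.ACL{ID: "a", Name: "first"},
		&structs.ACL{ID: "b", Name: "fresh",
			RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2}},
	}

	// A lastRemoteIndex of 0 forces a full compare: the stale "b" is re-set
	// from the remote copy, the local-only "c" is deleted, and "a" is left
	// alone because its remote copy has not been modified past the index.
	changes := reconcileACLs(local, remote, 0)
	if len(changes) != 2 {
		t.Fatalf("bad: %#v", changes)
	}
	if changes[0].Op != structs.ACLSet || changes[0].ACL.ID != "b" {
		t.Fatalf("bad: %#v", changes[0])
	}
	if changes[1].Op != structs.ACLDelete || changes[1].ACL.ID != "c" {
		t.Fatalf("bad: %#v", changes[1])
	}
}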

View File

@ -173,6 +173,24 @@ type Config struct {
// "allow" can be used to allow all requests. This is not recommended. // "allow" can be used to allow all requests. This is not recommended.
ACLDownPolicy string ACLDownPolicy string
// ACLReplicationToken is used to fetch ACLs from the ACLDatacenter in
// order to replicate them locally. Setting this to a non-empty value
// also enables replication. Replication is only available in datacenters
// other than the ACLDatacenter.
ACLReplicationToken string
// ACLReplicationInterval is the interval at which replication passes
// will occur. Queries to the ACLDatacenter may block, so replication
// can happen less often than this, but the interval forms the upper
// limit to how fast we will go if there was constant ACL churn on the
// remote end.
ACLReplicationInterval time.Duration
// ACLReplicationApplyLimit is the max number of replication-related
// apply operations that we allow during a one second period. This is
// used to limit the amount of Raft bandwidth used for replication.
ACLReplicationApplyLimit int
// TombstoneTTL is used to control how long KV tombstones are retained.
// This provides a window of time where the X-Consul-Index is monotonic.
// Outside this window, the index may not be monotonic. This is a result
@ -282,6 +300,8 @@ func DefaultConfig() *Config {
ACLTTL: 30 * time.Second,
ACLDefaultPolicy: "allow",
ACLDownPolicy: "extend-cache",
ACLReplicationInterval: 30 * time.Second,
ACLReplicationApplyLimit: 100, // ops / sec
TombstoneTTL: 15 * time.Minute,
TombstoneTTLGranularity: 30 * time.Second,
SessionTTLMin: 10 * time.Second,

View File

@ -149,9 +149,17 @@ type Server struct {
// for the KV tombstones
tombstoneGC *state.TombstoneGC
// shutdown and the associated members here are used in orchestrating
// a clean shutdown. The shutdownCh is never written to, only closed to
// indicate a shutdown has been initiated. The shutdownWait group will
// be waited on after closing the shutdownCh, but before any other
// shutdown activities take place in the server. Anything added to the
// shutdownWait group will block all the rest of shutdown, so use this
// sparingly and carefully.
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
shutdownWait sync.WaitGroup
}
// Holds the RPC endpoints
@ -171,49 +179,47 @@ type endpoints struct {
// NewServer is used to construct a new Consul server from the
// configuration, potentially returning an error
func NewServer(config *Config) (*Server, error) {
// Check the protocol version.
if err := config.CheckVersion(); err != nil {
return nil, err
}
// Check for a data directory.
if config.DataDir == "" && !config.DevMode {
return nil, fmt.Errorf("Config must provide a DataDir")
}
// Sanity check the ACLs.
if err := config.CheckACL(); err != nil {
return nil, err
}
// Ensure we have a log output and create a logger.
if config.LogOutput == nil {
config.LogOutput = os.Stderr
}
logger := log.New(config.LogOutput, "", log.LstdFlags)
// Create the TLS wrapper for outgoing connections.
tlsConf := config.tlsConfig()
tlsWrap, err := tlsConf.OutgoingTLSWrapper()
if err != nil {
return nil, err
}
// Get the incoming TLS config.
incomingTLS, err := tlsConf.IncomingTLSConfig()
if err != nil {
return nil, err
}
// Create the tombstone GC.
gc, err := state.NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity)
if err != nil {
return nil, err
}
// Create server.
s := &Server{
config: config,
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
@ -229,32 +235,32 @@ func NewServer(config *Config) (*Server, error) {
shutdownCh: make(chan struct{}),
}
// Initialize the authoritative ACL cache.
s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclFault)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to create ACL cache: %v", err)
}
// Set up the non-authoritative ACL cache.
if s.aclCache, err = newAclCache(config, logger, s.RPC); err != nil {
s.Shutdown()
return nil, err
}
// Initialize the RPC layer.
if err := s.setupRPC(tlsWrap); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
}
// Initialize the Raft server.
if err := s.setupRaft(); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start Raft: %v", err)
}
// Initialize the LAN Serf.
s.serfLAN, err = s.setupSerf(config.SerfLANConfig,
s.eventChLAN, serfLANSnapshot, false)
if err != nil {
@ -263,7 +269,7 @@ func NewServer(config *Config) (*Server, error) {
}
go s.lanEventHandler()
// Initialize the WAN Serf.
s.serfWAN, err = s.setupSerf(config.SerfWANConfig,
s.eventChWAN, serfWANSnapshot, true)
if err != nil {
@ -272,11 +278,18 @@ func NewServer(config *Config) (*Server, error) {
}
go s.wanEventHandler()
// Start ACL replication.
if s.IsACLReplicationEnabled() {
s.shutdownWait.Add(1)
go s.runACLReplication()
}
// Start listening for RPC requests.
go s.listen()
// Start the metrics handlers.
go s.sessionStats()
return s, nil
}
@ -496,6 +509,7 @@ func (s *Server) Shutdown() error {
s.shutdown = true
close(s.shutdownCh)
s.shutdownWait.Wait()
if s.serfLAN != nil {
s.serfLAN.Shutdown()
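The shutdownWait ordering documented above is a small reusable pattern: signal by closing a channel that is never written to, then wait for the long-running goroutines to exit before any further teardown. A standalone sketch with invented names (not code from this commit):

package main

import (
	"fmt"
	"sync"
	"time"
)

type replicator struct {
	shutdownCh   chan struct{}
	shutdownWait sync.WaitGroup
}

func (r *replicator) run() {
	defer r.shutdownWait.Done()
	for {
		select {
		case <-r.shutdownCh: // closed on shutdown, never written to
			return
		case <-time.After(50 * time.Millisecond):
			// one periodic pass of work, e.g. a replication attempt
		}
	}
}

func main() {
	r := &replicator{shutdownCh: make(chan struct{})}
	r.shutdownWait.Add(1)
	go r.run()

	// Shutdown: close the channel to signal, then block until run() has
	// fully exited before doing any further teardown.
	close(r.shutdownCh)
	r.shutdownWait.Wait()
	fmt.Println("replicator stopped cleanly")
}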

View File

@ -662,7 +662,7 @@ type IndexedSessions struct {
QueryMeta
}
// ACL is used to represent a token and its rules
type ACL struct {
ID string
Name string
@ -681,6 +681,21 @@ const (
ACLDelete = "delete" ACLDelete = "delete"
) )
// IsSame checks if one ACL is the same as another, without looking
// at the Raft information (that's why we didn't call it IsEqual). This is
// useful for seeing if an update would be idempotent for all the functional
// parts of the structure.
func (a *ACL) IsSame(other *ACL) bool {
if a.ID != other.ID ||
a.Name != other.Name ||
a.Type != other.Type ||
a.Rules != other.Rules {
return false
}
return true
}
// ACLRequest is used to create, update or delete an ACL
type ACLRequest struct {
Datacenter string
@ -693,6 +708,9 @@ func (r *ACLRequest) RequestDatacenter() string {
return r.Datacenter
}
// ACLRequests is a list of ACL change requests.
type ACLRequests []*ACLRequest
// ACLSpecificRequest is used to request an ACL by ID
type ACLSpecificRequest struct {
Datacenter string

View File

@ -58,6 +58,53 @@ func TestStructs_Implements(t *testing.T) {
)
}
func TestStructs_ACL_IsSame(t *testing.T) {
acl := &ACL{
ID: "guid",
Name: "An ACL for testing",
Type: "client",
Rules: "service \"\" { policy = \"read\" }",
}
if !acl.IsSame(acl) {
t.Fatalf("should be equal to itself")
}
other := &ACL{
ID: "guid",
Name: "An ACL for testing",
Type: "client",
Rules: "service \"\" { policy = \"read\" }",
RaftIndex: RaftIndex{
CreateIndex: 1,
ModifyIndex: 2,
},
}
if !acl.IsSame(other) || !other.IsSame(acl) {
t.Fatalf("should not care about Raft fields")
}
check := func(twiddle, restore func()) {
if !acl.IsSame(other) || !other.IsSame(acl) {
t.Fatalf("should be the same")
}
twiddle()
if acl.IsSame(other) || other.IsSame(acl) {
t.Fatalf("should not be the same")
}
restore()
if !acl.IsSame(other) || !other.IsSame(acl) {
t.Fatalf("should be the same")
}
}
check(func() { other.ID = "nope" }, func() { other.ID = "guid" })
check(func() { other.Name = "nope" }, func() { other.Name = "An ACL for testing" })
check(func() { other.Type = "management" }, func() { other.Type = "client" })
check(func() { other.Rules = "" }, func() { other.Rules = "service \"\" { policy = \"read\" }" })
}
// testServiceNode gives a fully filled out ServiceNode instance.
func testServiceNode() *ServiceNode {
return &ServiceNode{