diff --git a/command/agent/agent.go b/command/agent/agent.go index 42fda9963..79b94d55d 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -339,6 +339,9 @@ func (a *Agent) consulConfig() *consul.Config { if a.config.ACLDownPolicy != "" { base.ACLDownPolicy = a.config.ACLDownPolicy } + if a.config.ACLReplicationToken != "" { + base.ACLReplicationToken = a.config.ACLReplicationToken + } if a.config.SessionTTLMinRaw != "" { base.SessionTTLMin = a.config.SessionTTLMin } diff --git a/command/agent/config.go b/command/agent/config.go index 11d58c2c9..a83a2ddfd 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -452,6 +452,12 @@ type Config struct { // this acts like deny. ACLDownPolicy string `mapstructure:"acl_down_policy"` + // ACLReplicationToken is used to fetch ACLs from the ACLDatacenter in + // order to replicate them locally. Setting this to a non-empty value + // also enables replication. Replication is only available in datacenters + // other than the ACLDatacenter. + ACLReplicationToken string `mapstructure:"acl_replication_token"` + // Watches are used to monitor various endpoints and to invoke a // handler to act appropriately. These are managed entirely in the // agent layer using the standard APIs. @@ -1319,6 +1325,9 @@ func MergeConfig(a, b *Config) *Config { if b.ACLDefaultPolicy != "" { result.ACLDefaultPolicy = b.ACLDefaultPolicy } + if b.ACLReplicationToken != "" { + result.ACLReplicationToken = b.ACLReplicationToken + } if len(b.Watches) != 0 { result.Watches = append(result.Watches, b.Watches...) } diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 06e5d8542..00e5a0dfb 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -622,7 +622,8 @@ func TestDecodeConfig(t *testing.T) { // ACLs input = `{"acl_token": "1234", "acl_datacenter": "dc2", "acl_ttl": "60s", "acl_down_policy": "deny", - "acl_default_policy": "deny", "acl_master_token": "2345"}` + "acl_default_policy": "deny", "acl_master_token": "2345", + "acl_replication_token": "8675309"}` config, err = DecodeConfig(bytes.NewReader([]byte(input))) if err != nil { t.Fatalf("err: %s", err) @@ -646,6 +647,9 @@ func TestDecodeConfig(t *testing.T) { if config.ACLDefaultPolicy != "deny" { t.Fatalf("bad: %#v", config) } + if config.ACLReplicationToken != "8675309" { + t.Fatalf("bad: %#v", config) + } // Watches input = `{"watches": [{"type":"keyprefix", "prefix":"foo/", "handler":"foobar"}]}` @@ -1432,6 +1436,7 @@ func TestMergeConfig(t *testing.T) { ACLTTLRaw: "15s", ACLDownPolicy: "deny", ACLDefaultPolicy: "deny", + ACLReplicationToken: "8765309", Watches: []map[string]interface{}{ map[string]interface{}{ "type": "keyprefix", diff --git a/consul/acl.go b/consul/acl.go index fa3f558a6..d2a646173 100644 --- a/consul/acl.go +++ b/consul/acl.go @@ -74,7 +74,7 @@ func (s *Server) aclFault(id string) (string, string, error) { return s.config.ACLDefaultPolicy, acl.Rules, nil } -// resolveToken is used to resolve an ACL is any is appropriate +// resolveToken is used to resolve an ACL if any is appropriate func (s *Server) resolveToken(id string) (acl.ACL, error) { // Check if there is no ACL datacenter (ACL's disabled) authDC := s.config.ACLDatacenter diff --git a/consul/acl_endpoint.go b/consul/acl_endpoint.go index 49f927161..d14c90d28 100644 --- a/consul/acl_endpoint.go +++ b/consul/acl_endpoint.go @@ -15,26 +15,13 @@ type ACL struct { srv *Server } -// Apply is used to apply a modifying request to the data store. This should -// only be used for operations that modify the data -func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { - if done, err := a.srv.forward("ACL.Apply", args, args, reply); done { - return err - } - defer metrics.MeasureSince([]string{"consul", "acl", "apply"}, time.Now()) - - // Verify we are allowed to serve this request - if a.srv.config.ACLDatacenter != a.srv.config.Datacenter { - return fmt.Errorf(aclDisabled) - } - - // Verify token is permitted to modify ACLs - if acl, err := a.srv.resolveToken(args.Token); err != nil { - return err - } else if acl == nil || !acl.ACLModify() { - return permissionDeniedErr - } - +// aclApplyInternal is used to apply an ACL request after it has been vetted that +// this is a valid operation. It is used when users are updating ACLs, in which +// case we check their token to make sure they have management privileges. It is +// also used for ACL replication. We want to run the replicated ACLs through the +// same checks on the change itself. If an operation needs to generate an ID, +// routine will fill in an ID with the args as part of the request. +func aclApplyInternal(srv *Server, args *structs.ACLRequest, reply *string) error { switch args.Op { case structs.ACLSet: // Verify the ACL type @@ -61,16 +48,16 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { // deterministic. Once the entry is in the log, the state update MUST // be deterministic or the followers will not converge. if args.ACL.ID == "" { - state := a.srv.fsm.State() + state := srv.fsm.State() for { if args.ACL.ID, err = uuid.GenerateUUID(); err != nil { - a.srv.logger.Printf("[ERR] consul.acl: UUID generation failed: %v", err) + srv.logger.Printf("[ERR] consul.acl: UUID generation failed: %v", err) return err } _, acl, err := state.ACLGet(args.ACL.ID) if err != nil { - a.srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err) + srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err) return err } if acl == nil { @@ -91,24 +78,53 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { } // Apply the update - resp, err := a.srv.raftApply(structs.ACLRequestType, args) + resp, err := srv.raftApply(structs.ACLRequestType, args) if err != nil { - a.srv.logger.Printf("[ERR] consul.acl: Apply failed: %v", err) + srv.logger.Printf("[ERR] consul.acl: Apply failed: %v", err) return err } if respErr, ok := resp.(error); ok { return respErr } + // Check if the return type is a string + if respString, ok := resp.(string); ok { + *reply = respString + } + + return nil +} + +// Apply is used to apply a modifying request to the data store. This should +// only be used for operations that modify the data +func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { + if done, err := a.srv.forward("ACL.Apply", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"consul", "acl", "apply"}, time.Now()) + + // Verify we are allowed to serve this request + if a.srv.config.ACLDatacenter != a.srv.config.Datacenter { + return fmt.Errorf(aclDisabled) + } + + // Verify token is permitted to modify ACLs + if acl, err := a.srv.resolveToken(args.Token); err != nil { + return err + } else if acl == nil || !acl.ACLModify() { + return permissionDeniedErr + } + + // Do the apply now that this update is vetted. + if err := aclApplyInternal(a.srv, args, reply); err != nil { + return err + } + // Clear the cache if applicable if args.ACL.ID != "" { a.srv.aclAuthCache.ClearACL(args.ACL.ID) } - // Check if the return type is a string - if respString, ok := resp.(string); ok { - *reply = respString - } return nil } diff --git a/consul/acl_replication.go b/consul/acl_replication.go new file mode 100644 index 000000000..af4aebffb --- /dev/null +++ b/consul/acl_replication.go @@ -0,0 +1,309 @@ +package consul + +import ( + "fmt" + "sort" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/lib" +) + +// aclIDSorter is used to make sure a given list of ACLs is sorted by token ID. +// This should always be true, but since this is crucial for correctness and we +// are accepting input from another server, we sort to make sure. +type aclIDSorter struct { + acls structs.ACLs +} + +// See sort.Interface. +func (a *aclIDSorter) Len() int { + return len(a.acls) +} + +// See sort.Interface. +func (a *aclIDSorter) Swap(i, j int) { + a.acls[i], a.acls[j] = a.acls[j], a.acls[i] +} + +// See sort.Interface. +func (a *aclIDSorter) Less(i, j int) bool { + return a.acls[i].ID < a.acls[j].ID +} + +// aclIterator simplifies the algorithm below by providing a basic iterator that +// moves through a list of ACLs and returns nil when it's exhausted. +type aclIterator struct { + acls structs.ACLs + + // index is the current position of the iterator. + index int +} + +// Front returns the item at index position, or nil if the list is exhausted. +func (a *aclIterator) Front() *structs.ACL { + if a.index < len(a.acls) { + return a.acls[a.index] + } + + return nil +} + +// Next advances the iterator to the next index. +func (a *aclIterator) Next() { + a.index++ +} + +// reconcileACLs takes the local and remote ACL state, and produces a list of +// changes required in order to bring the local ACLs into sync with the remote +// ACLs. You can supply lastRemoteIndex as a hint that replication has succeeded +// up to that remote index and it will make this process more efficient by only +// comparing ACL entries modified after that index. Setting this to 0 will force +// a full compare of all existing ACLs. +func reconcileACLs(local, remote structs.ACLs, lastRemoteIndex uint64) structs.ACLRequests { + // Since sorting the lists is crucial for correctness, we are depending + // on data coming from other servers potentially running a different, + // version of Consul, and sorted-ness is kind of a subtle property of + // the state store indexing, it's prudent to make sure things are sorted + // before we begin. + sort.Sort(&aclIDSorter{local}) + sort.Sort(&aclIDSorter{remote}) + + // Run through both lists and reconcile them. + var changes structs.ACLRequests + localIter, remoteIter := &aclIterator{local, 0}, &aclIterator{remote, 0} + for localIter.Front() != nil || remoteIter.Front() != nil { + // If the local list is exhausted, then process this as a remote + // add. We know from the loop condition that there's something + // in the remote list. + if localIter.Front() == nil { + changes = append(changes, &structs.ACLRequest{ + Op: structs.ACLSet, + ACL: *(remoteIter.Front()), + }) + remoteIter.Next() + continue + } + + // If the remote list is exhausted, then process this as a local + // delete. We know from the loop condition that there's something + // in the local list. + if remoteIter.Front() == nil { + changes = append(changes, &structs.ACLRequest{ + Op: structs.ACLDelete, + ACL: *(localIter.Front()), + }) + localIter.Next() + continue + } + + // At this point we know there's something at the front of each + // list we need to resolve. + + // If the remote list has something local doesn't, we add it. + if localIter.Front().ID > remoteIter.Front().ID { + changes = append(changes, &structs.ACLRequest{ + Op: structs.ACLSet, + ACL: *(remoteIter.Front()), + }) + remoteIter.Next() + continue + } + + // If local has something remote doesn't, we delete it. + if localIter.Front().ID < remoteIter.Front().ID { + changes = append(changes, &structs.ACLRequest{ + Op: structs.ACLDelete, + ACL: *(localIter.Front()), + }) + localIter.Next() + continue + } + + // Local and remote have an ACL with the same ID, so we might + // need to compare them. + l, r := localIter.Front(), remoteIter.Front() + if r.RaftIndex.ModifyIndex > lastRemoteIndex && !r.IsSame(l) { + changes = append(changes, &structs.ACLRequest{ + Op: structs.ACLSet, + ACL: *r, + }) + } + localIter.Next() + remoteIter.Next() + } + return changes +} + +// FetchLocalACLs returns the ACLs in the local state store. +func (s *Server) fetchLocalACLs() (structs.ACLs, error) { + _, local, err := s.fsm.State().ACLList() + if err != nil { + return nil, err + } + return local, nil +} + +// FetchRemoteACLs is used to get the remote set of ACLs from the ACL +// datacenter. The lastIndex parameter is a hint about which remote index we +// have replicated to, so this is expected to block until something changes. +func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, error) { + args := structs.DCSpecificRequest{ + Datacenter: s.config.ACLDatacenter, + QueryOptions: structs.QueryOptions{ + Token: s.config.ACLReplicationToken, + MinQueryIndex: lastRemoteIndex, + AllowStale: true, + }, + } + var remote structs.IndexedACLs + if err := s.RPC("ACL.List", &args, &remote); err != nil { + return nil, err + } + return &remote, nil +} + +// UpdateLocalACLs is given a list of changes to apply in order to bring the +// local ACLs in-line with the remote ACLs from the ACL datacenter. +func (s *Server) updateLocalACLs(changes structs.ACLRequests) error { + var ops int + start := time.Now() + for _, change := range changes { + // Do a very simple rate limit algorithm where we check every N + // operations and wait out to the second before we continue. If + // it's going slower than that, the sleep time will be negative + // so we will just keep going without delay. + if ops > s.config.ACLReplicationApplyLimit { + elapsed := time.Now().Sub(start) + time.Sleep(1*time.Second - elapsed) + ops, start = 0, time.Now() + } + + // Note that we are using the single ACL interface here and not + // performing all this inside a single transaction. This is OK + // for two reasons. First, there's nothing else other than this + // replication routine that alters the local ACLs, so there's + // nothing to contend with locally. Second, if an apply fails + // in the middle (most likely due to losing leadership), the + // next replication pass will clean up and check everything + // again. + var reply string + if err := aclApplyInternal(s, change, &reply); err != nil { + return err + } + ops++ + } + return nil +} + +// replicateACLs is a runs one pass of the algorithm for replicating ACLs from +// a remote ACL datacenter to local state. If there's any error, this will return +// 0 for the lastRemoteIndex, which will cause us to immediately do a full sync +// next time. +func (s *Server) replicateACLs(lastRemoteIndex uint64) (uint64, error) { + remote, err := s.fetchRemoteACLs(lastRemoteIndex) + if err != nil { + return 0, fmt.Errorf("failed to retrieve remote ACLs: %v", err) + } + + // This will be pretty common because we will be blocking for a long time + // and may have lost leadership, so lets control the message here instead + // of returning deeper error messages from from Raft. + if !s.IsLeader() { + return 0, fmt.Errorf("no longer cluster leader") + } + + // Measure everything after the remote query, which can block for long + // periods of time. This metric is a good measure of how expensive the + // replication process is. + defer metrics.MeasureSince([]string{"consul", "leader", "replicateACLs"}, time.Now()) + + local, err := s.fetchLocalACLs() + if err != nil { + return 0, fmt.Errorf("failed to retrieve local ACLs: %v", err) + } + + // If the remote index ever goes backwards, it's a good indication that + // the remote side was rebuilt and we should do a full sync since we + // can't make any assumptions about what's going on. + if remote.QueryMeta.Index < lastRemoteIndex { + s.logger.Printf("[WARN] consul: ACL replication remote index moved backwards (%d to %d), forcing a full ACL sync", lastRemoteIndex, remote.QueryMeta.Index) + lastRemoteIndex = 0 + } + + changes := reconcileACLs(local, remote.ACLs, lastRemoteIndex) + if err := s.updateLocalACLs(changes); err != nil { + return 0, fmt.Errorf("failed to sync ACL changes: %v", err) + } + + // Return the index we got back from the remote side, since we've synced + // up with the remote state as of that index. + return remote.QueryMeta.Index, nil +} + +// IsACLReplicationEnabled returns true if ACL replication is enabled. +func (s *Server) IsACLReplicationEnabled() bool { + authDC := s.config.ACLDatacenter + return len(authDC) > 0 && (authDC != s.config.Datacenter) && + len(s.config.ACLReplicationToken) > 0 +} + +// runACLReplication is a long-running goroutine that will attempt to replicate +// ACLs while the server is the leader, until the shutdown channel closes. +func (s *Server) runACLReplication() { + defer s.shutdownWait.Done() + + // Give each server's replicator a random initial phase for good + // measure. + select { + case <-time.After(lib.RandomStagger(s.config.ACLReplicationInterval)): + case <-s.shutdownCh: + } + + var lastRemoteIndex uint64 + var wasActive bool + replicate := func() { + if !wasActive { + s.logger.Printf("[INFO] consul: ACL replication started") + wasActive = true + } + + var err error + lastRemoteIndex, err = s.replicateACLs(lastRemoteIndex) + if err != nil { + s.logger.Printf("[WARN] consul: ACL replication error (will retry if still leader): %v", err) + } else { + s.logger.Printf("[DEBUG] consul: ACL replication completed through index %d", lastRemoteIndex) + } + } + pause := func() { + if wasActive { + s.logger.Printf("[INFO] consul: ACL replication stopped (no longer leader)") + wasActive = false + } + } + + // This will slowly poll to see if replication should be active. Once it + // is and we've caught up, the replicate() call will begin to block and + // only wake up when the query timer expires or there are new ACLs to + // replicate. We've chosen this design so that the ACLReplicationInterval + // is the lower bound for how quickly we will replicate, no matter how + // much ACL churn is happening on the remote side. + // + // The blocking query inside replicate() respects the shutdown channel, + // so we won't get stuck in here as things are torn down. + for { + select { + case <-s.shutdownCh: + return + + case <-time.After(s.config.ACLReplicationInterval): + if s.IsLeader() { + replicate() + } else { + pause() + } + } + } +} diff --git a/consul/config.go b/consul/config.go index 8e252b635..0e9cee438 100644 --- a/consul/config.go +++ b/consul/config.go @@ -173,6 +173,24 @@ type Config struct { // "allow" can be used to allow all requests. This is not recommended. ACLDownPolicy string + // ACLReplicationToken is used to fetch ACLs from the ACLDatacenter in + // order to replicate them locally. Setting this to a non-empty value + // also enables replication. Replication is only available in datacenters + // other than the ACLDatacenter. + ACLReplicationToken string + + // ACLReplicationInterval is the interval at which replication passes + // will occur. Queries to the ACLDatacenter may block, so replication + // can happen less often than this, but the interval forms the upper + // limit to how fast we will go if there was constant ACL churn on the + // remote end. + ACLReplicationInterval time.Duration + + // ACLReplicationApplyLimit is the max number of replication-related + // apply operations that we allow during a one second period. This is + // used to limit the amount of Raft bandwidth used for replication. + ACLReplicationApplyLimit int + // TombstoneTTL is used to control how long KV tombstones are retained. // This provides a window of time where the X-Consul-Index is monotonic. // Outside this window, the index may not be monotonic. This is a result @@ -271,21 +289,23 @@ func DefaultConfig() *Config { } conf := &Config{ - Datacenter: DefaultDC, - NodeName: hostname, - RPCAddr: DefaultRPCAddr, - RaftConfig: raft.DefaultConfig(), - SerfLANConfig: serf.DefaultConfig(), - SerfWANConfig: serf.DefaultConfig(), - ReconcileInterval: 60 * time.Second, - ProtocolVersion: ProtocolVersion2Compatible, - ACLTTL: 30 * time.Second, - ACLDefaultPolicy: "allow", - ACLDownPolicy: "extend-cache", - TombstoneTTL: 15 * time.Minute, - TombstoneTTLGranularity: 30 * time.Second, - SessionTTLMin: 10 * time.Second, - DisableCoordinates: false, + Datacenter: DefaultDC, + NodeName: hostname, + RPCAddr: DefaultRPCAddr, + RaftConfig: raft.DefaultConfig(), + SerfLANConfig: serf.DefaultConfig(), + SerfWANConfig: serf.DefaultConfig(), + ReconcileInterval: 60 * time.Second, + ProtocolVersion: ProtocolVersion2Compatible, + ACLTTL: 30 * time.Second, + ACLDefaultPolicy: "allow", + ACLDownPolicy: "extend-cache", + ACLReplicationInterval: 30 * time.Second, + ACLReplicationApplyLimit: 100, // ops / sec + TombstoneTTL: 15 * time.Minute, + TombstoneTTLGranularity: 30 * time.Second, + SessionTTLMin: 10 * time.Second, + DisableCoordinates: false, // These are tuned to provide a total throughput of 128 updates // per second. If you update these, you should update the client- diff --git a/consul/server.go b/consul/server.go index 4138e2cc1..02490c6ae 100644 --- a/consul/server.go +++ b/consul/server.go @@ -149,9 +149,17 @@ type Server struct { // for the KV tombstones tombstoneGC *state.TombstoneGC + // shutdown and the associated members here are used in orchestrating + // a clean shutdown. The shutdownCh is never written to, only closed to + // indicate a shutdown has been initiated. The shutdownWait group will + // be waited on after closing the shutdownCh, but before any other + // shutdown activities take place in the server. Anything added to the + // shutdownWait group will block all the rest of shutdown, so use this + // sparingly and carefully. shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex + shutdownWait sync.WaitGroup } // Holds the RPC endpoints @@ -171,49 +179,47 @@ type endpoints struct { // NewServer is used to construct a new Consul server from the // configuration, potentially returning an error func NewServer(config *Config) (*Server, error) { - // Check the protocol version + // Check the protocol version. if err := config.CheckVersion(); err != nil { return nil, err } - // Check for a data directory! + // Check for a data directory. if config.DataDir == "" && !config.DevMode { return nil, fmt.Errorf("Config must provide a DataDir") } - // Sanity check the ACLs + // Sanity check the ACLs. if err := config.CheckACL(); err != nil { return nil, err } - // Ensure we have a log output + // Ensure we have a log output and create a logger. if config.LogOutput == nil { config.LogOutput = os.Stderr } + logger := log.New(config.LogOutput, "", log.LstdFlags) - // Create the tls wrapper for outgoing connections + // Create the TLS wrapper for outgoing connections. tlsConf := config.tlsConfig() tlsWrap, err := tlsConf.OutgoingTLSWrapper() if err != nil { return nil, err } - // Get the incoming tls config + // Get the incoming TLS config. incomingTLS, err := tlsConf.IncomingTLSConfig() if err != nil { return nil, err } - // Create a logger - logger := log.New(config.LogOutput, "", log.LstdFlags) - - // Create the tombstone GC + // Create the tombstone GC. gc, err := state.NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity) if err != nil { return nil, err } - // Create server + // Create server. s := &Server{ config: config, connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap), @@ -229,32 +235,32 @@ func NewServer(config *Config) (*Server, error) { shutdownCh: make(chan struct{}), } - // Initialize the authoritative ACL cache + // Initialize the authoritative ACL cache. s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclFault) if err != nil { s.Shutdown() return nil, fmt.Errorf("Failed to create ACL cache: %v", err) } - // Set up the non-authoritative ACL cache + // Set up the non-authoritative ACL cache. if s.aclCache, err = newAclCache(config, logger, s.RPC); err != nil { s.Shutdown() return nil, err } - // Initialize the RPC layer + // Initialize the RPC layer. if err := s.setupRPC(tlsWrap); err != nil { s.Shutdown() return nil, fmt.Errorf("Failed to start RPC layer: %v", err) } - // Initialize the Raft server + // Initialize the Raft server. if err := s.setupRaft(); err != nil { s.Shutdown() return nil, fmt.Errorf("Failed to start Raft: %v", err) } - // Initialize the lan Serf + // Initialize the LAN Serf. s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false) if err != nil { @@ -263,7 +269,7 @@ func NewServer(config *Config) (*Server, error) { } go s.lanEventHandler() - // Initialize the wan Serf + // Initialize the WAN Serf. s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true) if err != nil { @@ -272,11 +278,18 @@ func NewServer(config *Config) (*Server, error) { } go s.wanEventHandler() - // Start listening for RPC requests + // Start ACL replication. + if s.IsACLReplicationEnabled() { + s.shutdownWait.Add(1) + go s.runACLReplication() + } + + // Start listening for RPC requests. go s.listen() - // Start the metrics handlers + // Start the metrics handlers. go s.sessionStats() + return s, nil } @@ -496,6 +509,7 @@ func (s *Server) Shutdown() error { s.shutdown = true close(s.shutdownCh) + s.shutdownWait.Wait() if s.serfLAN != nil { s.serfLAN.Shutdown() diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 4f99399b3..fdd36ea2b 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -662,7 +662,7 @@ type IndexedSessions struct { QueryMeta } -// ACL is used to represent a token and it's rules +// ACL is used to represent a token and its rules type ACL struct { ID string Name string @@ -681,6 +681,21 @@ const ( ACLDelete = "delete" ) +// IsSame checks if one ACL is the same as another, without looking +// at the Raft information (that's why we didn't call it IsEqual). This is +// useful for seeing if an update would be idempotent for all the functional +// parts of the structure. +func (a *ACL) IsSame(other *ACL) bool { + if a.ID != other.ID || + a.Name != other.Name || + a.Type != other.Type || + a.Rules != other.Rules { + return false + } + + return true +} + // ACLRequest is used to create, update or delete an ACL type ACLRequest struct { Datacenter string @@ -693,6 +708,9 @@ func (r *ACLRequest) RequestDatacenter() string { return r.Datacenter } +// ACLRequests is a list of ACL change requests. +type ACLRequests []*ACLRequest + // ACLSpecificRequest is used to request an ACL by ID type ACLSpecificRequest struct { Datacenter string diff --git a/consul/structs/structs_test.go b/consul/structs/structs_test.go index 1aa84b3af..8caa66ffe 100644 --- a/consul/structs/structs_test.go +++ b/consul/structs/structs_test.go @@ -58,6 +58,53 @@ func TestStructs_Implements(t *testing.T) { ) } +func TestStructs_ACL_IsSame(t *testing.T) { + acl := &ACL{ + ID: "guid", + Name: "An ACL for testing", + Type: "client", + Rules: "service \"\" { policy = \"read\" }", + } + if !acl.IsSame(acl) { + t.Fatalf("should be equal to itself") + } + + other := &ACL{ + ID: "guid", + Name: "An ACL for testing", + Type: "client", + Rules: "service \"\" { policy = \"read\" }", + RaftIndex: RaftIndex{ + CreateIndex: 1, + ModifyIndex: 2, + }, + } + if !acl.IsSame(other) || !other.IsSame(acl) { + t.Fatalf("should not care about Raft fields") + } + + check := func(twiddle, restore func()) { + if !acl.IsSame(other) || !other.IsSame(acl) { + t.Fatalf("should be the same") + } + + twiddle() + if acl.IsSame(other) || other.IsSame(acl) { + t.Fatalf("should not be the same") + } + + restore() + if !acl.IsSame(other) || !other.IsSame(acl) { + t.Fatalf("should be the same") + } + } + + check(func() { other.ID = "nope" }, func() { other.ID = "guid" }) + check(func() { other.Name = "nope" }, func() { other.Name = "An ACL for testing" }) + check(func() { other.Type = "management" }, func() { other.Type = "client" }) + check(func() { other.Rules = "" }, func() { other.Rules = "service \"\" { policy = \"read\" }" }) +} + // testServiceNode gives a fully filled out ServiceNode instance. func testServiceNode() *ServiceNode { return &ServiceNode{