Adds an ACL replication status endpoint.

This commit is contained in:
James Phillips 2016-08-04 21:32:36 -07:00
parent c94f1e1b83
commit 4a931ae12e
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
9 changed files with 156 additions and 9 deletions

View File

@ -205,3 +205,20 @@ func (s *HTTPServer) ACLList(resp http.ResponseWriter, req *http.Request) (inter
} }
return out.ACLs, nil return out.ACLs, nil
} }
func (s *HTTPServer) ACLReplicationStatus(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Note that we do not forward to the ACL DC here. This is a query for
// any DC that's doing replication.
args := structs.DCSpecificRequest{}
s.parseSource(req, &args.Source)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
// Make the request.
var out structs.ACLReplicationStatus
if err := s.agent.RPC("ACL.ReplicationStatus", &args, &out); err != nil {
return nil, err
}
return out, nil
}

View File

@ -218,3 +218,18 @@ func TestACLList(t *testing.T) {
} }
}) })
} }
func TestACLReplicationStatus(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
req, err := http.NewRequest("GET", "/v1/acl/replication", nil)
resp := httptest.NewRecorder()
obj, err := srv.ACLReplicationStatus(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
_, ok := obj.(structs.ACLReplicationStatus)
if !ok {
t.Fatalf("should work")
}
})
}

View File

@ -257,6 +257,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/acl/info/", s.wrap(s.ACLGet)) s.mux.HandleFunc("/v1/acl/info/", s.wrap(s.ACLGet))
s.mux.HandleFunc("/v1/acl/clone/", s.wrap(s.ACLClone)) s.mux.HandleFunc("/v1/acl/clone/", s.wrap(s.ACLClone))
s.mux.HandleFunc("/v1/acl/list", s.wrap(s.ACLList)) s.mux.HandleFunc("/v1/acl/list", s.wrap(s.ACLList))
s.mux.HandleFunc("/v1/acl/replication", s.wrap(s.ACLReplicationStatus))
} else { } else {
s.mux.HandleFunc("/v1/acl/create", s.wrap(aclDisabled)) s.mux.HandleFunc("/v1/acl/create", s.wrap(aclDisabled))
s.mux.HandleFunc("/v1/acl/update", s.wrap(aclDisabled)) s.mux.HandleFunc("/v1/acl/update", s.wrap(aclDisabled))
@ -264,6 +265,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/acl/info/", s.wrap(aclDisabled)) s.mux.HandleFunc("/v1/acl/info/", s.wrap(aclDisabled))
s.mux.HandleFunc("/v1/acl/clone/", s.wrap(aclDisabled)) s.mux.HandleFunc("/v1/acl/clone/", s.wrap(aclDisabled))
s.mux.HandleFunc("/v1/acl/list", s.wrap(aclDisabled)) s.mux.HandleFunc("/v1/acl/list", s.wrap(aclDisabled))
s.mux.HandleFunc("/v1/acl/replication", s.wrap(aclDisabled))
} }
s.mux.HandleFunc("/v1/query", s.wrap(s.PreparedQueryGeneral)) s.mux.HandleFunc("/v1/query", s.wrap(s.PreparedQueryGeneral))

View File

@ -235,3 +235,25 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
return nil return nil
}) })
} }
// ReplicationStatus is used to retrieve the current ACL replication status.
func (a *ACL) ReplicationStatus(args *structs.DCSpecificRequest,
reply *structs.ACLReplicationStatus) error {
// This must be sent to the leader, so we fix the args since we are
// re-using a structure where we don't support all the options.
args.RequireConsistent = true
args.AllowStale = false
if done, err := a.srv.forward("ACL.ReplicationStatus", args, args, reply); done {
return err
}
// There's no ACL token required here since this doesn't leak any
// sensitive information, and we don't want people to have to use
// management tokens if they are querying this via a health check.
// Poll the latest status.
a.srv.aclReplicationStatusLock.RLock()
*reply = a.srv.aclReplicationStatus
a.srv.aclReplicationStatusLock.RUnlock()
return nil
}

View File

@ -466,3 +466,29 @@ func TestACLEndpoint_List_Denied(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
} }
func TestACLEndpoint_ReplicationStatus(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc2"
c.ACLReplicationToken = "secret"
c.ACLReplicationInterval = 0
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
getR := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var status structs.ACLReplicationStatus
err := msgpackrpc.CallWithCodec(codec, "ACL.ReplicationStatus", &getR, &status)
if err != nil {
t.Fatalf("err: %v", err)
}
if !status.Enabled || !status.Running || status.SourceDatacenter != "dc2" {
t.Fatalf("bad: %#v", status)
}
}

View File

@ -256,9 +256,33 @@ func (s *Server) IsACLReplicationEnabled() bool {
len(s.config.ACLReplicationToken) > 0 len(s.config.ACLReplicationToken) > 0
} }
// updateACLReplicationStatus safely updates the ACL replication status.
func (s *Server) updateACLReplicationStatus(status structs.ACLReplicationStatus) {
// Fixup the times to shed some useless precision to ease formattting,
// and always report UTC.
status.LastError = status.LastError.Round(time.Second).UTC()
status.LastSuccess = status.LastSuccess.Round(time.Second).UTC()
// Set the shared state.
s.aclReplicationStatusLock.Lock()
s.aclReplicationStatus = status
s.aclReplicationStatusLock.Unlock()
}
// runACLReplication is a long-running goroutine that will attempt to replicate // runACLReplication is a long-running goroutine that will attempt to replicate
// ACLs while the server is the leader, until the shutdown channel closes. // ACLs while the server is the leader, until the shutdown channel closes.
func (s *Server) runACLReplication() { func (s *Server) runACLReplication() {
var status structs.ACLReplicationStatus
status.Enabled = true
status.SourceDatacenter = s.config.ACLDatacenter
s.updateACLReplicationStatus(status)
// Show that it's not running on the way out.
defer func() {
status.Running = false
s.updateACLReplicationStatus(status)
}()
// Give each server's replicator a random initial phase for good // Give each server's replicator a random initial phase for good
// measure. // measure.
select { select {
@ -266,26 +290,39 @@ func (s *Server) runACLReplication() {
case <-s.shutdownCh: case <-s.shutdownCh:
} }
// We are fairly conservative with the lastRemoteIndex so that after a
// leadership change or an error we re-sync everything (we also don't
// want to block the first time after one of these events so we can
// show a successful sync in the status endpoint).
var lastRemoteIndex uint64 var lastRemoteIndex uint64
var wasActive bool
replicate := func() { replicate := func() {
if !wasActive { if !status.Running {
lastRemoteIndex = 0 // Re-sync everything.
status.Running = true
s.updateACLReplicationStatus(status)
s.logger.Printf("[INFO] consul: ACL replication started") s.logger.Printf("[INFO] consul: ACL replication started")
wasActive = true
} }
var err error index, err := s.replicateACLs(lastRemoteIndex)
lastRemoteIndex, err = s.replicateACLs(lastRemoteIndex)
if err != nil { if err != nil {
lastRemoteIndex = 0 // Re-sync everything.
status.LastError = time.Now()
s.updateACLReplicationStatus(status)
s.logger.Printf("[WARN] consul: ACL replication error (will retry if still leader): %v", err) s.logger.Printf("[WARN] consul: ACL replication error (will retry if still leader): %v", err)
} else { } else {
s.logger.Printf("[DEBUG] consul: ACL replication completed through index %d", lastRemoteIndex) lastRemoteIndex = index
status.ReplicatedIndex = index
status.LastSuccess = time.Now()
s.updateACLReplicationStatus(status)
s.logger.Printf("[DEBUG] consul: ACL replication completed through index %d", index)
} }
} }
pause := func() { pause := func() {
if wasActive { if status.Running {
lastRemoteIndex = 0 // Re-sync everything.
status.Running = false
s.updateACLReplicationStatus(status)
s.logger.Printf("[INFO] consul: ACL replication stopped (no longer leader)") s.logger.Printf("[INFO] consul: ACL replication stopped (no longer leader)")
wasActive = false
} }
} }

View File

@ -364,7 +364,7 @@ func TestACLReplication(t *testing.T) {
} }
checkSame := func() (bool, error) { checkSame := func() (bool, error) {
_, remote, err := s1.fsm.State().ACLList() index, remote, err := s1.fsm.State().ACLList()
if err != nil { if err != nil {
return false, err return false, err
} }
@ -380,6 +380,17 @@ func TestACLReplication(t *testing.T) {
return false, nil return false, nil
} }
} }
var status structs.ACLReplicationStatus
s2.aclReplicationStatusLock.RLock()
status = s2.aclReplicationStatus
s2.aclReplicationStatusLock.RUnlock()
if !status.Enabled || !status.Running ||
status.ReplicatedIndex != index ||
status.SourceDatacenter != "dc1" {
return false, nil
}
return true, nil return true, nil
} }

View File

@ -18,6 +18,7 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb" "github.com/hashicorp/raft-boltdb"
@ -149,6 +150,11 @@ type Server struct {
// for the KV tombstones // for the KV tombstones
tombstoneGC *state.TombstoneGC tombstoneGC *state.TombstoneGC
// aclReplicationStatus (and its associated lock) provide information
// about the health of the ACL replication goroutine.
aclReplicationStatus structs.ACLReplicationStatus
aclReplicationStatusLock sync.RWMutex
// shutdown and the associated members here are used in orchestrating // shutdown and the associated members here are used in orchestrating
// a clean shutdown. The shutdownCh is never written to, only closed to // a clean shutdown. The shutdownCh is never written to, only closed to
// indicate a shutdown has been initiated. // indicate a shutdown has been initiated.

View File

@ -748,6 +748,17 @@ type ACLPolicy struct {
QueryMeta QueryMeta
} }
// ACLReplicationStatus provides information about the health of the ACL
// replication system.
type ACLReplicationStatus struct {
Enabled bool
Running bool
SourceDatacenter string
ReplicatedIndex uint64
LastSuccess time.Time
LastError time.Time
}
// Coordinate stores a node name with its associated network coordinate. // Coordinate stores a node name with its associated network coordinate.
type Coordinate struct { type Coordinate struct {
Node string Node string