From 4a931ae12e89d2368e3a514370944ae859fef234 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 4 Aug 2016 21:32:36 -0700 Subject: [PATCH] Adds an ACL replication status endpoint. --- command/agent/acl_endpoint.go | 17 ++++++++++ command/agent/acl_endpoint_test.go | 15 +++++++++ command/agent/http.go | 2 ++ consul/acl_endpoint.go | 22 +++++++++++++ consul/acl_endpoint_test.go | 26 +++++++++++++++ consul/acl_replication.go | 53 +++++++++++++++++++++++++----- consul/acl_replication_test.go | 13 +++++++- consul/server.go | 6 ++++ consul/structs/structs.go | 11 +++++++ 9 files changed, 156 insertions(+), 9 deletions(-) diff --git a/command/agent/acl_endpoint.go b/command/agent/acl_endpoint.go index 3bc054611..b60502ce9 100644 --- a/command/agent/acl_endpoint.go +++ b/command/agent/acl_endpoint.go @@ -205,3 +205,20 @@ func (s *HTTPServer) ACLList(resp http.ResponseWriter, req *http.Request) (inter } return out.ACLs, nil } + +func (s *HTTPServer) ACLReplicationStatus(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Note that we do not forward to the ACL DC here. This is a query for + // any DC that's doing replication. + args := structs.DCSpecificRequest{} + s.parseSource(req, &args.Source) + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + + // Make the request. + var out structs.ACLReplicationStatus + if err := s.agent.RPC("ACL.ReplicationStatus", &args, &out); err != nil { + return nil, err + } + return out, nil +} diff --git a/command/agent/acl_endpoint_test.go b/command/agent/acl_endpoint_test.go index 361d661ac..836d5cd19 100644 --- a/command/agent/acl_endpoint_test.go +++ b/command/agent/acl_endpoint_test.go @@ -218,3 +218,18 @@ func TestACLList(t *testing.T) { } }) } + +func TestACLReplicationStatus(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + req, err := http.NewRequest("GET", "/v1/acl/replication", nil) + resp := httptest.NewRecorder() + obj, err := srv.ACLReplicationStatus(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + _, ok := obj.(structs.ACLReplicationStatus) + if !ok { + t.Fatalf("should work") + } + }) +} diff --git a/command/agent/http.go b/command/agent/http.go index 92247ef2c..a0fa287b9 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -257,6 +257,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/acl/info/", s.wrap(s.ACLGet)) s.mux.HandleFunc("/v1/acl/clone/", s.wrap(s.ACLClone)) s.mux.HandleFunc("/v1/acl/list", s.wrap(s.ACLList)) + s.mux.HandleFunc("/v1/acl/replication", s.wrap(s.ACLReplicationStatus)) } else { s.mux.HandleFunc("/v1/acl/create", s.wrap(aclDisabled)) s.mux.HandleFunc("/v1/acl/update", s.wrap(aclDisabled)) @@ -264,6 +265,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/acl/info/", s.wrap(aclDisabled)) s.mux.HandleFunc("/v1/acl/clone/", s.wrap(aclDisabled)) s.mux.HandleFunc("/v1/acl/list", s.wrap(aclDisabled)) + s.mux.HandleFunc("/v1/acl/replication", s.wrap(aclDisabled)) } s.mux.HandleFunc("/v1/query", s.wrap(s.PreparedQueryGeneral)) diff --git a/consul/acl_endpoint.go b/consul/acl_endpoint.go index 19776ac5e..52cb5d228 100644 --- a/consul/acl_endpoint.go +++ b/consul/acl_endpoint.go @@ -235,3 +235,25 @@ func (a *ACL) List(args *structs.DCSpecificRequest, return nil }) } + +// ReplicationStatus is used to retrieve the current ACL replication status. +func (a *ACL) ReplicationStatus(args *structs.DCSpecificRequest, + reply *structs.ACLReplicationStatus) error { + // This must be sent to the leader, so we fix the args since we are + // re-using a structure where we don't support all the options. + args.RequireConsistent = true + args.AllowStale = false + if done, err := a.srv.forward("ACL.ReplicationStatus", args, args, reply); done { + return err + } + + // There's no ACL token required here since this doesn't leak any + // sensitive information, and we don't want people to have to use + // management tokens if they are querying this via a health check. + + // Poll the latest status. + a.srv.aclReplicationStatusLock.RLock() + *reply = a.srv.aclReplicationStatus + a.srv.aclReplicationStatusLock.RUnlock() + return nil +} diff --git a/consul/acl_endpoint_test.go b/consul/acl_endpoint_test.go index 9871dc1ad..e031730bf 100644 --- a/consul/acl_endpoint_test.go +++ b/consul/acl_endpoint_test.go @@ -466,3 +466,29 @@ func TestACLEndpoint_List_Denied(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestACLEndpoint_ReplicationStatus(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc2" + c.ACLReplicationToken = "secret" + c.ACLReplicationInterval = 0 + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + getR := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var status structs.ACLReplicationStatus + err := msgpackrpc.CallWithCodec(codec, "ACL.ReplicationStatus", &getR, &status) + if err != nil { + t.Fatalf("err: %v", err) + } + if !status.Enabled || !status.Running || status.SourceDatacenter != "dc2" { + t.Fatalf("bad: %#v", status) + } +} diff --git a/consul/acl_replication.go b/consul/acl_replication.go index 075a6b46d..c7a5f3dd1 100644 --- a/consul/acl_replication.go +++ b/consul/acl_replication.go @@ -256,9 +256,33 @@ func (s *Server) IsACLReplicationEnabled() bool { len(s.config.ACLReplicationToken) > 0 } +// updateACLReplicationStatus safely updates the ACL replication status. +func (s *Server) updateACLReplicationStatus(status structs.ACLReplicationStatus) { + // Fixup the times to shed some useless precision to ease formattting, + // and always report UTC. + status.LastError = status.LastError.Round(time.Second).UTC() + status.LastSuccess = status.LastSuccess.Round(time.Second).UTC() + + // Set the shared state. + s.aclReplicationStatusLock.Lock() + s.aclReplicationStatus = status + s.aclReplicationStatusLock.Unlock() +} + // runACLReplication is a long-running goroutine that will attempt to replicate // ACLs while the server is the leader, until the shutdown channel closes. func (s *Server) runACLReplication() { + var status structs.ACLReplicationStatus + status.Enabled = true + status.SourceDatacenter = s.config.ACLDatacenter + s.updateACLReplicationStatus(status) + + // Show that it's not running on the way out. + defer func() { + status.Running = false + s.updateACLReplicationStatus(status) + }() + // Give each server's replicator a random initial phase for good // measure. select { @@ -266,26 +290,39 @@ func (s *Server) runACLReplication() { case <-s.shutdownCh: } + // We are fairly conservative with the lastRemoteIndex so that after a + // leadership change or an error we re-sync everything (we also don't + // want to block the first time after one of these events so we can + // show a successful sync in the status endpoint). var lastRemoteIndex uint64 - var wasActive bool replicate := func() { - if !wasActive { + if !status.Running { + lastRemoteIndex = 0 // Re-sync everything. + status.Running = true + s.updateACLReplicationStatus(status) s.logger.Printf("[INFO] consul: ACL replication started") - wasActive = true } - var err error - lastRemoteIndex, err = s.replicateACLs(lastRemoteIndex) + index, err := s.replicateACLs(lastRemoteIndex) if err != nil { + lastRemoteIndex = 0 // Re-sync everything. + status.LastError = time.Now() + s.updateACLReplicationStatus(status) s.logger.Printf("[WARN] consul: ACL replication error (will retry if still leader): %v", err) } else { - s.logger.Printf("[DEBUG] consul: ACL replication completed through index %d", lastRemoteIndex) + lastRemoteIndex = index + status.ReplicatedIndex = index + status.LastSuccess = time.Now() + s.updateACLReplicationStatus(status) + s.logger.Printf("[DEBUG] consul: ACL replication completed through index %d", index) } } pause := func() { - if wasActive { + if status.Running { + lastRemoteIndex = 0 // Re-sync everything. + status.Running = false + s.updateACLReplicationStatus(status) s.logger.Printf("[INFO] consul: ACL replication stopped (no longer leader)") - wasActive = false } } diff --git a/consul/acl_replication_test.go b/consul/acl_replication_test.go index 3f9b854f2..0f60fc69a 100644 --- a/consul/acl_replication_test.go +++ b/consul/acl_replication_test.go @@ -364,7 +364,7 @@ func TestACLReplication(t *testing.T) { } checkSame := func() (bool, error) { - _, remote, err := s1.fsm.State().ACLList() + index, remote, err := s1.fsm.State().ACLList() if err != nil { return false, err } @@ -380,6 +380,17 @@ func TestACLReplication(t *testing.T) { return false, nil } } + + var status structs.ACLReplicationStatus + s2.aclReplicationStatusLock.RLock() + status = s2.aclReplicationStatus + s2.aclReplicationStatusLock.RUnlock() + if !status.Enabled || !status.Running || + status.ReplicatedIndex != index || + status.SourceDatacenter != "dc1" { + return false, nil + } + return true, nil } diff --git a/consul/server.go b/consul/server.go index 5419f0b6e..e31b92845 100644 --- a/consul/server.go +++ b/consul/server.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/consul/consul/state" + "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/raft" "github.com/hashicorp/raft-boltdb" @@ -149,6 +150,11 @@ type Server struct { // for the KV tombstones tombstoneGC *state.TombstoneGC + // aclReplicationStatus (and its associated lock) provide information + // about the health of the ACL replication goroutine. + aclReplicationStatus structs.ACLReplicationStatus + aclReplicationStatusLock sync.RWMutex + // shutdown and the associated members here are used in orchestrating // a clean shutdown. The shutdownCh is never written to, only closed to // indicate a shutdown has been initiated. diff --git a/consul/structs/structs.go b/consul/structs/structs.go index fdd36ea2b..c9ee486fc 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -748,6 +748,17 @@ type ACLPolicy struct { QueryMeta } +// ACLReplicationStatus provides information about the health of the ACL +// replication system. +type ACLReplicationStatus struct { + Enabled bool + Running bool + SourceDatacenter string + ReplicatedIndex uint64 + LastSuccess time.Time + LastError time.Time +} + // Coordinate stores a node name with its associated network coordinate. type Coordinate struct { Node string