diff --git a/api/agent.go b/api/agent.go index 0422a2842..6b6166635 100644 --- a/api/agent.go +++ b/api/agent.go @@ -1,6 +1,7 @@ package api import ( + "bufio" "fmt" ) @@ -411,3 +412,37 @@ func (a *Agent) DisableNodeMaintenance() error { resp.Body.Close() return nil } + +// Monitor returns a channel which will receive streaming logs from the agent +// Providing a non-nil stopCh can be used to close the connection and stop the +// log stream +func (a *Agent) Monitor(loglevel string, stopCh chan struct{}) (chan string, error) { + r := a.c.newRequest("GET", "/v1/agent/monitor") + if loglevel != "" { + r.params.Add("loglevel", loglevel) + } + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return nil, err + } + + logCh := make(chan string, 64) + go func() { + defer resp.Body.Close() + + scanner := bufio.NewScanner(resp.Body) + for { + select { + case <-stopCh: + close(logCh) + return + default: + } + if scanner.Scan() { + logCh <- scanner.Text() + } + } + }() + + return logCh, nil +} diff --git a/api/agent_test.go b/api/agent_test.go index 215d240dc..970380565 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -3,6 +3,7 @@ package api import ( "strings" "testing" + "time" ) func TestAgent_Self(t *testing.T) { @@ -558,6 +559,29 @@ func TestAgent_ForceLeave(t *testing.T) { } } +func TestAgent_Monitor(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + agent := c.Agent() + + logCh, err := agent.Monitor("info", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for the first log message and validate it + select { + case log := <-logCh: + if !strings.Contains(log, "[INFO] raft: Initial configuration") { + t.Fatalf("bad: %q", log) + } + case <-time.After(10 * time.Second): + t.Fatalf("failed to get a log message") + } +} + func TestServiceMaintenance(t *testing.T) { t.Parallel() c, s := makeClient(t) diff --git a/command/agent/agent.go b/command/agent/agent.go index 54e2b0c9e..6bee6fba0 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/types" "github.com/hashicorp/go-uuid" "github.com/hashicorp/serf/coordinate" @@ -66,6 +67,9 @@ type Agent struct { // Output sink for logs logOutput io.Writer + // Used for streaming logs to + logWriter *logger.LogWriter + // We have one of a client or a server, depending // on our configuration server *consul.Server diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index db33256af..fd4586b20 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -2,12 +2,15 @@ package agent import ( "fmt" + "log" "net/http" "strconv" "strings" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/types" + "github.com/hashicorp/logutils" "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" ) @@ -393,6 +396,61 @@ func (s *HTTPServer) AgentNodeMaintenance(resp http.ResponseWriter, req *http.Re return nil, nil } +func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Only GET supported + if req.Method != "GET" { + resp.WriteHeader(405) + return nil, nil + } + + // Get the provided loglevel + logLevel := req.URL.Query().Get("loglevel") + if logLevel == "" { + logLevel = "INFO" + } + + // Upper case the log level + logLevel = strings.ToUpper(logLevel) + + // Create a level filter + filter := logger.LevelFilter() + filter.MinLevel = logutils.LogLevel(logLevel) + if !logger.ValidateLevelFilter(filter.MinLevel, filter) { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Unknown log level: %s", filter.MinLevel))) + return nil, nil + } + + flusher, ok := resp.(http.Flusher) + if !ok { + return nil, fmt.Errorf("Streaming not supported") + } + + // Set up a log handler + handler := &httpLogHandler{ + filter: filter, + logCh: make(chan string, 512), + logger: s.logger, + } + s.agent.logWriter.RegisterHandler(handler) + defer s.agent.logWriter.DeregisterHandler(handler) + + notify := resp.(http.CloseNotifier).CloseNotify() + + // Stream logs until the connection is closed + for { + select { + case <-notify: + return nil, nil + case log := <-handler.logCh: + resp.Write([]byte(log + "\n")) + flusher.Flush() + } + } + + return nil, nil +} + // syncChanges is a helper function which wraps a blocking call to sync // services and checks to the server. If the operation fails, we only // only warn because the write did succeed and anti-entropy will sync later. @@ -401,3 +459,27 @@ func (s *HTTPServer) syncChanges() { s.logger.Printf("[ERR] agent: failed to sync changes: %v", err) } } + +type httpLogHandler struct { + filter *logutils.LevelFilter + logCh chan string + logger *log.Logger +} + +func (h *httpLogHandler) HandleLog(log string) { + // Check the log level + if !h.filter.Check([]byte(log)) { + return + } + + // Do a non-blocking send + select { + case h.logCh <- log: + default: + // We can't log synchronously, since we are already being invoked + // from the logWriter, and a log will need to invoke Write() which + // already holds the lock. We must therefor do the log async, so + // as to not deadlock + go h.logger.Printf("[WARN] Dropping logs to monitor http endpoint") + } +} diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index a29b8ae9d..6bc875a30 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -1,8 +1,10 @@ package agent import ( + "bytes" "errors" "fmt" + "io" "net/http" "net/http/httptest" "os" @@ -12,6 +14,7 @@ import ( "time" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/types" "github.com/hashicorp/serf/serf" @@ -1019,3 +1022,69 @@ func TestHTTPAgentRegisterServiceCheck(t *testing.T) { t.Fatalf("bad: %#v", result["memcached_check2"]) } } + +func TestHTTPAgent_Monitor(t *testing.T) { + logWriter := logger.NewLogWriter(512) + expectedLogs := bytes.Buffer{} + logger := io.MultiWriter(os.Stdout, &expectedLogs, logWriter) + + dir, srv := makeHTTPServerWithConfigLog(t, nil, logger) + srv.agent.logWriter = logWriter + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + // Begin streaming logs from the monitor endpoint + req, _ := http.NewRequest("GET", "/v1/agent/monitor?loglevel=debug", nil) + resp := newClosableRecorder() + go func() { + if _, err := srv.AgentMonitor(resp, req); err != nil { + t.Fatalf("err: %s", err) + } + }() + + // Write the incoming logs to a channel for reading + logCh := make(chan string, 0) + go func() { + for { + line, err := resp.Body.ReadString('\n') + if err != nil && err != io.EOF { + t.Fatalf("err: %v", err) + } + if line != "" { + logCh <- line + } + } + }() + + // Verify that the first 5 logs we get match the expected stream + for i := 0; i < 5; i++ { + select { + case log := <-logCh: + expected, err := expectedLogs.ReadString('\n') + if err != nil { + t.Fatalf("err: %v", err) + } + if log != expected { + t.Fatalf("bad: %q %q", expected, log) + } + case <-time.After(10 * time.Second): + t.Fatalf("failed to get log within timeout") + } + } +} + +type closableRecorder struct { + *httptest.ResponseRecorder + closer chan bool +} + +func newClosableRecorder() *closableRecorder { + r := httptest.NewRecorder() + closer := make(chan bool) + return &closableRecorder{r, closer} +} + +func (r *closableRecorder) CloseNotify() <-chan bool { + return r.closer +} diff --git a/command/agent/command.go b/command/agent/command.go index 247ffd1b9..c4df74660 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -471,6 +471,7 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err)) return err } + agent.logWriter = logWriter c.agent = agent // Setup the RPC listener diff --git a/command/agent/http.go b/command/agent/http.go index 9d210fc2a..7ba8a3c0a 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -251,6 +251,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { } s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf)) s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance)) + s.handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor)) s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices)) s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks)) s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers)) diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 7cc80cb24..5953f4c32 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -28,6 +28,10 @@ func makeHTTPServer(t *testing.T) (string, *HTTPServer) { } func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPServer) { + return makeHTTPServerWithConfigLog(t, cb, nil) +} + +func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer) (string, *HTTPServer) { configTry := 0 RECONF: configTry += 1 @@ -36,7 +40,7 @@ RECONF: cb(conf) } - dir, agent := makeAgent(t, conf) + dir, agent := makeAgentLog(t, conf, l) servers, err := NewHTTPServers(agent, conf, agent.logOutput) if err != nil { if configTry < 3 { diff --git a/website/source/docs/agent/http/agent.html.markdown b/website/source/docs/agent/http/agent.html.markdown index 74f3752ca..6a9a9ce76 100644 --- a/website/source/docs/agent/http/agent.html.markdown +++ b/website/source/docs/agent/http/agent.html.markdown @@ -21,6 +21,7 @@ The following endpoints are supported: * [`/v1/agent/members`](#agent_members) : Returns the members as seen by the local serf agent * [`/v1/agent/self`](#agent_self) : Returns the local node configuration * [`/v1/agent/maintenance`](#agent_maintenance) : Manages node maintenance mode +* [`/v1/agent/monitor`](#agent_monitor) : Streams logs from the local agent * [`/v1/agent/join/
`](#agent_join) : Triggers the local agent to join a node * [`/v1/agent/force-leave/