enable json formatting, use queryoptions
This commit is contained in:
parent
786989dbe3
commit
a45ae1cd58
10
api/agent.go
10
api/agent.go
|
@ -240,20 +240,13 @@ func (a *Agent) Health() (*AgentHealthResponse, error) {
|
|||
|
||||
// Monitor returns a channel which will receive streaming logs from the agent
|
||||
// Providing a non-nil stopCh can be used to close the connection and stop log streaming
|
||||
func (a *Agent) Monitor(loglevel string, nodeID string, stopCh <-chan struct{}, q *QueryOptions) (chan string, error) {
|
||||
func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (chan string, error) {
|
||||
r, err := a.client.newRequest("GET", "/v1/agent/monitor")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r.setQueryOptions(q)
|
||||
if loglevel != "" {
|
||||
r.params.Add("loglevel", loglevel)
|
||||
}
|
||||
if nodeID != "" {
|
||||
r.params.Add("nodeID", nodeID)
|
||||
}
|
||||
|
||||
_, resp, err := requireOK(a.client.doRequest(r))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -267,7 +260,6 @@ func (a *Agent) Monitor(loglevel string, nodeID string, stopCh <-chan struct{},
|
|||
for {
|
||||
select {
|
||||
case <-stopCh:
|
||||
close(logCh)
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
|
|
@ -270,7 +270,12 @@ func TestAgent_MonitorServer(t *testing.T) {
|
|||
agent := c.Agent()
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
logCh, err := agent.Monitor("debug", "", doneCh, nil)
|
||||
q := &QueryOptions{
|
||||
Params: map[string]string{
|
||||
"log-level": "debug",
|
||||
},
|
||||
}
|
||||
logCh, err := agent.Monitor(doneCh, q)
|
||||
defer close(doneCh)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -301,9 +306,16 @@ func TestAgent_MonitorWithNode(t *testing.T) {
|
|||
agent := c.Agent()
|
||||
id, _ := uuid.GenerateUUID()
|
||||
|
||||
q := &QueryOptions{
|
||||
Params: map[string]string{
|
||||
"log-level": "debug",
|
||||
"node-id": id,
|
||||
},
|
||||
}
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
// todo need to create or stub a nodeid?
|
||||
logCh, err := agent.Monitor("debug", id, doneCh, nil)
|
||||
logCh, err := agent.Monitor(doneCh, q)
|
||||
defer close(doneCh)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
|
|
@ -33,29 +33,29 @@ func (m *Monitor) monitor(conn io.ReadWriteCloser) {
|
|||
defer conn.Close()
|
||||
|
||||
// Decode arguments
|
||||
var req cstructs.MonitorRequest
|
||||
var args cstructs.MonitorRequest
|
||||
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
|
||||
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
|
||||
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
if err := decoder.Decode(&args); err != nil {
|
||||
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
|
||||
return
|
||||
}
|
||||
|
||||
// Check acl
|
||||
if aclObj, err := m.c.ResolveToken(req.QueryOptions.AuthToken); err != nil {
|
||||
if aclObj, err := m.c.ResolveToken(args.QueryOptions.AuthToken); err != nil {
|
||||
handleStreamResultError(err, helper.Int64ToPtr(403), encoder)
|
||||
return
|
||||
} else if aclObj != nil && !aclObj.AllowNsOp(req.Namespace, acl.NamespaceCapabilityReadFS) {
|
||||
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadFS) {
|
||||
handleStreamResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder)
|
||||
return
|
||||
}
|
||||
|
||||
var logLevel log.Level
|
||||
if req.LogLevel == "" {
|
||||
if args.LogLevel == "" {
|
||||
logLevel = log.LevelFromString("INFO")
|
||||
} else {
|
||||
logLevel = log.LevelFromString(req.LogLevel)
|
||||
logLevel = log.LevelFromString(args.LogLevel)
|
||||
}
|
||||
|
||||
if logLevel == log.NoLevel {
|
||||
|
@ -69,13 +69,13 @@ func (m *Monitor) monitor(conn io.ReadWriteCloser) {
|
|||
defer cancel()
|
||||
|
||||
monitor := monitor.New(512, m.c.logger, &log.LoggerOptions{
|
||||
JSONFormat: args.LogJSON,
|
||||
Level: logLevel,
|
||||
JSONFormat: false,
|
||||
})
|
||||
|
||||
go func() {
|
||||
if _, err := conn.Read(nil); err != nil {
|
||||
close(stopCh)
|
||||
// One end of the pipe explicitly closed, exit
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"net"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/docker/docker/pkg/ioutils"
|
||||
|
@ -165,7 +166,7 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
|
|||
}
|
||||
|
||||
// Get the provided loglevel.
|
||||
logLevel := req.URL.Query().Get("loglevel")
|
||||
logLevel := req.URL.Query().Get("log-level")
|
||||
if logLevel == "" {
|
||||
logLevel = "INFO"
|
||||
}
|
||||
|
@ -175,13 +176,19 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
|
|||
}
|
||||
|
||||
// Determine if we are targeting a server or client
|
||||
nodeID := req.URL.Query().Get("nodeID")
|
||||
nodeID := req.URL.Query().Get("node-id")
|
||||
|
||||
logJSONStr := req.URL.Query().Get("log-json")
|
||||
logJSON, err := strconv.ParseBool(logJSONStr)
|
||||
if err != nil {
|
||||
logJSON = false
|
||||
}
|
||||
|
||||
// Build the request and parse the ACL token
|
||||
args := cstructs.MonitorRequest{
|
||||
NodeID: nodeID,
|
||||
LogLevel: logLevel,
|
||||
LogJSON: false,
|
||||
LogJSON: logJSON,
|
||||
}
|
||||
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
|
||||
|
||||
|
@ -208,7 +215,7 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
|
|||
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
|
||||
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ctx, cancel := context.WithCancel(req.Context())
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
httpPipe.Close()
|
||||
|
|
|
@ -256,7 +256,7 @@ func TestHTTP_AgentMonitor(t *testing.T) {
|
|||
|
||||
httpTest(t, nil, func(s *TestAgent) {
|
||||
{
|
||||
req, err := http.NewRequest("GET", "/v1/agent/monitor?loglevel=unknown", nil)
|
||||
req, err := http.NewRequest("GET", "/v1/agent/monitor?log-level=unknown", nil)
|
||||
require.Nil(t, err)
|
||||
resp := newClosableRecorder()
|
||||
|
||||
|
@ -269,7 +269,7 @@ func TestHTTP_AgentMonitor(t *testing.T) {
|
|||
|
||||
// check for a specific log
|
||||
{
|
||||
req, err := http.NewRequest("GET", "/v1/agent/monitor?loglevel=warn", nil)
|
||||
req, err := http.NewRequest("GET", "/v1/agent/monitor?log-level=warn", nil)
|
||||
require.Nil(t, err)
|
||||
resp := newClosableRecorder()
|
||||
defer resp.Close()
|
||||
|
@ -305,7 +305,7 @@ func TestHTTP_AgentMonitor(t *testing.T) {
|
|||
|
||||
// stream logs for a given node
|
||||
{
|
||||
req, err := http.NewRequest("GET", "/v1/agent/monitor?loglevel=warn&nodeID="+s.client.NodeID(), nil)
|
||||
req, err := http.NewRequest("GET", "/v1/agent/monitor?log-level=warn&node-id="+s.client.NodeID(), nil)
|
||||
require.Nil(t, err)
|
||||
resp := newClosableRecorder()
|
||||
defer resp.Close()
|
||||
|
|
|
@ -41,6 +41,9 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
|
|||
case log := <-d.logCh:
|
||||
logCh <- log
|
||||
case <-stopCh:
|
||||
d.Lock()
|
||||
defer d.Unlock()
|
||||
|
||||
d.logger.DeregisterSink(d.sink)
|
||||
close(d.logCh)
|
||||
return
|
||||
|
|
|
@ -4,9 +4,11 @@ import (
|
|||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/hashicorp/nomad/api"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
|
@ -46,10 +48,13 @@ func (c *MonitorCommand) Run(args []string) int {
|
|||
|
||||
var logLevel string
|
||||
var nodeID string
|
||||
var logJSON bool
|
||||
|
||||
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
|
||||
flags.Usage = func() { c.Ui.Output(c.Help()) }
|
||||
flags.StringVar(&logLevel, "log-level", "", "")
|
||||
flags.StringVar(&nodeID, "node-id", "", "")
|
||||
flags.BoolVar(&logJSON, "log-json", false, "")
|
||||
|
||||
if err := flags.Parse(args); err != nil {
|
||||
return 1
|
||||
|
@ -62,8 +67,17 @@ func (c *MonitorCommand) Run(args []string) int {
|
|||
return 1
|
||||
}
|
||||
|
||||
params := map[string]string{
|
||||
"log-level": logLevel,
|
||||
"node-id": nodeID,
|
||||
"log-json": strconv.FormatBool(logJSON),
|
||||
}
|
||||
|
||||
query := &api.QueryOptions{
|
||||
Params: params,
|
||||
}
|
||||
eventDoneCh := make(chan struct{})
|
||||
logCh, err := client.Agent().Monitor(logLevel, nodeID, eventDoneCh, nil)
|
||||
logCh, err := client.Agent().Monitor(eventDoneCh, query)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error starting monitor: %s", err))
|
||||
c.Ui.Error(commandErrorText(c))
|
||||
|
|
|
@ -137,7 +137,7 @@ func (m *Monitor) monitor(conn io.ReadWriteCloser) {
|
|||
|
||||
monitor := monitor.New(512, m.srv.logger, &log.LoggerOptions{
|
||||
Level: logLevel,
|
||||
JSONFormat: false,
|
||||
JSONFormat: args.LogJSON,
|
||||
})
|
||||
|
||||
go func() {
|
||||
|
|
Loading…
Reference in New Issue