open-nomad/client/agent_endpoint.go

168 lines
3.6 KiB
Go
Raw Normal View History

package client
import (
"bytes"
"context"
"errors"
"io"
"strings"
"time"
"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
)
// Agent is the client-side endpoint that serves agent-level streaming RPCs.
// It holds a reference to the owning Client, which it uses for token
// resolution, logging, and access to the streaming RPC registry.
type Agent struct {
	// c is the client this endpoint is attached to.
	c *Client
}
func NewAgentEndpoint(c *Client) *Agent {
m := &Agent{c: c}
2019-10-15 14:33:07 +00:00
m.c.streamingRpcs.Register("Agent.Monitor", m.monitor)
return m
}
func (m *Agent) monitor(conn io.ReadWriteCloser) {
defer metrics.MeasureSince([]string{"client", "agent", "monitor"}, time.Now())
2019-10-15 14:33:07 +00:00
defer conn.Close()
// Decode arguments
var args cstructs.MonitorRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := decoder.Decode(&args); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
// Check acl
2019-10-25 18:25:19 +00:00
if aclObj, err := m.c.ResolveToken(args.AuthToken); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(403), encoder)
return
2019-10-25 18:25:19 +00:00
} else if aclObj != nil && !aclObj.AllowAgentRead() {
handleStreamResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder)
return
}
logLevel := log.LevelFromString(args.LogLevel)
if args.LogLevel == "" {
logLevel = log.LevelFromString("INFO")
}
if logLevel == log.NoLevel {
handleStreamResultError(errors.New("Unknown log level"), helper.Int64ToPtr(400), encoder)
return
}
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer close(stopCh)
defer cancel()
monitor := monitor.New(512, m.c.logger, &log.LoggerOptions{
JSONFormat: args.LogJSON,
Level: logLevel,
})
frames := make(chan *sframer.StreamFrame, streamFramesBuffer)
errCh := make(chan error)
var buf bytes.Buffer
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)
framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024)
framer.Run()
defer framer.Destroy()
// goroutine to detect remote side closing
go func() {
if _, err := conn.Read(nil); err != nil {
// One end of the pipe explicitly closed, exit
cancel()
return
}
select {
case <-ctx.Done():
return
}
}()
logCh := monitor.Start(stopCh)
initialOffset := int64(0)
// receive logs and build frames
go func() {
defer framer.Destroy()
LOOP:
for {
select {
case log := <-logCh:
if err := framer.Send("", "log", log, initialOffset); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
break LOOP
}
case <-ctx.Done():
break LOOP
}
}
}()
var streamErr error
OUTER:
for {
select {
case frame, ok := <-frames:
if !ok {
// frame may have been closed when an error
// occurred. Check once more for an error.
select {
case streamErr = <-errCh:
// There was a pending error!
default:
// No error, continue on
}
break OUTER
}
var resp cstructs.StreamErrWrapper
if args.PlainText {
resp.Payload = frame.Data
} else {
if err := frameCodec.Encode(frame); err != nil {
streamErr = err
break OUTER
}
resp.Payload = buf.Bytes()
buf.Reset()
}
if err := encoder.Encode(resp); err != nil {
streamErr = err
break OUTER
}
encoder.Reset(conn)
case <-ctx.Done():
break OUTER
}
}
if streamErr != nil {
// Nothing to do as conn is closed
if streamErr == io.EOF || strings.Contains(streamErr.Error(), "closed") {
return
}
}
}