open-nomad/nomad/client_agent_endpoint.go
Drew Bailey 9b63828658
serverID to target remote leader or server
handle the case where we request a server-id which is this current server

update docs, error on node and server id params

more accurate names for tests

use shared no leader err, formatting

rm bad comment

remove redundant variable
2019-11-14 10:07:35 -05:00

309 lines
7.1 KiB
Go

package nomad
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"time"
log "github.com/hashicorp/go-hclog"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
)
type Agent struct {
srv *Server
}
func (m *Agent) register() {
m.srv.streamingRpcs.Register("Agent.Monitor", m.monitor)
}
func (m *Agent) monitor(conn io.ReadWriteCloser) {
defer conn.Close()
// Decode args
var args cstructs.MonitorRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := decoder.Decode(&args); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
// Check agent read permissions
if aclObj, err := m.srv.ResolveToken(args.AuthToken); err != nil {
handleStreamResultError(err, nil, encoder)
return
} else if aclObj != nil && !aclObj.AllowAgentRead() {
handleStreamResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder)
return
}
logLevel := log.LevelFromString(args.LogLevel)
if args.LogLevel == "" {
logLevel = log.LevelFromString("INFO")
}
if logLevel == log.NoLevel {
handleStreamResultError(errors.New("Unknown log level"), helper.Int64ToPtr(400), encoder)
return
}
// Targeting a node, forward request to node
if args.NodeID != "" {
m.forwardMonitorClient(conn, args, encoder, decoder)
// forwarded request has ended, return
return
}
currentServer := m.srv.serf.LocalMember().Name
var forwardServer bool
// Targeting a remote server which is not the leader and not this server
if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer {
forwardServer = true
}
// Targeting leader and this server is not current leader
if args.ServerID == "leader" && !m.srv.IsLeader() {
forwardServer = true
}
if forwardServer {
m.forwardMonitorServer(conn, args, encoder, decoder)
return
}
// NodeID was empty, so monitor this current server
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
monitor := monitor.New(512, m.srv.logger, &log.LoggerOptions{
Level: logLevel,
JSONFormat: args.LogJSON,
})
frames := make(chan *sframer.StreamFrame, 32)
errCh := make(chan error)
var buf bytes.Buffer
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)
framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024)
framer.Run()
defer framer.Destroy()
// goroutine to detect remote side closing
go func() {
if _, err := conn.Read(nil); err != nil {
// One end of the pipe explicitly closed, exit
cancel()
return
}
select {
case <-ctx.Done():
return
}
}()
logCh := monitor.Start()
defer monitor.Stop()
initialOffset := int64(0)
// receive logs and build frames
go func() {
defer framer.Destroy()
LOOP:
for {
select {
case log := <-logCh:
if err := framer.Send("", "log", log, initialOffset); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
break LOOP
}
case <-ctx.Done():
break LOOP
}
}
}()
var streamErr error
OUTER:
for {
select {
case frame, ok := <-frames:
if !ok {
// frame may have been closed when an error
// occurred. Check once more for an error.
select {
case streamErr = <-errCh:
// There was a pending error!
default:
// No error, continue on
}
break OUTER
}
var resp cstructs.StreamErrWrapper
if args.PlainText {
resp.Payload = frame.Data
} else {
if err := frameCodec.Encode(frame); err != nil {
streamErr = err
break OUTER
}
resp.Payload = buf.Bytes()
buf.Reset()
}
if err := encoder.Encode(resp); err != nil {
streamErr = err
break OUTER
}
encoder.Reset(conn)
case <-ctx.Done():
break OUTER
}
}
if streamErr != nil {
handleStreamResultError(streamErr, helper.Int64ToPtr(500), encoder)
return
}
}
func (m *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
nodeID := args.NodeID
snap, err := m.srv.State().Snapshot()
if err != nil {
handleStreamResultError(err, nil, encoder)
return
}
node, err := snap.NodeByID(nil, nodeID)
if err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
if node == nil {
err := fmt.Errorf("Unknown node %q", nodeID)
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
if err := nodeSupportsRpc(node); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
// Get the Connection to the client either by fowarding to another server
// or creating direct stream
var clientConn net.Conn
state, ok := m.srv.getNodeConn(nodeID)
if !ok {
// Determine the server that has a connection to the node
srv, err := m.srv.serverWithNodeConn(nodeID, m.srv.Region())
if err != nil {
var code *int64
if structs.IsErrNoNodeConn(err) {
code = helper.Int64ToPtr(404)
}
handleStreamResultError(err, code, encoder)
return
}
conn, err := m.srv.streamingRpc(srv, "Agent.Monitor")
if err != nil {
handleStreamResultError(err, nil, encoder)
return
}
clientConn = conn
} else {
stream, err := NodeStreamingRpc(state.Session, "Agent.Monitor")
if err != nil {
handleStreamResultError(err, nil, encoder)
return
}
clientConn = stream
}
defer clientConn.Close()
// Send the Request
outEncoder := codec.NewEncoder(clientConn, structs.MsgpackHandle)
if err := outEncoder.Encode(args); err != nil {
handleStreamResultError(err, nil, encoder)
return
}
structs.Bridge(conn, clientConn)
return
}
func (m *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
var target *serverParts
serverID := args.ServerID
// empty ServerID to prevent forwarding loop
args.ServerID = ""
if serverID == "leader" {
isLeader, remoteServer := m.srv.getLeader()
if !isLeader && remoteServer != nil {
target = remoteServer
}
if !isLeader && remoteServer == nil {
handleStreamResultError(structs.ErrNoLeader, helper.Int64ToPtr(400), encoder)
return
}
} else {
// See if the server ID is a known member
serfMembers := m.srv.Members()
for _, mem := range serfMembers {
if mem.Name == serverID {
if ok, srv := isNomadServer(mem); ok {
target = srv
}
}
}
}
// Unable to find a server
if target == nil {
err := fmt.Errorf("unknown nomad server %s", serverID)
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
serverConn, err := m.srv.streamingRpc(target, "Agent.Monitor")
if err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
defer serverConn.Close()
// Send the Request
outEncoder := codec.NewEncoder(serverConn, structs.MsgpackHandle)
if err := outEncoder.Encode(args); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
structs.Bridge(conn, serverConn)
return
}