open-nomad/nomad/event_endpoint.go
Drew Bailey a4a2975edf
Event Stream API/RPC (#8947)
This Commit adds an /v1/events/stream endpoint to stream events from.

The stream framer has been updated to include a SendFull method which
does not fragment the data between multiple frames. This essentially
treats the stream framer as an envelope to adhere to the stream framer
interface in the UI.

If the `encode` query parameter is omitted events will be streamed as
newline-delimited JSON.
2020-10-14 12:44:36 -04:00

214 lines
4.8 KiB
Go

package nomad
import (
"context"
"io"
"time"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
// Event is the endpoint type whose stream method is registered as the
// "Event.Stream" streaming RPC handler.
type Event struct {
// srv is the owning server; the handlers use it for region configuration,
// ACL token resolution, state access, and forwarding to other servers.
srv *Server
}
// register adds the stream handler to the server's streaming RPC registry
// under the method name "Event.Stream".
func (e *Event) register() {
	const rpcName = "Event.Stream"
	e.srv.streamingRpcs.Register(rpcName, e.stream)
}
// stream is the handler for the "Event.Stream" streaming RPC.
//
// It decodes a structs.EventStreamRequest from conn, forwards the request to
// another region when args.Region differs from the local one, performs the
// ACL check, and then subscribes to the state store's event publisher. Events
// received from the subscription are passed through an NDJson stream and
// written back to conn as msgpack-encoded structs.EventStreamWrapper values.
//
// Failures are reported to the caller as an EventStreamWrapper carrying an
// RPC error (see handleJsonResultError). conn is always closed on return.
func (e *Event) stream(conn io.ReadWriteCloser) {
	defer conn.Close()

	var args structs.EventStreamRequest
	decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
	encoder := codec.NewEncoder(conn, structs.MsgpackHandle)

	if err := decoder.Decode(&args); err != nil {
		handleJsonResultError(err, helper.Int64ToPtr(500), encoder)
		return
	}

	// forward to appropriate region
	if args.Region != e.srv.config.Region {
		err := e.forwardStreamingRPC(args.Region, "Event.Stream", args, conn)
		if err != nil {
			handleJsonResultError(err, helper.Int64ToPtr(500), encoder)
		}
		return
	}

	// ACL check
	// TODO(drew) ACL checks need to be per topic
	// All Events Management
	// System Events Management
	// Node Events NamespaceCapabilityReadEvents
	// Job/Alloc Events NamespaceCapabilityReadEvents
	if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
		handleJsonResultError(err, nil, encoder)
		return
	} else if aclObj != nil && !aclObj.IsManagement() {
		handleJsonResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder)
		return
	}

	// TODO(drew) handle streams without ACLS
	reqToken := args.AuthToken
	if reqToken == "" {
		// generate a random request token
		reqToken = uuid.Generate()
	}

	subReq := &stream.SubscribeRequest{
		Token:  reqToken,
		Topics: args.Topics,
		Index:  uint64(args.Index),
	}

	publisher, err := e.srv.State().EventPublisher()
	if err != nil {
		handleJsonResultError(err, helper.Int64ToPtr(500), encoder)
		return
	}

	// ctx bounds the lifetime of both helper goroutines below; the deferred
	// cancel guarantees they are released whenever this handler returns.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// start subscription to publisher
	subscription, err := publisher.Subscribe(subReq)
	if err != nil {
		handleJsonResultError(err, helper.Int64ToPtr(500), encoder)
		return
	}
	defer subscription.Unsubscribe()

	ndJsonCh := make(chan *stream.NDJson)
	errCh := make(chan error)

	// NOTE(review): the 30s argument is presumably a heartbeat interval for
	// the NDJson stream; confirm against stream.NewNDJsonStream.
	jsonStream := stream.NewNDJsonStream(ndJsonCh, 30*time.Second)
	jsonStream.Run(ctx)

	// goroutine to detect remote side closing
	go func() {
		if _, err := conn.Read(nil); err != nil {
			// One end of the pipe explicitly closed, exit
			cancel()
		}
		// This goroutine must not receive from errCh. errCh is unbuffered and
		// the main loop below is its intended reader; a second receiver here
		// could win the race, discard the error, and leave the client with a
		// silently terminated stream.
	}()

	// Producer: pull events from the subscription and feed them to the
	// NDJson stream. Any error is handed to the main loop via errCh unless
	// the context is already done.
	go func() {
		defer cancel()
		for {
			events, err := subscription.Next(ctx)
			if err != nil {
				select {
				case errCh <- err:
				case <-ctx.Done():
				}
				return
			}

			// Continue if there are no events
			if events == nil {
				continue
			}

			// Send each event as its own frame
			for _, event := range events {
				if err := jsonStream.Send(event); err != nil {
					select {
					case errCh <- err:
					case <-ctx.Done():
					}
					return
				}
			}
		}
	}()

	// Main loop: relay encoded events to the caller until an error occurs,
	// the context is cancelled, or the NDJson channel is closed.
	var streamErr error
OUTER:
	for {
		select {
		case streamErr = <-errCh:
			break OUTER
		case <-ctx.Done():
			break OUTER
		case eventJSON, ok := <-ndJsonCh:
			// check if ndjson may have been closed when an error occurred,
			// check once more for an error.
			if !ok {
				select {
				case streamErr = <-errCh:
					// There was a pending error
				default:
				}
				break OUTER
			}

			var resp structs.EventStreamWrapper
			resp.Event = eventJSON

			if err := encoder.Encode(resp); err != nil {
				streamErr = err
				break OUTER
			}
			encoder.Reset(conn)
		}
	}

	if streamErr != nil {
		handleJsonResultError(streamErr, helper.Int64ToPtr(500), encoder)
	}
}
// forwardStreamingRPC looks up a server in the given region and, if one is
// found, hands the streaming RPC off to it. It returns the lookup error or
// whatever forwardStreamingRPCToServer returns.
func (e *Event) forwardStreamingRPC(region string, method string, args interface{}, in io.ReadWriteCloser) error {
	target, err := e.srv.findRegionServer(region)
	if err == nil {
		err = e.forwardStreamingRPCToServer(target, method, args, in)
	}
	return err
}
// forwardStreamingRPCToServer opens a streaming RPC connection to server for
// the given method, sends args as the msgpack-encoded request, and then
// bridges the caller's connection with the remote one. The remote connection
// is closed when the function returns. An error is returned only if opening
// the connection or encoding the request fails.
func (e *Event) forwardStreamingRPCToServer(server *serverParts, method string, args interface{}, in io.ReadWriteCloser) error {
	remote, err := e.srv.streamingRpc(server, method)
	if err != nil {
		return err
	}
	defer remote.Close()

	// Replay the already-decoded request to the remote server before the
	// two connections are joined.
	if encErr := codec.NewEncoder(remote, structs.MsgpackHandle).Encode(args); encErr != nil {
		return encErr
	}

	structs.Bridge(in, remote)
	return nil
}
// handleJsonResultError writes err, together with an optional status code, to
// the caller as an EventStreamWrapper. When err is io.EOF nothing is written,
// since the connection is taken to be closed already. Sending is best effort:
// a failure to encode the wrapper is not reported.
func handleJsonResultError(err error, code *int64, encoder *codec.Encoder) {
	if err != io.EOF {
		wrapped := &structs.EventStreamWrapper{
			Error: structs.NewRpcError(err, code),
		}
		_ = encoder.Encode(wrapped)
	}
}