open-nomad/nomad/event_endpoint.go
Tim Gross f61f801e77
provide RPCContext to all RPC handlers (#15430)
Upcoming work to instrument the rate of RPC requests by consumer (and eventually
rate limit) requires that we thread the `RPCContext` through all RPC
handlers so that we can access the underlying connection. This changeset adds
the context to everywhere we intend to initially support it and intentionally
excludes streaming RPCs and client RPCs.

To improve the ergonomics of adding the context everywhere its needed and to
clarify the requirements of dynamic vs static handlers, I've also done a good
bit of refactoring here:

* canonicalized the RPC handler fields so they're as close to identical as
  possible without introducing unused fields (i.e. I didn't add loggers if the
  handler doesn't use them already).
* canonicalized the imports in the handler files.
* added a `NewExampleEndpoint` function for each handler that ensures we're
  constructing the handlers with the required arguments.
* reordered the registration in server.go to match the order of the files (to
  make it easier to see if we've missed one), and added a bunch of commentary
  there as to what the difference between static and dynamic handlers is.
2022-12-01 10:05:15 -05:00

207 lines
4.6 KiB
Go

package nomad
import (
"context"
"io"
"io/ioutil"
"time"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
type Event struct {
srv *Server
}
func NewEventEndpoint(srv *Server) *Event {
return &Event{srv: srv}
}
func (e *Event) register() {
e.srv.streamingRpcs.Register("Event.Stream", e.stream)
}
func (e *Event) stream(conn io.ReadWriteCloser) {
defer conn.Close()
var args structs.EventStreamRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := decoder.Decode(&args); err != nil {
handleJsonResultError(err, pointer.Of(int64(500)), encoder)
return
}
// forward to appropriate region
if args.Region != e.srv.config.Region {
err := e.forwardStreamingRPC(args.Region, "Event.Stream", args, conn)
if err != nil {
handleJsonResultError(err, pointer.Of(int64(500)), encoder)
}
return
}
// Generate the subscription request
subReq := &stream.SubscribeRequest{
Token: args.AuthToken,
Topics: args.Topics,
Index: uint64(args.Index),
Namespace: args.Namespace,
}
// Get the servers broker and subscribe
publisher, err := e.srv.State().EventBroker()
if err != nil {
handleJsonResultError(err, pointer.Of(int64(500)), encoder)
return
}
// start subscription to publisher
var subscription *stream.Subscription
var subErr error
// Track whether the ACL token being used has an expiry time.
var expiryTime *time.Time
// Check required ACL permissions for requested Topics
if e.srv.config.ACLEnabled {
subscription, expiryTime, subErr = publisher.SubscribeWithACLCheck(subReq)
} else {
subscription, subErr = publisher.Subscribe(subReq)
}
if subErr != nil {
handleJsonResultError(subErr, pointer.Of(int64(500)), encoder)
return
}
defer subscription.Unsubscribe()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// goroutine to detect remote side closing
go func() {
io.Copy(ioutil.Discard, conn)
cancel()
}()
jsonStream := stream.NewJsonStream(ctx, 30*time.Second)
errCh := make(chan error)
go func() {
defer cancel()
for {
events, err := subscription.Next(ctx)
if err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
return
}
// Ensure the token being used is not expired before we any events
// to subscribers.
if expiryTime != nil && expiryTime.Before(time.Now().UTC()) {
select {
case errCh <- structs.ErrTokenExpired:
case <-ctx.Done():
}
return
}
// Continue if there are no events
if len(events.Events) == 0 {
continue
}
if err := jsonStream.Send(events); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
return
}
}
}()
var streamErr error
OUTER:
for {
select {
case streamErr = <-errCh:
break OUTER
case <-ctx.Done():
break OUTER
case eventJSON, ok := <-jsonStream.OutCh():
// check if ndjson may have been closed when an error occurred,
// check once more for an error.
if !ok {
select {
case streamErr = <-errCh:
// There was a pending error
default:
}
break OUTER
}
var resp structs.EventStreamWrapper
resp.Event = eventJSON
if err := encoder.Encode(resp); err != nil {
streamErr = err
break OUTER
}
encoder.Reset(conn)
}
}
if streamErr != nil {
handleJsonResultError(streamErr, pointer.Of(int64(500)), encoder)
return
}
}
func (e *Event) forwardStreamingRPC(region string, method string, args interface{}, in io.ReadWriteCloser) error {
server, err := e.srv.findRegionServer(region)
if err != nil {
return err
}
return e.forwardStreamingRPCToServer(server, method, args, in)
}
func (e *Event) forwardStreamingRPCToServer(server *serverParts, method string, args interface{}, in io.ReadWriteCloser) error {
srvConn, err := e.srv.streamingRpc(server, method)
if err != nil {
return err
}
defer srvConn.Close()
outEncoder := codec.NewEncoder(srvConn, structs.MsgpackHandle)
if err := outEncoder.Encode(args); err != nil {
return err
}
structs.Bridge(in, srvConn)
return nil
}
// handleJsonResultError is a helper for sending an error with a potential
// error code. The transmission of the error is ignored if the error has been
// generated by the closing of the underlying transport.
func handleJsonResultError(err error, code *int64, encoder *codec.Encoder) {
// Nothing to do as the conn is closed
if err == io.EOF {
return
}
encoder.Encode(&structs.EventStreamWrapper{
Error: structs.NewRpcError(err, code),
})
}