6677a103c2
This changeset configures the RPC rate metrics that were added in #15515 to all the RPCs that support authenticated HTTP API requests. These endpoints were already configured with pre-forwarding authentication in #15870, and a handful of others were done already as part of the proof-of-concept work. So this changeset is entirely copy-and-pasting one method call into a whole mess of handlers. Upcoming PRs will wire up pre-forwarding auth and rate metrics for the remaining set of RPCs that have no API consumers or aren't authenticated, in smaller chunks that can be more thoughtfully reviewed.
214 lines
4.8 KiB
Go
214 lines
4.8 KiB
Go
package nomad
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"io/ioutil"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-msgpack/codec"
|
|
|
|
"github.com/hashicorp/nomad/helper/pointer"
|
|
"github.com/hashicorp/nomad/nomad/stream"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
type Event struct {
|
|
srv *Server
|
|
}
|
|
|
|
func NewEventEndpoint(srv *Server) *Event {
|
|
return &Event{srv: srv}
|
|
}
|
|
|
|
func (e *Event) register() {
|
|
e.srv.streamingRpcs.Register("Event.Stream", e.stream)
|
|
}
|
|
|
|
func (e *Event) stream(conn io.ReadWriteCloser) {
|
|
defer conn.Close()
|
|
|
|
var args structs.EventStreamRequest
|
|
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
|
|
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
|
|
|
|
if err := decoder.Decode(&args); err != nil {
|
|
handleJsonResultError(err, pointer.Of(int64(500)), encoder)
|
|
return
|
|
}
|
|
|
|
authErr := e.srv.Authenticate(nil, &args)
|
|
|
|
// forward to appropriate region
|
|
if args.Region != e.srv.config.Region {
|
|
err := e.forwardStreamingRPC(args.Region, "Event.Stream", args, conn)
|
|
if err != nil {
|
|
handleJsonResultError(err, pointer.Of(int64(500)), encoder)
|
|
}
|
|
return
|
|
}
|
|
|
|
e.srv.MeasureRPCRate("event", structs.RateMetricRead, &args)
|
|
if authErr != nil {
|
|
handleJsonResultError(structs.ErrPermissionDenied, pointer.Of(int64(403)), encoder)
|
|
}
|
|
|
|
// Generate the subscription request
|
|
subReq := &stream.SubscribeRequest{
|
|
Token: args.AuthToken,
|
|
Topics: args.Topics,
|
|
Index: uint64(args.Index),
|
|
Namespace: args.Namespace,
|
|
}
|
|
|
|
// Get the servers broker and subscribe
|
|
publisher, err := e.srv.State().EventBroker()
|
|
if err != nil {
|
|
handleJsonResultError(err, pointer.Of(int64(500)), encoder)
|
|
return
|
|
}
|
|
|
|
// start subscription to publisher
|
|
var subscription *stream.Subscription
|
|
var subErr error
|
|
|
|
// Track whether the ACL token being used has an expiry time.
|
|
var expiryTime *time.Time
|
|
|
|
// Check required ACL permissions for requested Topics
|
|
if e.srv.config.ACLEnabled {
|
|
subscription, expiryTime, subErr = publisher.SubscribeWithACLCheck(subReq)
|
|
} else {
|
|
subscription, subErr = publisher.Subscribe(subReq)
|
|
}
|
|
if subErr != nil {
|
|
handleJsonResultError(subErr, pointer.Of(int64(500)), encoder)
|
|
return
|
|
}
|
|
defer subscription.Unsubscribe()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
// goroutine to detect remote side closing
|
|
go func() {
|
|
io.Copy(ioutil.Discard, conn)
|
|
cancel()
|
|
}()
|
|
|
|
jsonStream := stream.NewJsonStream(ctx, 30*time.Second)
|
|
errCh := make(chan error)
|
|
go func() {
|
|
defer cancel()
|
|
for {
|
|
events, err := subscription.Next(ctx)
|
|
if err != nil {
|
|
select {
|
|
case errCh <- err:
|
|
case <-ctx.Done():
|
|
}
|
|
return
|
|
}
|
|
|
|
// Ensure the token being used is not expired before we any events
|
|
// to subscribers.
|
|
if expiryTime != nil && expiryTime.Before(time.Now().UTC()) {
|
|
select {
|
|
case errCh <- structs.ErrTokenExpired:
|
|
case <-ctx.Done():
|
|
}
|
|
return
|
|
}
|
|
|
|
// Continue if there are no events
|
|
if len(events.Events) == 0 {
|
|
continue
|
|
}
|
|
|
|
if err := jsonStream.Send(events); err != nil {
|
|
select {
|
|
case errCh <- err:
|
|
case <-ctx.Done():
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
var streamErr error
|
|
OUTER:
|
|
for {
|
|
select {
|
|
case streamErr = <-errCh:
|
|
break OUTER
|
|
case <-ctx.Done():
|
|
break OUTER
|
|
case eventJSON, ok := <-jsonStream.OutCh():
|
|
// check if ndjson may have been closed when an error occurred,
|
|
// check once more for an error.
|
|
if !ok {
|
|
select {
|
|
case streamErr = <-errCh:
|
|
// There was a pending error
|
|
default:
|
|
}
|
|
break OUTER
|
|
}
|
|
|
|
var resp structs.EventStreamWrapper
|
|
resp.Event = eventJSON
|
|
|
|
if err := encoder.Encode(resp); err != nil {
|
|
streamErr = err
|
|
break OUTER
|
|
}
|
|
encoder.Reset(conn)
|
|
}
|
|
|
|
}
|
|
|
|
if streamErr != nil {
|
|
handleJsonResultError(streamErr, pointer.Of(int64(500)), encoder)
|
|
return
|
|
}
|
|
|
|
}
|
|
|
|
func (e *Event) forwardStreamingRPC(region string, method string, args interface{}, in io.ReadWriteCloser) error {
|
|
server, err := e.srv.findRegionServer(region)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return e.forwardStreamingRPCToServer(server, method, args, in)
|
|
}
|
|
|
|
func (e *Event) forwardStreamingRPCToServer(server *serverParts, method string, args interface{}, in io.ReadWriteCloser) error {
|
|
srvConn, err := e.srv.streamingRpc(server, method)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer srvConn.Close()
|
|
|
|
outEncoder := codec.NewEncoder(srvConn, structs.MsgpackHandle)
|
|
if err := outEncoder.Encode(args); err != nil {
|
|
return err
|
|
}
|
|
|
|
structs.Bridge(in, srvConn)
|
|
return nil
|
|
}
|
|
|
|
// handleJsonResultError is a helper for sending an error with a potential
|
|
// error code. The transmission of the error is ignored if the error has been
|
|
// generated by the closing of the underlying transport.
|
|
func handleJsonResultError(err error, code *int64, encoder *codec.Encoder) {
|
|
// Nothing to do as the conn is closed
|
|
if err == io.EOF {
|
|
return
|
|
}
|
|
|
|
encoder.Encode(&structs.EventStreamWrapper{
|
|
Error: structs.NewRpcError(err, code),
|
|
})
|
|
}
|