open-nomad/nomad/event_endpoint.go
Drew Bailey c463479848
filter on additional filter keys, remove switch statement duplication
properly wire up durable event count

move newline responsibility

moves newline creation from NDJson to the http handler, json stream only encodes and sends now

ignore snapshot restore if broker is disabled

enable dev mode to access event steam without acl

use mapping instead of switch

use pointers for config sizes, remove unused ttl, simplify closed conn logic
2020-10-14 14:14:33 -04:00

243 lines
5.4 KiB
Go

package nomad
import (
"context"
"fmt"
"io"
"io/ioutil"
"time"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
type Event struct {
srv *Server
}
func (e *Event) register() {
e.srv.streamingRpcs.Register("Event.Stream", e.stream)
}
func (e *Event) stream(conn io.ReadWriteCloser) {
defer conn.Close()
var args structs.EventStreamRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := decoder.Decode(&args); err != nil {
handleJsonResultError(err, helper.Int64ToPtr(500), encoder)
return
}
// forward to appropriate region
if args.Region != e.srv.config.Region {
err := e.forwardStreamingRPC(args.Region, "Event.Stream", args, conn)
if err != nil {
handleJsonResultError(err, helper.Int64ToPtr(500), encoder)
}
return
}
aclObj, err := e.srv.ResolveToken(args.AuthToken)
if err != nil {
handleJsonResultError(err, nil, encoder)
return
}
subReq := &stream.SubscribeRequest{
Token: args.AuthToken,
Topics: args.Topics,
Index: uint64(args.Index),
Namespace: args.Namespace,
}
// Check required ACL permissions for requested Topics
if aclObj != nil {
if err := aclCheckForEvents(subReq, aclObj); err != nil {
handleJsonResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder)
return
}
}
// Get the servers broker and subscribe
publisher, err := e.srv.State().EventBroker()
if err != nil {
handleJsonResultError(err, helper.Int64ToPtr(500), encoder)
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// start subscription to publisher
subscription, err := publisher.Subscribe(subReq)
if err != nil {
handleJsonResultError(err, helper.Int64ToPtr(500), encoder)
return
}
defer subscription.Unsubscribe()
errCh := make(chan error)
jsonStream := stream.NewJsonStream(ctx, 30*time.Second)
// goroutine to detect remote side closing
go func() {
io.Copy(ioutil.Discard, conn)
cancel()
}()
go func() {
defer cancel()
for {
events, err := subscription.Next(ctx)
if err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
return
}
// Continue if there are no events
if len(events.Events) == 0 {
continue
}
if err := jsonStream.Send(events); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
return
}
}
}()
var streamErr error
OUTER:
for {
select {
case streamErr = <-errCh:
break OUTER
case <-ctx.Done():
break OUTER
case eventJSON, ok := <-jsonStream.OutCh():
// check if ndjson may have been closed when an error occurred,
// check once more for an error.
if !ok {
select {
case streamErr = <-errCh:
// There was a pending error
default:
}
break OUTER
}
var resp structs.EventStreamWrapper
resp.Event = eventJSON
if err := encoder.Encode(resp); err != nil {
streamErr = err
break OUTER
}
encoder.Reset(conn)
}
}
if streamErr != nil {
handleJsonResultError(streamErr, helper.Int64ToPtr(500), encoder)
return
}
}
func (e *Event) forwardStreamingRPC(region string, method string, args interface{}, in io.ReadWriteCloser) error {
server, err := e.srv.findRegionServer(region)
if err != nil {
return err
}
return e.forwardStreamingRPCToServer(server, method, args, in)
}
func (e *Event) forwardStreamingRPCToServer(server *serverParts, method string, args interface{}, in io.ReadWriteCloser) error {
srvConn, err := e.srv.streamingRpc(server, method)
if err != nil {
return err
}
defer srvConn.Close()
outEncoder := codec.NewEncoder(srvConn, structs.MsgpackHandle)
if err := outEncoder.Encode(args); err != nil {
return err
}
structs.Bridge(in, srvConn)
return nil
}
// handleJsonResultError is a helper for sending an error with a potential
// error code. The transmission of the error is ignored if the error has been
// generated by the closing of the underlying transport.
func handleJsonResultError(err error, code *int64, encoder *codec.Encoder) {
// Nothing to do as the conn is closed
if err == io.EOF {
return
}
encoder.Encode(&structs.EventStreamWrapper{
Error: structs.NewRpcError(err, code),
})
}
func aclCheckForEvents(subReq *stream.SubscribeRequest, aclObj *acl.ACL) error {
if len(subReq.Topics) == 0 {
return fmt.Errorf("invalid topic request")
}
reqPolicies := make(map[string]struct{})
var required = struct{}{}
for topic := range subReq.Topics {
switch topic {
case structs.TopicDeployment, structs.TopicEval,
structs.TopicAlloc, structs.TopicJob:
if _, ok := reqPolicies[acl.NamespaceCapabilityReadJob]; !ok {
reqPolicies[acl.NamespaceCapabilityReadJob] = required
}
case structs.TopicNode:
reqPolicies["node-read"] = required
case structs.TopicAll:
reqPolicies["management"] = required
default:
return fmt.Errorf("unknown topic %s", topic)
}
}
for checks := range reqPolicies {
switch checks {
case acl.NamespaceCapabilityReadJob:
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok {
return structs.ErrPermissionDenied
}
case "node-read":
if ok := aclObj.AllowNodeRead(); !ok {
return structs.ErrPermissionDenied
}
case "management":
if ok := aclObj.IsManagement(); !ok {
return structs.ErrPermissionDenied
}
}
}
return nil
}