c463479848
properly wire up durable event count move newline responsibility moves newline creation from NDJson to the http handler, json stream only encodes and sends now ignore snapshot restore if broker is disabled enable dev mode to access event steam without acl use mapping instead of switch use pointers for config sizes, remove unused ttl, simplify closed conn logic
161 lines
3.9 KiB
Go
161 lines
3.9 KiB
Go
package agent
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/docker/docker/pkg/ioutils"
|
|
"github.com/hashicorp/go-msgpack/codec"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
query := req.URL.Query()
|
|
|
|
indexStr := query.Get("index")
|
|
if indexStr == "" {
|
|
indexStr = "0"
|
|
}
|
|
index, err := strconv.Atoi(indexStr)
|
|
if err != nil {
|
|
return nil, CodedError(400, fmt.Sprintf("Unable to parse index: %v", err))
|
|
}
|
|
|
|
topics, err := parseEventTopics(query)
|
|
if err != nil {
|
|
return nil, CodedError(400, fmt.Sprintf("Invalid topic query: %v", err))
|
|
}
|
|
|
|
args := &structs.EventStreamRequest{
|
|
Topics: topics,
|
|
Index: index,
|
|
}
|
|
resp.Header().Set("Content-Type", "application/json")
|
|
resp.Header().Set("Cache-Control", "no-cache")
|
|
|
|
// Set region, namespace and authtoken to args
|
|
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
|
|
|
|
// Determine the RPC handler to use to find a server
|
|
var handler structs.StreamingRpcHandler
|
|
var handlerErr error
|
|
if server := s.agent.Server(); server != nil {
|
|
handler, handlerErr = server.StreamingRpcHandler("Event.Stream")
|
|
} else if client := s.agent.Client(); client != nil {
|
|
handler, handlerErr = client.RemoteStreamingRpcHandler("Event.Stream")
|
|
} else {
|
|
handlerErr = fmt.Errorf("misconfigured connection")
|
|
}
|
|
|
|
if handlerErr != nil {
|
|
return nil, CodedError(500, handlerErr.Error())
|
|
}
|
|
|
|
httpPipe, handlerPipe := net.Pipe()
|
|
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
|
|
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)
|
|
|
|
// Create a goroutine that closes the pipe if the connection closes
|
|
ctx, cancel := context.WithCancel(req.Context())
|
|
defer cancel()
|
|
go func() {
|
|
<-ctx.Done()
|
|
httpPipe.Close()
|
|
}()
|
|
|
|
// Create an output that gets flushed on every write
|
|
output := ioutils.NewWriteFlusher(resp)
|
|
|
|
// send request and decode events
|
|
errs, errCtx := errgroup.WithContext(ctx)
|
|
errs.Go(func() error {
|
|
defer cancel()
|
|
|
|
// Send the request
|
|
if err := encoder.Encode(args); err != nil {
|
|
return CodedError(500, err.Error())
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-errCtx.Done():
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
// Decode the response
|
|
var res structs.EventStreamWrapper
|
|
if err := decoder.Decode(&res); err != nil {
|
|
return CodedError(500, err.Error())
|
|
}
|
|
decoder.Reset(httpPipe)
|
|
|
|
if err := res.Error; err != nil {
|
|
if err.Code != nil {
|
|
return CodedError(int(*err.Code), err.Error())
|
|
}
|
|
}
|
|
|
|
// Flush json entry to response
|
|
if _, err := io.Copy(output, bytes.NewReader(res.Event.Data)); err != nil {
|
|
return CodedError(500, err.Error())
|
|
}
|
|
// Each entry is its own new line according to ndjson.org
|
|
// append new line to each entry
|
|
fmt.Fprint(output, "\n")
|
|
}
|
|
})
|
|
|
|
// invoke handler
|
|
handler(handlerPipe)
|
|
cancel()
|
|
|
|
codedErr := errs.Wait()
|
|
if codedErr != nil && strings.Contains(codedErr.Error(), io.ErrClosedPipe.Error()) {
|
|
codedErr = nil
|
|
}
|
|
|
|
return nil, codedErr
|
|
}
|
|
|
|
func parseEventTopics(query url.Values) (map[structs.Topic][]string, error) {
|
|
raw, ok := query["topic"]
|
|
if !ok {
|
|
return allTopics(), nil
|
|
}
|
|
topics := make(map[structs.Topic][]string)
|
|
|
|
for _, topic := range raw {
|
|
k, v, err := parseTopic(topic)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error parsing topics: %w", err)
|
|
}
|
|
|
|
topics[structs.Topic(k)] = append(topics[structs.Topic(k)], v)
|
|
}
|
|
return topics, nil
|
|
}
|
|
|
|
func parseTopic(topic string) (string, string, error) {
|
|
parts := strings.Split(topic, ":")
|
|
// infer wildcard if only given a topic
|
|
if len(parts) == 1 {
|
|
return topic, "*", nil
|
|
} else if len(parts) != 2 {
|
|
return "", "", fmt.Errorf("Invalid key value pair for topic, topic: %s", topic)
|
|
}
|
|
return parts[0], parts[1], nil
|
|
}
|
|
|
|
func allTopics() map[structs.Topic][]string {
|
|
return map[structs.Topic][]string{"*": {"*"}}
|
|
}
|