event sink crud operation api (#9155)

* network sink rpc/api plumbing

state store methods and restore

upsert sink test

get sink

delete sink

event sink list and tests

go generate new msg types

validate sink on upsert

* go generate
This commit is contained in:
Drew Bailey 2020-10-23 14:23:00 -04:00 committed by GitHub
parent dc5d16bd9e
commit 1ae39a9ed9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 915 additions and 72 deletions

View File

@ -17,6 +17,103 @@ import (
"golang.org/x/sync/errgroup"
)
func (s *HTTPServer) EventSinksRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != http.MethodGet {
return nil, CodedError(405, ErrInvalidMethod)
}
args := structs.EventSinkListRequest{}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
var out structs.EventSinkListResponse
if err := s.agent.RPC("Event.ListSinks", &args, &out); err != nil {
return nil, err
}
if out.Sinks == nil {
out.Sinks = make([]*structs.EventSink, 0)
}
setMeta(resp, &out.QueryMeta)
return out.Sinks, nil
}
func (s *HTTPServer) EventSinkSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
name := strings.TrimPrefix(req.URL.Path, "/v1/event/sink/")
if len(name) == 0 {
return nil, CodedError(400, "Missing Policy Name")
}
switch req.Method {
case http.MethodGet:
return s.eventSinkGet(resp, req, name)
case http.MethodPost, http.MethodPut:
return s.eventSinkUpdate(resp, req, name)
case http.MethodDelete:
return s.eventSinkDelete(resp, req, name)
default:
return nil, CodedError(405, ErrInvalidMethod)
}
}
func (s *HTTPServer) eventSinkGet(resp http.ResponseWriter, req *http.Request, sink string) (interface{}, error) {
args := structs.EventSinkSpecificRequest{
ID: sink,
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
var out structs.EventSinkResponse
if err := s.agent.RPC("Event.GetSink", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
if out.Sink == nil {
return nil, CodedError(404, "event sink not found")
}
return out.Sink, nil
}
func (s *HTTPServer) eventSinkUpdate(resp http.ResponseWriter, req *http.Request, sinkName string) (interface{}, error) {
var sink structs.EventSink
if err := decodeBody(req, &sink); err != nil {
return nil, CodedError(500, err.Error())
}
if sink.ID != sinkName {
return nil, CodedError(400, "Event sink name does not match request path")
}
args := structs.EventSinkUpsertRequest{
Sink: &sink,
}
s.parseWriteRequest(req, &args.WriteRequest)
var out structs.GenericResponse
if err := s.agent.RPC("Event.UpsertSink", &args, &out); err != nil {
return nil, err
}
setIndex(resp, out.Index)
return nil, nil
}
func (s *HTTPServer) eventSinkDelete(resp http.ResponseWriter, req *http.Request, sink string) (interface{}, error) {
args := structs.EventSinkDeleteRequest{
IDs: []string{sink},
}
s.parseWriteRequest(req, &args.WriteRequest)
var out structs.GenericResponse
if err := s.agent.RPC("Event.DeleteSink", &args, &out); err != nil {
return nil, err
}
setIndex(resp, out.Index)
return nil, nil
}
func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
query := req.URL.Query()

View File

@ -11,6 +11,7 @@ import (
"time"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
@ -22,6 +23,95 @@ type testEvent struct {
ID string
}
func TestHTTP_EventSinkList(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
s1 := mock.EventSink()
s2 := mock.EventSink()
require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, s1))
require.NoError(t, s.Agent.server.State().UpsertEventSink(1001, s2))
req, err := http.NewRequest("GET", "/v1/event/sinks", nil)
require.NoError(t, err)
respW := httptest.NewRecorder()
obj, err := s.Server.EventSinksRequest(respW, req)
require.NoError(t, err)
require.Equal(t, "1001", respW.HeaderMap.Get("X-Nomad-Index"))
n := obj.([]*structs.EventSink)
require.Len(t, n, 2)
})
}
func TestHTTP_EventSinkGet(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
s1 := mock.EventSink()
require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, s1))
req, err := http.NewRequest("GET", "/v1/event/sink/"+s1.ID, nil)
require.NoError(t, err)
respW := httptest.NewRecorder()
obj, err := s.Server.EventSinkSpecificRequest(respW, req)
require.NoError(t, err)
require.Equal(t, "1000", respW.HeaderMap.Get("X-Nomad-Index"))
n := obj.(*structs.EventSink)
require.Equal(t, s1, n)
})
}
func TestHTTP_EventSinkUpsert(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
s1 := mock.EventSink()
buf := encodeReq(s1)
req, err := http.NewRequest("POST", "/v1/event/sink/"+s1.ID, buf)
require.NoError(t, err)
respW := httptest.NewRecorder()
_, err = s.Server.EventSinkSpecificRequest(respW, req)
require.NoError(t, err)
require.NotEqual(t, "", respW.HeaderMap.Get("X-Nomad-Index"))
state := s.Agent.server.State()
out, err := state.EventSinkByID(nil, s1.ID)
require.NoError(t, err)
require.Equal(t, s1.Address, out.Address)
require.Equal(t, s1.ID, out.ID)
})
}
func TestHTTP_EventSinkDelete(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
s1 := mock.EventSink()
require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, s1))
req, err := http.NewRequest("DELETE", "/v1/event/sink/"+s1.ID, nil)
require.NoError(t, err)
respW := httptest.NewRecorder()
_, err = s.Server.EventSinkSpecificRequest(respW, req)
require.NoError(t, err)
require.NotEqual(t, "", respW.HeaderMap.Get("X-Nomad-Index"))
state := s.Agent.server.State()
out, err := state.EventSinkByID(nil, s1.ID)
require.NoError(t, err)
require.Nil(t, out)
})
}
func TestEventStream(t *testing.T) {
t.Parallel()

View File

@ -327,6 +327,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/operator/scheduler/configuration", s.wrap(s.OperatorSchedulerConfiguration))
s.mux.HandleFunc("/v1/event/stream", s.wrap(s.EventStream))
s.mux.HandleFunc("/v1/event/sinks", s.wrap(s.EventSinksRequest))
s.mux.HandleFunc("/v1/event/sink/", s.wrap(s.EventSinkSpecificRequest))
s.mux.HandleFunc("/v1/namespaces", s.wrap(s.NamespacesRequest))
s.mux.HandleFunc("/v1/namespace", s.wrap(s.NamespaceCreateRequest))

View File

@ -45,6 +45,8 @@ var msgTypeNames = map[structs.MessageType]string{
structs.ScalingEventRegisterRequestType: "ScalingEventRegisterRequestType",
structs.CSIVolumeClaimBatchRequestType: "CSIVolumeClaimBatchRequestType",
structs.CSIPluginDeleteRequestType: "CSIPluginDeleteRequestType",
structs.EventSinkUpsertRequestType: "EventSinkUpsertRequestType",
structs.EventSinkDeleteRequestType: "EventSinkDeleteRequestType",
structs.NamespaceUpsertRequestType: "NamespaceUpsertRequestType",
structs.NamespaceDeleteRequestType: "NamespaceDeleteRequestType",
}

View File

@ -7,9 +7,12 @@ import (
"io/ioutil"
"time"
metrics "github.com/armon/go-metrics"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -22,6 +25,150 @@ func (e *Event) register() {
e.srv.streamingRpcs.Register("Event.Stream", e.stream)
}
// ListSinks is used to list the event sinks registered in Nomad
func (e *Event) ListSinks(args *structs.EventSinkListRequest, reply *structs.EventSinkListResponse) error {
if done, err := e.srv.forward("Event.ListSinks", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "event", "list_sinks"}, time.Now())
if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowOperatorRead() {
return structs.ErrPermissionDenied
}
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
iter, err := state.EventSinks(ws)
if err != nil {
return err
}
var sinks []*structs.EventSink
for {
raw := iter.Next()
if raw == nil {
break
}
sink := raw.(*structs.EventSink)
sinks = append(sinks, sink)
}
reply.Sinks = sinks
index, err := state.Index("event_sink")
if err != nil {
return err
}
// Ensure we never set the index to zero, otherwise a blocking query cannot be used.
// We floor the index at one, since realistically the first write must have a higher index.
if index == 0 {
index = 1
}
reply.Index = index
return nil
},
}
return e.srv.blockingRPC(&opts)
}
// UpsertSink is used to create or update an event sink
func (e *Event) UpsertSink(args *structs.EventSinkUpsertRequest, reply *structs.GenericResponse) error {
if done, err := e.srv.forward("Event.UpsertSink", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "event", "upsert_sink"}, time.Now())
if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.IsManagement() {
return structs.ErrPermissionDenied
}
if err := args.Sink.Validate(); err != nil {
return err
}
// Update via Raft
_, index, err := e.srv.raftApply(structs.EventSinkUpsertRequestType, args)
if err != nil {
return err
}
reply.Index = index
return nil
}
// GetSink returns the requested event sink
func (e *Event) GetSink(args *structs.EventSinkSpecificRequest, reply *structs.EventSinkResponse) error {
if done, err := e.srv.forward("Event.GetSink", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "event", "get_sink"}, time.Now())
if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowOperatorRead() {
return structs.ErrPermissionDenied
}
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
s, err := state.EventSinkByID(ws, args.ID)
if err != nil {
return nil
}
reply.Sink = s
index, err := state.Index("event_sink")
if err != nil {
return err
}
if index == 0 {
index = 1
}
reply.Index = index
return nil
},
}
return e.srv.blockingRPC(&opts)
}
// DeleteSink deletes an event sink
func (e *Event) DeleteSink(args *structs.EventSinkDeleteRequest, reply *structs.GenericResponse) error {
if done, err := e.srv.forward("Event.DeleteSink", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "event", "delete_sink"}, time.Now())
if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.IsManagement() {
return structs.ErrPermissionDenied
}
// Update via Raft
_, index, err := e.srv.raftApply(structs.EventSinkDeleteRequestType, args)
if err != nil {
return err
}
reply.Index = index
return nil
}
func (e *Event) stream(conn io.ReadWriteCloser) {
defer conn.Close()

View File

@ -10,7 +10,9 @@ import (
"time"
"github.com/hashicorp/go-msgpack/codec"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
@ -508,3 +510,151 @@ func TestEventStream_ACL(t *testing.T) {
})
}
}
func TestEvent_UpsertSink(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
sink := mock.EventSink()
req := &structs.EventSinkUpsertRequest{
Sink: sink,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Event.UpsertSink", req, &resp))
require.NotEqual(t, 0, resp.Index)
// Check for the sink in the FSM
state := s1.fsm.State()
out, err := state.EventSinkByID(nil, sink.ID)
require.NoError(t, err)
// set the index so we can compare values
sink.CreateIndex = resp.Index
sink.ModifyIndex = resp.Index
require.EqualValues(t, sink, out)
}
func TestEvent_UpsertSink_Invalid(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
sink := &structs.EventSink{
Type: structs.SinkWebhook,
}
req := &structs.EventSinkUpsertRequest{
Sink: sink,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "Event.UpsertSink", req, &resp)
require.Error(t, err)
require.Contains(t, err.Error(), "Missing sink ID")
require.Contains(t, err.Error(), "Webhook sink requires a valid Address")
}
func TestEvent_GetSink(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
sink := mock.EventSink()
require.NoError(t, s1.fsm.State().UpsertEventSink(1000, sink))
get := &structs.EventSinkSpecificRequest{
ID: sink.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
var resp structs.EventSinkResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Event.GetSink", get, &resp))
require.EqualValues(t, 1000, resp.Index)
require.Equal(t, sink.ID, resp.Sink.ID)
// Query for a non-existent sink
get.ID = uuid.Generate()
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Event.GetSink", get, &resp))
require.EqualValues(t, 1000, resp.Index)
require.Nil(t, resp.Sink)
}
func TestEvent_DeleteSink(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
sink := mock.EventSink()
require.NoError(t, s1.fsm.State().UpsertEventSink(1000, sink))
get := &structs.EventSinkDeleteRequest{
IDs: []string{sink.ID},
WriteRequest: structs.WriteRequest{
Region: "global",
},
}
var resp structs.GenericResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Event.DeleteSink", get, &resp))
require.NotEqual(t, uint64(0), resp.Index)
state := s1.fsm.State()
out, err := state.EventSinkByID(nil, sink.ID)
require.NoError(t, err)
require.Nil(t, out)
}
func TestEvent_ListSinks(t *testing.T) {
t.Parallel()
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
sink := mock.EventSink()
sink2 := mock.EventSink()
require.NoError(t, s1.fsm.State().UpsertEventSink(1000, sink))
require.NoError(t, s1.fsm.State().UpsertEventSink(1001, sink2))
get := &structs.EventSinkListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
var resp structs.EventSinkListResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Event.ListSinks", get, &resp))
require.Len(t, resp.Sinks, 2)
}

View File

@ -52,7 +52,7 @@ const (
CSIPluginSnapshot SnapshotType = 17
CSIVolumeSnapshot SnapshotType = 18
ScalingEventsSnapshot SnapshotType = 19
EventSinkSnapshot SnapshotType = 20
// Namespace appliers were moved from enterprise and therefore start at 64
NamespaceSnapshot SnapshotType = 64
)
@ -293,6 +293,10 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyNamespaceUpsert(buf[1:], log.Index)
case structs.NamespaceDeleteRequestType:
return n.applyNamespaceDelete(buf[1:], log.Index)
case structs.EventSinkUpsertRequestType:
return n.applyUpsertEventSink(buf[1:], log.Index)
case structs.EventSinkDeleteRequestType:
return n.applyDeleteEventSink(buf[1:], log.Index)
}
// Check enterprise only message types.
@ -1301,6 +1305,35 @@ func (n *nomadFSM) applyNamespaceDelete(buf []byte, index uint64) interface{} {
if err := n.state.DeleteNamespaces(index, req.Namespaces); err != nil {
n.logger.Error("DeleteNamespaces failed", "error", err)
}
return nil
}
func (n *nomadFSM) applyUpsertEventSink(buf []byte, index uint64) interface{} {
var req structs.EventSinkUpsertRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_upsert_event_sink"}, time.Now())
if err := n.state.UpsertEventSink(index, req.Sink); err != nil {
n.logger.Error("UpsertEventSink failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyDeleteEventSink(buf []byte, index uint64) interface{} {
var req structs.EventSinkDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_delete_event_sink"}, time.Now())
if err := n.state.DeleteEventSinks(index, req.IDs); err != nil {
n.logger.Error("DeleteEventSink failed", "error", err)
return err
}
@ -1583,6 +1616,16 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err
}
case EventSinkSnapshot:
sink := new(structs.EventSink)
if err := dec.Decode(sink); err != nil {
return err
}
if err := restore.EventSinkRestore(sink); err != nil {
return err
}
default:
// Check if this is an enterprise only object being restored
restorer, ok := n.enterpriseRestorers[snapType]
@ -1900,6 +1943,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
if err := s.persistEventSinks(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}
@ -2431,6 +2478,29 @@ func (s *nomadSnapshot) persistCSIVolumes(sink raft.SnapshotSink,
return nil
}
func (s *nomadSnapshot) persistEventSinks(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
sinks, err := s.snap.EventSinks(nil)
if err != nil {
return err
}
for {
raw := sinks.Next()
if raw == nil {
break
}
es := raw.(*structs.EventSink)
sink.Write([]byte{byte(EventSinkSnapshot)})
if err := encoder.Encode(es); err != nil {
return err
}
}
return nil
}
// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.

View File

@ -1516,3 +1516,11 @@ func Namespace() *structs.Namespace {
ns.SetHash()
return ns
}
func EventSink() *structs.EventSink {
return &structs.EventSink{
ID: fmt.Sprintf("webhook-sink-%s", uuid.Generate()[0:8]),
Type: structs.SinkWebhook,
Address: "http://127.0.0.1/",
}
}

View File

@ -1194,6 +1194,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) {
server.Register(s.staticEndpoints.FileSystem)
server.Register(s.staticEndpoints.Agent)
server.Register(s.staticEndpoints.Namespace)
server.Register(s.staticEndpoints.Event)
// Create new dynamic endpoints and add them to the RPC server.
node := &Node{srv: s, ctx: ctx, logger: s.logger.Named("client")}

View File

@ -57,6 +57,7 @@ func init() {
scalingPolicyTableSchema,
scalingEventTableSchema,
namespaceTableSchema,
eventSinkTableSchema,
}...)
}
@ -930,3 +931,23 @@ func namespaceTableSchema() *memdb.TableSchema {
},
}
}
func eventSinkTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "event_sink",
Indexes: map[string]*memdb.IndexSchema{
// Primary index is used for event sink management and simple
// direct lookup. ID is required to be unique.
"id": {
Name: "id",
AllowMissing: false,
Unique: true,
// Sink ID is uniquely identifying
Indexer: &memdb.StringFieldIndex{
Field: "ID",
},
},
},
}
}

View File

@ -5857,6 +5857,81 @@ func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[
return nil, nil
}
func (s *StateStore) EventSinks(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
// Walk the entire event sink table
iter, err := txn.Get("event_sink", "id")
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
return iter, nil
}
func (s *StateStore) EventSinkByID(ws memdb.WatchSet, id string) (*structs.EventSink, error) {
txn := s.db.ReadTxn()
return s.eventSinkByIDTxn(ws, id, txn)
}
func (s *StateStore) eventSinkByIDTxn(ws memdb.WatchSet, id string, txn Txn) (*structs.EventSink, error) {
watchCh, existing, err := txn.FirstWatch("event_sink", "id", id)
if err != nil {
return nil, fmt.Errorf("event sink lookup failed: %w", err)
}
ws.Add(watchCh)
if existing != nil {
return existing.(*structs.EventSink), nil
}
return nil, nil
}
func (s *StateStore) UpsertEventSink(idx uint64, sink *structs.EventSink) error {
txn := s.db.WriteTxn(idx)
defer txn.Abort()
existing, err := txn.First("event_sink", "id", sink.ID)
if err != nil {
return fmt.Errorf("event sink lookup failed: %w", err)
}
if existing != nil {
sink.CreateIndex = existing.(*structs.EventSink).CreateIndex
sink.ModifyIndex = idx
} else {
sink.CreateIndex = idx
sink.ModifyIndex = idx
}
// Insert the sink
if err := txn.Insert("event_sink", sink); err != nil {
return fmt.Errorf("event sink insert failed: %w", err)
}
if err := txn.Insert("index", &IndexEntry{"event_sink", idx}); err != nil {
return fmt.Errorf("index update failed: %w", err)
}
return txn.Commit()
}
func (s *StateStore) DeleteEventSinks(idx uint64, sinks []string) error {
txn := s.db.WriteTxn(idx)
defer txn.Abort()
for _, id := range sinks {
if _, err := txn.DeleteAll("event_sink", "id", id); err != nil {
return fmt.Errorf("deleting event sink failed: %v", err)
}
}
if err := txn.Insert("index", &IndexEntry{"event_sink", idx}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
return txn.Commit()
}
// StateSnapshot is used to provide a point-in-time snapshot
type StateSnapshot struct {
StateStore
@ -6118,3 +6193,10 @@ func (r *StateRestore) NamespaceRestore(ns *structs.Namespace) error {
}
return nil
}
func (r *StateRestore) EventSinkRestore(sink *structs.EventSink) error {
if err := r.txn.Insert("event_sink", sink); err != nil {
return fmt.Errorf("event sink insert failed: %v", err)
}
return nil
}

View File

@ -9553,6 +9553,98 @@ func TestStateStore_RestoreScalingEvents(t *testing.T) {
require.EqualValues(jobScalingEvents.ScalingEvents, out)
}
func TestStateStore_UpsertEventSink(t *testing.T) {
t.Parallel()
state := testStateStore(t)
sink := &structs.EventSink{
ID: "webhook-sink",
Type: structs.SinkWebhook,
}
require.NoError(t, state.UpsertEventSink(100, sink))
out, err := state.EventSinkByID(nil, "webhook-sink")
require.NoError(t, err)
require.Equal(t, structs.SinkWebhook, out.Type)
}
func TestStateStore_DeleteEventSinks(t *testing.T) {
t.Parallel()
state := testStateStore(t)
s1 := mock.EventSink()
s2 := mock.EventSink()
s3 := mock.EventSink()
s4 := mock.EventSink()
require.NoError(t, state.UpsertEventSink(100, s1))
require.NoError(t, state.UpsertEventSink(101, s2))
require.NoError(t, state.UpsertEventSink(102, s3))
require.NoError(t, state.UpsertEventSink(103, s4))
require.NoError(t, state.DeleteEventSinks(1000, []string{s1.ID, s2.ID, s3.ID}))
out, err := state.EventSinkByID(nil, s4.ID)
require.NoError(t, err)
require.NotNil(t, out)
out, err = state.EventSinkByID(nil, s1.ID)
require.NoError(t, err)
require.Nil(t, out)
}
func TestStateStore_EventSinks(t *testing.T) {
t.Parallel()
state := testStateStore(t)
s1 := mock.EventSink()
s2 := mock.EventSink()
s3 := mock.EventSink()
require.NoError(t, state.UpsertEventSink(100, s1))
require.NoError(t, state.UpsertEventSink(101, s2))
require.NoError(t, state.UpsertEventSink(102, s3))
iter, err := state.EventSinks(nil)
require.NoError(t, err)
var out []*structs.EventSink
for {
raw := iter.Next()
if raw == nil {
break
}
sink := raw.(*structs.EventSink)
out = append(out, sink)
}
require.Len(t, out, 3)
}
func TestStateStore_RestoreEventSink(t *testing.T) {
t.Parallel()
require := require.New(t)
state := testStateStore(t)
eventSink := &structs.EventSink{
ID: "eventsink",
}
restore, err := state.Restore()
require.NoError(err)
err = restore.EventSinkRestore(eventSink)
require.NoError(err)
require.NoError(restore.Commit())
out, err := state.EventSinkByID(nil, "eventsink")
require.NoError(err)
require.NotNil(out)
require.EqualValues(eventSink, out)
}
func TestStateStore_Abandon(t *testing.T) {
t.Parallel()

150
nomad/structs/event.go Normal file
View File

@ -0,0 +1,150 @@
package structs
import (
"fmt"
"net/url"
"strings"
multierror "github.com/hashicorp/go-multierror"
)
// EventStreamRequest is used to stream events from a servers EventBroker
type EventStreamRequest struct {
Topics map[Topic][]string
Index int
QueryOptions
}
type EventStreamWrapper struct {
Error *RpcError
Event *EventJson
}
type Topic string
const (
TopicDeployment Topic = "Deployment"
TopicEval Topic = "Eval"
TopicAlloc Topic = "Alloc"
TopicJob Topic = "Job"
TopicNode Topic = "Node"
TopicAll Topic = "*"
)
// Event represents a change in Nomads state.
type Event struct {
// Topic represeents the primary object for the event
Topic Topic
// Type is a short string representing the reason for the event
Type string
// Key is the primary identifier of the Event, The involved objects ID
Key string
// Namespace is the namespace of the object, If the object is not namespace
// aware (Node) it is left blank
Namespace string
// FilterKeys are a set of additional related keys that are used to include
// events during filtering.
FilterKeys []string
// Index is the raft index that corresponds to the event
Index uint64
// Payload is the Event itself see state/events.go for a list of events
Payload interface{}
}
// Events is a wrapper that contains a set of events for a given index.
type Events struct {
Index uint64
Events []Event
}
// EventJson is a wrapper for a JSON object
type EventJson struct {
Data []byte
}
func (j *EventJson) Copy() *EventJson {
n := new(EventJson)
*n = *j
n.Data = make([]byte, len(j.Data))
copy(n.Data, j.Data)
return n
}
type EventSinkUpsertRequest struct {
Sink *EventSink
WriteRequest
}
type EventSinkSpecificRequest struct {
ID string
QueryOptions
}
type EventSinkResponse struct {
Sink *EventSink
QueryMeta
}
type EventSinkDeleteRequest struct {
IDs []string
WriteRequest
}
type EventSinkListRequest struct {
QueryOptions
}
type EventSinkListResponse struct {
Sinks []*EventSink
QueryMeta
}
type SinkType string
const (
SinkWebhook SinkType = "webhook"
)
type EventSink struct {
ID string
Type SinkType
Topics map[Topic][]string
Address string
CreateIndex uint64
ModifyIndex uint64
}
func (e *EventSink) Validate() error {
var mErr multierror.Error
if e.ID == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Missing sink ID"))
} else if strings.Contains(e.ID, " ") {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Sink ID contains a space"))
} else if strings.Contains(e.ID, "\000") {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Sink ID contains a null character"))
}
switch e.Type {
case SinkWebhook:
if e.Address == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Webhook sink requires a valid Address"))
} else if _, err := url.Parse(e.Address); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Webhook sink Address must be a valid url: %w", err))
}
default:
mErr.Errors = append(mErr.Errors, fmt.Errorf("Sink type invalid"))
}
return mErr.ErrorOrNil()
}

View File

@ -97,6 +97,8 @@ const (
ScalingEventRegisterRequestType MessageType = 38
CSIVolumeClaimBatchRequestType MessageType = 39
CSIPluginDeleteRequestType MessageType = 40
EventSinkUpsertRequestType MessageType = 41
EventSinkDeleteRequestType MessageType = 42
// Namespace types were moved from enterprise and therefore start at 64
NamespaceUpsertRequestType MessageType = 64
@ -10914,19 +10916,6 @@ type ACLTokenUpsertResponse struct {
WriteMeta
}
// EventStreamRequest is used to stream events from a servers EventBroker
type EventStreamRequest struct {
Topics map[Topic][]string
Index int
QueryOptions
}
type EventStreamWrapper struct {
Error *RpcError
Event *EventJson
}
// RpcError is used for serializing errors with a potential error code
type RpcError struct {
Message string
@ -10943,59 +10932,3 @@ func NewRpcError(err error, code *int64) *RpcError {
func (r *RpcError) Error() string {
return r.Message
}
type Topic string
const (
TopicDeployment Topic = "Deployment"
TopicEval Topic = "Eval"
TopicAlloc Topic = "Alloc"
TopicJob Topic = "Job"
TopicNode Topic = "Node"
TopicAll Topic = "*"
)
// Event represents a change in Nomads state.
type Event struct {
// Topic represeents the primary object for the event
Topic Topic
// Type is a short string representing the reason for the event
Type string
// Key is the primary identifier of the Event, The involved objects ID
Key string
// Namespace is the namespace of the object, If the object is not namespace
// aware (Node) it is left blank
Namespace string
// FilterKeys are a set of additional related keys that are used to include
// events during filtering.
FilterKeys []string
// Index is the raft index that corresponds to the event
Index uint64
// Payload is the Event itself see state/events.go for a list of events
Payload interface{}
}
// Events is a wrapper that contains a set of events for a given index.
type Events struct {
Index uint64
Events []Event
}
// EventJson is a wrapper for a JSON object
type EventJson struct {
Data []byte
}
func (j *EventJson) Copy() *EventJson {
n := new(EventJson)
*n = *j
n.Data = make([]byte, len(j.Data))
copy(n.Data, j.Data)
return n
}

View File

@ -5,8 +5,6 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQee8Us=
github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/a8m/tree v0.0.0-20181222104329-6a0b80129de4 h1:mK1/QgFPU4osbhjJ26B1w738kjQHaGJcon8uCLMS8fk=
github.com/a8m/tree v0.0.0-20181222104329-6a0b80129de4/go.mod h1:FSdwKX97koS5efgm8WevNf7XS3PqtyFkKDDXrz778cg=
github.com/a8m/tree v0.0.0-20201019170308-9f4249a434f8 h1:CkFIJJAKEbZbM2tKmCqt/v9ivgpikjPu5lnDsk8huLE=
github.com/a8m/tree v0.0.0-20201019170308-9f4249a434f8/go.mod h1:FSdwKX97koS5efgm8WevNf7XS3PqtyFkKDDXrz778cg=
github.com/agext/levenshtein v1.2.1 h1:QmvMAjj2aEICytGiWzmxoE0x2KZvE0fvmqMOfy2tjT8=