open-nomad/nomad/event_sink_manager_test.go
Drew Bailey d62d8a8587
Event sink manager improvements (#9206)
* Improve managed sink run loop and reloading

resetCh no longer needed

length of buffer equal to count of items, not count of events in each item

update equality fn name, pr feedback

clean up sink manager sink creation

* update test to reflect changes

* bad editor find and replace

* pr feedback
2020-11-02 09:21:32 -05:00

635 lines
16 KiB
Go

package nomad
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
// Compile-time assertion that *Server satisfies the SinkDelegate interface.
var _ SinkDelegate = (*Server)(nil)
// testDelegate is a minimal delegate for the tests in this file. It wraps a
// state store and stubs out every other method.
type testDelegate struct {
	s *state.StateStore
}

// State returns the state store wrapped by the delegate.
func (td *testDelegate) State() *state.StateStore {
	return td.s
}

// getLeaderAcl always returns the empty string.
func (td *testDelegate) getLeaderAcl() string {
	return ""
}

// Region always returns the empty string.
func (td *testDelegate) Region() string {
	return ""
}

// RPC is a no-op: it ignores its arguments and always returns nil.
func (td *testDelegate) RPC(method string, args interface{}, reply interface{}) error {
	return nil
}
// newTestDelegate returns a testDelegate whose state store is built by
// state.TestStateStoreCfg from the config that state.TestStateStorePublisher
// produces.
func newTestDelegate(t *testing.T) *testDelegate {
	cfg := state.TestStateStorePublisher(t)
	store := state.TestStateStoreCfg(t, cfg)
	return &testDelegate{s: store}
}
// TestManager_Run asserts that a manager with two registered webhook sinks
// delivers a published event once per sink, and that Run returns without an
// error after its context is canceled.
func TestManager_Run(t *testing.T) {
	t.Parallel()

	// Start a test server that signals every request it receives. The handler
	// runs on the HTTP server's goroutines, so deliveries are reported over a
	// channel instead of through a plain counter that the test goroutine would
	// read concurrently (a data race).
	received := make(chan struct{}, 16)
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		require.Equal(t, "application/json", r.Header.Get("Content-Type"))

		var event structs.Events
		dec := json.NewDecoder(r.Body)
		require.NoError(t, dec.Decode(&event))
		require.Equal(t, "Deployment", string(event.Events[0].Topic))

		// Never block in the handler: ts.Close waits for outstanding requests.
		select {
		case received <- struct{}{}:
		default:
		}
	}))
	defer ts.Close()

	// Store two sinks
	s1 := mock.EventSink()
	s1.Address = ts.URL

	s2 := mock.EventSink()
	s2.Address = ts.URL

	td := newTestDelegate(t)
	require.NoError(t, td.State().UpsertEventSink(1000, s1))
	require.NoError(t, td.State().UpsertEventSink(1001, s2))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create the manager
	manager := NewSinkManager(ctx, td, hclog.Default())
	require.NoError(t, manager.establishManagedSinks())
	require.Len(t, manager.sinkSubscriptions, 2)

	// Run the manager in the background. The channel is buffered so the
	// goroutine can always exit, even if the test aborts before reading it.
	runErr := make(chan error, 1)
	go func() {
		runErr <- manager.Run()
	}()

	// Publish an event
	broker, err := td.State().EventBroker()
	require.NoError(t, err)

	broker.Publish(&structs.Events{Index: 1, Events: []structs.Event{{Topic: "Deployment"}}})

	// Wait for one delivery per sink
	for got := 0; got < 2; got++ {
		select {
		case <-received:
		case <-time.After(5 * time.Second):
			require.Fail(t, fmt.Sprintf("webhook count not equal to expected want %d, got %d", 2, got))
		}
	}

	// Stop the manager
	cancel()

	select {
	case err := <-runErr:
		require.NoError(t, err)
	case <-time.After(2 * time.Second):
		require.Fail(t, "timeout waiting for manager to stop")
	}
}
// TestManager_SinkErr asserts that, after both sinks have delivered an event,
// deleting one sink from the state store causes the manager to drop its
// managed sink while continuing to run, and that Run returns without an error
// once the context is canceled.
func TestManager_SinkErr(t *testing.T) {
	t.Parallel()

	// The handler runs on the HTTP server's goroutines, so deliveries are
	// reported over a channel instead of through a plain counter that the test
	// goroutine would read concurrently (a data race).
	received := make(chan struct{}, 16)
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		require.Equal(t, "application/json", r.Header.Get("Content-Type"))

		var event structs.Events
		dec := json.NewDecoder(r.Body)
		require.NoError(t, dec.Decode(&event))
		require.Equal(t, "Deployment", string(event.Events[0].Topic))

		// Never block in the handler: ts.Close waits for outstanding requests.
		select {
		case received <- struct{}{}:
		default:
		}
	}))
	defer ts.Close()

	// Store two sinks
	s1 := mock.EventSink()
	s1.Address = ts.URL

	s2 := mock.EventSink()
	s2.Address = ts.URL

	td := newTestDelegate(t)
	require.NoError(t, td.State().UpsertEventSink(1000, s1))
	require.NoError(t, td.State().UpsertEventSink(1001, s2))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create the manager
	manager := NewSinkManager(ctx, td, hclog.Default())
	require.NoError(t, manager.establishManagedSinks())
	require.Len(t, manager.sinkSubscriptions, 2)

	// Run the manager in the background. The channel is buffered so the
	// goroutine can always exit, even if the test aborts before reading it.
	runErr := make(chan error, 1)
	go func() {
		runErr <- manager.Run()
	}()

	// Publish an event
	broker, err := td.State().EventBroker()
	require.NoError(t, err)

	broker.Publish(&structs.Events{Index: 1, Events: []structs.Event{{Topic: "Deployment"}}})

	// Wait for one delivery per sink
	for got := 0; got < 2; got++ {
		select {
		case <-received:
		case <-time.After(5 * time.Second):
			require.Fail(t, fmt.Sprintf("webhook count not equal to expected want %d, got %d", 2, got))
		}
	}

	// Remove the first sink from the state store
	require.NoError(t, td.State().DeleteEventSinks(2000, []string{s1.ID}))

	// Wait for the manager to drop the old managed sink.
	// NOTE(review): sinkSubscriptions is read here without any visible
	// synchronization - confirm whether the manager mutates it concurrently.
	testutil.WaitForResult(func() (bool, error) {
		if len(manager.sinkSubscriptions) != 1 {
			return false, fmt.Errorf("expected manager to have 1 managed sink")
		}
		return true, nil
	}, func(err error) {
		require.Fail(t, err.Error())
	})

	// Stop the manager
	cancel()

	select {
	case err := <-runErr:
		require.NoError(t, err)
	case <-time.After(2 * time.Second):
		require.Fail(t, "timeout waiting for manager to stop")
	}
}
// TestManager_Run_AddNew asserts that adding a new managed sink to the state
// store notifies the manager and starts the sink while leaving the existing
// managed sinks running
func TestManager_Run_AddNew(t *testing.T) {
	t.Parallel()

	// Webhook destination for the sink that exists before the manager starts
	received := make(chan struct{})
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		require.Equal(t, "application/json", r.Header.Get("Content-Type"))

		var event structs.Events
		dec := json.NewDecoder(r.Body)
		require.NoError(t, dec.Decode(&event))
		require.Equal(t, "Deployment", string(event.Events[0].Topic))

		close(received)
	}))
	defer ts.Close()

	// Webhook destination for the sink added while the manager is running
	received2 := make(chan struct{})
	ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		require.Equal(t, "application/json", r.Header.Get("Content-Type"))

		var event structs.Events
		dec := json.NewDecoder(r.Body)
		require.NoError(t, dec.Decode(&event))
		require.Equal(t, "Deployment", string(event.Events[0].Topic))

		close(received2)
	}))
	defer ts2.Close()

	// Store the first sink
	s1 := mock.EventSink()
	s1.Address = ts.URL

	td := newTestDelegate(t)
	require.NoError(t, td.State().UpsertEventSink(1000, s1))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create the manager
	manager := NewSinkManager(ctx, td, hclog.Default())
	require.NoError(t, manager.establishManagedSinks())
	require.Len(t, manager.sinkSubscriptions, 1)

	// Run the manager in the background. The channel is buffered so the
	// goroutine can always exit, even if the test aborts before reading it.
	runErr := make(chan error, 1)
	go func() {
		runErr <- manager.Run()
	}()

	// Publish an event
	broker, err := td.State().EventBroker()
	require.NoError(t, err)

	broker.Publish(&structs.Events{Index: 1, Events: []structs.Event{{Topic: "Deployment"}}})

	select {
	case <-received:
		// pass
	case <-time.After(2 * time.Second):
		require.Fail(t, "timeout waiting for first sink to send event")
	}

	// Add a second sink while the manager is running
	s2 := mock.EventSink()
	s2.Address = ts2.URL
	require.NoError(t, td.State().UpsertEventSink(1001, s2))

	select {
	case <-received2:
		// pass
	case <-time.After(2 * time.Second):
		require.Fail(t, "timeout waiting for second sink to send event")
	}

	// stop
	cancel()

	select {
	case err := <-runErr:
		require.NoError(t, err)
	case <-time.After(2 * time.Second):
		require.Fail(t, "timeout waiting for manager to stop")
	}
}
// TestManagedSink_Run_Webhook checks that a single managed sink forwards a
// published event to its webhook address.
func TestManagedSink_Run_Webhook(t *testing.T) {
	t.Parallel()

	// The webhook endpoint closes done once it has validated a request.
	done := make(chan struct{})
	handler := func(w http.ResponseWriter, r *http.Request) {
		require.Equal(t, "application/json", r.Header.Get("Content-Type"))

		var payload structs.Events
		require.NoError(t, json.NewDecoder(r.Body).Decode(&payload))
		require.Equal(t, "Deployment", string(payload.Events[0].Topic))

		close(done)
	}
	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	// Register a sink that targets the webhook endpoint.
	delegate := newTestDelegate(t)

	sink := mock.EventSink()
	sink.Address = srv.URL
	require.NoError(t, delegate.State().UpsertEventSink(1000, sink))

	// Look the sink up by ID; only the returned error is checked.
	watch := memdb.NewWatchSet()
	_, err := delegate.State().EventSinkByID(watch, sink.ID)
	require.NoError(t, err)

	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	// Build the managed sink and run it in the background. The value returned
	// by run is discarded.
	managed, err := NewManagedSink(ctx, sink.ID, delegate.State, hclog.NewNullLogger())
	require.NoError(t, err)

	go managed.run()

	// Publish a single deployment event.
	broker, err := delegate.State().EventBroker()
	require.NoError(t, err)

	evt := &structs.Events{Index: 1, Events: []structs.Event{{Topic: "Deployment"}}}
	broker.Publish(evt)

	// The webhook endpoint must see the event within two seconds.
	select {
	case <-done:
		// pass
	case <-time.After(2 * time.Second):
		require.Fail(t, "timeout waiting for webhook received")
	}
}
// TestManagedSink_Run_Webhook_Update tests the behavior when updating a
// managed sink's EventSink address to different values
func TestManagedSink_Run_Webhook_Update(t *testing.T) {
	t.Parallel()

	// Setup webhook destination. The handler reports the index of every
	// Events payload it receives.
	received1 := make(chan int, 3)
	ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		require.Equal(t, "application/json", r.Header.Get("Content-Type"))

		var event structs.Events
		dec := json.NewDecoder(r.Body)
		require.NoError(t, dec.Decode(&event))
		require.Equal(t, "Deployment", string(event.Events[0].Topic))

		received1 <- int(event.Index)
	}))
	defer ts1.Close()

	// Setup a second webhook destination
	received2 := make(chan int, 3)
	ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		require.Equal(t, "application/json", r.Header.Get("Content-Type"))

		var event structs.Events
		dec := json.NewDecoder(r.Body)
		require.NoError(t, dec.Decode(&event))
		require.Equal(t, "Deployment", string(event.Events[0].Topic))

		received2 <- int(event.Index)
	}))
	defer ts2.Close()

	// waitIndex returns the next event index reported on ch and fails the test
	// if nothing arrives within two seconds, so no receive can block forever.
	waitIndex := func(ch <-chan int) int {
		select {
		case got := <-ch:
			return got
		case <-time.After(2 * time.Second):
			require.FailNow(t, "timeout waiting for webhook received")
			return 0
		}
	}

	td := newTestDelegate(t)

	// Create and store a sink
	s1 := mock.EventSink()
	s1.ID = "sink1"
	s1.Address = ts1.URL
	require.NoError(t, td.State().UpsertEventSink(1000, s1))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start the managed sink. The channel is buffered so the goroutine can
	// always exit, even if the test aborts before reading it.
	mSink, err := NewManagedSink(ctx, s1.ID, td.State, hclog.Default())
	require.NoError(t, err)

	errCh := make(chan error, 1)
	go func() {
		errCh <- mSink.run()
	}()

	// Publish an event
	broker, err := td.State().EventBroker()
	require.NoError(t, err)

	broker.Publish(&structs.Events{Index: 1, Events: []structs.Event{{Topic: "Deployment"}}})

	// Ensure webhook received the event
	require.Equal(t, 1, waitIndex(received1))

	// Wait and check that the sink reported the successfully sent index
	testutil.WaitForResult(func() (bool, error) {
		ls := mSink.GetLastSuccess()
		return int(ls) == 1, fmt.Errorf("expected last success to update")
	}, func(err error) {
		require.NoError(t, err)
	})

	// Update sink to point to new address
	updateSink := mock.EventSink()
	updateSink.ID = s1.ID
	updateSink.Address = ts2.URL
	require.NoError(t, td.State().UpsertEventSink(1001, updateSink))

	// Wait for the address to propagate
	testutil.WaitForResult(func() (bool, error) {
		return mSink.Sink.Address == updateSink.Address, fmt.Errorf("expected managed sink address to update")
	}, func(err error) {
		require.Fail(t, err.Error())
	})

	// Publish a new event
	broker.Publish(&structs.Events{Index: 2, Events: []structs.Event{{Topic: "Deployment"}}})

	// Check we got it on the webhook side. The first payload at the new
	// address is expected to carry index 1.
	// NOTE(review): presumably the sink replays from its last successful index
	// after a reload - confirm against the managed sink implementation.
	require.Equal(t, 1, waitIndex(received2))

	// Wait and check that the sink reported the successfully sent index
	testutil.WaitForResult(func() (bool, error) {
		ls := mSink.GetLastSuccess()
		if int(ls) != 2 {
			return false, fmt.Errorf("expected last success to update")
		}
		return true, nil
	}, func(error) {
		t.Fatalf("expected sink progress")
	})

	// Point back to original
	updateSink = mock.EventSink()
	updateSink.ID = s1.ID
	updateSink.Address = ts1.URL
	require.NoError(t, td.State().UpsertEventSink(1002, updateSink))

	// Wait for the address to propagate
	testutil.WaitForResult(func() (bool, error) {
		return mSink.Sink.Address == s1.Address, fmt.Errorf("expected managed sink address to update")
	}, func(err error) {
		require.FailNow(t, err.Error())
	})

	broker.Publish(&structs.Events{Index: 3, Events: []structs.Event{{Topic: "Deployment"}}})

	// The original address is expected to receive index 2 followed by index 3.
	// Both receives are bounded by a timeout.
	got := waitIndex(received1)
	got2 := waitIndex(received1)
	require.Equal(t, 2, got)
	require.Equal(t, 3, got2)

	// Stop
	cancel()

	select {
	case err := <-errCh:
		require.Error(t, err)
		require.Equal(t, context.Canceled, err)
	case <-time.After(2 * time.Second):
		require.Fail(t, "timeout waiting for shutdown")
	}
}
// TestManagedSink_Shutdown asserts that a running managed sink returns
// context.Canceled from run when its parent context is canceled.
func TestManagedSink_Shutdown(t *testing.T) {
	t.Parallel()

	td := newTestDelegate(t)

	s1 := mock.EventSink()
	require.NoError(t, td.State().UpsertEventSink(1000, s1))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create sink
	mSink, err := NewManagedSink(ctx, s1.ID, td.State, hclog.NewNullLogger())
	require.NoError(t, err)

	// Run in background. The result is handed back over a buffered channel so
	// the assertions run on the test goroutine: require calls FailNow, which
	// must not be invoked from a goroutine other than the one running the test.
	errCh := make(chan error, 1)
	go func() {
		errCh <- mSink.run()
	}()

	// Stop the parent context
	cancel()

	select {
	case err := <-errCh:
		require.Error(t, err)
		require.Equal(t, context.Canceled, err)
	case <-time.After(2 * time.Second):
		require.Fail(t, "expected managed sink to stop")
	}
}
// TestManagedSink_DeregisterSink asserts that a running managed sink returns
// ErrEventSinkDeregistered from run when its event sink is deleted from the
// state store.
func TestManagedSink_DeregisterSink(t *testing.T) {
	t.Parallel()

	td := newTestDelegate(t)

	s1 := mock.EventSink()
	require.NoError(t, td.State().UpsertEventSink(1000, s1))

	// Look the sink up by ID; only the returned error is checked.
	ws := memdb.NewWatchSet()
	_, err := td.State().EventSinkByID(ws, s1.ID)
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create sink
	mSink, err := NewManagedSink(ctx, s1.ID, td.State, hclog.NewNullLogger())
	require.NoError(t, err)

	// Run in background. The result is handed back over a buffered channel so
	// the assertions run on the test goroutine and are guaranteed to execute
	// before the test returns.
	errCh := make(chan error, 1)
	go func() {
		errCh <- mSink.run()
	}()

	// Deregister the sink by deleting it from the state store
	require.NoError(t, td.State().DeleteEventSinks(1001, []string{s1.ID}))

	select {
	case err := <-errCh:
		require.Error(t, err)
		require.Equal(t, ErrEventSinkDeregistered, err)
	case <-time.After(2 * time.Second):
		require.Fail(t, "expected managed sink to stop")
	}
}
// TestManagedSink_Run_UpdateProgress tests that the sink manager updates the
// event sinks progress in the state store
func TestManagedSink_Run_UpdateProgress(t *testing.T) {
	t.Parallel()

	// Start a test server that signals every request it receives. The handler
	// runs on the HTTP server's goroutines, so deliveries are reported over a
	// channel instead of through a plain counter that the test goroutine would
	// read concurrently (a data race).
	received := make(chan struct{}, 16)
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		require.Equal(t, "application/json", r.Header.Get("Content-Type"))

		var event structs.Events
		dec := json.NewDecoder(r.Body)
		require.NoError(t, dec.Decode(&event))
		require.Equal(t, "Deployment", string(event.Events[0].Topic))

		// Never block in the handler: ts.Close waits for outstanding requests.
		select {
		case received <- struct{}{}:
		default:
		}
	}))
	defer ts.Close()

	// Store two sinks
	s1 := mock.EventSink()
	s1.Address = ts.URL

	s2 := mock.EventSink()
	s2.Address = ts.URL

	srv, cancelSrv := TestServer(t, nil)
	defer cancelSrv()

	testutil.WaitForLeader(t, srv.RPC)

	require.NoError(t, srv.State().UpsertEventSink(1000, s1))
	require.NoError(t, srv.State().UpsertEventSink(1001, s2))

	// Get the manager from the server
	manager := srv.eventSinkManager

	// Wait for manager to be running
	testutil.WaitForResult(func() (bool, error) {
		if manager.Running() {
			return true, nil
		}
		return false, fmt.Errorf("expected manager to be running")
	}, func(error) {
		require.Fail(t, "expected manager to be running")
	})

	// Ensure the manager is running
	require.True(t, manager.Running())

	// Publish an event
	broker, err := srv.State().EventBroker()
	require.NoError(t, err)

	broker.Publish(&structs.Events{Index: 100, Events: []structs.Event{{Topic: "Deployment"}}})

	// Wait for the webhook to receive the event once per sink
	for got := 0; got < 2; got++ {
		select {
		case <-received:
		case <-time.After(5 * time.Second):
			require.Fail(t, fmt.Sprintf("webhook count not equal to expected want %d, got %d", 2, got))
		}
	}

	// force an update
	require.NoError(t, manager.updateSinkProgress())

	// Check that the progress was saved via raft
	testutil.WaitForResult(func() (bool, error) {
		out1, err := srv.State().EventSinkByID(nil, s1.ID)
		require.NoError(t, err)
		out2, err := srv.State().EventSinkByID(nil, s2.ID)
		require.NoError(t, err)

		if out1.LatestIndex == 100 && out2.LatestIndex == 100 {
			return true, nil
		}
		return false, fmt.Errorf("expected sinks to update from index out1: %d out2: %d", out1.LatestIndex, out2.LatestIndex)
	}, func(error) {
		require.Fail(t, "timeout waiting for progress to update via raft")
	})

	// Stop the server ensure manager is shutdown
	cancelSrv()

	testutil.WaitForResult(func() (bool, error) {
		running := manager.Running()
		if !running {
			return true, nil
		}
		return false, fmt.Errorf("expected manager to not be running")
	}, func(error) {
		require.Fail(t, "timeout waiting for manager to report not running")
	})
}