1275 lines
37 KiB
Go
1275 lines
37 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package state
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
|
|
memdb "github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/nomad/ci"
|
|
"github.com/hashicorp/nomad/helper/pointer"
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/nomad/mock"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/shoenig/test/must"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// TestEventFromChange_SingleEventPerTable ensures that only a single event is
|
|
// created per table per memdb.Change
|
|
func TestEventFromChange_SingleEventPerTable(t *testing.T) {
|
|
ci.Parallel(t)
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
changes := Changes{
|
|
Index: 100,
|
|
MsgType: structs.JobRegisterRequestType,
|
|
Changes: memdb.Changes{
|
|
{
|
|
Table: "job_version",
|
|
Before: mock.Job(),
|
|
After: mock.Job(),
|
|
},
|
|
{
|
|
Table: "jobs",
|
|
Before: mock.Job(),
|
|
After: mock.Job(),
|
|
},
|
|
},
|
|
}
|
|
|
|
out := eventsFromChanges(s.db.ReadTxn(), changes)
|
|
require.Len(t, out.Events, 1)
|
|
require.Equal(t, out.Events[0].Type, structs.TypeJobRegistered)
|
|
}
|
|
|
|
func TestEventFromChange_ACLTokenSecretID(t *testing.T) {
|
|
ci.Parallel(t)
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
token := mock.ACLToken()
|
|
require.NotEmpty(t, token.SecretID)
|
|
|
|
// Create
|
|
changes := Changes{
|
|
Index: 100,
|
|
MsgType: structs.NodeRegisterRequestType,
|
|
Changes: memdb.Changes{
|
|
{
|
|
Table: "acl_token",
|
|
Before: nil,
|
|
After: token,
|
|
},
|
|
},
|
|
}
|
|
|
|
out := eventsFromChanges(s.db.ReadTxn(), changes)
|
|
require.Len(t, out.Events, 1)
|
|
// Ensure original value not altered
|
|
require.NotEmpty(t, token.SecretID)
|
|
|
|
aclTokenEvent, ok := out.Events[0].Payload.(*structs.ACLTokenEvent)
|
|
require.True(t, ok)
|
|
require.Empty(t, aclTokenEvent.ACLToken.SecretID)
|
|
|
|
require.Equal(t, token.SecretID, aclTokenEvent.SecretID())
|
|
|
|
// Delete
|
|
changes = Changes{
|
|
Index: 100,
|
|
MsgType: structs.NodeDeregisterRequestType,
|
|
Changes: memdb.Changes{
|
|
{
|
|
Table: "acl_token",
|
|
Before: token,
|
|
After: nil,
|
|
},
|
|
},
|
|
}
|
|
|
|
out2 := eventsFromChanges(s.db.ReadTxn(), changes)
|
|
require.Len(t, out2.Events, 1)
|
|
|
|
tokenEvent2, ok := out2.Events[0].Payload.(*structs.ACLTokenEvent)
|
|
require.True(t, ok)
|
|
require.Empty(t, tokenEvent2.ACLToken.SecretID)
|
|
}
|
|
|
|
func TestEventsFromChanges_DeploymentUpdate(t *testing.T) {
|
|
ci.Parallel(t)
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
// setup
|
|
setupTx := s.db.WriteTxn(10)
|
|
|
|
j := mock.Job()
|
|
e := mock.Eval()
|
|
e.JobID = j.ID
|
|
|
|
d := mock.Deployment()
|
|
d.JobID = j.ID
|
|
|
|
require.NoError(t, s.upsertJobImpl(10, nil, j, false, setupTx))
|
|
require.NoError(t, s.upsertDeploymentImpl(10, d, setupTx))
|
|
|
|
setupTx.Txn.Commit()
|
|
|
|
msgType := structs.DeploymentStatusUpdateRequestType
|
|
|
|
req := &structs.DeploymentStatusUpdateRequest{
|
|
DeploymentUpdate: &structs.DeploymentStatusUpdate{
|
|
DeploymentID: d.ID,
|
|
Status: structs.DeploymentStatusPaused,
|
|
StatusDescription: structs.DeploymentStatusDescriptionPaused,
|
|
},
|
|
Eval: e,
|
|
// Exlude Job and assert its added
|
|
}
|
|
|
|
require.NoError(t, s.UpdateDeploymentStatus(msgType, 100, req))
|
|
|
|
events := WaitForEvents(t, s, 100, 1, 1*time.Second)
|
|
require.Len(t, events, 2)
|
|
|
|
got := events[0]
|
|
require.Equal(t, uint64(100), got.Index)
|
|
require.Equal(t, d.ID, got.Key)
|
|
|
|
de := got.Payload.(*structs.DeploymentEvent)
|
|
require.Equal(t, structs.DeploymentStatusPaused, de.Deployment.Status)
|
|
require.Contains(t, got.FilterKeys, j.ID)
|
|
}
|
|
|
|
func TestEventsFromChanges_DeploymentPromotion(t *testing.T) {
|
|
ci.Parallel(t)
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
// setup
|
|
setupTx := s.db.WriteTxn(10)
|
|
|
|
j := mock.Job()
|
|
tg1 := j.TaskGroups[0]
|
|
tg2 := tg1.Copy()
|
|
tg2.Name = "foo"
|
|
j.TaskGroups = append(j.TaskGroups, tg2)
|
|
require.NoError(t, s.upsertJobImpl(10, nil, j, false, setupTx))
|
|
|
|
d := mock.Deployment()
|
|
d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
|
|
d.JobID = j.ID
|
|
d.TaskGroups = map[string]*structs.DeploymentState{
|
|
"web": {
|
|
DesiredTotal: 10,
|
|
DesiredCanaries: 1,
|
|
},
|
|
"foo": {
|
|
DesiredTotal: 10,
|
|
DesiredCanaries: 1,
|
|
},
|
|
}
|
|
require.NoError(t, s.upsertDeploymentImpl(10, d, setupTx))
|
|
|
|
// create set of allocs
|
|
c1 := mock.Alloc()
|
|
c1.JobID = j.ID
|
|
c1.DeploymentID = d.ID
|
|
d.TaskGroups[c1.TaskGroup].PlacedCanaries = append(d.TaskGroups[c1.TaskGroup].PlacedCanaries, c1.ID)
|
|
c1.DeploymentStatus = &structs.AllocDeploymentStatus{
|
|
Healthy: pointer.Of(true),
|
|
}
|
|
c2 := mock.Alloc()
|
|
c2.JobID = j.ID
|
|
c2.DeploymentID = d.ID
|
|
d.TaskGroups[c2.TaskGroup].PlacedCanaries = append(d.TaskGroups[c2.TaskGroup].PlacedCanaries, c2.ID)
|
|
c2.TaskGroup = tg2.Name
|
|
c2.DeploymentStatus = &structs.AllocDeploymentStatus{
|
|
Healthy: pointer.Of(true),
|
|
}
|
|
|
|
require.NoError(t, s.upsertAllocsImpl(10, []*structs.Allocation{c1, c2}, setupTx))
|
|
|
|
// commit setup transaction
|
|
setupTx.Txn.Commit()
|
|
|
|
e := mock.Eval()
|
|
// Request to promote canaries
|
|
msgType := structs.DeploymentPromoteRequestType
|
|
req := &structs.ApplyDeploymentPromoteRequest{
|
|
DeploymentPromoteRequest: structs.DeploymentPromoteRequest{
|
|
DeploymentID: d.ID,
|
|
All: true,
|
|
},
|
|
Eval: e,
|
|
}
|
|
|
|
require.NoError(t, s.UpdateDeploymentPromotion(msgType, 100, req))
|
|
|
|
events := WaitForEvents(t, s, 100, 1, 1*time.Second)
|
|
require.Len(t, events, 4)
|
|
|
|
got := events[0]
|
|
require.Equal(t, uint64(100), got.Index)
|
|
require.Equal(t, d.ID, got.Key)
|
|
|
|
de := got.Payload.(*structs.DeploymentEvent)
|
|
require.Equal(t, structs.DeploymentStatusRunning, de.Deployment.Status)
|
|
require.Equal(t, structs.TypeDeploymentPromotion, got.Type)
|
|
}
|
|
|
|
func TestEventsFromChanges_DeploymentAllocHealthRequestType(t *testing.T) {
|
|
ci.Parallel(t)
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
// setup
|
|
setupTx := s.db.WriteTxn(10)
|
|
|
|
j := mock.Job()
|
|
tg1 := j.TaskGroups[0]
|
|
tg2 := tg1.Copy()
|
|
tg2.Name = "foo"
|
|
j.TaskGroups = append(j.TaskGroups, tg2)
|
|
require.NoError(t, s.upsertJobImpl(10, nil, j, false, setupTx))
|
|
|
|
d := mock.Deployment()
|
|
d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
|
|
d.JobID = j.ID
|
|
d.TaskGroups = map[string]*structs.DeploymentState{
|
|
"web": {
|
|
DesiredTotal: 10,
|
|
DesiredCanaries: 1,
|
|
},
|
|
"foo": {
|
|
DesiredTotal: 10,
|
|
DesiredCanaries: 1,
|
|
},
|
|
}
|
|
require.NoError(t, s.upsertDeploymentImpl(10, d, setupTx))
|
|
|
|
// create set of allocs
|
|
c1 := mock.Alloc()
|
|
c1.JobID = j.ID
|
|
c1.DeploymentID = d.ID
|
|
d.TaskGroups[c1.TaskGroup].PlacedCanaries = append(d.TaskGroups[c1.TaskGroup].PlacedCanaries, c1.ID)
|
|
c1.DeploymentStatus = &structs.AllocDeploymentStatus{
|
|
Healthy: pointer.Of(true),
|
|
}
|
|
c2 := mock.Alloc()
|
|
c2.JobID = j.ID
|
|
c2.DeploymentID = d.ID
|
|
d.TaskGroups[c2.TaskGroup].PlacedCanaries = append(d.TaskGroups[c2.TaskGroup].PlacedCanaries, c2.ID)
|
|
c2.TaskGroup = tg2.Name
|
|
c2.DeploymentStatus = &structs.AllocDeploymentStatus{
|
|
Healthy: pointer.Of(true),
|
|
}
|
|
|
|
require.NoError(t, s.upsertAllocsImpl(10, []*structs.Allocation{c1, c2}, setupTx))
|
|
|
|
// Commit setup
|
|
setupTx.Commit()
|
|
|
|
msgType := structs.DeploymentAllocHealthRequestType
|
|
|
|
req := &structs.ApplyDeploymentAllocHealthRequest{
|
|
DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{
|
|
DeploymentID: d.ID,
|
|
HealthyAllocationIDs: []string{c1.ID},
|
|
UnhealthyAllocationIDs: []string{c2.ID},
|
|
},
|
|
DeploymentUpdate: &structs.DeploymentStatusUpdate{
|
|
DeploymentID: d.ID,
|
|
},
|
|
}
|
|
|
|
require.NoError(t, s.UpdateDeploymentAllocHealth(msgType, 100, req))
|
|
|
|
events := WaitForEvents(t, s, 100, 1, 1*time.Second)
|
|
require.Len(t, events, 3)
|
|
|
|
var allocEvents []structs.Event
|
|
var deploymentEvent []structs.Event
|
|
for _, e := range events {
|
|
if e.Topic == structs.TopicAllocation {
|
|
allocEvents = append(allocEvents, e)
|
|
} else if e.Topic == structs.TopicDeployment {
|
|
deploymentEvent = append(deploymentEvent, e)
|
|
}
|
|
}
|
|
|
|
require.Len(t, allocEvents, 2)
|
|
for _, e := range allocEvents {
|
|
require.Equal(t, 100, int(e.Index))
|
|
require.Equal(t, structs.TypeDeploymentAllocHealth, e.Type)
|
|
require.Equal(t, structs.TopicAllocation, e.Topic)
|
|
}
|
|
|
|
require.Len(t, deploymentEvent, 1)
|
|
for _, e := range deploymentEvent {
|
|
require.Equal(t, 100, int(e.Index))
|
|
require.Equal(t, structs.TypeDeploymentAllocHealth, e.Type)
|
|
require.Equal(t, structs.TopicDeployment, e.Topic)
|
|
require.Equal(t, d.ID, e.Key)
|
|
}
|
|
}
|
|
|
|
func TestEventsFromChanges_UpsertNodeEventsType(t *testing.T) {
|
|
ci.Parallel(t)
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
// setup
|
|
n1 := mock.Node()
|
|
n2 := mock.Node()
|
|
|
|
require.NoError(t, s.UpsertNode(structs.MsgTypeTestSetup, 10, n1))
|
|
require.NoError(t, s.UpsertNode(structs.MsgTypeTestSetup, 12, n2))
|
|
|
|
msgType := structs.UpsertNodeEventsType
|
|
req := &structs.EmitNodeEventsRequest{
|
|
NodeEvents: map[string][]*structs.NodeEvent{
|
|
n1.ID: {
|
|
{
|
|
Message: "update",
|
|
},
|
|
},
|
|
n2.ID: {
|
|
{
|
|
Message: "update",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
require.NoError(t, s.UpsertNodeEvents(msgType, 100, req.NodeEvents))
|
|
events := WaitForEvents(t, s, 100, 1, 1*time.Second)
|
|
require.Len(t, events, 2)
|
|
|
|
for _, e := range events {
|
|
require.Equal(t, structs.TopicNode, e.Topic)
|
|
require.Equal(t, structs.TypeNodeEvent, e.Type)
|
|
event := e.Payload.(*structs.NodeStreamEvent)
|
|
require.Equal(t, "update", event.Node.Events[len(event.Node.Events)-1].Message)
|
|
}
|
|
|
|
}
|
|
|
|
func TestEventsFromChanges_NodeUpdateStatusRequest(t *testing.T) {
|
|
ci.Parallel(t)
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
// setup
|
|
n1 := mock.Node()
|
|
|
|
require.NoError(t, s.UpsertNode(structs.MsgTypeTestSetup, 10, n1))
|
|
|
|
updated := time.Now()
|
|
msgType := structs.NodeUpdateStatusRequestType
|
|
req := &structs.NodeUpdateStatusRequest{
|
|
NodeID: n1.ID,
|
|
Status: structs.NodeStatusDown,
|
|
UpdatedAt: updated.UnixNano(),
|
|
NodeEvent: &structs.NodeEvent{Message: "down"},
|
|
}
|
|
|
|
require.NoError(t, s.UpdateNodeStatus(msgType, 100, req.NodeID, req.Status, req.UpdatedAt, req.NodeEvent))
|
|
events := WaitForEvents(t, s, 100, 1, 1*time.Second)
|
|
require.Len(t, events, 1)
|
|
|
|
e := events[0]
|
|
require.Equal(t, structs.TopicNode, e.Topic)
|
|
require.Equal(t, structs.TypeNodeEvent, e.Type)
|
|
event := e.Payload.(*structs.NodeStreamEvent)
|
|
require.Equal(t, "down", event.Node.Events[len(event.Node.Events)-1].Message)
|
|
require.Equal(t, structs.NodeStatusDown, event.Node.Status)
|
|
}
|
|
|
|
func TestEventsFromChanges_NodePoolUpsertRequestType(t *testing.T) {
|
|
ci.Parallel(t)
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
// Create test node pool.
|
|
pool := mock.NodePool()
|
|
err := s.UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool})
|
|
must.NoError(t, err)
|
|
|
|
// Update test node pool.
|
|
updated := pool.Copy()
|
|
updated.Meta["updated"] = "true"
|
|
err = s.UpsertNodePools(structs.NodePoolUpsertRequestType, 1001, []*structs.NodePool{updated})
|
|
must.NoError(t, err)
|
|
|
|
// Wait and verify update event.
|
|
events := WaitForEvents(t, s, 1001, 1, 1*time.Second)
|
|
must.Len(t, 1, events)
|
|
|
|
e := events[0]
|
|
must.Eq(t, structs.TopicNodePool, e.Topic)
|
|
must.Eq(t, structs.TypeNodePoolUpserted, e.Type)
|
|
must.Eq(t, pool.Name, e.Key)
|
|
|
|
payload := e.Payload.(*structs.NodePoolEvent)
|
|
must.Eq(t, updated, payload.NodePool)
|
|
}
|
|
|
|
func TestEventsFromChanges_NodePoolDeleteRequestType(t *testing.T) {
|
|
ci.Parallel(t)
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
// Create test node pool.
|
|
pool := mock.NodePool()
|
|
err := s.UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool})
|
|
must.NoError(t, err)
|
|
|
|
// Delete test node pool.
|
|
err = s.DeleteNodePools(structs.NodePoolDeleteRequestType, 1001, []string{pool.Name})
|
|
must.NoError(t, err)
|
|
|
|
// Wait and verify delete event.
|
|
events := WaitForEvents(t, s, 1001, 1, 1*time.Second)
|
|
must.Len(t, 1, events)
|
|
|
|
e := events[0]
|
|
must.Eq(t, structs.TopicNodePool, e.Topic)
|
|
must.Eq(t, structs.TypeNodePoolDeleted, e.Type)
|
|
must.Eq(t, pool.Name, e.Key)
|
|
|
|
payload := e.Payload.(*structs.NodePoolEvent)
|
|
must.Eq(t, pool, payload.NodePool)
|
|
}
|
|
|
|
func TestEventsFromChanges_EvalUpdateRequestType(t *testing.T) {
|
|
ci.Parallel(t)
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
// setup
|
|
e1 := mock.Eval()
|
|
|
|
require.NoError(t, s.UpsertEvals(structs.MsgTypeTestSetup, 10, []*structs.Evaluation{e1}))
|
|
|
|
e2 := mock.Eval()
|
|
e2.ID = e1.ID
|
|
e2.JobID = e1.JobID
|
|
e2.Status = structs.EvalStatusBlocked
|
|
|
|
msgType := structs.EvalUpdateRequestType
|
|
req := &structs.EvalUpdateRequest{
|
|
Evals: []*structs.Evaluation{e2},
|
|
}
|
|
|
|
require.NoError(t, s.UpsertEvals(msgType, 100, req.Evals))
|
|
|
|
events := WaitForEvents(t, s, 100, 1, 1*time.Second)
|
|
require.Len(t, events, 1)
|
|
|
|
e := events[0]
|
|
require.Equal(t, structs.TopicEvaluation, e.Topic)
|
|
require.Equal(t, structs.TypeEvalUpdated, e.Type)
|
|
require.Contains(t, e.FilterKeys, e2.JobID)
|
|
require.Contains(t, e.FilterKeys, e2.DeploymentID)
|
|
event := e.Payload.(*structs.EvaluationEvent)
|
|
require.Equal(t, "blocked", event.Evaluation.Status)
|
|
}
|
|
|
|
func TestEventsFromChanges_ApplyPlanResultsRequestType(t *testing.T) {
|
|
ci.Parallel(t)
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
// setup
|
|
alloc := mock.Alloc()
|
|
alloc2 := mock.Alloc()
|
|
job := alloc.Job
|
|
alloc.Job = nil
|
|
alloc2.Job = nil
|
|
|
|
d := mock.Deployment()
|
|
alloc.DeploymentID = d.ID
|
|
alloc2.DeploymentID = d.ID
|
|
|
|
require.NoError(t, s.UpsertJob(structs.MsgTypeTestSetup, 9, nil, job))
|
|
|
|
eval := mock.Eval()
|
|
eval.JobID = job.ID
|
|
|
|
// Create an eval
|
|
require.NoError(t, s.UpsertEvals(structs.MsgTypeTestSetup, 10, []*structs.Evaluation{eval}))
|
|
|
|
msgType := structs.ApplyPlanResultsRequestType
|
|
req := &structs.ApplyPlanResultsRequest{
|
|
AllocUpdateRequest: structs.AllocUpdateRequest{
|
|
Alloc: []*structs.Allocation{alloc, alloc2},
|
|
Job: job,
|
|
},
|
|
Deployment: d,
|
|
EvalID: eval.ID,
|
|
}
|
|
|
|
require.NoError(t, s.UpsertPlanResults(msgType, 100, req))
|
|
|
|
events := WaitForEvents(t, s, 100, 1, 1*time.Second)
|
|
require.Len(t, events, 5)
|
|
|
|
var allocs []structs.Event
|
|
var evals []structs.Event
|
|
var jobs []structs.Event
|
|
var deploys []structs.Event
|
|
for _, e := range events {
|
|
if e.Topic == structs.TopicAllocation {
|
|
allocs = append(allocs, e)
|
|
} else if e.Topic == structs.TopicEvaluation {
|
|
evals = append(evals, e)
|
|
} else if e.Topic == structs.TopicJob {
|
|
jobs = append(jobs, e)
|
|
} else if e.Topic == structs.TopicDeployment {
|
|
deploys = append(deploys, e)
|
|
}
|
|
require.Equal(t, structs.TypePlanResult, e.Type)
|
|
}
|
|
require.Len(t, allocs, 2)
|
|
require.Len(t, evals, 1)
|
|
require.Len(t, jobs, 1)
|
|
require.Len(t, deploys, 1)
|
|
}
|
|
|
|
func TestEventsFromChanges_BatchNodeUpdateDrainRequestType(t *testing.T) {
|
|
ci.Parallel(t)
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
// setup
|
|
n1 := mock.Node()
|
|
n2 := mock.Node()
|
|
|
|
require.NoError(t, s.UpsertNode(structs.MsgTypeTestSetup, 10, n1))
|
|
require.NoError(t, s.UpsertNode(structs.MsgTypeTestSetup, 11, n2))
|
|
|
|
updated := time.Now()
|
|
msgType := structs.BatchNodeUpdateDrainRequestType
|
|
|
|
expectedDrain := &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: -1 * time.Second,
|
|
},
|
|
}
|
|
event := &structs.NodeEvent{
|
|
Message: "Drain strategy enabled",
|
|
Subsystem: structs.NodeEventSubsystemDrain,
|
|
Timestamp: time.Now(),
|
|
}
|
|
req := structs.BatchNodeUpdateDrainRequest{
|
|
Updates: map[string]*structs.DrainUpdate{
|
|
n1.ID: {
|
|
DrainStrategy: expectedDrain,
|
|
},
|
|
n2.ID: {
|
|
DrainStrategy: expectedDrain,
|
|
},
|
|
},
|
|
NodeEvents: map[string]*structs.NodeEvent{
|
|
n1.ID: event,
|
|
n2.ID: event,
|
|
},
|
|
UpdatedAt: updated.UnixNano(),
|
|
}
|
|
|
|
require.NoError(t, s.BatchUpdateNodeDrain(msgType, 100, req.UpdatedAt, req.Updates, req.NodeEvents))
|
|
|
|
events := WaitForEvents(t, s, 100, 1, 1*time.Second)
|
|
require.Len(t, events, 2)
|
|
|
|
for _, e := range events {
|
|
require.Equal(t, 100, int(e.Index))
|
|
require.Equal(t, structs.TypeNodeDrain, e.Type)
|
|
require.Equal(t, structs.TopicNode, e.Topic)
|
|
ne := e.Payload.(*structs.NodeStreamEvent)
|
|
require.Equal(t, event.Message, ne.Node.Events[len(ne.Node.Events)-1].Message)
|
|
}
|
|
}
|
|
|
|
func TestEventsFromChanges_NodeUpdateEligibilityRequestType(t *testing.T) {
|
|
ci.Parallel(t)
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
// setup
|
|
n1 := mock.Node()
|
|
|
|
require.NoError(t, s.UpsertNode(structs.MsgTypeTestSetup, 10, n1))
|
|
|
|
msgType := structs.NodeUpdateEligibilityRequestType
|
|
|
|
event := &structs.NodeEvent{
|
|
Message: "Node marked as ineligible",
|
|
Subsystem: structs.NodeEventSubsystemCluster,
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
req := structs.NodeUpdateEligibilityRequest{
|
|
NodeID: n1.ID,
|
|
NodeEvent: event,
|
|
Eligibility: structs.NodeSchedulingIneligible,
|
|
UpdatedAt: time.Now().UnixNano(),
|
|
}
|
|
|
|
require.NoError(t, s.UpdateNodeEligibility(msgType, 100, req.NodeID, req.Eligibility, req.UpdatedAt, req.NodeEvent))
|
|
|
|
events := WaitForEvents(t, s, 100, 1, 1*time.Second)
|
|
require.Len(t, events, 1)
|
|
|
|
for _, e := range events {
|
|
require.Equal(t, 100, int(e.Index))
|
|
require.Equal(t, structs.TypeNodeDrain, e.Type)
|
|
require.Equal(t, structs.TopicNode, e.Topic)
|
|
ne := e.Payload.(*structs.NodeStreamEvent)
|
|
require.Equal(t, event.Message, ne.Node.Events[len(ne.Node.Events)-1].Message)
|
|
require.Equal(t, structs.NodeSchedulingIneligible, ne.Node.SchedulingEligibility)
|
|
}
|
|
}
|
|
|
|
func TestEventsFromChanges_AllocUpdateDesiredTransitionRequestType(t *testing.T) {
|
|
ci.Parallel(t)
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
alloc := mock.Alloc()
|
|
|
|
require.Nil(t, s.UpsertJob(structs.MsgTypeTestSetup, 10, nil, alloc.Job))
|
|
require.Nil(t, s.UpsertAllocs(structs.MsgTypeTestSetup, 11, []*structs.Allocation{alloc}))
|
|
|
|
msgType := structs.AllocUpdateDesiredTransitionRequestType
|
|
|
|
eval := &structs.Evaluation{
|
|
ID: uuid.Generate(),
|
|
Namespace: alloc.Namespace,
|
|
Priority: alloc.Job.Priority,
|
|
Type: alloc.Job.Type,
|
|
TriggeredBy: structs.EvalTriggerNodeDrain,
|
|
JobID: alloc.Job.ID,
|
|
JobModifyIndex: alloc.Job.ModifyIndex,
|
|
Status: structs.EvalStatusPending,
|
|
}
|
|
evals := []*structs.Evaluation{eval}
|
|
|
|
req := &structs.AllocUpdateDesiredTransitionRequest{
|
|
Allocs: map[string]*structs.DesiredTransition{
|
|
alloc.ID: {Migrate: pointer.Of(true)},
|
|
},
|
|
Evals: evals,
|
|
}
|
|
|
|
require.NoError(t, s.UpdateAllocsDesiredTransitions(msgType, 100, req.Allocs, req.Evals))
|
|
|
|
events := WaitForEvents(t, s, 100, 1, 1*time.Second)
|
|
require.Len(t, events, 2)
|
|
|
|
var allocs []structs.Event
|
|
var evalEvents []structs.Event
|
|
for _, e := range events {
|
|
if e.Topic == structs.TopicEvaluation {
|
|
evalEvents = append(evalEvents, e)
|
|
} else if e.Topic == structs.TopicAllocation {
|
|
allocs = append(allocs, e)
|
|
} else {
|
|
require.Fail(t, "unexpected event type")
|
|
}
|
|
|
|
require.Equal(t, structs.TypeAllocationUpdateDesiredStatus, e.Type)
|
|
}
|
|
|
|
require.Len(t, allocs, 1)
|
|
require.Len(t, evalEvents, 1)
|
|
}
|
|
|
|
func TestEventsFromChanges_JobBatchDeregisterRequestType(t *testing.T) {
|
|
// TODO Job batch deregister logic mostly occurs in the FSM
|
|
t.SkipNow()
|
|
|
|
}
|
|
func TestEventsFromChanges_AllocClientUpdateRequestType(t *testing.T) {
|
|
t.SkipNow()
|
|
}
|
|
|
|
func TestEventsFromChanges_JobDeregisterRequestType(t *testing.T) {
|
|
t.SkipNow()
|
|
}
|
|
|
|
func TestEventsFromChanges_WithDeletion(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
changes := Changes{
|
|
Index: uint64(1),
|
|
Changes: memdb.Changes{
|
|
{
|
|
Table: "jobs",
|
|
Before: &structs.Job{},
|
|
After: &structs.Job{},
|
|
},
|
|
{
|
|
Table: "jobs",
|
|
Before: &structs.Job{},
|
|
After: nil, // deleted
|
|
},
|
|
},
|
|
MsgType: structs.JobDeregisterRequestType,
|
|
}
|
|
|
|
event := eventsFromChanges(nil, changes)
|
|
require.NotNil(t, event)
|
|
|
|
require.Len(t, event.Events, 1)
|
|
}
|
|
|
|
func TestEventsFromChanges_WithNodeDeregistration(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
before := &structs.Node{
|
|
ID: "some-id",
|
|
Datacenter: "some-datacenter",
|
|
}
|
|
|
|
changes := Changes{
|
|
Index: uint64(1),
|
|
Changes: memdb.Changes{
|
|
{
|
|
Table: "nodes",
|
|
Before: before,
|
|
After: nil, // deleted
|
|
},
|
|
},
|
|
MsgType: structs.NodeDeregisterRequestType,
|
|
}
|
|
|
|
actual := eventsFromChanges(nil, changes)
|
|
require.NotNil(t, actual)
|
|
|
|
require.Len(t, actual.Events, 1)
|
|
|
|
event := actual.Events[0]
|
|
|
|
require.Equal(t, structs.TypeNodeDeregistration, event.Type)
|
|
require.Equal(t, uint64(1), event.Index)
|
|
require.Equal(t, structs.TopicNode, event.Topic)
|
|
require.Equal(t, "some-id", event.Key)
|
|
|
|
require.Len(t, event.FilterKeys, 0)
|
|
|
|
nodeEvent, ok := event.Payload.(*structs.NodeStreamEvent)
|
|
require.True(t, ok)
|
|
require.Equal(t, *before, *nodeEvent.Node)
|
|
}
|
|
|
|
func TestNodeEventsFromChanges(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
cases := []struct {
|
|
Name string
|
|
MsgType structs.MessageType
|
|
Setup func(s *StateStore, tx *txn) error
|
|
Mutate func(s *StateStore, tx *txn) error
|
|
WantEvents []structs.Event
|
|
WantTopic structs.Topic
|
|
}{
|
|
{
|
|
MsgType: structs.NodeRegisterRequestType,
|
|
WantTopic: structs.TopicNode,
|
|
Name: "node registered",
|
|
Mutate: func(s *StateStore, tx *txn) error {
|
|
return upsertNodeTxn(tx, tx.Index, testNode())
|
|
},
|
|
WantEvents: []structs.Event{{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeRegistration,
|
|
Key: testNodeID(),
|
|
Index: 100,
|
|
Payload: &structs.NodeStreamEvent{
|
|
Node: testNode(),
|
|
},
|
|
}},
|
|
},
|
|
{
|
|
MsgType: structs.NodeRegisterRequestType,
|
|
WantTopic: structs.TopicNode,
|
|
Name: "node registered initializing",
|
|
Mutate: func(s *StateStore, tx *txn) error {
|
|
return upsertNodeTxn(tx, tx.Index, testNode(nodeNotReady))
|
|
},
|
|
WantEvents: []structs.Event{{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeRegistration,
|
|
Key: testNodeID(),
|
|
Index: 100,
|
|
Payload: &structs.NodeStreamEvent{
|
|
Node: testNode(nodeNotReady),
|
|
},
|
|
}},
|
|
},
|
|
{
|
|
MsgType: structs.NodeDeregisterRequestType,
|
|
WantTopic: structs.TopicNode,
|
|
Name: "node deregistered",
|
|
Setup: func(s *StateStore, tx *txn) error {
|
|
return upsertNodeTxn(tx, tx.Index, testNode())
|
|
},
|
|
Mutate: func(s *StateStore, tx *txn) error {
|
|
return deleteNodeTxn(tx, tx.Index, []string{testNodeID()})
|
|
},
|
|
WantEvents: []structs.Event{{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeDeregistration,
|
|
Key: testNodeID(),
|
|
Index: 100,
|
|
Payload: &structs.NodeStreamEvent{
|
|
Node: testNode(),
|
|
},
|
|
}},
|
|
},
|
|
{
|
|
MsgType: structs.NodeDeregisterRequestType,
|
|
WantTopic: structs.TopicNode,
|
|
Name: "batch node deregistered",
|
|
Setup: func(s *StateStore, tx *txn) error {
|
|
require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode()))
|
|
return upsertNodeTxn(tx, tx.Index, testNode(nodeIDTwo))
|
|
},
|
|
Mutate: func(s *StateStore, tx *txn) error {
|
|
return deleteNodeTxn(tx, tx.Index, []string{testNodeID(), testNodeIDTwo()})
|
|
},
|
|
WantEvents: []structs.Event{
|
|
{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeDeregistration,
|
|
Key: testNodeID(),
|
|
Index: 100,
|
|
Payload: &structs.NodeStreamEvent{
|
|
Node: testNode(),
|
|
},
|
|
},
|
|
{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeDeregistration,
|
|
Key: testNodeIDTwo(),
|
|
Index: 100,
|
|
Payload: &structs.NodeStreamEvent{
|
|
Node: testNode(nodeIDTwo),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
MsgType: structs.UpsertNodeEventsType,
|
|
WantTopic: structs.TopicNode,
|
|
Name: "batch node events upserted",
|
|
Setup: func(s *StateStore, tx *txn) error {
|
|
require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode()))
|
|
return upsertNodeTxn(tx, tx.Index, testNode(nodeIDTwo))
|
|
},
|
|
Mutate: func(s *StateStore, tx *txn) error {
|
|
eventFn := func(id string) []*structs.NodeEvent {
|
|
return []*structs.NodeEvent{
|
|
{
|
|
Message: "test event one",
|
|
Subsystem: "Cluster",
|
|
Details: map[string]string{
|
|
"NodeID": id,
|
|
},
|
|
},
|
|
{
|
|
Message: "test event two",
|
|
Subsystem: "Cluster",
|
|
Details: map[string]string{
|
|
"NodeID": id,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
require.NoError(t, s.upsertNodeEvents(tx.Index, testNodeID(), eventFn(testNodeID()), tx))
|
|
return s.upsertNodeEvents(tx.Index, testNodeIDTwo(), eventFn(testNodeIDTwo()), tx)
|
|
},
|
|
WantEvents: []structs.Event{
|
|
{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeEvent,
|
|
Key: testNodeID(),
|
|
Index: 100,
|
|
Payload: &structs.NodeStreamEvent{
|
|
Node: testNode(),
|
|
},
|
|
},
|
|
{
|
|
Topic: structs.TopicNode,
|
|
Type: structs.TypeNodeEvent,
|
|
Key: testNodeIDTwo(),
|
|
Index: 100,
|
|
Payload: &structs.NodeStreamEvent{
|
|
Node: testNode(nodeIDTwo),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tc := range cases {
|
|
t.Run(tc.Name, func(t *testing.T) {
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
if tc.Setup != nil {
|
|
// Bypass publish mechanism for setup
|
|
setupTx := s.db.WriteTxn(10)
|
|
require.NoError(t, tc.Setup(s, setupTx))
|
|
setupTx.Txn.Commit()
|
|
}
|
|
|
|
tx := s.db.WriteTxnMsgT(tc.MsgType, 100)
|
|
require.NoError(t, tc.Mutate(s, tx))
|
|
|
|
changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: tc.MsgType}
|
|
got := eventsFromChanges(tx, changes)
|
|
|
|
require.NotNil(t, got)
|
|
|
|
require.Equal(t, len(tc.WantEvents), len(got.Events))
|
|
for idx, g := range got.Events {
|
|
// assert equality of shared fields
|
|
|
|
want := tc.WantEvents[idx]
|
|
require.Equal(t, want.Index, g.Index)
|
|
require.Equal(t, want.Key, g.Key)
|
|
require.Equal(t, want.Topic, g.Topic)
|
|
|
|
switch tc.MsgType {
|
|
case structs.NodeRegisterRequestType:
|
|
requireNodeRegistrationEventEqual(t, tc.WantEvents[idx], g)
|
|
case structs.NodeDeregisterRequestType:
|
|
requireNodeDeregistrationEventEqual(t, tc.WantEvents[idx], g)
|
|
case structs.UpsertNodeEventsType:
|
|
requireNodeEventEqual(t, tc.WantEvents[idx], g)
|
|
default:
|
|
require.Fail(t, "unhandled message type")
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestNodeDrainEventFromChanges(t *testing.T) {
|
|
ci.Parallel(t)
|
|
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer s.StopEventBroker()
|
|
|
|
// setup
|
|
setupTx := s.db.WriteTxn(10)
|
|
|
|
node := mock.Node()
|
|
alloc1 := mock.Alloc()
|
|
alloc2 := mock.Alloc()
|
|
alloc1.NodeID = node.ID
|
|
alloc2.NodeID = node.ID
|
|
|
|
require.NoError(t, upsertNodeTxn(setupTx, 10, node))
|
|
require.NoError(t, s.upsertAllocsImpl(100, []*structs.Allocation{alloc1, alloc2}, setupTx))
|
|
setupTx.Txn.Commit()
|
|
|
|
// changes
|
|
tx := s.db.WriteTxn(100)
|
|
|
|
strat := &structs.DrainStrategy{
|
|
DrainSpec: structs.DrainSpec{
|
|
Deadline: 10 * time.Minute,
|
|
IgnoreSystemJobs: false,
|
|
},
|
|
StartedAt: time.Now(),
|
|
}
|
|
markEligible := false
|
|
updatedAt := time.Now()
|
|
event := &structs.NodeEvent{}
|
|
|
|
require.NoError(t, s.updateNodeDrainImpl(tx, 100, node.ID, strat, markEligible, updatedAt.UnixNano(), event, nil, "", false))
|
|
changes := Changes{Changes: tx.Changes(), Index: 100, MsgType: structs.NodeUpdateDrainRequestType}
|
|
got := eventsFromChanges(tx, changes)
|
|
|
|
require.Len(t, got.Events, 1)
|
|
|
|
require.Equal(t, structs.TopicNode, got.Events[0].Topic)
|
|
require.Equal(t, structs.TypeNodeDrain, got.Events[0].Type)
|
|
require.Equal(t, uint64(100), got.Events[0].Index)
|
|
|
|
nodeEvent, ok := got.Events[0].Payload.(*structs.NodeStreamEvent)
|
|
require.True(t, ok)
|
|
|
|
require.Equal(t, structs.NodeSchedulingIneligible, nodeEvent.Node.SchedulingEligibility)
|
|
require.Equal(t, strat, nodeEvent.Node.DrainStrategy)
|
|
}
|
|
|
|
func Test_eventsFromChanges_ServiceRegistration(t *testing.T) {
|
|
ci.Parallel(t)
|
|
testState := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer testState.StopEventBroker()
|
|
|
|
// Generate test service registration.
|
|
service := mock.ServiceRegistrations()[0]
|
|
|
|
// Upsert a service registration.
|
|
writeTxn := testState.db.WriteTxn(10)
|
|
updated, err := testState.upsertServiceRegistrationTxn(10, writeTxn, service)
|
|
require.True(t, updated)
|
|
require.NoError(t, err)
|
|
writeTxn.Txn.Commit()
|
|
|
|
// Pull the events from the stream.
|
|
registerChange := Changes{Changes: writeTxn.Changes(), Index: 10, MsgType: structs.ServiceRegistrationUpsertRequestType}
|
|
receivedChange := eventsFromChanges(writeTxn, registerChange)
|
|
|
|
// Check the event, and it's payload are what we are expecting.
|
|
require.Len(t, receivedChange.Events, 1)
|
|
require.Equal(t, structs.TopicService, receivedChange.Events[0].Topic)
|
|
require.Equal(t, structs.TypeServiceRegistration, receivedChange.Events[0].Type)
|
|
require.Equal(t, uint64(10), receivedChange.Events[0].Index)
|
|
|
|
eventPayload := receivedChange.Events[0].Payload.(*structs.ServiceRegistrationStreamEvent)
|
|
require.Equal(t, service, eventPayload.Service)
|
|
|
|
// Delete the previously upserted service registration.
|
|
deleteTxn := testState.db.WriteTxn(20)
|
|
require.NoError(t, testState.deleteServiceRegistrationByIDTxn(uint64(20), deleteTxn, service.Namespace, service.ID))
|
|
writeTxn.Txn.Commit()
|
|
|
|
// Pull the events from the stream.
|
|
deregisterChange := Changes{Changes: deleteTxn.Changes(), Index: 20, MsgType: structs.ServiceRegistrationDeleteByIDRequestType}
|
|
receivedDeleteChange := eventsFromChanges(deleteTxn, deregisterChange)
|
|
|
|
// Check the event, and it's payload are what we are expecting.
|
|
require.Len(t, receivedDeleteChange.Events, 1)
|
|
require.Equal(t, structs.TopicService, receivedDeleteChange.Events[0].Topic)
|
|
require.Equal(t, structs.TypeServiceDeregistration, receivedDeleteChange.Events[0].Type)
|
|
require.Equal(t, uint64(20), receivedDeleteChange.Events[0].Index)
|
|
|
|
eventPayload = receivedChange.Events[0].Payload.(*structs.ServiceRegistrationStreamEvent)
|
|
require.Equal(t, service, eventPayload.Service)
|
|
}
|
|
|
|
func Test_eventsFromChanges_ACLRole(t *testing.T) {
|
|
ci.Parallel(t)
|
|
testState := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer testState.StopEventBroker()
|
|
|
|
// Generate a test ACL role.
|
|
aclRole := mock.ACLRole()
|
|
|
|
// Upsert the role into state, skipping the checks perform to ensure the
|
|
// linked policies exist.
|
|
writeTxn := testState.db.WriteTxn(10)
|
|
updated, err := testState.upsertACLRoleTxn(10, writeTxn, aclRole, true)
|
|
require.True(t, updated)
|
|
require.NoError(t, err)
|
|
writeTxn.Txn.Commit()
|
|
|
|
// Pull the events from the stream.
|
|
upsertChange := Changes{Changes: writeTxn.Changes(), Index: 10, MsgType: structs.ACLRolesUpsertRequestType}
|
|
receivedChange := eventsFromChanges(writeTxn, upsertChange)
|
|
|
|
// Check the event, and it's payload are what we are expecting.
|
|
require.Len(t, receivedChange.Events, 1)
|
|
require.Equal(t, structs.TopicACLRole, receivedChange.Events[0].Topic)
|
|
require.Equal(t, aclRole.ID, receivedChange.Events[0].Key)
|
|
require.Equal(t, aclRole.Name, receivedChange.Events[0].FilterKeys[0])
|
|
require.Equal(t, structs.TypeACLRoleUpserted, receivedChange.Events[0].Type)
|
|
require.Equal(t, uint64(10), receivedChange.Events[0].Index)
|
|
|
|
eventPayload := receivedChange.Events[0].Payload.(*structs.ACLRoleStreamEvent)
|
|
require.Equal(t, aclRole, eventPayload.ACLRole)
|
|
|
|
// Delete the previously upserted ACL role.
|
|
deleteTxn := testState.db.WriteTxn(20)
|
|
require.NoError(t, testState.deleteACLRoleByIDTxn(deleteTxn, aclRole.ID))
|
|
require.NoError(t, deleteTxn.Insert(tableIndex, &IndexEntry{TableACLRoles, 20}))
|
|
deleteTxn.Txn.Commit()
|
|
|
|
// Pull the events from the stream.
|
|
deleteChange := Changes{Changes: deleteTxn.Changes(), Index: 20, MsgType: structs.ACLRolesDeleteByIDRequestType}
|
|
receivedDeleteChange := eventsFromChanges(deleteTxn, deleteChange)
|
|
|
|
// Check the event, and it's payload are what we are expecting.
|
|
require.Len(t, receivedDeleteChange.Events, 1)
|
|
require.Equal(t, structs.TopicACLRole, receivedDeleteChange.Events[0].Topic)
|
|
require.Equal(t, aclRole.ID, receivedDeleteChange.Events[0].Key)
|
|
require.Equal(t, aclRole.Name, receivedDeleteChange.Events[0].FilterKeys[0])
|
|
require.Equal(t, structs.TypeACLRoleDeleted, receivedDeleteChange.Events[0].Type)
|
|
require.Equal(t, uint64(20), receivedDeleteChange.Events[0].Index)
|
|
|
|
eventPayload = receivedChange.Events[0].Payload.(*structs.ACLRoleStreamEvent)
|
|
require.Equal(t, aclRole, eventPayload.ACLRole)
|
|
}
|
|
|
|
func Test_eventsFromChanges_ACLAuthMethod(t *testing.T) {
|
|
ci.Parallel(t)
|
|
testState := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer testState.StopEventBroker()
|
|
|
|
// Generate a test ACL auth method
|
|
authMethod := mock.ACLOIDCAuthMethod()
|
|
|
|
// Upsert the auth method straight into state
|
|
writeTxn := testState.db.WriteTxn(10)
|
|
updated, err := testState.upsertACLAuthMethodTxn(10, writeTxn, authMethod)
|
|
must.True(t, updated)
|
|
must.NoError(t, err)
|
|
writeTxn.Txn.Commit()
|
|
|
|
// Pull the events from the stream.
|
|
upsertChange := Changes{Changes: writeTxn.Changes(), Index: 10, MsgType: structs.ACLAuthMethodsUpsertRequestType}
|
|
receivedChange := eventsFromChanges(writeTxn, upsertChange)
|
|
must.NotNil(t, receivedChange)
|
|
|
|
// Check the event, and its payload are what we are expecting.
|
|
must.Len(t, 1, receivedChange.Events)
|
|
must.Eq(t, structs.TopicACLAuthMethod, receivedChange.Events[0].Topic)
|
|
must.Eq(t, authMethod.Name, receivedChange.Events[0].Key)
|
|
must.Eq(t, structs.TypeACLAuthMethodUpserted, receivedChange.Events[0].Type)
|
|
must.Eq(t, uint64(10), receivedChange.Events[0].Index)
|
|
|
|
eventPayload := receivedChange.Events[0].Payload.(*structs.ACLAuthMethodEvent)
|
|
must.Eq(t, authMethod, eventPayload.AuthMethod)
|
|
|
|
// Delete the previously upserted auth method
|
|
deleteTxn := testState.db.WriteTxn(20)
|
|
must.NoError(t, testState.deleteACLAuthMethodTxn(deleteTxn, authMethod.Name))
|
|
must.NoError(t, deleteTxn.Insert(tableIndex, &IndexEntry{TableACLAuthMethods, 20}))
|
|
deleteTxn.Txn.Commit()
|
|
|
|
// Pull the events from the stream.
|
|
deleteChange := Changes{Changes: deleteTxn.Changes(), Index: 20, MsgType: structs.ACLAuthMethodsDeleteRequestType}
|
|
receivedDeleteChange := eventsFromChanges(deleteTxn, deleteChange)
|
|
must.NotNil(t, receivedDeleteChange)
|
|
|
|
// Check the event, and its payload are what we are expecting.
|
|
must.Len(t, 1, receivedDeleteChange.Events)
|
|
must.Eq(t, structs.TopicACLAuthMethod, receivedDeleteChange.Events[0].Topic)
|
|
must.Eq(t, authMethod.Name, receivedDeleteChange.Events[0].Key)
|
|
must.Eq(t, structs.TypeACLAuthMethodDeleted, receivedDeleteChange.Events[0].Type)
|
|
must.Eq(t, uint64(20), receivedDeleteChange.Events[0].Index)
|
|
|
|
eventPayload = receivedChange.Events[0].Payload.(*structs.ACLAuthMethodEvent)
|
|
must.Eq(t, authMethod, eventPayload.AuthMethod)
|
|
}
|
|
|
|
func Test_eventsFromChanges_ACLBindingRule(t *testing.T) {
|
|
ci.Parallel(t)
|
|
testState := TestStateStoreCfg(t, TestStateStorePublisher(t))
|
|
defer testState.StopEventBroker()
|
|
|
|
// Generate a test ACL binding rule.
|
|
bindingRule := mock.ACLBindingRule()
|
|
|
|
// Upsert the binding rule straight into state.
|
|
writeTxn := testState.db.WriteTxn(10)
|
|
updated, err := testState.upsertACLBindingRuleTxn(10, writeTxn, bindingRule, true)
|
|
must.True(t, updated)
|
|
must.NoError(t, err)
|
|
writeTxn.Txn.Commit()
|
|
|
|
// Pull the events from the stream.
|
|
upsertChange := Changes{Changes: writeTxn.Changes(), Index: 10, MsgType: structs.ACLBindingRulesUpsertRequestType}
|
|
receivedChange := eventsFromChanges(writeTxn, upsertChange)
|
|
must.NotNil(t, receivedChange)
|
|
|
|
// Check the event, and its payload are what we are expecting.
|
|
must.Len(t, 1, receivedChange.Events)
|
|
must.Eq(t, structs.TopicACLBindingRule, receivedChange.Events[0].Topic)
|
|
must.Eq(t, bindingRule.ID, receivedChange.Events[0].Key)
|
|
must.SliceContainsAll(t, []string{bindingRule.AuthMethod}, receivedChange.Events[0].FilterKeys)
|
|
must.Eq(t, structs.TypeACLBindingRuleUpserted, receivedChange.Events[0].Type)
|
|
must.Eq(t, 10, receivedChange.Events[0].Index)
|
|
|
|
must.Eq(t, bindingRule, receivedChange.Events[0].Payload.(*structs.ACLBindingRuleEvent).ACLBindingRule)
|
|
|
|
// Delete the previously upserted binding rule.
|
|
deleteTxn := testState.db.WriteTxn(20)
|
|
must.NoError(t, testState.deleteACLBindingRuleTxn(deleteTxn, bindingRule.ID))
|
|
must.NoError(t, deleteTxn.Insert(tableIndex, &IndexEntry{TableACLBindingRules, 20}))
|
|
deleteTxn.Txn.Commit()
|
|
|
|
// Pull the events from the stream.
|
|
deleteChange := Changes{Changes: deleteTxn.Changes(), Index: 20, MsgType: structs.ACLBindingRulesDeleteRequestType}
|
|
receivedDeleteChange := eventsFromChanges(deleteTxn, deleteChange)
|
|
must.NotNil(t, receivedDeleteChange)
|
|
|
|
// Check the event, and its payload are what we are expecting.
|
|
must.Len(t, 1, receivedDeleteChange.Events)
|
|
must.Eq(t, structs.TopicACLBindingRule, receivedDeleteChange.Events[0].Topic)
|
|
must.Eq(t, bindingRule.ID, receivedDeleteChange.Events[0].Key)
|
|
must.SliceContainsAll(t, []string{bindingRule.AuthMethod}, receivedDeleteChange.Events[0].FilterKeys)
|
|
must.Eq(t, structs.TypeACLBindingRuleDeleted, receivedDeleteChange.Events[0].Type)
|
|
must.Eq(t, uint64(20), receivedDeleteChange.Events[0].Index)
|
|
|
|
must.Eq(t, bindingRule, receivedDeleteChange.Events[0].Payload.(*structs.ACLBindingRuleEvent).ACLBindingRule)
|
|
}
|
|
|
|
func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
|
|
t.Helper()
|
|
|
|
wantPayload := want.Payload.(*structs.NodeStreamEvent)
|
|
gotPayload := got.Payload.(*structs.NodeStreamEvent)
|
|
|
|
// Check payload equality for the fields that we can easily control
|
|
require.Equal(t, wantPayload.Node.Status, gotPayload.Node.Status)
|
|
require.Equal(t, wantPayload.Node.ID, gotPayload.Node.ID)
|
|
require.NotEqual(t, wantPayload.Node.Events, gotPayload.Node.Events)
|
|
}
|
|
|
|
func requireNodeDeregistrationEventEqual(t *testing.T, want, got structs.Event) {
|
|
t.Helper()
|
|
|
|
wantPayload := want.Payload.(*structs.NodeStreamEvent)
|
|
gotPayload := got.Payload.(*structs.NodeStreamEvent)
|
|
|
|
require.Equal(t, wantPayload.Node.ID, gotPayload.Node.ID)
|
|
require.NotEqual(t, wantPayload.Node.Events, gotPayload.Node.Events)
|
|
}
|
|
|
|
func requireNodeEventEqual(t *testing.T, want, got structs.Event) {
|
|
gotPayload := got.Payload.(*structs.NodeStreamEvent)
|
|
|
|
require.Len(t, gotPayload.Node.Events, 3)
|
|
}
|
|
|
|
type nodeOpts func(n *structs.Node)
|
|
|
|
func nodeNotReady(n *structs.Node) {
|
|
n.Status = structs.NodeStatusInit
|
|
}
|
|
|
|
func nodeIDTwo(n *structs.Node) {
|
|
n.ID = testNodeIDTwo()
|
|
}
|
|
|
|
func testNode(opts ...nodeOpts) *structs.Node {
|
|
n := mock.Node()
|
|
n.ID = testNodeID()
|
|
|
|
n.SecretID = "ab9812d3-6a21-40d3-973d-d9d2174a23ee"
|
|
|
|
for _, opt := range opts {
|
|
opt(n)
|
|
}
|
|
return n
|
|
}
|
|
|
|
func testNodeID() string {
|
|
return "9d5741c1-3899-498a-98dd-eb3c05665863"
|
|
}
|
|
|
|
func testNodeIDTwo() string {
|
|
return "694ff31d-8c59-4030-ac83-e15692560c8d"
|
|
}
|