node pools: add event stream support (#17412)

This commit is contained in:
Luiz Aoqui 2023-06-06 10:14:47 -04:00 committed by GitHub
parent 7c7f2d00bb
commit aa1b33d157
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 218 additions and 1 deletions

View File

@ -19,6 +19,7 @@ const (
TopicAllocation Topic = "Allocation"
TopicJob Topic = "Job"
TopicNode Topic = "Node"
TopicNodePool Topic = "NodePool"
TopicService Topic = "Service"
TopicAll Topic = "*"
)
@ -99,6 +100,16 @@ func (e *Event) Node() (*Node, error) {
return out.Node, nil
}
// NodePool returns a NodePool struct from a given event payload. If the Event
// Topic is NodePool this will return a valid NodePool.
func (e *Event) NodePool() (*NodePool, error) {
out, err := e.decodePayload()
if err != nil {
return nil, err
}
return out.NodePool, nil
}
// Service returns a ServiceRegistration struct from a given event payload. If
// the Event Topic is Service this will return a valid ServiceRegistration.
func (e *Event) Service() (*ServiceRegistration, error) {
@ -115,6 +126,7 @@ type eventPayload struct {
Evaluation *Evaluation `mapstructure:"Evaluation"`
Job *Job `mapstructure:"Job"`
Node *Node `mapstructure:"Node"`
NodePool *NodePool `mapstructure:"NodePool"`
Service *ServiceRegistration `mapstructure:"Service"`
}

View File

@ -41,6 +41,10 @@ func TestTopic_String(t *testing.T) {
inputTopic: TopicNode,
expectedOutput: "Node",
},
{
inputTopic: TopicNodePool,
expectedOutput: "NodePool",
},
{
inputTopic: TopicService,
expectedOutput: "Service",
@ -302,6 +306,19 @@ func TestEventStream_PayloadValueHelpers(t *testing.T) {
}, n)
},
},
{
desc: "node_pool",
input: []byte(`{"Topic":"NodePool","Payload":{"NodePool":{"Description":"prod pool","Name":"prod"}}}`),
expectFn: func(t *testing.T, event Event) {
must.Eq(t, TopicNodePool, event.Topic)
n, err := event.NodePool()
must.NoError(t, err)
must.Eq(t, &NodePool{
Name: "prod",
Description: "prod pool",
}, n)
},
},
{
desc: "service",
input: []byte(`{"Topic": "Service", "Payload": {"Service":{"ID":"some-service-id","Namespace":"some-service-namespace-id","Datacenter":"us-east-1a"}}}`),

View File

@ -12,6 +12,8 @@ var MsgTypeEvents = map[structs.MessageType]string{
structs.NodeRegisterRequestType: structs.TypeNodeRegistration,
structs.NodeDeregisterRequestType: structs.TypeNodeDeregistration,
structs.UpsertNodeEventsType: structs.TypeNodeEvent,
structs.NodePoolUpsertRequestType: structs.TypeNodePoolUpserted,
structs.NodePoolDeleteRequestType: structs.TypeNodePoolDeleted,
structs.EvalUpdateRequestType: structs.TypeEvalUpdated,
structs.AllocClientUpdateRequestType: structs.TypeAllocationUpdated,
structs.JobRegisterRequestType: structs.TypeJobRegistered,
@ -137,6 +139,18 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Node: before,
},
}, true
case TableNodePools:
before, ok := change.Before.(*structs.NodePool)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicNodePool,
Key: before.Name,
Payload: &structs.NodePoolEvent{
NodePool: before,
},
}, true
case TableServiceRegistrations:
before, ok := change.Before.(*structs.ServiceRegistration)
if !ok {
@ -288,6 +302,18 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Node: after,
},
}, true
case TableNodePools:
after, ok := change.After.(*structs.NodePool)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicNodePool,
Key: after.Name,
Payload: &structs.NodePoolEvent{
NodePool: after,
},
}, true
case "deployment":
after, ok := change.After.(*structs.Deployment)
if !ok {

View File

@ -390,6 +390,62 @@ func TestEventsFromChanges_NodeUpdateStatusRequest(t *testing.T) {
require.Equal(t, structs.NodeStatusDown, event.Node.Status)
}
func TestEventsFromChanges_NodePoolUpsertRequestType(t *testing.T) {
ci.Parallel(t)
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
// Create test node pool.
pool := mock.NodePool()
err := s.UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool})
must.NoError(t, err)
// Update test node pool.
updated := pool.Copy()
updated.Meta["updated"] = "true"
err = s.UpsertNodePools(structs.NodePoolUpsertRequestType, 1001, []*structs.NodePool{updated})
must.NoError(t, err)
// Wait and verify update event.
events := WaitForEvents(t, s, 1001, 1, 1*time.Second)
must.Len(t, 1, events)
e := events[0]
must.Eq(t, structs.TopicNodePool, e.Topic)
must.Eq(t, structs.TypeNodePoolUpserted, e.Type)
must.Eq(t, pool.Name, e.Key)
payload := e.Payload.(*structs.NodePoolEvent)
must.Eq(t, updated, payload.NodePool)
}
func TestEventsFromChanges_NodePoolDeleteRequestType(t *testing.T) {
ci.Parallel(t)
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()
// Create test node pool.
pool := mock.NodePool()
err := s.UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool})
must.NoError(t, err)
// Delete test node pool.
err = s.DeleteNodePools(structs.NodePoolDeleteRequestType, 1001, []string{pool.Name})
must.NoError(t, err)
// Wait and verify delete event.
events := WaitForEvents(t, s, 1001, 1, 1*time.Second)
must.Len(t, 1, events)
e := events[0]
must.Eq(t, structs.TopicNodePool, e.Topic)
must.Eq(t, structs.TypeNodePoolDeleted, e.Type)
must.Eq(t, pool.Name, e.Key)
payload := e.Payload.(*structs.NodePoolEvent)
must.Eq(t, pool, payload.NodePool)
}
func TestEventsFromChanges_EvalUpdateRequestType(t *testing.T) {
ci.Parallel(t)
s := TestStateStoreCfg(t, TestStateStorePublisher(t))

View File

@ -24,7 +24,7 @@ func (s *StateStore) nodePoolInit() error {
}
return s.UpsertNodePools(
structs.NodePoolUpsertRequestType,
structs.SystemInitializationType,
1,
[]*structs.NodePool{allNodePool, defaultNodePool},
)

View File

@ -367,6 +367,12 @@ func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool {
if ok := aclObj.AllowNodeRead(); !ok {
return false
}
case structs.TopicNodePool:
// Require management token for node pools since we can't filter
// out node pools the token doesn't have access to.
if ok := aclObj.IsManagement(); !ok {
return false
}
default:
if ok := aclObj.IsManagement(); !ok {
return false

View File

@ -16,6 +16,7 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
@ -989,6 +990,88 @@ func TestEventBroker_handleACLUpdates_tokenExpiry(t *testing.T) {
}
}
func TestEventBroker_NodePool_ACL(t *testing.T) {
ci.Parallel(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
testCases := []struct {
name string
token *structs.ACLToken
policy *structs.ACLPolicy
expectedErr string
}{
{
name: "management token",
token: &structs.ACLToken{
AccessorID: uuid.Generate(),
SecretID: uuid.Generate(),
Type: structs.ACLManagementToken,
},
},
{
name: "client token",
token: &structs.ACLToken{
AccessorID: uuid.Generate(),
SecretID: uuid.Generate(),
Type: structs.ACLClientToken,
},
expectedErr: structs.ErrPermissionDenied.Error(),
},
{
name: "node pool read",
token: &structs.ACLToken{
AccessorID: uuid.Generate(),
SecretID: uuid.Generate(),
Type: structs.ACLClientToken,
Policies: []string{"node-pool-read"},
},
policy: &structs.ACLPolicy{
Name: "node-pool-read",
Rules: `node_pool "*" { policy = "read" }`,
},
expectedErr: structs.ErrPermissionDenied.Error(),
},
{
name: "node pool write",
token: &structs.ACLToken{
AccessorID: uuid.Generate(),
SecretID: uuid.Generate(),
Type: structs.ACLClientToken,
Policies: []string{"node-pool-write"},
},
policy: &structs.ACLPolicy{
Name: "node-pool-write",
Rules: `node_pool "*" { policy = "write" }`,
},
expectedErr: structs.ErrPermissionDenied.Error(),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tokenProvider := &fakeACLTokenProvider{token: tc.token, policy: tc.policy}
aclDelegate := &fakeACLDelegate{tokenProvider: tokenProvider}
publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
must.NoError(t, err)
_, _, err = publisher.SubscribeWithACLCheck(&SubscribeRequest{
Topics: map[structs.Topic][]string{structs.TopicNodePool: {"*"}},
Token: tc.token.SecretID,
})
if tc.expectedErr != "" {
must.ErrorContains(t, err, tc.expectedErr)
} else {
must.NoError(t, err)
}
})
}
}
func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult {
eventCh := make(chan subNextResult, 1)
go func() {

View File

@ -24,6 +24,7 @@ const (
TopicAllocation Topic = "Allocation"
TopicJob Topic = "Job"
TopicNode Topic = "Node"
TopicNodePool Topic = "NodePool"
TopicACLPolicy Topic = "ACLPolicy"
TopicACLToken Topic = "ACLToken"
TopicACLRole Topic = "ACLRole"
@ -37,6 +38,8 @@ const (
TypeNodeEligibilityUpdate = "NodeEligibility"
TypeNodeDrain = "NodeDrain"
TypeNodeEvent = "NodeStreamEvent"
TypeNodePoolUpserted = "NodePoolUpserted"
TypeNodePoolDeleted = "NodePoolDeleted"
TypeDeploymentUpdate = "DeploymentStatusUpdate"
TypeDeploymentPromotion = "DeploymentPromotion"
TypeDeploymentAllocHealth = "DeploymentAllocHealth"
@ -133,6 +136,11 @@ type NodeStreamEvent struct {
Node *Node
}
// NodePoolEvent holds a newly updated NodePool.
type NodePoolEvent struct {
NodePool *NodePool
}
type ACLTokenEvent struct {
ACLToken *ACLToken
secretID string

View File

@ -134,6 +134,11 @@ const (
)
const (
// SystemInitializationType is used for messages that initialize parts of
// the system, such as the state store. These messages are not included in
// the event stream.
SystemInitializationType MessageType = 127
// IgnoreUnknownTypeFlag is set along with a MessageType
// to indicate that the message type can be safely ignored
// if it is not recognized. This is for future proofing, so

View File

@ -39,6 +39,7 @@ by default, requiring a management token.
| `Deployment` | `namespace:read-job` |
| `Evaluation` | `namespace:read-job` |
| `Node` | `node:read` |
| `NodePool` | `management` |
| `Service` | `namespace:read-job` |
### Parameters
@ -75,6 +76,7 @@ by default, requiring a management token.
| Deployment | Deployment |
| Node | Node |
| NodeDrain | Node |
| NodePool | NodePool |
| Service | Service Registrations |
### Event Types
@ -102,6 +104,8 @@ by default, requiring a management token.
| NodeEligibility |
| NodeDrain |
| NodeEvent |
| NodePoolUpserted |
| NodePoolDeleted |
| PlanResult |
| ServiceRegistration |
| ServiceDeregistration |