diff --git a/api/event_stream.go b/api/event_stream.go index 60e08fbe1..7721d15cc 100644 --- a/api/event_stream.go +++ b/api/event_stream.go @@ -19,6 +19,7 @@ const ( TopicAllocation Topic = "Allocation" TopicJob Topic = "Job" TopicNode Topic = "Node" + TopicNodePool Topic = "NodePool" TopicService Topic = "Service" TopicAll Topic = "*" ) @@ -99,6 +100,16 @@ func (e *Event) Node() (*Node, error) { return out.Node, nil } +// NodePool returns a NodePool struct from a given event payload. If the Event +// Topic is NodePool this will return a valid NodePool. +func (e *Event) NodePool() (*NodePool, error) { + out, err := e.decodePayload() + if err != nil { + return nil, err + } + return out.NodePool, nil +} + // Service returns a ServiceRegistration struct from a given event payload. If // the Event Topic is Service this will return a valid ServiceRegistration. func (e *Event) Service() (*ServiceRegistration, error) { @@ -115,6 +126,7 @@ type eventPayload struct { Evaluation *Evaluation `mapstructure:"Evaluation"` Job *Job `mapstructure:"Job"` Node *Node `mapstructure:"Node"` + NodePool *NodePool `mapstructure:"NodePool"` Service *ServiceRegistration `mapstructure:"Service"` } diff --git a/api/event_stream_test.go b/api/event_stream_test.go index a1b6c1baf..e9693517c 100644 --- a/api/event_stream_test.go +++ b/api/event_stream_test.go @@ -41,6 +41,10 @@ func TestTopic_String(t *testing.T) { inputTopic: TopicNode, expectedOutput: "Node", }, + { + inputTopic: TopicNodePool, + expectedOutput: "NodePool", + }, { inputTopic: TopicService, expectedOutput: "Service", @@ -302,6 +306,19 @@ func TestEventStream_PayloadValueHelpers(t *testing.T) { }, n) }, }, + { + desc: "node_pool", + input: []byte(`{"Topic":"NodePool","Payload":{"NodePool":{"Description":"prod pool","Name":"prod"}}}`), + expectFn: func(t *testing.T, event Event) { + must.Eq(t, TopicNodePool, event.Topic) + n, err := event.NodePool() + must.NoError(t, err) + must.Eq(t, &NodePool{ + Name: "prod", + Description: "prod pool", + }, n) + }, + }, { desc: "service", input: []byte(`{"Topic": "Service", "Payload": {"Service":{"ID":"some-service-id","Namespace":"some-service-namespace-id","Datacenter":"us-east-1a"}}}`), diff --git a/nomad/state/events.go b/nomad/state/events.go index 9823a8941..410aa1845 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -12,6 +12,8 @@ var MsgTypeEvents = map[structs.MessageType]string{ structs.NodeRegisterRequestType: structs.TypeNodeRegistration, structs.NodeDeregisterRequestType: structs.TypeNodeDeregistration, structs.UpsertNodeEventsType: structs.TypeNodeEvent, + structs.NodePoolUpsertRequestType: structs.TypeNodePoolUpserted, + structs.NodePoolDeleteRequestType: structs.TypeNodePoolDeleted, structs.EvalUpdateRequestType: structs.TypeEvalUpdated, structs.AllocClientUpdateRequestType: structs.TypeAllocationUpdated, structs.JobRegisterRequestType: structs.TypeJobRegistered, @@ -137,6 +139,18 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { Node: before, }, }, true + case TableNodePools: + before, ok := change.Before.(*structs.NodePool) + if !ok { + return structs.Event{}, false + } + return structs.Event{ + Topic: structs.TopicNodePool, + Key: before.Name, + Payload: &structs.NodePoolEvent{ + NodePool: before, + }, + }, true case TableServiceRegistrations: before, ok := change.Before.(*structs.ServiceRegistration) if !ok { @@ -288,6 +302,18 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { Node: after, }, }, true + case TableNodePools: + after, ok := change.After.(*structs.NodePool) + if !ok { + return structs.Event{}, false + } + return structs.Event{ + Topic: structs.TopicNodePool, + Key: after.Name, + Payload: &structs.NodePoolEvent{ + NodePool: after, + }, + }, true case "deployment": after, ok := change.After.(*structs.Deployment) if !ok { diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index c9d88322d..9a24e14cf 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -390,6 +390,62 @@ func TestEventsFromChanges_NodeUpdateStatusRequest(t *testing.T) { require.Equal(t, structs.NodeStatusDown, event.Node.Status) } +func TestEventsFromChanges_NodePoolUpsertRequestType(t *testing.T) { + ci.Parallel(t) + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + // Create test node pool. + pool := mock.NodePool() + err := s.UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool}) + must.NoError(t, err) + + // Update test node pool. + updated := pool.Copy() + updated.Meta["updated"] = "true" + err = s.UpsertNodePools(structs.NodePoolUpsertRequestType, 1001, []*structs.NodePool{updated}) + must.NoError(t, err) + + // Wait and verify update event. + events := WaitForEvents(t, s, 1001, 1, 1*time.Second) + must.Len(t, 1, events) + + e := events[0] + must.Eq(t, structs.TopicNodePool, e.Topic) + must.Eq(t, structs.TypeNodePoolUpserted, e.Type) + must.Eq(t, pool.Name, e.Key) + + payload := e.Payload.(*structs.NodePoolEvent) + must.Eq(t, updated, payload.NodePool) +} + +func TestEventsFromChanges_NodePoolDeleteRequestType(t *testing.T) { + ci.Parallel(t) + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + // Create test node pool. + pool := mock.NodePool() + err := s.UpsertNodePools(structs.MsgTypeTestSetup, 1000, []*structs.NodePool{pool}) + must.NoError(t, err) + + // Delete test node pool. + err = s.DeleteNodePools(structs.NodePoolDeleteRequestType, 1001, []string{pool.Name}) + must.NoError(t, err) + + // Wait and verify delete event. + events := WaitForEvents(t, s, 1001, 1, 1*time.Second) + must.Len(t, 1, events) + + e := events[0] + must.Eq(t, structs.TopicNodePool, e.Topic) + must.Eq(t, structs.TypeNodePoolDeleted, e.Type) + must.Eq(t, pool.Name, e.Key) + + payload := e.Payload.(*structs.NodePoolEvent) + must.Eq(t, pool, payload.NodePool) +} + func TestEventsFromChanges_EvalUpdateRequestType(t *testing.T) { ci.Parallel(t) s := TestStateStoreCfg(t, TestStateStorePublisher(t)) diff --git a/nomad/state/state_store_node_pools.go b/nomad/state/state_store_node_pools.go index ab004dd77..d4a466f32 100644 --- a/nomad/state/state_store_node_pools.go +++ b/nomad/state/state_store_node_pools.go @@ -24,7 +24,7 @@ func (s *StateStore) nodePoolInit() error { } return s.UpsertNodePools( - structs.NodePoolUpsertRequestType, + structs.SystemInitializationType, 1, []*structs.NodePool{allNodePool, defaultNodePool}, ) diff --git a/nomad/stream/event_broker.go b/nomad/stream/event_broker.go index 587f7b7ab..27e270577 100644 --- a/nomad/stream/event_broker.go +++ b/nomad/stream/event_broker.go @@ -367,6 +367,12 @@ func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool { if ok := aclObj.AllowNodeRead(); !ok { return false } + case structs.TopicNodePool: + // Require management token for node pools since we can't filter + // out node pools the token doesn't have access to. + if ok := aclObj.IsManagement(); !ok { + return false + } default: if ok := aclObj.IsManagement(); !ok { return false diff --git a/nomad/stream/event_broker_test.go b/nomad/stream/event_broker_test.go index d4cd174fa..aa08e1370 100644 --- a/nomad/stream/event_broker_test.go +++ b/nomad/stream/event_broker_test.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -989,6 +990,88 @@ func TestEventBroker_handleACLUpdates_tokenExpiry(t *testing.T) { } } +func TestEventBroker_NodePool_ACL(t *testing.T) { + ci.Parallel(t) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + testCases := []struct { + name string + token *structs.ACLToken + policy *structs.ACLPolicy + expectedErr string + }{ + { + name: "management token", + token: &structs.ACLToken{ + AccessorID: uuid.Generate(), + SecretID: uuid.Generate(), + Type: structs.ACLManagementToken, + }, + }, + { + name: "client token", + token: &structs.ACLToken{ + AccessorID: uuid.Generate(), + SecretID: uuid.Generate(), + Type: structs.ACLClientToken, + }, + expectedErr: structs.ErrPermissionDenied.Error(), + }, + { + name: "node pool read", + token: &structs.ACLToken{ + AccessorID: uuid.Generate(), + SecretID: uuid.Generate(), + Type: structs.ACLClientToken, + Policies: []string{"node-pool-read"}, + }, + policy: &structs.ACLPolicy{ + Name: "node-pool-read", + Rules: `node_pool "*" { policy = "read" }`, + }, + expectedErr: structs.ErrPermissionDenied.Error(), + }, + { + name: "node pool write", + token: &structs.ACLToken{ + AccessorID: uuid.Generate(), + SecretID: uuid.Generate(), + Type: structs.ACLClientToken, + Policies: []string{"node-pool-write"}, + }, + policy: &structs.ACLPolicy{ + Name: "node-pool-write", + Rules: `node_pool "*" { policy = "write" }`, + }, + expectedErr: structs.ErrPermissionDenied.Error(), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tokenProvider := &fakeACLTokenProvider{token: tc.token, policy: tc.policy} + aclDelegate := &fakeACLDelegate{tokenProvider: tokenProvider} + + publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{}) + must.NoError(t, err) + + _, _, err = publisher.SubscribeWithACLCheck(&SubscribeRequest{ + Topics: map[structs.Topic][]string{structs.TopicNodePool: {"*"}}, + Token: tc.token.SecretID, + }) + + if tc.expectedErr != "" { + must.ErrorContains(t, err, tc.expectedErr) + } else { + must.NoError(t, err) + } + }) + } + +} + func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult { eventCh := make(chan subNextResult, 1) go func() { diff --git a/nomad/structs/event.go b/nomad/structs/event.go index 07b4e394f..2d729b644 100644 --- a/nomad/structs/event.go +++ b/nomad/structs/event.go @@ -24,6 +24,7 @@ const ( TopicAllocation Topic = "Allocation" TopicJob Topic = "Job" TopicNode Topic = "Node" + TopicNodePool Topic = "NodePool" TopicACLPolicy Topic = "ACLPolicy" TopicACLToken Topic = "ACLToken" TopicACLRole Topic = "ACLRole" @@ -37,6 +38,8 @@ const ( TypeNodeEligibilityUpdate = "NodeEligibility" TypeNodeDrain = "NodeDrain" TypeNodeEvent = "NodeStreamEvent" + TypeNodePoolUpserted = "NodePoolUpserted" + TypeNodePoolDeleted = "NodePoolDeleted" TypeDeploymentUpdate = "DeploymentStatusUpdate" TypeDeploymentPromotion = "DeploymentPromotion" TypeDeploymentAllocHealth = "DeploymentAllocHealth" @@ -133,6 +136,11 @@ type NodeStreamEvent struct { Node *Node } +// NodePoolEvent holds a newly updated NodePool. +type NodePoolEvent struct { + NodePool *NodePool +} + type ACLTokenEvent struct { ACLToken *ACLToken secretID string diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 031415394..33812e35c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -134,6 +134,11 @@ const ( ) const ( + // SystemInitializationType is used for messages that initialize parts of + // the system, such as the state store. These messages are not included in + // the event stream. + SystemInitializationType MessageType = 127 + // IgnoreUnknownTypeFlag is set along with a MessageType // to indicate that the message type can be safely ignored // if it is not recognized. This is for future proofing, so diff --git a/website/content/api-docs/events.mdx b/website/content/api-docs/events.mdx index aa826c6b4..f12866e4e 100644 --- a/website/content/api-docs/events.mdx +++ b/website/content/api-docs/events.mdx @@ -39,6 +39,7 @@ by default, requiring a management token. | `Deployment` | `namespace:read-job` | | `Evaluation` | `namespace:read-job` | | `Node` | `node:read` | +| `NodePool` | `management` | | `Service` | `namespace:read-job` | ### Parameters @@ -75,6 +76,7 @@ by default, requiring a management token. | Deployment | Deployment | | Node | Node | | NodeDrain | Node | +| NodePool | NodePool | | Service | Service Registrations | ### Event Types @@ -102,6 +104,8 @@ by default, requiring a management token. | NodeEligibility | | NodeDrain | | NodeEvent | +| NodePoolUpserted | +| NodePoolDeleted | | PlanResult | | ServiceRegistration | | ServiceDeregistration |