fsm: add FSM functionality for service registration endpoints.

This commit is contained in:
James Rasell 2022-03-03 11:24:29 +01:00
parent 8a23afdb56
commit 52283f057f
No known key found for this signature in database
GPG key ID: AA7D460F5C8377AA
2 changed files with 132 additions and 0 deletions

View file

@ -307,6 +307,12 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyOneTimeTokenDelete(msgType, buf[1:], log.Index)
case structs.OneTimeTokenExpireRequestType:
return n.applyOneTimeTokenExpire(msgType, buf[1:], log.Index)
case structs.ServiceRegistrationUpsertRequestType:
return n.applyUpsertServiceRegistrations(msgType, buf[1:], log.Index)
case structs.ServiceRegistrationDeleteByIDRequestType:
return n.applyDeleteServiceRegistrationByID(msgType, buf[1:], log.Index)
case structs.ServiceRegistrationDeleteByNodeIDRequestType:
return n.applyDeleteServiceRegistrationByNodeID(msgType, buf[1:], log.Index)
}
// Check enterprise only message types.
@ -1894,6 +1900,51 @@ func (n *nomadFSM) applyUpsertScalingEvent(buf []byte, index uint64) interface{}
return nil
}
func (n *nomadFSM) applyUpsertServiceRegistrations(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_service_registration_upsert"}, time.Now())
var req structs.ServiceRegistrationUpsertRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertServiceRegistrations(msgType, index, req.Services); err != nil {
n.logger.Error("UpsertServiceRegistrations failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyDeleteServiceRegistrationByID(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_service_registration_delete_id"}, time.Now())
var req structs.ServiceRegistrationDeleteByIDRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteServiceRegistrationByID(msgType, index, req.RequestNamespace(), req.ID); err != nil {
n.logger.Error("DeleteServiceRegistrationByID failed", "error", err)
return err
}
return nil
}
func (n *nomadFSM) applyDeleteServiceRegistrationByNodeID(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_service_registration_delete_node_id"}, time.Now())
var req structs.ServiceRegistrationDeleteByNodeIDRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteServiceRegistrationByNodeID(msgType, index, req.NodeID); err != nil {
n.logger.Error("DeleteServiceRegistrationByNodeID failed", "error", err)
return err
}
return nil
}
func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now())
// Register the nodes

View file

@ -3259,6 +3259,87 @@ func TestFSM_SnapshotRestore_Namespaces(t *testing.T) {
}
}
func TestFSM_UpsertServiceRegistrations(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
// Generate our test service registrations.
services := mock.ServiceRegistrations()
// Build and apply our message.
req := structs.ServiceRegistrationUpsertRequest{Services: services}
buf, err := structs.Encode(structs.ServiceRegistrationUpsertRequestType, req)
assert.Nil(t, err)
assert.Nil(t, fsm.Apply(makeLog(buf)))
// Check that both services are found within state.
ws := memdb.NewWatchSet()
out, err := fsm.State().GetServiceRegistrationByID(ws, services[0].Namespace, services[0].ID)
assert.Nil(t, err)
assert.NotNil(t, out)
out, err = fsm.State().GetServiceRegistrationByID(ws, services[1].Namespace, services[1].ID)
assert.Nil(t, err)
assert.NotNil(t, out)
}
func TestFSM_DeleteServiceRegistrationsByID(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
// Generate our test service registrations.
services := mock.ServiceRegistrations()
// Upsert the services.
assert.NoError(t, fsm.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, uint64(10), services))
// Build and apply our message.
req := structs.ServiceRegistrationDeleteByIDRequest{ID: services[0].ID}
buf, err := structs.Encode(structs.ServiceRegistrationDeleteByIDRequestType, req)
assert.Nil(t, err)
assert.Nil(t, fsm.Apply(makeLog(buf)))
// Check that the service has been deleted, whilst the other is still
// available.
ws := memdb.NewWatchSet()
out, err := fsm.State().GetServiceRegistrationByID(ws, services[0].Namespace, services[0].ID)
assert.Nil(t, err)
assert.Nil(t, out)
out, err = fsm.State().GetServiceRegistrationByID(ws, services[1].Namespace, services[1].ID)
assert.Nil(t, err)
assert.NotNil(t, out)
}
func TestFSM_DeleteServiceRegistrationsByNodeID(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
// Generate our test service registrations. Set them both to have the same
// node ID.
services := mock.ServiceRegistrations()
services[1].NodeID = services[0].NodeID
// Upsert the services.
assert.NoError(t, fsm.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, uint64(10), services))
// Build and apply our message.
req := structs.ServiceRegistrationDeleteByNodeIDRequest{NodeID: services[0].NodeID}
buf, err := structs.Encode(structs.ServiceRegistrationDeleteByNodeIDRequestType, req)
assert.Nil(t, err)
assert.Nil(t, fsm.Apply(makeLog(buf)))
// Check both services have been removed.
ws := memdb.NewWatchSet()
out, err := fsm.State().GetServiceRegistrationByID(ws, services[0].Namespace, services[0].ID)
assert.Nil(t, err)
assert.Nil(t, out)
out, err = fsm.State().GetServiceRegistrationByID(ws, services[1].Namespace, services[1].ID)
assert.Nil(t, err)
assert.Nil(t, out)
}
func TestFSM_ACLEvents(t *testing.T) {
t.Parallel()