2017-11-29 02:01:17 +00:00
|
|
|
package fsm
|
2013-12-06 23:43:07 +00:00
|
|
|
|
|
|
|
import (
|
2013-12-11 22:04:44 +00:00
|
|
|
"fmt"
|
2013-12-06 23:43:07 +00:00
|
|
|
"io"
|
2017-01-20 14:12:10 +00:00
|
|
|
"sync"
|
2013-12-17 19:13:19 +00:00
|
|
|
"time"
|
2014-10-09 19:23:32 +00:00
|
|
|
|
2020-01-28 23:50:41 +00:00
|
|
|
"github.com/hashicorp/go-hclog"
|
2019-07-24 21:06:39 +00:00
|
|
|
"github.com/hashicorp/go-raftchunking"
|
2014-10-20 17:21:31 +00:00
|
|
|
"github.com/hashicorp/raft"
|
2020-10-23 19:21:37 +00:00
|
|
|
|
proxycfg: server-local config entry data sources
This is the OSS portion of enterprise PR 2056.
This commit provides server-local implementations of the proxycfg.ConfigEntry
and proxycfg.ConfigEntryList interfaces, that source data from streaming events.
It makes use of the LocalMaterializer type introduced for peering replication,
adding the necessary support for authorization.
It also adds support for "wildcard" subscriptions (within a topic) to the event
publisher, as this is needed to fetch service-resolvers for all services when
configuring mesh gateways.
Currently, events will be emitted for just the ingress-gateway, service-resolver,
and mesh config entry types, as these are the only entries required by proxycfg
— the events will be emitted on topics named IngressGateway, ServiceResolver,
and MeshConfig topics respectively.
Though these events will only be consumed "locally" for now, they can also be
consumed via the gRPC endpoint (confirmed using grpcurl) so using them from
client agents should be a case of swapping the LocalMaterializer for an
RPCMaterializer.
2022-07-01 15:09:47 +00:00
|
|
|
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
|
|
|
|
|
2020-10-23 19:21:37 +00:00
|
|
|
"github.com/hashicorp/consul/agent/consul/state"
|
2022-04-12 13:47:42 +00:00
|
|
|
"github.com/hashicorp/consul/agent/consul/stream"
|
2020-10-23 19:21:37 +00:00
|
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
|
|
"github.com/hashicorp/consul/logging"
|
2013-12-06 23:43:07 +00:00
|
|
|
)
|
|
|
|
|
2017-11-29 20:43:27 +00:00
|
|
|
// command is a command method on the FSM.
|
|
|
|
type command func(buf []byte, index uint64) interface{}
|
|
|
|
|
|
|
|
// unboundCommand is a command method on the FSM, not yet bound to an FSM
|
|
|
|
// instance.
|
|
|
|
type unboundCommand func(c *FSM, buf []byte, index uint64) interface{}
|
|
|
|
|
|
|
|
// commands is a map from message type to unbound command.
|
|
|
|
var commands map[structs.MessageType]unboundCommand
|
|
|
|
|
|
|
|
// registerCommand registers a new command with the FSM, which should be done
|
|
|
|
// at package init() time.
|
|
|
|
func registerCommand(msg structs.MessageType, fn unboundCommand) {
|
|
|
|
if commands == nil {
|
|
|
|
commands = make(map[structs.MessageType]unboundCommand)
|
|
|
|
}
|
|
|
|
if commands[msg] != nil {
|
|
|
|
panic(fmt.Errorf("Message %d is already registered", msg))
|
|
|
|
}
|
|
|
|
commands[msg] = fn
|
|
|
|
}
|
|
|
|
|
2017-11-29 02:01:17 +00:00
|
|
|
// FSM implements a finite state machine that is used
|
2013-12-11 01:00:48 +00:00
|
|
|
// along with Raft to provide strong consistency. We implement
|
|
|
|
// this outside the Server to avoid exposing this outside the package.
|
2017-11-29 02:01:17 +00:00
|
|
|
type FSM struct {
|
2020-10-23 19:21:37 +00:00
|
|
|
deps Deps
|
|
|
|
logger hclog.Logger
|
|
|
|
chunker *raftchunking.ChunkingFSM
|
2017-01-20 14:12:10 +00:00
|
|
|
|
2017-11-29 20:43:27 +00:00
|
|
|
// apply is built off the commands global and is used to route apply
|
|
|
|
// operations to their appropriate handlers.
|
|
|
|
apply map[structs.MessageType]command
|
|
|
|
|
2017-01-20 14:12:10 +00:00
|
|
|
// stateLock is only used to protect outside callers to State() from
|
|
|
|
// racing with Restore(), which is called by Raft (it puts in a totally
|
2017-01-25 17:45:25 +00:00
|
|
|
// new state store). Everything internal here is synchronized by the
|
|
|
|
// Raft side, so doesn't need to lock this.
|
2017-01-20 14:12:10 +00:00
|
|
|
stateLock sync.RWMutex
|
2017-04-21 00:46:29 +00:00
|
|
|
state *state.Store
|
2022-04-12 13:47:42 +00:00
|
|
|
|
|
|
|
publisher *stream.EventPublisher
|
2013-12-06 23:43:07 +00:00
|
|
|
}
|
|
|
|
|
2017-11-29 02:01:17 +00:00
|
|
|
// New is used to construct a new FSM with a blank state.
|
2020-10-23 19:21:37 +00:00
|
|
|
//
|
|
|
|
// Deprecated: use NewFromDeps.
|
2020-01-28 23:50:41 +00:00
|
|
|
func New(gc *state.TombstoneGC, logger hclog.Logger) (*FSM, error) {
|
2020-10-23 19:21:37 +00:00
|
|
|
newStateStore := func() *state.Store {
|
|
|
|
return state.NewStateStore(gc)
|
2020-01-28 23:50:41 +00:00
|
|
|
}
|
2020-10-23 19:21:37 +00:00
|
|
|
return NewFromDeps(Deps{Logger: logger, NewStateStore: newStateStore}), nil
|
|
|
|
}
|
2020-01-28 23:50:41 +00:00
|
|
|
|
2020-10-23 19:21:37 +00:00
|
|
|
// Deps are dependencies used to construct the FSM.
|
|
|
|
type Deps struct {
|
|
|
|
// Logger used to emit log messages
|
|
|
|
Logger hclog.Logger
|
|
|
|
// NewStateStore returns a state.Store which the FSM will use to make changes
|
|
|
|
// to the state.
|
|
|
|
// NewStateStore will be called once when the FSM is created and again any
|
|
|
|
// time Restore() is called.
|
|
|
|
NewStateStore func() *state.Store
|
2022-04-12 13:47:42 +00:00
|
|
|
|
|
|
|
Publisher *stream.EventPublisher
|
2020-10-23 19:21:37 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewFromDeps creates a new FSM from its dependencies.
|
|
|
|
func NewFromDeps(deps Deps) *FSM {
|
|
|
|
if deps.Logger == nil {
|
|
|
|
deps.Logger = hclog.New(&hclog.LoggerOptions{})
|
2015-09-20 08:36:39 +00:00
|
|
|
}
|
|
|
|
|
2017-11-29 02:01:17 +00:00
|
|
|
fsm := &FSM{
|
2020-10-23 19:21:37 +00:00
|
|
|
deps: deps,
|
|
|
|
logger: deps.Logger.Named(logging.FSM),
|
2020-01-28 23:50:41 +00:00
|
|
|
apply: make(map[structs.MessageType]command),
|
2020-10-23 19:21:37 +00:00
|
|
|
state: deps.NewStateStore(),
|
2013-12-11 01:00:48 +00:00
|
|
|
}
|
2017-11-29 20:43:27 +00:00
|
|
|
|
|
|
|
// Build out the apply dispatch table based on the registered commands.
|
|
|
|
for msg, fn := range commands {
|
|
|
|
thisFn := fn
|
|
|
|
fsm.apply[msg] = func(buf []byte, index uint64) interface{} {
|
|
|
|
return thisFn(fsm, buf, index)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-07-24 21:06:39 +00:00
|
|
|
fsm.chunker = raftchunking.NewChunkingFSM(fsm, nil)
|
2022-04-12 13:47:42 +00:00
|
|
|
|
|
|
|
// register the streaming snapshot handlers if an event publisher was provided.
|
|
|
|
fsm.registerStreamSnapshotHandlers()
|
|
|
|
|
2020-10-23 19:21:37 +00:00
|
|
|
return fsm
|
2013-12-11 01:00:48 +00:00
|
|
|
}
|
|
|
|
|
2023-02-08 16:50:22 +00:00
|
|
|
func (c *FSM) ChunkingFSM() raft.FSM {
|
|
|
|
// Wrap the chunker in a shim. This is not a ChunkingFSM any more but the only
|
|
|
|
// caller of this passes it directly to Raft as a raft.FSM.
|
|
|
|
return &logVerificationChunkingShim{chunker: c.chunker}
|
2019-07-24 21:06:39 +00:00
|
|
|
}
|
|
|
|
|
2015-10-13 05:21:39 +00:00
|
|
|
// State is used to return a handle to the current state
|
2017-11-29 02:01:17 +00:00
|
|
|
func (c *FSM) State() *state.Store {
|
2017-01-20 14:12:10 +00:00
|
|
|
c.stateLock.RLock()
|
|
|
|
defer c.stateLock.RUnlock()
|
2015-10-13 05:21:39 +00:00
|
|
|
return c.state
|
2015-09-20 08:36:39 +00:00
|
|
|
}
|
|
|
|
|
2017-11-29 02:01:17 +00:00
|
|
|
func (c *FSM) Apply(log *raft.Log) interface{} {
|
2014-01-04 01:15:09 +00:00
|
|
|
buf := log.Data
|
2015-05-06 02:44:21 +00:00
|
|
|
msgType := structs.MessageType(buf[0])
|
|
|
|
|
2023-02-08 16:50:22 +00:00
|
|
|
// This is tricky stuff. We no longer let the ChunkingFSM wrap us completely
|
|
|
|
// because Chunking FSM doesn't know how to handle raft log verification
|
|
|
|
// checkpoints properly. So instead we have to be extra careful to correctly
|
|
|
|
// call into the chunking FSM when we need it.
|
|
|
|
|
2015-05-06 02:44:21 +00:00
|
|
|
// Check if this message type should be ignored when unknown. This is
|
|
|
|
// used so that new commands can be added with developer control if older
|
|
|
|
// versions can safely ignore the command, or if they should crash.
|
|
|
|
ignoreUnknown := false
|
|
|
|
if msgType&structs.IgnoreUnknownTypeFlag == structs.IgnoreUnknownTypeFlag {
|
|
|
|
msgType &= ^structs.IgnoreUnknownTypeFlag
|
|
|
|
ignoreUnknown = true
|
|
|
|
}
|
|
|
|
|
2017-11-29 20:43:27 +00:00
|
|
|
// Apply based on the dispatch table, if possible.
|
2017-11-30 01:33:57 +00:00
|
|
|
if fn := c.apply[msgType]; fn != nil {
|
2017-11-29 20:43:27 +00:00
|
|
|
return fn(buf[1:], log.Index)
|
2014-12-15 23:04:21 +00:00
|
|
|
}
|
|
|
|
|
2017-11-29 20:43:27 +00:00
|
|
|
// Otherwise, see if it's safe to ignore. If not, we have to panic so
|
|
|
|
// that we crash and our state doesn't diverge.
|
|
|
|
if ignoreUnknown {
|
2020-01-28 23:50:41 +00:00
|
|
|
c.logger.Warn("ignoring unknown message type, upgrade to newer version", "type", msgType)
|
2017-11-29 20:43:27 +00:00
|
|
|
return nil
|
2017-02-24 21:08:49 +00:00
|
|
|
}
|
2017-11-29 20:43:27 +00:00
|
|
|
panic(fmt.Errorf("failed to apply request: %#v", buf))
|
2017-02-24 21:08:49 +00:00
|
|
|
}
|
|
|
|
|
2017-11-29 02:01:17 +00:00
|
|
|
func (c *FSM) Snapshot() (raft.FSMSnapshot, error) {
|
2013-12-17 19:13:19 +00:00
|
|
|
defer func(start time.Time) {
|
2020-01-28 23:50:41 +00:00
|
|
|
c.logger.Info("snapshot created", "duration", time.Since(start).String())
|
2013-12-17 19:13:19 +00:00
|
|
|
}(time.Now())
|
|
|
|
|
2019-07-24 21:06:39 +00:00
|
|
|
chunkState, err := c.chunker.CurrentState()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return &snapshot{
|
|
|
|
state: c.state.Snapshot(),
|
|
|
|
chunkState: chunkState,
|
|
|
|
}, nil
|
2013-12-06 23:43:07 +00:00
|
|
|
}
|
|
|
|
|
2017-01-26 03:42:34 +00:00
|
|
|
// Restore streams in the snapshot and replaces the current state store with a
|
|
|
|
// new one based on the snapshot if all goes OK during the restore.
|
2017-11-29 02:01:17 +00:00
|
|
|
func (c *FSM) Restore(old io.ReadCloser) error {
|
2013-12-11 02:19:15 +00:00
|
|
|
defer old.Close()
|
|
|
|
|
2020-10-23 19:21:37 +00:00
|
|
|
stateNew := c.deps.NewStateStore()
|
2017-01-20 14:12:10 +00:00
|
|
|
|
2015-10-20 06:06:59 +00:00
|
|
|
// Set up a new restore transaction
|
2017-01-26 03:42:34 +00:00
|
|
|
restore := stateNew.Restore()
|
2015-10-20 06:06:59 +00:00
|
|
|
defer restore.Abort()
|
|
|
|
|
2020-10-09 19:57:29 +00:00
|
|
|
handler := func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error {
|
2019-07-24 21:06:39 +00:00
|
|
|
switch {
|
|
|
|
case msg == structs.ChunkingStateType:
|
|
|
|
chunkState := &raftchunking.State{
|
|
|
|
ChunkMap: make(raftchunking.ChunkMap),
|
|
|
|
}
|
|
|
|
if err := dec.Decode(chunkState); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := c.chunker.RestoreState(chunkState); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
case restorers[msg] != nil:
|
|
|
|
fn := restorers[msg]
|
2020-10-09 19:57:29 +00:00
|
|
|
if err := fn(header, restore, dec); err != nil {
|
2013-12-16 18:47:14 +00:00
|
|
|
return err
|
|
|
|
}
|
2019-07-24 21:06:39 +00:00
|
|
|
default:
|
2022-10-24 15:40:26 +00:00
|
|
|
if msg >= 64 {
|
|
|
|
return fmt.Errorf("msg type <%d> is a Consul Enterprise log entry. Consul OSS cannot restore it", msg)
|
|
|
|
} else {
|
|
|
|
return fmt.Errorf("Unrecognized msg type %d", msg)
|
|
|
|
}
|
2013-12-16 18:47:14 +00:00
|
|
|
}
|
2020-10-09 19:57:29 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
if err := ReadSnapshot(old, handler); err != nil {
|
|
|
|
return err
|
2013-12-16 18:47:14 +00:00
|
|
|
}
|
2020-10-09 19:57:29 +00:00
|
|
|
|
2020-06-02 20:34:56 +00:00
|
|
|
if err := restore.Commit(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-01-26 03:42:34 +00:00
|
|
|
|
|
|
|
// External code might be calling State(), so we need to synchronize
|
|
|
|
// here to make sure we swap in the new state store atomically.
|
|
|
|
c.stateLock.Lock()
|
|
|
|
stateOld := c.state
|
|
|
|
c.state = stateNew
|
2022-04-12 13:47:42 +00:00
|
|
|
|
|
|
|
// Tell the EventPublisher to cycle anything watching these topics. Replacement
|
|
|
|
// of the state store means that indexes could have gone backwards and data changed.
|
|
|
|
//
|
|
|
|
// This needs to happen while holding the state lock to ensure its not racey. If we
|
|
|
|
// did this outside of the locked section closer to where we abandon the old store
|
|
|
|
// then there would be a possibility for new streams to be opened that would get
|
|
|
|
// a snapshot from the cache sourced from old data but would be receiving events
|
|
|
|
// for new data. To prevent that inconsistency we refresh the topics while holding
|
|
|
|
// the lock which ensures that any subscriptions to topics for FSM generated events
|
|
|
|
if c.deps.Publisher != nil {
|
|
|
|
c.deps.Publisher.RefreshTopic(state.EventTopicServiceHealth)
|
|
|
|
c.deps.Publisher.RefreshTopic(state.EventTopicServiceHealthConnect)
|
|
|
|
c.deps.Publisher.RefreshTopic(state.EventTopicCARoots)
|
|
|
|
}
|
2017-01-26 03:42:34 +00:00
|
|
|
c.stateLock.Unlock()
|
|
|
|
|
|
|
|
// Signal that the old state store has been abandoned. This is required
|
|
|
|
// because we don't operate on it any more, we just throw it away, so
|
|
|
|
// blocking queries won't see any changes and need to be woken up.
|
|
|
|
stateOld.Abandon()
|
2022-04-12 13:47:42 +00:00
|
|
|
|
2013-12-06 23:43:07 +00:00
|
|
|
return nil
|
|
|
|
}
|
2020-10-09 19:57:29 +00:00
|
|
|
|
|
|
|
// ReadSnapshot decodes each message type and utilizes the handler function to
|
|
|
|
// process each message type individually
|
|
|
|
func ReadSnapshot(r io.Reader, handler func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error) error {
|
|
|
|
// Create a decoder
|
|
|
|
dec := codec.NewDecoder(r, structs.MsgpackHandle)
|
|
|
|
|
|
|
|
// Read in the header
|
|
|
|
var header SnapshotHeader
|
|
|
|
if err := dec.Decode(&header); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Populate the new state
|
|
|
|
msgType := make([]byte, 1)
|
|
|
|
for {
|
|
|
|
// Read the message type
|
|
|
|
_, err := r.Read(msgType)
|
|
|
|
if err == io.EOF {
|
|
|
|
return nil
|
|
|
|
} else if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Decode
|
|
|
|
msg := structs.MessageType(msgType[0])
|
|
|
|
|
|
|
|
if err := handler(&header, msg, dec); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2022-04-12 13:47:42 +00:00
|
|
|
|
|
|
|
func (c *FSM) registerStreamSnapshotHandlers() {
|
|
|
|
if c.deps.Publisher == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
err := c.deps.Publisher.RegisterHandler(state.EventTopicServiceHealth, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
|
|
|
return c.State().ServiceHealthSnapshot(req, buf)
|
proxycfg: server-local config entry data sources
This is the OSS portion of enterprise PR 2056.
This commit provides server-local implementations of the proxycfg.ConfigEntry
and proxycfg.ConfigEntryList interfaces, that source data from streaming events.
It makes use of the LocalMaterializer type introduced for peering replication,
adding the necessary support for authorization.
It also adds support for "wildcard" subscriptions (within a topic) to the event
publisher, as this is needed to fetch service-resolvers for all services when
configuring mesh gateways.
Currently, events will be emitted for just the ingress-gateway, service-resolver,
and mesh config entry types, as these are the only entries required by proxycfg
— the events will be emitted on topics named IngressGateway, ServiceResolver,
and MeshConfig topics respectively.
Though these events will only be consumed "locally" for now, they can also be
consumed via the gRPC endpoint (confirmed using grpcurl) so using them from
client agents should be a case of swapping the LocalMaterializer for an
RPCMaterializer.
2022-07-01 15:09:47 +00:00
|
|
|
}, false)
|
2022-04-12 13:47:42 +00:00
|
|
|
if err != nil {
|
|
|
|
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
|
|
|
|
}
|
|
|
|
|
|
|
|
err = c.deps.Publisher.RegisterHandler(state.EventTopicServiceHealthConnect, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
|
|
|
return c.State().ServiceHealthSnapshot(req, buf)
|
proxycfg: server-local config entry data sources
This is the OSS portion of enterprise PR 2056.
This commit provides server-local implementations of the proxycfg.ConfigEntry
and proxycfg.ConfigEntryList interfaces, that source data from streaming events.
It makes use of the LocalMaterializer type introduced for peering replication,
adding the necessary support for authorization.
It also adds support for "wildcard" subscriptions (within a topic) to the event
publisher, as this is needed to fetch service-resolvers for all services when
configuring mesh gateways.
Currently, events will be emitted for just the ingress-gateway, service-resolver,
and mesh config entry types, as these are the only entries required by proxycfg
— the events will be emitted on topics named IngressGateway, ServiceResolver,
and MeshConfig topics respectively.
Though these events will only be consumed "locally" for now, they can also be
consumed via the gRPC endpoint (confirmed using grpcurl) so using them from
client agents should be a case of swapping the LocalMaterializer for an
RPCMaterializer.
2022-07-01 15:09:47 +00:00
|
|
|
}, false)
|
2022-04-12 13:47:42 +00:00
|
|
|
if err != nil {
|
|
|
|
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
|
|
|
|
}
|
|
|
|
|
|
|
|
err = c.deps.Publisher.RegisterHandler(state.EventTopicCARoots, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
|
|
|
return c.State().CARootsSnapshot(req, buf)
|
proxycfg: server-local config entry data sources
This is the OSS portion of enterprise PR 2056.
This commit provides server-local implementations of the proxycfg.ConfigEntry
and proxycfg.ConfigEntryList interfaces, that source data from streaming events.
It makes use of the LocalMaterializer type introduced for peering replication,
adding the necessary support for authorization.
It also adds support for "wildcard" subscriptions (within a topic) to the event
publisher, as this is needed to fetch service-resolvers for all services when
configuring mesh gateways.
Currently, events will be emitted for just the ingress-gateway, service-resolver,
and mesh config entry types, as these are the only entries required by proxycfg
— the events will be emitted on topics named IngressGateway, ServiceResolver,
and MeshConfig topics respectively.
Though these events will only be consumed "locally" for now, they can also be
consumed via the gRPC endpoint (confirmed using grpcurl) so using them from
client agents should be a case of swapping the LocalMaterializer for an
RPCMaterializer.
2022-07-01 15:09:47 +00:00
|
|
|
}, false)
|
|
|
|
if err != nil {
|
|
|
|
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
|
|
|
|
}
|
|
|
|
|
|
|
|
err = c.deps.Publisher.RegisterHandler(state.EventTopicMeshConfig, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
|
|
|
return c.State().MeshConfigSnapshot(req, buf)
|
|
|
|
}, true)
|
|
|
|
if err != nil {
|
|
|
|
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
|
|
|
|
}
|
|
|
|
|
|
|
|
err = c.deps.Publisher.RegisterHandler(state.EventTopicServiceResolver, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
|
|
|
return c.State().ServiceResolverSnapshot(req, buf)
|
|
|
|
}, true)
|
|
|
|
if err != nil {
|
|
|
|
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
|
|
|
|
}
|
|
|
|
|
|
|
|
err = c.deps.Publisher.RegisterHandler(state.EventTopicIngressGateway, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
|
|
|
return c.State().IngressGatewaySnapshot(req, buf)
|
|
|
|
}, true)
|
2022-04-12 13:47:42 +00:00
|
|
|
if err != nil {
|
|
|
|
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
|
|
|
|
}
|
2022-07-01 15:15:49 +00:00
|
|
|
|
|
|
|
err = c.deps.Publisher.RegisterHandler(state.EventTopicServiceIntentions, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
|
|
|
return c.State().ServiceIntentionsSnapshot(req, buf)
|
|
|
|
}, true)
|
|
|
|
if err != nil {
|
|
|
|
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
|
|
|
|
}
|
2022-07-12 10:35:52 +00:00
|
|
|
|
|
|
|
err = c.deps.Publisher.RegisterHandler(state.EventTopicServiceList, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
|
|
|
return c.State().ServiceListSnapshot(req, buf)
|
|
|
|
}, true)
|
|
|
|
if err != nil {
|
|
|
|
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
|
|
|
|
}
|
2022-10-24 18:50:28 +00:00
|
|
|
|
|
|
|
err = c.deps.Publisher.RegisterHandler(state.EventTopicServiceDefaults, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
|
|
|
return c.State().ServiceDefaultsSnapshot(req, buf)
|
|
|
|
}, true)
|
|
|
|
if err != nil {
|
|
|
|
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
|
|
|
|
}
|
2023-01-18 22:14:34 +00:00
|
|
|
|
|
|
|
err = c.deps.Publisher.RegisterHandler(state.EventTopicAPIGateway, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
|
|
|
return c.State().APIGatewaySnapshot(req, buf)
|
|
|
|
}, true)
|
|
|
|
if err != nil {
|
|
|
|
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
|
|
|
|
}
|
|
|
|
|
|
|
|
err = c.deps.Publisher.RegisterHandler(state.EventTopicInlineCertificate, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
|
|
|
return c.State().InlineCertificateSnapshot(req, buf)
|
|
|
|
}, true)
|
|
|
|
if err != nil {
|
|
|
|
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
|
|
|
|
}
|
|
|
|
|
|
|
|
err = c.deps.Publisher.RegisterHandler(state.EventTopicHTTPRoute, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
|
|
|
return c.State().HTTPRouteSnapshot(req, buf)
|
|
|
|
}, true)
|
|
|
|
if err != nil {
|
|
|
|
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
|
|
|
|
}
|
|
|
|
|
|
|
|
err = c.deps.Publisher.RegisterHandler(state.EventTopicTCPRoute, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
|
|
|
return c.State().TCPRouteSnapshot(req, buf)
|
|
|
|
}, true)
|
|
|
|
if err != nil {
|
|
|
|
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
|
|
|
|
}
|
|
|
|
|
|
|
|
err = c.deps.Publisher.RegisterHandler(state.EventTopicBoundAPIGateway, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
|
|
|
|
return c.State().BoundAPIGatewaySnapshot(req, buf)
|
|
|
|
}, true)
|
|
|
|
if err != nil {
|
|
|
|
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
|
|
|
|
}
|
2022-04-12 13:47:42 +00:00
|
|
|
}
|