Merge pull request #9025 from hashicorp/dnephin/streaming-options
streaming: Use a no-op event publisher if streaming is disabled
This commit is contained in:
commit
bcb67d9861
|
@ -6,10 +6,11 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/connect"
|
"github.com/hashicorp/consul/agent/connect"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type consulCAMockDelegate struct {
|
type consulCAMockDelegate struct {
|
||||||
|
@ -48,10 +49,7 @@ func (c *consulCAMockDelegate) ApplyCARequest(req *structs.CARequest) (interface
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMockDelegate(t *testing.T, conf *structs.CAConfiguration) *consulCAMockDelegate {
|
func newMockDelegate(t *testing.T, conf *structs.CAConfiguration) *consulCAMockDelegate {
|
||||||
s, err := state.NewStateStore(nil)
|
s := state.NewStateStore(nil)
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %s", err)
|
|
||||||
}
|
|
||||||
if s == nil {
|
if s == nil {
|
||||||
t.Fatalf("missing state store")
|
t.Fatalf("missing state store")
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,13 +6,14 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
"github.com/hashicorp/consul/logging"
|
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-msgpack/codec"
|
"github.com/hashicorp/go-msgpack/codec"
|
||||||
"github.com/hashicorp/go-raftchunking"
|
"github.com/hashicorp/go-raftchunking"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/logging"
|
||||||
)
|
)
|
||||||
|
|
||||||
// command is a command method on the FSM.
|
// command is a command method on the FSM.
|
||||||
|
@ -41,7 +42,9 @@ func registerCommand(msg structs.MessageType, fn unboundCommand) {
|
||||||
// along with Raft to provide strong consistency. We implement
|
// along with Raft to provide strong consistency. We implement
|
||||||
// this outside the Server to avoid exposing this outside the package.
|
// this outside the Server to avoid exposing this outside the package.
|
||||||
type FSM struct {
|
type FSM struct {
|
||||||
logger hclog.Logger
|
deps Deps
|
||||||
|
logger hclog.Logger
|
||||||
|
chunker *raftchunking.ChunkingFSM
|
||||||
|
|
||||||
// apply is built off the commands global and is used to route apply
|
// apply is built off the commands global and is used to route apply
|
||||||
// operations to their appropriate handlers.
|
// operations to their appropriate handlers.
|
||||||
|
@ -53,28 +56,40 @@ type FSM struct {
|
||||||
// Raft side, so doesn't need to lock this.
|
// Raft side, so doesn't need to lock this.
|
||||||
stateLock sync.RWMutex
|
stateLock sync.RWMutex
|
||||||
state *state.Store
|
state *state.Store
|
||||||
|
|
||||||
gc *state.TombstoneGC
|
|
||||||
|
|
||||||
chunker *raftchunking.ChunkingFSM
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New is used to construct a new FSM with a blank state.
|
// New is used to construct a new FSM with a blank state.
|
||||||
|
//
|
||||||
|
// Deprecated: use NewFromDeps.
|
||||||
func New(gc *state.TombstoneGC, logger hclog.Logger) (*FSM, error) {
|
func New(gc *state.TombstoneGC, logger hclog.Logger) (*FSM, error) {
|
||||||
if logger == nil {
|
newStateStore := func() *state.Store {
|
||||||
logger = hclog.New(&hclog.LoggerOptions{})
|
return state.NewStateStore(gc)
|
||||||
}
|
}
|
||||||
|
return NewFromDeps(Deps{Logger: logger, NewStateStore: newStateStore}), nil
|
||||||
|
}
|
||||||
|
|
||||||
stateNew, err := state.NewStateStore(gc)
|
// Deps are dependencies used to construct the FSM.
|
||||||
if err != nil {
|
type Deps struct {
|
||||||
return nil, err
|
// Logger used to emit log messages
|
||||||
|
Logger hclog.Logger
|
||||||
|
// NewStateStore returns a state.Store which the FSM will use to make changes
|
||||||
|
// to the state.
|
||||||
|
// NewStateStore will be called once when the FSM is created and again any
|
||||||
|
// time Restore() is called.
|
||||||
|
NewStateStore func() *state.Store
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewFromDeps creates a new FSM from its dependencies.
|
||||||
|
func NewFromDeps(deps Deps) *FSM {
|
||||||
|
if deps.Logger == nil {
|
||||||
|
deps.Logger = hclog.New(&hclog.LoggerOptions{})
|
||||||
}
|
}
|
||||||
|
|
||||||
fsm := &FSM{
|
fsm := &FSM{
|
||||||
logger: logger.Named(logging.FSM),
|
deps: deps,
|
||||||
|
logger: deps.Logger.Named(logging.FSM),
|
||||||
apply: make(map[structs.MessageType]command),
|
apply: make(map[structs.MessageType]command),
|
||||||
state: stateNew,
|
state: deps.NewStateStore(),
|
||||||
gc: gc,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build out the apply dispatch table based on the registered commands.
|
// Build out the apply dispatch table based on the registered commands.
|
||||||
|
@ -86,8 +101,7 @@ func New(gc *state.TombstoneGC, logger hclog.Logger) (*FSM, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
fsm.chunker = raftchunking.NewChunkingFSM(fsm, nil)
|
fsm.chunker = raftchunking.NewChunkingFSM(fsm, nil)
|
||||||
|
return fsm
|
||||||
return fsm, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FSM) ChunkingFSM() *raftchunking.ChunkingFSM {
|
func (c *FSM) ChunkingFSM() *raftchunking.ChunkingFSM {
|
||||||
|
@ -149,11 +163,7 @@ func (c *FSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||||
func (c *FSM) Restore(old io.ReadCloser) error {
|
func (c *FSM) Restore(old io.ReadCloser) error {
|
||||||
defer old.Close()
|
defer old.Close()
|
||||||
|
|
||||||
// Create a new state store.
|
stateNew := c.deps.NewStateStore()
|
||||||
stateNew, err := state.NewStateStore(c.gc)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set up a new restore transaction
|
// Set up a new restore transaction
|
||||||
restore := stateNew.Restore()
|
restore := stateNew.Restore()
|
||||||
|
|
|
@ -5,18 +5,18 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/sdk/testutil"
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
memdb "github.com/hashicorp/go-memdb"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGatewayLocator(t *testing.T) {
|
func TestGatewayLocator(t *testing.T) {
|
||||||
state, err := state.NewStateStore(nil)
|
state := state.NewStateStore(nil)
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
dc1 := &structs.FederationState{
|
dc1 := &structs.FederationState{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
|
@ -362,10 +362,6 @@ func (d *testServerDelegate) blockingQuery(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFakeStateStore() (*state.Store, error) {
|
|
||||||
return state.NewStateStore(nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *testServerDelegate) IsLeader() bool {
|
func (d *testServerDelegate) IsLeader() bool {
|
||||||
return d.isLeader
|
return d.isLeader
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,11 +4,13 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
"github.com/hashicorp/raft"
|
||||||
|
|
||||||
consulfsm "github.com/hashicorp/consul/agent/consul/fsm"
|
consulfsm "github.com/hashicorp/consul/agent/consul/fsm"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/sdk/testutil"
|
|
||||||
"github.com/hashicorp/raft"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func makeLog(buf []byte) *raft.Log {
|
func makeLog(buf []byte) *raft.Log {
|
||||||
|
@ -23,11 +25,12 @@ func makeLog(buf []byte) *raft.Log {
|
||||||
// Testing for GH-300 and GH-279
|
// Testing for GH-300 and GH-279
|
||||||
func TestHealthCheckRace(t *testing.T) {
|
func TestHealthCheckRace(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
logger := testutil.Logger(t)
|
fsm := consulfsm.NewFromDeps(consulfsm.Deps{
|
||||||
fsm, err := consulfsm.New(nil, logger)
|
Logger: hclog.New(nil),
|
||||||
if err != nil {
|
NewStateStore: func() *state.Store {
|
||||||
t.Fatalf("err: %v", err)
|
return state.NewStateStore(nil)
|
||||||
}
|
},
|
||||||
|
})
|
||||||
state := fsm.State()
|
state := fsm.State()
|
||||||
|
|
||||||
req := structs.RegisterRequest{
|
req := structs.RegisterRequest{
|
||||||
|
|
|
@ -391,6 +391,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
|
||||||
shutdownCh: shutdownCh,
|
shutdownCh: shutdownCh,
|
||||||
leaderRoutineManager: NewLeaderRoutineManager(logger),
|
leaderRoutineManager: NewLeaderRoutineManager(logger),
|
||||||
aclAuthMethodValidators: authmethod.NewCache(),
|
aclAuthMethodValidators: authmethod.NewCache(),
|
||||||
|
fsm: newFSMFromConfig(flat.Logger, gc, config),
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.config.ConnectMeshGatewayWANFederationEnabled {
|
if s.config.ConnectMeshGatewayWANFederationEnabled {
|
||||||
|
@ -616,6 +617,21 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newFSMFromConfig(logger hclog.Logger, gc *state.TombstoneGC, config *Config) *fsm.FSM {
|
||||||
|
deps := fsm.Deps{Logger: logger}
|
||||||
|
if config.RPCConfig.EnableStreaming {
|
||||||
|
deps.NewStateStore = func() *state.Store {
|
||||||
|
return state.NewStateStoreWithEventPublisher(gc)
|
||||||
|
}
|
||||||
|
return fsm.NewFromDeps(deps)
|
||||||
|
}
|
||||||
|
|
||||||
|
deps.NewStateStore = func() *state.Store {
|
||||||
|
return state.NewStateStore(gc)
|
||||||
|
}
|
||||||
|
return fsm.NewFromDeps(deps)
|
||||||
|
}
|
||||||
|
|
||||||
func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler {
|
func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler {
|
||||||
if !config.RPCConfig.EnableStreaming {
|
if !config.RPCConfig.EnableStreaming {
|
||||||
return agentgrpc.NoOpHandler{Logger: deps.Logger}
|
return agentgrpc.NoOpHandler{Logger: deps.Logger}
|
||||||
|
@ -665,13 +681,6 @@ func (s *Server) setupRaft() error {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Create the FSM.
|
|
||||||
var err error
|
|
||||||
s.fsm, err = fsm.New(s.tombstoneGC, s.logger)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var serverAddressProvider raft.ServerAddressProvider = nil
|
var serverAddressProvider raft.ServerAddressProvider = nil
|
||||||
if s.config.RaftConfig.ProtocolVersion >= 3 { //ServerAddressProvider needs server ids to work correctly, which is only supported in protocol version 3 or higher
|
if s.config.RaftConfig.ProtocolVersion >= 3 { //ServerAddressProvider needs server ids to work correctly, which is only supported in protocol version 3 or higher
|
||||||
serverAddressProvider = s.serverLookup
|
serverAddressProvider = s.serverLookup
|
||||||
|
@ -772,10 +781,12 @@ func (s *Server) setupRaft() error {
|
||||||
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
|
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tmpFsm, err := fsm.New(s.tombstoneGC, s.logger)
|
tmpFsm := fsm.NewFromDeps(fsm.Deps{
|
||||||
if err != nil {
|
Logger: s.logger,
|
||||||
return fmt.Errorf("recovery failed to make temp FSM: %v", err)
|
NewStateStore: func() *state.Store {
|
||||||
}
|
return state.NewStateStore(s.tombstoneGC)
|
||||||
|
},
|
||||||
|
})
|
||||||
if err := raft.RecoverCluster(s.config.RaftConfig, tmpFsm,
|
if err := raft.RecoverCluster(s.config.RaftConfig, tmpFsm,
|
||||||
log, stable, snap, trans, configuration); err != nil {
|
log, stable, snap, trans, configuration); err != nil {
|
||||||
return fmt.Errorf("recovery failed: %v", err)
|
return fmt.Errorf("recovery failed: %v", err)
|
||||||
|
@ -817,11 +828,9 @@ func (s *Server) setupRaft() error {
|
||||||
s.raftNotifyCh = raftNotifyCh
|
s.raftNotifyCh = raftNotifyCh
|
||||||
|
|
||||||
// Setup the Raft store.
|
// Setup the Raft store.
|
||||||
|
var err error
|
||||||
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm.ChunkingFSM(), log, stable, snap, trans)
|
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm.ChunkingFSM(), log, stable, snap, trans)
|
||||||
if err != nil {
|
return err
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// endpointFactory is a function that returns an RPC endpoint bound to the given
|
// endpointFactory is a function that returns an RPC endpoint bound to the given
|
||||||
|
|
|
@ -19,9 +19,9 @@ type EventPayloadCheckServiceNode struct {
|
||||||
// of stream.Events that describe the current state of a service health query.
|
// of stream.Events that describe the current state of a service health query.
|
||||||
//
|
//
|
||||||
// TODO: no tests for this yet
|
// TODO: no tests for this yet
|
||||||
func serviceHealthSnapshot(s *Store, topic stream.Topic) stream.SnapshotFunc {
|
func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
|
||||||
return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) {
|
return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) {
|
||||||
tx := s.db.Txn(false)
|
tx := db.ReadTxn()
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
connect := topic == topicServiceHealthConnect
|
connect := topic == topicServiceHealthConnect
|
||||||
|
|
|
@ -7,14 +7,15 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
"github.com/hashicorp/consul/api"
|
|
||||||
"github.com/hashicorp/consul/lib/stringslice"
|
|
||||||
"github.com/hashicorp/consul/types"
|
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
uuid "github.com/hashicorp/go-uuid"
|
uuid "github.com/hashicorp/go-uuid"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/api"
|
||||||
|
"github.com/hashicorp/consul/lib/stringslice"
|
||||||
|
"github.com/hashicorp/consul/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func makeRandomNodeID(t *testing.T) types.NodeID {
|
func makeRandomNodeID(t *testing.T) types.NodeID {
|
||||||
|
@ -1080,10 +1081,7 @@ func TestStateStore_GetNodes(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkGetNodes(b *testing.B) {
|
func BenchmarkGetNodes(b *testing.B) {
|
||||||
s, err := NewStateStore(nil)
|
s := NewStateStore(nil)
|
||||||
if err != nil {
|
|
||||||
b.Fatalf("err: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.EnsureNode(100, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
|
if err := s.EnsureNode(100, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
|
||||||
b.Fatalf("err: %v", err)
|
b.Fatalf("err: %v", err)
|
||||||
|
@ -3710,10 +3708,7 @@ func TestStateStore_CheckConnectServiceNodes_Gateways(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkCheckServiceNodes(b *testing.B) {
|
func BenchmarkCheckServiceNodes(b *testing.B) {
|
||||||
s, err := NewStateStore(nil)
|
s := NewStateStore(nil)
|
||||||
if err != nil {
|
|
||||||
b.Fatalf("err: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
|
if err := s.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
|
||||||
b.Fatalf("err: %v", err)
|
b.Fatalf("err: %v", err)
|
||||||
|
|
|
@ -8,8 +8,9 @@ import (
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestStateStore_ReapTombstones(t *testing.T) {
|
func TestStateStore_ReapTombstones(t *testing.T) {
|
||||||
|
@ -91,10 +92,7 @@ func TestStateStore_GC(t *testing.T) {
|
||||||
|
|
||||||
// Enable it and attach it to the state store.
|
// Enable it and attach it to the state store.
|
||||||
gc.SetEnabled(true)
|
gc.SetEnabled(true)
|
||||||
s, err := NewStateStore(gc)
|
s := NewStateStore(gc)
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create some KV pairs.
|
// Create some KV pairs.
|
||||||
testSetKey(t, s, 1, "foo", "foo", nil)
|
testSetKey(t, s, 1, "foo", "foo", nil)
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package state
|
package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
|
@ -22,6 +23,11 @@ type AbortTxn interface {
|
||||||
Abort()
|
Abort()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReadDB is a DB that provides read-only transactions.
|
||||||
|
type ReadDB interface {
|
||||||
|
ReadTxn() AbortTxn
|
||||||
|
}
|
||||||
|
|
||||||
// WriteTxn is implemented by memdb.Txn to perform write operations.
|
// WriteTxn is implemented by memdb.Txn to perform write operations.
|
||||||
type WriteTxn interface {
|
type WriteTxn interface {
|
||||||
ReadTxn
|
ReadTxn
|
||||||
|
@ -46,10 +52,16 @@ type Changes struct {
|
||||||
// 2. Sent to the eventPublisher which will create and emit change events
|
// 2. Sent to the eventPublisher which will create and emit change events
|
||||||
type changeTrackerDB struct {
|
type changeTrackerDB struct {
|
||||||
db *memdb.MemDB
|
db *memdb.MemDB
|
||||||
publisher *stream.EventPublisher
|
publisher EventPublisher
|
||||||
processChanges func(ReadTxn, Changes) ([]stream.Event, error)
|
processChanges func(ReadTxn, Changes) ([]stream.Event, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type EventPublisher interface {
|
||||||
|
Publish([]stream.Event)
|
||||||
|
Run(context.Context)
|
||||||
|
Subscribe(*stream.SubscribeRequest) (*stream.Subscription, error)
|
||||||
|
}
|
||||||
|
|
||||||
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
|
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
|
||||||
// code may use it to create a read-only transaction, but it will panic if called
|
// code may use it to create a read-only transaction, but it will panic if called
|
||||||
// with write=true.
|
// with write=true.
|
||||||
|
@ -160,6 +172,12 @@ func (tx *txn) Commit() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type readDB memdb.MemDB
|
||||||
|
|
||||||
|
func (db *readDB) ReadTxn() AbortTxn {
|
||||||
|
return (*memdb.MemDB)(db).Txn(false)
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
topicServiceHealth = pbsubscribe.Topic_ServiceHealth
|
topicServiceHealth = pbsubscribe.Topic_ServiceHealth
|
||||||
topicServiceHealthConnect = pbsubscribe.Topic_ServiceHealthConnect
|
topicServiceHealthConnect = pbsubscribe.Topic_ServiceHealthConnect
|
||||||
|
@ -182,11 +200,11 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
|
||||||
return events, nil
|
return events, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSnapshotHandlers(s *Store) stream.SnapshotHandlers {
|
func newSnapshotHandlers(db ReadDB) stream.SnapshotHandlers {
|
||||||
return stream.SnapshotHandlers{
|
return stream.SnapshotHandlers{
|
||||||
topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth),
|
topicServiceHealth: serviceHealthSnapshot(db, topicServiceHealth),
|
||||||
// The connect topic is temporarily disabled until the correct events are
|
// The connect topic is temporarily disabled until the correct events are
|
||||||
// created for terminating gateway changes.
|
// created for terminating gateway changes.
|
||||||
//topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect),
|
//topicServiceHealthConnect: serviceHealthSnapshot(db, topicServiceHealthConnect),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,9 +6,10 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/stream"
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
memdb "github.com/hashicorp/go-memdb"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -152,36 +153,46 @@ type sessionCheck struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStateStore creates a new in-memory state storage layer.
|
// NewStateStore creates a new in-memory state storage layer.
|
||||||
func NewStateStore(gc *TombstoneGC) (*Store, error) {
|
func NewStateStore(gc *TombstoneGC) *Store {
|
||||||
// Create the in-memory DB.
|
// Create the in-memory DB.
|
||||||
schema := stateStoreSchema()
|
schema := stateStoreSchema()
|
||||||
db, err := memdb.NewMemDB(schema)
|
db, err := memdb.NewMemDB(schema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Failed setting up state store: %s", err)
|
// the only way for NewMemDB to error is if the schema is invalid. The
|
||||||
|
// scheme is static and tested to be correct, so any failure here would
|
||||||
|
// be a programming error, which should panic.
|
||||||
|
panic(fmt.Sprintf("failed to create state store: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.TODO())
|
|
||||||
s := &Store{
|
s := &Store{
|
||||||
schema: schema,
|
schema: schema,
|
||||||
abandonCh: make(chan struct{}),
|
abandonCh: make(chan struct{}),
|
||||||
kvsGraveyard: NewGraveyard(gc),
|
kvsGraveyard: NewGraveyard(gc),
|
||||||
lockDelay: NewDelay(),
|
lockDelay: NewDelay(),
|
||||||
stopEventPublisher: cancel,
|
stopEventPublisher: func() {},
|
||||||
}
|
db: &changeTrackerDB{
|
||||||
pub := stream.NewEventPublisher(newSnapshotHandlers(s), 10*time.Second)
|
db: db,
|
||||||
s.db = &changeTrackerDB{
|
publisher: stream.NoOpEventPublisher{},
|
||||||
db: db,
|
processChanges: processDBChanges,
|
||||||
publisher: pub,
|
},
|
||||||
processChanges: processDBChanges,
|
|
||||||
}
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStateStoreWithEventPublisher(gc *TombstoneGC) *Store {
|
||||||
|
store := NewStateStore(gc)
|
||||||
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
|
store.stopEventPublisher = cancel
|
||||||
|
|
||||||
|
pub := stream.NewEventPublisher(newSnapshotHandlers((*readDB)(store.db.db)), 10*time.Second)
|
||||||
|
store.db.publisher = pub
|
||||||
|
|
||||||
go pub.Run(ctx)
|
go pub.Run(ctx)
|
||||||
return s, nil
|
return store
|
||||||
}
|
}
|
||||||
|
|
||||||
// EventPublisher returns the stream.EventPublisher used by the Store to
|
// EventPublisher returns the stream.EventPublisher used by the Store to
|
||||||
// publish events.
|
// publish events.
|
||||||
func (s *Store) EventPublisher() *stream.EventPublisher {
|
func (s *Store) EventPublisher() EventPublisher {
|
||||||
return s.db.publisher
|
return s.db.publisher
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,10 +6,11 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
"github.com/hashicorp/consul/types"
|
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func testUUID() string {
|
func testUUID() string {
|
||||||
|
@ -48,10 +49,7 @@ func restoreIndexes(indexes []*IndexEntry, r *Restore) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func testStateStore(t *testing.T) *Store {
|
func testStateStore(t *testing.T) *Store {
|
||||||
s, err := NewStateStore(nil)
|
s := NewStateStore(nil)
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %s", err)
|
|
||||||
}
|
|
||||||
if s == nil {
|
if s == nil {
|
||||||
t.Fatalf("missing state store")
|
t.Fatalf("missing state store")
|
||||||
}
|
}
|
||||||
|
|
16
agent/consul/stream/noop.go
Normal file
16
agent/consul/stream/noop.go
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
package stream
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NoOpEventPublisher struct{}
|
||||||
|
|
||||||
|
func (NoOpEventPublisher) Publish([]Event) {}
|
||||||
|
|
||||||
|
func (NoOpEventPublisher) Run(context.Context) {}
|
||||||
|
|
||||||
|
func (NoOpEventPublisher) Subscribe(*SubscribeRequest) (*Subscription, error) {
|
||||||
|
return nil, fmt.Errorf("stream event publisher is disabled")
|
||||||
|
}
|
|
@ -7,13 +7,14 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/sdk/testutil"
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func newStateStore() (*state.Store, error) {
|
func newStateStore() *state.Store {
|
||||||
return state.NewStateStore(nil)
|
return state.NewStateStore(nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,8 +92,7 @@ func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) {
|
||||||
metrics.NewGlobal(cfg, sink)
|
metrics.NewGlobal(cfg, sink)
|
||||||
|
|
||||||
mockStateProvider := &mockStateProvider{}
|
mockStateProvider := &mockStateProvider{}
|
||||||
s, err := newStateStore()
|
s := state.NewStateStore(nil)
|
||||||
require.NoError(t, err)
|
|
||||||
if tcase.modfiyStateStore != nil {
|
if tcase.modfiyStateStore != nil {
|
||||||
tcase.modfiyStateStore(t, s)
|
tcase.modfiyStateStore(t, s)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,11 +5,12 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/sdk/testutil"
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
"github.com/stretchr/testify/mock"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type mockStateProvider struct {
|
type mockStateProvider struct {
|
||||||
|
@ -61,8 +62,7 @@ func TestUsageReporter_Run_Nodes(t *testing.T) {
|
||||||
metrics.NewGlobal(cfg, sink)
|
metrics.NewGlobal(cfg, sink)
|
||||||
|
|
||||||
mockStateProvider := &mockStateProvider{}
|
mockStateProvider := &mockStateProvider{}
|
||||||
s, err := newStateStore()
|
s := newStateStore()
|
||||||
require.NoError(t, err)
|
|
||||||
if tcase.modfiyStateStore != nil {
|
if tcase.modfiyStateStore != nil {
|
||||||
tcase.modfiyStateStore(t, s)
|
tcase.modfiyStateStore(t, s)
|
||||||
}
|
}
|
||||||
|
|
|
@ -308,10 +308,7 @@ func newTestBackend() (*testBackend, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
store, err := state.NewStateStore(gc)
|
store := state.NewStateStoreWithEventPublisher(gc)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
allowAll := func(_ string) acl.Authorizer {
|
allowAll := func(_ string) acl.Authorizer {
|
||||||
return acl.AllowAll()
|
return acl.AllowAll()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue