Raft storage backend (#16619)
This commit is contained in:
parent
33f6c3626b
commit
52ce151221
|
@ -10,13 +10,14 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/fsm"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBasicController(t *testing.T) {
|
||||
|
@ -36,7 +37,8 @@ func TestBasicController(t *testing.T) {
|
|||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStoreWithEventPublisher(nil, publisher)
|
||||
},
|
||||
Publisher: publisher,
|
||||
Publisher: publisher,
|
||||
StorageBackend: fsm.NullStorageBackend,
|
||||
}).State()
|
||||
|
||||
for i := 0; i < 200; i++ {
|
||||
|
@ -92,7 +94,8 @@ func TestBasicController_Transform(t *testing.T) {
|
|||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStoreWithEventPublisher(nil, publisher)
|
||||
},
|
||||
Publisher: publisher,
|
||||
Publisher: publisher,
|
||||
StorageBackend: fsm.NullStorageBackend,
|
||||
}).State()
|
||||
|
||||
go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
|
||||
|
@ -137,7 +140,8 @@ func TestBasicController_Retry(t *testing.T) {
|
|||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStoreWithEventPublisher(nil, publisher)
|
||||
},
|
||||
Publisher: publisher,
|
||||
Publisher: publisher,
|
||||
StorageBackend: fsm.NullStorageBackend,
|
||||
}).State()
|
||||
|
||||
queueInitialized := make(chan *countingWorkQueue)
|
||||
|
@ -382,7 +386,8 @@ func TestConfigEntrySubscriptions(t *testing.T) {
|
|||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStoreWithEventPublisher(nil, publisher)
|
||||
},
|
||||
Publisher: publisher,
|
||||
Publisher: publisher,
|
||||
StorageBackend: fsm.NullStorageBackend,
|
||||
}).State()
|
||||
|
||||
for i := 0; i < 200; i++ {
|
||||
|
@ -518,7 +523,8 @@ func TestDiscoveryChainController(t *testing.T) {
|
|||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStoreWithEventPublisher(nil, publisher)
|
||||
},
|
||||
Publisher: publisher,
|
||||
Publisher: publisher,
|
||||
StorageBackend: fsm.NullStorageBackend,
|
||||
}).State()
|
||||
|
||||
controller := New(publisher, reconciler)
|
||||
|
|
|
@ -145,6 +145,7 @@ func init() {
|
|||
registerCommand(structs.PeeringTrustBundleWriteType, (*FSM).applyPeeringTrustBundleWrite)
|
||||
registerCommand(structs.PeeringTrustBundleDeleteType, (*FSM).applyPeeringTrustBundleDelete)
|
||||
registerCommand(structs.PeeringSecretsWriteType, (*FSM).applyPeeringSecretsWrite)
|
||||
registerCommand(structs.ResourceOperationType, (*FSM).applyResourceOperation)
|
||||
}
|
||||
|
||||
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
|
||||
|
@ -781,3 +782,7 @@ func (c *FSM) applyPeeringTrustBundleDelete(buf []byte, index uint64) interface{
|
|||
}
|
||||
return c.state.PeeringTrustBundleDelete(index, q)
|
||||
}
|
||||
|
||||
func (f *FSM) applyResourceOperation(buf []byte, idx uint64) any {
|
||||
return f.deps.StorageBackend.Apply(buf, idx)
|
||||
}
|
||||
|
|
|
@ -5,6 +5,8 @@ package fsm
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
|
@ -19,11 +21,17 @@ import (
|
|||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
"github.com/hashicorp/consul/proto/private/prototest"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
)
|
||||
|
@ -1501,6 +1509,49 @@ func TestFSM_ConfigEntry_DeleteCAS(t *testing.T) {
|
|||
require.True(t, didDelete)
|
||||
}
|
||||
|
||||
func TestFSM_Resources(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
logger := testutil.Logger(t)
|
||||
|
||||
handle := &testRaftHandle{}
|
||||
storageBackend := newStorageBackend(t, handle)
|
||||
fsm := NewFromDeps(Deps{
|
||||
Logger: logger,
|
||||
NewStateStore: func() *state.Store { return state.NewStateStore(nil) },
|
||||
StorageBackend: storageBackend,
|
||||
})
|
||||
handle.apply = func(msg []byte) (any, error) {
|
||||
buf := append(
|
||||
[]byte{uint8(structs.ResourceOperationType)},
|
||||
msg...,
|
||||
)
|
||||
return fsm.Apply(makeLog(buf)), nil
|
||||
}
|
||||
|
||||
resource, err := storageBackend.WriteCAS(context.Background(), &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Type: &pbresource.Type{
|
||||
Group: "test",
|
||||
GroupVersion: "v1",
|
||||
Kind: "foo",
|
||||
},
|
||||
Tenancy: &pbresource.Tenancy{
|
||||
Partition: "default",
|
||||
PeerName: "local",
|
||||
Namespace: "default",
|
||||
},
|
||||
Name: "bar",
|
||||
Uid: "a",
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
storedResource, err := storageBackend.Read(context.Background(), storage.EventualConsistency, resource.Id)
|
||||
require.NoError(t, err)
|
||||
prototest.AssertDeepEqual(t, resource, storedResource)
|
||||
}
|
||||
|
||||
// This adapts another test by chunking the encoded data and then performing
|
||||
// out-of-order applies of half the logs. It then snapshots, restores to a new
|
||||
// FSM, and applies the rest. The goal is to verify that chunking snapshotting
|
||||
|
@ -1513,8 +1564,14 @@ func TestFSM_Chunking_Lifecycle(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
logger := testutil.Logger(t)
|
||||
fsm, err := New(nil, logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
fsm := NewFromDeps(Deps{
|
||||
Logger: logger,
|
||||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStore(nil)
|
||||
},
|
||||
StorageBackend: newStorageBackend(t, nil),
|
||||
})
|
||||
|
||||
var logOfLogs [][]*raft.Log
|
||||
for i := 0; i < 10; i++ {
|
||||
|
@ -1589,8 +1646,13 @@ func TestFSM_Chunking_Lifecycle(t *testing.T) {
|
|||
err = snap.Persist(sink)
|
||||
require.NoError(t, err)
|
||||
|
||||
fsm2, err := New(nil, logger)
|
||||
require.NoError(t, err)
|
||||
fsm2 := NewFromDeps(Deps{
|
||||
Logger: logger,
|
||||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStore(nil)
|
||||
},
|
||||
StorageBackend: newStorageBackend(t, nil),
|
||||
})
|
||||
err = fsm2.Restore(sink)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -1717,3 +1779,27 @@ func TestFSM_Chunking_TermChange(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newStorageBackend(t *testing.T, handle raftstorage.Handle) *raftstorage.Backend {
|
||||
t.Helper()
|
||||
|
||||
backend, err := raftstorage.NewBackend(handle, testutil.Logger(t))
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
go backend.Run(ctx)
|
||||
|
||||
return backend
|
||||
}
|
||||
|
||||
type testRaftHandle struct {
|
||||
apply func(msg []byte) (any, error)
|
||||
}
|
||||
|
||||
func (h *testRaftHandle) Apply(msg []byte) (any, error) { return h.apply(msg) }
|
||||
func (testRaftHandle) IsLeader() bool { return true }
|
||||
func (testRaftHandle) EnsureStrongConsistency(context.Context) error { return nil }
|
||||
func (testRaftHandle) DialLeader() (*grpc.ClientConn, error) {
|
||||
return nil, errors.New("DialLeader not implemented")
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
package fsm
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
@ -18,6 +19,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
)
|
||||
|
||||
|
@ -72,7 +74,11 @@ func New(gc *state.TombstoneGC, logger hclog.Logger) (*FSM, error) {
|
|||
newStateStore := func() *state.Store {
|
||||
return state.NewStateStore(gc)
|
||||
}
|
||||
return NewFromDeps(Deps{Logger: logger, NewStateStore: newStateStore}), nil
|
||||
return NewFromDeps(Deps{
|
||||
Logger: logger,
|
||||
NewStateStore: newStateStore,
|
||||
StorageBackend: NullStorageBackend,
|
||||
}), nil
|
||||
}
|
||||
|
||||
// Deps are dependencies used to construct the FSM.
|
||||
|
@ -86,6 +92,33 @@ type Deps struct {
|
|||
NewStateStore func() *state.Store
|
||||
|
||||
Publisher *stream.EventPublisher
|
||||
|
||||
// StorageBackend is the storage backend used by the resource service, it
|
||||
// manages its own state and has methods for handling Raft logs, snapshotting,
|
||||
// and restoring snapshots.
|
||||
StorageBackend StorageBackend
|
||||
}
|
||||
|
||||
// StorageBackend contains the methods on the Raft resource storage backend that
|
||||
// are used by the FSM. See the internal/storage/raft package docs for more info.
|
||||
type StorageBackend interface {
|
||||
Apply(buf []byte, idx uint64) any
|
||||
Snapshot() (*raftstorage.Snapshot, error)
|
||||
Restore() (*raftstorage.Restoration, error)
|
||||
}
|
||||
|
||||
// NullStorageBackend can be used as the StorageBackend dependency in tests
|
||||
// that won't exercize resource storage or snapshotting.
|
||||
var NullStorageBackend StorageBackend = nullStorageBackend{}
|
||||
|
||||
type nullStorageBackend struct{}
|
||||
|
||||
func (nullStorageBackend) Apply([]byte, uint64) any { return errors.New("NullStorageBackend in use") }
|
||||
func (nullStorageBackend) Snapshot() (*raftstorage.Snapshot, error) {
|
||||
return nil, errors.New("NullStorageBackend in use")
|
||||
}
|
||||
func (nullStorageBackend) Restore() (*raftstorage.Restoration, error) {
|
||||
return nil, errors.New("NullStorageBackend in use")
|
||||
}
|
||||
|
||||
// NewFromDeps creates a new FSM from its dependencies.
|
||||
|
@ -93,6 +126,9 @@ func NewFromDeps(deps Deps) *FSM {
|
|||
if deps.Logger == nil {
|
||||
deps.Logger = hclog.New(&hclog.LoggerOptions{})
|
||||
}
|
||||
if deps.StorageBackend == nil {
|
||||
panic("StorageBackend is required")
|
||||
}
|
||||
|
||||
fsm := &FSM{
|
||||
deps: deps,
|
||||
|
@ -172,9 +208,15 @@ func (c *FSM) Snapshot() (raft.FSMSnapshot, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
storageSnapshot, err := c.deps.StorageBackend.Snapshot()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &snapshot{
|
||||
state: c.state.Snapshot(),
|
||||
chunkState: chunkState,
|
||||
state: c.state.Snapshot(),
|
||||
chunkState: chunkState,
|
||||
storageSnapshot: storageSnapshot,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -189,6 +231,12 @@ func (c *FSM) Restore(old io.ReadCloser) error {
|
|||
restore := stateNew.Restore()
|
||||
defer restore.Abort()
|
||||
|
||||
storageRestoration, err := c.deps.StorageBackend.Restore()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer storageRestoration.Abort()
|
||||
|
||||
handler := func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error {
|
||||
switch {
|
||||
case msg == structs.ChunkingStateType:
|
||||
|
@ -201,6 +249,14 @@ func (c *FSM) Restore(old io.ReadCloser) error {
|
|||
if err := c.chunker.RestoreState(chunkState); err != nil {
|
||||
return err
|
||||
}
|
||||
case msg == structs.ResourceOperationType:
|
||||
var b []byte
|
||||
if err := dec.Decode(&b); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := storageRestoration.Apply(b); err != nil {
|
||||
return err
|
||||
}
|
||||
case restorers[msg] != nil:
|
||||
fn := restorers[msg]
|
||||
if err := fn(header, restore, dec); err != nil {
|
||||
|
@ -222,6 +278,7 @@ func (c *FSM) Restore(old io.ReadCloser) error {
|
|||
if err := restore.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
storageRestoration.Commit()
|
||||
|
||||
// External code might be calling State(), so we need to synchronize
|
||||
// here to make sure we swap in the new state store atomically.
|
||||
|
|
|
@ -9,11 +9,13 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
"github.com/hashicorp/go-raftchunking"
|
||||
"github.com/hashicorp/raft"
|
||||
|
||||
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-raftchunking"
|
||||
"github.com/hashicorp/raft"
|
||||
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
|
||||
)
|
||||
|
||||
var SnapshotSummaries = []prometheus.SummaryDefinition{
|
||||
|
@ -27,8 +29,9 @@ var SnapshotSummaries = []prometheus.SummaryDefinition{
|
|||
// state in a way that can be accessed concurrently with operations
|
||||
// that may modify the live state.
|
||||
type snapshot struct {
|
||||
state *state.Snapshot
|
||||
chunkState *raftchunking.State
|
||||
state *state.Snapshot
|
||||
chunkState *raftchunking.State
|
||||
storageSnapshot *raftstorage.Snapshot
|
||||
}
|
||||
|
||||
// SnapshotHeader is the first entry in our snapshot
|
||||
|
|
|
@ -7,10 +7,11 @@ import (
|
|||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
|
||||
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/proto/private/pbpeering"
|
||||
|
@ -104,6 +105,9 @@ func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) err
|
|||
if err := s.persistPeeringSecrets(sink, encoder); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.persistResources(sink, encoder); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -609,6 +613,25 @@ func (s *snapshot) persistPeeringSecrets(sink raft.SnapshotSink, encoder *codec.
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistResources(sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
||||
for {
|
||||
v, err := s.storageSnapshot.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, err := sink.Write([]byte{byte(structs.ResourceOperationType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func restoreRegistration(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.RegisterRequest
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
|
|
|
@ -10,18 +10,30 @@ import (
|
|||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
func TestRestoreFromEnterprise(t *testing.T) {
|
||||
|
||||
logger := testutil.Logger(t)
|
||||
fsm, err := New(nil, logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
handle := &testRaftHandle{}
|
||||
storageBackend := newStorageBackend(t, handle)
|
||||
handle.apply = func(buf []byte) (any, error) { return storageBackend.Apply(buf, 123), nil }
|
||||
|
||||
fsm := NewFromDeps(Deps{
|
||||
Logger: logger,
|
||||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStore(nil)
|
||||
},
|
||||
StorageBackend: storageBackend,
|
||||
})
|
||||
|
||||
// To verify if a proper message is displayed when Consul OSS tries to
|
||||
// unsuccessfully restore entries from a Consul Ent snapshot.
|
||||
buf := bytes.NewBuffer(nil)
|
||||
|
|
|
@ -5,6 +5,7 @@ package fsm
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -18,7 +19,9 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
"github.com/hashicorp/consul/lib/stringslice"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
"github.com/hashicorp/consul/proto/private/pbpeering"
|
||||
"github.com/hashicorp/consul/proto/private/prototest"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
|
@ -28,8 +31,18 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
logger := testutil.Logger(t)
|
||||
fsm, err := New(nil, logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
handle := &testRaftHandle{}
|
||||
storageBackend := newStorageBackend(t, handle)
|
||||
handle.apply = func(buf []byte) (any, error) { return storageBackend.Apply(buf, 123), nil }
|
||||
|
||||
fsm := NewFromDeps(Deps{
|
||||
Logger: logger,
|
||||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStore(nil)
|
||||
},
|
||||
StorageBackend: storageBackend,
|
||||
})
|
||||
|
||||
// Add some state
|
||||
node1 := &structs.Node{
|
||||
|
@ -531,6 +544,25 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Equal(t, vip, "240.0.0.3")
|
||||
|
||||
// Resources
|
||||
resource, err := storageBackend.WriteCAS(context.Background(), &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Type: &pbresource.Type{
|
||||
Group: "test",
|
||||
GroupVersion: "v1",
|
||||
Kind: "foo",
|
||||
},
|
||||
Tenancy: &pbresource.Tenancy{
|
||||
Partition: "default",
|
||||
PeerName: "local",
|
||||
Namespace: "default",
|
||||
},
|
||||
Name: "bar",
|
||||
Uid: "a",
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Snapshot
|
||||
snap, err := fsm.Snapshot()
|
||||
require.NoError(t, err)
|
||||
|
@ -569,8 +601,15 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
require.NoError(t, encoder.Encode(&token2))
|
||||
|
||||
// Try to restore on a new FSM
|
||||
fsm2, err := New(nil, logger)
|
||||
require.NoError(t, err)
|
||||
storageBackend2 := newStorageBackend(t, nil)
|
||||
|
||||
fsm2 := NewFromDeps(Deps{
|
||||
Logger: logger,
|
||||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStore(nil)
|
||||
},
|
||||
StorageBackend: storageBackend2,
|
||||
})
|
||||
|
||||
// Do a restore
|
||||
require.NoError(t, fsm2.Restore(sink))
|
||||
|
@ -856,6 +895,11 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
require.Len(t, ptbRestored.RootPEMs, 1)
|
||||
require.Equal(t, "qux certificate bundle", ptbRestored.RootPEMs[0])
|
||||
|
||||
// Verify resources are restored.
|
||||
resourceRestored, err := storageBackend2.Read(context.Background(), storage.EventualConsistency, resource.Id)
|
||||
require.NoError(t, err)
|
||||
prototest.AssertDeepEqual(t, resource, resourceRestored)
|
||||
|
||||
// Snapshot
|
||||
snap, err = fsm2.Snapshot()
|
||||
require.NoError(t, err)
|
||||
|
@ -881,8 +925,14 @@ func TestFSM_BadRestore_OSS(t *testing.T) {
|
|||
t.Parallel()
|
||||
// Create an FSM with some state.
|
||||
logger := testutil.Logger(t)
|
||||
fsm, err := New(nil, logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
fsm := NewFromDeps(Deps{
|
||||
Logger: logger,
|
||||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStore(nil)
|
||||
},
|
||||
StorageBackend: newStorageBackend(t, nil),
|
||||
})
|
||||
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
||||
abandonCh := fsm.state.AbandonCh()
|
||||
|
||||
|
@ -912,8 +962,14 @@ func TestFSM_BadSnapshot_NilCAConfig(t *testing.T) {
|
|||
|
||||
// Create an FSM with no config entry.
|
||||
logger := testutil.Logger(t)
|
||||
fsm, err := New(nil, logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
fsm := NewFromDeps(Deps{
|
||||
Logger: logger,
|
||||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStore(nil)
|
||||
},
|
||||
StorageBackend: newStorageBackend(t, nil),
|
||||
})
|
||||
|
||||
// Snapshot
|
||||
snap, err := fsm.Snapshot()
|
||||
|
@ -926,8 +982,13 @@ func TestFSM_BadSnapshot_NilCAConfig(t *testing.T) {
|
|||
require.NoError(t, snap.Persist(sink))
|
||||
|
||||
// Try to restore on a new FSM
|
||||
fsm2, err := New(nil, logger)
|
||||
require.NoError(t, err)
|
||||
fsm2 := NewFromDeps(Deps{
|
||||
Logger: logger,
|
||||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStore(nil)
|
||||
},
|
||||
StorageBackend: newStorageBackend(t, nil),
|
||||
})
|
||||
|
||||
// Do a restore
|
||||
require.NoError(t, fsm2.Restore(sink))
|
||||
|
@ -963,8 +1024,14 @@ func Test_restoreServiceVirtualIP(t *testing.T) {
|
|||
dec := codec.NewDecoder(buf, structs.MsgpackHandle)
|
||||
|
||||
logger := testutil.Logger(t)
|
||||
fsm, err := New(nil, logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
fsm := NewFromDeps(Deps{
|
||||
Logger: logger,
|
||||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStore(nil)
|
||||
},
|
||||
StorageBackend: newStorageBackend(t, nil),
|
||||
})
|
||||
|
||||
restore := fsm.State().Restore()
|
||||
|
||||
|
|
|
@ -11,14 +11,15 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/controller"
|
||||
"github.com/hashicorp/consul/agent/consul/fsm"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBoundAPIGatewayBindRoute(t *testing.T) {
|
||||
|
@ -3556,7 +3557,8 @@ func TestAPIGatewayController(t *testing.T) {
|
|||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStoreWithEventPublisher(nil, publisher)
|
||||
},
|
||||
Publisher: publisher,
|
||||
Publisher: publisher,
|
||||
StorageBackend: fsm.NullStorageBackend,
|
||||
})
|
||||
|
||||
var index uint64
|
||||
|
@ -3675,7 +3677,8 @@ func TestNewAPIGatewayController(t *testing.T) {
|
|||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStoreWithEventPublisher(nil, publisher)
|
||||
},
|
||||
Publisher: publisher,
|
||||
Publisher: publisher,
|
||||
StorageBackend: fsm.NullStorageBackend,
|
||||
})
|
||||
|
||||
updater := &Updater{
|
||||
|
|
|
@ -33,6 +33,7 @@ func TestHealthCheckRace(t *testing.T) {
|
|||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStore(nil)
|
||||
},
|
||||
StorageBackend: consulfsm.NullStorageBackend,
|
||||
})
|
||||
state := fsm.State()
|
||||
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/internal/storage/raft"
|
||||
)
|
||||
|
||||
// raftHandle is the glue layer between the Raft resource storage backend and
|
||||
// the exising Raft logic in Server.
|
||||
type raftHandle struct{ s *Server }
|
||||
|
||||
func (h *raftHandle) IsLeader() bool {
|
||||
return h.s.IsLeader()
|
||||
}
|
||||
|
||||
func (h *raftHandle) EnsureStrongConsistency(ctx context.Context) error {
|
||||
return h.s.consistentReadWithContext(ctx)
|
||||
}
|
||||
|
||||
func (h *raftHandle) Apply(msg []byte) (any, error) {
|
||||
return h.s.raftApplyEncoded(
|
||||
structs.ResourceOperationType,
|
||||
append([]byte{uint8(structs.ResourceOperationType)}, msg...),
|
||||
)
|
||||
}
|
||||
|
||||
func (h *raftHandle) DialLeader() (*grpc.ClientConn, error) {
|
||||
leaderAddr, _ := h.s.raft.LeaderWithID()
|
||||
if leaderAddr == "" {
|
||||
return nil, errors.New("leader unknown")
|
||||
}
|
||||
|
||||
dc := h.s.config.Datacenter
|
||||
tlsCfg := h.s.tlsConfigurator
|
||||
|
||||
return grpc.Dial(string(leaderAddr),
|
||||
// TLS is handled in the dialer below.
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
|
||||
// This dialer negotiates a connection on the multiplexed server port using
|
||||
// our type-byte prefix scheme (see Server.handleConn for other side of it).
|
||||
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
|
||||
var d net.Dialer
|
||||
conn, err := d.DialContext(ctx, "tcp", addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if tlsCfg.UseTLS(dc) {
|
||||
if _, err := conn.Write([]byte{byte(pool.RPCTLS)}); err != nil {
|
||||
conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tc, err := tlsCfg.OutgoingRPCWrapper()(dc, conn)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
conn = tc
|
||||
}
|
||||
|
||||
if _, err := conn.Write([]byte{byte(pool.RPCRaftForwarding)}); err != nil {
|
||||
conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
return conn, nil
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
var _ raft.Handle = (*raftHandle)(nil)
|
|
@ -246,6 +246,9 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
|
|||
case pool.RPCGRPC:
|
||||
s.grpcHandler.Handle(conn)
|
||||
|
||||
case pool.RPCRaftForwarding:
|
||||
s.handleRaftForwarding(conn)
|
||||
|
||||
default:
|
||||
if !s.handleEnterpriseRPCConn(typ, conn, isTLS) {
|
||||
s.rpcLogger().Error("unrecognized RPC byte",
|
||||
|
@ -314,6 +317,9 @@ func (s *Server) handleNativeTLS(conn net.Conn) {
|
|||
case pool.ALPN_RPCGRPC:
|
||||
s.grpcHandler.Handle(tlsConn)
|
||||
|
||||
case pool.ALPN_RPCRaftForwarding:
|
||||
s.handleRaftForwarding(tlsConn)
|
||||
|
||||
case pool.ALPN_WANGossipPacket:
|
||||
if err := s.handleALPN_WANGossipPacketStream(tlsConn); err != nil && err != io.EOF {
|
||||
s.rpcLogger().Error(
|
||||
|
@ -496,6 +502,19 @@ func (s *Server) handleRaftRPC(conn net.Conn) {
|
|||
s.raftLayer.Handoff(conn)
|
||||
}
|
||||
|
||||
func (s *Server) handleRaftForwarding(conn net.Conn) {
|
||||
if tlsConn, ok := conn.(*tls.Conn); ok {
|
||||
err := s.tlsConfigurator.AuthorizeServerConn(s.config.Datacenter, tlsConn)
|
||||
if err != nil {
|
||||
s.rpcLogger().Warn(err.Error(), "from", conn.RemoteAddr(), "operation", "raft forwarding")
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
s.raftStorageBackend.HandleConnection(conn)
|
||||
}
|
||||
|
||||
func (s *Server) handleALPN_WANGossipPacketStream(conn net.Conn) error {
|
||||
defer conn.Close()
|
||||
|
||||
|
@ -903,21 +922,19 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
|
|||
}
|
||||
|
||||
// raftApplyMsgpack encodes the msg using msgpack and calls raft.Apply. See
|
||||
// raftApplyWithEncoder.
|
||||
// raftApplyEncoded.
|
||||
func (s *Server) raftApplyMsgpack(t structs.MessageType, msg interface{}) (interface{}, error) {
|
||||
return s.raftApplyWithEncoder(t, msg, structs.Encode)
|
||||
}
|
||||
|
||||
// raftApplyProtobuf encodes the msg using protobuf and calls raft.Apply. See
|
||||
// raftApplyWithEncoder.
|
||||
// raftApplyEncoded.
|
||||
func (s *Server) raftApplyProtobuf(t structs.MessageType, msg interface{}) (interface{}, error) {
|
||||
return s.raftApplyWithEncoder(t, msg, structs.EncodeProtoInterface)
|
||||
}
|
||||
|
||||
// raftApplyWithEncoder encodes a message, and then calls raft.Apply with the
|
||||
// encoded message. Returns the FSM response along with any errors. If the
|
||||
// FSM.Apply response is an error it will be returned as the error return
|
||||
// value with a nil response.
|
||||
// encoded message. See raftApplyEncoded.
|
||||
func (s *Server) raftApplyWithEncoder(
|
||||
t structs.MessageType,
|
||||
msg interface{},
|
||||
|
@ -930,7 +947,13 @@ func (s *Server) raftApplyWithEncoder(
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to encode request: %v", err)
|
||||
}
|
||||
return s.raftApplyEncoded(t, buf)
|
||||
}
|
||||
|
||||
// raftApplyEncoded calls raft.Apply with the encoded message. Returns the FSM
|
||||
// response along with any errors. If the FSM.Apply response is an error it will
|
||||
// be returned as the error return value with a nil response.
|
||||
func (s *Server) raftApplyEncoded(t structs.MessageType, buf []byte) (any, error) {
|
||||
// Warn if the command is very large
|
||||
if n := len(buf); n > raftWarnSize {
|
||||
s.rpcLogger().Warn("Attempting to apply large raft entry", "size_in_bytes", n)
|
||||
|
@ -1177,37 +1200,53 @@ func (s *Server) setQueryMeta(m blockingQueryResponseMeta, token string) {
|
|||
}
|
||||
}
|
||||
|
||||
// consistentRead is used to ensure we do not perform a stale
|
||||
// read. This is done by verifying leadership before the read.
|
||||
func (s *Server) consistentRead() error {
|
||||
func (s *Server) consistentReadWithContext(ctx context.Context) error {
|
||||
defer metrics.MeasureSince([]string{"rpc", "consistentRead"}, time.Now())
|
||||
future := s.raft.VerifyLeader()
|
||||
if err := future.Error(); err != nil {
|
||||
return err // fail fast if leader verification fails
|
||||
}
|
||||
// poll consistent read readiness, wait for up to RPCHoldTimeout milliseconds
|
||||
|
||||
if s.isReadyForConsistentReads() {
|
||||
return nil
|
||||
}
|
||||
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction)
|
||||
deadline := time.Now().Add(s.config.RPCHoldTimeout)
|
||||
|
||||
for time.Now().Before(deadline) {
|
||||
// Poll until the context reaches its deadline, or for RPCHoldTimeout if the
|
||||
// context has no deadline.
|
||||
pollFor := s.config.RPCHoldTimeout
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
pollFor = time.Until(deadline)
|
||||
}
|
||||
|
||||
interval := pollFor / structs.JitterFraction
|
||||
if interval <= 0 {
|
||||
return structs.ErrNotReadyForConsistentReads
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-time.After(jitter):
|
||||
// Drop through and check before we loop again.
|
||||
|
||||
case <-ticker.C:
|
||||
if s.isReadyForConsistentReads() {
|
||||
return nil
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return structs.ErrNotReadyForConsistentReads
|
||||
case <-s.shutdownCh:
|
||||
return fmt.Errorf("shutdown waiting for leader")
|
||||
}
|
||||
|
||||
if s.isReadyForConsistentReads() {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return structs.ErrNotReadyForConsistentReads
|
||||
// consistentRead is used to ensure we do not perform a stale
|
||||
// read. This is done by verifying leadership before the read.
|
||||
func (s *Server) consistentRead() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), s.config.RPCHoldTimeout)
|
||||
defer cancel()
|
||||
|
||||
return s.consistentReadWithContext(ctx)
|
||||
}
|
||||
|
||||
// rpcQueryTimeout calculates the timeout for the query, ensures it is
|
||||
|
|
|
@ -65,6 +65,8 @@ import (
|
|||
"github.com/hashicorp/consul/agent/rpc/peering"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/lib/routine"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
|
@ -251,6 +253,9 @@ type Server struct {
|
|||
// transition notifications from the Raft layer.
|
||||
raftNotifyCh <-chan bool
|
||||
|
||||
// raftStorageBackend is the Raft-backed storage backend for resources.
|
||||
raftStorageBackend *raftstorage.Backend
|
||||
|
||||
// reconcileCh is used to pass events from the serf handler
|
||||
// into the leader manager, so that the strong state can be
|
||||
// updated
|
||||
|
@ -450,14 +455,6 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
|
|||
|
||||
loggers := newLoggerStore(serverLogger)
|
||||
|
||||
fsmDeps := fsm.Deps{
|
||||
Logger: flat.Logger,
|
||||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStoreWithEventPublisher(gc, flat.EventPublisher)
|
||||
},
|
||||
Publisher: flat.EventPublisher,
|
||||
}
|
||||
|
||||
if incomingRPCLimiter == nil {
|
||||
incomingRPCLimiter = rpcRate.NullRequestLimitsHandler()
|
||||
}
|
||||
|
@ -484,14 +481,27 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
|
|||
shutdownCh: shutdownCh,
|
||||
leaderRoutineManager: routine.NewManager(logger.Named(logging.Leader)),
|
||||
aclAuthMethodValidators: authmethod.NewCache(),
|
||||
fsm: fsm.NewFromDeps(fsmDeps),
|
||||
publisher: flat.EventPublisher,
|
||||
incomingRPCLimiter: incomingRPCLimiter,
|
||||
routineManager: routine.NewManager(logger.Named(logging.ConsulServer)),
|
||||
}
|
||||
|
||||
incomingRPCLimiter.Register(s)
|
||||
|
||||
s.raftStorageBackend, err = raftstorage.NewBackend(&raftHandle{s}, logger.Named("raft-storage-backend"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create storage backend: %w", err)
|
||||
}
|
||||
go s.raftStorageBackend.Run(&lib.StopChannelContext{StopCh: shutdownCh})
|
||||
|
||||
s.fsm = fsm.NewFromDeps(fsm.Deps{
|
||||
Logger: flat.Logger,
|
||||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStoreWithEventPublisher(gc, flat.EventPublisher)
|
||||
},
|
||||
Publisher: flat.EventPublisher,
|
||||
StorageBackend: s.raftStorageBackend,
|
||||
})
|
||||
|
||||
s.hcpManager = hcp.NewManager(hcp.ManagerConfig{
|
||||
Client: flat.HCP.Client,
|
||||
StatusFn: s.hcpServerStatus(flat),
|
||||
|
@ -738,7 +748,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
|
|||
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
|
||||
|
||||
// Initialize external gRPC server
|
||||
s.setupExternalGRPC(config, logger)
|
||||
s.setupExternalGRPC(config, s.raftStorageBackend, logger)
|
||||
|
||||
// Initialize internal gRPC server.
|
||||
//
|
||||
|
@ -1038,11 +1048,22 @@ func (s *Server) setupRaft() error {
|
|||
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
|
||||
}
|
||||
|
||||
// It's safe to pass nil as the handle argument here because we won't call
|
||||
// the backend's data access methods (only Apply, Snapshot, and Restore).
|
||||
backend, err := raftstorage.NewBackend(nil, hclog.NewNullLogger())
|
||||
if err != nil {
|
||||
return fmt.Errorf("recovery failed: %w", err)
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go backend.Run(ctx)
|
||||
|
||||
tmpFsm := fsm.NewFromDeps(fsm.Deps{
|
||||
Logger: s.logger,
|
||||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStore(s.tombstoneGC)
|
||||
},
|
||||
StorageBackend: backend,
|
||||
})
|
||||
if err := raft.RecoverCluster(s.config.RaftConfig, tmpFsm,
|
||||
log, stable, snap, trans, configuration); err != nil {
|
||||
|
@ -1174,7 +1195,7 @@ func (s *Server) setupRPC() error {
|
|||
}
|
||||
|
||||
// Initialize and register services on external gRPC server.
|
||||
func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) {
|
||||
func (s *Server) setupExternalGRPC(config *Config, backend storage.Backend, logger hclog.Logger) {
|
||||
|
||||
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
|
||||
ACLsEnabled: s.config.ACLsEnabled,
|
||||
|
@ -1240,7 +1261,9 @@ func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) {
|
|||
})
|
||||
s.peerStreamServer.Register(s.externalGRPCServer)
|
||||
|
||||
resource.NewServer(resource.Config{}).Register(s.externalGRPCServer)
|
||||
resource.NewServer(resource.Config{
|
||||
Backend: backend,
|
||||
}).Register(s.externalGRPCServer)
|
||||
}
|
||||
|
||||
// Shutdown is used to shutdown the server
|
||||
|
@ -1861,6 +1884,7 @@ func (s *Server) trackLeaderChanges() {
|
|||
|
||||
s.grpcLeaderForwarder.UpdateLeaderAddr(s.config.Datacenter, string(leaderObs.LeaderAddr))
|
||||
s.peeringBackend.SetLeaderAddress(string(leaderObs.LeaderAddr))
|
||||
s.raftStorageBackend.LeaderChanged()
|
||||
|
||||
// Trigger sending an update to HCP status
|
||||
s.hcpManager.SendUpdate()
|
||||
|
|
|
@ -15,7 +15,7 @@ func (s *Server) List(ctx context.Context, req *pbresource.ListRequest) (*pbreso
|
|||
return nil, err
|
||||
}
|
||||
|
||||
resources, err := s.backend.List(ctx, readConsistencyFrom(ctx), storage.UnversionedTypeFrom(req.Type), req.Tenancy, req.NamePrefix)
|
||||
resources, err := s.Backend.List(ctx, readConsistencyFrom(ctx), storage.UnversionedTypeFrom(req.Type), req.Tenancy, req.NamePrefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ func TestList_Many(t *testing.T) {
|
|||
},
|
||||
Version: "",
|
||||
}
|
||||
server.backend.WriteCAS(tc.ctx, r)
|
||||
server.Backend.WriteCAS(tc.ctx, r)
|
||||
}
|
||||
|
||||
rsp, err := client.List(tc.ctx, &pbresource.ListRequest{
|
||||
|
@ -90,7 +90,7 @@ func TestList_GroupVersionMismatch(t *testing.T) {
|
|||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
server.registry.Register(resource.Registration{Type: typev1})
|
||||
server.backend.WriteCAS(tc.ctx, &pbresource.Resource{Id: id2})
|
||||
server.Backend.WriteCAS(tc.ctx, &pbresource.Resource{Id: id2})
|
||||
|
||||
rsp, err := client.List(tc.ctx, &pbresource.ListRequest{
|
||||
Type: typev1,
|
||||
|
@ -110,7 +110,7 @@ func TestList_VerifyReadConsistencyArg(t *testing.T) {
|
|||
mockBackend := NewMockBackend(t)
|
||||
server := NewServer(Config{
|
||||
registry: resource.NewRegistry(),
|
||||
backend: mockBackend,
|
||||
Backend: mockBackend,
|
||||
})
|
||||
server.registry.Register(resource.Registration{Type: typev1})
|
||||
resource1 := &pbresource.Resource{Id: id1, Version: "1"}
|
||||
|
|
|
@ -20,7 +20,7 @@ func (s *Server) Read(ctx context.Context, req *pbresource.ReadRequest) (*pbreso
|
|||
return nil, err
|
||||
}
|
||||
|
||||
resource, err := s.backend.Read(ctx, readConsistencyFrom(ctx), req.Id)
|
||||
resource, err := s.Backend.Read(ctx, readConsistencyFrom(ctx), req.Id)
|
||||
if err != nil {
|
||||
if errors.Is(err, storage.ErrNotFound) {
|
||||
return nil, status.Error(codes.NotFound, err.Error())
|
||||
|
|
|
@ -53,7 +53,7 @@ func TestRead_GroupVersionMismatch(t *testing.T) {
|
|||
client := testClient(t, server)
|
||||
|
||||
resource1 := &pbresource.Resource{Id: id1, Version: ""}
|
||||
_, err := server.backend.WriteCAS(tc.ctx, resource1)
|
||||
_, err := server.Backend.WriteCAS(tc.ctx, resource1)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = client.Read(tc.ctx, &pbresource.ReadRequest{Id: id2})
|
||||
|
@ -71,7 +71,7 @@ func TestRead_Success(t *testing.T) {
|
|||
server.registry.Register(resource.Registration{Type: typev1})
|
||||
client := testClient(t, server)
|
||||
resource1 := &pbresource.Resource{Id: id1, Version: ""}
|
||||
resource1, err := server.backend.WriteCAS(tc.ctx, resource1)
|
||||
resource1, err := server.Backend.WriteCAS(tc.ctx, resource1)
|
||||
require.NoError(t, err)
|
||||
|
||||
rsp, err := client.Read(tc.ctx, &pbresource.ReadRequest{Id: id1})
|
||||
|
@ -88,7 +88,7 @@ func TestRead_VerifyReadConsistencyArg(t *testing.T) {
|
|||
mockBackend := NewMockBackend(t)
|
||||
server := NewServer(Config{
|
||||
registry: resource.NewRegistry(),
|
||||
backend: mockBackend,
|
||||
Backend: mockBackend,
|
||||
})
|
||||
server.registry.Register(resource.Registration{Type: typev1})
|
||||
resource1 := &pbresource.Resource{Id: id1, Version: "1"}
|
||||
|
|
|
@ -22,7 +22,9 @@ type Server struct {
|
|||
|
||||
type Config struct {
|
||||
registry Registry
|
||||
backend Backend
|
||||
|
||||
// Backend is the storage backend that will be used for resource persistence.
|
||||
Backend Backend
|
||||
}
|
||||
|
||||
//go:generate mockery --name Registry --inpackage
|
||||
|
|
|
@ -17,7 +17,7 @@ import (
|
|||
)
|
||||
|
||||
func TestWrite_TODO(t *testing.T) {
|
||||
server := NewServer(Config{})
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
resp, err := client.Write(context.Background(), &pbresource.WriteRequest{})
|
||||
require.NoError(t, err)
|
||||
|
@ -25,7 +25,7 @@ func TestWrite_TODO(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestWriteStatus_TODO(t *testing.T) {
|
||||
server := NewServer(Config{})
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
resp, err := client.WriteStatus(context.Background(), &pbresource.WriteStatusRequest{})
|
||||
require.NoError(t, err)
|
||||
|
@ -33,7 +33,7 @@ func TestWriteStatus_TODO(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestDelete_TODO(t *testing.T) {
|
||||
server := NewServer(Config{})
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
resp, err := client.Delete(context.Background(), &pbresource.DeleteRequest{})
|
||||
require.NoError(t, err)
|
||||
|
@ -48,7 +48,7 @@ func testServer(t *testing.T) *Server {
|
|||
go backend.Run(testContext(t))
|
||||
|
||||
registry := resource.NewRegistry()
|
||||
return NewServer(Config{registry: registry, backend: backend})
|
||||
return NewServer(Config{registry: registry, Backend: backend})
|
||||
}
|
||||
|
||||
func testClient(t *testing.T, server *Server) pbresource.ResourceServiceClient {
|
||||
|
|
|
@ -15,7 +15,7 @@ func (s *Server) WatchList(req *pbresource.WatchListRequest, stream pbresource.R
|
|||
}
|
||||
|
||||
unversionedType := storage.UnversionedTypeFrom(req.Type)
|
||||
watch, err := s.backend.WatchList(
|
||||
watch, err := s.Backend.WatchList(
|
||||
stream.Context(),
|
||||
unversionedType,
|
||||
req.Tenancy,
|
||||
|
|
|
@ -55,7 +55,7 @@ func TestWatchList_GroupVersionMatches(t *testing.T) {
|
|||
rspCh := handleResourceStream(t, stream)
|
||||
|
||||
// insert and verify upsert event received
|
||||
r1, err := server.backend.WriteCAS(ctx, resourcev1)
|
||||
r1, err := server.Backend.WriteCAS(ctx, resourcev1)
|
||||
require.NoError(t, err)
|
||||
rsp := mustGetResource(t, rspCh)
|
||||
require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, rsp.Operation)
|
||||
|
@ -63,14 +63,14 @@ func TestWatchList_GroupVersionMatches(t *testing.T) {
|
|||
|
||||
// update and verify upsert event received
|
||||
r2 := clone(r1)
|
||||
r2, err = server.backend.WriteCAS(ctx, r2)
|
||||
r2, err = server.Backend.WriteCAS(ctx, r2)
|
||||
require.NoError(t, err)
|
||||
rsp = mustGetResource(t, rspCh)
|
||||
require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, rsp.Operation)
|
||||
prototest.AssertDeepEqual(t, r2, rsp.Resource)
|
||||
|
||||
// delete and verify delete event received
|
||||
err = server.backend.DeleteCAS(ctx, r2.Id, r2.Version)
|
||||
err = server.Backend.DeleteCAS(ctx, r2.Id, r2.Version)
|
||||
require.NoError(t, err)
|
||||
rsp = mustGetResource(t, rspCh)
|
||||
require.Equal(t, pbresource.WatchEvent_OPERATION_DELETE, rsp.Operation)
|
||||
|
@ -97,16 +97,16 @@ func TestWatchList_GroupVersionMismatch(t *testing.T) {
|
|||
rspCh := handleResourceStream(t, stream)
|
||||
|
||||
// insert
|
||||
r1, err := server.backend.WriteCAS(ctx, resourcev1)
|
||||
r1, err := server.Backend.WriteCAS(ctx, resourcev1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// update
|
||||
r2 := clone(r1)
|
||||
r2, err = server.backend.WriteCAS(ctx, r2)
|
||||
r2, err = server.Backend.WriteCAS(ctx, r2)
|
||||
require.NoError(t, err)
|
||||
|
||||
// delete
|
||||
err = server.backend.DeleteCAS(ctx, r2.Id, r2.Version)
|
||||
err = server.Backend.DeleteCAS(ctx, r2.Id, r2.Version)
|
||||
require.NoError(t, err)
|
||||
|
||||
// verify no events received
|
||||
|
|
|
@ -14,9 +14,10 @@ import (
|
|||
|
||||
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
|
||||
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
|
||||
"github.com/hashicorp/consul/agent/consul/rate"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/rate"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -60,15 +61,14 @@ func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server)
|
|||
srv := grpc.NewServer(opts...)
|
||||
register(srv)
|
||||
|
||||
lis := &chanListener{addr: addr, conns: make(chan net.Conn), done: make(chan struct{})}
|
||||
return &Handler{srv: srv, listener: lis}
|
||||
return &Handler{srv: srv, listener: NewListener(addr)}
|
||||
}
|
||||
|
||||
// Handler implements a handler for the rpc server listener, and the
|
||||
// agent.Component interface for managing the lifecycle of the grpc.Server.
|
||||
type Handler struct {
|
||||
srv *grpc.Server
|
||||
listener *chanListener
|
||||
listener *Listener
|
||||
}
|
||||
|
||||
// Handle the connection by sending it to a channel for the grpc.Server to receive.
|
||||
|
@ -85,38 +85,6 @@ func (h *Handler) Shutdown() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// chanListener implements net.Listener for grpc.Server.
|
||||
type chanListener struct {
|
||||
conns chan net.Conn
|
||||
addr net.Addr
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// Accept blocks until a connection is received from Handle, and then returns the
|
||||
// connection. Accept implements part of the net.Listener interface for grpc.Server.
|
||||
func (l *chanListener) Accept() (net.Conn, error) {
|
||||
select {
|
||||
case c := <-l.conns:
|
||||
return c, nil
|
||||
case <-l.done:
|
||||
return nil, &net.OpError{
|
||||
Op: "accept",
|
||||
Net: l.addr.Network(),
|
||||
Addr: l.addr,
|
||||
Err: fmt.Errorf("listener closed"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (l *chanListener) Addr() net.Addr {
|
||||
return l.addr
|
||||
}
|
||||
|
||||
func (l *chanListener) Close() error {
|
||||
close(l.done)
|
||||
return nil
|
||||
}
|
||||
|
||||
// NoOpHandler implements the same methods as Handler, but performs no handling.
|
||||
// It may be used in place of Handler to disable the grpc server.
|
||||
type NoOpHandler struct {
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
package internal
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
)
|
||||
|
||||
// Listener implements the net.Listener interface and allows you to manually
|
||||
// pass connections to it. This is useful when you need to accept connections
|
||||
// and do something with them yourself first (e.g. handling our multiplexing
|
||||
// scheme) before giving them to the gRPC server.
|
||||
type Listener struct {
|
||||
addr net.Addr
|
||||
conns chan net.Conn
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
var _ net.Listener = (*Listener)(nil)
|
||||
|
||||
// NewListener creates a Listener with the given address.
|
||||
func NewListener(addr net.Addr) *Listener {
|
||||
return &Listener{
|
||||
addr: addr,
|
||||
conns: make(chan net.Conn),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Handle makes the given connection available to Accept.
|
||||
func (l *Listener) Handle(conn net.Conn) {
|
||||
select {
|
||||
case l.conns <- conn:
|
||||
case <-l.done:
|
||||
_ = conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Accept a connection.
|
||||
func (l *Listener) Accept() (net.Conn, error) {
|
||||
select {
|
||||
case c := <-l.conns:
|
||||
return c, nil
|
||||
case <-l.done:
|
||||
return nil, &net.OpError{
|
||||
Op: "accept",
|
||||
Net: l.addr.Network(),
|
||||
Addr: l.addr,
|
||||
Err: fmt.Errorf("listener closed"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Addr returns the listener's address.
|
||||
func (l *Listener) Addr() net.Addr { return l.addr }
|
||||
|
||||
// Close the listener.
|
||||
func (l *Listener) Close() error {
|
||||
close(l.done)
|
||||
return nil
|
||||
}
|
|
@ -22,6 +22,10 @@ var rpcRateLimitSpecs = map[string]rate.OperationSpec{
|
|||
"/hashicorp.consul.internal.peering.PeeringService/TrustBundleRead": {Type: rate.OperationTypeRead, Category: rate.OperationCategoryPeering},
|
||||
"/hashicorp.consul.internal.peerstream.PeerStreamService/ExchangeSecret": {Type: rate.OperationTypeWrite, Category: rate.OperationCategoryPeerStream},
|
||||
"/hashicorp.consul.internal.peerstream.PeerStreamService/StreamResources": {Type: rate.OperationTypeRead, Category: rate.OperationCategoryPeerStream},
|
||||
"/hashicorp.consul.internal.storage.raft.ForwardingService/Delete": {Type: rate.OperationTypeExempt, Category: rate.OperationCategoryResource},
|
||||
"/hashicorp.consul.internal.storage.raft.ForwardingService/List": {Type: rate.OperationTypeExempt, Category: rate.OperationCategoryResource},
|
||||
"/hashicorp.consul.internal.storage.raft.ForwardingService/Read": {Type: rate.OperationTypeExempt, Category: rate.OperationCategoryResource},
|
||||
"/hashicorp.consul.internal.storage.raft.ForwardingService/Write": {Type: rate.OperationTypeExempt, Category: rate.OperationCategoryResource},
|
||||
"/hashicorp.consul.resource.ResourceService/Delete": {Type: rate.OperationTypeWrite, Category: rate.OperationCategoryResource},
|
||||
"/hashicorp.consul.resource.ResourceService/List": {Type: rate.OperationTypeRead, Category: rate.OperationCategoryResource},
|
||||
"/hashicorp.consul.resource.ResourceService/Read": {Type: rate.OperationTypeRead, Category: rate.OperationCategoryResource},
|
||||
|
|
|
@ -44,13 +44,14 @@ const (
|
|||
// configuration. At the time of writing there is only AutoEncrypt.Sign
|
||||
// that is supported and it might be the only one there
|
||||
// ever is.
|
||||
RPCTLSInsecure RPCType = 7
|
||||
RPCGRPC RPCType = 8
|
||||
RPCTLSInsecure RPCType = 7
|
||||
RPCGRPC RPCType = 8
|
||||
RPCRaftForwarding RPCType = 9
|
||||
|
||||
// RPCMaxTypeValue is the maximum rpc type byte value currently used for the
|
||||
// various protocols riding over our "rpc" port.
|
||||
//
|
||||
// Currently our 0-8 values are mutually exclusive with any valid first byte
|
||||
// Currently our 0-9 values are mutually exclusive with any valid first byte
|
||||
// of a TLS header. The first TLS header byte will begin with a TLS content
|
||||
// type and the values 0-19 are all explicitly unassigned and marked as
|
||||
// requiring coordination. RFC 7983 does the marking and goes into some
|
||||
|
@ -62,17 +63,18 @@ const (
|
|||
//
|
||||
// NOTE: if you add new RPCTypes beyond this value, you must similarly bump
|
||||
// this value.
|
||||
RPCMaxTypeValue = 8
|
||||
RPCMaxTypeValue = 9
|
||||
)
|
||||
|
||||
const (
|
||||
// regular old rpc (note there is no equivalent of RPCMultiplex, RPCTLS, or RPCTLSInsecure)
|
||||
ALPN_RPCConsul = "consul/rpc-single" // RPCConsul
|
||||
ALPN_RPCRaft = "consul/raft" // RPCRaft
|
||||
ALPN_RPCMultiplexV2 = "consul/rpc-multi" // RPCMultiplexV2
|
||||
ALPN_RPCSnapshot = "consul/rpc-snapshot" // RPCSnapshot
|
||||
ALPN_RPCGossip = "consul/rpc-gossip" // RPCGossip
|
||||
ALPN_RPCGRPC = "consul/rpc-grpc" // RPCGRPC
|
||||
ALPN_RPCConsul = "consul/rpc-single" // RPCConsul
|
||||
ALPN_RPCRaft = "consul/raft" // RPCRaft
|
||||
ALPN_RPCMultiplexV2 = "consul/rpc-multi" // RPCMultiplexV2
|
||||
ALPN_RPCSnapshot = "consul/rpc-snapshot" // RPCSnapshot
|
||||
ALPN_RPCGossip = "consul/rpc-gossip" // RPCGossip
|
||||
ALPN_RPCGRPC = "consul/rpc-grpc" // RPCGRPC
|
||||
ALPN_RPCRaftForwarding = "consul/raft-forwarding" // RPCRaftForwarding
|
||||
// wan federation additions
|
||||
ALPN_WANGossipPacket = "consul/wan-gossip/packet"
|
||||
ALPN_WANGossipStream = "consul/wan-gossip/stream"
|
||||
|
@ -85,6 +87,7 @@ var RPCNextProtos = []string{
|
|||
ALPN_RPCSnapshot,
|
||||
ALPN_RPCGossip,
|
||||
ALPN_RPCGRPC,
|
||||
ALPN_RPCRaftForwarding,
|
||||
ALPN_WANGossipPacket,
|
||||
ALPN_WANGossipStream,
|
||||
}
|
||||
|
|
|
@ -87,6 +87,7 @@ const (
|
|||
PeeringTrustBundleDeleteType = 39
|
||||
PeeringSecretsWriteType = 40
|
||||
RaftLogVerifierCheckpoint = 41 // Only used for log verifier, no-op on FSM.
|
||||
ResourceOperationType = 42
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -154,6 +155,7 @@ var requestTypeStrings = map[MessageType]string{
|
|||
PeeringTrustBundleDeleteType: "PeeringTrustBundleDelete",
|
||||
PeeringSecretsWriteType: "PeeringSecret",
|
||||
RaftLogVerifierCheckpoint: "RaftLogVerifierCheckpoint",
|
||||
ResourceOperationType: "Resource",
|
||||
}
|
||||
|
||||
const (
|
||||
|
|
|
@ -7,11 +7,12 @@ import (
|
|||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/mitchellh/cli"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/mitchellh/cli"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestIntentionListCommand_noTabs(t *testing.T) {
|
||||
|
|
|
@ -135,7 +135,7 @@ func testRead(t *testing.T, opts TestOptions) {
|
|||
|
||||
var e storage.GroupVersionMismatchError
|
||||
if errors.As(err, &e) {
|
||||
require.Equal(t, id.Type, e.RequestedType)
|
||||
prototest.AssertDeepEqual(t, id.Type, e.RequestedType)
|
||||
prototest.AssertDeepEqual(t, res, e.Stored, ignoreVersion)
|
||||
} else {
|
||||
t.Fatalf("expected storage.GroupVersionMismatchError, got: %T", err)
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
package inmem
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/go-memdb"
|
||||
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
// Snapshot obtains a point-in-time snapshot of the store that can later be
|
||||
// persisted and restored later.
|
||||
func (s *Store) Snapshot() (*Snapshot, error) {
|
||||
tx := s.txn(false)
|
||||
|
||||
iter, err := tx.Get(tableNameResources, indexNameID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Snapshot{iter: iter}, nil
|
||||
}
|
||||
|
||||
// Snapshot is a point-in-time snapshot of a store.
|
||||
type Snapshot struct {
|
||||
iter memdb.ResultIterator
|
||||
}
|
||||
|
||||
// Next returns the next resource in the snapshot. nil will be returned when
|
||||
// the end of the snapshot has been reached.
|
||||
func (s *Snapshot) Next() *pbresource.Resource {
|
||||
v := s.iter.Next()
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
return v.(*pbresource.Resource)
|
||||
}
|
||||
|
||||
// Restore starts the process of restoring a snapshot.
|
||||
//
|
||||
// Callers *must* call Abort or Commit when done, to free resources.
|
||||
func (s *Store) Restore() (*Restoration, error) {
|
||||
db, err := newDB()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Restoration{
|
||||
s: s,
|
||||
db: db,
|
||||
tx: db.Txn(true),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Restoration is a handle that can be used to restore a snapshot.
|
||||
type Restoration struct {
|
||||
s *Store
|
||||
db *memdb.MemDB
|
||||
tx *memdb.Txn
|
||||
}
|
||||
|
||||
// Apply the given resource to the store.
|
||||
func (r *Restoration) Apply(res *pbresource.Resource) error {
|
||||
return r.tx.Insert(tableNameResources, res)
|
||||
}
|
||||
|
||||
// Commit the restoration. Replaces the in-memory database wholesale and closes
|
||||
// any watches.
|
||||
func (r *Restoration) Commit() {
|
||||
r.tx.Commit()
|
||||
|
||||
r.s.mu.Lock()
|
||||
defer r.s.mu.Unlock()
|
||||
|
||||
r.s.db = r.db
|
||||
r.s.pub.RefreshTopic(eventTopic)
|
||||
}
|
||||
|
||||
// Abort the restoration. It's safe to always call this in a defer statement
|
||||
// because aborting a committed restoration is a no-op.
|
||||
func (r *Restoration) Abort() { r.tx.Abort() }
|
|
@ -0,0 +1,96 @@
|
|||
package inmem_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
"github.com/hashicorp/consul/internal/storage/inmem"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
"github.com/hashicorp/consul/proto/private/prototest"
|
||||
)
|
||||
|
||||
func TestSnapshotRestore(t *testing.T) {
|
||||
oldStore, err := inmem.NewStore()
|
||||
require.NoError(t, err)
|
||||
|
||||
a := &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Type: &pbresource.Type{
|
||||
Group: "mesh",
|
||||
GroupVersion: "v1",
|
||||
Kind: "service",
|
||||
},
|
||||
Tenancy: &pbresource.Tenancy{
|
||||
Partition: "default",
|
||||
PeerName: "local",
|
||||
Namespace: "default",
|
||||
},
|
||||
Name: "billing",
|
||||
Uid: "a",
|
||||
},
|
||||
Version: "1",
|
||||
}
|
||||
require.NoError(t, oldStore.WriteCAS(a, ""))
|
||||
|
||||
newStore, err := inmem.NewStore()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Write something to the new store to make sure it gets blown away.
|
||||
b := &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Type: &pbresource.Type{
|
||||
Group: "mesh",
|
||||
GroupVersion: "v1",
|
||||
Kind: "service",
|
||||
},
|
||||
Tenancy: &pbresource.Tenancy{
|
||||
Partition: "default",
|
||||
PeerName: "local",
|
||||
Namespace: "default",
|
||||
},
|
||||
Name: "api",
|
||||
Uid: "a",
|
||||
},
|
||||
Version: "1",
|
||||
}
|
||||
require.NoError(t, newStore.WriteCAS(b, ""))
|
||||
|
||||
snap, err := oldStore.Snapshot()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Start a watch on the new store to make sure it gets closed.
|
||||
watch, err := newStore.WatchList(storage.UnversionedTypeFrom(b.Id.Type), b.Id.Tenancy, "")
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
// Expect the initial state on the watch.
|
||||
_, err = watch.Next(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
restore, err := newStore.Restore()
|
||||
require.NoError(t, err)
|
||||
defer restore.Abort()
|
||||
|
||||
for r := snap.Next(); r != nil; r = snap.Next() {
|
||||
restore.Apply(r)
|
||||
}
|
||||
restore.Commit()
|
||||
|
||||
// Check that resource we wrote to oldStore has been restored to newStore.
|
||||
rsp, err := newStore.Read(a.Id)
|
||||
require.NoError(t, err)
|
||||
prototest.AssertDeepEqual(t, a, rsp)
|
||||
|
||||
// Check that resource written to newStore was removed by snapshot restore.
|
||||
_, err = newStore.Read(b.Id)
|
||||
require.ErrorIs(t, err, storage.ErrNotFound)
|
||||
|
||||
// Check the watch has been closed.
|
||||
_, err = watch.Next(ctx)
|
||||
require.ErrorIs(t, err, storage.ErrWatchClosed)
|
||||
}
|
|
@ -21,7 +21,9 @@ import (
|
|||
// package, but also handles reads in our Raft backend, and can be used as a
|
||||
// local cache when storing data in external systems (e.g. RDBMS, K/V stores).
|
||||
type Store struct {
|
||||
db *memdb.MemDB
|
||||
mu sync.RWMutex // guards db, because Restore.Commit will replace it wholesale.
|
||||
db *memdb.MemDB
|
||||
|
||||
pub *stream.EventPublisher
|
||||
|
||||
// eventLock is used to serialize operations that result in the publishing of
|
||||
|
@ -65,7 +67,8 @@ func (s *Store) Run(ctx context.Context) { s.pub.Run(ctx) }
|
|||
//
|
||||
// For more information, see the storage.Backend documentation.
|
||||
func (s *Store) Read(id *pbresource.ID) (*pbresource.Resource, error) {
|
||||
tx := s.db.Txn(false)
|
||||
tx := s.txn(false)
|
||||
|
||||
defer tx.Abort()
|
||||
|
||||
val, err := tx.First(tableNameResources, indexNameID, id)
|
||||
|
@ -101,7 +104,7 @@ func (s *Store) WriteCAS(res *pbresource.Resource, vsn string) error {
|
|||
s.eventLock.Lock()
|
||||
defer s.eventLock.Unlock()
|
||||
|
||||
tx := s.db.Txn(true)
|
||||
tx := s.txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
existing, err := tx.First(tableNameResources, indexNameID, res.Id)
|
||||
|
@ -150,7 +153,7 @@ func (s *Store) DeleteCAS(id *pbresource.ID, vsn string) error {
|
|||
s.eventLock.Lock()
|
||||
defer s.eventLock.Unlock()
|
||||
|
||||
tx := s.db.Txn(true)
|
||||
tx := s.txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
existing, err := tx.First(tableNameResources, indexNameID, id)
|
||||
|
@ -195,7 +198,7 @@ func (s *Store) DeleteCAS(id *pbresource.ID, vsn string) error {
|
|||
//
|
||||
// For more information, see the storage.Backend documentation.
|
||||
func (s *Store) List(typ storage.UnversionedType, ten *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) {
|
||||
tx := s.db.Txn(false)
|
||||
tx := s.txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
return listTxn(tx, query{typ, ten, namePrefix})
|
||||
|
@ -261,7 +264,7 @@ func (s *Store) WatchList(typ storage.UnversionedType, ten *pbresource.Tenancy,
|
|||
//
|
||||
// For more information, see the storage.Backend documentation.
|
||||
func (s *Store) OwnerReferences(id *pbresource.ID) ([]*pbresource.ID, error) {
|
||||
tx := s.db.Txn(false)
|
||||
tx := s.txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
iter, err := tx.Get(tableNameResources, indexNameOwner, id)
|
||||
|
@ -275,3 +278,10 @@ func (s *Store) OwnerReferences(id *pbresource.ID) ([]*pbresource.ID, error) {
|
|||
}
|
||||
return refs, nil
|
||||
}
|
||||
|
||||
func (s *Store) txn(write bool) *memdb.Txn {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
return s.db.Txn(write)
|
||||
}
|
||||
|
|
|
@ -28,6 +28,9 @@ type Watch struct {
|
|||
func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) {
|
||||
for {
|
||||
e, err := w.nextEvent(ctx)
|
||||
if err == stream.ErrSubForceClosed {
|
||||
return nil, storage.ErrWatchClosed
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -162,7 +165,7 @@ func (s *Store) watchSnapshot(req stream.SubscribeRequest, snap stream.SnapshotA
|
|||
return 0, fmt.Errorf("unhandled subject type: %T", req.Subject)
|
||||
}
|
||||
|
||||
tx := s.db.Txn(false)
|
||||
tx := s.txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
idx, err := currentEventIndex(tx)
|
||||
|
|
|
@ -0,0 +1,344 @@
|
|||
package raft
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
"github.com/hashicorp/consul/internal/storage/inmem"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
|
||||
pbstorage "github.com/hashicorp/consul/proto/private/pbstorage"
|
||||
)
|
||||
|
||||
// NewBackend returns a storage backend that uses Raft for durable persistence
|
||||
// and serves reads from an in-memory database. It's suitable for production use.
|
||||
//
|
||||
// It's not an entirely clean abstraction because rather than owning the Raft
|
||||
// subsystem directly, it has to integrate with the existing FSM and related
|
||||
// machinery from before generic resources.
|
||||
//
|
||||
// The given Handle will be used to apply logs and interrogate leadership state.
|
||||
// In certain restricted circumstances, Handle may be nil, such as during tests
|
||||
// that only exercise snapshot restoration, or when initializing a throwaway FSM
|
||||
// during peers.json recovery - but calling any of the data access methods (read
|
||||
// or write) will result in a panic.
|
||||
//
|
||||
// With Raft, writes and strongly consistent reads must be done on the leader.
|
||||
// Backend implements a gRPC server, which followers will use to transparently
|
||||
// forward operations to the leader. To do so, they will obtain a connection
|
||||
// using Handle.DialLeader. Connections are cached for re-use, so when there's
|
||||
// a new leader, you must call LeaderChanged to refresh the connection. Leaders
|
||||
// must accept connections and hand them off by calling Backend.HandleConnection.
|
||||
// Backend's gRPC client and server *DO NOT* handle TLS themselves, as they are
|
||||
// intended to communicate over Consul's multiplexed server port (which handles
|
||||
// TLS).
|
||||
//
|
||||
// You must call Run before using the backend.
|
||||
func NewBackend(h Handle, l hclog.Logger) (*Backend, error) {
|
||||
s, err := inmem.NewStore()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b := &Backend{handle: h, store: s}
|
||||
b.forwardingServer = newForwardingServer(b)
|
||||
b.forwardingClient = newForwardingClient(h, l)
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// Handle provides glue for interacting with the Raft subsystem via existing
|
||||
// machinery on consul.Server.
|
||||
type Handle interface {
|
||||
// Apply the given log message.
|
||||
Apply(msg []byte) (any, error)
|
||||
|
||||
// IsLeader determines if this server is the Raft leader (so can handle writes).
|
||||
IsLeader() bool
|
||||
|
||||
// EnsureStrongConsistency checks the server is able to handle consistent reads by
|
||||
// verifying its leadership and checking the FSM has applied all queued writes.
|
||||
EnsureStrongConsistency(ctx context.Context) error
|
||||
|
||||
// DialLeader dials a gRPC connection to the leader for forwarding.
|
||||
DialLeader() (*grpc.ClientConn, error)
|
||||
}
|
||||
|
||||
// Backend is a Raft-backed storage backend implementation.
|
||||
type Backend struct {
|
||||
handle Handle
|
||||
store *inmem.Store
|
||||
|
||||
forwardingServer *forwardingServer
|
||||
forwardingClient *forwardingClient
|
||||
}
|
||||
|
||||
// Run until the given context is canceled. This method blocks, so should be
|
||||
// called in a goroutine.
|
||||
func (b *Backend) Run(ctx context.Context) {
|
||||
group, groupCtx := errgroup.WithContext(ctx)
|
||||
|
||||
group.Go(func() error {
|
||||
b.store.Run(groupCtx)
|
||||
return nil
|
||||
})
|
||||
|
||||
group.Go(func() error {
|
||||
return b.forwardingServer.run(groupCtx)
|
||||
})
|
||||
|
||||
group.Wait()
|
||||
}
|
||||
|
||||
// Read implements the storage.Backend interface.
|
||||
func (b *Backend) Read(ctx context.Context, consistency storage.ReadConsistency, id *pbresource.ID) (*pbresource.Resource, error) {
|
||||
// Easy case. Both leaders and followers can read from the local store.
|
||||
if consistency == storage.EventualConsistency {
|
||||
return b.store.Read(id)
|
||||
}
|
||||
|
||||
if consistency != storage.StrongConsistency {
|
||||
return nil, fmt.Errorf("%w: unknown consistency: %s", storage.ErrInconsistent, consistency)
|
||||
}
|
||||
|
||||
// We are the leader. Handle the request ourself.
|
||||
if b.handle.IsLeader() {
|
||||
return b.leaderRead(ctx, id)
|
||||
}
|
||||
|
||||
// Forward the request to the leader.
|
||||
rsp, err := b.forwardingClient.read(ctx, &pbstorage.ReadRequest{Id: id})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rsp.GetResource(), nil
|
||||
}
|
||||
|
||||
func (b *Backend) leaderRead(ctx context.Context, id *pbresource.ID) (*pbresource.Resource, error) {
|
||||
if err := b.ensureStrongConsistency(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b.store.Read(id)
|
||||
}
|
||||
|
||||
// WriteCAS implements the storage.Backend interface.
|
||||
func (b *Backend) WriteCAS(ctx context.Context, res *pbresource.Resource) (*pbresource.Resource, error) {
|
||||
req := &pbstorage.WriteRequest{Resource: res}
|
||||
|
||||
if b.handle.IsLeader() {
|
||||
rsp, err := b.raftApply(&pbstorage.Log{
|
||||
Type: pbstorage.LogType_LOG_TYPE_WRITE,
|
||||
Request: &pbstorage.Log_Write{
|
||||
Write: req,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rsp.GetWrite().GetResource(), nil
|
||||
}
|
||||
|
||||
rsp, err := b.forwardingClient.write(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rsp.GetResource(), nil
|
||||
}
|
||||
|
||||
// DeleteCAS implements the storage.Backend interface.
|
||||
func (b *Backend) DeleteCAS(ctx context.Context, id *pbresource.ID, version string) error {
|
||||
req := &pbstorage.DeleteRequest{
|
||||
Id: id,
|
||||
Version: version,
|
||||
}
|
||||
|
||||
if b.handle.IsLeader() {
|
||||
_, err := b.raftApply(&pbstorage.Log{
|
||||
Type: pbstorage.LogType_LOG_TYPE_DELETE,
|
||||
Request: &pbstorage.Log_Delete{
|
||||
Delete: req,
|
||||
},
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
return b.forwardingClient.delete(ctx, req)
|
||||
}
|
||||
|
||||
// List implements the storage.Backend interface.
|
||||
func (b *Backend) List(ctx context.Context, consistency storage.ReadConsistency, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) {
|
||||
// Easy case. Both leaders and followers can read from the local store.
|
||||
if consistency == storage.EventualConsistency {
|
||||
return b.store.List(resType, tenancy, namePrefix)
|
||||
}
|
||||
|
||||
if consistency != storage.StrongConsistency {
|
||||
return nil, fmt.Errorf("%w: unknown consistency: %s", storage.ErrInconsistent, consistency)
|
||||
}
|
||||
|
||||
// We are the leader. Handle the request ourself.
|
||||
if b.handle.IsLeader() {
|
||||
return b.leaderList(ctx, resType, tenancy, namePrefix)
|
||||
}
|
||||
|
||||
// Forward the request to the leader.
|
||||
rsp, err := b.forwardingClient.list(ctx, &pbstorage.ListRequest{
|
||||
Type: &pbresource.Type{
|
||||
Group: resType.Group,
|
||||
Kind: resType.Kind,
|
||||
},
|
||||
Tenancy: tenancy,
|
||||
NamePrefix: namePrefix,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rsp.GetResources(), nil
|
||||
}
|
||||
|
||||
func (b *Backend) leaderList(ctx context.Context, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) {
|
||||
if err := b.ensureStrongConsistency(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b.store.List(resType, tenancy, namePrefix)
|
||||
}
|
||||
|
||||
// WatchList implements the storage.Backend interface.
|
||||
func (b *Backend) WatchList(_ context.Context, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) (storage.Watch, error) {
|
||||
return b.store.WatchList(resType, tenancy, namePrefix)
|
||||
}
|
||||
|
||||
// OwnerReferences implements the storage.Backend interface.
|
||||
func (b *Backend) OwnerReferences(_ context.Context, id *pbresource.ID) ([]*pbresource.ID, error) {
|
||||
return b.store.OwnerReferences(id)
|
||||
}
|
||||
|
||||
// Apply is called by the FSM with the bytes of a Raft log entry, with Consul's
|
||||
// envelope (i.e. type prefix and msgpack wrapper) stripped off.
|
||||
func (b *Backend) Apply(buf []byte, idx uint64) any {
|
||||
var req pbstorage.Log
|
||||
if err := req.UnmarshalBinary(buf); err != nil {
|
||||
return fmt.Errorf("failed to decode request: %w", err)
|
||||
}
|
||||
|
||||
switch req.Type {
|
||||
case pbstorage.LogType_LOG_TYPE_WRITE:
|
||||
res := req.GetWrite().GetResource()
|
||||
oldVsn := res.Version
|
||||
res.Version = strconv.Itoa(int(idx))
|
||||
|
||||
if err := b.store.WriteCAS(res, oldVsn); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return &pbstorage.LogResponse{
|
||||
Response: &pbstorage.LogResponse_Write{
|
||||
Write: &pbstorage.WriteResponse{Resource: res},
|
||||
},
|
||||
}
|
||||
case pbstorage.LogType_LOG_TYPE_DELETE:
|
||||
req := req.GetDelete()
|
||||
if err := b.store.DeleteCAS(req.Id, req.Version); err != nil {
|
||||
return err
|
||||
}
|
||||
return &pbstorage.LogResponse{
|
||||
Response: &pbstorage.LogResponse_Delete{},
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Errorf("unexpected request type: %s", req.Type)
|
||||
}
|
||||
|
||||
// LeaderChanged should be called whenever the current Raft leader changes, to
|
||||
// drop and re-create the gRPC connection used for forwarding.
|
||||
func (b *Backend) LeaderChanged() { b.forwardingClient.leaderChanged() }
|
||||
|
||||
// HandleConnection should be called whenever a forwarding connection is opened.
|
||||
func (b *Backend) HandleConnection(conn net.Conn) { b.forwardingServer.listener.Handle(conn) }
|
||||
|
||||
// raftApply round trips the given request through the Raft log and FSM.
|
||||
func (b *Backend) raftApply(req *pbstorage.Log) (*pbstorage.LogResponse, error) {
|
||||
msg, err := req.MarshalBinary()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rsp, err := b.handle.Apply(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch t := rsp.(type) {
|
||||
case *pbstorage.LogResponse:
|
||||
return t, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected response from Raft apply: %T", rsp)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Backend) ensureStrongConsistency(ctx context.Context) error {
|
||||
if err := b.handle.EnsureStrongConsistency(ctx); err != nil {
|
||||
return fmt.Errorf("%w: %v", storage.ErrInconsistent, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Snapshot obtains a point-in-time snapshot of the backend's state, so that it
|
||||
// can be written to disk as a backup or sent to bootstrap a follower.
|
||||
func (b *Backend) Snapshot() (*Snapshot, error) {
|
||||
s, err := b.store.Snapshot()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Snapshot{s}, nil
|
||||
}
|
||||
|
||||
// Snapshot is a point-in-time snapshot of a backend's state.
|
||||
type Snapshot struct{ s *inmem.Snapshot }
|
||||
|
||||
// Next returns the next resource in the snapshot, protobuf encoded. nil bytes
|
||||
// will be returned when the end of the snapshot has been reached.
|
||||
func (s *Snapshot) Next() ([]byte, error) {
|
||||
res := s.s.Next()
|
||||
if res == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return res.MarshalBinary()
|
||||
}
|
||||
|
||||
// Restore starts the process of restoring a snapshot (i.e. from an on-disk
|
||||
// backup, or to bootstrap from a leader).
|
||||
//
|
||||
// Callers *must* call Abort or Commit when done, to free resources.
|
||||
func (b *Backend) Restore() (*Restoration, error) {
|
||||
r, err := b.store.Restore()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Restoration{r}, nil
|
||||
}
|
||||
|
||||
// Restoration is a handle that can be used to restore a snapshot.
|
||||
type Restoration struct{ r *inmem.Restoration }
|
||||
|
||||
// Apply the given protobuf-encoded resource to the backend.
|
||||
func (r *Restoration) Apply(msg []byte) error {
|
||||
var res pbresource.Resource
|
||||
if err := res.UnmarshalBinary(msg); err != nil {
|
||||
return err
|
||||
}
|
||||
return r.r.Apply(&res)
|
||||
}
|
||||
|
||||
// Commit the restoration.
|
||||
func (r *Restoration) Commit() { r.r.Commit() }
|
||||
|
||||
// Abort the restoration. It's safe to always call this in a defer statement
|
||||
// because aborting a committed restoration is a no-op.
|
||||
func (r *Restoration) Abort() { r.r.Abort() }
|
|
@ -0,0 +1,170 @@
|
|||
package raft_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
"github.com/hashicorp/consul/internal/storage/conformance"
|
||||
"github.com/hashicorp/consul/internal/storage/raft"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
func TestBackend_Conformance(t *testing.T) {
|
||||
t.Run("Leader", func(t *testing.T) {
|
||||
conformance.Test(t, conformance.TestOptions{
|
||||
NewBackend: func(t *testing.T) storage.Backend {
|
||||
leader, _ := newRaftCluster(t)
|
||||
return leader
|
||||
},
|
||||
SupportsStronglyConsistentList: true,
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Follower", func(t *testing.T) {
|
||||
conformance.Test(t, conformance.TestOptions{
|
||||
NewBackend: func(t *testing.T) storage.Backend {
|
||||
_, follower := newRaftCluster(t)
|
||||
return follower
|
||||
},
|
||||
SupportsStronglyConsistentList: true,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func newRaftCluster(t *testing.T) (*raft.Backend, *raft.Backend) {
|
||||
t.Helper()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
lis, err := net.Listen("tcp", ":0")
|
||||
require.NoError(t, err)
|
||||
|
||||
lc, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
require.NoError(t, err)
|
||||
|
||||
lh := &leaderHandle{replCh: make(chan log, 10)}
|
||||
leader, err := raft.NewBackend(lh, testutil.Logger(t))
|
||||
require.NoError(t, err)
|
||||
lh.backend = leader
|
||||
go leader.Run(ctx)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
conn, err := lis.Accept()
|
||||
if errors.Is(err, net.ErrClosed) {
|
||||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
go leader.HandleConnection(conn)
|
||||
}
|
||||
}()
|
||||
|
||||
follower, err := raft.NewBackend(&followerHandle{leaderConn: lc}, testutil.Logger(t))
|
||||
require.NoError(t, err)
|
||||
go follower.Run(ctx)
|
||||
follower.LeaderChanged()
|
||||
|
||||
go lh.replicate(t, follower)
|
||||
|
||||
return leader, follower
|
||||
}
|
||||
|
||||
type followerHandle struct {
|
||||
leaderConn *grpc.ClientConn
|
||||
}
|
||||
|
||||
func (followerHandle) Apply([]byte) (any, error) {
|
||||
return nil, errors.New("not leader")
|
||||
}
|
||||
|
||||
func (followerHandle) IsLeader() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (followerHandle) EnsureStrongConsistency(context.Context) error {
|
||||
return errors.New("not leader")
|
||||
}
|
||||
|
||||
func (f *followerHandle) DialLeader() (*grpc.ClientConn, error) {
|
||||
return f.leaderConn, nil
|
||||
}
|
||||
|
||||
type leaderHandle struct {
|
||||
index uint64
|
||||
replCh chan log
|
||||
|
||||
backend *raft.Backend
|
||||
}
|
||||
|
||||
type log struct {
|
||||
idx uint64
|
||||
msg []byte
|
||||
}
|
||||
|
||||
func (l *leaderHandle) Apply(msg []byte) (any, error) {
|
||||
idx := atomic.AddUint64(&l.index, 1)
|
||||
|
||||
// Apply the operation to the leader synchronously and capture its response
|
||||
// to return to the caller.
|
||||
rsp := l.backend.Apply(msg, idx)
|
||||
|
||||
// Replicate the operation to the follower asynchronously.
|
||||
l.replCh <- log{idx, msg}
|
||||
|
||||
if err, ok := rsp.(error); ok {
|
||||
return nil, err
|
||||
}
|
||||
return rsp, nil
|
||||
}
|
||||
|
||||
func (leaderHandle) IsLeader() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (leaderHandle) EnsureStrongConsistency(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (leaderHandle) DialLeader() (*grpc.ClientConn, error) {
|
||||
return nil, errors.New("leader should not dial itself")
|
||||
}
|
||||
|
||||
func (h *leaderHandle) replicate(t *testing.T, follower *raft.Backend) {
|
||||
doneCh := make(chan struct{})
|
||||
t.Cleanup(func() { close(doneCh) })
|
||||
|
||||
timer := time.NewTimer(replicationLag())
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
select {
|
||||
case l := <-h.replCh:
|
||||
_ = follower.Apply(l.msg, l.idx)
|
||||
default:
|
||||
}
|
||||
timer.Reset(replicationLag())
|
||||
case <-doneCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func replicationLag() time.Duration {
|
||||
if testing.Short() {
|
||||
return 0
|
||||
}
|
||||
return time.Duration(rand.Intn(50)) * time.Millisecond
|
||||
}
|
|
@ -0,0 +1,265 @@
|
|||
package raft
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
grpcinternal "github.com/hashicorp/consul/agent/grpc-internal"
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
pbstorage "github.com/hashicorp/consul/proto/private/pbstorage"
|
||||
)
|
||||
|
||||
// forwardingServer implements the gRPC forwarding service.
|
||||
type forwardingServer struct {
|
||||
backend *Backend
|
||||
listener *grpcinternal.Listener
|
||||
}
|
||||
|
||||
var _ pbstorage.ForwardingServiceServer = (*forwardingServer)(nil)
|
||||
|
||||
func newForwardingServer(backend *Backend) *forwardingServer {
|
||||
return &forwardingServer{
|
||||
backend: backend,
|
||||
|
||||
// The address here doesn't actually matter. gRPC uses it as an identifier
|
||||
// internally, but we only bind the server to a single listener.
|
||||
listener: grpcinternal.NewListener(&net.TCPAddr{
|
||||
IP: net.ParseIP("0.0.0.0"),
|
||||
Port: 0,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *forwardingServer) Write(ctx context.Context, req *pbstorage.WriteRequest) (*pbstorage.WriteResponse, error) {
|
||||
rsp, err := s.raftApply(ctx, &pbstorage.Log{
|
||||
Type: pbstorage.LogType_LOG_TYPE_WRITE,
|
||||
Request: &pbstorage.Log_Write{Write: req},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rsp.GetWrite(), nil
|
||||
}
|
||||
|
||||
func (s *forwardingServer) Delete(ctx context.Context, req *pbstorage.DeleteRequest) (*emptypb.Empty, error) {
|
||||
_, err := s.raftApply(ctx, &pbstorage.Log{
|
||||
Type: pbstorage.LogType_LOG_TYPE_DELETE,
|
||||
Request: &pbstorage.Log_Delete{Delete: req},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &emptypb.Empty{}, nil
|
||||
}
|
||||
|
||||
func (s *forwardingServer) Read(ctx context.Context, req *pbstorage.ReadRequest) (*pbstorage.ReadResponse, error) {
|
||||
res, err := s.backend.leaderRead(ctx, req.Id)
|
||||
if err != nil {
|
||||
return nil, wrapError(err)
|
||||
}
|
||||
return &pbstorage.ReadResponse{Resource: res}, nil
|
||||
}
|
||||
|
||||
func (s *forwardingServer) List(ctx context.Context, req *pbstorage.ListRequest) (*pbstorage.ListResponse, error) {
|
||||
res, err := s.backend.leaderList(ctx, storage.UnversionedTypeFrom(req.Type), req.Tenancy, req.NamePrefix)
|
||||
if err != nil {
|
||||
return nil, wrapError(err)
|
||||
}
|
||||
return &pbstorage.ListResponse{Resources: res}, nil
|
||||
}
|
||||
|
||||
func (s *forwardingServer) raftApply(_ context.Context, req *pbstorage.Log) (*pbstorage.LogResponse, error) {
|
||||
msg, err := req.MarshalBinary()
|
||||
if err != nil {
|
||||
return nil, wrapError(err)
|
||||
}
|
||||
|
||||
rsp, err := s.backend.handle.Apply(msg)
|
||||
if err != nil {
|
||||
return nil, wrapError(err)
|
||||
}
|
||||
|
||||
switch t := rsp.(type) {
|
||||
case *pbstorage.LogResponse:
|
||||
return t, nil
|
||||
default:
|
||||
return nil, status.Errorf(codes.Internal, "unexpected response from Raft apply: %T", rsp)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *forwardingServer) run(ctx context.Context) error {
|
||||
server := grpc.NewServer()
|
||||
pbstorage.RegisterForwardingServiceServer(server, s)
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
server.Stop()
|
||||
}()
|
||||
|
||||
return server.Serve(s.listener)
|
||||
}
|
||||
|
||||
// forwardingClient is used to forward operations to the leader.
|
||||
type forwardingClient struct {
|
||||
handle Handle
|
||||
logger hclog.Logger
|
||||
|
||||
mu sync.RWMutex
|
||||
conn *grpc.ClientConn
|
||||
}
|
||||
|
||||
func newForwardingClient(h Handle, l hclog.Logger) *forwardingClient {
|
||||
return &forwardingClient{
|
||||
handle: h,
|
||||
logger: l,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *forwardingClient) leaderChanged() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.conn == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err := c.conn.Close(); err != nil {
|
||||
c.logger.Error("failed to close connection to previous leader", "error", err)
|
||||
}
|
||||
c.conn = nil
|
||||
}
|
||||
|
||||
func (c *forwardingClient) getConn() (*grpc.ClientConn, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.conn != nil {
|
||||
return c.conn, nil
|
||||
}
|
||||
|
||||
conn, err := c.handle.DialLeader()
|
||||
if err != nil {
|
||||
c.logger.Error("failed to dial leader", "error", err)
|
||||
return nil, err
|
||||
}
|
||||
c.conn = conn
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (c *forwardingClient) getClient() (pbstorage.ForwardingServiceClient, error) {
|
||||
conn, err := c.getConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pbstorage.NewForwardingServiceClient(conn), nil
|
||||
}
|
||||
|
||||
func (c *forwardingClient) delete(ctx context.Context, req *pbstorage.DeleteRequest) error {
|
||||
client, err := c.getClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = client.Delete(ctx, req)
|
||||
return unwrapError(err)
|
||||
}
|
||||
|
||||
func (c *forwardingClient) write(ctx context.Context, req *pbstorage.WriteRequest) (*pbstorage.WriteResponse, error) {
|
||||
client, err := c.getClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rsp, err := client.Write(ctx, req)
|
||||
return rsp, unwrapError(err)
|
||||
}
|
||||
|
||||
func (c *forwardingClient) read(ctx context.Context, req *pbstorage.ReadRequest) (*pbstorage.ReadResponse, error) {
|
||||
client, err := c.getClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rsp, err := client.Read(ctx, req)
|
||||
return rsp, unwrapError(err)
|
||||
}
|
||||
|
||||
func (c *forwardingClient) list(ctx context.Context, req *pbstorage.ListRequest) (*pbstorage.ListResponse, error) {
|
||||
client, err := c.getClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rsp, err := client.List(ctx, req)
|
||||
return rsp, unwrapError(err)
|
||||
}
|
||||
|
||||
var (
|
||||
errorToCode = map[error]codes.Code{
|
||||
// Note: OutOfRange is used to represent GroupVersionMismatchError, but is
|
||||
// handled specially in wrapError and unwrapError because it has extra details.
|
||||
storage.ErrNotFound: codes.NotFound,
|
||||
storage.ErrCASFailure: codes.Aborted,
|
||||
storage.ErrWrongUid: codes.AlreadyExists,
|
||||
storage.ErrInconsistent: codes.FailedPrecondition,
|
||||
}
|
||||
|
||||
codeToError = func() map[codes.Code]error {
|
||||
inverted := make(map[codes.Code]error, len(errorToCode))
|
||||
for k, v := range errorToCode {
|
||||
inverted[v] = k
|
||||
}
|
||||
return inverted
|
||||
}()
|
||||
)
|
||||
|
||||
// wrapError converts the given error to a gRPC status to send over the wire.
|
||||
func wrapError(err error) error {
|
||||
var gvm storage.GroupVersionMismatchError
|
||||
if errors.As(err, &gvm) {
|
||||
s, err := status.New(codes.OutOfRange, err.Error()).
|
||||
WithDetails(&pbstorage.GroupVersionMismatchErrorDetails{
|
||||
RequestedType: gvm.RequestedType,
|
||||
Stored: gvm.Stored,
|
||||
})
|
||||
if err == nil {
|
||||
return s.Err()
|
||||
}
|
||||
}
|
||||
|
||||
code, ok := errorToCode[err]
|
||||
if !ok {
|
||||
code = codes.Internal
|
||||
}
|
||||
return status.Error(code, err.Error())
|
||||
}
|
||||
|
||||
// unwrapError converts the given gRPC status error back to a storage package
|
||||
// error.
|
||||
func unwrapError(err error) error {
|
||||
s, ok := status.FromError(err)
|
||||
if !ok {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, d := range s.Details() {
|
||||
if gvm, ok := d.(*pbstorage.GroupVersionMismatchErrorDetails); ok {
|
||||
return storage.GroupVersionMismatchError{
|
||||
RequestedType: gvm.RequestedType,
|
||||
Stored: gvm.Stored,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unwrapped, ok := codeToError[s.Code()]
|
||||
if !ok {
|
||||
return err
|
||||
}
|
||||
return unwrapped
|
||||
}
|
|
@ -32,6 +32,11 @@ var (
|
|||
// not be achieved because of a consistency or availability issue (e.g. loss of
|
||||
// quorum, or when interacting with a Raft follower).
|
||||
ErrInconsistent = errors.New("cannot satisfy consistency requirements")
|
||||
|
||||
// ErrWatchClosed is returned by Watch.Next when the watch is closed, e.g. when
|
||||
// a snapshot is restored and the watch's events are no longer valid. Consumers
|
||||
// should discard any materialized state and start a new watch.
|
||||
ErrWatchClosed = errors.New("watch closed")
|
||||
)
|
||||
|
||||
// ReadConsistency is used to specify the required consistency guarantees for
|
||||
|
|
|
@ -21,6 +21,7 @@ lint:
|
|||
- ONEOF_LOWER_SNAKE_CASE
|
||||
- ENUM_VALUE_PREFIX
|
||||
service_suffix: ""
|
||||
rpc_allow_google_protobuf_empty_responses: true
|
||||
breaking:
|
||||
use:
|
||||
- FILE
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
// Code generated by protoc-gen-go-binary. DO NOT EDIT.
|
||||
// source: private/pbstorage/raft.proto
|
||||
|
||||
package pbstorage
|
||||
|
||||
import (
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *Log) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *Log) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *LogResponse) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *LogResponse) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *WriteRequest) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *WriteRequest) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *WriteResponse) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *WriteResponse) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *DeleteRequest) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *DeleteRequest) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *ReadRequest) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *ReadRequest) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *ReadResponse) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *ReadResponse) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *ListRequest) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *ListRequest) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *ListResponse) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *ListResponse) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *GroupVersionMismatchErrorDetails) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *GroupVersionMismatchErrorDetails) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,116 @@
|
|||
syntax = "proto3";
|
||||
|
||||
// This package contains types used by the Raft storage backend that lives in
|
||||
// the internal/storage/raft Go package.
|
||||
package hashicorp.consul.internal.storage.raft;
|
||||
|
||||
import "annotations/ratelimit/ratelimit.proto";
|
||||
import "google/protobuf/empty.proto";
|
||||
import "pbresource/resource.proto";
|
||||
|
||||
// Forwarding service is used for forwarding write and consistent read
|
||||
// operations to the Raft leader. It is served on Consul's multiplexed
|
||||
// server port, which is the same port used for regular Raft traffic.
|
||||
service ForwardingService {
|
||||
// Write handles a forwarded write operation.
|
||||
rpc Write(WriteRequest) returns (WriteResponse) {
|
||||
option (hashicorp.consul.internal.ratelimit.spec) = {
|
||||
operation_type: OPERATION_TYPE_EXEMPT,
|
||||
operation_category: OPERATION_CATEGORY_RESOURCE
|
||||
};
|
||||
}
|
||||
|
||||
// Delete handles a forwarded delete operation.
|
||||
rpc Delete(DeleteRequest) returns (google.protobuf.Empty) {
|
||||
option (hashicorp.consul.internal.ratelimit.spec) = {
|
||||
operation_type: OPERATION_TYPE_EXEMPT,
|
||||
operation_category: OPERATION_CATEGORY_RESOURCE
|
||||
};
|
||||
}
|
||||
|
||||
// Read handles a forwarded read operation.
|
||||
rpc Read(ReadRequest) returns (ReadResponse) {
|
||||
option (hashicorp.consul.internal.ratelimit.spec) = {
|
||||
operation_type: OPERATION_TYPE_EXEMPT,
|
||||
operation_category: OPERATION_CATEGORY_RESOURCE
|
||||
};
|
||||
}
|
||||
|
||||
// List handles a forwarded list operation.
|
||||
rpc List(ListRequest) returns (ListResponse) {
|
||||
option (hashicorp.consul.internal.ratelimit.spec) = {
|
||||
operation_type: OPERATION_TYPE_EXEMPT,
|
||||
operation_category: OPERATION_CATEGORY_RESOURCE
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// LogType describes the type of operation being written to the Raft log.
|
||||
enum LogType {
|
||||
LOG_TYPE_UNSPECIFIED = 0;
|
||||
LOG_TYPE_WRITE = 1;
|
||||
LOG_TYPE_DELETE = 2;
|
||||
}
|
||||
|
||||
// Log is protobuf-encoded and written to the Raft log.
|
||||
message Log {
|
||||
LogType type = 1;
|
||||
|
||||
oneof request {
|
||||
WriteRequest write = 2;
|
||||
DeleteRequest delete = 3;
|
||||
}
|
||||
}
|
||||
|
||||
// LogResponse contains the FSM's response to applying a log.
|
||||
message LogResponse {
|
||||
oneof response {
|
||||
WriteResponse write = 1;
|
||||
google.protobuf.Empty delete = 2;
|
||||
}
|
||||
}
|
||||
|
||||
// WriteRequest contains the parameters for a write operation.
|
||||
message WriteRequest {
|
||||
hashicorp.consul.resource.Resource resource = 1;
|
||||
}
|
||||
|
||||
// WriteResponse contains the results of a write operation.
|
||||
message WriteResponse {
|
||||
hashicorp.consul.resource.Resource resource = 1;
|
||||
}
|
||||
|
||||
// DeleteRequest contains the parameters for a write operation.
|
||||
message DeleteRequest {
|
||||
hashicorp.consul.resource.ID id = 1;
|
||||
string version = 2;
|
||||
}
|
||||
|
||||
// ReadRequest contains the parameters for a consistent read operation.
|
||||
message ReadRequest {
|
||||
hashicorp.consul.resource.ID id = 1;
|
||||
}
|
||||
|
||||
// ReadResponse contains the results of a consistent read operation.
|
||||
message ReadResponse {
|
||||
hashicorp.consul.resource.Resource resource = 1;
|
||||
}
|
||||
|
||||
// ListRequest contains the parameters for a consistent list operation.
|
||||
message ListRequest {
|
||||
hashicorp.consul.resource.Type type = 1;
|
||||
hashicorp.consul.resource.Tenancy tenancy = 2;
|
||||
string name_prefix = 3;
|
||||
}
|
||||
|
||||
// ListResponse contains the results of a consistent list operation.
|
||||
message ListResponse {
|
||||
repeated hashicorp.consul.resource.Resource resources = 1;
|
||||
}
|
||||
|
||||
// GroupVersionMismatchErrorDetails contains the error details that will be
|
||||
// returned when the leader encounters a storage.GroupVersionMismatchError.
|
||||
message GroupVersionMismatchErrorDetails {
|
||||
hashicorp.consul.resource.Type requested_type = 1;
|
||||
hashicorp.consul.resource.Resource stored = 2;
|
||||
}
|
|
@ -0,0 +1,220 @@
|
|||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.2.0
|
||||
// - protoc (unknown)
|
||||
// source: private/pbstorage/raft.proto
|
||||
|
||||
package pbstorage
|
||||
|
||||
import (
|
||||
context "context"
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
emptypb "google.golang.org/protobuf/types/known/emptypb"
|
||||
)
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
// Requires gRPC-Go v1.32.0 or later.
|
||||
const _ = grpc.SupportPackageIsVersion7
|
||||
|
||||
// ForwardingServiceClient is the client API for ForwardingService service.
|
||||
//
|
||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||
type ForwardingServiceClient interface {
|
||||
// Write handles a forwarded write operation.
|
||||
Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error)
|
||||
// Delete handles a forwarded delete operation.
|
||||
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
|
||||
// Read handles a forwarded read operation.
|
||||
Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error)
|
||||
// List handles a forwarded list operation.
|
||||
List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error)
|
||||
}
|
||||
|
||||
type forwardingServiceClient struct {
|
||||
cc grpc.ClientConnInterface
|
||||
}
|
||||
|
||||
func NewForwardingServiceClient(cc grpc.ClientConnInterface) ForwardingServiceClient {
|
||||
return &forwardingServiceClient{cc}
|
||||
}
|
||||
|
||||
func (c *forwardingServiceClient) Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) {
|
||||
out := new(WriteResponse)
|
||||
err := c.cc.Invoke(ctx, "/hashicorp.consul.internal.storage.raft.ForwardingService/Write", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *forwardingServiceClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
|
||||
out := new(emptypb.Empty)
|
||||
err := c.cc.Invoke(ctx, "/hashicorp.consul.internal.storage.raft.ForwardingService/Delete", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *forwardingServiceClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) {
|
||||
out := new(ReadResponse)
|
||||
err := c.cc.Invoke(ctx, "/hashicorp.consul.internal.storage.raft.ForwardingService/Read", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *forwardingServiceClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) {
|
||||
out := new(ListResponse)
|
||||
err := c.cc.Invoke(ctx, "/hashicorp.consul.internal.storage.raft.ForwardingService/List", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// ForwardingServiceServer is the server API for ForwardingService service.
|
||||
// All implementations should embed UnimplementedForwardingServiceServer
|
||||
// for forward compatibility
|
||||
type ForwardingServiceServer interface {
|
||||
// Write handles a forwarded write operation.
|
||||
Write(context.Context, *WriteRequest) (*WriteResponse, error)
|
||||
// Delete handles a forwarded delete operation.
|
||||
Delete(context.Context, *DeleteRequest) (*emptypb.Empty, error)
|
||||
// Read handles a forwarded read operation.
|
||||
Read(context.Context, *ReadRequest) (*ReadResponse, error)
|
||||
// List handles a forwarded list operation.
|
||||
List(context.Context, *ListRequest) (*ListResponse, error)
|
||||
}
|
||||
|
||||
// UnimplementedForwardingServiceServer should be embedded to have forward compatible implementations.
|
||||
type UnimplementedForwardingServiceServer struct {
|
||||
}
|
||||
|
||||
func (UnimplementedForwardingServiceServer) Write(context.Context, *WriteRequest) (*WriteResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method Write not implemented")
|
||||
}
|
||||
func (UnimplementedForwardingServiceServer) Delete(context.Context, *DeleteRequest) (*emptypb.Empty, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented")
|
||||
}
|
||||
func (UnimplementedForwardingServiceServer) Read(context.Context, *ReadRequest) (*ReadResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method Read not implemented")
|
||||
}
|
||||
func (UnimplementedForwardingServiceServer) List(context.Context, *ListRequest) (*ListResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method List not implemented")
|
||||
}
|
||||
|
||||
// UnsafeForwardingServiceServer may be embedded to opt out of forward compatibility for this service.
|
||||
// Use of this interface is not recommended, as added methods to ForwardingServiceServer will
|
||||
// result in compilation errors.
|
||||
type UnsafeForwardingServiceServer interface {
|
||||
mustEmbedUnimplementedForwardingServiceServer()
|
||||
}
|
||||
|
||||
func RegisterForwardingServiceServer(s grpc.ServiceRegistrar, srv ForwardingServiceServer) {
|
||||
s.RegisterService(&ForwardingService_ServiceDesc, srv)
|
||||
}
|
||||
|
||||
func _ForwardingService_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(WriteRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ForwardingServiceServer).Write(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/hashicorp.consul.internal.storage.raft.ForwardingService/Write",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ForwardingServiceServer).Write(ctx, req.(*WriteRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ForwardingService_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(DeleteRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ForwardingServiceServer).Delete(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/hashicorp.consul.internal.storage.raft.ForwardingService/Delete",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ForwardingServiceServer).Delete(ctx, req.(*DeleteRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ForwardingService_Read_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(ReadRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ForwardingServiceServer).Read(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/hashicorp.consul.internal.storage.raft.ForwardingService/Read",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ForwardingServiceServer).Read(ctx, req.(*ReadRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ForwardingService_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(ListRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ForwardingServiceServer).List(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/hashicorp.consul.internal.storage.raft.ForwardingService/List",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ForwardingServiceServer).List(ctx, req.(*ListRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
// ForwardingService_ServiceDesc is the grpc.ServiceDesc for ForwardingService service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
var ForwardingService_ServiceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "hashicorp.consul.internal.storage.raft.ForwardingService",
|
||||
HandlerType: (*ForwardingServiceServer)(nil),
|
||||
Methods: []grpc.MethodDesc{
|
||||
{
|
||||
MethodName: "Write",
|
||||
Handler: _ForwardingService_Write_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "Delete",
|
||||
Handler: _ForwardingService_Delete_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "Read",
|
||||
Handler: _ForwardingService_Read_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "List",
|
||||
Handler: _ForwardingService_List_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{},
|
||||
Metadata: "private/pbstorage/raft.proto",
|
||||
}
|
Loading…
Reference in New Issue