peering: avoid a race between peering establishment and termination (#13389)
This commit is contained in:
parent
3deaf767f2
commit
bf647bc9d2
|
@ -510,7 +510,7 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
|
||||||
|
|
||||||
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
||||||
Name: c.NodeName,
|
Name: c.NodeName,
|
||||||
Level: hclog.Trace,
|
Level: testutil.TestLogLevel,
|
||||||
Output: testutil.NewLogBuffer(t),
|
Output: testutil.NewLogBuffer(t),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/rpc/peering"
|
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
"github.com/hashicorp/go-multierror"
|
"github.com/hashicorp/go-multierror"
|
||||||
|
@ -17,6 +16,7 @@ import (
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
|
"github.com/hashicorp/consul/agent/rpc/peering"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/proto/pbpeering"
|
"github.com/hashicorp/consul/proto/pbpeering"
|
||||||
)
|
)
|
||||||
|
@ -50,6 +50,39 @@ func (s *Server) stopPeeringStreamSync() {
|
||||||
// syncPeeringsAndBlock is a long-running goroutine that is responsible for watching
|
// syncPeeringsAndBlock is a long-running goroutine that is responsible for watching
|
||||||
// changes to peerings in the state store and managing streams to those peers.
|
// changes to peerings in the state store and managing streams to those peers.
|
||||||
func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger, cancelFns map[string]context.CancelFunc) error {
|
func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger, cancelFns map[string]context.CancelFunc) error {
|
||||||
|
// We have to be careful not to introduce a data race here. We want to
|
||||||
|
// compare the current known peerings in the state store with known
|
||||||
|
// connected streams to know when we should TERMINATE stray peerings.
|
||||||
|
//
|
||||||
|
// If you read the current peerings from the state store, then read the
|
||||||
|
// current established streams you could lose the data race and have the
|
||||||
|
// sequence of events be:
|
||||||
|
//
|
||||||
|
// 1. list peerings [A,B,C]
|
||||||
|
// 2. persist new peering [D]
|
||||||
|
// 3. accept new stream for [D]
|
||||||
|
// 4. list streams [A,B,C,D]
|
||||||
|
// 5. terminate [D]
|
||||||
|
//
|
||||||
|
// Which is wrong. If we instead ensure that (4) happens before (1), given
|
||||||
|
// that you can't get an established stream without first passing a "does
|
||||||
|
// this peering exist in the state store?" inquiry then this happens:
|
||||||
|
//
|
||||||
|
// 1. list streams [A,B,C]
|
||||||
|
// 2. list peerings [A,B,C]
|
||||||
|
// 3. persist new peering [D]
|
||||||
|
// 4. accept new stream for [D]
|
||||||
|
// 5. terminate []
|
||||||
|
//
|
||||||
|
// Or even this is fine:
|
||||||
|
//
|
||||||
|
// 1. list streams [A,B,C]
|
||||||
|
// 2. persist new peering [D]
|
||||||
|
// 3. accept new stream for [D]
|
||||||
|
// 4. list peerings [A,B,C,D]
|
||||||
|
// 5. terminate []
|
||||||
|
connectedStreams := s.peeringService.ConnectedStreams()
|
||||||
|
|
||||||
state := s.fsm.State()
|
state := s.fsm.State()
|
||||||
|
|
||||||
// Pull the state store contents and set up to block for changes.
|
// Pull the state store contents and set up to block for changes.
|
||||||
|
@ -121,7 +154,7 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
|
||||||
|
|
||||||
// Clean up active streams of peerings that were deleted from the state store.
|
// Clean up active streams of peerings that were deleted from the state store.
|
||||||
// TODO(peering): This is going to trigger shutting down peerings we generated a token for. Is that OK?
|
// TODO(peering): This is going to trigger shutting down peerings we generated a token for. Is that OK?
|
||||||
for stream, doneCh := range s.peeringService.ConnectedStreams() {
|
for stream, doneCh := range connectedStreams {
|
||||||
if _, ok := stored[stream]; ok {
|
if _, ok := stored[stream]; ok {
|
||||||
// Active stream is in the state store, nothing to do.
|
// Active stream is in the state store, nothing to do.
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -93,6 +93,10 @@ func (x ReplicationMessage_Response_Operation) GoString() string {
|
||||||
return x.String()
|
return x.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x PeeringState) GoString() string {
|
||||||
|
return x.String()
|
||||||
|
}
|
||||||
|
|
||||||
func (r *TrustBundleReadRequest) CacheInfo() cache.RequestInfo {
|
func (r *TrustBundleReadRequest) CacheInfo() cache.RequestInfo {
|
||||||
info := cache.RequestInfo{
|
info := cache.RequestInfo{
|
||||||
// TODO(peering): Revisit whether this is the token to use once request types accept a token.
|
// TODO(peering): Revisit whether this is the token to use once request types accept a token.
|
||||||
|
|
Loading…
Reference in a new issue