diff --git a/.changelog/9284.txt b/.changelog/9284.txt new file mode 100644 index 000000000..2b9e08d8f --- /dev/null +++ b/.changelog/9284.txt @@ -0,0 +1,3 @@ +```release-note:bug +agent: prevent duplicate services and check registrations from being synced to servers. +``` diff --git a/agent/ae/ae.go b/agent/ae/ae.go index 3c986b5ac..9b2841cc5 100644 --- a/agent/ae/ae.go +++ b/agent/ae/ae.go @@ -3,12 +3,14 @@ package ae import ( "fmt" - "github.com/hashicorp/consul/lib" - "github.com/hashicorp/consul/logging" - "github.com/hashicorp/go-hclog" "math" "sync" "time" + + "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/logging" ) // scaleThreshold is the number of nodes after which regular sync runs are @@ -173,8 +175,7 @@ func (s *StateSyncer) nextFSMState(fs fsmState) fsmState { return retryFullSyncState } - err := s.State.SyncFull() - if err != nil { + if err := s.State.SyncFull(); err != nil { s.Logger.Error("failed to sync remote state", "error", err) return retryFullSyncState } diff --git a/agent/local/state.go b/agent/local/state.go index b4414e910..5851648f9 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -11,13 +11,14 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/types" - "github.com/hashicorp/go-hclog" ) var StateCounters = []prometheus.CounterDefinition{ @@ -287,7 +288,6 @@ func (l *State) AddServiceWithChecks(service *structs.NodeService, checks []*str return err } } - return nil } @@ -399,12 +399,14 @@ func (l *State) SetServiceState(s *ServiceState) { } func (l *State) setServiceStateLocked(s *ServiceState) { - s.WatchCh = make(chan struct{}, 1) - key := s.Service.CompoundServiceID() old, hasOld := l.services[key] + if hasOld { + s.InSync = s.Service.IsSame(old.Service) + } l.services[key] = s + s.WatchCh = make(chan struct{}, 1) if hasOld && old.WatchCh != nil { close(old.WatchCh) } @@ -722,7 +724,13 @@ func (l *State) SetCheckState(c *CheckState) { } func (l *State) setCheckStateLocked(c *CheckState) { - l.checks[c.Check.CompoundCheckID()] = c + id := c.Check.CompoundCheckID() + existing := l.checks[id] + if existing != nil { + c.InSync = c.Check.IsSame(existing.Check) + } + + l.checks[id] = c // If this is a check for an aliased service, then notify the waiters. l.notifyIfAliased(c.Check.CompoundServiceID()) @@ -868,8 +876,8 @@ func (l *State) Stats() map[string]string { } } -// updateSyncState does a read of the server state, and updates -// the local sync status as appropriate +// updateSyncState queries the server for all the services and checks in the catalog +// registered to this node, and updates the local entries as InSync or Deleted. func (l *State) updateSyncState() error { // Get all checks and services from the master req := structs.NodeSpecificRequest{ diff --git a/agent/local/state_test.go b/agent/local/state_test.go index 4a9b7f8e7..64ccf630f 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -7,8 +7,9 @@ import ( "testing" "time" - "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/hashicorp/consul/agent" "github.com/hashicorp/consul/agent/config" @@ -17,9 +18,8 @@ import ( "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" + "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func unNilMap(in map[string]string) map[string]string { @@ -28,6 +28,7 @@ func unNilMap(in map[string]string) map[string]string { } return in } + func TestAgentAntiEntropy_Services(t *testing.T) { t.Parallel() a := agent.NewTestAgent(t, "") @@ -2195,3 +2196,52 @@ func drainCh(ch chan struct{}) { } } } + +func TestState_SyncChanges_DuplicateAddServiceOnlySyncsOnce(t *testing.T) { + state := local.NewState(local.Config{}, hclog.New(nil), new(token.Store)) + rpc := &fakeRPC{} + state.Delegate = rpc + state.TriggerSyncChanges = func() {} + + srv := &structs.NodeService{ + Kind: structs.ServiceKindTypical, + ID: "the-service-id", + Service: "web", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + checks := []*structs.HealthCheck{ + {Node: "this-node", CheckID: "the-id-1", Name: "check-healthy-1"}, + {Node: "this-node", CheckID: "the-id-2", Name: "check-healthy-2"}, + } + tok := "the-token" + err := state.AddServiceWithChecks(srv, checks, tok) + require.NoError(t, err) + require.NoError(t, state.SyncChanges()) + // 4 rpc calls, one node register, one service register, two checks + require.Len(t, rpc.calls, 4) + + // adding the service again should not catalog register + err = state.AddServiceWithChecks(srv, checks, tok) + require.NoError(t, err) + require.NoError(t, state.SyncChanges()) + require.Len(t, rpc.calls, 4) +} + +type fakeRPC struct { + calls []callRPC +} + +type callRPC struct { + method string + args interface{} + reply interface{} +} + +func (f *fakeRPC) RPC(method string, args interface{}, reply interface{}) error { + f.calls = append(f.calls, callRPC{method: method, args: args, reply: reply}) + return nil +} + +func (f *fakeRPC) ResolveTokenToIdentity(_ string) (structs.ACLIdentity, error) { + return nil, nil +}