local: fixes a data race in anti-entropy sync (#12324)
The race detector first flagged this in `TestAgentConfigWatcherSidecarProxy`, but the race is not restricted to tests. The two main changes here are:

- ensure that before we mutate the internal `agent/local` representation of a service (for tags or VIPs) we clone those fields
- ensure that there is no joint ownership of function arguments between the caller and the local state when calling `AddService`, `AddCheck`, and related methods, using `copystructure` for now
parent 0519a9240e
commit 0b80f70a39
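For context, here is a minimal sketch of the joint-ownership race being fixed, using simplified, hypothetical types rather than Consul's actual code: the caller of `AddService` keeps its own pointer to the service it registered and may mutate it later, while the anti-entropy sync goroutine concurrently reads the stored value. Deep-copying at the boundary removes the shared ownership.

```go
package main

import "sync"

// Service is a stand-in for structs.NodeService (illustrative only).
type Service struct {
	ID   string
	Tags []string
}

// State is a stand-in for the agent's local state.
type State struct {
	mu       sync.Mutex
	services map[string]*Service
}

// AddService stores a private deep copy, so later mutations by the caller
// cannot race with the sync goroutine reading the stored value.
func (s *State) AddService(svc *Service) {
	clone := &Service{
		ID:   svc.ID,
		Tags: append([]string(nil), svc.Tags...), // fresh backing array
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.services[clone.ID] = clone
}

func main() {
	s := &State{services: map[string]*Service{}}
	svc := &Service{ID: "web", Tags: []string{"v1"}}
	s.AddService(svc)
	svc.Tags[0] = "v2" // caller-side write; the stored copy is unaffected
}
```

The real fix uses `copystructure` for the deep copy, as the diff below shows.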
```diff
@@ -0,0 +1,3 @@
+```release-note:bug
+local: fixes a data race in anti-entropy sync that could cause the wrong tags to be applied to a service when EnableTagOverride is used
+```
```
```diff
@@ -12,6 +12,7 @@ import (
 	"github.com/armon/go-metrics"
 	"github.com/armon/go-metrics/prometheus"
 	"github.com/hashicorp/go-hclog"
+	"github.com/mitchellh/copystructure"
 
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/agent/consul"
```
```diff
@@ -267,6 +268,13 @@ func (l *State) addServiceLocked(service *structs.NodeService, token string) err
 		return fmt.Errorf("no service")
 	}
 
+	// Avoid having the stored service have any call-site ownership.
+	var err error
+	service, err = cloneService(service)
+	if err != nil {
+		return err
+	}
+
 	// use the service name as id if the id was omitted
 	if service.ID == "" {
 		service.ID = service.Service
```
```diff
@@ -530,8 +538,12 @@ func (l *State) addCheckLocked(check *structs.HealthCheck, token string) error {
 		return fmt.Errorf("no check")
 	}
 
-	// clone the check since we will be modifying it.
-	check = check.Clone()
+	// Avoid having the stored check have any call-site ownership.
+	var err error
+	check, err = cloneCheck(check)
+	if err != nil {
+		return err
+	}
 
 	if l.discardCheckOutput.Load().(bool) {
 		check.Output = ""
```
```diff
@@ -1083,22 +1095,26 @@ func (l *State) updateSyncState() error {
 			continue
 		}
 
+		// Make a shallow copy since we may mutate it below and other readers
+		// may be reading it and we want to avoid a race.
+		nextService := *ls.Service
+		changed := false
+
 		// If our definition is different, we need to update it. Make a
 		// copy so that we don't retain a pointer to any actual state
 		// store info for in-memory RPCs.
-		if ls.Service.EnableTagOverride {
-			tags := make([]string, len(rs.Tags))
-			copy(tags, rs.Tags)
-			ls.Service.Tags = tags
+		if nextService.EnableTagOverride {
+			nextService.Tags = structs.CloneStringSlice(rs.Tags)
+			changed = true
 		}
 
 		// Merge any tagged addresses with the consul- prefix (set by the server)
 		// back into the local state.
-		if !reflect.DeepEqual(ls.Service.TaggedAddresses, rs.TaggedAddresses) {
+		if !reflect.DeepEqual(nextService.TaggedAddresses, rs.TaggedAddresses) {
 			// Make a copy of TaggedAddresses to prevent races when writing
 			// since other goroutines may be reading from the map
 			m := make(map[string]structs.ServiceAddress)
-			for k, v := range ls.Service.TaggedAddresses {
+			for k, v := range nextService.TaggedAddresses {
 				m[k] = v
 			}
 			for k, v := range rs.TaggedAddresses {
```
```diff
@@ -1106,7 +1122,12 @@ func (l *State) updateSyncState() error {
 					m[k] = v
 				}
 			}
-			ls.Service.TaggedAddresses = m
+			nextService.TaggedAddresses = m
+			changed = true
 		}
 
+		if changed {
+			ls.Service = &nextService
+		}
 		ls.InSync = ls.Service.IsSame(rs)
 	}
```
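The two `updateSyncState` hunks above apply a copy-on-write pattern: mutations go to a private shallow copy (`nextService`), fresh slices and maps are allocated for any reference fields that change, and the copy is published with a single pointer swap under the state's lock. A minimal sketch of the pattern, again with simplified, hypothetical types:

```go
package main

import "fmt"

type service struct {
	Tags            []string
	TaggedAddresses map[string]string
}

// applyRemoteTags mutates a stack-local shallow copy and publishes it with
// one pointer swap (Consul does this while holding the state lock), so
// nothing reachable from the old pointer is ever written in place.
func applyRemoteTags(stored **service, remoteTags []string) {
	next := **stored                                 // shallow copy of the struct value
	next.Tags = append([]string(nil), remoteTags...) // fresh slice, not shared
	*stored = &next                                  // publish: one pointer swap
}

func main() {
	svc := &service{Tags: []string{"v1"}}
	old := svc // a "reader" captured the old pointer
	applyRemoteTags(&svc, []string{"v2"})
	fmt.Println(old.Tags, svc.Tags) // [v1] [v2]: the old value is untouched
}
```

Because readers only ever dereference a pointer that is swapped under the lock, they see either the old value or the fully updated one, never a partially mutated struct.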
```diff
@@ -1549,3 +1570,21 @@ func (l *State) aclAccessorID(secretID string) string {
 	}
 	return ident.AccessorID()
 }
+
+func cloneService(ns *structs.NodeService) (*structs.NodeService, error) {
+	// TODO: consider doing a hand-managed clone function
+	raw, err := copystructure.Copy(ns)
+	if err != nil {
+		return nil, err
+	}
+	return raw.(*structs.NodeService), err
+}
+
+func cloneCheck(check *structs.HealthCheck) (*structs.HealthCheck, error) {
+	// TODO: consider doing a hand-managed clone function
+	raw, err := copystructure.Copy(check)
+	if err != nil {
+		return nil, err
+	}
+	return raw.(*structs.HealthCheck), err
+}
```
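For reference, `copystructure.Copy` takes an `interface{}` and returns a deep copy as an `interface{}`, which is why these helpers type-assert the result. A quick illustration of the semantics with an illustrative struct (not Consul's):

```go
package main

import (
	"fmt"

	"github.com/mitchellh/copystructure"
)

type svc struct {
	Tags []string
}

func main() {
	orig := &svc{Tags: []string{"a"}}
	raw, err := copystructure.Copy(orig)
	if err != nil {
		panic(err)
	}
	dup := raw.(*svc)
	orig.Tags[0] = "mutated" // caller-side mutation...
	fmt.Println(dup.Tags[0]) // ...leaves the deep copy untouched: prints "a"
}
```

The remaining hunks update `TestAgentAntiEntropy_Services_ConnectProxy` and its imports in the local state tests.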
```diff
@@ -9,6 +9,7 @@ import (
 
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-uuid"
+	"github.com/mitchellh/copystructure"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
```
```diff
@@ -267,13 +268,14 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
 	defer a.Shutdown()
 	testrpc.WaitForTestAgent(t, a.RPC, "dc1")
 
+	clone := func(ns *structs.NodeService) *structs.NodeService {
+		raw, err := copystructure.Copy(ns)
+		require.NoError(t, err)
+		return raw.(*structs.NodeService)
+	}
+
 	// Register node info
 	var out struct{}
-	args := &structs.RegisterRequest{
-		Datacenter: "dc1",
-		Node:       a.Config.NodeName,
-		Address:    "127.0.0.1",
-	}
 
 	// Exists both same (noop)
 	srv1 := &structs.NodeService{
```
```diff
@@ -289,8 +291,12 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
 		EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
 	}
 	a.State.AddService(srv1, "")
-	args.Service = srv1
-	assert.Nil(t, a.RPC("Catalog.Register", args, &out))
+	require.NoError(t, a.RPC("Catalog.Register", &structs.RegisterRequest{
+		Datacenter: "dc1",
+		Node:       a.Config.NodeName,
+		Address:    "127.0.0.1",
+		Service:    srv1,
+	}, &out))
 
 	// Exists both, different (update)
 	srv2 := &structs.NodeService{
```
```diff
@@ -307,11 +313,14 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
 	}
 	a.State.AddService(srv2, "")
 
-	srv2_mod := new(structs.NodeService)
-	*srv2_mod = *srv2
+	srv2_mod := clone(srv2)
 	srv2_mod.Port = 9000
-	args.Service = srv2_mod
-	assert.Nil(t, a.RPC("Catalog.Register", args, &out))
+	require.NoError(t, a.RPC("Catalog.Register", &structs.RegisterRequest{
+		Datacenter: "dc1",
+		Node:       a.Config.NodeName,
+		Address:    "127.0.0.1",
+		Service:    srv2_mod,
+	}, &out))
 
 	// Exists local (create)
 	srv3 := &structs.NodeService{
```
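The replaced `srv2_mod` construction was a plain struct copy (`*srv2_mod = *srv2`), which is only a shallow copy: slice and map fields still alias the original, so mutating through either pointer affects both. A small illustration of that aliasing:

```go
package main

import "fmt"

type ns struct{ Tags []string }

func main() {
	a := ns{Tags: []string{"v1"}}
	b := a                 // shallow copy: b.Tags shares a.Tags' backing array
	b.Tags[0] = "v2"       // writes through the shared array
	fmt.Println(a.Tags[0]) // prints "v2": the "copy" mutated the original
}
```

The `clone` helper defined earlier in the test deep-copies via `copystructure`, so later mutations of `srv2_mod`, including its maps and slices, cannot touch `srv2`.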
```diff
@@ -341,8 +350,12 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
 		},
 		EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
 	}
-	args.Service = srv4
-	assert.Nil(t, a.RPC("Catalog.Register", args, &out))
+	require.NoError(t, a.RPC("Catalog.Register", &structs.RegisterRequest{
+		Datacenter: "dc1",
+		Node:       a.Config.NodeName,
+		Address:    "127.0.0.1",
+		Service:    srv4,
+	}, &out))
 
 	// Exists local, in sync, remote missing (create)
 	srv5 := &structs.NodeService{
```
```diff
@@ -362,28 +375,56 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
 		InSync: true,
 	})
 
-	assert.Nil(t, a.State.SyncFull())
+	require.NoError(t, a.State.SyncFull())
 
 	var services structs.IndexedNodeServices
 	req := structs.NodeSpecificRequest{
 		Datacenter: "dc1",
 		Node:       a.Config.NodeName,
 	}
-	assert.Nil(t, a.RPC("Catalog.NodeServices", &req, &services))
+	require.NoError(t, a.RPC("Catalog.NodeServices", &req, &services))
 
 	// We should have 5 services (consul included)
-	assert.Len(t, services.NodeServices.Services, 5)
+	require.Len(t, services.NodeServices.Services, 5)
 
 	// Check that virtual IPs have been set
 	vips := make(map[string]struct{})
+	serviceToVIP := make(map[string]string)
 	for _, serv := range services.NodeServices.Services {
 		if serv.TaggedAddresses != nil {
 			serviceVIP := serv.TaggedAddresses[structs.TaggedAddressVirtualIP].Address
-			assert.NotEmpty(t, serviceVIP)
+			require.NotEmpty(t, serviceVIP)
 			vips[serviceVIP] = struct{}{}
+			serviceToVIP[serv.ID] = serviceVIP
 		}
 	}
-	assert.Len(t, vips, 4)
+	require.Len(t, vips, 4)
 
+	// Update our assertions for the tagged addresses.
+	srv1.TaggedAddresses = map[string]structs.ServiceAddress{
+		structs.TaggedAddressVirtualIP: {
+			Address: serviceToVIP["mysql-proxy"],
+			Port:    srv1.Port,
+		},
+	}
+	srv2.TaggedAddresses = map[string]structs.ServiceAddress{
+		structs.TaggedAddressVirtualIP: {
+			Address: serviceToVIP["redis-proxy"],
+			Port:    srv2.Port,
+		},
+	}
+	srv3.TaggedAddresses = map[string]structs.ServiceAddress{
+		structs.TaggedAddressVirtualIP: {
+			Address: serviceToVIP["web-proxy"],
+			Port:    srv3.Port,
+		},
+	}
+	srv5.TaggedAddresses = map[string]structs.ServiceAddress{
+		structs.TaggedAddressVirtualIP: {
+			Address: serviceToVIP["cache-proxy"],
+			Port:    srv5.Port,
+		},
+	}
+
 	// All the services should match
 	// Retry to mitigate data races between local and remote state
```
```diff
@@ -408,26 +449,26 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
 		}
 	})
 
-	assert.NoError(t, servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))
+	require.NoError(t, servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))
 
 	// Remove one of the services
 	a.State.RemoveService(structs.NewServiceID("cache-proxy", nil))
-	assert.Nil(t, a.State.SyncFull())
-	assert.Nil(t, a.RPC("Catalog.NodeServices", &req, &services))
+	require.NoError(t, a.State.SyncFull())
+	require.NoError(t, a.RPC("Catalog.NodeServices", &req, &services))
 
 	// We should have 4 services (consul included)
-	assert.Len(t, services.NodeServices.Services, 4)
+	require.Len(t, services.NodeServices.Services, 4)
 
 	// All the services should match
 	for id, serv := range services.NodeServices.Services {
 		serv.CreateIndex, serv.ModifyIndex = 0, 0
 		switch id {
 		case "mysql-proxy":
-			assert.Equal(t, srv1, serv)
+			require.Equal(t, srv1, serv)
 		case "redis-proxy":
-			assert.Equal(t, srv2, serv)
+			require.Equal(t, srv2, serv)
 		case "web-proxy":
-			assert.Equal(t, srv3, serv)
+			require.Equal(t, srv3, serv)
 		case structs.ConsulServiceID:
 			// ignore
 		default:
```
```diff
@@ -435,7 +476,7 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
 		}
 	}
 
-	assert.Nil(t, servicesInSync(a.State, 3, structs.DefaultEnterpriseMetaInDefaultPartition()))
+	require.NoError(t, servicesInSync(a.State, 3, structs.DefaultEnterpriseMetaInDefaultPartition()))
 }
 
 func TestAgent_ServiceWatchCh(t *testing.T) {
```
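Note the conversion from `assert.*` to `require.*` throughout this test: `require` aborts the test at the first failure instead of continuing, so a failed registration or sync can no longer cascade into confusing downstream assertion failures.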