From 0b80f70a393f84871c2743a156311e13c6731ece Mon Sep 17 00:00:00 2001
From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com>
Date: Mon, 14 Feb 2022 10:41:33 -0600
Subject: [PATCH] local: fixes a data race in anti-entropy sync (#12324)

The race detector noticed this initially in
`TestAgentConfigWatcherSidecarProxy`, but it is not restricted to just
tests.

The two main changes here were:

- ensure that before we mutate the internal `agent/local` representation
  of a Service (for tags or VIPs) we clone those fields

- ensure that there's no function-argument joint ownership between the
  caller of a function and the local state when calling `AddService`,
  `AddCheck`, and related, using `copystructure` for now.
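
As a rough illustration of the copy-on-write half of the fix, here is a
minimal, self-contained sketch (the `service` and `state` types below
are hypothetical stand-ins, not the actual `agent/local` code):

    package main

    import (
    	"fmt"
    	"sync"
    )

    // service stands in for structs.NodeService.
    type service struct {
    	Tags []string
    }

    // state stands in for the local state; in-memory RPCs may keep
    // reading a *service pointer after the lock is released.
    type state struct {
    	mu  sync.Mutex
    	svc *service
    }

    // update mutates a private shallow copy and then swaps the pointer,
    // instead of writing through the shared pointer that readers hold.
    func (s *state) update(tags []string) {
    	s.mu.Lock()
    	defer s.mu.Unlock()

    	next := *s.svc                             // shallow copy; never write via s.svc
    	next.Tags = append([]string(nil), tags...) // clone; don't alias the caller's slice
    	s.svc = &next                              // readers see old or new, never a torn write
    }

    func main() {
    	s := &state{svc: &service{Tags: []string{"v1"}}}
    	s.update([]string{"v2"})
    	fmt.Println(s.svc.Tags)
    }

Cloning arguments at the `AddService`/`AddCheck` boundary serves the
same goal from the other side: once a pointer has been stored or handed
out, the struct behind it is never written to again.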
---
 .changelog/12324.txt      |  3 ++
 agent/local/state.go      | 57 ++++++++++++++++++++----
 agent/local/state_test.go | 93 ++++++++++++++++++++++++++++-----------
 3 files changed, 118 insertions(+), 35 deletions(-)
 create mode 100644 .changelog/12324.txt

diff --git a/.changelog/12324.txt b/.changelog/12324.txt
new file mode 100644
index 000000000..ca5f30b30
--- /dev/null
+++ b/.changelog/12324.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+local: fixes a data race in anti-entropy sync that could cause the wrong tags to be applied to a service when EnableTagOverride is used
+```

diff --git a/agent/local/state.go b/agent/local/state.go
index 1eb5733bb..8427068d7 100644
--- a/agent/local/state.go
+++ b/agent/local/state.go
@@ -12,6 +12,7 @@ import (
 	"github.com/armon/go-metrics"
 	"github.com/armon/go-metrics/prometheus"
 	"github.com/hashicorp/go-hclog"
+	"github.com/mitchellh/copystructure"
 
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/agent/consul"
@@ -267,6 +268,13 @@ func (l *State) addServiceLocked(service *structs.NodeService, token string) err
 		return fmt.Errorf("no service")
 	}
 
+	// Avoid having the stored service have any call-site ownership.
+	var err error
+	service, err = cloneService(service)
+	if err != nil {
+		return err
+	}
+
 	// use the service name as id if the id was omitted
 	if service.ID == "" {
 		service.ID = service.Service
@@ -530,8 +538,12 @@ func (l *State) addCheckLocked(check *structs.HealthCheck, token string) error {
 		return fmt.Errorf("no check")
 	}
 
-	// clone the check since we will be modifying it.
-	check = check.Clone()
+	// Avoid having the stored check have any call-site ownership.
+	var err error
+	check, err = cloneCheck(check)
+	if err != nil {
+		return err
+	}
 
 	if l.discardCheckOutput.Load().(bool) {
 		check.Output = ""
@@ -1083,22 +1095,26 @@ func (l *State) updateSyncState() error {
 			continue
 		}
 
+		// Make a shallow copy since we may mutate it below and other readers
+		// may be reading it and we want to avoid a race.
+		nextService := *ls.Service
+		changed := false
+
 		// If our definition is different, we need to update it. Make a
 		// copy so that we don't retain a pointer to any actual state
 		// store info for in-memory RPCs.
-		if ls.Service.EnableTagOverride {
-			tags := make([]string, len(rs.Tags))
-			copy(tags, rs.Tags)
-			ls.Service.Tags = tags
+		if nextService.EnableTagOverride {
+			nextService.Tags = structs.CloneStringSlice(rs.Tags)
+			changed = true
 		}
 
 		// Merge any tagged addresses with the consul- prefix (set by the server)
 		// back into the local state.
-		if !reflect.DeepEqual(ls.Service.TaggedAddresses, rs.TaggedAddresses) {
+		if !reflect.DeepEqual(nextService.TaggedAddresses, rs.TaggedAddresses) {
 			// Make a copy of TaggedAddresses to prevent races when writing
 			// since other goroutines may be reading from the map
 			m := make(map[string]structs.ServiceAddress)
-			for k, v := range ls.Service.TaggedAddresses {
+			for k, v := range nextService.TaggedAddresses {
 				m[k] = v
 			}
 			for k, v := range rs.TaggedAddresses {
@@ -1106,7 +1122,12 @@
 					m[k] = v
 				}
 			}
-			ls.Service.TaggedAddresses = m
+			nextService.TaggedAddresses = m
+			changed = true
+		}
+
+		if changed {
+			ls.Service = &nextService
 		}
 		ls.InSync = ls.Service.IsSame(rs)
 	}
@@ -1549,3 +1570,21 @@ func (l *State) aclAccessorID(secretID string) string {
 	}
 	return ident.AccessorID()
 }
+
+func cloneService(ns *structs.NodeService) (*structs.NodeService, error) {
+	// TODO: consider doing a hand-managed clone function
+	raw, err := copystructure.Copy(ns)
+	if err != nil {
+		return nil, err
+	}
+	return raw.(*structs.NodeService), err
+}
+
+func cloneCheck(check *structs.HealthCheck) (*structs.HealthCheck, error) {
+	// TODO: consider doing a hand-managed clone function
+	raw, err := copystructure.Copy(check)
+	if err != nil {
+		return nil, err
+	}
+	return raw.(*structs.HealthCheck), err
+}
diff --git a/agent/local/state_test.go b/agent/local/state_test.go
index 1be9274e1..afca783e8 100644
--- a/agent/local/state_test.go
+++ b/agent/local/state_test.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-uuid"
+	"github.com/mitchellh/copystructure"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
@@ -267,13 +268,14 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
 	defer a.Shutdown()
 	testrpc.WaitForTestAgent(t, a.RPC, "dc1")
 
+	clone := func(ns *structs.NodeService) *structs.NodeService {
+		raw, err := copystructure.Copy(ns)
+		require.NoError(t, err)
+		return raw.(*structs.NodeService)
+	}
+
 	// Register node info
 	var out struct{}
-	args := &structs.RegisterRequest{
-		Datacenter: "dc1",
-		Node:       a.Config.NodeName,
-		Address:    "127.0.0.1",
-	}
 
 	// Exists both same (noop)
 	srv1 := &structs.NodeService{
 		Kind:    structs.ServiceKindConnectProxy,
 		ID:      "mysql-proxy",
 		Service: "mysql-proxy",
 		Port:    5000,
 		Proxy: structs.ConnectProxyConfig{
 			DestinationServiceName: "db",
 		},
 		EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
 	}
 	a.State.AddService(srv1, "")
-	args.Service = srv1
-	assert.Nil(t, a.RPC("Catalog.Register", args, &out))
+	require.NoError(t, a.RPC("Catalog.Register", &structs.RegisterRequest{
+		Datacenter: "dc1",
+		Node:       a.Config.NodeName,
+		Address:    "127.0.0.1",
+		Service:    srv1,
+	}, &out))
 
 	// Exists both, different (update)
 	srv2 := &structs.NodeService{
 		Kind:    structs.ServiceKindConnectProxy,
 		ID:      "redis-proxy",
 		Service: "redis-proxy",
 		Port:    8000,
 		Proxy: structs.ConnectProxyConfig{
 			DestinationServiceName: "redis",
 		},
 		EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
 	}
 	a.State.AddService(srv2, "")
 
-	srv2_mod := new(structs.NodeService)
-	*srv2_mod = *srv2
+	srv2_mod := clone(srv2)
 	srv2_mod.Port = 9000
-	args.Service = srv2_mod
-	assert.Nil(t, a.RPC("Catalog.Register", args, &out))
+	require.NoError(t, a.RPC("Catalog.Register", &structs.RegisterRequest{
+		Datacenter: "dc1",
+		Node:       a.Config.NodeName,
+		Address:    "127.0.0.1",
+		Service:    srv2_mod,
+	}, &out))
 
 	// Exists local (create)
 	srv3 := &structs.NodeService{
@@ -341,8 +350,12 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
 		},
 		EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
 	}
-	args.Service = srv4
-	assert.Nil(t, a.RPC("Catalog.Register", args, &out))
+	require.NoError(t, a.RPC("Catalog.Register", &structs.RegisterRequest{
+		Datacenter: "dc1",
+		Node:       a.Config.NodeName,
+		Address:    "127.0.0.1",
+		Service:    srv4,
+	}, &out))
 
 	// Exists local, in sync, remote missing (create)
 	srv5 := &structs.NodeService{
@@ -362,28 +375,56 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
 		InSync: true,
 	})
 
-	assert.Nil(t, a.State.SyncFull())
+	require.NoError(t, a.State.SyncFull())
 
 	var services structs.IndexedNodeServices
 	req := structs.NodeSpecificRequest{
 		Datacenter: "dc1",
 		Node:       a.Config.NodeName,
 	}
-	assert.Nil(t, a.RPC("Catalog.NodeServices", &req, &services))
+	require.NoError(t, a.RPC("Catalog.NodeServices", &req, &services))
 
 	// We should have 5 services (consul included)
-	assert.Len(t, services.NodeServices.Services, 5)
+	require.Len(t, services.NodeServices.Services, 5)
 
 	// Check that virtual IPs have been set
 	vips := make(map[string]struct{})
+	serviceToVIP := make(map[string]string)
 	for _, serv := range services.NodeServices.Services {
 		if serv.TaggedAddresses != nil {
 			serviceVIP := serv.TaggedAddresses[structs.TaggedAddressVirtualIP].Address
-			assert.NotEmpty(t, serviceVIP)
+			require.NotEmpty(t, serviceVIP)
 			vips[serviceVIP] = struct{}{}
+			serviceToVIP[serv.ID] = serviceVIP
 		}
 	}
-	assert.Len(t, vips, 4)
+	require.Len(t, vips, 4)
+
+	// Update our assertions for the tagged addresses.
+	srv1.TaggedAddresses = map[string]structs.ServiceAddress{
+		structs.TaggedAddressVirtualIP: {
+			Address: serviceToVIP["mysql-proxy"],
+			Port:    srv1.Port,
+		},
+	}
+	srv2.TaggedAddresses = map[string]structs.ServiceAddress{
+		structs.TaggedAddressVirtualIP: {
+			Address: serviceToVIP["redis-proxy"],
+			Port:    srv2.Port,
+		},
+	}
+	srv3.TaggedAddresses = map[string]structs.ServiceAddress{
+		structs.TaggedAddressVirtualIP: {
+			Address: serviceToVIP["web-proxy"],
+			Port:    srv3.Port,
+		},
+	}
+	srv5.TaggedAddresses = map[string]structs.ServiceAddress{
+		structs.TaggedAddressVirtualIP: {
+			Address: serviceToVIP["cache-proxy"],
+			Port:    srv5.Port,
+		},
+	}
 
 	// All the services should match
 	// Retry to mitigate data races between local and remote state
@@ -408,26 +449,26 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
 		}
 	})
 
-	assert.NoError(t, servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))
+	require.NoError(t, servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))
 
 	// Remove one of the services
 	a.State.RemoveService(structs.NewServiceID("cache-proxy", nil))
-	assert.Nil(t, a.State.SyncFull())
-	assert.Nil(t, a.RPC("Catalog.NodeServices", &req, &services))
+	require.NoError(t, a.State.SyncFull())
+	require.NoError(t, a.RPC("Catalog.NodeServices", &req, &services))
 
 	// We should have 4 services (consul included)
-	assert.Len(t, services.NodeServices.Services, 4)
+	require.Len(t, services.NodeServices.Services, 4)
 
 	// All the services should match
 	for id, serv := range services.NodeServices.Services {
 		serv.CreateIndex, serv.ModifyIndex = 0, 0
 		switch id {
 		case "mysql-proxy":
-			assert.Equal(t, srv1, serv)
+			require.Equal(t, srv1, serv)
 		case "redis-proxy":
-			assert.Equal(t, srv2, serv)
+			require.Equal(t, srv2, serv)
 		case "web-proxy":
-			assert.Equal(t, srv3, serv)
+			require.Equal(t, srv3, serv)
 		case structs.ConsulServiceID:
 			// ignore
 		default:
@@ -435,7 +476,7 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
 		}
 	}
 
-	assert.Nil(t, servicesInSync(a.State, 3, structs.DefaultEnterpriseMetaInDefaultPartition()))
+	require.NoError(t, servicesInSync(a.State, 3, structs.DefaultEnterpriseMetaInDefaultPartition()))
 }
 
 func TestAgent_ServiceWatchCh(t *testing.T) {