Merge pull request #10396 from hashicorp/dnephin/fix-more-data-races
Fix some data races
This commit is contained in:
commit
901a5cdd8c
|
@ -18,6 +18,7 @@ import (
|
|||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -3729,10 +3730,27 @@ func TestAgent_SecurityChecks(t *testing.T) {
|
|||
defer a.Shutdown()
|
||||
|
||||
data := make([]byte, 0, 8192)
|
||||
bytesBuffer := bytes.NewBuffer(data)
|
||||
a.LogOutput = bytesBuffer
|
||||
buf := &syncBuffer{b: bytes.NewBuffer(data)}
|
||||
a.LogOutput = buf
|
||||
assert.NoError(t, a.Start(t))
|
||||
assert.Contains(t, bytesBuffer.String(), "using enable-script-checks without ACLs and without allow_write_http_from is DANGEROUS")
|
||||
assert.Contains(t, buf.String(), "using enable-script-checks without ACLs and without allow_write_http_from is DANGEROUS")
|
||||
}
|
||||
|
||||
type syncBuffer struct {
|
||||
lock sync.RWMutex
|
||||
b *bytes.Buffer
|
||||
}
|
||||
|
||||
func (b *syncBuffer) Write(data []byte) (int, error) {
|
||||
b.lock.Lock()
|
||||
defer b.lock.Unlock()
|
||||
return b.b.Write(data)
|
||||
}
|
||||
|
||||
func (b *syncBuffer) String() string {
|
||||
b.lock.Lock()
|
||||
defer b.lock.Unlock()
|
||||
return b.b.String()
|
||||
}
|
||||
|
||||
func TestAgent_ReloadConfigOutgoingRPCConfig(t *testing.T) {
|
||||
|
|
|
@ -310,43 +310,51 @@ func testIdentityForToken(token string) (bool, structs.ACLIdentity, error) {
|
|||
func testPolicyForID(policyID string) (bool, *structs.ACLPolicy, error) {
|
||||
switch policyID {
|
||||
case "acl-ro":
|
||||
return true, &structs.ACLPolicy{
|
||||
p := &structs.ACLPolicy{
|
||||
ID: "acl-ro",
|
||||
Name: "acl-ro",
|
||||
Description: "acl-ro",
|
||||
Rules: `acl = "read"`,
|
||||
Syntax: acl.SyntaxCurrent,
|
||||
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2},
|
||||
}, nil
|
||||
}
|
||||
p.SetHash(false)
|
||||
return true, p, nil
|
||||
case "acl-wr":
|
||||
return true, &structs.ACLPolicy{
|
||||
p := &structs.ACLPolicy{
|
||||
ID: "acl-wr",
|
||||
Name: "acl-wr",
|
||||
Description: "acl-wr",
|
||||
Rules: `acl = "write"`,
|
||||
Syntax: acl.SyntaxCurrent,
|
||||
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2},
|
||||
}, nil
|
||||
}
|
||||
p.SetHash(false)
|
||||
return true, p, nil
|
||||
case "service-ro":
|
||||
return true, &structs.ACLPolicy{
|
||||
p := &structs.ACLPolicy{
|
||||
ID: "service-ro",
|
||||
Name: "service-ro",
|
||||
Description: "service-ro",
|
||||
Rules: `service_prefix "" { policy = "read" }`,
|
||||
Syntax: acl.SyntaxCurrent,
|
||||
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2},
|
||||
}, nil
|
||||
}
|
||||
p.SetHash(false)
|
||||
return true, p, nil
|
||||
case "service-wr":
|
||||
return true, &structs.ACLPolicy{
|
||||
p := &structs.ACLPolicy{
|
||||
ID: "service-wr",
|
||||
Name: "service-wr",
|
||||
Description: "service-wr",
|
||||
Rules: `service_prefix "" { policy = "write" }`,
|
||||
Syntax: acl.SyntaxCurrent,
|
||||
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2},
|
||||
}, nil
|
||||
}
|
||||
p.SetHash(false)
|
||||
return true, p, nil
|
||||
case "node-wr":
|
||||
return true, &structs.ACLPolicy{
|
||||
p := &structs.ACLPolicy{
|
||||
ID: "node-wr",
|
||||
Name: "node-wr",
|
||||
Description: "node-wr",
|
||||
|
@ -354,9 +362,11 @@ func testPolicyForID(policyID string) (bool, *structs.ACLPolicy, error) {
|
|||
Syntax: acl.SyntaxCurrent,
|
||||
Datacenters: []string{"dc1"},
|
||||
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2},
|
||||
}, nil
|
||||
}
|
||||
p.SetHash(false)
|
||||
return true, p, nil
|
||||
case "dc2-key-wr":
|
||||
return true, &structs.ACLPolicy{
|
||||
p := &structs.ACLPolicy{
|
||||
ID: "dc2-key-wr",
|
||||
Name: "dc2-key-wr",
|
||||
Description: "dc2-key-wr",
|
||||
|
@ -364,7 +374,9 @@ func testPolicyForID(policyID string) (bool, *structs.ACLPolicy, error) {
|
|||
Syntax: acl.SyntaxCurrent,
|
||||
Datacenters: []string{"dc2"},
|
||||
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2},
|
||||
}, nil
|
||||
}
|
||||
p.SetHash(false)
|
||||
return true, p, nil
|
||||
default:
|
||||
return testPolicyForIDEnterprise(policyID)
|
||||
}
|
||||
|
|
|
@ -373,7 +373,10 @@ func TestLeader_Vault_PrimaryCA_IntermediateRenew(t *testing.T) {
|
|||
}
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
defer func() {
|
||||
s1.Shutdown()
|
||||
s1.leaderRoutineManager.Wait()
|
||||
}()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
|
@ -482,7 +485,10 @@ func TestLeader_SecondaryCA_IntermediateRenew(t *testing.T) {
|
|||
}
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
defer func() {
|
||||
s1.Shutdown()
|
||||
s1.leaderRoutineManager.Wait()
|
||||
}()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
|
@ -493,7 +499,10 @@ func TestLeader_SecondaryCA_IntermediateRenew(t *testing.T) {
|
|||
c.Build = "1.6.0"
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
defer func() {
|
||||
s2.Shutdown()
|
||||
s2.leaderRoutineManager.Wait()
|
||||
}()
|
||||
|
||||
// Create the WAN link
|
||||
joinWAN(t, s2, s1)
|
||||
|
|
|
@ -20,6 +20,11 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/NYTimes/gziphandler"
|
||||
cleanhttp "github.com/hashicorp/go-cleanhttp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/net/http2"
|
||||
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
tokenStore "github.com/hashicorp/consul/agent/token"
|
||||
|
@ -27,10 +32,6 @@ import (
|
|||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
cleanhttp "github.com/hashicorp/go-cleanhttp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
|
||||
func TestHTTPServer_UnixSocket(t *testing.T) {
|
||||
|
@ -632,7 +633,7 @@ func TestHTTP_wrap_obfuscateLog(t *testing.T) {
|
|||
}
|
||||
|
||||
t.Parallel()
|
||||
buf := new(bytes.Buffer)
|
||||
buf := &syncBuffer{b: new(bytes.Buffer)}
|
||||
a := StartTestAgent(t, TestAgent{LogOutput: buf})
|
||||
defer a.Shutdown()
|
||||
|
||||
|
|
|
@ -8,11 +8,12 @@ import (
|
|||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestServiceManager_RegisterService(t *testing.T) {
|
||||
|
@ -330,26 +331,27 @@ func TestServiceManager_PersistService_API(t *testing.T) {
|
|||
|
||||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||
|
||||
// Now register a sidecar proxy via the API.
|
||||
svc := &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "web-sidecar-proxy",
|
||||
Service: "web-sidecar-proxy",
|
||||
Port: 21000,
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: "web",
|
||||
DestinationServiceID: "web",
|
||||
LocalServiceAddress: "127.0.0.1",
|
||||
LocalServicePort: 8000,
|
||||
Upstreams: structs.Upstreams{
|
||||
{
|
||||
DestinationName: "redis",
|
||||
DestinationNamespace: "default",
|
||||
LocalBindPort: 5000,
|
||||
newNodeService := func() *structs.NodeService {
|
||||
return &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "web-sidecar-proxy",
|
||||
Service: "web-sidecar-proxy",
|
||||
Port: 21000,
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: "web",
|
||||
DestinationServiceID: "web",
|
||||
LocalServiceAddress: "127.0.0.1",
|
||||
LocalServicePort: 8000,
|
||||
Upstreams: structs.Upstreams{
|
||||
{
|
||||
DestinationName: "redis",
|
||||
DestinationNamespace: "default",
|
||||
LocalBindPort: 5000,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||
}
|
||||
}
|
||||
|
||||
expectState := &structs.NodeService{
|
||||
|
@ -385,6 +387,7 @@ func TestServiceManager_PersistService_API(t *testing.T) {
|
|||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||
}
|
||||
|
||||
svc := newNodeService()
|
||||
svcID := svc.CompoundServiceID()
|
||||
|
||||
svcFile := filepath.Join(a.Config.DataDir, servicesDir, svcID.StringHash())
|
||||
|
@ -443,8 +446,15 @@ func TestServiceManager_PersistService_API(t *testing.T) {
|
|||
}
|
||||
|
||||
// Updates service definition on disk
|
||||
svc = newNodeService()
|
||||
svc.Proxy.LocalServicePort = 8001
|
||||
require.NoError(a.addServiceFromSource(svc, nil, true, "mytoken", ConfigSourceRemote))
|
||||
err = a.AddService(AddServiceRequest{
|
||||
Service: svc,
|
||||
persist: true,
|
||||
token: "mytoken",
|
||||
Source: ConfigSourceRemote,
|
||||
})
|
||||
require.NoError(err)
|
||||
requireFileIsPresent(t, svcFile)
|
||||
requireFileIsPresent(t, configFile)
|
||||
|
||||
|
|
|
@ -53,6 +53,8 @@ type TestAgent struct {
|
|||
Config *config.RuntimeConfig
|
||||
|
||||
// LogOutput is the sink for the logs. If nil, logs are written to os.Stderr.
|
||||
// The io.Writer must allow concurrent reads and writes. Note that
|
||||
// bytes.Buffer is not safe for concurrent reads and writes.
|
||||
LogOutput io.Writer
|
||||
|
||||
// DataDir may be set to a directory which exists. If is it not set,
|
||||
|
@ -343,8 +345,8 @@ func (a *TestAgent) Client() *api.Client {
|
|||
// DNSDisableCompression disables compression for all started DNS servers.
|
||||
func (a *TestAgent) DNSDisableCompression(b bool) {
|
||||
for _, srv := range a.dnsServers {
|
||||
cfg := srv.config.Load().(*dnsConfig)
|
||||
cfg.DisableCompression = b
|
||||
a.config.DNSDisableCompression = b
|
||||
srv.ReloadConfig(a.config)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
2
go.mod
2
go.mod
|
@ -55,7 +55,7 @@ require (
|
|||
github.com/hashicorp/raft v1.3.1
|
||||
github.com/hashicorp/raft-autopilot v0.1.5
|
||||
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
|
||||
github.com/hashicorp/serf v0.9.5
|
||||
github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9
|
||||
github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086
|
||||
github.com/hashicorp/yamux v0.0.0-20200609203250-aecfd211c9ce
|
||||
github.com/imdario/mergo v0.3.6
|
||||
|
|
3
go.sum
3
go.sum
|
@ -291,8 +291,9 @@ github.com/hashicorp/raft-autopilot v0.1.5 h1:onEfMH5uHVdXQqtas36zXUHEZxLdsJVu/n
|
|||
github.com/hashicorp/raft-autopilot v0.1.5/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw=
|
||||
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=
|
||||
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
|
||||
github.com/hashicorp/serf v0.9.5 h1:EBWvyu9tcRszt3Bxp3KNssBMP1KuHWyO51lz9+786iM=
|
||||
github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk=
|
||||
github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9 h1:lCZfMBDn/Puwg9VosHMf/9p9jNDYYkbzVjb4jYjVfqU=
|
||||
github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9/go.mod h1:qapjppkpNXHYTyzx+HqkyWGGkmUxafHjuspm/Bqb2Jc=
|
||||
github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086 h1:OKsyxKi2sNmqm1Gv93adf2AID2FOBFdCbbZn9fGtIdg=
|
||||
github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086/go.mod h1:R3Umvhlxi2TN7Ex2hzOowyeNb+SfbVWI973N+ctaFMk=
|
||||
github.com/hashicorp/vault/sdk v0.1.14-0.20200519221838-e0cfd64bc267 h1:e1ok06zGrWJW91rzRroyl5nRNqraaBe4d5hiKcVZuHM=
|
||||
|
|
|
@ -24,6 +24,10 @@ func (r *routineTracker) running() bool {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *routineTracker) wait() {
|
||||
<-r.stoppedCh
|
||||
}
|
||||
|
||||
type Manager struct {
|
||||
lock sync.RWMutex
|
||||
logger hclog.Logger
|
||||
|
@ -131,6 +135,8 @@ func (m *Manager) stopInstance(name string) *routineTracker {
|
|||
return instance
|
||||
}
|
||||
|
||||
// StopAll goroutines. Once StopAll is called, it is no longer safe to add no
|
||||
// goroutines to the Manager.
|
||||
func (m *Manager) StopAll() {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
@ -142,7 +148,14 @@ func (m *Manager) StopAll() {
|
|||
m.logger.Debug("stopping routine", "routine", name)
|
||||
routine.cancel()
|
||||
}
|
||||
|
||||
// just wipe out the entire map
|
||||
m.routines = make(map[string]*routineTracker)
|
||||
}
|
||||
|
||||
// Wait for all goroutines to stop after StopAll is called.
|
||||
func (m *Manager) Wait() {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
for _, routine := range m.routines {
|
||||
routine.wait()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -958,7 +958,7 @@ func (s *Serf) handleNodeJoin(n *memberlist.Node) {
|
|||
|
||||
member.Status = StatusAlive
|
||||
member.leaveTime = time.Time{}
|
||||
member.Addr = net.IP(n.Addr)
|
||||
member.Addr = n.Addr
|
||||
member.Port = n.Port
|
||||
member.Tags = s.decodeTags(n.Meta)
|
||||
}
|
||||
|
@ -1088,6 +1088,7 @@ func (s *Serf) handleNodeUpdate(n *memberlist.Node) {
|
|||
|
||||
// handleNodeLeaveIntent is called when an intent to leave is received.
|
||||
func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
|
||||
state := s.State()
|
||||
|
||||
// Witness a potentially newer time
|
||||
s.clock.Witness(leaveMsg.LTime)
|
||||
|
@ -1108,7 +1109,7 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
|
|||
|
||||
// Refute us leaving if we are in the alive state
|
||||
// Must be done in another goroutine since we have the memberLock
|
||||
if leaveMsg.Node == s.config.NodeName && s.state == SerfAlive {
|
||||
if leaveMsg.Node == s.config.NodeName && state == SerfAlive {
|
||||
s.logger.Printf("[DEBUG] serf: Refuting an older leave intent")
|
||||
go s.broadcastJoin(s.clock.Time())
|
||||
return false
|
||||
|
@ -1639,7 +1640,6 @@ func (s *Serf) reconnect() {
|
|||
// Select a random member to try and join
|
||||
idx := rand.Int31n(int32(n))
|
||||
mem := s.failedMembers[idx]
|
||||
s.memberLock.RUnlock()
|
||||
|
||||
// Format the addr
|
||||
addr := net.UDPAddr{IP: mem.Addr, Port: int(mem.Port)}
|
||||
|
@ -1649,6 +1649,7 @@ func (s *Serf) reconnect() {
|
|||
if mem.Name != "" {
|
||||
joinAddr = mem.Name + "/" + addr.String()
|
||||
}
|
||||
s.memberLock.RUnlock()
|
||||
|
||||
// Attempt to join at the memberlist level
|
||||
s.memberlist.Join([]string{joinAddr})
|
||||
|
|
|
@ -489,7 +489,7 @@ github.com/hashicorp/raft
|
|||
github.com/hashicorp/raft-autopilot
|
||||
# github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
|
||||
github.com/hashicorp/raft-boltdb
|
||||
# github.com/hashicorp/serf v0.9.5
|
||||
# github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9
|
||||
github.com/hashicorp/serf/coordinate
|
||||
github.com/hashicorp/serf/serf
|
||||
# github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086
|
||||
|
|
Loading…
Reference in New Issue